evmRpcLatencyWatcher subscribes to RPC endpoints via WebSocket and measures when blocks arrive at the Portal versus when they appear at the RPC endpoints.
The measured values include client-side network data freshness. For RPC endpoints, only the arrival time of blocks is measured—this does not capture the node’s internal processing or response latency if queried directly. Results represent end-to-end delays as experienced by the client, not pure Portal or RPC processing performance.
import { formatBlock } from "@subsquid/pipes";
import { evmPortalSource, evmRpcLatencyWatcher } from "@subsquid/pipes/evm";
import { metricsServer } from "@subsquid/pipes/metrics/node";
async function main() {
const stream = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/base-mainnet",
outputs: evmDecoder({ range: { from: 'latest' }, events: {} }),
metrics: metricsServer({ port: 9090 }),
}).pipe(
evmRpcLatencyWatcher({
rpcUrl: ["https://base.drpc.org", "https://base-rpc.publicnode.com"],
}).pipe((data, { metrics }) => {
if (!data) return;
// Update Prometheus metrics for each RPC endpoint
for (const rpc of data.rpc) {
metrics
.gauge({
name: "rpc_latency_ms",
help: "RPC Latency in ms",
labelNames: ["url"],
})
.set({ url: rpc.url }, rpc.portalDelayMs);
}
return data;
})
);
for await (const { data } of stream) {
if (!data) continue;
console.log(`Block: ${formatBlock(data.number)} / ${data.timestamp}`);
console.table(data.rpc);
}
}
void main()
Output format
Data freshness data includes:url: RPC endpoint URLreceivedAt: Timestamp when the RPC endpoint received the blockportalDelayMs: Milliseconds between RPC arrival and Portal availability
Block: 36,046,611 / Fri Sep 26 2025 14:29:29 GMT+0400
┌───┬─────────────────────────────────┬──────────────────────────┬───────────────┐
│ │ url │ receivedAt │ portalDelayMs │
├───┼─────────────────────────────────┼──────────────────────────┼───────────────┤
│ 0 │ https://base.drpc.org │ 2025-09-26T10:29:29.134Z │ 646 │
│ 1 │ https://base-rpc.publicnode.com │ 2025-09-26T10:29:29.130Z │ 642 │
└───┴─────────────────────────────────┴──────────────────────────┴───────────────┘
Custom metrics integration
Export data freshness metrics to Prometheus or other monitoring systems.import { solanaPortalSource, solanaRpcLatencyWatcher } from '@subsquid/pipes/solana'
import { Registry, Counter } from 'prom-client'
const registry = new Registry()
const latencyCounter = new Counter({
name: 'portal_latency_ms_total',
help: 'Total Portal latency in milliseconds',
registers: [registry],
})
const stream = solanaPortalSource({
portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
outputs: new SolanaQueryBuilder()
.addFields({ block: { number: true, hash: true, timestamp: true } })
.includeAllBlocks({ from: 'latest' })
.build(),
}).pipe(
solanaRpcLatencyWatcher({
rpcUrl: ['https://api.mainnet-beta.solana.com'],
}).pipe({
transform: (data) => {
if (!data) return
// Record latency metrics
for (const rpc of data.rpc) {
latencyCounter.inc(rpc.portalDelayMs)
}
return data
},
}),
)
// Expose metrics endpoint
import http from 'http'
const server = http.createServer(async (req, res) => {
if (req.url === '/metrics') {
res.setHeader('Content-Type', registry.contentType)
res.end(await registry.metrics())
} else {
res.end('OK')
}
})
server.listen(9090)
for await (const { data } of stream) {
// Process blocks
}