# Feedback

If you encounter incorrect, outdated, or confusing documentation on any page, submit feedback:

POST https://docs.sqd.dev/feedback

```json
{
  "path": "/current-page-path",
  "feedback": "Description of the issue"
}
```

Only submit feedback when you have something specific and actionable to report.

# Home

Source: https://docs.sqd.dev/en/home

Build blockchain data pipelines with SQD — Portal HTTP APIs across 225+ chains, Squid and Pipes SDKs in TypeScript, and managed Cloud hosting for indexers.
The new blockchain data standard

High-Throughput Blockchain Data Access. Finally, an Alternative to RPC

Portal is a high-performance HTTP API for querying blockchain data at scale, with native finality, real-time streaming, and deep historical access. Query arbitrary block ranges in a single request with automatic reorg handling.

Arbitrary block ranges
Query any block range in a single request, no manual pagination needed, even on high-throughput chains
Native finalization
Automatic chain-specific finality & reorg handling. Skip the custom rollback logic.
Streaming responses
Ingest high-throughput data with a constant memory footprint
No vendor lock-in
Self-host or use decentralized infrastructure. Run the same code anywhere.
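For a sense of the API, here is a minimal sketch of an arbitrary-range query using plain `fetch` (the dataset, block range, and field selection are illustrative; the full query shape is covered in the Portal examples further down):

```ts theme={"system"}
// One request covers a 1,000-block range - no manual pagination.
const res = await fetch("https://portal.sqd.dev/datasets/ethereum-mainnet/stream", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    type: "evm",
    fromBlock: 18000000,
    toBlock: 18001000,
    fields: { log: { address: true, topics: true, data: true } },
    logs: [{ address: ["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"] }],
  }),
});

// The response streams newline-delimited JSON, one payload per line.
for (const line of (await res.text()).trim().split("\n")) {
  console.log(JSON.parse(line));
}
```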

The Complete Data Stack

Four integrated products that work together to power your blockchain data infrastructure

* **Portal** - HTTP API for raw blockchain data with arbitrary ranges, streaming, and finality handling
* **SDK** - TypeScript libraries for data transformation, decoding, and persistence to any database
* **Cloud** - Managed deployment platform with monitoring, scaling, and zero DevOps required
* **Network** - Decentralized data lake powering Portal with 200+ networks and self-hosting options

Query the blockchain instantly

SQD Portal provides an HTTP API for extracting blockchain data with arbitrary block ranges, streaming, and reorg handling

200+ Networks Supported

Access data from all major blockchains through Portal

* Ethereum - EVM compatible chains
* Solana - High-performance blockchain
* Base - Coinbase L2 on Ethereum
* Arbitrum - Ethereum L2 scaling solution
* Binance Smart Chain - BSC
* Plasma - High-performance blockchain
* Monad - Next-gen parallel EVM
* HyperEVM - High-performance blockchain
* MegaETH - Ultra-fast real-time blockchain
* HyperCore - Hyperliquid high-performance blockchain
* Bitcoin - The original blockchain

View All Networks - 200+ supported blockchains

Production Use Cases

Leading Web3 projects trust SQD to power their blockchain data infrastructure

* **PancakeSwap** (DeFi) - Large-scale multichain data indexing for top DEX
* **GMX** (DeFi) - Accelerated cross-chain indexing for decentralized exchange
* **ChainSafe** (Gaming) - Seamless game development across multiple chains
* **RAILGUN** (Privacy) - Crosschain privacy with trace data indexing
* **Hydration** (DeFi) - Data-powered growth for liquidity protocol
* **Polimec** (DeFi) - Compliant DeFi with SQD data infrastructure
* **Skybreach** (Gaming) - SQD data access powering blockchain games
* **Fantasy.Top** (Gaming) - Seamless social gaming experience with real-time data
* **Chillwhales** (Social) - Expanding NFT browsing on LUKSO with SQD
* **Levr.Bet** (Social) - Real-time sports betting on Monad powered by SQD
* **CoolWallet** (Social) - Securing data back-end infrastructure with SQD
* **Interface** (Social) - Enabling people to explore Ethereum with SQD
* **Guru Network** (AI) - Powering AI processors with blockchain data from SQD

# Getting Started with Portal

Source: https://docs.sqd.dev/en/portal/migration

Choose your SQD Portal migration path — migrate Cloud squids, develop locally with the Portal API, or self-host a Portal instance for full control.

Portal provides access to blockchain data from the permissionless SQD Network. Whether you're migrating existing squids or setting up Portal for the first time, choose the path that matches your deployment scenario.

## Choose Your Setup Path

Select the option that best describes your situation:

* Migrate your Cloud squids from gateways to Portal for improved performance and stability.
* Migrate EVM SDK-based squids running locally or self-hosted from gateways to Portal.
* Migrate Solana SDK-based squids from gateways and RPC to Portal.
* Migrate your EVM indexer from RPC-based real-time data to Portal API.
* Set up Portal locally for development and testing with your own devnet or testnet.
* Run your own Portal instance for complete control over your data infrastructure.

## Cloud Migration by Network Type

If you're using SQD Cloud, choose your network type:

* Migrate squids running on EVM or Substrate networks to Cloud Portal.
* Migrate Solana squids to Portal with real-time data support.

## Why Migrate to Portal?

Migrating to Portal provides significant benefits over traditional gateways and centralized services.

* **Reduced reliance on centralized services:** The permissionless SQD Network consists of [over 2500](https://arbiscan.io/address/0x36e2b147db67e76ab67a4d07c293670ebefcae4e#readContract#F6) nodes [run by independent operators](subsquid-network/worker)
* **Improved stability:** With a total capacity of roughly 2 PB, the permissionless SQD Network provides significant redundancy
* **Improved speed:** Portals use available bandwidth more effectively than gateways. Data fetching is 5-10 times faster in our tests
* **Future-proof:** All future development will focus on portals and the permissionless SQD Network

## Additional Resources

* Complete Portal API documentation with examples and field definitions.
* Learn about Portal architecture and core capabilities.

# Choosing Your SQD Tool

Source: https://docs.sqd.dev/en/sdk/options-comparison

Compare the Portal API, Pipes SDK, and Squid SDK to choose the right SQD blockchain indexing tool — features, complexity, and use-case fit.

This guide helps you understand the differences between SQD's three offerings and choose the right tool for your project.

## Overview: Three Ways to Access Blockchain Data

SQD provides three tools for working with blockchain data, each serving different use cases:

1. **Portal API** - Direct HTTP access to raw blockchain data
2. **Pipes SDK** - Lightweight streaming library for custom data pipelines
3. **Squid SDK** - Complete framework with built-in PostgreSQL and GraphQL

## How They Work Together

Portal data flow: raw blockchain data is transformed by an SDK and stored in your database.

* **Portal** provides the raw blockchain data through an HTTP API
* **SDKs** (Pipes and Squid) use Portal as their data source and add:
  * Event and transaction decoding
  * Type-safe data transformation
  * Batch processing capabilities
  * Database persistence
  * Real-time data ingestion

Both SDKs use Portal under the hood. When you use an SDK, you're still accessing blockchain data through Portal; the SDK just makes it easier to work with.

## Raw API vs SDKs

Portal and SDKs serve different use cases in the blockchain data indexing pipeline.
### Portal: Direct Data Access

Portal provides raw blockchain data access through a simple HTTP API. You query specific blocks, transactions, logs, or traces and receive the raw data directly.

```bash Portal Query Example theme={"system"}
curl --compressed -X POST 'https://portal.sqd.dev/datasets/ethereum-mainnet/stream' \
  -H 'Content-Type: application/json' \
  -d '{
    "type": "evm",
    "fromBlock": 18000000,
    "toBlock": 18001000,
    "fields": { "log": { "address": true, "topics": true, "data": true } },
    "logs": [{ "address": ["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"] }]
  }'
```

```python Portal Python Example theme={"system"}
import requests
import json

url = "https://portal.sqd.dev/datasets/ethereum-mainnet/stream"

response = requests.post(url, json={
    "type": "evm",
    "fromBlock": 18000000,
    "toBlock": 18001000,
    "fields": {"log": {"address": True, "topics": True}},
    "logs": [{"address": ["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"]}]
})

for line in response.text.strip().split('\n'):
    data = json.loads(line)
    # Process raw blockchain data
```

### SDKs: Data Transformation Frameworks

SDKs (Pipes and Squid) build on top of Portal to provide data transformation, decoding, and persistence capabilities.

```typescript Pipes SDK Example theme={"system"}
import { createTarget } from "@subsquid/pipes";
import { evmPortalSource, EvmQueryBuilder } from "@subsquid/pipes/evm";

const queryBuilder = new EvmQueryBuilder()
  .addFields({ block: { number: true, hash: true }, log: { data: true } })
  .addLog({
    request: { address: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"] },
    range: { from: 20000000, to: 20001000 },
  });

const source = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  query: queryBuilder,
});

const target = createTarget({
  write: async ({ read }) => {
    for await (const { data } of read()) {
      // Transform and persist to your database
    }
  },
});

await source.pipeTo(target);
```

```typescript Squid SDK Example theme={"system"}
import { EvmBatchProcessor } from "@subsquid/evm-processor";
import { TypeormDatabase } from "@subsquid/typeorm-store";
// Imports added for completeness; paths follow the usual squid codegen layout
import * as usdcAbi from "./abi/usdc";
import { Transfer } from "./model";

const processor = new EvmBatchProcessor()
  .setGateway("https://v2.archive.subsquid.io/network/ethereum-mainnet")
  .addLog({
    address: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
    topic0: [usdcAbi.events.Transfer.topic],
  });

const db = new TypeormDatabase();

processor.run(db, async (ctx) => {
  const transfers: Transfer[] = [];
  for (let block of ctx.blocks) {
    for (let log of block.logs) {
      let { from, to, value } = usdcAbi.events.Transfer.decode(log);
      transfers.push(new Transfer({ from, to, value }));
    }
  }
  await ctx.store.insert(transfers);
});
```

### Detailed Comparison

| Aspect                  | Portal                                  | SDKs (Pipes/Squid)                       |
| ----------------------- | --------------------------------------- | ---------------------------------------- |
| **Purpose**             | Raw data retrieval                      | Data transformation and persistence      |
| **Output**              | Raw blockchain data (JSON)              | Processed data in databases/APIs         |
| **Performance**         | 10-50x faster than RPC                  | Additional processing overhead           |
| **Setup Complexity**    | Minimal (HTTP requests only)            | Project scaffolding, TypeScript required |
| **Data Decoding**       | Manual (you handle ABI decoding)        | Built-in ABI decoding                    |
| **Database**            | Bring your own                          | PostgreSQL (Squid), custom (Pipes)       |
| **GraphQL API**         | Not included                            | Auto-generated (Squid only)              |
| **Type Safety**         | Limited (JSON responses)                | Full TypeScript support                  |
| **State Management**    | Manual                                  | Built-in with fork handling              |
| **Best For**            | Analytics, data lakes, custom pipelines | dApps, APIs, traditional backends        |
| **Languages Supported** | Any (HTTP API)                          | TypeScript                               |
| **Learning Curve**      | Low (HTTP + your language)              | Medium (TypeScript + framework concepts) |

### When to Use Portal

* Stream raw data directly into analytics platforms like ClickHouse, BigQuery, or Snowflake
* Build data pipelines in Python, Go, Rust, or any language with HTTP support
* Populate data warehouses with raw blockchain data for long-term analysis
* Quick experiments without setting up a full indexing framework

### When to Use SDKs

* Build GraphQL APIs that power decentralized applications
* Decode events, track relationships, and maintain state across blocks
* Deploy scalable indexers with built-in monitoring and deployment tools
* Leverage TypeScript for compile-time safety and better developer experience

## Pipes SDK vs Squid SDK

Both SDKs consume data from Portal but offer different levels of abstraction and flexibility.

### Architecture Differences

**Streaming Library Approach**

Pipes SDK is a lightweight streaming library that gives you maximum flexibility:

* You define the data flow using a pipe-and-target pattern
* You bring your own database and persistence logic
* You control exactly how data is processed and stored
* No CLI tools or scaffolding - integrate into existing projects

```typescript theme={"system"}
// Minimal setup - you control everything
const source = evmPortalSource({ portal: url, query });
const target = createTarget({ write: yourCustomLogic });
await source.pipeTo(target);
```

**Complete Framework Approach**

Squid SDK is a full-featured framework with built-in conventions:

* CLI tools for project scaffolding and management
* PostgreSQL database with automatic migrations
* Auto-generated GraphQL API from schema
* Built-in deployment tools for SQD Cloud

```typescript theme={"system"}
// Opinionated structure with built-in features
const processor = new EvmBatchProcessor()
  .setGateway(url)
  .addLog({ ... });

const db = new TypeormDatabase();
processor.run(db, async (ctx) => {
  // Framework handles state, rollbacks, persistence
});
```

### Feature Comparison Matrix

| Feature               | Pipes SDK                     | Squid SDK                             |
| --------------------- | ----------------------------- | ------------------------------------- |
| **Type**              | Streaming library             | Complete framework                    |
| **Installation**      | `npm install @subsquid/pipes` | `npm i -g @subsquid/cli`              |
| **Project Setup**     | Manual integration            | CLI scaffolding (`sqd init`)          |
| **Database**          | Bring your own (any)          | PostgreSQL (built-in)                 |
| **GraphQL API**       | Not included                  | Auto-generated from schema            |
| **Migrations**        | You implement                 | Auto-generated                        |
| **Type Generation**   | Manual                        | Auto-generated from ABI and schema    |
| **Event Decoding**    | Built-in codec                | Built-in codec                        |
| **State Rollbacks**   | You implement                 | Built-in fork handling                |
| **CLI Tools**         | None                          | Full CLI suite                        |
| **Cloud Deployment**  | Manual                        | `sqd deploy` command                  |
| **Local Development** | Standard Node.js              | Docker Compose included               |
| **Data Targets**      | Custom (you implement)        | TypeORM, BigQuery, CSV, JSON, Parquet |
| **Real-time Support** | Yes (streaming)               | Yes (unfinalized blocks)              |
| **Best For**          | Custom pipelines, flexibility | Full-stack dApps, rapid development   |
| **Learning Curve**    | Lower (simpler API)           | Higher (more concepts)                |
| **Bundle Size**       | Smaller                       | Larger (includes framework)           |
| **Customization**     | Maximum flexibility           | Opinionated but extensible            |

### Decision Matrix

You should use Pipes SDK if:

* ✅ You want maximum flexibility and control
* ✅ You're integrating into an existing codebase
* ✅ You want to use a specific database (MongoDB, ClickHouse, etc.)
* ✅ You don't need a GraphQL API
* ✅ You prefer lightweight dependencies
* ✅ You want to build custom data pipelines
* ✅ You're experienced with database design and management

**Example use cases:**

* Real-time dashboards with custom databases
* Data pipelines feeding multiple systems
* Microservices that need blockchain data
* Custom analytics engines

You should use Squid SDK if:

* ✅ You want a complete solution with minimal setup
* ✅ You need a GraphQL API
* ✅ You're building a dApp backend
* ✅ You want automatic type generation
* ✅ You prefer PostgreSQL
* ✅ You need built-in deployment tools
* ✅ You want comprehensive documentation and examples

**Example use cases:**

* dApp backends with GraphQL APIs
* NFT marketplaces
* DeFi analytics platforms
* Governance tools

## Why We Built Pipes SDK

Pipes SDK represents a fundamental rethinking of how blockchain indexing SDKs should work, informed by years of experience with Squid SDK 1.0 and feedback from the developer community.

### Challenges with Squid SDK 1.0

While Squid SDK 1.0 successfully served many production use cases, we identified several architectural limitations that hindered developer experience and community adoption:

Although designed with modularity in mind, many components became tightly coupled in practice. This made it extremely difficult to replace or extend parts of the system without understanding the entire SDK's internal workings, creating a steep learning curve for developers attempting customization.

Insufficient documentation and internal test coverage made introducing changes risky and time-consuming. Developers faced significant friction when trying to extend or modify the SDK's behavior.

The SDK was built to function as a standalone process, making it challenging to embed into existing applications or workflows.
This architectural decision limited flexibility for teams wanting to integrate blockchain indexing into their existing codebases. The framework enforced a very opinionated development style with limited room for customization. Developers often found themselves constrained by the SDK's assumptions rather than supported by its abstractions. The absence of a built-in testing framework for client applications increased friction for developers who needed to validate their integrations and business logic. These combined limitations significantly hindered external contributions, resulting in minimal meaningful engagement from the developer community. ### Pipes SDK Design Goals Pipes SDK was built from the ground up to address these challenges and provide a modern, flexible foundation for blockchain data indexing: Enable developers to concentrate on application-specific business logic rather than dealing with low-level blockchain implementation details. Promote code sharing and maintainability by extracting common functionality into reusable, composable packages. Provide ready-to-use extensions for common tasks including Portal caching layers, factory contract handling, and database integrations (PostgreSQL, ClickHouse, Kafka). Design a plugin system that makes it easy for developers to create and share extensions, fostering a vibrant ecosystem of community contributions. Include first-class support for custom metrics, profiling tools, and centralized logging services to help developers monitor and optimize their indexers. Ensure compatibility with modern JavaScript runtimes like Bun for faster development and improved performance. Pipes SDK maintains backward compatibility with Squid SDK's data sources (Portal) while offering a completely redesigned developer experience focused on flexibility, composability, and ease of use. ## Key Benefits by Tool ### Portal API Benefits * **Language Agnostic** - Use any programming language with HTTP support * **Minimal Setup** - Start querying in minutes with simple HTTP requests * **Maximum Control** - Full control over data processing and storage * **High Performance** - 10-50x faster than RPC for historical data ### Pipes SDK Benefits * **Lightweight** - Minimal dependencies and bundle size * **Flexible** - Use any database or data target * **Streaming** - Built-in backpressure handling and memory efficiency * **Type-Safe** - Full TypeScript support with auto-generated types ### Squid SDK Benefits * **Complete Solution** - Everything you need in one framework * **Rapid Development** - CLI tools and scaffolding for quick starts * **Auto-Generated APIs** - GraphQL API generated from your schema * **Production Ready** - Built-in deployment, monitoring, and scaling tools * **Comprehensive Tooling** - Hot reload, migrations, type generation ## Next Steps Now that you understand the differences, choose your path: Start querying raw blockchain data in minutes Build a lightweight data pipeline Create a full-featured indexer with GraphQL API # SQD SDK Overview Source: https://docs.sqd.dev/en/sdk/overview Choose how to access blockchain data with SQD — direct HTTP queries via the Portal API, lightweight Pipes SDK pipelines, or full Squid SDK GraphQL stacks. ## Choose Your Data Access Method SQD offers three ways to access blockchain data. Each is designed for different use cases and developer preferences. Direct HTTP access to raw blockchain data. Query using simple HTTP requests in any programming language. 
Lightweight TypeScript streaming library for custom data pipelines with automatic decoding and type safety. Complete TypeScript framework with built-in PostgreSQL and auto-generated GraphQL API. ## Quick Comparison **What it is:** Language-agnostic HTTP API for raw blockchain data **Best for:** Analytics, data lakes, custom pipelines in any language **Key advantage:** 10-50x faster than RPC with minimal setup **What it is:** TypeScript streaming library with flexible architecture **Best for:** Custom pipelines, existing codebases, non-PostgreSQL databases **Key advantage:** Full control over data flow with minimal dependencies **What it is:** Full-stack TypeScript framework with database and API **Best for:** dApp backends, GraphQL APIs, rapid development **Key advantage:** Built-in PostgreSQL + auto-generated GraphQL API ## Next Steps See feature matrices, code examples, and technical trade-offs between Portal API, Pipes SDK, and Squid SDK. Follow the quickstart to set up Portal API and run your first query. **Enterprise Custom Development** For enterprise clients requiring custom indexer development, please contact our team to discuss your specific requirements. [Schedule a Consultation →](https://calendly.com/t-tyrie-subsquid/30min) # Factory transformers Source: https://docs.sqd.dev/en/sdk/pipes-sdk/evm/guides/advanced-topics/factory-transformers Index dynamic contracts with the factory pattern Use the factory pattern when you need to index events from contracts that are deployed dynamically by a known factory contract — for example, Uniswap V3 pools created by the `UniswapV3Factory`. The examples below use typegen-generated ABI modules. See [Specifying events](../basic-development/handling-events#specifying-events) for how to generate them from a JSON ABI. ## Basic factory Track events from contracts created by a factory. The `factory()` helper discovers child contracts from the factory's creation events and maintains the address list in a local SQLite database. ```ts theme={"system"} import { evmPortalStream, evmDecoder, factory, factorySqliteDatabase } from "@subsquid/pipes/evm"; import { createTarget } from "@subsquid/pipes"; import * as factoryAbi from "./abi/uniswap-v3-factory"; import * as poolAbi from "./abi/uniswap-v3-pool"; await evmPortalStream({ portal: "https://portal.sqd.dev/datasets/ethereum-mainnet", outputs: evmDecoder({ range: { from: 12369621 }, contracts: factory({ address: "0x1f98431c8ad98523631ae4a59f267346ea31f984", event: factoryAbi.events.PoolCreated, parameter: "pool", database: factorySqliteDatabase({ path: "./uniswap-v3-pools.sqlite" }), }), events: { swap: poolAbi.events.Swap }, }), }).pipeTo(createTarget({ write: async ({ logger, read }) => { for await (const { data } of read()) { logger.info(`Parsed ${data.swap.length} swaps`); } }, })); ``` ## Filtering factory events To narrow which child contracts are tracked, pass an `event` object with a `params` field. Only creation events matching the specified parameter values are stored — unmatched contracts are ignored at both the portal and the local database level. ```ts theme={"system"} contracts: factory({ address: "0x1f98431c8ad98523631ae4a59f267346ea31f984", event: { event: factoryAbi.events.PoolCreated, params: { token0: "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", // WETH }, }, parameter: "pool", database: factorySqliteDatabase({ path: "./uniswap-v3-weth-pools.sqlite" }), }) ``` **Filter rules:** * Only **indexed parameters** can be used for filtering. * Multiple parameters are combined with AND logic. 
* Passing an **array** of values for a parameter matches any of them (OR logic). * Address matching is case-insensitive. ```ts theme={"system"} import { evmPortalStream, evmDecoder, factory, factorySqliteDatabase } from "@subsquid/pipes/evm"; import { createTarget } from "@subsquid/pipes"; import * as factoryAbi from "./abi/uniswap-v3-factory"; import * as poolAbi from "./abi/uniswap-v3-pool"; const WETH = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"; await evmPortalStream({ portal: "https://portal.sqd.dev/datasets/ethereum-mainnet", outputs: evmDecoder({ range: { from: 12369621 }, contracts: factory({ address: "0x1f98431c8ad98523631ae4a59f267346ea31f984", event: { event: factoryAbi.events.PoolCreated, params: { token0: WETH }, }, parameter: "pool", database: factorySqliteDatabase({ path: "./uniswap-v3-weth-pools.sqlite" }), }), events: { swap: poolAbi.events.Swap }, }), }).pipeTo(createTarget({ write: async ({ logger, read }) => { for await (const { data } of read()) { logger.info(`Parsed ${data.swap.length} swaps from WETH pools`); } }, })); ``` ## Including factory event data `DecodedEvent` carries a `.factory` field with the creation event. Use it when you need to include factory context (e.g. pool token addresses) alongside each decoded event. ```ts theme={"system"} import { evmPortalStream, evmDecoder, factory, DecodedEvent, factorySqliteDatabase, } from "@subsquid/pipes/evm"; import { createTarget } from "@subsquid/pipes"; import * as factoryAbi from "./abi/uniswap-v3-factory"; import * as poolAbi from "./abi/uniswap-v3-pool"; function addFactoryMetadata(event: DecodedEvent) { return { ...event.event, blockNumber: event.block.number, factoryEvent: event.factory?.event, }; } const decoder = evmDecoder({ range: { from: 12369621 }, contracts: factory({ address: "0x1f98431c8ad98523631ae4a59f267346ea31f984", event: factoryAbi.events.PoolCreated, parameter: "pool", database: factorySqliteDatabase({ path: "./uniswap-v3-pools.sqlite" }), }), events: { swap: poolAbi.events.Swap, mint: poolAbi.events.Mint }, }).pipe(({ swap, mint }) => ({ swap: swap.map(addFactoryMetadata), mint: mint.map(addFactoryMetadata), })); await evmPortalStream({ portal: "https://portal.sqd.dev/datasets/ethereum-mainnet", outputs: decoder, }).pipeTo(createTarget({ write: async ({ logger, read }) => { for await (const { data } of read()) { for (const s of data.swap) { logger.info({ pool: s.factoryEvent?.pool, token0: s.factoryEvent?.token0, token1: s.factoryEvent?.token1, amount0: s.amount0.toString(), amount1: s.amount1.toString(), }); } } }, })); ``` ## Multiple factories Pass separate `evmDecoder` outputs to track contracts from different factory addresses in a single pipeline. 
```ts theme={"system"} import { evmPortalStream, evmDecoder, factory, factorySqliteDatabase, } from "@subsquid/pipes/evm"; import { createTarget } from "@subsquid/pipes"; import * as uniswapV3FactoryAbi from "./abi/uniswap-v3-factory"; import * as uniswapV2FactoryAbi from "./abi/uniswap-v2-factory"; import * as poolAbi from "./abi/pool"; await evmPortalStream({ portal: "https://portal.sqd.dev/datasets/ethereum-mainnet", outputs: { v3: evmDecoder({ range: { from: 12369621 }, contracts: factory({ address: "0x1f98431c8ad98523631ae4a59f267346ea31f984", event: uniswapV3FactoryAbi.events.PoolCreated, parameter: "pool", database: factorySqliteDatabase({ path: "./v3-pools.sqlite" }), }), events: { swap: poolAbi.events.Swap }, }), v2: evmDecoder({ range: { from: 10000835 }, contracts: factory({ address: "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f", event: uniswapV2FactoryAbi.events.PairCreated, parameter: "pair", database: factorySqliteDatabase({ path: "./v2-pairs.sqlite" }), }), events: { swap: poolAbi.events.Swap }, }), }, }).pipeTo(createTarget({ write: async ({ logger, read }) => { for await (const { data } of read()) { logger.info({ v3Swaps: data.v3.swap.length, v2Swaps: data.v2.swap.length }); } }, })); ``` ## Pre-indexing factory Experimental Pre-populate the factory database before the main pipeline to ensure all historical child contracts are known before live indexing begins. This is an experimental feature. The pre-indexing request is limited and this approach won't work for thousands of addresses. ```ts theme={"system"} import { evmPortalStream, evmDecoder, factory, factorySqliteDatabase, } from "@subsquid/pipes/evm"; import { createTarget } from "@subsquid/pipes"; import * as factoryAbi from "./abi/uniswap-v3-factory"; import * as poolAbi from "./abi/uniswap-v3-pool"; const factoryDb = factorySqliteDatabase({ path: "./uniswap-v3-pools.sqlite" }); // Step 1: pre-index historical pool creations await evmPortalStream({ portal: "https://portal.sqd.dev/datasets/ethereum-mainnet", outputs: evmDecoder({ range: { from: 12369621, to: 20000000 }, contracts: ["0x1f98431c8ad98523631ae4a59f267346ea31f984"], events: { poolCreated: factoryAbi.events.PoolCreated }, }), }).pipeTo(createTarget({ write: async ({ logger, read }) => { for await (const { data } of read()) { for (const event of data.poolCreated) { await factoryDb.add(event.event.pool); logger.info(`Added pool: ${event.event.pool}`); } } }, })); // Step 2: run main pipeline with populated factory database await evmPortalStream({ portal: "https://portal.sqd.dev/datasets/ethereum-mainnet", outputs: evmDecoder({ range: { from: 20000000 }, contracts: factory({ address: "0x1f98431c8ad98523631ae4a59f267346ea31f984", event: factoryAbi.events.PoolCreated, parameter: "pool", database: factoryDb, }), events: { swap: poolAbi.events.Swap }, }), }).pipeTo(createTarget({ write: async ({ logger, read }) => { for await (const { data } of read()) { logger.info(`Processed ${data.swap.length} swaps`); } }, })); ``` # Data freshness monitoring Source: https://docs.sqd.dev/en/sdk/pipes-sdk/evm/guides/advanced-topics/latency-monitoring Compare Portal data freshness against external RPC providers The `evmRpcLatencyWatcher` subscribes to RPC endpoints via WebSocket and measures when blocks arrive at the Portal versus when they appear at the RPC endpoints. The measured values include client-side network data freshness. 
For RPC endpoints, only the arrival time of blocks is measured—this does not capture the node’s internal processing or response latency if queried directly. Results represent end-to-end delays as experienced by the client, not pure Portal or RPC processing performance.

```ts theme={"system"}
import { formatBlock } from "@subsquid/pipes";
// evmDecoder added to the import list (it's used below)
import { evmDecoder, evmPortalSource, evmRpcLatencyWatcher } from "@subsquid/pipes/evm";
import { metricsServer } from "@subsquid/pipes/metrics/node";

async function main() {
  const stream = evmPortalSource({
    portal: "https://portal.sqd.dev/datasets/base-mainnet",
    outputs: evmDecoder({ range: { from: 'latest' }, events: {} }),
    metrics: metricsServer({ port: 9090 }),
  }).pipe(
    evmRpcLatencyWatcher({
      rpcUrl: ["https://base.drpc.org", "https://base-rpc.publicnode.com"],
    }).pipe((data, { metrics }) => {
      if (!data) return;

      // Update Prometheus metrics for each RPC endpoint
      for (const rpc of data.rpc) {
        metrics
          .gauge({
            name: "rpc_latency_ms",
            help: "RPC Latency in ms",
            labelNames: ["url"],
          })
          .set({ url: rpc.url }, rpc.portalDelayMs);
      }

      return data;
    })
  );

  for await (const { data } of stream) {
    if (!data) continue;
    console.log(`Block: ${formatBlock(data.number)} / ${data.timestamp}`);
    console.table(data.rpc);
  }
}

void main()
```

## Output format

Each data freshness sample includes:

* `url`: RPC endpoint URL
* `receivedAt`: Timestamp when the RPC endpoint received the block
* `portalDelayMs`: Milliseconds between RPC arrival and Portal availability

```
Block: 36,046,611 / Fri Sep 26 2025 14:29:29 GMT+0400
┌───┬─────────────────────────────────┬──────────────────────────┬───────────────┐
│   │ url                             │ receivedAt               │ portalDelayMs │
├───┼─────────────────────────────────┼──────────────────────────┼───────────────┤
│ 0 │ https://base.drpc.org           │ 2025-09-26T10:29:29.134Z │ 646           │
│ 1 │ https://base-rpc.publicnode.com │ 2025-09-26T10:29:29.130Z │ 642           │
└───┴─────────────────────────────────┴──────────────────────────┴───────────────┘
```

## Custom metrics integration

Export data freshness metrics to Prometheus or other monitoring systems.
```ts expandable theme={"system"}
import http from 'http'
// SolanaQueryBuilder added to the import list (it's used below)
import { solanaPortalSource, solanaRpcLatencyWatcher, SolanaQueryBuilder } from '@subsquid/pipes/solana'
import { Registry, Counter } from 'prom-client'

const registry = new Registry()
const latencyCounter = new Counter({
  name: 'portal_latency_ms_total',
  help: 'Total Portal latency in milliseconds',
  registers: [registry],
})

const stream = solanaPortalSource({
  portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  outputs: new SolanaQueryBuilder()
    .addFields({ block: { number: true, hash: true, timestamp: true } })
    .includeAllBlocks({ from: 'latest' })
    .build(),
}).pipe(
  solanaRpcLatencyWatcher({
    rpcUrl: ['https://api.mainnet-beta.solana.com'],
  }).pipe({
    transform: (data) => {
      if (!data) return

      // Record latency metrics
      for (const rpc of data.rpc) {
        latencyCounter.inc(rpc.portalDelayMs)
      }

      return data
    },
  }),
)

// Expose metrics endpoint
const server = http.createServer(async (req, res) => {
  if (req.url === '/metrics') {
    res.setHeader('Content-Type', registry.contentType)
    res.end(await registry.metrics())
  } else {
    res.end('OK')
  }
})

server.listen(9090)

for await (const { data } of stream) {
  // Process blocks
}
```

# Logging

Source: https://docs.sqd.dev/en/sdk/pipes-sdk/evm/guides/advanced-topics/logging

Customize logging with Pino-compatible transports

[evmPortalSource()](../../reference/basic-components/source) accepts a Pino-compatible `logger`, allowing you to integrate custom log transports and send logs to external services like GCP Cloud Logging, Sentry, or any other Pino-compatible destination.

## Basic custom logger

Pass a custom logger to the source to configure logging for your entire pipeline.

```ts theme={"system"}
import { createTarget } from "@subsquid/pipes";
import { evmPortalSource } from "@subsquid/pipes/evm";
import pino from "pino";

async function main() {
  const transport = pino.transport({
    target: "pino-pretty",
    options: {
      colorize: true,
      translateTime: "HH:MM:ss",
    },
  });

  const source = evmPortalSource({
    portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
    logger: pino(transport),
  });

  const target = createTarget({
    write: async ({ logger, read }) => {
      for await (const { data } of read()) {
        logger.info({ count: data.length }, "Processed batch");
      }
    },
  });

  await source.pipeTo(target);
}

void main()
```

## Integration with cloud services

You can use any Pino transport to send logs to cloud services. Pass the configured logger to the source.
```ts theme={"system"} import { createTarget } from "@subsquid/pipes"; import { evmPortalSource } from "@subsquid/pipes/evm"; import pino from "pino"; async function main() { const transport = pino.transport({ target: "@google-cloud/logging-pino", options: { projectId: "your-project-id", logName: "pipes-indexer", }, }); const source = evmPortalSource({ portal: "https://portal.sqd.dev/datasets/ethereum-mainnet", logger: pino(transport), }); const target = createTarget({ write: async ({ logger, read }) => { for await (const { data } of read()) { logger.info( { blocksProcessed: data.blocks?.length, eventsCount: data.transfer?.length, }, "Batch processed" ); } }, }); await source.pipeTo(target); } void main() ``` ```ts theme={"system"} import { createTarget } from "@subsquid/pipes"; import { evmPortalSource } from "@subsquid/pipes/evm"; import pino from "pino"; async function main() { const transport = pino.transport({ target: "pino-sentry-transport", options: { sentry: { dsn: process.env.SENTRY_DSN, environment: "production", }, level: "error", // Only send errors to Sentry }, }); const source = evmPortalSource({ portal: "https://portal.sqd.dev/datasets/ethereum-mainnet", logger: pino(transport), }); const target = createTarget({ write: async ({ logger, read }) => { for await (const { data } of read()) { try { await processData(data); logger.info({ count: data.length }, "Batch processed"); } catch (error) { logger.error({ error, data }, "Failed to process batch"); } } }, }); await source.pipeTo(target); } void main() ``` ```ts theme={"system"} import { createTarget } from "@subsquid/pipes"; import { evmPortalSource } from "@subsquid/pipes/evm"; import pino from "pino"; async function main() { const transport = pino.transport({ targets: [ { target: "pino-pretty", options: { colorize: true }, level: "info", }, { target: "@google-cloud/logging-pino", options: { projectId: "your-project-id" }, level: "info", }, { target: "pino-sentry-transport", options: { sentry: { dsn: process.env.SENTRY_DSN } }, level: "error", }, ], }); const source = evmPortalSource({ portal: "https://portal.sqd.dev/datasets/ethereum-mainnet", logger: pino(transport), }); const target = createTarget({ write: async ({ logger, read }) => { for await (const { data } of read()) { logger.info({ count: data.length }, "Processed batch"); } }, }); await source.pipeTo(target); } void main() ``` The `ctx.logger` in transformers and targets is the same logger instance passed to the source. Configure logging at the source level, then use `ctx.logger` throughout your pipeline for consistent logging. # Metrics Source: https://docs.sqd.dev/en/sdk/pipes-sdk/evm/guides/advanced-topics/metrics Track custom Prometheus metrics in EVM pipes Pipes SDK can expose a Prometheus-compatible metrics server. You can customize it to add counters, gauges, histograms, and summaries. 
```ts theme={"system"}
import { commonAbis, evmDecoder, evmPortalStream } from "@subsquid/pipes/evm";
import { metricsServer } from "@subsquid/pipes/metrics/node";

async function main() {
  const stream = evmPortalStream({
    id: 'evm-decoder',
    portal: 'https://portal.sqd.dev/datasets/ethereum-mainnet',
    outputs: evmDecoder({
      range: {
        from: 'latest',
      },
      events: {
        transfers: commonAbis.erc20.events.Transfer,
      },
    }),
    metrics: metricsServer({ port: 9090 }), // equivalent to metricsServer(), as 9090 is the default port
  })

  for await (const { data, ctx } of stream) {
    // Add custom counter metric
    ctx.metrics
      .counter({
        name: "my_transfers_counter",
        help: "Number of processed transactions",
      })
      .inc(data.transfers.length);
  }
}

void main()
```

Access metrics at `http://localhost:9090/metrics` to verify they're being exposed correctly.

```
# HELP my_transfers_counter Number of processed transactions
# TYPE my_transfers_counter counter
my_transfers_counter 218598
```

Use Grafana dashboards to visualize block processing rate, error rates, and latency trends from your Prometheus metrics.

## Available metric types

You can create different types of Prometheus metrics:

```ts theme={"system"}
for await (const { data, ctx } of stream) {
  // Counter - monotonically increasing value
  ctx.metrics.counter({ name: "events_total", help: "Total events" }).inc();

  // Gauge - value that can go up or down
  ctx.metrics
    .gauge({ name: "queue_size", help: "Current queue size" })
    .set(queueSize);

  // Histogram - observations with configurable buckets
  ctx.metrics
    .histogram({ name: "batch_size", help: "Batch size distribution" })
    .observe(data.transfers.length);
}
```

Expose metrics with `metricsServer()` on your source, then visualize them with [Pipes UI](../basic-development/pipes-ui). See the [Profiling](./profiling) guide for the built-in per-batch profiler exposed on the same metrics endpoint.

# Profiling

Source: https://docs.sqd.dev/en/sdk/pipes-sdk/evm/guides/advanced-topics/profiling

Measure where time is spent in an EVM pipe

Pipes SDK ships a built-in per-batch profiler. It records how long each part of the pipeline takes. When a [`metricsServer()`](../../reference/utility-components/metrics-server) is attached to the source, the profiler output is served as JSON at `http://localhost:<port>/profiler` and rendered live in [Pipes UI](../basic-development/pipes-ui).

## Enabling the profiler

The profiler is **on by default** when `process.env.NODE_ENV !== 'production'` and **off** otherwise. Override explicitly with `profiler: true` or `profiler: false` on the source.

```ts theme={"system"}
evmPortalStream({
  portal: 'https://portal.sqd.dev/datasets/ethereum-mainnet',
  outputs: /* ... */,
  metrics: metricsServer({ port: 9090 }),
  profiler: true, // force on — useful in production
})
```

## Interpreting the output

The pipeline is represented as a tree. Each node reports how much time was spent in that stage of a batch. A typical tree looks like:

```
batch
├── fetch data
├── apply transformers
│   ├── track progress
│   └── EVM decoder
├── clickhouse
│   ├── data handler
│   ├── insert cursor
│   └── cleanup cursors
└── metrics processing
```

## Custom spans

Wrap any code in your target or transformer to make it appear as a tree node. `ctx.profiler.start()` is a no-op when the profiler is disabled, so the instrumentation is safe to leave in place.
```ts theme={"system"}
onData: async ({ data, ctx }) => {
  const span = ctx.profiler.start('my measure')
  await myDataProcessing(data)
  span.end()
},
```

The named span appears under its parent node in the tree:

```
batch
...
├── clickhouse
│   ├── data handler
│   │   └── my measure
...
```

# Railway deployment

Source: https://docs.sqd.dev/en/sdk/pipes-sdk/evm/guides/advanced-topics/railway-deployment

Deploying a Pipes SDK project to Railway

This guide walks you through deploying a Pipes SDK indexer to [Railway](https://railway.app). You'll end up with three services running together: your indexer, a database (PostgreSQL or ClickHouse), and the Pipe UI dashboard.

## Prerequisites

* A [Railway account](https://railway.app)
* Your project pushed to a **public or private GitHub repository**
* Your project built with `pipes init` (which generates a `Dockerfile` and `docker-compose.yaml`)

***

## Option A — Drag & Drop via the Railway Dashboard

The quickest way to get started is to drop your `docker-compose.yaml` directly onto the Railway project canvas.

### Step 1 — Create a new project on Railway

1. Go to [railway.app/new](https://railway.app/new) and click **Empty Project**.

### Step 2 — Drag and drop your `docker-compose.yaml`

1. Open your project canvas.
2. Drag the `docker-compose.yaml` file from your project root and drop it anywhere on the canvas.

Railway parses the file and creates a service for each entry. For a typical Pipes SDK project this produces:

| Service                    | Image / Source                     |
| -------------------------- | ---------------------------------- |
| Your indexer               | Built from your local `Dockerfile` |
| `postgres` or `clickhouse` | Official Docker images             |

### Step 3 — Link the indexer service to your GitHub repository

1. Click the indexer service card on the canvas.
2. Go to **Settings → Source** and choose **GitHub Repo**.
3. Select your repository and branch.

Railway will now redeploy automatically on every push.

### Step 4 — Add the Pipe UI service

1. Click **+ New Service** on the canvas.
2. Choose **Docker Image** and enter `iankguimaraes/pipe-ui:latest`.
3. Go to the service's **Variables** tab and add:

```
METRICS_SERVER_URL=${{Pipes.RAILWAY_PRIVATE_DOMAIN}}:9090
```

Replace `Pipes` with the actual name Railway assigned to your indexer service.

### Step 5 — Generate public domains

For each service that needs a public URL:

1. Click the service card.
2. Go to **Settings → Networking → Public Networking**.
3. Click **Generate Domain** and set the correct port (`3000` for Pipe UI, `5432` for PostgreSQL, `8123` for ClickHouse).

***

## Option B — Railway CLI

If you prefer the terminal, the Railway CLI gives you full control over every service and environment variable.

### Step 1 — Install the Railway CLI

```bash theme={"system"}
# macOS / Linux
curl -fsSL https://railway.app/install.sh | sh

# or via npm
npm install -g @railway/cli
```

### Step 2 — Log in to Railway

```bash theme={"system"}
railway login
```

This opens a browser window for OAuth authentication. After approving, the CLI is authenticated for the current session.

### Step 3 — Initialize the Railway project

Run this from the root of your indexer project:

```bash theme={"system"}
railway init --name "your-project-name"
```

Use the same name as the `name` field in your `package.json`. This creates a new project on Railway and links the current directory to it.
### Step 4 — Add the database service **If your project uses PostgreSQL** (projects with `drizzle.config.ts`): ```bash theme={"system"} railway add -d postgres ``` Railway provisions a managed PostgreSQL instance and injects a `DATABASE_URL` variable automatically. **If your project uses ClickHouse:** ```bash theme={"system"} railway add \ --service Clickhouse \ --image clickhouse/clickhouse-server:latest \ --variables CLICKHOUSE_DB=pipes \ --variables CLICKHOUSE_USER=default \ --variables CLICKHOUSE_PASSWORD=password ``` ### Step 5 — Add the indexer service Replace `your-org/your-repo` with your actual GitHub repository slug. **PostgreSQL project:** ```bash theme={"system"} railway add \ --service Pipes \ --repo your-org/your-repo \ --variables "DB_CONNECTION_STR=\${{Postgres.DATABASE_URL}}" ``` **ClickHouse project:** ```bash theme={"system"} railway add \ --service Pipes \ --repo your-org/your-repo \ --variables "CLICKHOUSE_URL=http://\${{Clickhouse.RAILWAY_PRIVATE_DOMAIN}}:8123" \ --variables CLICKHOUSE_DB=pipes \ --variables CLICKHOUSE_USER=default \ --variables CLICKHOUSE_PASSWORD=password ``` > **Note on variable syntax:** `${{ServiceName.VARIABLE}}` is Railway's cross-service reference syntax. The shell requires escaping the `$` as `\$` when passing it through the CLI; Railway resolves it at runtime. The `--repo` flag links this service to your GitHub repository and enables automatic deployments on every push to the default branch. ### Step 6 — Add the Pipe UI dashboard ```bash theme={"system"} railway add \ --service PipeUI \ --image iankguimaraes/pipe-ui:latest \ --variables "METRICS_SERVER_URL=\${{Pipes.RAILWAY_PRIVATE_DOMAIN}}:9090" ``` The Pipe UI connects to your indexer's metrics endpoint (port 9090, exposed by the indexer at runtime). ### Step 7 — Generate public domains Give each service a publicly accessible URL: **PostgreSQL project:** ```bash theme={"system"} # Expose the database railway domain --service Postgres --port 5432 # Expose the UI railway domain --service PipeUI --port 3000 ``` **ClickHouse project:** ```bash theme={"system"} # Expose the database railway domain --service Clickhouse --port 8123 # Expose the UI railway domain --service PipeUI --port 3000 ``` ### Step 8 — Open the Railway dashboard ```bash theme={"system"} railway open ``` This opens your project in the Railway web UI where you can monitor deployments, view logs, and manage environment variables. *** ## Service Architecture ``` ┌──────────────────────────────────────────────────────┐ │ Railway Project │ │ │ │ ┌─────────────┐ private network │ │ │ Database │◄────────────────────┐ │ │ │ (Postgres │ │ │ │ │ /Clickhouse)│ │ │ │ └──────┬──────┘ │ │ │ │ public domain (optional) │ │ │ ┌──────┴──────┐ │ │ │ Indexer │ │ │ │ (Pipes) │ │ │ └──────┬──────┘ │ │ │ :9090 metrics │ │ ┌──────▼──────┐ │ │ │ Pipe UI │ │ │ │ (PipeUI) │ │ │ └──────┬──────┘ │ │ │ public domain │ └──────────────────────────────────────┼───────────────┘ ▼ Browser / API ``` Services communicate over Railway's private network using `${{ServiceName.RAILWAY_PRIVATE_DOMAIN}}` references. Only the UI (and optionally the database) need public domains. 
*** ## Environment Variables Reference ### Indexer (PostgreSQL) | Variable | Value | | ------------------- | ---------------------------- | | `DB_CONNECTION_STR` | `${{Postgres.DATABASE_URL}}` | ### Indexer (ClickHouse) | Variable | Value | | --------------------- | ---------------------------------------------------- | | `CLICKHOUSE_URL` | `http://${{Clickhouse.RAILWAY_PRIVATE_DOMAIN}}:8123` | | `CLICKHOUSE_DB` | `pipes` | | `CLICKHOUSE_USER` | `default` | | `CLICKHOUSE_PASSWORD` | `password` | ### ClickHouse service | Variable | Value | | --------------------- | ---------- | | `CLICKHOUSE_DB` | `pipes` | | `CLICKHOUSE_USER` | `default` | | `CLICKHOUSE_PASSWORD` | `password` | ### Pipe UI | Variable | Value | | -------------------- | ---------------------------------------- | | `METRICS_SERVER_URL` | `${{Pipes.RAILWAY_PRIVATE_DOMAIN}}:9090` | *** ## Dockerfile Overview Your project's `Dockerfile` (generated by `pipes init`) uses a two-stage build: 1. **Builder stage** — installs dependencies with `pnpm`, compiles TypeScript to `dist/`. 2. **Runner stage** — copies only the production build, runs migrations (PostgreSQL only), then starts the indexer. The indexer exposes **port 9090** for metrics, which Pipe UI connects to. ```dockerfile theme={"system"} EXPOSE 9090 CMD ["sh", "-lc", "pnpm db:generate && pnpm db:migrate && node dist/index.js"] # (PostgreSQL only; ClickHouse projects skip the migration step) ``` *** ## Troubleshooting **Indexer fails to start — cannot connect to database** The database service may not be healthy yet. Railway starts services in parallel; the indexer's health-check retry logic should handle this, but you can also set a startup delay under **Settings → Deploy → Start Command**. **`${{...}}` variables show as literal strings** Cross-service references are resolved at deploy time. Make sure both services are in the same Railway project and the referenced service name matches exactly (case-sensitive). **ClickHouse connection refused** Confirm `CLICKHOUSE_URL` uses the private domain (`RAILWAY_PRIVATE_DOMAIN`), not a public URL, and that port 8123 is correct. **Pipe UI shows no data** Check that `METRICS_SERVER_URL` points to the private domain of the indexer service and that port 9090 is included. # Stateful transforms Source: https://docs.sqd.dev/en/sdk/pipes-sdk/evm/guides/advanced-topics/stateful-transforms Six approaches to maintaining state across pipe batches, with trade-offs and examples A stateful transform is any step in your pipeline that produces output based on more than the current batch — for example, running balances, sliding-window aggregates, or enrichment lookups. This page surveys the available approaches and when to choose each one. ## Before you add state The cleanest solution is often to emit raw events and let the downstream database derive the state at query time. ClickHouse materialized views and Postgres views both work for this. If your logic can be expressed as SQL and you can tolerate slightly higher query latency, prefer this over transformer state — it eliminates crash recovery and fork handling entirely on the transformer side. If your logic is hard to express in SQL, or if the derived state must be pre-computed before reaching the target, read on. 
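Before reaching for transformer state, here is a minimal sketch of that stateless alternative (table, view, and column names are hypothetical; the pipeline shape follows the EVM examples in these docs): persist raw ERC-20 transfer rows and let a Postgres view derive balances at query time, so the transformer itself holds nothing to recover.

```ts theme={"system"}
import { createTarget } from "@subsquid/pipes";
import { commonAbis, evmDecoder, evmPortalStream } from "@subsquid/pipes/evm";
import pg from "pg";

const db = new pg.Client({ connectionString: process.env.DB_CONNECTION_STR });
await db.connect();

// Raw events table plus a view that derives balances at query time.
// No transformer state exists, so there is nothing to warm up or resync;
// fork handling remains the target's concern, as discussed above.
await db.query(`
  CREATE TABLE IF NOT EXISTS transfers_raw (
    block_number BIGINT, sender TEXT, recipient TEXT, value NUMERIC
  );
  CREATE OR REPLACE VIEW balances AS
    SELECT address, SUM(delta) AS balance
    FROM (
      SELECT recipient AS address, value AS delta FROM transfers_raw
      UNION ALL
      SELECT sender AS address, -value AS delta FROM transfers_raw
    ) flows
    GROUP BY address;
`);

await evmPortalStream({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  outputs: evmDecoder({
    range: { from: 'latest' },
    events: { transfers: commonAbis.erc20.events.Transfer },
  }),
}).pipeTo(createTarget({
  write: async ({ read }) => {
    for await (const { data } of read()) {
      for (const t of data.transfers) {
        await db.query(
          'INSERT INTO transfers_raw (block_number, sender, recipient, value) VALUES ($1, $2, $3, $4)',
          [t.block.number, t.event.from, t.event.to, t.event.value.toString()],
        );
      }
    }
  },
}));
```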
## At a glance | Approach | State lives in | Persistence across restarts | Fork handling | Extra infra | Best for | | ------------------------------------------------------- | -------------- | ------------------------------ | ----------------------- | ------------- | -------------------------------------------- | | [A. Pure in-RAM](#a-pure-in-ram) | JS heap | rebuilt from portal on startup | `fork()` callback | none | sliding windows, candles, rolling aggregates | | [B. ClickHouse MVs](#b-clickhouse-materialized-views) | ClickHouse | ✓ | `sign = -1` rows | ClickHouse | SQL-expressible analytics | | [C. SQLite transformer](#c-sqlite-transformer) | Local file | ✓ (delta table) | `fork()` callback | none | moderate state | | [D. Postgres/Drizzle target](#d-postgresdrizzle-target) | Postgres | ✓ | ✓ automatic | Postgres | atomic state + output, Postgres target | | [E. Apache Flink](#e-apache-flink) | Flink cluster | ✓ | via compensating events | Kafka + Flink | TB-scale distributed state | | [F. External KV store](#f-external-kv-store) | Redis / Valkey | ✓ (with AOF/RDB) | `fork()` callback | Redis | µs-latency lookups, multi-process state | *** ## A. Pure in-RAM Keep state in a JavaScript `Map` or array inside the transformer closure. No external storage is involved. **When to use:** * State can be derived from a bounded window of recent blocks (e.g., last N blocks or last M seconds). * You can afford to replay that window on restart (warm-up time ∝ window size). * State loss is contained: at most one window's worth of history needs to be replayed. **When not to use:** * State grows without bound (e.g., all-time ERC-20 balances). Use [Postgres approach D](#d-postgresdrizzle-target) instead — specifically the in-memory + Postgres mirror sub-approach. * The warm-up window is too large to replay quickly on every restart. ### The warm-up pattern When the process restarts, the target's cursor tells you where the pipeline left off. The in-RAM state is gone. To rebuild it, call `portal.getStream()` in the `start()` callback — the raw portal client API, independent of the main stream: ```typescript theme={"system"} start: async ({ portal, state, logger }) => { if (!state.current) return // first ever run: start empty const warmupFrom = Math.max(state.initial, state.current.number - LOOKBACK_BLOCKS) if (warmupFrom >= state.current.number) return for await (const { blocks } of portal.getStream({ type: 'evm', fromBlock: warmupFrom, toBlock: state.current.number, fields: { block: { number: true }, log: { data: true } }, logs: [{ address: [CONTRACT], topic0: [EVENT_TOPIC] }], })) { for (const block of blocks) { for (const log of block.logs) { // rebuild in-RAM state from block and log fields } } } } ``` `portal` is a live `PortalClient` already connected to the dataset. The warm-up query's `toBlock` is the saved cursor, so it terminates immediately after the pipeline resumes from `cursor + 1`. Multiple in-RAM transformers each run their own `start()` warm-up in parallel (the SDK calls child `start()` callbacks concurrently). ### Fork handling `target.fork()` fires first (ClickHouse `onRollback` or drizzle snapshot rollback), then the transformer's `fork()` callback. At that point the database already reflects pre-fork state. 
In `fork()`, drop in-RAM entries for blocks beyond the rollback cursor:

```typescript theme={"system"}
fork: async (cursor, { logger }) => {
  recentEntries = recentEntries.filter(e => e.blockNumber <= cursor.number)
}
```

### Composability

Use the same `initQueue`/`WriteQueue` pattern as the Postgres examples. For ClickHouse targets the queue holds closures over `ClickhouseStore` instead of a Postgres `Transaction`:

```typescript theme={"system"}
type CHS = { insert(params: { table: string; values: unknown[]; format: string }): Promise<void> }

class WriteQueue {
  private ops: Array<(store: CHS) => Promise<void>> = []
  push(op: (store: CHS) => Promise<void>): void { this.ops.push(op) }
  async flush(store: CHS): Promise<void> { for (const op of this.ops) await op(store) }
}
```

See [`13.stateful-transform-in-ram.example.ts`](https://github.com/subsquid-labs/pipes-sdk-docs/blob/master/src/advanced/evm/13.stateful-transform-in-ram.example.ts) for the full implementation: a rolling \~1-hour transfer volume tracker for the SQD token on Arbitrum, with portal warm-up, fork handling, and the WriteQueue composability pattern targeting ClickHouse.

***

## B. ClickHouse materialized views

Write raw events to a base table; let ClickHouse compute derived state via materialized views (MVs). The transformer is stateless — it only emits events, not pre-computed state.

**When to use:**

* Your aggregation logic is expressible in SQL.
* ClickHouse is already your target.
* You want derived state updated automatically without any transformer code.

**When not to use:**

* Logic requires imperative iteration (e.g., order-dependent simulation).
* Each MV chain adds latency — avoid long dependency chains for latency-sensitive consumers.
* Very frequent writes on lightweight data: prefer plain (non-materialized) views if you have spare CPU on the database machine.

### The core limitation: MVs see only new rows

A materialized view fires when new rows are inserted into its source table. Its `SELECT` clause only operates on the **newly inserted batch**, not the full table. Running totals like cumulative balance cannot be written directly in the MV `SELECT`.

### Workaround: auxiliary aggregating tables

Maintain a separate "current state" table using `AggregatingMergeTree` with `argMaxState`.
The MV reads this table alongside the new rows to resolve the latest value before the current batch: ```sql theme={"system"} -- Stores latest balance per pool (AggregatingMergeTree = efficient upsert) CREATE TABLE current_balances ( pool_address String, token_a_balance_raw AggregateFunction(argMax, Int256, Tuple(DateTime, UInt16, UInt16)), token_b_balance_raw AggregateFunction(argMax, Int256, Tuple(DateTime, UInt16, UInt16)) ) ENGINE = AggregatingMergeTree() ORDER BY pool_address; -- MV that keeps current_balances up to date CREATE MATERIALIZED VIEW current_balances_mv TO current_balances AS SELECT pool_address, argMaxState(token_a_balance_raw, (timestamp, transaction_index, log_index)) AS token_a_balance_raw, argMaxState(token_b_balance_raw, (timestamp, transaction_index, log_index)) AS token_b_balance_raw FROM balances_history GROUP BY pool_address; ``` A downstream MV that needs the running balance queries `current_balances` with `argMaxMerge()`: ```sql theme={"system"} latest_pool_balances AS ( SELECT pool_address, argMaxMerge(token_a_balance_raw) AS balance_token_a_raw, argMaxMerge(token_b_balance_raw) AS balance_token_b_raw FROM current_balances WHERE pool_address IN (SELECT pool_address FROM unique_pools_to_insert) GROUP BY pool_address ) ``` ### Temporal joins with ASOF JOIN When you need "the most recent price before each event", use `ASOF JOIN`: ```sql theme={"system"} SELECT ... FROM liquidity_events_raw ml ASOF JOIN latest_prices wp ON wp.pool_address = ml.pool_address AND wp.ts_num + wp.transaction_index * 100_000 + wp.log_index <= ml.ts_num + ml.transaction_index * 100_000 + ml.log_index WHERE ml.protocol = 'uniswap_v4' ``` The `ASOF JOIN` selects the latest row in `latest_prices` whose ordering key is ≤ the current event's key — effectively "last price before this log". ### Fork rollback ClickHouse is non-transactional. Use `CollapsingMergeTree` with a `sign` column: insert `sign = 1` rows on the way forward and `sign = -1` rows to cancel them on rollback. Your `onRollback` handler computes which blocks to cancel and inserts the negating rows. See [`pipes-sqdgn-dex-example/pipes/evm/liquidity/liquidity.sql`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/liquidity.sql) for a full production SQL schema: `liquidity_events_raw` as the base table, `CollapsingMergeTree` for rollback, `AggregatingMergeTree` + `argMaxState` for current pool balances, an `ASOF JOIN` MV for V4 liquidity, and separate V2/V3/V4 MV chains targeting `balances_history`. *** ## C. SQLite transformer Keep transformer state in a local SQLite database. The transformer reads and writes SQLite; the downstream target (typically ClickHouse) receives the pre-computed rows. **When to use:** * State is too large for RAM. * You need random-access lookups (e.g., "current balance of address X") that would be slow as a linear scan of in-RAM arrays. * You're not using Postgres as your target (otherwise see [approach D](#d-postgresdrizzle-target) for better atomicity). * The indexer runs on persistent infrastructure (SQLite file must survive restarts). **When not to use:** * The indexer runs on ephemeral infrastructure (containers, spot VMs). SQLite is lost on restart. * State requires complex analytical SQL (window functions, multi-table joins) — consider DuckDB as a drop-in alternative with full analytical query support. ### The delta table pattern SQLite and the downstream target commit separately — a crash between them leaves the two out of sync. 
See [`pipes-sqdgn-dex-example/pipes/evm/liquidity/liquidity.sql`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/liquidity.sql) for a full production SQL schema: `liquidity_events_raw` as the base table, `CollapsingMergeTree` for rollback, `AggregatingMergeTree` + `argMaxState` for current pool balances, an `ASOF JOIN` MV for V4 liquidity, and separate V2/V3/V4 MV chains targeting `balances_history`.

***

## C. SQLite transformer

Keep transformer state in a local SQLite database. The transformer reads and writes SQLite; the downstream target (typically ClickHouse) receives the pre-computed rows.

**When to use:**

* State is too large for RAM.
* You need random-access lookups (e.g., "current balance of address X") that would be slow as a linear scan of in-RAM arrays.
* You're not using Postgres as your target (otherwise see [approach D](#d-postgresdrizzle-target) for better atomicity).
* The indexer runs on persistent infrastructure (SQLite file must survive restarts).

**When not to use:**

* The indexer runs on ephemeral infrastructure (containers, spot VMs). SQLite is lost on restart.
* State requires complex analytical SQL (window functions, multi-table joins) — consider DuckDB as a drop-in alternative with full analytical query support.

### The delta table pattern

SQLite and the downstream target commit separately — a crash between them leaves the two out of sync. A `balance_deltas` table records the net change per address per block, allowing `rollbackTo(blockNumber)` to invert any set of blocks atomically:

```typescript theme={"system"}
// Schema
db.exec(`
  CREATE TABLE IF NOT EXISTS balance_deltas (
    address      TEXT NOT NULL,
    block_number INTEGER NOT NULL,
    delta        TEXT NOT NULL,
    PRIMARY KEY (address, block_number)
  )
`)

function rollbackTo(blockNumber: number) {
  db.transaction(() => {
    const deltas = db
      .prepare('SELECT address, delta FROM balance_deltas WHERE block_number > ?')
      .all(blockNumber) as { address: string; delta: string }[]

    const net = new Map<string, bigint>()
    for (const { address, delta } of deltas) net.set(address, (net.get(address) ?? 0n) + BigInt(delta))

    for (const [address, delta] of net) {
      const { balance } = db.prepare('SELECT balance FROM balances WHERE address = ?').get(address) as any
      db.prepare('INSERT INTO balances (address, balance) VALUES (?, ?) ON CONFLICT(address) DO UPDATE SET balance = excluded.balance')
        .run(address, (BigInt(balance) - delta).toString())
    }

    db.prepare('DELETE FROM balance_deltas WHERE block_number > ?').run(blockNumber)
    db.prepare('DELETE FROM processed_blocks WHERE block_number > ?').run(blockNumber)
  })()
}
```

### Crash recovery in start()

Compare the SQLite high-water mark with the pipeline cursor. If SQLite is ahead, roll back to match:

```typescript theme={"system"}
start: async ({ state }) => {
  const sqliteLastBlock = (db.prepare('SELECT MAX(block_number) as m FROM processed_blocks').get() as any).m ?? null
  const pipelineLastBlock = state.current?.number ?? null

  if (sqliteLastBlock !== null && (pipelineLastBlock === null || sqliteLastBlock > pipelineLastBlock)) {
    rollbackTo(pipelineLastBlock ?? -1) // SQLite crashed ahead of cursor — roll back
  } else if (sqliteLastBlock !== pipelineLastBlock) {
    throw new Error(`State mismatch: SQLite=${sqliteLastBlock}, cursor=${pipelineLastBlock}. Delete the SQLite file to rebuild.`)
  }
}
```

### Historical-only variant

If you're indexing only finalized data and will never see forks, drop the delta table and accept that a crash requires rebuilding from scratch:

```typescript theme={"system"}
start: async ({ state }) => {
  // sqliteLastBlock and pipelineLastBlock read as in the previous snippet
  if (sqliteLastBlock !== pipelineLastBlock) {
    throw new Error(`Delete ${SQLITE_DB_PATH} to rebuild.`)
  }
}
```

* [`09.stateful-transform-on-sqlite.example.ts`](https://github.com/subsquid-labs/pipes-sdk-docs/blob/master/src/advanced/evm/09.stateful-transform-on-sqlite.example.ts) — full delta-table implementation with fork and crash recovery
* [`10.stateful-transform-on-sqlite-no-forks.example.ts`](https://github.com/subsquid-labs/pipes-sdk-docs/blob/master/src/advanced/evm/10.stateful-transform-on-sqlite-no-forks.example.ts) — simpler historical-only variant

***

## D. Postgres/Drizzle target

When your target is already Postgres, the `drizzleTarget` can commit transformer state and output rows inside the **same serializable transaction** as the cursor save. This gives the strongest atomicity guarantees of any approach: a crash between `transform()` and the cursor save is impossible because both commit together.

**When to use:**

* Postgres is your output target.
* You want zero crash recovery code (atomicity handles it automatically).
* Fork rollback should be automatic (drizzleTarget installs snapshot triggers).

**When not to use:**

* Your target is ClickHouse or another non-Postgres database.
* State is too large for Postgres (rare).

### The WriteQueue / initQueue pattern

Multiple stateful transformers all need to write inside the same transaction.
The `WriteQueue` collects their write closures; `initQueue` wraps each batch in a `Piped<T>` with a fresh queue; `onData` flushes everything:

```typescript theme={"system"}
class WriteQueue {
  private ops: Array<(tx: Transaction) => Promise<void>> = []
  push(op: (tx: Transaction) => Promise<void>): void { this.ops.push(op) }
  async flush(tx: Transaction): Promise<void> { for (const op of this.ops) await op(tx) }
}

function initQueue<T>() {
  return createTransformer<T, Piped<T>>({
    transform: (data) => ({ payload: data, writes: new WriteQueue() }),
  })
}

// Pipeline:
stream
  .pipe(initQueue())
  .pipe(transformerA(db))
  .pipe(transformerB(db))
  .pipeTo(drizzleTarget({
    db,
    tables: [tableA, tableB],
    onData: async ({ tx, data }) => { await data.writes.flush(tx) },
  }))
```

`onData` stays a one-liner regardless of how many transformers are chained.

### Sub-approach 1 — Stateless transform (per-batch DB reads)

`transform()` reads current state from Postgres (the last committed snapshot), computes the delta, and pushes write closures to the queue. No in-RAM Map survives between batches.

* ✓ No RAM limit on state size
* ✓ Zero fork handling code (snapshot triggers on `tables` cover rollback)
* ✗ One `SELECT … WHERE address IN (…)` per batch

See [`11.stateful-transforms-postgres-stateless.example.ts`](https://github.com/subsquid-labs/pipes-sdk-docs/blob/master/src/advanced/evm/11.stateful-transforms-postgres-stateless.example.ts): two transformers (`BalanceTransformer` + `TransferCountTransformer`) reading from Postgres each batch and writing atomically via WriteQueue.

### Sub-approach 2 — In-memory + Postgres mirror

`start()` loads the full state into in-RAM Maps. `transform()` reads/writes the Maps with no DB round trips per batch. `fork()` reloads the Maps from Postgres after drizzleTarget commits the snapshot rollback.

* ✓ No per-batch DB reads — all reads from memory after startup
* ✓ Fast for large batches with many distinct keys
* ✗ Full state must fit in RAM
* ✗ Startup time is O(state size)
* ✗ `fork()` callbacks required to resync Maps after rollback

See [`12.stateful-transforms-postgres-in-memory.example.ts`](https://github.com/subsquid-labs/pipes-sdk-docs/blob/master/src/advanced/evm/12.stateful-transforms-postgres-in-memory.example.ts): same two transformers with in-RAM Maps loaded from Postgres at startup and reloaded on fork.

For both sub-approaches, all state tables must be listed in `drizzleTarget`'s `tables` array. This installs PostgreSQL snapshot triggers that roll them back automatically on a blockchain reorg. The `onStart` callback can run `CREATE TABLE IF NOT EXISTS` for quick setup; in production, use [drizzle-kit migrations](https://orm.drizzle.team/docs/migrations) instead.

***

## E. Apache Flink

[Apache Flink](https://flink.apache.org) is a distributed stateful stream-processing framework. The Pipes SDK acts as a data source feeding Flink via Kafka or a direct connector.

**When to use:**

* State is too large for a single machine (terabytes).
* Your problem requires stateful joins across multiple independent streams (e.g., correlate DEX trades with lending liquidations across different chains).
* You need exactly-once semantics across multiple heterogeneous sinks.

**When not to use:**

* Single-node deployments — the operational overhead (JVM runtime, cluster management, ZooKeeper or KRaft, checkpoint storage) is only justified when the problem genuinely requires distributed state.

**Architecture:** The Pipes SDK emits raw events to Kafka (one topic per event type).
On a blockchain fork, it emits compensating rows (e.g., `sign = -1`) that Flink sees as normal data and can handle with a subtract-and-recompute pattern. Flink manages its own checkpoints; crash recovery is handled entirely by Flink. *** ## F. External KV store Use Redis, Valkey, or a similar key-value store as a fast external state backend. **When to use:** * Multiple parallel pipeline instances must share state (horizontal scaling of the indexer). * Per-key lookups must complete in under 1 ms (e.g., enriching 50 k events per second with metadata from a 100 M-entry map that doesn't fit in RAM). **When not to use:** * A single-process indexer is sufficient — adding Redis increases operational complexity for no benefit. * You need transactional state + output commits (use approach D instead). **Fork handling:** The transformer's `fork()` callback must delete or undo the Redis keys written for rolled-back blocks. Keep a per-block write log (similar to the SQLite delta table) to know which keys to revert. **Crash safety:** Redis is not durable by default. Enable AOF or RDB persistence, or treat Redis purely as a warm cache and accept that a Redis restart requires a replay from the pipeline cursor. *** ## Fork callbacks and crash recovery The fork handling responsibilities differ by approach: | Approach | `fork()` needed in transformer | How DB state is rolled back | | ------------------- | ------------------------------------- | ---------------------------------- | | A. In-RAM | ✓ — prune entries > cursor | n/a (state is RAM-only) | | B. ClickHouse MVs | ✗ — handled in `onRollback` | `sign = -1` rows via `onRollback` | | C. SQLite | ✓ — calls `rollbackTo(cursor.number)` | `rollbackTo()` reverts delta table | | D. Postgres/drizzle | ✗ — automatic | snapshot triggers via `tables` | | E. Flink | ✗ — compensating events | Flink checkpoint rollback | | F. External KV | ✓ — revert write log | manual key deletion | **Ordering guarantee:** `target.fork()` always fires before transformer `fork()` callbacks. By the time your transformer's `fork()` runs, the target (ClickHouse `onRollback`, drizzleTarget snapshot rollback) has already committed the database rollback. It is safe to read the database in `fork()`. **Crash recovery** (approaches A, B, C only): a crash between the transformer's store write and the target's cursor save leaves state ahead of the pipeline cursor. Handle this in `start()` by comparing your local high-water mark to `state.current`: ```typescript theme={"system"} start: async ({ state }) => { const localLastBlock = /* read your checkpoint */ const pipelineLastBlock = state.current?.number ?? null if (localLastBlock !== null && (pipelineLastBlock === null || localLastBlock > pipelineLastBlock)) { rollbackTo(pipelineLastBlock ?? -1) // crash recovery: undo ahead-of-cursor writes } } ``` Approaches D (Postgres/drizzle) and E (Flink) are immune to this problem: state and cursor commit atomically. *** ## Composing multiple stateful transformers When multiple stateful transformers write to the same target, they must not each independently call the target's write API. Use the `WriteQueue` / `initQueue` pattern to collect all writes and flush them in a single `onData` call: 1. **`initQueue()`** wraps the raw batch in `Piped` with a fresh `WriteQueue`. Place it as the first `.pipe()`. 2. **Each transformer** receives `Piped`, pushes closures to `writes`, and returns `Piped` unchanged. 3. 
**`onData`** calls `data.writes.flush(store_or_tx)` — a one-liner that scales to any number of transformers.

```typescript theme={"system"}
type Piped<T> = { payload: T; writes: WriteQueue }

function initQueue<T>() {
  return createTransformer<T, Piped<T>>({
    transform: (data) => ({ payload: data, writes: new WriteQueue() }),
  })
}
```

Because every domain transformer takes `Piped<T>` as input and produces `Piped<T>` as output, none of them assume a fixed position in the chain — they are all order-independent and can be added or removed without touching the others.

For Postgres targets, `WriteQueue` closures take a `Transaction`; for ClickHouse, they take the structural `CHS` type shown in approach A. The pattern is identical in both cases.

# Cursor management

Source: https://docs.sqd.dev/en/sdk/pipes-sdk/evm/guides/architecture-deep-dives/cursor-management

How pipelines track progress and resume after restarts

A cursor records the last successfully processed block. Built-in targets (ClickHouse, Drizzle) handle persistence automatically. When using `createTarget` directly you own the full lifecycle.

## The cursor object

```typescript theme={"system"}
type BlockCursor = {
  number: number     // stream resumes from number + 1
  hash?: string      // block hash — used as parentBlockHash for fork detection
  timestamp?: number // block timestamp in seconds
}
```

`hash` is the fork detection tripwire: the SDK sends `parentBlockHash = cursor.hash` in each portal request. An absent hash silently skips fork detection for that request. See [cursor semantics](./fork-handling#5-cursor-semantics) for the full picture.

## Startup: range.from and stored cursors

`range.from` in the decoder sets where the stream begins on a first run — before any cursor exists:

```typescript theme={"system"}
evmDecoder({
  range: { from: 'latest' },        // chain head
  // range: { from: 20_000_000 },   // block number
  // range: { from: '2024-01-01' }, // ISO date string
  // range: { from: new Date() },   // Date object
})
```

Once a cursor is stored, `range.from` is ignored — the stream resumes from `cursor.number + 1`.

## The stream id

The `id` on `evmPortalStream` is the primary key for all stored state:

```typescript theme={"system"}
evmPortalStream({ id: 'my-pipeline', ... })
```

Both built-in targets use it to isolate state records, so multiple streams can share one physical table. **Never rename an active stream's id** — the stored cursor is keyed on it, and renaming causes the pipeline to restart from `range.from`.

## ClickHouse target

`clickhouseTarget` saves the cursor after every successful `onData` call and resolves fork and crash-recovery callbacks automatically.

```typescript theme={"system"}
clickhouseTarget({
  client,
  settings: {
    id: 'my-stream',     // overrides evmPortalStream id (default: 'stream')
    table: 'sync',       // state table name (default: 'sync')
    database: 'default', // ClickHouse database
    maxRows: 10_000,     // cursor rows to keep per stream id (default: 10,000)
  },
  onData: ...,
  onRollback: ...,
})
```

**`onRollback` is called in two situations:**

* `type: 'offset_check'` — on every startup when a cursor exists. ClickHouse is non-transactional: a crash between `onData` and the cursor save leaves rows newer than the saved cursor. Delete them here. See [non-transactional databases](./fork-handling#4-state-rollback-atomicity).
* `type: 'blockchain_fork'` — when the portal signals a reorg. The rollback cursor is resolved automatically from stored history; your callback only needs to delete rows after `safeCursor.number`.
The same implementation typically serves both: ```typescript theme={"system"} onRollback: async ({ store, safeCursor }) => { await store.removeAllRows({ tables: ['my_table'], where: `block_number > {n:UInt32}`, params: { n: safeCursor.number }, }) }, ``` **State table.** Each row stores the cursor, the last finalized block, and the unfinalized block history used for fork recovery. Rows beyond `maxRows` are pruned every 25 saves. Set `maxRows` to cover your network's worst-case reorg depth — see [rollback depth](./fork-handling#3-rollback-depth-and-history-limits). ## Drizzle target `drizzleTarget` saves the cursor inside the same PostgreSQL transaction as the data write — fully atomic, no crash-recovery pass needed. ```typescript theme={"system"} drizzleTarget({ db: drizzle(DB_URL), tables: [transfersTable], // every table onData writes to — required settings: { state: { id: 'my-stream', schema: 'public', table: 'sync', unfinalizedBlocksRetention: 1000, // cursor rows to keep (default: 1,000) }, transaction: { isolationLevel: 'serializable' }, // default }, onData: async ({ tx, data }) => { await tx.insert(transfersTable).values(...) }, }) ``` **`tables` is required** for every table written in `onData`. At startup the target installs a PostgreSQL trigger on each listed table; the trigger copies the pre-change row into a `__snapshots` table (keyed by block number and primary key). On a fork the target replays these snapshots in reverse, restoring pre-fork state automatically. Writing to a table not in `tables` raises a runtime error. Snapshotting only fires for blocks at or above the current finalized head — historical blocks can never be reorged. **Advisory lock.** Every batch acquires `pg_try_advisory_xact_lock(hashtext(id))` inside the transaction, preventing concurrent writers on the same stream. Two `drizzleTarget` instances sharing the same `id` will serialize correctly; two with different `id`s run independently. **Retention.** Snapshot rows below `min(current, finalizedHead) - unfinalizedBlocksRetention` are deleted every 25 batches. Set this to cover your network's worst-case reorg depth. **Rollback hooks.** `onBeforeRollback` and `onAfterRollback` receive `{ tx, cursor }` and run inside the fork transaction. Use them to perform additional cleanup that the snapshot mechanism cannot cover (e.g., rows in tables not tracked by `tables`). ## Async iterator When consuming a pipeline with `for await...of` instead of `pipeTo`, the native `[Symbol.asyncIterator]()` always calls `read()` with no cursor — it has no way to accept one. The stream therefore starts from `range.from` on every run. **Finalized streams.** If the stream only consumes already-finalized blocks (no forks possible), rebuilding the stream with `range.from` set to the stored cursor is sufficient: ```typescript theme={"system"} let cursor = loadCursor() // BlockCursor | undefined const stream = evmPortalStream({ id: 'my-pipeline', portal: '...', outputs: evmDecoder({ // cursor.number is the last processed block; resume from the next one range: { from: cursor ? cursor.number + 1 : 0 }, }), }) for await (const { data, ctx } of stream) { await processData(data) saveCursor(ctx.stream.state.current) // { number, hash, timestamp } } ``` Save `ctx.stream.state.current` — the full `BlockCursor` of the batch's last block — not just the number. The `hash` is needed if you later switch to real-time or need the cursor as a fork anchor. **Real-time streams.** Setting `range.from` to a stored number loses the block hash. 
On restart the first request carries no `parentBlockHash`, so fork detection is silently disabled for that request. For real-time streams, use the `pipeToIterator` helper from the [async iteration tab of the fork handling guide](./fork-handling), which accepts an `initialCursor` and passes it directly to `read()` inside `pipeTo`:

```typescript theme={"system"}
const stream = pipeToIterator(
  evmPortalStream({ id: 'my-pipeline', portal: '...', outputs: evmDecoder({ range: { from: 'latest' } }) }),
  loadCursor(), // full BlockCursor with hash — passed to read(), not range.from
  onFork,
)

for await (const { data, ctx } of stream) {
  await processData(data)
  saveCursor(ctx.stream.state.current)
}
```

`pipeToIterator` preserves `parentBlockHash` across fork rounds because it uses `pipeTo` internally. On a fresh first run, pass `undefined` as `initialCursor` and the stream begins from `range.from` as normal.

## Custom cursor management

When using `createTarget` directly, you own the full cursor lifecycle. At the start of `write`, fetch the stored cursor and pass it to `read`:

```typescript theme={"system"}
write: async ({ read }) => {
  const cursor = await db.getLatestCursor()
  for await (const { data, ctx } of read(cursor)) {
    // ...
  }
}
```

After processing each batch, persist the cursor together with the fork-recovery state:

```typescript theme={"system"}
await db.transaction(async (tx) => {
  await writeData(tx, data)
  await tx.saveCursor({
    cursor: ctx.stream.state.current,
    rollbackChain: ctx.stream.state.rollbackChain,
    finalized: ctx.stream.head.finalized,
  })
})
```

For **transactional stores** (Postgres): save all three fields in the same transaction as the data write. For **non-transactional stores** (ClickHouse): write data first, cursor last, and implement a startup check that detects and corrects any data written after the last cursor save. See [state rollback atomicity](./fork-handling#4-state-rollback-atomicity).

The `fork` callback and the algorithm for resolving rollback cursors from stored history are covered in detail in the [fork handling guide](./fork-handling).

Examples:

* A minimal example showing manual cursor passing in `createTarget`
* Full pipeline with `onRollback` and `onData`
* Full pipeline including GraphQL API

# Fork handling

Source: https://docs.sqd.dev/en/sdk/pipes-sdk/evm/guides/architecture-deep-dives/fork-handling

Handle blockchain forks and rollbacks in real-time streams

When consuming a real-time stream near the chain head, the portal can detect that the client's view of the chain has diverged from the canonical chain — a situation known as a fork or reorg. The portal signals this with an HTTP 409 response containing a sample of blocks from the new canonical chain. Your code must find the highest block that both chains agree on, roll back any state written after that point, and replay from there.

Fork handling is only needed for real-time streams (`range.from: 'latest'`). Historical streams consume already-finalized data and never produce forks. See [Fork detection scope](#7-fork-detection-scope-real-time-streams-only) below.

The SDK provides two patterns for consuming a stream. Both use the same state-tracking logic; they differ in how the fork signal is delivered. If your pipeline includes a [stateful transformer](../advanced-topics/stateful-transforms#fork-callbacks-in-stateful-transformers), it must also implement a `fork` callback to roll back its own state in lockstep with the target.
The `pipeTo(createTarget({write, fork}))` pattern keeps fork handling completely separate from batch processing. The SDK catches the 409 internally and calls `fork()` with the portal's consensus block sample; `write()` never sees the interruption and continues iterating batches without restarting. Two variables span the lifetime of the stream: ```typescript theme={"system"} let recentUnfinalizedBlocks: BlockCursor[] = [] let finalizedHighWatermark: BlockCursor | undefined ``` `recentUnfinalizedBlocks` is the local history of unfinalized blocks used to find the common ancestor during a fork. `finalizedHighWatermark` tracks the highest finalized block ever seen — stored as a full `BlockCursor` (number **and** hash) so it can double as a rollback cursor when needed. Both must be declared outside `pipeTo` so `fork()` can access them. Inside `write()`, append each batch's unfinalized blocks to the local history: ```typescript theme={"system"} ctx.stream.state.rollbackChain.forEach((bc) => { recentUnfinalizedBlocks.push(bc) }) ``` `ctx.stream.state.rollbackChain` contains only the blocks from **this batch** that are above the current finalized head — it is a per-batch delta, not a full snapshot. Always append to the end; never replace or reorder. After collecting history, prune blocks that are now finalized and cap the queue: ```typescript theme={"system"} if (ctx.stream.head.finalized) { if (!finalizedHighWatermark || ctx.stream.head.finalized.number > finalizedHighWatermark.number) { finalizedHighWatermark = ctx.stream.head.finalized } recentUnfinalizedBlocks = recentUnfinalizedBlocks.filter(b => b.number >= finalizedHighWatermark!.number) } recentUnfinalizedBlocks = recentUnfinalizedBlocks.slice(recentUnfinalizedBlocks.length - 1000) ``` Portal instances behind a load balancer can report different finalized heads. Using the **maximum** seen so far (the high-water mark) prevents the pruning threshold from moving backwards when the stream reconnects to a lagging instance. See [consideration 6](#6-load-balanced-portals-and-a-non-monotonic-finalized-head) for details. `fork()` receives `previousBlocks` — the portal's current-chain sample — and must return the last good block cursor, or `null` if recovery is impossible: ```typescript theme={"system"} fork: async (newConsensusBlocks) => { const rollbackIndex = findRollbackIndex(recentUnfinalizedBlocks, newConsensusBlocks) if (rollbackIndex >= 0) { recentUnfinalizedBlocks.length = rollbackIndex + 1 return recentUnfinalizedBlocks[rollbackIndex] } if (finalizedHighWatermark && newConsensusBlocks.every(b => b.number < finalizedHighWatermark!.number)) { recentUnfinalizedBlocks = recentUnfinalizedBlocks.filter(b => b.number <= finalizedHighWatermark!.number) return finalizedHighWatermark } return null } ``` Three cases: (1) a common ancestor is found in local history — truncate and return it; (2) all `previousBlocks` fall below the finalized high-water mark, meaning the portal's sample doesn't reach local history — return the high-water mark cursor; (3) no recovery possible — return `null`, which surfaces a `ForkCursorMissingError`. 
```typescript theme={"system"} import { BlockCursor, createTarget } from '@subsquid/pipes' import { evmPortalStream, evmDecoder, commonAbis } from '@subsquid/pipes/evm' async function main() { let recentUnfinalizedBlocks: BlockCursor[] = [] let finalizedHighWatermark: BlockCursor | undefined await evmPortalStream({ id: 'forks', portal: 'https://portal.sqd.dev/datasets/ethereum-mainnet', outputs: evmDecoder({ contracts: ['0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'], // USDC events: { transfer: commonAbis.erc20.events.Transfer }, range: { from: 'latest' } }), }) .pipeTo(createTarget({ write: async ({read}) => { for await (const {data, ctx} of read(recentUnfinalizedBlocks[recentUnfinalizedBlocks.length-1])) { console.log(`Got ${data.transfer.length} transfers`) ctx.stream.state.rollbackChain.forEach((bc) => { recentUnfinalizedBlocks.push(bc) }) if (ctx.stream.head.finalized) { if (!finalizedHighWatermark || ctx.stream.head.finalized.number > finalizedHighWatermark.number) { finalizedHighWatermark = ctx.stream.head.finalized } recentUnfinalizedBlocks = recentUnfinalizedBlocks.filter(b => b.number >= finalizedHighWatermark!.number) } recentUnfinalizedBlocks = recentUnfinalizedBlocks.slice(recentUnfinalizedBlocks.length - 1000) } }, fork: async (newConsensusBlocks) => { const rollbackIndex = findRollbackIndex(recentUnfinalizedBlocks, newConsensusBlocks) if (rollbackIndex >= 0) { recentUnfinalizedBlocks.length = rollbackIndex + 1 return recentUnfinalizedBlocks[rollbackIndex] } if (finalizedHighWatermark && newConsensusBlocks.every(b => b.number < finalizedHighWatermark!.number)) { recentUnfinalizedBlocks = recentUnfinalizedBlocks.filter(b => b.number <= finalizedHighWatermark!.number) return finalizedHighWatermark } return null } })) } main().then(() => { console.log('\ndone') }) function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number { let aIndex = 0, bIndex = 0, lastCommonIndex = -1 while (aIndex < chainA.length && bIndex < chainB.length) { const a = chainA[aIndex], b = chainB[bIndex] if (a.number < b.number) { aIndex++; continue } if (a.number > b.number) { bIndex++; continue } if (a.hash !== b.hash) return lastCommonIndex lastCommonIndex = aIndex; aIndex++; bIndex++ } return lastCommonIndex } ``` The native `[Symbol.asyncIterator]()` on a `PortalSource` cannot handle forks that require multiple 409 rounds. After a fork, the only option with native iteration is to re-create the stream — but the re-created stream's first request carries no `parentBlockHash`, so the portal cannot detect whether the client is still on the wrong chain and will not send the second 409. The root cause: `pipeTo`'s internal `read()` generator maintains a `cursor` variable across fork rounds. After `target.fork()` returns a rollback cursor it sets `cursor = forkedCursor` before re-entering `self.read(cursor)`, keeping `parentBlockHash` populated on every subsequent request. The native async iterator calls `this.read()` with no cursor and has no equivalent mechanism. **Workaround:** wrap `pipeTo` in a helper called `pipeToIterator` that bridges its push-based `write()` into a pull-based iterator via a single-item queue with producer acknowledgement. This preserves the `for await...of` interface while using `pipeTo`'s cursor-tracking machinery internally. 
Same two variables as the `pipeTo` approach — no extra `resumeCursor` needed, since `pipeTo` handles cursor updates internally:

```typescript theme={"system"}
let recentUnfinalizedBlocks: BlockCursor[] = []
let finalizedHighWatermark: BlockCursor | undefined
```

The fork callback passed to `pipeToIterator` is identical to `fork()` in the `pipeTo` example — the same three-case logic, the same state mutations:

```typescript theme={"system"}
async (newConsensusBlocks) => {
  const rollbackIndex = findRollbackIndex(recentUnfinalizedBlocks, newConsensusBlocks)
  if (rollbackIndex >= 0) {
    recentUnfinalizedBlocks.length = rollbackIndex + 1
    return recentUnfinalizedBlocks[rollbackIndex]
  }
  if (finalizedHighWatermark && newConsensusBlocks.every(b => b.number < finalizedHighWatermark!.number)) {
    recentUnfinalizedBlocks = recentUnfinalizedBlocks.filter(b => b.number <= finalizedHighWatermark!.number)
    return finalizedHighWatermark
  }
  return null
}
```

The SDK awaits this callback before resuming the stream, so `recentUnfinalizedBlocks` is safe to mutate here without additional locking.

Pass the source, the initial cursor, and the fork callback to `pipeToIterator`, then iterate normally:

```typescript theme={"system"}
const stream = pipeToIterator(source, recentUnfinalizedBlocks.at(-1), onFork)

for await (const {data, ctx} of stream) {
  // batch processing — identical to the pipeTo example
}
```

```typescript theme={"system"}
// WORKAROUND — see explanation above the tab
function pipeToIterator<T>(
  source: { pipeTo(t: ReturnType<typeof createTarget>): Promise<void> },
  initialCursor: BlockCursor | undefined,
  onFork: (previousBlocks: BlockCursor[]) => Promise<BlockCursor | null>
): AsyncIterableIterator<{ data: T; ctx: any }> {
  type Slot =
    | { k: 'batch'; v: { data: T; ctx: any } }
    | { k: 'end' }
    | { k: 'error'; err: unknown }

  const queue: Slot[] = []
  let consumerWake: (() => void) | null = null
  let producerAck: (() => void) | null = null
  const wake = () => { consumerWake?.(); consumerWake = null }

  ;(source.pipeTo as any)(createTarget({
    write: async ({ read }: any) => {
      for await (const batch of read(initialCursor)) {
        queue.push({ k: 'batch', v: batch })
        wake()
        await new Promise<void>(r => { producerAck = r })
      }
      queue.push({ k: 'end' })
      wake()
    },
    fork: onFork,
  })).catch((err: unknown) => { queue.push({ k: 'error', err }); wake() })

  return {
    async next(): Promise<IteratorResult<{ data: T; ctx: any }>> {
      if (!queue.length) await new Promise<void>(r => { consumerWake = r })
      const slot = queue.shift()!
if (slot.k === 'end') return { done: true, value: undefined as any } if (slot.k === 'error') throw slot.err producerAck?.(); producerAck = null return { done: false, value: slot.v } }, [Symbol.asyncIterator]() { return this }, } } ``` ```typescript theme={"system"} import { BlockCursor, createTarget } from '@subsquid/pipes' import { evmPortalStream, evmDecoder, commonAbis } from '@subsquid/pipes/evm' // WORKAROUND — pipeToIterator defined above (see implementation expandable) async function main() { let recentUnfinalizedBlocks: BlockCursor[] = [] let finalizedHighWatermark: BlockCursor | undefined const stream = pipeToIterator( evmPortalStream({ id: 'forks-async', portal: 'https://portal.sqd.dev/datasets/ethereum-mainnet', outputs: evmDecoder({ contracts: ['0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'], // USDC events: { transfer: commonAbis.erc20.events.Transfer }, range: { from: 'latest' } }), }), recentUnfinalizedBlocks.at(-1), async (newConsensusBlocks) => { const rollbackIndex = findRollbackIndex(recentUnfinalizedBlocks, newConsensusBlocks) if (rollbackIndex >= 0) { recentUnfinalizedBlocks.length = rollbackIndex + 1 return recentUnfinalizedBlocks[rollbackIndex] } if (finalizedHighWatermark && newConsensusBlocks.every(b => b.number < finalizedHighWatermark!.number)) { recentUnfinalizedBlocks = recentUnfinalizedBlocks.filter(b => b.number <= finalizedHighWatermark!.number) return finalizedHighWatermark } recentUnfinalizedBlocks.length = 0 return null } ) for await (const {data, ctx} of stream) { console.log(`Got ${data.transfer.length} transfers`) ctx.stream.state.rollbackChain.forEach((bc: BlockCursor) => { recentUnfinalizedBlocks.push(bc) }) if (ctx.stream.head.finalized) { if (!finalizedHighWatermark || ctx.stream.head.finalized.number > finalizedHighWatermark.number) { finalizedHighWatermark = ctx.stream.head.finalized } recentUnfinalizedBlocks = recentUnfinalizedBlocks.filter(b => b.number >= finalizedHighWatermark!.number) } recentUnfinalizedBlocks = recentUnfinalizedBlocks.slice(recentUnfinalizedBlocks.length - 1000) } } main().then(() => { console.log('\ndone') }) function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number { let aIndex = 0, bIndex = 0, lastCommonIndex = -1 while (aIndex < chainA.length && bIndex < chainB.length) { const a = chainA[aIndex], b = chainB[bIndex] if (a.number < b.number) { aIndex++; continue } if (a.number > b.number) { bIndex++; continue } if (a.hash !== b.hash) return lastCommonIndex lastCommonIndex = aIndex; aIndex++; bIndex++ } return lastCommonIndex } ``` ## The common-ancestor search Both approaches use the same merge-sort scan. Given two ascending-sorted arrays of `BlockCursor` — local history and the portal's `previousBlocks` — `findRollbackIndex` returns the index in local history of the last entry that both chains agree on (same block number **and** hash): ```typescript theme={"system"} function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number { let aIndex = 0, bIndex = 0, lastCommonIndex = -1 while (aIndex < chainA.length && bIndex < chainB.length) { const a = chainA[aIndex], b = chainB[bIndex] if (a.number < b.number) { aIndex++; continue } if (a.number > b.number) { bIndex++; continue } if (a.hash !== b.hash) return lastCommonIndex // chains diverged here lastCommonIndex = aIndex; aIndex++; bIndex++ } return lastCommonIndex } ``` The scan advances the pointer for the lower-numbered entry until both point to the same block number. 
A hash mismatch means the chains diverged at this number; `lastCommonIndex` holds the last agreement point. Returning `-1` means no common ancestor was found in the sample.

## Edge cases and considerations

**Empty history at stream start.** The rollback chain is built batch-by-batch from `ctx.stream.state.rollbackChain`. Until the first batch arrives the history is empty. A fork arriving before any batch has been processed means `fork()` will find no common ancestor and must return `null`, which the SDK turns into a fatal error. For a long-running process this window is typically acceptable, but it matters for freshly started consumers.

**History gaps from fast-moving finalization.** `rollbackChain` in each batch contains only the blocks from *that batch* that are strictly above the current finalized head. A block that was already at or below the finalized head when its batch was fetched will never appear in any rollback chain and will therefore be absent from history. This can leave gaps in the number sequence. Algorithms that assume a contiguous history will fail; always match by both number *and* hash.

**No finalized-head info in a batch.** When `batch.head.finalized` is absent, no history is accumulated. On networks or portal deployments that do not yet surface finality data, the rollback chain stays empty indefinitely. On such networks fork recovery is impossible unless unfinalized blocks are tracked through another mechanism.

**Ascending order, match by hash *and* number.** The API spec requires matching on both. Matching only by number is wrong — different chains can have the same block number. The array is ordered ascending (lowest number first); the last entry is the most recent block the portal knows about.

**`previousBlocks` may have no overlap with local history.** The portal sends a bounded sample. If `findRollbackIndex` finds no agreement point at all (returns -1) and no high-water-mark fallback applies, fork recovery is impossible — return `null`. The SDK will surface a `ForkCursorMissingError`. Do not silently roll back to block 0 or crash.

**Multiple consecutive 409s converge to the common ancestor.** This case is distinct from the no-overlap case above: when `findRollbackIndex` *does* find an overlap point, the stream rolls back there and resumes. If the true common ancestor is deeper still — because the `previousBlocks` sample only reached partway — the portal detects another mismatch and sends a fresh 409 with an older window, this time closer to the true ancestor. The stream converges over several rounds. `fork()` must be idempotent across these calls; truncating the history array in place handles this correctly, since each call receives a shorter local history. Database-backed approaches must also handle re-entrant rollback calls.

**Fork deeper than your history.** If you cap rollback history (e.g. to 1000 blocks), a reorg deeper than the cap is unrecoverable. Choose the cap based on the worst-case reorg depth for your target network. Ethereum mainnet finalizes within \~64 blocks (\~2 epochs), but PoW or pre-finality networks can reorg much deeper. Fail loudly rather than silently replaying from block 0.

**The finalized block as the last-resort anchor.** Keep the current finalized block *in* your rollback history even though it is technically not unfinalized. It is the guaranteed safe floor: the portal will never ask you to roll back past it. Having it available means `fork()` can always return a valid cursor for the deepest possible reorg.
Pruning with `number > finalized` instead of `number >= finalized` removes this anchor and makes very deep reorgs unrecoverable. **History that never gets pruned.** If the portal never sends a finalized head, rollback history will grow without bound. Apply a block-count cap as a secondary safeguard. **Business state and rollback-chain history must be rolled back atomically.** For databases with transactions (Postgres), both must be updated in the same transaction — a crash between the two leaves `fork()` computing the wrong rollback point. **For non-transactional databases (ClickHouse), atomicity is not achievable; use a crash-recovery callback instead.** Write application data first, write the rollback-chain checkpoint second. A crash after data but before the checkpoint save leaves the checkpoint pointing to the previous batch. On every restart, before the stream resumes, the checkpoint cursor should be read and used to purge any rows written after it — this closes the gap. This is how the Pipes SDK ClickHouse target works: `onRollback` is invoked with `type: 'offset_check'` on every startup so user code can delete the partial batch. Because ClickHouse `DELETE`s are asynchronous and unsafe under concurrent writes, the SDK inserts tombstone rows (`sign = -1`) via `CollapsingMergeTree` instead of issuing true deletes; queries that need to see only live rows must use the `FINAL` modifier. **Rolling back spans multiple batches.** A single reorg can invalidate data written across many batches. Your rollback mechanism must undo *all* rows/documents written after the rollback point, not just the last batch. **Idempotency of re-processing.** After a rollback the stream replays blocks from the rollback cursor forward. Write logic that is not idempotent (e.g. unconditional INSERT instead of UPSERT, incrementing a counter instead of setting it) will corrupt state on replay. Design writes so they are safe to run more than once for the same block. **Side effects that cannot be rolled back.** Database writes can be undone; emails, webhook calls, and Kafka publishes cannot. Either defer all external side effects until the block is finalized, or build a separate reconciliation layer. Treating unfinalized state as permanent is the most common source of production incidents in real-time blockchain consumers. **The cursor returned from `fork()` is inclusive.** Return the last block you consider good; the SDK resumes from `cursor.number + 1`. Off-by-one errors cause either duplicate re-processing or skipped blocks. **The cursor hash must be set.** The SDK sends `parentBlockHash = cursor.hash` in the next request so the portal can detect the next fork. A cursor with a missing hash silently disables fork detection for that request. **The cursor in `write()`'s `read()` call is only the initial startup cursor.** `pipeTo()` handles post-fork cursor updates inside the `read()` generator; `write()` runs continuously through forks and is never restarted by the SDK. The cursor you pass to `read()` is only relevant if `write()` is re-invoked by an external retry mechanism. For in-memory implementations the cursor is effectively always `undefined`. **Process restart loses in-memory rollback history.** An in-memory rollback chain survives forks but not process restarts. After a restart you have no history. For services that must survive restarts, persist the rollback chain alongside application state and restore it on startup. See [Cursor management](./cursor-management) for patterns. 
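A minimal sketch of such persistence, assuming a JSON checkpoint file (path and shape illustrative; in production, prefer storing the history in the same database transaction as your application state):

```typescript theme={"system"}
import * as fs from 'node:fs'
import { BlockCursor } from '@subsquid/pipes'

const CHECKPOINT = './rollback-history.json' // illustrative path

// Save after every batch, once application state is committed.
// Note: a file write is not atomic with a database commit (see the atomicity item above).
function saveHistory(history: BlockCursor[], hwm: BlockCursor | undefined): void {
  fs.writeFileSync(CHECKPOINT, JSON.stringify({ history, hwm }))
}

// Restore on startup so fork() has history immediately after a restart
function loadHistory(): { history: BlockCursor[]; hwm?: BlockCursor } {
  if (!fs.existsSync(CHECKPOINT)) return { history: [] }
  return JSON.parse(fs.readFileSync(CHECKPOINT, 'utf8'))
}
```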
**The `X-Sqd-Finalized-Head-Number` header can go backwards.** Portal instances behind a load balancer can be at different heights. When a reconnected stream lands on a lagging instance, the `finalized` value in `batch.head.finalized` may be lower than what was previously reported. Do not use the current batch's finalized number as a pruning threshold directly. **Treat the finalized head as a high-water mark.** Maintain the highest finalized number seen across all batches and key all pruning on that value. For database-backed implementations this is critical: a DELETE keyed on the current (possibly lower) finalized number will over-retain rows on some batches, and under-retain them if the logic is structured the other way. **A 409 from a lagging instance may have `previousBlocks` entirely below the high-water mark.** Two cases: * *All* of `previousBlocks` are strictly below the high-water mark. The lagging instance's sample doesn't reach local history. Because the high-water mark is truly final, every correct instance agrees on it: the fork is somewhere *above* it. Return the high-water mark cursor. This requires storing the finalized head as a full `BlockCursor` (number **and** hash), not just a number — the hash is needed for the next request's `parentBlockHash`. * Some of `previousBlocks` are at or above the high-water mark but no hash match is found. This is a genuine inconsistency at a height the client already considers final. Return `null` and surface the error. **Forks only occur in the real-time (unfinalized) portion of the stream.** The `/finalized-stream` endpoint never returns a 409. Fork handling is only needed when consuming the `/stream` endpoint with `fromBlock` near or at the chain head. If your range is bounded and entirely in the past, you will never see a fork. **`parentBlockHash` is the tripwire.** Every request to the portal includes the hash of the last block the client has seen. A mismatch triggers a 409. Anything that disrupts this — starting from a cursor with a wrong or missing hash, replaying from a checkpoint that has drifted from the chain — will produce spurious fork events. **`rollbackChain` is per-batch, not cumulative.** It contains only the blocks in *this batch* that are above the current finalized head. Treat it as a delta to append to running history, not as a full snapshot of the current unfinalized chain. **Blocks near the finality boundary move between finalized and unfinalized.** A block that appears in one batch's `rollbackChain` may be at or below the finalized head in the next batch. The pruning filter must remove these once they are finalized, or rollback history will slowly fill with blocks that can never be the subject of a reorg. **Empty `rollbackChain` is valid.** It means either (a) the batch contained no blocks above the finalized head, or (b) the finalized head was unknown. Do not treat an empty rollback chain as an error. **Both arrays must be in ascending order.** The merge-sort scan breaks silently if either array is unsorted. Local history is ascending if you always append to the end; `previousBlocks` from the portal is ascending by protocol convention. After a rollback, the truncated history remains ascending. **Gaps in block numbers do not break correctness, only efficiency.** A gap (e.g. blocks 100, 101, 103 — 102 missing because it was already finalized) means a fork at 102 resolves by rolling back to 101. The extra re-processing of 102 is harmless because finalized blocks are immutable. 
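For database-backed history, the high-water-mark pruning described a few items above can be sketched like this (table and store interface illustrative):

```typescript theme={"system"}
// Illustrative store interface; any async SQL-ish store works here
type Store = { run(sql: string, param: number): Promise<void> }

let finalizedHighWatermark = 0 // persisted alongside the history in practice

async function pruneHistory(db: Store, batchFinalized: number | undefined): Promise<void> {
  // advance the high-water mark only forwards; lagging portal instances may report lower values
  if (batchFinalized !== undefined && batchFinalized > finalizedHighWatermark) {
    finalizedHighWatermark = batchFinalized
  }
  // strict < keeps the high-water-mark block itself as the last-resort rollback anchor
  await db.run('DELETE FROM rollback_chain WHERE block_number < ?', finalizedHighWatermark)
}
```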
**Duplicate entries break the scan.** If the same block number appears more than once with different hashes in your history, the scan may report the wrong common ancestor. UPSERT rather than INSERT when persisting rollback chain entries to a store.

**Hash comparison requires both sides to be non-null.** `BlockCursor.hash` is optional in the type system. If either side is `undefined`, `undefined !== "0x..."` evaluates to `true`, which looks like a fork on a block that may be fine. Always verify hashes are present before comparing.

**`fork()` is called synchronously relative to the batch stream.** The SDK awaits `fork()` before resuming the stream. No new batches arrive while `fork()` is running. It is safe to mutate shared state inside `fork()` without additional locking.

**`write()` and `fork()` share mutable state without synchronization.** This is safe only because the SDK never calls them concurrently. If you introduce background workers or async tasks that also read or write rollback state, you must add explicit synchronization.

**The order in which you update rollback history and application state matters.** If you update application state first and crash before updating rollback history, the next restart will not know how far to roll back. Prefer database transactions that update both atomically, or update rollback history first so a crash leaves you conservative — you can always re-process a block you have already seen.

# Pipe anatomy

Source: https://docs.sqd.dev/en/sdk/pipes-sdk/evm/guides/basic-development/anatomy

How a pipe is put together

An EVM pipe made with SQD's Pipes SDK consists of:

* A **source** - typically made with `evmPortalSource()`. Can have one or more outputs.

* **Queries** - tell the source which data has to be retrieved to compute each output. A query is defined by a chain call terminated by `.build()`. Here's an example:

```ts theme={"system"}
evmQuery()
  .addFields({
    block: { timestamp: true },
    log: { address: true, transactionHash: true },
  })
  .addLog({ topic0: [ TRANSFER_TOPIC ] })
  .build()
```

* **Per-query transforms** (optional) - you can pass data from each query through a chain of simple transforms:

```ts theme={"system"}
query
  .pipe(data => data.map(item => ({
    funkyNumber: item.header.timestamp + item.header.number,
    ...item
  })))
  .pipe(someOtherSimpleTransformCallback)
```

The source object streams the data you get out of each chain of transforms as the value of the corresponding output field.

* Making utils that return reusable **query-transform combos** is a very useful pattern. In particular, on EVM it is often convenient to keep retrieval and decoding of event logs in a single module. You can easily make such combos with the `evmDecoder()` function - see the [Handling events](./handling-events) guide.

* **Whole pipe transformers** (optional) - use this if you need to compute something based on data originating from multiple queries, or if you need access to per-batch context (cursor, logger, profiler, fork callbacks).
Use `createTransformer()` so the SDK can thread cursor and rollback information ([1](../architecture-deep-dives/cursor-management), [2](../architecture-deep-dives/fork-handling)) through your transform:

```ts theme={"system"}
import { createTransformer } from '@subsquid/pipes'

const enrichTransfers = createTransformer<
  { transfers: Transfer[]; approvals: Approval[] },
  { events: EnrichedEvent[] }
>({
  transform: ({ transfers, approvals }, ctx) => {
    ctx.logger.info({ batch: ctx.stream.state.current?.number }, 'enriching')
    return {
      events: [
        ...transfers.map((t) => ({ kind: 'transfer' as const, ...t })),
        ...approvals.map((a) => ({ kind: 'approval' as const, ...a })),
      ],
    }
  },
})

evmPortalStream({ /* ... */ }).pipe(enrichTransfers)
```

* Pipe termination: a plain async iterator or a **target**.
  * If you use the pipe as an async iterator it will throw exceptions if the underlying chain is experiencing reorgs, see [Fork handling](../architecture-deep-dives/fork-handling).
  * We offer two targets out of the box:
    * Postgres via Drizzle
    * ClickHouse

    You can make your own using [`createTarget()`](../../reference/basic-components/target/create-target).

# Developing pipes

Source: https://docs.sqd.dev/en/sdk/pipes-sdk/evm/guides/basic-development/flow

Typical development workflow for pipes

Begin by making sure you know

* which on-chain data items (transactions, event logs, traces etc) you need;
* how you are going to transform these items into usable data, based on your business logic;
* what is your preferred mode of consuming the transformed data.

Use [Pipes CLI](../../quickstart) to quickly generate a starter project. The process of getting from here to a useful data pipeline follows directly from [Pipe anatomy](./anatomy).

## Adding queries and developing transforms

Here are some things to keep in mind as you're developing the heart of your pipeline.

### Writing maintainable pipes

Recall that there are two kinds of transforms in Pipes SDK:

* **Per-query transforms** work on subsets of raw data. They can be bundled with their queries, making them logically self-contained and easily reusable.
* **Whole pipe transforms** process outputs of all per-query transforms at the same time. This unlocks arbitrary data combinations, but it also means that each such transform might need to be changed whenever any of the upstream transforms changes.

For maximum maintainability you'll have to balance the following two objectives:

1. Aim to push as much of your business logic as possible into per-query transforms. Make the code of any whole pipe transforms as simple as possible.
2. Use ready-made, validated query-transform combos. For example, [evmDecoder()](./handling-events) fetches and decodes contract event logs, supports factory-discovered contracts, and indexed parameter filtering. It'll often be preferable to build your pipeline out of such modules, even if that happens to make whole-pipe transforms slightly more complicated.

### Stateful transforms

It's often the case that you need access to some part of the previously processed data to do the transform. For example, to compute a running ERC20 balance from transfers you need to know its value preceding the current transfer.

There are multiple ways to accomplish this in Pipes SDK, each with its advantages and disadvantages. Consult the [Stateful transforms](../advanced-topics/stateful-transforms) guide.
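As a flavor of the simplest option from that guide (in-RAM state, approach A), here is a minimal sketch of a running-balance transform. The `Transfer` shape is illustrative, and a real-time pipeline would also need a `fork` callback to undo rolled-back blocks:

```ts theme={"system"}
import { createTransformer } from '@subsquid/pipes'

// Illustrative event shape — match it to your decoder's output
type Transfer = { from: string; to: string; value: bigint }

// In-RAM state: survives across batches, lost on restart
const balances = new Map<string, bigint>()

const runningBalances = createTransformer<
  { transfers: Transfer[] },
  { balances: Array<{ address: string; balance: bigint }> }
>({
  transform: ({ transfers }) => {
    for (const t of transfers) {
      balances.set(t.from, (balances.get(t.from) ?? 0n) - t.value)
      balances.set(t.to, (balances.get(t.to) ?? 0n) + t.value)
    }
    return { balances: [...balances].map(([address, balance]) => ({ address, balance })) }
  },
})
```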
## Writing data ### Postgres and ClickHouse If you need your transformed data in Postgres or ClickHouse, you should already have a basic configuration generated by [Pipes CLI](../../quickstart). If you're working with real-time data, **it is very important to** * **on Postgres** when adding or removing any relevant tables: update the list of tables in the target configuration; * **on ClickHouse** when any data dependencies or structure of the stored data changes: update the `onRollback()` callback. Consult the [Postgres via Drizzle](./targets/postgres-drizzle) and [ClickHouse](./targets/clickhouse) guides. ### Plain iterator A complete pipeline without a `.pipeTo` is a valid async iterator. * The pipeline will produce some logs by default. Disable them by setting `logger: false` when creating the data source. If you're looking to convert an existing standalone pipe into a module in a larger program and wish to get rid of any side effects, consult the [Running bare bones](./running-bare-bones) guide. * If you're working with unfinalized data (default setting of the source), the iterator will throw `ForkException`s on blockchain reorgs. You should catch these and process them correctly. Consult the [fork handling guide](../architecture-deep-dives/fork-handling) for details. Alternatively, configure the data source to use final data only: ```ts theme={"system"} const source = evmPortalSource({ portal: { url: '', finalized: true, }, ... }) ``` * By default, the pipeline is stateless: when re-created it'll restart from the earliest block relevant to any of the queries. If you want the pipeline to persist its sync state between restarts, you'll have to manage the state by yourself. See [Cursor management](../architecture-deep-dives/cursor-management). ### Developing your own target Use the [createTarget() function](../../reference/basic-components/target/create-target). * If you're working with unfinalized data (default setting of the source), you must define a fork handler callback. Consult the [fork handling guide](../architecture-deep-dives/fork-handling) for details. Alternatively, configure the data source to use final data only: ```ts theme={"system"} const source = evmPortalSource({ portal: { url: '', finalized: true, }, ... }) ``` * If you want your pipeline to preserve its sync state between restarts, you'll have to manage this state in your `write` callback. See [Cursor management](../architecture-deep-dives/cursor-management). # Handling contract events Source: https://docs.sqd.dev/en/sdk/pipes-sdk/evm/guides/basic-development/handling-events Fetching and decoding EVM event logs with evmDecoder `evmDecoder()` bundles an EVM log query with a decoding transform into a single reusable module. Pass the result as an output to `evmPortalStream`: ```ts theme={"system"} import { commonAbis, evmDecoder, evmPortalStream } from '@subsquid/pipes/evm' const stream = evmPortalStream({ portal: 'https://portal.sqd.dev/datasets/ethereum-mainnet', outputs: { transfers: evmDecoder({ range: { from: '0' }, contracts: ['0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'], events: { transfer: commonAbis.erc20.events.Transfer }, }), }, }) ``` `evmDecoder()` can: * Fetch from **specific contracts** — pass an array of addresses to `contracts`. Omit it entirely to receive matching events from every contract on-chain. * Filter by **indexed parameters** — instead of a bare event, supply `{ event, params }` to select only logs where specific indexed arguments match. 
* Dynamically discover contracts via **factories** — pass a `contractFactory()` to `contracts` instead of a static list. See the [Factory guide](../advanced-topics/factory-transformers). * Handle decode errors with a custom **`onError` callback** instead of letting them propagate. See the [evmDecoder() reference](../../reference/utility-components/evm-decoder) for all parameters. ## Specifying events The `events` parameter maps output field names to event specifications. There are three ways to obtain an event specification. ### `commonAbis` `commonAbis` is a built-in collection of ABI modules for common token standards. Currently it ships one module, `erc20`: ```ts theme={"system"} import { commonAbis, evmDecoder } from '@subsquid/pipes/evm' evmDecoder({ range: { from: 'latest' }, contracts: ['0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'], events: { transfers: commonAbis.erc20.events.Transfer, approvals: commonAbis.erc20.events.Approval, }, }) ``` See the [`commonAbis` reference](../../reference/utility-components/evm-decoder#commonabis) for the full list of available events and functions. ### Typegen modules `@subsquid/evm-typegen` generates TypeScript ABI modules from JSON ABIs. Each generated module exports typed `events` and `functions` objects, translating Solidity types to TypeScript — event argument types are statically known at compile time, so you get precise type checking and IDE autocompletion across the entire pipeline. **Install the tool:** ```bash theme={"system"} npm install -D @subsquid/evm-typegen ``` **Generate a module from a local JSON ABI file:** ```bash theme={"system"} npx squid-evm-typegen src/abi your-contract.json ``` This creates `src/abi/your-contract.ts`. The tool also accepts a contract address (requires specifying `--chain-id`) or an arbitrary URL. Use events from a generated module exactly as with `commonAbis`: ```ts theme={"system"} import * as usdcAbi from './abi/usdc' evmDecoder({ range: { from: 'latest' }, contracts: ['0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'], events: { transfers: usdcAbi.events.Transfer, approvals: usdcAbi.events.Approval, }, }) ``` ### Raw JSON via `defineAbi()` `defineAbi()` converts a JSON ABI array to a subsquid ABI module at runtime, with no code generation step. This is the quickest route — useful for one-off scripts or prototypes — but it comes at a cost: when the ABI is loaded from an external JSON file, event argument fields are typed as `any`, since TypeScript cannot inspect the runtime JSON value at compile time. 
```ts theme={"system"} import { defineAbi, evmDecoder } from '@subsquid/pipes/evm' import erc20Json from './erc20.json' const erc20 = defineAbi(erc20Json) // event args are `any` evmDecoder({ range: { from: 'latest' }, contracts: ['0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'], events: { transfers: erc20.events.Transfer }, }) ``` `defineAbi()` also accepts Hardhat and Foundry artifact objects — it reads the `abi` field automatically: ```ts theme={"system"} import artifact from './artifacts/MyContract.json' const myContract = defineAbi(artifact) // reads artifact.abi ``` If you define the ABI inline with `as const`, TypeScript can infer the exact decoded types for scalar fields: ```ts theme={"system"} const erc20 = defineAbi([ { type: 'event', name: 'Transfer', inputs: [ { indexed: true, name: 'from', type: 'address' }, { indexed: true, name: 'to', type: 'address' }, { indexed: false, name: 'value', type: 'uint256' }, ], }, ] as const) // erc20.events.Transfer.decode() returns { from: string, to: string, value: bigint } ``` For projects where full type safety matters end-to-end, prefer the [typegen route](#typegen-modules) instead. # Pipes UI Source: https://docs.sqd.dev/en/sdk/pipes-sdk/evm/guides/basic-development/pipes-ui Live dashboard for monitoring a running pipe Pipes UI is a local web dashboard that connects to a running pipe and visualises its progress, speed, portal query, and profiler breakdown. It reads the metrics server that the SDK exposes on the pipe process — nothing needs to be deployed or hosted.