
# Cursor management

> How pipelines track progress and resume after restarts

A cursor records the last successfully processed slot. Built-in targets (ClickHouse, Drizzle) handle persistence automatically. When using `createTarget` directly, you own the full lifecycle.

## The cursor object

```typescript theme={"system"}
type BlockCursor = {
  number: number      // stream resumes from number + 1
  hash?: string       // slot hash — used as parentBlockHash for fork detection
  timestamp?: number  // slot timestamp in seconds
}
```

`hash` is the fork detection tripwire: the SDK sends `parentBlockHash = cursor.hash` in each portal request. An absent hash silently skips fork detection for that request. See [cursor semantics](./fork-handling#5-cursor-semantics) for the full picture.
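Conceptually, the check compares the parent hash of the next slot with the stored `cursor.hash`. The model below is a hypothetical illustration, not the portal's implementation; `checkContinuity` and `SlotHeader` are invented names. It shows why a cursor without `hash` leaves nothing to compare against.

```typescript
type BlockCursor = { number: number; hash?: string; timestamp?: number }
// Hypothetical header shape; only the hash link matters for this check.
type SlotHeader = { number: number; hash: string; parentHash: string }

// Conceptual model only, not the portal's implementation.
function checkContinuity(
  cursor: BlockCursor | undefined,
  next: SlotHeader,
): 'ok' | 'fork' | 'unchecked' {
  // Without a stored hash there is nothing to compare, so a fork goes unnoticed.
  if (!cursor?.hash) return 'unchecked'
  return next.parentHash === cursor.hash ? 'ok' : 'fork'
}

const stored = { number: 10, hash: 'h10' }
console.log(checkContinuity(stored, { number: 11, hash: 'h11', parentHash: 'h10' }))       // ok
console.log(checkContinuity(stored, { number: 11, hash: 'x11', parentHash: 'x10' }))       // fork
console.log(checkContinuity({ number: 10 }, { number: 11, hash: 'x11', parentHash: 'x10' })) // unchecked
```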

## Startup: range.from and stored cursors

`range.from` in the decoder sets where the stream begins on a first run — before any cursor exists:

```typescript theme={"system"}
solanaInstructionDecoder({
  range: { from: 'latest' },           // chain head
  // range: { from: 200_000_000 },     // slot number
  // range: { from: '2024-01-01' },    // ISO date string
  // range: { from: new Date() },      // Date object
})
```

Once a cursor is stored, `range.from` is ignored — the stream resumes from `cursor.number + 1`.
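The precedence rule can be sketched as a small pure function. `resolveStartSlot` is a hypothetical helper, not part of the SDK, and it only handles a numeric `range.from` (not `'latest'` or dates).

```typescript
type BlockCursor = { number: number; hash?: string; timestamp?: number }

// Hypothetical helper illustrating start-position precedence; not an SDK API.
function resolveStartSlot(stored: BlockCursor | undefined, rangeFrom: number): number {
  // A stored cursor always wins: resume from the slot after the last processed one.
  if (stored) return stored.number + 1
  // First run: no cursor exists yet, so range.from decides.
  return rangeFrom
}

console.log(resolveStartSlot(undefined, 200_000_000))               // 200000000
console.log(resolveStartSlot({ number: 250_000_123 }, 200_000_000)) // 250000124
```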

## The stream id

The `id` on `solanaPortalStream` is the primary key for all stored state:

```typescript theme={"system"}
solanaPortalStream({ id: 'my-pipeline', ... })
```

Both built-in targets use it to isolate state records, so multiple streams can share one physical table. **Never rename an active stream's id** — the stored cursor is keyed on it, and renaming causes the pipeline to restart from `range.from`.
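A toy in-memory model makes the keying concrete. `StateTable` is hypothetical and stands in for the physical state table; it shows two ids coexisting in one table and a renamed id finding no cursor.

```typescript
type BlockCursor = { number: number; hash?: string; timestamp?: number }

// Hypothetical in-memory stand-in for a shared state table keyed by stream id.
class StateTable {
  private rows = new Map<string, BlockCursor>()

  save(streamId: string, cursor: BlockCursor): void {
    this.rows.set(streamId, cursor)
  }

  load(streamId: string): BlockCursor | undefined {
    return this.rows.get(streamId)
  }
}

const sync = new StateTable()
sync.save('swaps', { number: 300_000_000, hash: 'abc' })
sync.save('transfers', { number: 299_999_500, hash: 'def' })

// Each id sees only its own cursor, even though both live in one table.
console.log(sync.load('swaps')?.number) // 300000000
// A renamed id finds no cursor, so that stream would restart from range.from.
console.log(sync.load('swaps-v2'))      // undefined
```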

## ClickHouse target

`clickhouseTarget` saves the cursor after every successful `onData` call and invokes `onRollback` for both fork recovery and crash recovery, resolving the rollback cursor for you.

```typescript theme={"system"}
clickhouseTarget({
  client,
  settings: {
    id: 'my-stream',      // overrides solanaPortalStream id (default: 'stream')
    table: 'sync',        // state table name (default: 'sync')
    database: 'default',  // ClickHouse database
    maxRows: 10_000,      // cursor rows to keep per stream id (default: 10,000)
  },
  onData: ...,
  onRollback: ...,
})
```

**`onRollback` is called in two situations:**

* `type: 'offset_check'` — on every startup when a cursor exists. ClickHouse is non-transactional: a crash between `onData` and the cursor save leaves rows newer than the saved cursor. Delete them here. See [non-transactional databases](./fork-handling#4-state-rollback-atomicity).
* `type: 'blockchain_fork'` — when the portal signals a reorg. The rollback cursor is resolved automatically from stored history; your callback only needs to delete rows after `safeCursor.number`.

The same implementation typically serves both:

```typescript theme={"system"}
onRollback: async ({ store, safeCursor }) => {
  await store.removeAllRows({
    tables: ['my_table'],
    where: `slot_number > {n:UInt32}`,
    params: { n: safeCursor.number },
  })
},
```
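To see why the `offset_check` pass matters, here is an in-memory simulation of a crash between the data write and the cursor save. The `Row` shape and the `rollbackAfter` helper are hypothetical stand-ins for `store.removeAllRows`; they only model the `slot_number > safeCursor.number` filter used above.

```typescript
// In-memory model of the offset_check scenario (hypothetical; not the SDK's store API).
type Row = { slot_number: number; payload: string }
type BlockCursor = { number: number; hash?: string }

// Mirrors the WHERE clause above: keep only rows at or below the safe cursor.
function rollbackAfter(rows: Row[], safeCursor: BlockCursor): Row[] {
  return rows.filter((r) => r.slot_number <= safeCursor.number)
}

// State before the batch: data and cursor agree on slot 100.
let table: Row[] = [{ slot_number: 100, payload: 'a' }]
const savedCursor: BlockCursor = { number: 100 }

// Batch for slots 101..102: data is inserted first...
table.push({ slot_number: 101, payload: 'b' }, { slot_number: 102, payload: 'c' })
// ...then the process crashes before the cursor is saved as 102.

// On restart, offset_check runs with the last saved cursor as safeCursor.
table = rollbackAfter(table, savedCursor)
console.log(table.map((r) => r.slot_number)) // [ 100 ]
```

Slots 101 and 102 are then re-indexed from `savedCursor.number + 1`, so no row is duplicated.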

**State table.** Each row stores the cursor, the last finalized slot, and the unfinalized slot history used for fork recovery. Rows beyond `maxRows` are pruned every 25 saves. Set `maxRows` to cover your network's worst-case reorg depth — see [rollback depth](./fork-handling#3-rollback-depth-and-history-limits).
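Resolving the rollback cursor amounts to finding the newest stored slot that the chain still agrees with. The sketch below assumes the fork signal carries a list of canonical recent `{ number, hash }` pairs; `findRollbackCursor` is hypothetical, and the algorithm the SDK actually uses is described in the fork handling guide.

```typescript
type BlockCursor = { number: number; hash?: string }

// history: unfinalized slots saved by the target, ascending by number.
// canonical: recent slots the chain currently considers valid (assumed input).
function findRollbackCursor(
  history: BlockCursor[],
  canonical: BlockCursor[],
): BlockCursor | undefined {
  const canonicalHashes = new Map<number, string | undefined>()
  for (const b of canonical) canonicalHashes.set(b.number, b.hash)

  // Walk back from the newest stored slot to the first one whose hash still matches.
  for (let i = history.length - 1; i >= 0; i--) {
    const h = history[i]
    if (canonicalHashes.has(h.number) && canonicalHashes.get(h.number) === h.hash) {
      return h
    }
  }
  // No common slot: the reorg is deeper than the stored history (see maxRows).
  return undefined
}

const history = [
  { number: 10, hash: 'a' },
  { number: 11, hash: 'b' },
  { number: 12, hash: 'x' }, // orphaned by the reorg
]
const canonical = [
  { number: 11, hash: 'b' },
  { number: 12, hash: 'c' },
]
console.log(findRollbackCursor(history, canonical)) // { number: 11, hash: 'b' }
```

The `undefined` branch is the case `maxRows` exists to prevent: if the stored history is shorter than the reorg, no safe anchor can be found.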

## Drizzle target

`drizzleTarget` saves the cursor inside the same PostgreSQL transaction as the data write — fully atomic, no crash-recovery pass needed.

```typescript theme={"system"}
drizzleTarget({
  db: drizzle(DB_URL),
  tables: [swapsTable],       // every table onData writes to — required
  settings: {
    state: {
      id: 'my-stream',
      schema: 'public',
      table: 'sync',
      unfinalizedBlocksRetention: 1000,  // cursor rows to keep (default: 1,000)
    },
    transaction: { isolationLevel: 'serializable' },  // default
  },
  onData: async ({ tx, data }) => {
    await tx.insert(swapsTable).values(...)
  },
})
```

**`tables` is required** for every table written in `onData`. At startup the target installs a PostgreSQL trigger on each listed table; the trigger copies the pre-change row into a `<name>__snapshots` table (keyed by slot number and primary key). On a fork the target replays these snapshots in reverse, restoring pre-fork state automatically. Writing to a table not in `tables` raises a runtime error.

Snapshots are only taken for slots at or above the current finalized head; slots below it are final and can never be reorged.
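The snapshot mechanism can be modelled in a few lines. This in-memory sketch is a hypothetical illustration, not the trigger the target installs: it records the pre-change value of each key (`undefined` for an insert) tagged with its slot, skips slots below the finalized head, and on a fork replays newer snapshots in reverse.

```typescript
// Toy model of trigger-based snapshots (hypothetical; the real target uses PostgreSQL triggers).
type Snapshot = { slot: number; key: string; before: number | undefined }

class SnapshottedTable {
  readonly rows = new Map<string, number>()
  private readonly snapshots: Snapshot[] = []
  private readonly finalizedHead: number

  constructor(finalizedHead: number) {
    this.finalizedHead = finalizedHead
  }

  // Stands in for the trigger: record the old value (undefined for an insert) before writing.
  write(slot: number, key: string, value: number): void {
    if (slot >= this.finalizedHead) {
      this.snapshots.push({ slot, key, before: this.rows.get(key) })
    }
    this.rows.set(key, value)
  }

  // Undo every change made after `slot`, newest first (assumes writes arrive in slot order).
  rollbackTo(slot: number): void {
    while (this.snapshots.length > 0) {
      const s = this.snapshots[this.snapshots.length - 1]
      if (s.slot <= slot) break
      this.snapshots.pop()
      if (s.before === undefined) this.rows.delete(s.key)
      else this.rows.set(s.key, s.before)
    }
  }
}

const balances = new SnapshottedTable(100)
balances.write(101, 'alice', 5)
balances.write(102, 'alice', 7)
balances.write(103, 'bob', 1)
balances.rollbackTo(101) // fork: slots 102 and 103 are orphaned
console.log(balances.rows.get('alice'), balances.rows.has('bob')) // 5 false
```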

**Advisory lock.** Every batch acquires `pg_try_advisory_xact_lock(hashtext(id))` inside the transaction, which prevents concurrent writers on the same stream: two `drizzleTarget` instances sharing an `id` never write at the same time, while instances with different `id`s run independently.
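For custom PostgreSQL code, the same locking pattern can be reproduced around your own transaction. This sketch assumes a minimal `Queryable` interface compatible with a node-postgres client's `query(text, params)`; `withStreamLock` is hypothetical. Because `pg_try_advisory_xact_lock` does not wait, it returns `false` when another transaction holds the lock, and the sketch treats that as an error. The lock is released automatically at `COMMIT` or `ROLLBACK`.

```typescript
// Minimal subset of a node-postgres client used by this sketch (assumed interface).
interface Queryable {
  query(text: string, params?: unknown[]): Promise<{ rows: Record<string, unknown>[] }>
}

async function withStreamLock<T>(
  client: Queryable,
  streamId: string,
  fn: (client: Queryable) => Promise<T>,
): Promise<T> {
  await client.query('BEGIN')
  try {
    const res = await client.query(
      'SELECT pg_try_advisory_xact_lock(hashtext($1)) AS locked',
      [streamId],
    )
    // The try-variant never blocks: false means another transaction holds the lock.
    if (res.rows[0]?.locked !== true) {
      throw new Error(`stream "${streamId}" is being written by another process`)
    }
    const result = await fn(client)
    await client.query('COMMIT') // also releases the advisory lock
    return result
  } catch (err) {
    await client.query('ROLLBACK') // also releases the advisory lock
    throw err
  }
}
```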

**Retention.** Snapshot rows below `min(current, finalizedHead) - unfinalizedBlocksRetention` are deleted every 25 batches. Set this to cover your network's worst-case reorg depth.
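As a worked example of the formula, with the current position at slot 300,001,000, the finalized head at 300,000,950 and the default retention of 1,000, snapshot rows below slot 299,999,950 become eligible for deletion. `snapshotCutoff` is a hypothetical helper that only restates the arithmetic.

```typescript
// Hypothetical helper restating the retention formula above.
function snapshotCutoff(current: number, finalizedHead: number, retention: number): number {
  // Take the lower of the current position and the finalized head, then subtract the window.
  return Math.min(current, finalizedHead) - retention
}

// Snapshot rows for slots below the cutoff are eligible for deletion.
console.log(snapshotCutoff(300_001_000, 300_000_950, 1000)) // 299999950
console.log(snapshotCutoff(500, 900, 100))                  // 400
```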

**Rollback hooks.** `onBeforeRollback` and `onAfterRollback` receive `{ tx, cursor }` and run inside the fork transaction. Use them to perform additional cleanup that the snapshot mechanism cannot cover (e.g., rows in tables not tracked by `tables`).

## Async iterator

When consuming a pipeline with `for await...of` instead of `pipeTo`, the native `[Symbol.asyncIterator]()` always calls `read()` with no cursor — it has no way to accept one. The stream therefore starts from `range.from` on every run.

**Finalized streams.** If the stream only consumes already-finalized slots (no forks possible), rebuilding the stream with `range.from` set to the stored cursor is sufficient:

```typescript theme={"system"}
let cursor = loadCursor()  // BlockCursor | undefined

const stream = solanaPortalStream({
  id: 'my-pipeline',
  portal: '...',
  outputs: solanaInstructionDecoder({
    // cursor.number is the last processed slot; resume from the next one
    range: { from: cursor ? cursor.number + 1 : 0 },
  }),
})

for await (const { data, ctx } of stream) {
  await processData(data)
  saveCursor(ctx.stream.state.current)  // { number, hash, timestamp }
}
```

Save `ctx.stream.state.current` — the full `BlockCursor` of the batch's last slot — not just the number. The `hash` is needed if you later switch to real-time or need the cursor as a fork anchor.
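`loadCursor` and `saveCursor` above are left to you. One possible sketch persists the full `BlockCursor` as JSON in a local file, writing to a temporary file first and renaming it so a crash mid-write cannot leave a truncated cursor. The file path and both helpers are hypothetical.

```typescript
import { existsSync, readFileSync, renameSync, writeFileSync } from 'node:fs'

type BlockCursor = { number: number; hash?: string; timestamp?: number }

// Hypothetical location; choose a path on durable storage.
const CURSOR_FILE = './cursor.json'

function loadCursor(file = CURSOR_FILE): BlockCursor | undefined {
  if (!existsSync(file)) return undefined // first run: range.from decides
  return JSON.parse(readFileSync(file, 'utf8')) as BlockCursor
}

function saveCursor(cursor: BlockCursor, file = CURSOR_FILE): void {
  // Keep number, hash and timestamp: the hash is the fork-detection anchor.
  const tmp = `${file}.tmp`
  writeFileSync(tmp, JSON.stringify(cursor))
  renameSync(tmp, file) // rename replaces the file atomically on the same POSIX filesystem
}
```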

**Real-time streams.** Setting `range.from` to a stored number loses the slot hash. On restart the first request carries no `parentBlockHash`, so fork detection is silently disabled for that request. For real-time streams, use the `pipeToIterator` helper from the [async iteration tab of the fork handling guide](./fork-handling), which accepts an `initialCursor` and passes it directly to `read()` inside `pipeTo`:

```typescript theme={"system"}
const stream = pipeToIterator(
  solanaPortalStream({ id: 'my-pipeline', portal: '...', outputs: solanaInstructionDecoder({ range: { from: 'latest' } }) }),
  loadCursor(),   // full BlockCursor with hash — passed to read(), not range.from
  onFork,
)

for await (const { data, ctx } of stream) {
  await processData(data)
  saveCursor(ctx.stream.state.current)
}
```

`pipeToIterator` preserves `parentBlockHash` across fork rounds because it uses `pipeTo` internally. On a fresh first run, pass `undefined` as `initialCursor` and the stream begins from `range.from` as normal.

## Custom cursor management

When using `createTarget` directly, you own the full cursor lifecycle.

At the start of `write`, fetch the stored cursor and pass it to `read`:

```typescript theme={"system"}
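write: async ({ read }) => {
  // db is your own storage layer; getLatestCursor returns BlockCursor | undefined
  const cursor = await db.getLatestCursor()
  // undefined on the first run: the stream then starts from range.from
  for await (const { data, ctx } of read(cursor)) {
    // ...
  }
}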
```

After processing each batch, persist the cursor together with the fork-recovery state:

```typescript theme={"system"}
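// db.transaction and tx.saveCursor stand in for your own storage API
await db.transaction(async (tx) => {
  await writeData(tx, data)
  await tx.saveCursor({
    cursor:        ctx.stream.state.current,        // last slot of this batch
    rollbackChain: ctx.stream.state.rollbackChain,  // unfinalized history for fork recovery
    finalized:     ctx.stream.head.finalized,       // latest finalized slot
  })
})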
```

For **transactional stores** (Postgres): save all three fields in the same transaction as the data write. For **non-transactional stores** (ClickHouse): write data first, cursor last, and implement a startup check that detects and corrects any data written after the last cursor save. See [state rollback atomicity](./fork-handling#4-state-rollback-atomicity).
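For a transactional store, the property that matters is that the data and all three cursor fields become visible together or not at all. The in-memory `MemoryStore` below is a hypothetical stand-in for a database transaction, and `SavedState` is an assumed shape for the saved record, since this page does not prescribe one.

```typescript
type BlockCursor = { number: number; hash?: string; timestamp?: number }

// Assumed shape of the persisted state; mirror whatever your target saves.
type SavedState = {
  cursor: BlockCursor
  rollbackChain: BlockCursor[]
  finalized?: BlockCursor
}

type Staged = { rows: string[]; state?: SavedState }

// Toy transactional store: writes are staged and published only if the callback succeeds.
class MemoryStore {
  private committed: Staged = { rows: [] }

  get rows(): readonly string[] {
    return this.committed.rows
  }

  get state(): SavedState | undefined {
    return this.committed.state
  }

  async transaction(fn: (tx: Staged) => Promise<void>): Promise<void> {
    const staged: Staged = { rows: [...this.committed.rows], state: this.committed.state }
    await fn(staged)        // if this throws, `staged` is simply dropped
    this.committed = staged // data and cursor become visible together
  }
}

async function demo(): Promise<MemoryStore> {
  const store = new MemoryStore()
  const cursor = { number: 101, hash: 'h101' }
  await store.transaction(async (tx) => {
    tx.rows.push('swap@101')
    tx.state = { cursor, rollbackChain: [cursor] }
  })
  // A failing batch leaves neither its rows nor its cursor behind.
  await store
    .transaction(async (tx) => {
      tx.rows.push('swap@102')
      throw new Error('write failed')
    })
    .catch(() => {})
  return store
}

demo().then((s) => console.log(s.rows, s.state?.cursor.number)) // [ 'swap@101' ] 101
```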

The `fork` callback and the algorithm for resolving rollback cursors from stored history are covered in detail in the [fork handling guide](./fork-handling).

<Card title="Example: cursor management with createTarget" icon="github" href="https://github.com/subsquid-labs/pipes-sdk-docs/blob/master/src/basics/06-cursor-from-target.ts">
  A minimal example showing manual cursor passing in createTarget
</Card>

<Card title="Example: ClickHouse target" icon="github" href="https://github.com/subsquid-labs/pipes-sdk-docs/blob/master/src/basics/08-clickhouse-target.ts">
  Full pipeline with onRollback and onData
</Card>

<Card title="Example: Drizzle / PostgreSQL target" icon="github" href="https://github.com/subsquid-labs/pipes-sdk/blob/main/docs/examples/evm/08.drizzle.example.ts">
  Full pipeline including GraphQL API
</Card>
