> ## Documentation Index
> Fetch the complete documentation index at: https://docs.sqd.dev/llms.txt
> Use this file to discover all available pages before exploring further.

<AgentInstructions>

## Submitting Feedback

If you encounter incorrect, outdated, or confusing documentation on this page, submit feedback:

POST https://docs.sqd.dev/feedback

```json
{
  "path": "/en/sdk/pipes-sdk/evm/reference/basic-components/target/create-target",
  "feedback": "Description of the issue"
}
```

Only submit feedback when you have something specific and actionable to report.

</AgentInstructions>

# createTarget

> API reference for createTarget

Build a custom data sink. A target drains batches from the pipe and is responsible for persisting them.

```ts theme={"system"}
createTarget<In>(config: Target<In>): Target<In>
```

**Config fields:**

* `write`: (required) Async function `({ read, logger }) => Promise<void>`. Iterate the stream by calling `read()` and consuming `{ data, ctx }` batches. The function returns when the stream ends.
* `fork`: (optional) `(previousBlocks: BlockCursor[]) => Promise<BlockCursor | null>`. Called when the source detects a chain reorg. Return the last safe cursor to roll back to, or `null` if no common ancestor can be determined (the stream will throw). See [Fork handling](../../../guides/architecture-deep-dives/fork-handling). You don't need this callback when the source is configured to [read only finalized blocks](../source#finalized-blocks).

## The `write` context

```ts theme={"system"}
type WriteCtx<In> = {
  read: (cursor?: BlockCursor) => AsyncIterableIterator<PortalBatch<In>>
  logger: Logger
}
```

| Field    | Description                                                                                                            |
| -------- | ---------------------------------------------------------------------------------------------------------------------- |
| `read`   | Opens an async iterator over pipeline batches. Pass `cursor` to resume from a specific block the target has persisted. |
| `logger` | Pino-compatible logger scoped to this target.                                                                          |

## Per-batch context (`ctx`)

Each `{ data, ctx }` yielded by `read()` carries the same `BatchContext` that transformers receive. Fields:

| Field                        | Type                        | Description                                                                                                 |
| ---------------------------- | --------------------------- | ----------------------------------------------------------------------------------------------------------- |
| `id`                         | `string`                    | Pipeline ID — the `id` passed to `evmPortalStream()`.                                                       |
| `logger`                     | `Logger`                    | Batch-scoped logger.                                                                                        |
| `metrics`                    | `Metrics`                   | Prometheus metrics registry. See [Metrics](../../../guides/advanced-topics/metrics).                        |
| `profiler`                   | `Profiler`                  | Open a span with `ctx.profiler.start('label')`. See [Profiling](../../../guides/advanced-topics/profiling). |
| `stream.dataset`             | `ApiDataset`                | Dataset metadata.                                                                                           |
| `stream.head.finalized`      | `BlockCursor \| undefined`  | Current finalized head.                                                                                     |
| `stream.head.latest`         | `BlockCursor \| undefined`  | Current unfinalized head.                                                                                   |
| `stream.state.initial`       | `number`                    | First block number the stream was configured to read.                                                       |
| `stream.state.last`          | `number`                    | Last block number the stream intends to read.                                                               |
| `stream.state.current`       | `BlockCursor`               | Latest block in this batch.                                                                                 |
| `stream.state.rollbackChain` | `BlockCursor[]`             | Tail of unfinalized cursors subject to rollback.                                                            |
| `stream.progress`            | `ProgressEvent['progress']` | Progress metrics when `progress` is enabled.                                                                |
| `stream.query`               | `{ url, hash, raw }`        | Portal query details for the batch.                                                                         |
| `batch.blocksCount`          | `number`                    | Number of blocks in this batch.                                                                             |
| `batch.bytesSize`            | `number`                    | Compressed payload size received from the portal.                                                           |
| `batch.requests`             | `Record<number, number>`    | Map of HTTP status code → count of responses that produced this batch.                                      |
| `batch.lastBlockReceivedAt`  | `Date`                      | Wall-clock time the last block was received.                                                                |

## Example

```ts theme={"system"}
const target = createTarget({
  write: async ({ read, logger }) => {
    for await (const { data, ctx } of read()) {
      const span = ctx.profiler.start('save')
      await database.save(data)
      span.end()
      logger.info(
        { block: ctx.stream.state.current.number, rows: ctx.batch.blocksCount },
        'saved batch',
      )
    }
  },
  fork: async (previousBlocks) => {
    // Return a cursor from your persisted state; null to fail hard.
    return previousBlocks[previousBlocks.length - 1] ?? null
  },
})
```

## Resuming from a persisted cursor

Stateful targets typically persist a cursor and resume from it on restart:

```ts theme={"system"}
createTarget({
  write: async ({ read, logger }) => {
    const lastSaved = await database.getCursor() // BlockCursor | undefined
    for await (const { data, ctx } of read(lastSaved)) {
      await database.save(data)
      await database.saveCursor(ctx.stream.state.current)
    }
  },
})
```
