> ## Documentation Index
> Fetch the complete documentation index at: https://docs.sqd.dev/llms.txt
> Use this file to discover all available pages before exploring further.

# ClickHouse

> Store pipe output in ClickHouse

Install the ClickHouse Node.js client:

```bash theme={"system"}
npm install @clickhouse/client
```

At a glance, the pipeline looks like this:

```ts theme={"system"}
import { createClient } from '@clickhouse/client'
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'

await evmPortalSource({ ... }).pipeTo(
  clickhouseTarget({
    client: createClient({ url: 'http://localhost:8123' }),
    onData: async ({ store, data }) => {
      store.insert({ table: 'transfers', values: data.transfers.map(...), format: 'JSONEachRow' })
    },
    onRollback: async ({ store, safeCursor }) => {
      await store.removeAllRows({ tables: ['transfers'], where: `block_number > ${safeCursor.number}` })
    },
  }),
)
```

## Table design

Use `CollapsingMergeTree` with a `sign Int8 DEFAULT 1` column. This engine enables efficient fork rollbacks: to cancel rows, the target re-inserts them with `sign = -1` and ClickHouse merges the pair during background processing.

```sql theme={"system"}
CREATE TABLE IF NOT EXISTS transfers (
  block_number     UInt32  CODEC(DoubleDelta, ZSTD),
  transaction_hash String,
  log_index        UInt16,
  from_address     LowCardinality(FixedString(42)),
  to_address       LowCardinality(FixedString(42)),
  value            UInt256,
  sign             Int8 DEFAULT 1
) ENGINE = CollapsingMergeTree(sign)
  ORDER BY (block_number, transaction_hash, log_index);
```

Design notes:

* Apply `DoubleDelta + ZSTD` codecs to monotonically increasing columns such as block numbers and timestamps.
* Use `LowCardinality` for columns with low cardinality like addresses to reduce storage and speed up filtering.
* Store 256-bit integers as `UInt256`; serialize JavaScript `BigInt` values to strings before insertion.

Create the table in `onStart` using `store.command()`:

```ts theme={"system"}
onStart: async ({ store }) => {
  await store.command({ query: `CREATE TABLE IF NOT EXISTS transfers ( ... )` })
}
```

## `onData`

Call `store.insert()` to queue an insert. The call is non-blocking — inserts fire concurrently and are fully flushed when the target closes:

```ts theme={"system"}
onData: async ({ store, data }) => {
  store.insert({
    table: 'transfers',
    values: data.transfers.map((t) => ({
      block_number:     t.block.number,
      transaction_hash: t.rawEvent.transactionHash,
      log_index:        t.rawEvent.logIndex,
      from_address:     t.event.from,
      to_address:       t.event.to,
      value:            t.event.value.toString(),
    })),
    format: 'JSONEachRow',
  })
}
```

## `onRollback`

Implement `onRollback` to handle blockchain forks. It is invoked in two situations:

* `type: 'offset_check'` — on startup, when a saved cursor is found, to discard writes from a previous crashed or partial run
* `type: 'blockchain_fork'` — when the stream detects a chain reorganisation

Use `store.removeAllRows()` to cancel rows past the safe point. For `CollapsingMergeTree` tables this re-inserts matching rows with `sign = -1`:

```ts theme={"system"}
onRollback: async ({ store, safeCursor }) => {
  await store.removeAllRows({
    tables: ['transfers'],
    where: `block_number > ${safeCursor.number}`,
  })
}
```

## Complete example

```ts expandable theme={"system"}
import { commonAbis, evmDecoder, evmPortalSource } from '@subsquid/pipes/evm'
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'
import { createClient } from '@clickhouse/client'

const client = createClient({ url: 'http://localhost:8123' })

await evmPortalSource({
  portal: 'https://portal.sqd.dev/datasets/ethereum-mainnet',
  outputs: evmDecoder({
    range: { from: 'latest' },
    events: { transfers: commonAbis.erc20.events.Transfer },
  }),
}).pipeTo(
  clickhouseTarget({
    client,
    onStart: async ({ store }) => {
      await store.command({
        query: `
          CREATE TABLE IF NOT EXISTS transfers (
            block_number     UInt32  CODEC(DoubleDelta, ZSTD),
            transaction_hash String,
            log_index        UInt16,
            from_address     LowCardinality(FixedString(42)),
            to_address       LowCardinality(FixedString(42)),
            value            UInt256,
            sign             Int8 DEFAULT 1
          ) ENGINE = CollapsingMergeTree(sign)
            ORDER BY (block_number, transaction_hash, log_index)
        `,
      })
    },
    onData: async ({ store, data }) => {
      store.insert({
        table: 'transfers',
        values: data.transfers.map((t) => ({
          block_number:     t.block.number,
          transaction_hash: t.rawEvent.transactionHash,
          log_index:        t.rawEvent.logIndex,
          from_address:     t.event.from,
          to_address:       t.event.to,
          value:            t.event.value.toString(),
        })),
        format: 'JSONEachRow',
      })
    },
    onRollback: async ({ store, safeCursor }) => {
      await store.removeAllRows({
        tables: ['transfers'],
        where: `block_number > ${safeCursor.number}`,
      })
    },
  }),
)
```

## Docker setup

```yaml docker-compose.yml theme={"system"}
services:
  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - "8123:8123"
      - "9000:9000"
    environment:
      CLICKHOUSE_DB: default
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: default
    volumes:
      - clickhouse-data:/var/lib/clickhouse

volumes:
  clickhouse-data:
```

```bash theme={"system"}
docker compose up -d
```

See the [clickhouseTarget reference](../../../reference/basic-components/target/clickhouse) for the full API.
