# ClickHouse

> Store pipe output in ClickHouse

Install the ClickHouse Node.js client:

```bash theme={"system"}
npm install @clickhouse/client
```

At a glance, the pipeline looks like this:

```ts theme={"system"}
import { createClient } from '@clickhouse/client'
import { solanaPortalSource } from '@subsquid/pipes/solana'
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'

await solanaPortalSource({ ... }).pipeTo(
  clickhouseTarget({
    client: createClient({ url: 'http://localhost:8123' }),
    onData: async ({ store, data }) => {
      store.insert({ table: 'swaps', values: data.swap.map(...), format: 'JSONEachRow' })
    },
    onRollback: async ({ store, safeCursor }) => {
      await store.removeAllRows({ tables: ['swaps'], where: `slot > ${safeCursor.number}` })
    },
  }),
)
```

## Table design

Use `CollapsingMergeTree` with a `sign Int8 DEFAULT 1` column. This engine enables efficient fork rollbacks: to cancel rows, the target re-inserts them with `sign = -1` and ClickHouse merges the pair during background processing.

```sql theme={"system"}
CREATE TABLE IF NOT EXISTS swaps (
  slot              UInt64 CODEC(DoubleDelta, ZSTD),
  instruction_index UInt32,
  program_id        String,
  sign              Int8 DEFAULT 1
) ENGINE = CollapsingMergeTree(sign)
  ORDER BY (slot, instruction_index);
```
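Because the pair is merged in the background, a query that runs before the merge can still see both the original row and its `sign = -1` counterpart. The TypeScript sketch below is a simplified in-memory model of that cancellation, not ClickHouse code. The `collapse` function and the sample rows are hypothetical and only illustrate why rows with the same sorting key and opposite signs net out to nothing.

```ts
// Simplified model of CollapsingMergeTree cancellation (illustration only).
interface SwapRow {
  slot: number
  instruction_index: number
  program_id: string
  sign: 1 | -1
}

// Rows sharing the ORDER BY key (slot, instruction_index) cancel out
// when their signs sum to zero; only keys with a positive net sign survive.
function collapse(rows: SwapRow[]): SwapRow[] {
  const net = new Map<string, { row: SwapRow; sum: number }>()
  for (const row of rows) {
    const key = `${row.slot}:${row.instruction_index}`
    const entry = net.get(key)
    if (entry) {
      entry.sum += row.sign
    } else {
      net.set(key, { row, sum: row.sign })
    }
  }
  const result: SwapRow[] = []
  net.forEach((entry) => {
    if (entry.sum > 0) result.push({ ...entry.row, sign: 1 })
  })
  return result
}

// Hypothetical table contents after a rollback past slot 100.
const stored: SwapRow[] = [
  { slot: 100, instruction_index: 0, program_id: 'A', sign: 1 },
  { slot: 101, instruction_index: 0, program_id: 'A', sign: 1 },
  { slot: 101, instruction_index: 0, program_id: 'A', sign: -1 }, // cancel row
]

// Only the slot 100 row remains visible.
const visible = collapse(stored)
```

In ClickHouse itself, the usual way to get the same view before a merge has happened is to aggregate on `sign` (for example with `sum(sign)`) or to query with `FINAL`.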

Design notes:

* Apply `DoubleDelta + ZSTD` codecs to monotonically increasing columns such as block numbers and timestamps.
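* Use `LowCardinality(String)` for string columns with few distinct values, such as program IDs, to reduce storage and speed up filtering. Addresses qualify only when the set of distinct values stays small (ClickHouse suggests under roughly 10,000).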
* Store 256-bit integers as `UInt256`; serialize JavaScript `BigInt` values to strings before insertion.
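The last note matters because `JSON.stringify` throws a `TypeError` on `BigInt` values, so rows containing them need converting before they are serialized as JSON. A minimal sketch of one way to do that follows; `serializeBigInts` and the `amount` column are hypothetical names used for illustration, not part of the SDK.

```ts
// Convert every top-level bigint field of a row to its decimal string form.
// Hypothetical helper; other values are passed through unchanged.
function serializeBigInts(row: Record<string, unknown>): Record<string, unknown> {
  const out: Record<string, unknown> = {}
  for (const key of Object.keys(row)) {
    const value: unknown = row[key]
    out[key] = typeof value === 'bigint' ? value.toString() : value
  }
  return out
}

// Largest UInt256 value (2^256 - 1), built from a string to avoid precision loss.
const maxUint256 = BigInt(
  '115792089237316195423570985008687907853269984665640564039457584007913129639935',
)

// `amount` is an assumed UInt256 column; `slot` stays a plain number.
const row = serializeBigInts({ slot: 340000000, amount: maxUint256 })
```

The helper only looks at top-level fields, which matches the flat row objects used on this page; nested structures would need a recursive variant.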

Create the table in `onStart` using `store.command()`:

```ts theme={"system"}
onStart: async ({ store }) => {
  await store.command({ query: `CREATE TABLE IF NOT EXISTS swaps ( ... )` })
}
```

## `onData`

Call `store.insert()` to queue an insert. The call is non-blocking, so the handler does not `await` it; inserts fire concurrently and are fully flushed when the target closes:

```ts theme={"system"}
onData: async ({ store, data }) => {
  store.insert({
    table: 'swaps',
    values: data.swap.map((d) => ({
      slot:              d.block.slot,
      instruction_index: d.instruction.instructionIndex,
      program_id:        d.programId,
    })),
    format: 'JSONEachRow',
  })
}
```
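The row mapping can also be pulled out into a pure function, which makes it easy to unit-test without a running ClickHouse instance. The sketch below assumes a `DecodedSwap` shape that mirrors only the fields read in the handler above; the real decoded type comes from the decoder and may carry more fields. Note that `sign` is omitted from the row because the column defaults to `1`.

```ts
// Hypothetical shape: only the fields the handler above reads.
interface DecodedSwap {
  block: { slot: number }
  instruction: { instructionIndex: number }
  programId: string
}

// Matches the columns of the `swaps` table, minus `sign` (DEFAULT 1).
interface SwapInsertRow {
  slot: number
  instruction_index: number
  program_id: string
}

// Pure mapping from a decoded instruction to a table row.
function toSwapRow(d: DecodedSwap): SwapInsertRow {
  return {
    slot: d.block.slot,
    instruction_index: d.instruction.instructionIndex,
    program_id: d.programId,
  }
}

// Sample input with made-up values, for illustration only.
const sample: DecodedSwap[] = [
  { block: { slot: 340000001 }, instruction: { instructionIndex: 2 }, programId: 'ExampleProgram' },
]

const rows = sample.map(toSwapRow)
```

With such a helper in place, the handler body would reduce to passing `data.swap.map(toSwapRow)` as `values`.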

## `onRollback`

Implement `onRollback` to discard rows written past the last safe cursor. The target invokes it in two situations:

* `type: 'offset_check'`: on startup, when a saved cursor is found, to discard writes from a previous crashed or partial run
* `type: 'blockchain_fork'`: when the stream detects a chain reorganisation

Use `store.removeAllRows()` to cancel rows past the safe point. For `CollapsingMergeTree` tables this re-inserts matching rows with `sign = -1`:

```ts theme={"system"}
onRollback: async ({ store, safeCursor }) => {
  await store.removeAllRows({
    tables: ['swaps'],
    where: `slot > ${safeCursor.number}`,
  })
}
```
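Since the `where` clause is interpolated into SQL, it can be worth building it in a small helper that validates the cursor first. The sketch below assumes `safeCursor.number` is a plain number and that the rollback `type` is one of the two values listed above; `rowsPastCursor` and `describeRollback` are hypothetical helpers, not SDK functions.

```ts
// The two rollback reasons described above.
type RollbackType = 'offset_check' | 'blockchain_fork'

// Assumed minimal cursor shape: only `number` is used here.
interface Cursor {
  number: number
}

// Build the filter for rows written after the safe cursor,
// rejecting anything that is not a non-negative safe integer.
function rowsPastCursor(column: string, safeCursor: Cursor): string {
  if (!Number.isSafeInteger(safeCursor.number) || safeCursor.number < 0) {
    throw new Error(`unexpected cursor: ${safeCursor.number}`)
  }
  return `${column} > ${safeCursor.number}`
}

// Produce a log-friendly description of what a rollback will remove.
function describeRollback(type: RollbackType, safeCursor: Cursor): string {
  const reason = type === 'blockchain_fork' ? 'chain reorganisation' : 'startup offset check'
  return `${reason}: removing rows where ${rowsPastCursor('slot', safeCursor)}`
}

const message = describeRollback('blockchain_fork', { number: 340000123 })
```

The string returned by `rowsPastCursor('slot', safeCursor)` could then be passed as the `where` argument of `store.removeAllRows()`.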

## Complete example

```ts expandable theme={"system"}
import { solanaInstructionDecoder, solanaPortalSource } from '@subsquid/pipes/solana'
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'
import { createClient } from '@clickhouse/client'

// Credentials match the Docker setup below
const client = createClient({
  url: 'http://localhost:8123',
  username: 'default',
  password: 'default',
})

await solanaPortalSource({
  portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  outputs: solanaInstructionDecoder({
    range: { from: '340000000' },
    programId: '...',
    instructions: { swap: swapInstruction },
  }),
}).pipeTo(
  clickhouseTarget({
    client,
    onStart: async ({ store }) => {
      await store.command({
        query: `
          CREATE TABLE IF NOT EXISTS swaps (
            slot              UInt64 CODEC(DoubleDelta, ZSTD),
            instruction_index UInt32,
            program_id        String,
            sign              Int8 DEFAULT 1
          ) ENGINE = CollapsingMergeTree(sign)
            ORDER BY (slot, instruction_index)
        `,
      })
    },
    onData: async ({ store, data }) => {
      store.insert({
        table: 'swaps',
        values: data.swap.map((d) => ({
          slot:              d.block.slot,
          instruction_index: d.instruction.instructionIndex,
          program_id:        d.programId,
        })),
        format: 'JSONEachRow',
      })
    },
    onRollback: async ({ store, safeCursor }) => {
      await store.removeAllRows({
        tables: ['swaps'],
        where: `slot > ${safeCursor.number}`,
      })
    },
  }),
)
```

## Docker setup

```yaml docker-compose.yml theme={"system"}
services:
  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - "8123:8123"
      - "9000:9000"
    environment:
      CLICKHOUSE_DB: default
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: default
    volumes:
      - clickhouse-data:/var/lib/clickhouse

volumes:
  clickhouse-data:
```

```bash theme={"system"}
docker compose up -d
```

See the [clickhouseTarget reference](../../../reference/basic-components/target/clickhouse) for the full API.
