> ## Documentation Index
> Fetch the complete documentation index at: https://docs.sqd.dev/llms.txt
> Use this file to discover all available pages before exploring further.

# Postgres via Drizzle

> Store pipe output in PostgreSQL using Drizzle ORM

Install Drizzle ORM and the PostgreSQL driver:

```bash theme={"system"}
npm install drizzle-orm pg
npm install -D drizzle-kit @types/pg
```

At a glance, the pipeline looks like this:

```ts theme={"system"}
await evmPortalSource({ ... }).pipeTo(
  drizzleTarget({
    db: drizzle('postgresql://...'),
    tables: [transfersTable],
    onData: async ({ tx, data }) => {
      for (const batch of batchForInsert(data.transfers)) {
        await tx.insert(transfersTable).values(batch.map(...))
      }
    },
  }),
)
```

## Schema

Define your tables with Drizzle ORM. Every table needs a primary key, and every table written to in [`onData`](#ondata) must appear in [`tables`](#tables).

```ts theme={"system"}
import { integer, numeric, pgTable, primaryKey, varchar } from 'drizzle-orm/pg-core'

const transfersTable = pgTable('transfers', {
  blockNumber: integer().notNull(),
  logIndex:    integer().notNull(),
  from:        varchar({ length: 42 }).notNull(),
  to:          varchar({ length: 42 }).notNull(),
  value:       numeric({ mode: 'bigint' }).notNull(),
}, (t) => [primaryKey({ columns: [t.blockNumber, t.logIndex] })])
```

## `onData` and `batchForInsert`

`onData` runs inside a serializable transaction. Use `batchForInsert` to split data arrays into chunks that fit within PostgreSQL's 32,767-parameter limit — chunk size is calculated automatically from the number of columns:

```ts theme={"system"}
import { batchForInsert, drizzleTarget } from '@subsquid/pipes/targets/drizzle/node-postgres'

onData: async ({ tx, data }) => {
  for (const batch of batchForInsert(data.transfers)) {
    await tx.insert(transfersTable).values(
      batch.map((d) => ({
        blockNumber: d.block.number,
        logIndex:    d.rawEvent.logIndex,
        from:        d.event.from,
        to:          d.event.to,
        value:       d.event.value,
      })),
    )
  }
}
```

Pass an explicit second argument to `batchForInsert` to cap chunk size:

```ts theme={"system"}
for (const batch of batchForInsert(data.transfers, 100)) { ... }
```

## `tables`

Every table written to in `onData` must be listed in `tables`. At startup, the target installs PostgreSQL trigger functions on these tables to track row-level changes for automatic fork handling. Inserting into an unlisted table throws at runtime.

## Schema migrations

Use [Drizzle Kit](https://orm.drizzle.team/docs/kit-overview) to generate and apply migrations:

```bash theme={"system"}
npx drizzle-kit generate
npx drizzle-kit migrate
```

Alternatively, run migrations automatically on startup via `onStart`:

```ts theme={"system"}
import { migrate } from 'drizzle-orm/node-postgres/migrator'

drizzleTarget({
  db,
  tables: [...],
  onStart: async ({ db }) => {
    await migrate(db, { migrationsFolder: './drizzle' })
  },
  onData: async ({ tx, data }) => { ... },
})
```

## Rollback handling

Fork handling is fully automatic. Each batch runs inside a transaction that snapshots row-level changes. When the stream detects a fork, the target replays those snapshots in reverse to restore the pre-fork state.

Use `onBeforeRollback` and `onAfterRollback` to run custom logic around a rollback. Both callbacks receive the Drizzle transaction and the `cursor` (`BlockCursor`) to which state was rolled back:

```ts theme={"system"}
drizzleTarget({
  db,
  tables: [...],
  onBeforeRollback: async ({ tx, cursor }) => { /* e.g. log or acquire an external lock */ },
  onAfterRollback:  async ({ tx, cursor }) => { /* e.g. invalidate a cache */ },
  onData: async ({ tx, data }) => { ... },
})
```

## Complete example

```ts expandable theme={"system"}
import { commonAbis, evmDecoder, evmPortalSource } from '@subsquid/pipes/evm'
import { batchForInsert, drizzleTarget } from '@subsquid/pipes/targets/drizzle/node-postgres'
import { drizzle } from 'drizzle-orm/node-postgres'
import { integer, numeric, pgTable, primaryKey, varchar } from 'drizzle-orm/pg-core'

const transfersTable = pgTable('transfers', {
  blockNumber: integer().notNull(),
  logIndex:    integer().notNull(),
  from:        varchar({ length: 42 }).notNull(),
  to:          varchar({ length: 42 }).notNull(),
  value:       numeric({ mode: 'bigint' }).notNull(),
}, (t) => [primaryKey({ columns: [t.blockNumber, t.logIndex] })])

await evmPortalSource({
  portal: 'https://portal.sqd.dev/datasets/ethereum-mainnet',
  outputs: evmDecoder({
    range: { from: '0' },
    events: { transfers: commonAbis.erc20.events.Transfer },
  }),
}).pipeTo(
  drizzleTarget({
    db: drizzle('postgresql://postgres:postgres@localhost:5432/postgres'),
    tables: [transfersTable],
    onData: async ({ tx, data }) => {
      for (const batch of batchForInsert(data.transfers)) {
        await tx.insert(transfersTable).values(
          batch.map((d) => ({
            blockNumber: d.block.number,
            logIndex:    d.rawEvent.logIndex,
            from:        d.event.from,
            to:          d.event.to,
            value:       d.event.value,
          })),
        )
      }
    },
  }),
)
```

See the [drizzleTarget reference](../../../reference/basic-components/target/postgres-drizzle) for the full API.
