import { solanaInstructionDecoder, solanaPortalSource } from '@subsquid/pipes/solana'
import { batchForInsert, drizzleTarget } from '@subsquid/pipes/targets/drizzle/node-postgres'
import { drizzle } from 'drizzle-orm/node-postgres'
import { bigint, integer, pgTable, primaryKey, varchar } from 'drizzle-orm/pg-core'
/**
 * Drizzle schema for the `swaps` Postgres table that the pipeline below
 * inserts into. All three columns are NOT NULL; `slot` is a bigint column
 * read/written as a JS BigInt (`mode: 'bigint'`).
 *
 * NOTE(review): the composite primary key is (slot, instructionIndex). If
 * `instructionIndex` is only unique within a single transaction, two rows
 * from different transactions in the same slot would collide — confirm, and
 * widen the key (e.g. with a transaction index) if so.
 */
const swapsTable = pgTable(
  'swaps',
  {
    slot: bigint({ mode: 'bigint' }).notNull(),
    instructionIndex: integer().notNull(),
    programId: varchar().notNull(),
  },
  (table) => {
    const { slot, instructionIndex } = table
    return [primaryKey({ columns: [slot, instructionIndex] })]
  },
)
/**
 * Maps one record from `data.swap` to a row of `swapsTable`.
 * `slot` is converted with BigInt() because the column uses `mode: 'bigint'`.
 */
const toSwapRow = (d) => ({
  slot: BigInt(d.block.slot),
  instructionIndex: d.instruction.instructionIndex,
  programId: d.programId,
})

// Portal source whose output is produced by `solanaInstructionDecoder`,
// configured for the `swap` instruction starting from '340000000'.
// NOTE(review): `programId: '...'` looks like a placeholder — confirm the
// real program address is supplied before running.
const swapSource = solanaPortalSource({
  portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  outputs: solanaInstructionDecoder({
    range: { from: '340000000' },
    programId: '...',
    instructions: { swap: swapInstruction },
  }),
})

// Pipe the source into the drizzle target; every `onData` call writes the
// received `data.swap` records into `swapsTable` through the supplied `tx`.
// NOTE(review): the connection string (including credentials) is hard-coded;
// consider sourcing it from configuration.
await swapSource.pipeTo(
  drizzleTarget({
    db: drizzle('postgresql://postgres:postgres@localhost:5432/postgres'),
    tables: [swapsTable],
    async onData({ tx, data }) {
      // Batches are inserted one after another, each awaited before the next.
      // presumably `batchForInsert` chunks rows to a size safe for a single
      // INSERT — verify against the library docs.
      for (const batch of batchForInsert(data.swap)) {
        const insertion = tx.insert(swapsTable)
        const rows = batch.map(toSwapRow)
        await insertion.values(rows)
      }
    },
  }),
)