
Filesystem store

Warning: Make sure you understand and use the chunkSizeMb setting and the setForceFlush() store method. Failure to do so may cause the processor to output an empty dataset.

Overview

Squid SDK provides append-only Store interface implementations for saving the data generated by squids to files. The store is designed primarily for offline analytics. It supports CSV and Parquet files, persisted either locally or to an Amazon S3-compatible storage. See the list of Packages below.

The core component of all file-based stores is the Database class from @subsquid/file-store. To use it, construct an instance and pass it to processor.run(). This results in ctx.store exposing table writers that accept rows of data to be stored.

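File-based stores always partition datasets along the "block height" dimension, even when it is not in the schema. The number of blocks per partition is variable: a new partition is written when either

  • the internal data buffer fills up (its size is set by chunkSizeMb), or
  • a batch ends while the force flush flag is set with ctx.store.setForceFlush().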
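The partitioning rule can be illustrated with a small, self-contained TypeScript model. It is not the @subsquid/file-store implementation: Batch, Partition and planPartitions are hypothetical names, and the model assumes that the buffer size is checked once at the end of every batch and that 1 MB equals 1024 * 1024 bytes.

```typescript
// Hypothetical model of the partitioning rule, not library code.
// Assumptions: the buffer is checked once at the end of every batch,
// and 1 MB = 1024 * 1024 bytes.
interface Batch {
  lastBlock: number    // height of the last block in the batch
  bytes: number        // size of the rows queued during the batch
  forceFlush: boolean  // state of the force flush flag at the end of the batch
}

interface Partition {
  from: number  // first block height covered (inclusive)
  to: number    // last block height covered (inclusive)
  bytes: number
}

function planPartitions(batches: Batch[], chunkSizeMb: number, startHeight = 0): Partition[] {
  const limit = chunkSizeMb * 1024 * 1024
  const result: Partition[] = []
  let from = startHeight
  let buffered = 0
  for (const batch of batches) {
    buffered += batch.bytes
    // A partition is written when the buffer is full or a flush was forced
    if (buffered >= limit || batch.forceFlush) {
      result.push({from, to: batch.lastBlock, bytes: buffered})
      from = batch.lastBlock + 1
      buffered = 0
    }
  }
  // Whatever is still buffered at this point has not been written anywhere
  return result
}

const MB = 1024 * 1024
const partitions = planPartitions([
  {lastBlock: 999, bytes: 6 * MB, forceFlush: false},
  {lastBlock: 1999, bytes: 5 * MB, forceFlush: false},  // buffer reaches 11 MB
  {lastBlock: 2999, bytes: 1024, forceFlush: true},     // small, but forced
  {lastBlock: 3999, bytes: 1024, forceFlush: false},    // stays in the buffer
], 10)
// Two partitions: blocks 0-1999 (11 MB) and blocks 2000-2999 (1 KB)
console.log(partitions)
```

In this model the data queued in the last batch is never written unless the buffer fills up or a flush is forced later, which is how a low-volume squid can end up with an empty dataset.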

The store provides the same failover guarantees as the Postgres-based store: after a restart, the processor rolls back to the last successfully persisted state.

Example

Save ERC20 Transfer events retrieved by an EVM processor to transfers.csv files:

import {EvmBatchProcessor} from '@subsquid/evm-processor'
import * as erc20abi from './abi/erc20'
import {Database, LocalDest} from '@subsquid/file-store'
import {Column, Table, Types} from '@subsquid/file-store-csv'

// Processor definition: configure the data source here and request
// logs with .addLog(), e.g. filtered by erc20abi.events.Transfer.topic
const processor = new EvmBatchProcessor()

const dbOptions = {
  tables: {
    TransfersTable: new Table('transfers.csv', {
      from: Column(Types.String()),
      to: Column(Types.String()),
      value: Column(Types.Numeric())
    })
  },
  // partitions are written into subfolders of ./data
  dest: new LocalDest('./data'),
  // write a partition once about 10 MB of data has been buffered
  chunkSizeMb: 10
}

processor.run(new Database(dbOptions), async (ctx) => {
  for (let block of ctx.blocks) {
    for (let log of block.logs) {
      // keep only logs whose first topic is the Transfer event signature
      if (log.topics[0] === erc20abi.events.Transfer.topic) {
        let { from, to, value } = erc20abi.events.Transfer.decode(log)
        ctx.store.TransfersTable.write({ from, to, value })
      }
    }
  }
})

The resulting ./data folder may look like this:

./data/
├── 0000000000-0007688959
│   └── transfers.csv
├── 0007688960-0007861589
│   └── transfers.csv
...
├── 0016753040-0016762029
│   └── transfers.csv
└── status.txt

Each of these folders contains a little over 10 MB of data, i.e. slightly more than chunkSizeMb. status.txt contains the height of the last indexed block and its hash.
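The folder names in this listing appear to follow a simple convention: the first and last block heights of the partition, both inclusive and zero-padded to 10 digits. The sketch below assumes exactly that convention (inferred from the listing, not taken from the library) and shows how an offline consumer might map folder names back to block ranges and check that consecutive partitions are contiguous. partitionDirName, parsePartitionDirName and areContiguous are hypothetical helpers.

```typescript
// Hypothetical helpers based on the naming convention visible in the listing:
// "<first block>-<last block>", inclusive, each zero-padded to 10 digits.
interface BlockRange {
  from: number
  to: number
}

function partitionDirName(range: BlockRange): string {
  const pad = (n: number) => n.toString().padStart(10, '0')
  return `${pad(range.from)}-${pad(range.to)}`
}

function parsePartitionDirName(name: string): BlockRange | undefined {
  const match = /^(\d{10})-(\d{10})$/.exec(name)
  if (match === null) return undefined  // e.g. status.txt or other files
  return {from: Number(match[1]), to: Number(match[2])}
}

// True if every partition starts right after the previous one ends
function areContiguous(names: string[]): boolean {
  const ranges = names
    .map(parsePartitionDirName)
    .filter((r): r is BlockRange => r !== undefined)
    .sort((a, b) => a.from - b.from)
  return ranges.every((r, i) => i === 0 || r.from === ranges[i - 1].to + 1)
}

console.log(partitionDirName({from: 7688960, to: 7861589}))  // 0007688960-0007861589
console.log(areContiguous(['0007688960-0007861589', '0000000000-0007688959', 'status.txt']))  // true
```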

Packages

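@subsquid/file-store is the core package that contains the implementation of Database for filesystems. At least one file format add-on must be installed alongside it:

  • CSV: Supported via @subsquid/file-store-csv.
  • Parquet: Supported via @subsquid/file-store-parquet.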

Data in either of these formats can be written to:

  • A local filesystem: Supported by @subsquid/file-store out of the box.
  • A bucket in an Amazon S3-compatible cloud: Supported via @subsquid/file-store-s3.

Database Options

The constructor of the Database class from @subsquid/file-store accepts a configuration object as its only argument. Its format is as follows:

DatabaseOptions {
  tables: Record<string, Table>
  dest: Dest
  chunkSizeMb?: number
  hooks?: DatabaseHooks<Dest>
}

Here,

  • Table is an interface for classes that make table writers, objects that convert in-memory tabular data into format-specific file contents. An implementation of Table is available for every file format supported by file-store. Consult pages about specific output formats to find out how to define Tables.
  • tables is a mapping from developer-defined string handles to Table instances. A table writer will be created for each Table in this mapping. It will be exposed at ctx.store.<tableHandle>.
  • dest is an instance of Dest, an interface for objects that take the properly formatted file contents and write them onto a particular filesystem. An implementation of Dest is available for every filesystem supported by file-store. For local filesystems use the LocalDest class from the @subsquid/file-store package and supply new LocalDest(outputDirectoryName) here. For other targets consult documentation pages specific to your filesystem choice.
  • chunkSizeMb governs the size of the internal data buffer. A dataset partition will be written as soon as this buffer fills up, or at the end of the batch if setForceFlush() was called.
  • hooks optionally override how the syncing status is read and written (see Hooks below). They are useful for persisting data between batches.
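How the tables mapping turns into ctx.store.<tableHandle> properties can be illustrated with a self-contained TypeScript sketch. MiniTable, MiniWriter, StoreOf and makeStore are hypothetical stand-ins, not the actual Table and table writer types of @subsquid/file-store; the sketch only shows how a mapping of handles to tables can yield one writer per handle, typed with that table's row type.

```typescript
// Hypothetical stand-in for a Table: it carries a file name and a row type
class MiniTable<Row> {
  readonly fileName: string
  readonly rowType?: Row  // phantom field, used only for type inference
  constructor(fileName: string) {
    this.fileName = fileName
  }
}

// Hypothetical stand-in for a table writer that just collects rows
class MiniWriter<Row> {
  readonly rows: Row[] = []
  write(row: Row): this {
    this.rows.push(row)
    return this
  }
}

// One writer per handle, typed with that table's row type
type StoreOf<T extends Record<string, MiniTable<any>>> = {
  [K in keyof T]: T[K] extends MiniTable<infer Row> ? MiniWriter<Row> : never
}

function makeStore<T extends Record<string, MiniTable<any>>>(tables: T): StoreOf<T> {
  const store: Record<string, MiniWriter<unknown>> = {}
  for (const handle of Object.keys(tables)) {
    store[handle] = new MiniWriter<unknown>()
  }
  return store as unknown as StoreOf<T>
}

interface Transfer {
  from: string
  to: string
  value: number
}

const store = makeStore({
  TransfersTable: new MiniTable<Transfer>('transfers.csv'),
})
store.TransfersTable.write({from: '0xa', to: '0xb', value: 1})
// store.TransfersTable.write({from: '0xa'}) would fail to type-check
console.log(store.TransfersTable.rows.length)  // 1
```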

Table Writer Interface

For each Table supplied via the tables field of the constructor argument, Database adds a table writer property to ctx.store. The writer is exposed at ctx.store.<tableHandle>, where <tableHandle> is the key of the Table instance in the tables mapping. It has the following methods:

ctx.store.<tableHandle>.write(record: T)
ctx.store.<tableHandle>.writeMany(records: T[])

Here, T is a Table implementation-specific data row type. See the documentation pages on specific file formats for details.

These synchronous methods add rows of data to an in-memory buffer and perform no actual filesystem writes. Instead, the write happens automatically when the internal buffer reaches chunkSizeMb or at the end of the batch during which setForceFlush() was called. The methods return the table writer instance and can be chained.

For example, with a Database defined like this:

const db = new Database({
  tables: {
    TransfersTable: new Table(/* table options */)
  },
  // ...dest and dataset partitioning options
})

the following calls become available in the batch handler:

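processor.run(db, async ctx => {
  // a row in the format specific to the Table implementation,
  // e.g. for the CSV table from the example above:
  let record = { from: '0xabc', to: '0xdef', value: 100n }
  ctx.store.TransfersTable.write(record)
  ctx.store.TransfersTable.writeMany([record])
  // both methods return the table writer, so calls can be chained
  ctx.store.TransfersTable.write(record).writeMany([record])
})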
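The buffering behavior of write() and writeMany() can be modelled with a minimal in-memory writer. BufferedWriter, its size estimate and drain() are hypothetical: the real table writers serialize rows in a format-specific way and hand them to the Dest when the store decides to write a partition. Here the buffered size is approximated by string length.

```typescript
// Hypothetical model of a table writer's buffering, not library code.
// Row sizes are estimated by string length, a rough stand-in for bytes.
class BufferedWriter<Row> {
  private lines: string[] = []
  private bufferedSize = 0
  private readonly serialize: (row: Row) => string

  constructor(serialize: (row: Row) => string) {
    this.serialize = serialize
  }

  // Queues one row in memory; no filesystem access happens here
  write(row: Row): this {
    const line = this.serialize(row) + '\n'
    this.lines.push(line)
    this.bufferedSize += line.length
    return this  // returning the writer enables chaining
  }

  writeMany(rows: Row[]): this {
    for (const row of rows) this.write(row)
    return this
  }

  get size(): number {
    return this.bufferedSize
  }

  // Hands over the buffered contents and empties the buffer;
  // a real store would pass them on to its Dest instead
  drain(): string {
    const contents = this.lines.join('')
    this.lines = []
    this.bufferedSize = 0
    return contents
  }
}

interface TransferRow {
  from: string
  to: string
  value: number
}

const writer = new BufferedWriter<TransferRow>(r => `${r.from},${r.to},${r.value}`)
writer
  .write({from: '0xa', to: '0xb', value: 5})
  .writeMany([{from: '0xb', to: '0xc', value: 7}])
console.log(writer.size)  // 20
const csv = writer.drain()
console.log(writer.size)  // 0
```

A store built this way would compare size against chunkSizeMb after each batch and call drain() only when a partition is due, which matches the "no filesystem writes until flush" behavior described above.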

setForceFlush()

Both of the following calls

ctx.store.setForceFlush()

ctx.store.setForceFlush(true)

set the force flush flag within the store. If the flag is still set at the end of a batch, a dataset partition will be written regardless of how full the data buffer currently is. This is useful, for example, to keep each partition from spanning much more than a fixed number of blocks when data is written at a low rate:

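let blockCount = 0

processor.run(db, async ctx => {
  // ...data is transformed and queued for writing
  blockCount += ctx.blocks.length
  // request a flush once ~500k blocks have accumulated since the last one;
  // passing the condition explicitly also clears the flag otherwise
  let flush = blockCount >= 500_000
  ctx.store.setForceFlush(flush)
  // restart the count, otherwise every later batch would be flushed too
  if (flush) blockCount = 0
})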

Unset the flag with:

ctx.store.setForceFlush(false)

Hooks

By default, Database maintains a record of the syncing progress in the status.txt file. When a processor using a Database instance starts, it calls the onStateRead() function, which reads the highest reached block from status.txt on the target filesystem and returns its hash and height. If the file does not exist, the function returns -1 for the height, and syncing starts at the next block, i.e. the genesis block at height 0.

The syncing status record is updated every time a new partition is written to the dataset: the processor calls onStateUpdate(), which overwrites status.txt with the new highest reached block.

As a result, the integrity of the dataset is guaranteed for the blockchain history up to the block recorded in status.txt.
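The default bookkeeping can be sketched as follows. parseStatus, formatStatus and resumeHeight are hypothetical helpers, and the two-line layout of status.txt (height first, then hash) is an assumption rather than a documented format; the part taken from the text above is the arithmetic: a missing file means height -1, so syncing starts at block 0, and otherwise it resumes at the recorded height plus one.

```typescript
// Hypothetical sketch of the default status bookkeeping, not library code.
// Assumes status.txt holds the height on its first line and the hash on its second.
interface HashAndHeight {
  height: number
  hash: string
}

function parseStatus(contents: string | undefined): HashAndHeight {
  if (contents === undefined) {
    // No status.txt yet: nothing has been indexed.
    // The hash value here is a placeholder assumption.
    return {height: -1, hash: '0x'}
  }
  const [heightLine, hashLine = ''] = contents.trim().split('\n')
  const height = Number(heightLine)
  if (heightLine.trim() === '' || !Number.isInteger(height)) {
    throw new Error(`invalid height: ${heightLine}`)
  }
  return {height, hash: hashLine.trim()}
}

function formatStatus(info: HashAndHeight): string {
  return `${info.height}\n${info.hash}`
}

// Syncing resumes right after the last block recorded in status.txt
function resumeHeight(status: HashAndHeight): number {
  return status.height + 1
}

console.log(resumeHeight(parseStatus(undefined)))  // 0
console.log(resumeHeight(parseStatus(formatStatus({height: 16762029, hash: '0xabc'}))))  // 16762030
```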

The functions onStateRead() and onStateUpdate() can be overridden via the hooks field of the constructor argument. To do that, set that field to an object of the following shape:

DatabaseHooks<Dest> {
  onStateRead(dest: Dest): Promise<HashAndHeight | undefined>
  onStateUpdate(dest: Dest, info: HashAndHeight): Promise<void>
}

Parameters:

  • dest: the Dest object used by Database. Use it to access the filesystem.
  • info: a {height: number, hash: string} object.

Overriding these functions can be useful for transferring some processor state between batches reliably. A basic example of using hooks can be found here.
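A minimal sketch of custom hooks that carry an extra piece of processor state (a running transfer count) alongside the sync position. MiniDest and MemoryDest are hypothetical: the real Dest interface of @subsquid/file-store may expose different methods, so treat the file access calls as placeholders. The sketch also assumes that onStateRead() returning undefined means nothing has been indexed yet.

```typescript
interface HashAndHeight {
  height: number
  hash: string
}

// Hypothetical minimal filesystem interface standing in for the library's Dest
interface MiniDest {
  exists(name: string): Promise<boolean>
  readFile(name: string): Promise<string>
  writeFile(name: string, data: string): Promise<void>
}

// In-memory implementation, handy for trying the hooks out
class MemoryDest implements MiniDest {
  private files = new Map<string, string>()
  async exists(name: string): Promise<boolean> {
    return this.files.has(name)
  }
  async readFile(name: string): Promise<string> {
    const data = this.files.get(name)
    if (data === undefined) throw new Error(`no such file: ${name}`)
    return data
  }
  async writeFile(name: string, data: string): Promise<void> {
    this.files.set(name, data)
  }
}

// Sync position plus the extra state that must survive restarts
interface SavedState extends HashAndHeight {
  transferCount: number
}

// Hypothetical state, updated by the batch handler as transfers are written
let transferCount = 0

const hooks = {
  async onStateRead(dest: MiniDest): Promise<HashAndHeight | undefined> {
    if (!(await dest.exists('state.json'))) return undefined  // fresh start (assumed)
    const saved: SavedState = JSON.parse(await dest.readFile('state.json'))
    transferCount = saved.transferCount  // restore the extra state
    return {height: saved.height, hash: saved.hash}
  },
  async onStateUpdate(dest: MiniDest, info: HashAndHeight): Promise<void> {
    // Stored together with the sync position, so both always match
    const saved: SavedState = {...info, transferCount}
    await dest.writeFile('state.json', JSON.stringify(saved))
  },
}
```

Because the extra state is saved in the same call that records the new sync position, a restart restores a transferCount that matches the data already written to the dataset. With the real library, an object of this shape, typed against the actual Dest, would go into the hooks field of the Database options.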
