Filesystem store
Make sure you understand and use the chunkSizeMb
setting and the setForceFlush()
store method. Failure to do so may cause the processor to output an empty dataset.
Overview
The Squid SDK provides append-only Store
interface implementations for saving the data generated by squids to files. The store is designed primarily for offline analytics. It supports CSV and Parquet files, persisted either locally or to S3-compatible storage. See the list of Packages below.
The core component of all file-based stores is the Database
class from @subsquid/file-store
. To use it, construct an instance and pass it to processor.run()
. This results in ctx.store
exposing table writers that accept rows of data to be stored.
File-based stores always partition data sets along the "block height" dimension, even when it is not in the schema. The number of blocks per partition is variable: a new partition is written when either
- the internal buffer of the store fills up (its size is governed by the chunkSizeMb Database constructor option), or
- setForceFlush() was called at any point during the current batch handler run.
Both triggers are sketched below.
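A minimal sketch of both triggers, assuming a processor and table definitions exist elsewhere (the chunkSizeMb value is arbitrary):

import {Database, LocalDest} from '@subsquid/file-store'

const db = new Database({
  tables: { /* table definitions */ },
  dest: new LocalDest('./data'),
  chunkSizeMb: 10 // trigger 1: a partition is written once the buffer holds ~10 MB
})

processor.run(db, async ctx => {
  // ...transform the batch data and queue it for writing
  ctx.store.setForceFlush() // trigger 2: force a partition at the end of this batch
})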
The same failover guarantees as with the Postgres-based store are provided: the processor will roll back to the last successful state after a restart.
Example
Save ERC20 Transfer
events retrieved by the EVM processor to transfers.csv
files:
import {EvmBatchProcessor} from '@subsquid/evm-processor'
import * as erc20abi from './abi/erc20'
import {Database, LocalDest} from '@subsquid/file-store'
import {Column, Table, Types} from '@subsquid/file-store-csv'
const processor = /* processor definition */
const dbOptions = {
tables: {
TransfersTable: new Table('transfers.csv', {
from: Column(Types.String()),
to: Column(Types.String()),
value: Column(Types.Numeric())
})
},
dest: new LocalDest('./data'),
chunkSizeMb: 10
}
processor.run(new Database(dbOptions), async (ctx) => {
for (let c of ctx.blocks) {
for (let log of c.logs) {
      // e.g. keep only Transfer events; an address check could restrict this to one contract
      if (log.topics[0] === erc20abi.events.Transfer.topic) {
        let { from, to, value } = erc20abi.events.Transfer.decode(log)
ctx.store.TransfersTable.write({ from, to, value })
}
}
}
})
The resulting ./data
folder may look like this:
./data/
├── 0000000000-0007688959
│ └── transfers.csv
├── 0007688960-0007861589
│ └── transfers.csv
...
├── 0016753040-0016762029
│ └── transfers.csv
└── status.txt
Each of the folders here contains a little over 10 MB of data. status.txt contains the height of the last indexed block and its hash.
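The partitions are plain files, so downstream analytics can read them back with ordinary filesystem tools. Here is a minimal Node.js sketch (independent of the squid SDK) that assumes the layout above:

import * as fs from 'fs'
import * as path from 'path'

const root = './data'
// every entry except status.txt is a "<firstBlock>-<lastBlock>" partition folder
const partitions = fs.readdirSync(root).filter(name => name !== 'status.txt').sort()
for (const dir of partitions) {
  const csv = fs.readFileSync(path.join(root, dir, 'transfers.csv'), 'utf-8')
  // ...parse the CSV or hand it to the analytics tooling of your choice
}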
Packages
@subsquid/file-store
is the core package that contains the implementation of Database
for filesystems. At least one file format add-on must be installed alongside it:
- CSV: Supported via @subsquid/file-store-csv.
- Parquet: An advanced format that works well for larger data sets. Supported via @subsquid/file-store-parquet.
Data in either of these formats can be written to
- A local filesystem: Supported by @subsquid/file-store out of the box.
- A bucket in an Amazon S3-compatible cloud: Supported via @subsquid/file-store-s3.
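As an illustration, a squid writing CSV files to a local folder combines the core package with the CSV add-on; the commented lines point at the other add-ons (their exact exports are not shown here and should be checked in their own documentation):

import {Database, LocalDest} from '@subsquid/file-store'      // core: Database + local filesystem destination
import {Column, Table, Types} from '@subsquid/file-store-csv' // CSV format add-on
// Parquet output: use the Table/Column/Types analogues from '@subsquid/file-store-parquet'
// S3 destinations: use the Dest implementation from '@subsquid/file-store-s3'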
Database Options
The constructor of the Database
class from file-store
accepts a configuration object as its only argument. Its format is as follows:
DatabaseOptions {
tables: Record<string, Table>
dest: Dest
chunkSizeMb?: number
hooks?: DatabaseHooks<Dest>
}
Here,
- Table is an interface for classes that make table writers, objects that convert in-memory tabular data into format-specific file contents. An implementation of Table is available for every file format supported by file-store. Consult the pages on specific output formats to find out how to define Tables.
- tables is a mapping from developer-defined string handles to Table instances. A table writer will be created for each Table in this mapping. It will be exposed at ctx.store.<tableHandle>.
- dest is an instance of Dest, an interface for objects that take the properly formatted file contents and write them onto a particular filesystem. An implementation of Dest is available for every filesystem supported by file-store. For local filesystems, use the LocalDest class from the @subsquid/file-store package and supply new LocalDest(outputDirectoryName) here. For other targets, consult the documentation pages specific to your filesystem choice.
- chunkSizeMb governs the size of the internal data buffer. A dataset partition will be written as soon as this buffer fills up, or at the end of the batch if setForceFlush() was called.
- hooks are useful for persisting data between batches (see the Hooks section below).
Table Writer Interface
For each Table
supplied via the tables
field of the constructor argument, Database
adds a table writer property to ctx.store
. The writer is exposed at ctx.store.<tableHandle>
, where <tableHandle>
is the key of the Table
instance in the tables
mapping. It has the following methods:
ctx.store.<tableHandle>.write(record: T)
ctx.store.<tableHandle>.writeMany(records: T[])
Here, T
is a Table
implementation-specific data row type. See the documentation pages on specific file formats for details.
These synchronous methods add rows of data to an in-memory buffer and perform no actual filesystem writes. Instead, the write happens automatically when the internal buffer reaches chunkSizeMb
or at the end of the batch during which setForceFlush()
was called. The methods return the table writer instance and can be chained.
For example, with a Database
defined like this:
const db = new Database({
tables: {
TransfersTable: new Table(/* table options */)
},
// ...dest and dataset partitioning options
})
the following calls become available in the batch handler:
processor.run(db, async ctx => {
let record = // row in a format specific to Table implementation
ctx.store.TransfersTable.write(record)
ctx.store.TransfersTable.writeMany([record])
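  // write() and writeMany() return the table writer, so the calls can be chained
  ctx.store.TransfersTable.write(record).writeMany([record])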
})
setForceFlush()
Both of the following calls
ctx.store.setForceFlush()
ctx.store.setForceFlush(true)
set the force flush flag within the store. If the flag is still set at the end of a batch, a dataset partition will be written regardless of how full the data buffer currently is. This is useful, for example, for keeping the block range covered by each partition bounded when data is written at a low rate:
let blockCount = 0
processor.run(db, async ctx => {
// ...data is transformed and queued for writing
blockCount += ctx.blocks.length
if (blockCount >= 500_000) {
ctx.store.setForceFlush()
}
})
Unset the flag with
ctx.store.setForceFlush(false)
Hooks
By default, Database
maintains a record of the syncing progress in the status.txt
file. When the processor with a Database
instance starts, it calls the onStateRead()
function that reads the highest reached block from status.txt
on the target filesystem and returns its hash and height. If the file does not exist, the function returns -1 for the height and syncing starts from the next block, that is, the zeroth/genesis block.
The syncing status record is updated every time a new partition is written to the dataset: the processor calls onStateUpdate()
which overwrites status.txt
with the new highest reached block.
As a result, the integrity of the data set is guaranteed given the blockchain history up to the point recorded in status.txt
.
The functions onStateRead()
and onStateUpdate()
can be overridden using the hooks
constructor argument field. To do that, set that field to
DatabaseHooks<Dest> {
onStateRead(dest: Dest): Promise<HashAndHeight | undefined>
onStateUpdate(dest: Dest, info: HashAndHeight): Promise<void>
}
Parameters:
- dest: the Dest object used by Database. Use it to access the filesystem.
- info: a {height: number, hash: string} object.
Overriding these functions can be useful for transferring some processor state between batches reliably. A basic example of using hooks
can be found here.
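For illustration, here is a minimal sketch of such hooks that keeps the status record in a JSON file instead of status.txt. It assumes the Dest in use (e.g. LocalDest) exposes exists(), readFile() and writeFile() helpers; check @subsquid/file-store for the exact Dest interface:

import {Database, LocalDest} from '@subsquid/file-store'

const db = new Database({
  tables: { /* table definitions */ },
  dest: new LocalDest('./data'),
  hooks: {
    async onStateRead(dest) {
      // assumption: exists()/readFile() are available on the Dest implementation in use
      if (await dest.exists('status.json')) {
        let {height, hash} = JSON.parse(await dest.readFile('status.json'))
        return {height, hash}
      }
      return undefined // no saved state: sync starts from scratch
    },
    async onStateUpdate(dest, info) {
      // info is a {height, hash} object describing the highest reached block
      await dest.writeFile('status.json', JSON.stringify(info))
    }
  }
})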