Fraccaman/topic0

Topic0 - EVM Indexer

A lean, config-driven EVM log indexer in Rust. It indexes only the contracts and events you declare in config.toml — never full blocks — to keep paid RPC usage (Alchemy/QuickNode compute units & credits) as low as possible. Logs are fetched with filtered eth_getLogs, decoded against their ABI at runtime (no codegen), and written as structured, typed rows into Postgres.

Design goals:

  • Minimal RPC spend — filtered eth_getLogs over wide adaptive ranges, block timestamps fetched once and cached, tx data piggybacked on the block fetch, WebSocket subscription at the tip instead of polling. Every call is metered through a per-provider cost model into a spend ledger with a monthly free-quota guard.
  • Fast without spending more — a two-level parallel pipeline (concurrent getLogs ranges × concurrent block/receipt RPCs per range) saturates the provider's rate budget instead of idling between round-trips. Same call count and $spent as the serial path — concurrency changes ordering, not volume.
  • Pay RPC once, ever — raw logs are persisted before decode, so re-decode / resync (new event, bug fix, added column) replays from disk with zero new RPC.
  • Reorg-safe — block hashes tracked per row; reorgs are cheap point deletes + re-index within the confirmation window.
  • Swappable seams — the RPC source, the storage read layer, and the query protocol all sit behind traits. Default wiring is Alchemy/QuickNode + Postgres + GraphQL.
  • Multi-chain — one deployment, chain_id everywhere, one ingest task per chain.

Full design rationale lives in ARCHITECTURE.md.
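As an illustration of the reorg-safe goal above, the following is a minimal sketch of how per-row block hashes can be used to find where a stored chain segment diverges from the canonical one. The names `reorg_start` and `drop_from`, the in-memory maps, and the short hash strings are assumptions made for this example; they are not taken from the Topic0 codebase, which stores hashes in Postgres.

```rust
use std::collections::BTreeMap;

/// Return the lowest stored height whose hash differs from the canonical chain.
/// A height missing from `canonical` is treated as divergent (assumption).
fn reorg_start(
    stored: &BTreeMap<u64, String>,
    canonical: &BTreeMap<u64, String>,
) -> Option<u64> {
    for (height, hash) in stored {
        if canonical.get(height) != Some(hash) {
            return Some(*height);
        }
    }
    None
}

/// Delete every stored row at or above `from`; returns how many were removed.
fn drop_from(stored: &mut BTreeMap<u64, String>, from: u64) -> usize {
    stored.split_off(&from).len()
}

/// Five stored blocks inside the confirmation window; the last two were reorged.
fn demo_reorg() -> (Option<u64>, usize, usize) {
    let mut stored = BTreeMap::new();
    let mut canonical = BTreeMap::new();
    for h in 100u64..=104 {
        stored.insert(h, format!("old-{h}"));
        // Heights 103 and 104 have a different hash on the canonical chain.
        let hash = if h >= 103 { format!("new-{h}") } else { format!("old-{h}") };
        canonical.insert(h, hash);
    }
    let start = reorg_start(&stored, &canonical);
    let removed = start.map(|h| drop_from(&mut stored, h)).unwrap_or(0);
    (start, removed, stored.len())
}

fn main() {
    let (start, removed, kept) = demo_reorg();
    println!("reorg starts at {start:?}: removed {removed} rows, kept {kept}");
}
```

After the point delete, the removed heights would be re-indexed from the canonical chain; that step is omitted here.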


Architecture

Three services + a Postgres-backed queue. The ingestor fetches filtered logs and enqueues pointers; decode workers read the raw logs, decode them, and upsert typed rows; the query service serves GraphQL over those rows.

flowchart LR
    cfg[config.toml<br/>chains · contracts · ABIs · events]
    rpc[(Alchemy / QuickNode<br/>JSON-RPC + WS)]
    pg[(Postgres)]

    cfg -.loaded by all.-> ING & DEC & API

    subgraph svc[services]
      ING[INGESTOR<br/>per chain<br/>filtered eth_getLogs<br/>ws subscribe]
      DEC[DECODER + WRITER<br/>ABI decode<br/>N workers]
      API[QUERY API<br/>GraphQL R/O]
    end

    rpc -->|logs · blocks · receipts<br/>matched only| ING
    ING -->|raw_* written first| pg
    ING -->|work item pointer| DEC
    DEC -->|read raw| pg
    DEC -->|typed upserts| pg
    pg --> API

Fetch path (the cost-saver)

Both levels of the diagram run concurrently — range_concurrency getLogs ranges in flight, and aux_concurrency block/receipt RPCs per range — all gated by the same PlanProfile token bucket so throughput fills the rate budget without exceeding it.

flowchart TD
    A[one filter per chain<br/>address: all contracts<br/>topics: union of event sigs] --> B[eth_getLogs<br/>provider-sized range<br/>× range_concurrency in flight]
    B -->|result-cap hit| C[split range in half,<br/>retry each side → 1 block]
    B --> E[distinct matched blocks]
    E --> F[batched eth_getBlockByNumber<br/>full=true → timestamp + txs<br/>× aux_concurrency]
    F --> G[(blocks / transactions<br/>cached, deduped per chain)]
    E --> H[eth_getTransactionReceipt<br/>batched, deduped<br/>× aux_concurrency]
    H --> G
    B & F & H -.rps/CU-gated.-> T[PlanProfile token bucket]
    F -.metered.-> M[CostModel → SpendLedger<br/>$spent · monthly quota guard]
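The "split range in half" step in the fetch path can be sketched as a small recursive function. This is only an illustration under stated assumptions: `fetch_with_split`, `FetchError`, and the simulated provider (10 logs per block, at most 25 logs per response) are hypothetical and stand in for a real filtered eth_getLogs call.

```rust
/// Error returned by the stand-in provider call.
#[derive(Debug)]
enum FetchError {
    /// The response would exceed the provider's result cap.
    ResultCapHit,
}

/// Fetch `from..=to`; on a result-cap hit, halve the range and retry each side.
/// Splitting stops at a single block, where the error is returned to the caller.
fn fetch_with_split<F>(
    from: u64,
    to: u64,
    fetch: &mut F,
    out: &mut Vec<(u64, u64, usize)>,
) -> Result<(), FetchError>
where
    F: FnMut(u64, u64) -> Result<usize, FetchError>,
{
    match fetch(from, to) {
        Ok(count) => {
            out.push((from, to, count));
            Ok(())
        }
        Err(FetchError::ResultCapHit) if from < to => {
            let mid = from + (to - from) / 2;
            fetch_with_split(from, mid, fetch, out)?;
            fetch_with_split(mid + 1, to, fetch, out)
        }
        Err(err) => Err(err),
    }
}

/// Simulated provider: 10 logs per block, responses capped at 25 logs.
fn demo_split() -> (Vec<(u64, u64, usize)>, u32) {
    let mut calls = 0u32;
    let mut fetch = |from: u64, to: u64| -> Result<usize, FetchError> {
        calls += 1;
        let logs = ((to - from + 1) * 10) as usize;
        if logs > 25 {
            Err(FetchError::ResultCapHit)
        } else {
            Ok(logs)
        }
    };
    let mut out = Vec::new();
    fetch_with_split(100, 107, &mut fetch, &mut out).expect("two-block ranges fit the cap");
    (out, calls)
}

fn main() {
    let (ranges, calls) = demo_split();
    println!("{calls} calls, fetched ranges: {ranges:?}");
}
```

In this simulation the 8-block range costs 7 calls: three rejected wide requests and four accepted two-block requests. That is why starting from a provider-sized range matters for spend.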

Queue & decode (parallel across chains, serial within a chain)

sequenceDiagram
    participant I as Ingestor (chain N)
    participant Q as work_queue (Postgres)
    participant W as Decode Worker
    participant DB as Event tables
    I->>Q: raw insert + enqueue (same tx)
    W->>Q: pull_any() — lease oldest, no in-flight for its chain
    Q-->>W: WorkItem {from, to, kind}
    W->>DB: decode raw → upsert typed rows (idempotent PK)
    W->>Q: ack
    Note over Q: FOR UPDATE SKIP LOCKED<br/>competing consumers, at-least-once
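The leasing rule in the sequence above (oldest item first, never two in-flight items for the same chain) can be modelled in memory. This sketch is an assumption-laden illustration: the real queue is a Postgres table leased with FOR UPDATE SKIP LOCKED, and the `WorkQueue` struct, its fields, and the `ack` behaviour shown here are hypothetical.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone)]
struct WorkItem {
    id: u64,
    chain_id: u64,
    from: u64,
    to: u64,
    leased: bool,
}

/// In-memory stand-in for the work_queue table, kept oldest-first.
struct WorkQueue {
    items: Vec<WorkItem>,
}

impl WorkQueue {
    /// Lease the oldest unleased item whose chain has nothing in flight.
    fn pull_any(&mut self) -> Option<WorkItem> {
        let busy: HashSet<u64> = self
            .items
            .iter()
            .filter(|i| i.leased)
            .map(|i| i.chain_id)
            .collect();
        let item = self
            .items
            .iter_mut()
            .find(|i| !i.leased && !busy.contains(&i.chain_id))?;
        item.leased = true;
        Some(item.clone())
    }

    /// Acknowledge a finished item by removing it from the queue.
    fn ack(&mut self, id: u64) {
        self.items.retain(|i| i.id != id);
    }
}

/// Two ranges on chain 1 and one on chain 8453; returns the leased ids in order.
fn demo_queue() -> Vec<Option<u64>> {
    let mut q = WorkQueue {
        items: vec![
            WorkItem { id: 1, chain_id: 1, from: 100, to: 199, leased: false },
            WorkItem { id: 2, chain_id: 1, from: 200, to: 299, leased: false },
            WorkItem { id: 3, chain_id: 8453, from: 50, to: 99, leased: false },
        ],
    };
    let mut leases = Vec::new();
    leases.push(q.pull_any().map(|w| w.id)); // chain 1, oldest
    leases.push(q.pull_any().map(|w| w.id)); // chain 1 is busy, so chain 8453
    leases.push(q.pull_any().map(|w| w.id)); // every remaining chain is busy
    q.ack(1);
    leases.push(q.pull_any().map(|w| w.id)); // chain 1 is free again
    leases
}

fn main() {
    println!("lease order: {:?}", demo_queue());
    let sample = WorkItem { id: 9, chain_id: 1, from: 0, to: 9, leased: false };
    println!("range {}..={} on chain {}", sample.from, sample.to, sample.chain_id);
}
```

Because delivery is at-least-once, a worker that crashes before `ack` would see its item again; the idempotent primary key on the typed rows is what makes that replay harmless.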

Quick start

cp .env.example .env          # fill ALCHEMY_HTTP / ALCHEMY_WS
cp config.toml.example config.toml

just pg-up                    # throwaway local Postgres on :55432
just migrate                  # ABI → DDL: create event tables
just backfill 19000000 19010000        # index a fixed range on chain 1 (default)
just backfill 19000000 19010000 8453   # …or pass a chain id explicitly
just run 4                    # or: supervisor — ingest all chains + 4 decode workers
just query                    # GraphQL at :8080

Run just with no args to list every recipe.

Common commands

| Recipe | What it does |
| --- | --- |
| just migrate / just migrate-dry | Apply / preview ABI→DDL schema diff |
| just backfill <from> <to> | Index a height range (one chain) |
| just resync <from> <to> | Re-decode from raw_*, zero RPC |
| just follow | Track the tip via WS, resume on restart |
| just run <workers> | Supervisor: ingest all chains + in-proc decode pool |
| just decode <workers> | Standalone decode-worker pool (scale-out) |
| just train-dict | Train a zstd dictionary on log data |
| just refresh-views | Recompute the per-contract stats materialized views |
| just query | Start the GraphQL server |

Docker compose mirrors these as profiles: just up indexer query, just scale-decode 4, just logs.

Per-contract stats

Two materialized views aggregate transaction activity per chain + contract, built generically from raw_records (matched logs only) joined to blocks — one row per (chain_id, address), no per-contract DDL:

  • mv_contract_stats — total txs and rolling 1/7/30/90-day tx counts.
  • mv_contract_tx_daily — per-day tx counts for the last 30 days.

"txs" = distinct transactions (a tx emitting several matched logs counts once). Windows are relative to refresh time; the views are created by just migrate and recomputed by just refresh-views (the rolling windows therefore reflect the last refresh). They are exposed read-only by the GraphQL server as contractStats and contractTxDaily, each optionally filtered by chainId / address:

{ contractStats(address: "0xA0b8…") { chainId totalTx tx1d tx7d tx30d tx90d } }
{ contractTxDaily(chainId: 1) { address day txCount } }
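The "distinct transactions" rule and the rolling windows can be illustrated with a small in-memory computation. This is a sketch, not the view definition: the views are SQL over raw_records joined to blocks, while `MatchedLog`, `tx_counts`, the placeholder address `0xC0FFEE`, and the exact window boundaries used here are assumptions for the example.

```rust
use std::collections::{HashMap, HashSet};

/// One matched log with the timestamp of its block (seconds).
struct MatchedLog {
    chain_id: u64,
    address: &'static str,
    tx_hash: &'static str,
    block_ts: u64,
}

const DAY: u64 = 86_400;

/// Distinct transactions per (chain_id, address) in the last `days` days,
/// measured back from `now` (the refresh time).
fn tx_counts(
    logs: &[MatchedLog],
    now: u64,
    days: u64,
) -> HashMap<(u64, &'static str), usize> {
    let cutoff = now.saturating_sub(days * DAY);
    let mut seen: HashMap<(u64, &'static str), HashSet<&'static str>> = HashMap::new();
    for log in logs.iter().filter(|l| l.block_ts > cutoff && l.block_ts <= now) {
        seen.entry((log.chain_id, log.address))
            .or_default()
            .insert(log.tx_hash);
    }
    seen.into_iter().map(|(key, txs)| (key, txs.len())).collect()
}

/// Returns [tx1d, tx7d, tx30d, tx90d] for one hypothetical contract.
fn demo_stats() -> [usize; 4] {
    let now = 100 * DAY;
    let addr = "0xC0FFEE"; // placeholder address, not a real contract
    let logs = vec![
        // Two logs from the same transaction count once.
        MatchedLog { chain_id: 1, address: addr, tx_hash: "0xaa", block_ts: now - 3_600 },
        MatchedLog { chain_id: 1, address: addr, tx_hash: "0xaa", block_ts: now - 3_600 },
        MatchedLog { chain_id: 1, address: addr, tx_hash: "0xbb", block_ts: now - 3 * DAY },
        MatchedLog { chain_id: 1, address: addr, tx_hash: "0xcc", block_ts: now - 40 * DAY },
    ];
    let count = |days: u64| {
        tx_counts(&logs, now, days)
            .get(&(1, addr))
            .copied()
            .unwrap_or(0)
    };
    [count(1), count(7), count(30), count(90)]
}

fn main() {
    let [d1, d7, d30, d90] = demo_stats();
    println!("tx1d={d1} tx7d={d7} tx30d={d30} tx90d={d90}");
}
```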

Configuration

config.toml is the single source of truth — it drives schema (migrate) and every runtime service. Secrets are ${ENV} placeholders, never inline. Copy config.toml.example to start.

[indexer]
log_level         = "info"
batch_size        = 500       # decoder write batch into Postgres
range_concurrency = 4         # getLogs ranges in flight at once (backfill pipeline)
aux_concurrency   = 8         # concurrent block/receipt RPCs per range (rps-gated)
tip_interval_secs = 6         # tip poll cadence (follow); CLI --interval overrides

[database]
url       = "${DATABASE_URL}"
max_conns = 16

[queue]
kind = "postgres"             # work_queue table, FOR UPDATE SKIP LOCKED, polled

[query]
api    = "graphql"
listen = "0.0.0.0:8080"
expose = "finalized"          # finalized | provisional — read visibility

[compression]
level = 3                     # app-side zstd of raw log `data` + tx calldata
# dictionary = "dict.bin"     # optional trained dict — IMMUTABLE once data written

[[chains]]
id            = 1
name          = "ethereum"
confirmations = 12            # unfinalized window for reorg safety
start_block   = 19_000_000

  [chains.source]
  kind = "alchemy"            # alchemy | quicknode | generic_rpc | free_node
  http = "${ALCHEMY_HTTP}"
  ws   = "${ALCHEMY_WS}"

    [chains.source.limits]    # PlanProfile — provider caps, all explicit
    max_rps             = 8
    max_cu_per_sec      = 330
    max_batch           = 100
    max_getlogs_blocks  = 10          # free-tier getLogs range cap
    max_getlogs_results = 10_000
    monthly_quota_cu    = 300_000_000 # free-quota guard in the spend ledger

  [[chains.contracts]]
  address = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"   # USDC
  abi     = "abis/erc20.json"
  events  = ["Transfer", "Approval"]   # subset of ABI; omit = all events
  # table = "usdc_transfer"            # optional table-name override
  # start_block = 19_500_000           # optional per-contract start override

Add more [[chains]] blocks for multi-chain; each can use a different provider and plan. source.kind picks the LogSource/CostModel impl; [chains.source.limits] sets the plan caps that the client self-tunes against.
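To show how a per-provider cost model and monthly_quota_cu might interact, here is a minimal sketch of a spend ledger with a quota guard. Everything in it is an assumption for illustration: the `SpendLedger` and `Charge` types, the `cost_cu` function, and especially the per-method CU prices, which are placeholders and not any provider's published pricing.

```rust
/// Result of trying to charge a batch of calls against the monthly quota.
#[derive(Debug, PartialEq)]
enum Charge {
    Ok { spent_cu: u64 },
    QuotaExceeded { needed_cu: u64, remaining_cu: u64 },
}

/// Placeholder CU prices per JSON-RPC method (hypothetical numbers).
fn cost_cu(method: &str) -> u64 {
    match method {
        "eth_getLogs" => 75,
        "eth_getBlockByNumber" => 16,
        "eth_getTransactionReceipt" => 15,
        _ => 10,
    }
}

/// Running total of compute units spent this month.
struct SpendLedger {
    monthly_quota_cu: u64,
    spent_cu: u64,
}

impl SpendLedger {
    /// Record `calls` invocations of `method`, refusing if the quota would be exceeded.
    fn charge(&mut self, method: &str, calls: u64) -> Charge {
        let needed_cu = cost_cu(method) * calls;
        let remaining_cu = self.monthly_quota_cu - self.spent_cu;
        if needed_cu > remaining_cu {
            return Charge::QuotaExceeded { needed_cu, remaining_cu };
        }
        self.spent_cu += needed_cu;
        Charge::Ok { spent_cu: self.spent_cu }
    }
}

/// A tiny quota of 1000 CU so the guard trips on the third batch.
fn demo_ledger() -> Vec<Charge> {
    let mut ledger = SpendLedger { monthly_quota_cu: 1_000, spent_cu: 0 };
    vec![
        ledger.charge("eth_getLogs", 10),
        ledger.charge("eth_getBlockByNumber", 10),
        ledger.charge("eth_getTransactionReceipt", 10),
    ]
}

fn main() {
    for outcome in demo_ledger() {
        println!("{outcome:?}");
    }
}
```

Checking the quota before sending the request, rather than after, is the design choice that keeps the guard from overshooting the free tier.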

Preparing config & ABIs

Steps from zero to an indexable config:

  1. Copy the template.

    cp config.toml.example config.toml
    mkdir -p abis
  2. Get the contract ABI as JSON and drop it in abis/. Sources:

    # from Etherscan (verified contracts) — needs an API key
    curl -s "https://api.etherscan.io/api?module=contract&action=getabi&address=0xA0b8...&apikey=$ETHERSCAN_KEY" \
      | jq -r '.result' > abis/erc20.json
    
    # or from a local Foundry/forge build artifact
    jq '.abi' out/ERC20.sol/ERC20.json > abis/erc20.json

    The file must be the ABI array (or an object with an .abi field). Only event entries are used; functions/constructors are ignored. One ABI can be reused across many contracts (e.g. one erc20.json for every ERC-20).

  3. Declare the chain: id, name, confirmations, start_block, and a [chains.source] with kind + ${ENV} endpoints. Set [chains.source.limits] to your provider plan's real caps (see the cost model for your kind).

  4. Declare each contract under [[chains.contracts]]:

    [[chains.contracts]]
    address = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"  # checksummed or lowercase
    abi     = "abis/erc20.json"          # path relative to config.toml
    events  = ["Transfer", "Approval"]   # by ABI event name; omit = all events
    # table = "usdc_transfer"            # override evt_<contract>_<event>
    # start_block = 19_500_000           # per-contract start override
    • Names listed in events must match the ABI exactly. Each event → one table evt_<contract>_<event> (or your table override).
    • The getLogs filter unions all contract addresses + event topic0s per chain → one call covers every contract.
  5. Validate — the migrator parses every ABI and computes the schema diff:

    just migrate-dry        # parse ABIs + print DDL plan, apply nothing
    just migrate            # create the event tables

    A bad ABI path, malformed JSON, or a name in events that is absent from the ABI fails here before any RPC is spent.

Adding a new event or contract later: edit config.toml, drop/extend the ABI, just migrate, then just resync <from> <to> to backfill the new columns from raw_* with zero new RPC.
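The per-chain filter described in step 4 (all contract addresses plus all event topic0s in one getLogs call) can be sketched as a set union. The `ContractCfg` and `LogFilter` types and the second contract address are hypothetical; the two topic0 values are the standard ERC-20 Transfer and Approval signature hashes, hardcoded here because computing keccak256 would need an external crate.

```rust
use std::collections::BTreeSet;

/// A declared contract with the topic0 hashes of its selected events.
struct ContractCfg {
    address: &'static str,
    topic0s: Vec<&'static str>,
}

/// One eth_getLogs filter per chain: any of these addresses, any of these topic0s.
#[derive(Debug)]
struct LogFilter {
    addresses: Vec<String>,
    topic0s: Vec<String>,
}

/// Union every contract's address and event signatures, deduplicated.
fn build_filter(contracts: &[ContractCfg]) -> LogFilter {
    let mut addresses = BTreeSet::new();
    let mut topic0s = BTreeSet::new();
    for c in contracts {
        addresses.insert(c.address.to_lowercase());
        for t in &c.topic0s {
            topic0s.insert(t.to_lowercase());
        }
    }
    LogFilter {
        addresses: addresses.into_iter().collect(),
        topic0s: topic0s.into_iter().collect(),
    }
}

const TRANSFER: &str = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";
const APPROVAL: &str = "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925";

/// Two ERC-20 contracts sharing one ABI collapse into a 2 x 2 filter.
fn demo_filter() -> LogFilter {
    build_filter(&[
        ContractCfg {
            address: "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", // USDC, from the config above
            topic0s: vec![TRANSFER, APPROVAL],
        },
        ContractCfg {
            address: "0x0000000000000000000000000000000000000001", // placeholder address
            topic0s: vec![TRANSFER],
        },
    ])
}

fn main() {
    let filter = demo_filter();
    println!("{} addresses, {} topic0s", filter.addresses.len(), filter.topic0s.len());
    println!("{filter:#?}");
}
```

One consequence of a union filter is worth noting: it matches any listed topic0 on any listed address, so in this example the placeholder contract's Approval logs would also be returned even though only Transfer was declared for it. A decoder would presumably need to drop such undeclared pairs.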

Environment variables

Copy .env.example to .env. Compose reads it, and config.toml ${VAR} placeholders resolve from it too.

| Var | Purpose |
| --- | --- |
| DATABASE_URL | Postgres DSN used by database.url |
| POSTGRES_USER / POSTGRES_PASSWORD / POSTGRES_DB / POSTGRES_PORT | Compose-provisioned DB creds |
| ALCHEMY_HTTP / ALCHEMY_WS | RPC source endpoints referenced by config.toml |
| RUST_LOG | Tracing filter (e.g. info, debug) |
| CHAIN_ID | Default chain for CLI recipes |
| QUERY_PORT | GraphQL server port |
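Resolving ${VAR} placeholders can be sketched as a single pass over the config text. This is a minimal illustration, assuming a hypothetical `expand` function with a pluggable lookup; how Topic0 actually handles missing variables, escaping, or nested placeholders is not stated in this README.

```rust
/// Replace every ${NAME} in `input` with the value returned by `lookup`.
/// Fails on an unterminated placeholder or an unknown variable.
fn expand<F>(input: &str, lookup: F) -> Result<String, String>
where
    F: Fn(&str) -> Option<String>,
{
    let mut out = String::new();
    let mut rest = input;
    while let Some(start) = rest.find("${") {
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        let end = after
            .find('}')
            .ok_or_else(|| "unterminated placeholder".to_string())?;
        let name = &after[..end];
        let value = lookup(name)
            .ok_or_else(|| format!("missing environment variable {}", name))?;
        out.push_str(&value);
        rest = &after[end + 1..];
    }
    out.push_str(rest);
    Ok(out)
}

fn main() {
    // Resolve against the real process environment.
    let line = "url = \"${DATABASE_URL}\"";
    match expand(line, |name| std::env::var(name).ok()) {
        Ok(resolved) => println!("{resolved}"),
        Err(err) => eprintln!("config error: {err}"),
    }
}
```

Failing fast on a missing variable, instead of substituting an empty string, keeps a misconfigured endpoint from reaching the RPC client.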

Docker Compose

Compose ships the whole stack as profiles — each service starts only when its profile is named, so you compose exactly the pieces you need. All app containers share one image (built from the Dockerfile), mount config.toml read-only, share a dicts volume for the compression dictionary, and read .env.

| Profile | Service | What it does |
| --- | --- | --- |
| db | postgres | Postgres only (:5432, pgdata volume) |
| migrate | migrate | One-shot: diff ABIs → apply DDL, then exit |
| train | train-dict | One-shot: train a zstd dict from stored raw data, then exit |
| indexer | indexer | Supervisor: per-chain ingest loops + in-process decode pool |
| decode | decode | Standalone decode-worker pool (scale out) |
| query | query | GraphQL read API on :8080 |

postgres is attached to every app profile and gated by a healthcheck, so any app service waits for the DB to be ready before booting.

Setup

cp .env.example .env                 # fill ALCHEMY_HTTP / ALCHEMY_WS, DB creds
cp config.toml.example config.toml   # mounted read-only into every container
docker compose build                 # or: just docker-build

Run

# 1. apply the schema once (one-shot, exits 0)
docker compose --profile migrate up

# 2. start indexing + the query API
docker compose --profile indexer --profile query up -d
#   equivalently:  just up indexer query

# tail logs / stop
docker compose logs -f               # just logs
docker compose down                  # just down

Scaling decode throughput

For many or fast-blocktime chains, run the ingest supervisor and a separate, horizontally-scaled decode pool (competing consumers of the shared work_queue):

docker compose --profile indexer up -d
docker compose --profile decode up -d --scale decode=4    # just scale-decode 4

WORKERS (default 4) sets the in-process pool size for indexer/decode.

Compression dictionary (optional)

CHAIN_ID=1 docker compose --profile train up    # writes dicts/dict.bin
# then set  [compression] dictionary = "dicts/dict.bin"  in config.toml and restart

The dict lives on the shared dicts volume — the ingestor writes it, decoders read it. It is immutable once data is written under it (changing it breaks decompression of existing rows). It is trained on log data and also compresses tx calldata, since both blobs share the process codec.

Configuration knobs

Compose reads these from .env (see the environment variables table): POSTGRES_*, DATABASE_URL (overridden to point at the postgres service), RUST_LOG, QUERY_PORT, plus WORKERS and CHAIN_ID shown above. App containers get a 45s stop_grace_period so in-flight ranges/decodes drain on SIGTERM.


Development

just build      # cargo build --workspace
just test       # cargo test --workspace
just clippy     # -D warnings
just fmt        # cargo fmt --all
just ci         # fmt-check + clippy + test (what CI runs)
