consensus: persist AppQC, blocks, and CommitQCs with async persistence (#2896)
consensus: persist AppQC, blocks, and CommitQCs with async persistence (#2896) — wen-coding merged 3 commits into main from
Conversation
|
The latest Buf updates on your PR. Results from workflow Buf / buf (pull_request).
|
Codecov Report ❌ — patch coverage details and impacted files below. @@ Coverage Diff @@
## main #2896 +/- ##
==========================================
+ Coverage 58.27% 58.48% +0.21%
==========================================
Files 2077 2113 +36
Lines 171308 175414 +4106
==========================================
+ Hits 99823 102595 +2772
- Misses 62590 63755 +1165
- Partials 8895 9064 +169
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
ebf93df to
f4a9c1e
Compare
05beddb to
2f0bbad
Compare
| } | ||
|
|
||
| // Queue enqueues a block for async persistence. Blocks if the queue is full | ||
| // until space is available or ctx is cancelled. We must not drop blocks because |
There was a problem hiding this comment.
It should be easy to check here that blocks are received in order, and to return an error if that's not the case (holes are possible, because inner state can skip forward).
| for lane, q := range inner.blocks { | ||
| if inner.nextBlockToPersist[lane] < q.next { | ||
| return true | ||
| } | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map Warning
| for lane, q := range inner.blocks { | ||
| start := max(inner.nextBlockToPersist[lane], q.first) | ||
| for n := start; n < q.next; n++ { | ||
| b.blocks = append(b.blocks, q.q[n]) | ||
| } | ||
| b.laneFirsts[lane] = q.first | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map Warning
| for lane, bs := range raw { | ||
| sorted := slices.Sorted(maps.Keys(bs)) | ||
| blocks := make([]LoadedBlock, 0, len(sorted)) | ||
| for _, n := range sorted { | ||
| blocks = append(blocks, LoadedBlock{Number: n, Proposal: bs[n]}) | ||
| } | ||
| result[lane] = blocks | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map Warning
| if err := pers.commitQCs.DeleteBefore(batch.commitQCFirst); err != nil { | ||
| return fmt.Errorf("commitqc deleteBefore: %w", err) | ||
| } | ||
| s.markCommitQCsPersisted(batch.commitQCs[len(batch.commitQCs)-1]) |
There was a problem hiding this comment.
markCommitQCsPersisted can be called directly after the PersistCommitQC loop.
| // innerFile is the A/B file prefix for avail inner state persistence. | ||
| const innerFile = "avail_inner" | ||
|
|
||
| func decodePruneAnchor(a *pb.PersistedAvailPruneAnchor) (*types.AppQC, *types.CommitQC, error) { |
There was a problem hiding this comment.
nit: you might want to add Anchor type and define a full proto.Conv for it.
| return i, nil | ||
| } | ||
|
|
||
| if l.pruneAppQC != nil { |
| i.commitQCs.pushBack(l.commitQCs[0].QC) | ||
| for _, lqc := range l.commitQCs[1:] { | ||
| if lqc.Index != i.commitQCs.next { | ||
| log.Warn(). |
There was a problem hiding this comment.
Given that we persist anchor first now, this should never happen.
There was a problem hiding this comment.
i.e. return error instead of logging
| var lastHash types.BlockHeaderHash | ||
| for j, b := range bs { | ||
| if q.Len() >= BlocksPerLane { | ||
| log.Warn(). |
| Msg("capping loaded blocks at lane capacity") | ||
| break | ||
| } | ||
| if b.Number != q.next { |
| Msg("skipping non-contiguous persisted blocks (orphans will be cleaned up)") | ||
| break | ||
| } | ||
| if j > 0 { |
| if err := pers.pruneAnchor.Persist(anchor); err != nil { | ||
| return fmt.Errorf("persist prune anchor: %w", err) | ||
| } | ||
| commitQC, err := types.CommitQCConv.Decode(anchor.CommitQc) |
There was a problem hiding this comment.
you shouldn't need to re-decode it. It shouldn't be encoded within the batch.
| // collectPersistBatch is in the same goroutine and reads it directly. | ||
| nextBlockToPersist map[types.LaneID]types.BlockNumber | ||
|
|
||
| // persistedBlockStart is the per-lane block number derived from the last |
There was a problem hiding this comment.
nit: this is fine (as we discussed yesterday), but it would work without persistedBlockStart as well — and I now see that would be strictly less logic. This is just an observation; no changes requested.
| // Restore persisted CommitQCs into the queue. Stale entries below the | ||
| // prune anchor have already been filtered by loadPersistedState. | ||
| if len(l.commitQCs) > 0 { | ||
| i.commitQCs.reset(l.commitQCs[0].Index) |
There was a problem hiding this comment.
nit: a single i.prune(anchor) call will move all the lane/vote and commitQC queues to the right position (no resets needed), and imo it would clarify the logic here.
| var wantAppQCIdx types.RoadIndex | ||
| var wantNextBlocks map[types.LaneID]types.BlockNumber | ||
|
|
||
| require.NoError(t, scope.Run(context.Background(), func(ctx context.Context, s scope.Scope) error { |
| for lane := range i.blocks { | ||
| i.persistedBlockStart[lane] = anchor.CommitQC.LaneRange(lane).First() | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map Warning
| for lane, bs := range l.blocks { | ||
| q, ok := i.blocks[lane] | ||
| if !ok || len(bs) == 0 { | ||
| continue | ||
| } | ||
| var lastHash types.BlockHeaderHash | ||
| for j, b := range bs { | ||
| if q.Len() >= BlocksPerLane { | ||
| return nil, fmt.Errorf("lane %s: loaded %d blocks exceeds capacity %d", lane, len(bs), BlocksPerLane) | ||
| } | ||
| if b.Number != q.next { | ||
| return nil, fmt.Errorf("lane %s: non-contiguous persisted blocks: expected %d, got %d", lane, q.next, b.Number) | ||
| } | ||
| if j > 0 { | ||
| if got := b.Proposal.Msg().Block().Header().ParentHash(); got != lastHash { | ||
| return nil, fmt.Errorf("lane %s: parent hash mismatch at block %d", lane, b.Number) | ||
| } | ||
| } | ||
| lastHash = b.Proposal.Msg().Block().Header().Hash() | ||
| q.pushBack(b.Proposal) | ||
| } | ||
| if q.next > q.first { | ||
| i.nextBlockToPersist[lane] = q.next | ||
| } | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map Warning
| for lane, bs := range blocks { | ||
| first := anchor.CommitQC.LaneRange(lane).First() | ||
| j := 0 | ||
| for j < len(bs) && bs[j].Number < first { | ||
| j++ | ||
| } | ||
| if j > 0 { | ||
| loaded.blocks[lane] = bs[j:] | ||
| } | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map Warning
528a35b to
da8050a
Compare
da8050a to
58893a4
Compare
- Fix doc comment: stateDir -> PersistentStateDir - Remove len(batch.commitQCs)>0 guard so commitQC DeleteBefore always runs - Clarify empty laneFirsts no-op semantics in BlockPersister.DeleteBefore - Fix WriteRawFile comment: "one of the A/B files" -> "the A file" Made-with: Cursor
Summary
Crash-safe persistence for availability state (AppQC, signed lane proposals, and CommitQCs). All I/O is fully asynchronous — no disk operations on the critical path or under locks.
Ref: sei-protocol/sei-v3#512
Persist layer (consensus/persist/): Persister[T proto.Message] interface with crash-safe A/B file strategy (abPersister[T]). No-op implementation for test/disabled paths. A/B suffixes unexported; WriteRawFile helper for corruption tests. BlockPersister (persist/blocks.go): each signed lane proposal stored as <lane_hex>_<blocknum>.pb. On load, returns all valid files sorted per lane. DeleteBefore removes pruned blocks and orphaned lanes from previous committees. CommitQCPersister (persist/commitqcs.go): each CommitQC stored as <roadindex>.pb. On load, returns all valid files sorted. ResetNext method allows the consumer to realign the persist cursor after filtering. DeleteBefore removes old CommitQC files. Loading (loadPersistedState + newInner) enforces contiguity and returns errors on gaps. Prune anchor (
PersistedAvailPruneAnchor): PruneAnchor Go type + PruneAnchorConv (protoutils.Conv) for encoding/decoding, with utils.Option for nullable fields; consumed by prune(). The embedded CommitQC ensures lane range information is always available even if the corresponding CommitQC file hasn't been written yet (crash between anchor write and file write). Availability state (
avail/): the persist goroutine reads inner state directly. collectPersistBatch acquires the lock, waits for new data, reads persistence cursors directly from inner state, clamps past pruned entries, and collects the batch. I/O runs with no lock held. No channel, no backpressure. latestCommitQC is published immediately so consensus can advance without waiting for block writes. Blocks are persisted last. Old data is deleted at the end. nextBlockToPersist is updated after each write, so vote latency equals single-block write time regardless of batch size. latestCommitQC is published after writing to disk (or immediately for no-op). PushCommitQC no longer publishes directly — consensus subscribes to LastCommitQC() and won't advance until the QC is durable. RecvBatch only yields blocks below the nextBlockToPersist watermark, so votes are only signed for durably written blocks. Capacity limit (persistedBlockStart): PushBlock, ProduceBlock, WaitForCapacity, and PushVote use persistedBlockStart + BlocksPerLane as the capacity limit, where persistedBlockStart is derived from the last durably persisted prune anchor. This ensures we never admit more blocks than can be recovered after a crash. Restart / state restoration
loadPersistedState (avail/state.go): decodes the prune anchor and filters out stale commitQCs (below the anchor's road index) and blocks (below the anchor's per-lane range) before passing data to newInner. This keeps domain filtering close to the I/O layer and simplifies newInner. newInner (avail/inner.go): applies prune() first when a prune anchor is present — this positions all queues (commitQCs, blocks, votes) at the correct indices so that subsequent pushBack calls insert at the right position without needing reset(). CommitQCs already pushed by prune() are skipped during loading via lqc.Index < commitQCs.next. newInner returns errors (not warnings) for non-contiguous commitQCs, non-contiguous blocks, parent-hash mismatches, and blocks exceeding BlocksPerLane capacity. Since the anchor is always persisted first and data is written sequentially, any of these indicate corruption or a bug. NewState calls DeleteBefore on both block and commitQC persisters to immediately remove stale files that were filtered out, rather than waiting for the first persist cycle. ResetNext cursor fix: after filtering, NewState calls pers.commitQCs.ResetNext(inner.commitQCs.next) to realign the persister's cursor. Without this, orphaned files would inflate cp.next beyond what was actually loaded, causing PersistCommitQC to reject valid new QCs on restart. prune() advances nextBlockToPersist: when prune fast-forwards a lane's blocks.first past the persist cursor, the cursor is bumped to prevent busy-looping in the persist goroutine. persistedBlockStart initialization: on restart, initialized from the prune anchor's CommitQC lane ranges, establishing the capacity limit from the first batch iteration. queue.reset(): no longer needed — prune() handles all queue positioning on startup. Proposal verification hardening (
types/proposal.go): NewProposal rejects callers who aren't the view leader. FullProposal.Verify now checks the proposer's signature, verifies the proposal's lane structure against the committee, and validates that the LaneQC header hash matches the lane range's LastHash. Proposal.Verify validates that every present lane range belongs to the committee. Other changes
data/state.go: fix off-by-one in PushBlock wait condition (n <= nextQC → n < nextQC). Cap block insertion loop at inner.nextQC to avoid accessing unverified QC entries. data/testonly.go: use actual leader key for test proposals (required by the new NewProposal leader check). latestCommitQC: removed the nextCommitQCToPersist field — the cursor is derived from what's already published, which is safe because latestCommitQC is only advanced after disk write. Use t.Context() instead of context.Background() in tests. avail/inner.go, consensus/persist/blocks.go, and consensus/persist/commitqcs.go (aligns with "Remove dependency to zerolog in favour of slog" #3024). Test plan
- persist/blocks_test.go: load/store, gap returns all files, DeleteBefore, orphaned lane cleanup, header mismatch, corrupt files
- persist/commitqcs_test.go: load/store, gap returns all files, DeleteBefore, corrupt files, mismatched index, ResetNext
- persist/persist_test.go: A/B file crash safety, seq management, corrupt fallback, OS error propagation, generic typed API
- avail/inner_test.go: newInner fresh start, loaded blocks (contiguous, gap error, parent hash mismatch error, over-capacity error, multiple lanes, empty, unknown lane), loaded CommitQCs (no anchor, with AppQC anchor, gap error, gap with anchor error, stale QCs below anchor skipped, all before AppQC pruned, gap after anchor error), anchor with no CommitQC files (crash recovery), anchor prunes block queues, anchor CommitQC used for prune, loaded all three, prune mismatched indices, prune advances nextBlockToPersist, incomplete prune anchor error
- avail/state_test.go: fresh start, load AppQC, load blocks, load both, load commitQCs, load commitQCs with AppQC, non-contiguous commitQC files error, corrupt data error, PushBlock rejects bad parent hash, PushBlock rejects wrong signer, PushAppQC mismatch
- avail/state_test.go (TestStateWithPersistence): end-to-end persist + prune race regression (5 iterations with disk persistence)
- avail/state_test.go (TestStateRestartFromPersisted): end-to-end persist → stop → restart from the same directory; verifies AppQC/CommitQC/block state restored correctly
- avail/queue_test.go: newQueue, pushBack, prune, stale prune, prune past next
- consensus/inner_test.go: consensus inner persistence round-trip, persist error propagation via newState injection
- data/state_test.go: data state tests
- types/proposal_test.go: proposal verification hardening tests