Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions sei-tendermint/internal/autobahn/autobahn.proto
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,6 @@ message PersistedInner {
optional FullTimeoutVote timeout_vote = 6;
}

// Wrapper for persisted data with sequence number.
message PersistedWrapper {
optional uint64 seq = 1;
optional bytes data = 2;
}

// Persisted availability prune anchor (AppQC + matching CommitQC pair).
// Stored atomically in an A/B file; used as the crash-recovery pruning watermark.
message PersistedAvailPruneAnchor {
Expand Down
19 changes: 10 additions & 9 deletions sei-tendermint/internal/autobahn/avail/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"google.golang.org/protobuf/proto"

"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data"
pb "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/pb"
Expand Down Expand Up @@ -759,14 +759,15 @@ func TestNewStateWithPersistence(t *testing.T) {
dir := t.TempDir()
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())

// Write a valid PersistedWrapper whose Data payload is garbage.
// This simulates corruption at the application data level while
// keeping the outer A/B wrapper intact.
seq := uint64(1)
wrapper := &pb.PersistedWrapper{Seq: &seq, Data: []byte("not a valid protobuf")}
bz, err := proto.Marshal(wrapper)
// Create a throwaway persister to discover the A/B filenames,
// then corrupt them so NewState fails on load.
_, _, err := persist.NewPersister[*pb.PersistedAvailPruneAnchor](utils.Some(dir), innerFile)
require.NoError(t, err)
entries, err := os.ReadDir(dir)
require.NoError(t, err)
require.NoError(t, persist.WriteRawFile(dir, innerFile, bz))
for _, e := range entries {
require.NoError(t, os.WriteFile(filepath.Join(dir, e.Name()), []byte("corrupt"), 0600))
}

_, err = NewState(keys[0], ds, utils.Some(dir))
require.Error(t, err)
Expand Down
156 changes: 90 additions & 66 deletions sei-tendermint/internal/autobahn/consensus/persist/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@
package persist

import (
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"os"
"path/filepath"

"google.golang.org/protobuf/proto"

"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/pb"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/protoutils"
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
)
Expand All @@ -52,11 +51,13 @@ const (
suffixB = "_b.pb"
)

// WriteRawFile writes raw bytes to the A file for a given prefix.
// Intended for tests that need to simulate corruption from outside the package.
func WriteRawFile(dir, prefix string, data []byte) error {
return os.WriteFile(filepath.Join(dir, prefix+suffixA), data, 0600)
}
var crc32c = crc32.MakeTable(crc32.Castagnoli)

const (
crcSize = 4 // CRC32-C prefix length
seqSize = 8 // uint64 little-endian
headerSize = crcSize + seqSize // file header: [4-byte CRC32-C BE][8-byte seq LE]
)

// ErrNoData is returned by loadPersisted when no persisted files exist for the prefix.
var ErrNoData = errors.New("no persisted data")
Expand All @@ -67,6 +68,12 @@ var ErrNoData = errors.New("no persisted data")
// I/O errors) are NOT wrapped with ErrCorrupt and cause loadPersisted to fail.
var ErrCorrupt = errors.New("corrupt persisted data")

// dataWithSeq is the unit stored in each A/B file: a sequence number and a proto payload.
type dataWithSeq struct {
seq uint64
data []byte // nil on fresh start
}

// Persister[T] is a strongly-typed persister for a proto message type.
type Persister[T protoutils.Message] interface {
Persist(T) error
Expand All @@ -82,7 +89,7 @@ func newNoOpPersister[T protoutils.Message]() Persister[T] {
}

// abPersister writes data to A/B files with automatic seq management.
// Uses PersistedWrapper protobuf for crash-safe persistence.
// File format: [4-byte CRC32-C BE] [8-byte seq LE] [proto-marshalled message].
// Only created when config has a state dir; dir is always a valid path.
// File selection is derived from seq: odd seq → A, even seq → B.
type abPersister[T protoutils.Message] struct {
Expand Down Expand Up @@ -121,13 +128,13 @@ func NewPersister[T protoutils.Message](dir utils.Option[string], prefix string)
_ = probe.Close()
_ = os.Remove(probe.Name())

wrapper, err := loadPersisted(d, prefix)
ds, err := loadPersisted(d, prefix)
if err != nil && !errors.Is(err, ErrNoData) {
return nil, none, err
}

// Ensure both A/B files exist and are writable so Persist never creates new
// directory entries. Empty files are treated as non-existent by loadWrapped,
// directory entries. Empty files are treated as non-existent by loadFile,
// so they won't interfere with loading on restart.
for _, suffix := range []string{suffixA, suffixB} {
path := filepath.Join(d, prefix+suffix)
Expand All @@ -147,10 +154,9 @@ func NewPersister[T protoutils.Message](dir utils.Option[string], prefix string)
_ = df.Close()
}

// wrapper is nil on fresh start (ErrNoData); protobuf Get methods return zero values for nil.
var loaded utils.Option[T]
if bz := wrapper.GetData(); bz != nil {
msg, err := protoutils.Unmarshal[T](bz)
if ds.data != nil {
msg, err := protoutils.Unmarshal[T](ds.data)
if err != nil {
return nil, none, fmt.Errorf("unmarshal persisted %s: %w", prefix, err)
}
Expand All @@ -159,13 +165,11 @@ func NewPersister[T protoutils.Message](dir utils.Option[string], prefix string)
return &abPersister[T]{
dir: d,
prefix: prefix,
seq: wrapper.GetSeq(),
seq: ds.seq,
}, loaded, nil
}

// Persist writes a proto message to persistent storage with seq wrapper.
// Not safe for concurrent use.
// Returns error on marshal or write failure.
// Persist writes a proto message to persistent storage. Not safe for concurrent use.
func (w *abPersister[T]) Persist(msg T) error {
data := protoutils.Marshal(msg)
seq := w.seq + 1
Expand All @@ -175,61 +179,57 @@ func (w *abPersister[T]) Persist(msg T) error {
if seq%2 == 1 {
suffix = suffixA
}
filename := w.prefix + suffix

wrapper := &pb.PersistedWrapper{
Seq: &seq,
Data: data,
}
bz, err := proto.Marshal(wrapper)
if err != nil {
return fmt.Errorf("marshal wrapper: %w", err)
}

if err := writeAndSync(filepath.Join(w.dir, filename), bz); err != nil {
filename := w.prefix + suffix
if err := writeFile(filepath.Join(w.dir, filename), dataWithSeq{seq: seq, data: data}); err != nil {
return fmt.Errorf("persist to %s: %w", filename, err)
}
w.seq = seq
return nil
}

// loadWrapped loads a wrapped file, returning the PersistedWrapper proto.
// Returns os.ErrNotExist when the file does not exist (caller can use errors.Is).
// Returns other error on read or unmarshal failure. loadPersisted calls loadWrapped
// for both A and B and only fails when both fail; one corrupt file is tolerated.
// stateDir must be an existing directory (we do not create it).
func loadWrapped(stateDir, filename string) (*pb.PersistedWrapper, error) {
// loadFile reads a single A/B file and returns its contents as a dataWithSeq.
// Returns os.ErrNotExist when the file does not exist.
// Returns ErrCorrupt on CRC mismatch or truncated header.
// OS-level errors (permission denied, I/O) are returned unwrapped.
func loadFile(stateDir, filename string) (dataWithSeq, error) {
path := filepath.Join(stateDir, filename)
bz, err := os.ReadFile(path) //nolint:gosec // path is constructed from operator-configured stateDir + hardcoded filename suffix; no user-controlled input
if errors.Is(err, os.ErrNotExist) {
return nil, os.ErrNotExist
return dataWithSeq{}, os.ErrNotExist
}
if err != nil {
// OS-level read error (permission denied, I/O error, etc.) —
// not wrapped with ErrCorrupt so loadPersisted propagates it.
return nil, fmt.Errorf("read %s: %w", filename, err)
return dataWithSeq{}, fmt.Errorf("read %s: %w", filename, err)
}
// Treat empty files as non-existent. A valid wrapper must contain at least
// a seq number. Empty files are created by NewPersister to pre-populate
// directory entries so that Persist never needs to dir-sync.
// Empty files are created by NewPersister to pre-populate directory entries.
if len(bz) == 0 {
return nil, os.ErrNotExist
return dataWithSeq{}, os.ErrNotExist
}
if len(bz) < headerSize {
return dataWithSeq{}, fmt.Errorf("%s: truncated (len %d < header %d): %w", filename, len(bz), headerSize, ErrCorrupt)
}
var wrapper pb.PersistedWrapper
if err := proto.Unmarshal(bz, &wrapper); err != nil {
return nil, fmt.Errorf("unmarshal %s: %w", filename, fmt.Errorf("%v: %w", err, ErrCorrupt))

wantCRC := binary.BigEndian.Uint32(bz[:crcSize])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: crc computation seems to belong rather in writeFile imo (which would simplify the function signature - currently is accepts crc32 which it assumes to be computed correctly)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea, done

payload := bz[crcSize:]
if got := crc32.Checksum(payload, crc32c); got != wantCRC {
return dataWithSeq{}, fmt.Errorf("%s: crc32 mismatch (got %08x, want %08x): %w", filename, got, wantCRC, ErrCorrupt)
}

seq := binary.LittleEndian.Uint64(payload[:seqSize])
if seq == 0 {
return dataWithSeq{}, fmt.Errorf("%s: zero seq: %w", filename, ErrCorrupt)
}
return &wrapper, nil
return dataWithSeq{seq: seq, data: payload[seqSize:]}, nil
}

// loadPersisted loads persisted data for the given directory and prefix.
// Tries both A and B files; if one is corrupt (e.g. crash during write), the other is used
// so the validator can restart. Returns ErrNoData when no persisted files exist (use errors.Is).
// Returns other error only when both files fail to load or state is inconsistent (same seq).
func loadPersisted(dir string, prefix string) (*pb.PersistedWrapper, error) {
func loadPersisted(dir string, prefix string) (dataWithSeq, error) {
fileA, fileB := prefix+suffixA, prefix+suffixB
wrapperA, errA := loadWrapped(dir, fileA)
wrapperB, errB := loadWrapped(dir, fileB)
a, errA := loadFile(dir, fileA)
b, errB := loadFile(dir, fileB)

// Fail fast on OS-level errors (permission denied, I/O errors).
// Only ErrNotExist (fresh start) and ErrCorrupt (crash mid-write) are tolerable.
Expand All @@ -247,45 +247,69 @@ func loadPersisted(dir string, prefix string) (*pb.PersistedWrapper, error) {
logger.Warn("corrupt state file", "file", fe.file, "err", fe.err)
continue
}
return nil, fmt.Errorf("load %s: %w", fe.file, fe.err)
return dataWithSeq{}, fmt.Errorf("load %s: %w", fe.file, fe.err)
}

switch {
case errA == nil && errB == nil:
switch {
case wrapperA.GetSeq() > wrapperB.GetSeq():
return wrapperA, nil
case wrapperB.GetSeq() > wrapperA.GetSeq():
return wrapperB, nil
case a.seq > b.seq:
return a, nil
case b.seq > a.seq:
return b, nil
default:
return nil, fmt.Errorf("corrupt state: both %s and %s have same seq; remove %s if acceptable", fileA, fileB, fileB)
return dataWithSeq{}, fmt.Errorf("corrupt state: both %s and %s have same seq; remove %s if acceptable", fileA, fileB, fileB)
}
case errA == nil:
return wrapperA, nil
return a, nil
case errB == nil:
return wrapperB, nil
return b, nil
default:
if errors.Is(errA, os.ErrNotExist) && errors.Is(errB, os.ErrNotExist) {
return nil, ErrNoData
return dataWithSeq{}, ErrNoData
}
return nil, fmt.Errorf("no valid state: %s: %v; %s: %v", fileA, errA, fileB, errB)
return dataWithSeq{}, fmt.Errorf("no valid state: %s: %v; %s: %v", fileA, errA, fileB, errB)
}
}

// writeAndSync writes data to a file path and fsyncs. No dir sync needed because
// NewPersister pre-creates both A/B files at startup.
// writeAndSync atomically replaces path contents with data (O_TRUNC) and fsyncs.
// Used by WAL persistence (blocks, commitqcs).
func writeAndSync(path string, data []byte) error {
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) //nolint:gosec // path is stateDir + hardcoded suffix; not user-controlled
if err != nil {
return err
}
defer func() { _ = f.Close() }()
if _, err := f.Write(data); err != nil {
_ = f.Close()
return err
}
if err := f.Sync(); err != nil {
_ = f.Close()
return f.Sync()
}

// writeFile writes an A/B state file: [4-byte CRC32-C BE][8-byte seq LE][proto data].
// Encodes seq and computes CRC internally; writes chunks directly to avoid
// copying data into an intermediate buffer. The file is fsynced before return.
func writeFile(path string, d dataWithSeq) error {
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) //nolint:gosec // path is stateDir + hardcoded suffix; not user-controlled
if err != nil {
return err
}
return f.Close()
defer func() { _ = f.Close() }()

var seqBuf [seqSize]byte
binary.LittleEndian.PutUint64(seqBuf[:], d.seq)

// hash.Hash.Write never returns an error.
h := crc32.New(crc32c)
_, _ = h.Write(seqBuf[:])
_, _ = h.Write(d.data)

var crcBuf [crcSize]byte
binary.BigEndian.PutUint32(crcBuf[:], h.Sum32())
for _, chunk := range [][]byte{crcBuf[:], seqBuf[:], d.data} {
if _, err := f.Write(chunk); err != nil {
return err
}
}
return f.Sync()
}
Loading
Loading