Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ func (p *Pipeline) Run() error {
slog.Info("pipeline.discovered", "files", len(files))
logHeapStats("pre_index")

// Boost cache to 64 MB and set synchronous = OFF for bulk-write throughput
// during fresh indexing. WAL mode is preserved (no switch to MEMORY journal
// mode), so the DB survives the process being killed mid-write: the WAL file
// is replayed on next open. synchronous = OFF skips fsync, so an OS crash or
// power loss during this window can still lose or corrupt data.
p.Store.BeginBulkWrite(p.ctx)

wroteData := false
Expand Down
148 changes: 148 additions & 0 deletions internal/store/bulkwrite_crash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package store

import (
"context"
"os"
"os/exec"
"path/filepath"
"testing"
)

// pragmaVal executes "PRAGMA <pragma>" on the store's database handle and
// returns the single value of the first result row, scanned as a string.
// Any query or scan error fails the calling test immediately via t.Fatalf.
func pragmaVal(t *testing.T, s *Store, pragma string) string {
	t.Helper()

	query := "PRAGMA " + pragma
	row := s.DB().QueryRowContext(context.Background(), query)

	var out string
	err := row.Scan(&out)
	if err != nil {
		t.Fatalf("PRAGMA %s: %v", pragma, err)
	}
	return out
}

// TestBulkWriteKeepsWAL checks the PRAGMA state around a bulk-write window:
// journal_mode must read "wal" before BeginBulkWrite, after BeginBulkWrite,
// and after EndBulkWrite, while synchronous must read "0" (OFF) inside the
// window and "1" (NORMAL) once EndBulkWrite has run.
func TestBulkWriteKeepsWAL(t *testing.T) {
	store, err := OpenPath(filepath.Join(t.TempDir(), "test.db"))
	if err != nil {
		t.Fatalf("OpenPath: %v", err)
	}
	defer store.Close()

	if err := store.UpsertProject("test", t.TempDir()); err != nil {
		t.Fatalf("UpsertProject: %v", err)
	}

	// expectPragma reads one PRAGMA and, on mismatch, records a non-fatal
	// failure using the supplied format, which receives the observed value.
	expectPragma := func(pragma, want, format string) {
		t.Helper()
		if got := pragmaVal(t, store, pragma); got != want {
			t.Errorf(format, got)
		}
	}

	bg := context.Background()

	// Before the window: the freshly opened store must already be in WAL mode.
	expectPragma("journal_mode", "wal", "baseline journal_mode = %q, want \"wal\"")

	store.BeginBulkWrite(bg)

	// Inside the window: still WAL (regression check), with synchronous OFF.
	expectPragma("journal_mode", "wal", "during bulk write: journal_mode = %q, want \"wal\"")
	expectPragma("synchronous", "0", "during bulk write: synchronous = %q, want \"0\" (OFF)")

	store.EndBulkWrite(bg)

	// After the window: still WAL, and synchronous back to NORMAL.
	expectPragma("journal_mode", "wal", "after EndBulkWrite: journal_mode = %q, want \"wal\"")
	expectPragma("synchronous", "1", "after EndBulkWrite: synchronous = %q, want \"1\" (NORMAL)")
}

// TestCrashHelper is the subprocess body for TestBulkWriteCrashRecovery; it is
// not a test in its own right.
//
// When CRASH_HELPER_DB is unset (the normal suite run) it returns immediately.
// When set, it opens the DB at that path, calls BeginBulkWrite, inserts one
// node, and terminates via os.Exit(1) without calling EndBulkWrite or Close,
// standing in for a process killed mid-bulk-write.
//
// Exit codes:
//   - 1: the write was issued and the crash was simulated.
//   - 2: a setup step (open, project upsert, node upsert) failed, so the crash
//     scenario was never exercised. The reason is written to stderr.
func TestCrashHelper(t *testing.T) {
	dbPath := os.Getenv("CRASH_HELPER_DB")
	if dbPath == "" {
		return // not running as subprocess
	}

	// abort reports a setup failure on stderr and exits with the
	// setup-failure code. A failed step must never be reported as exit code 1,
	// otherwise the parent would validate a DB that was never written to.
	abort := func(step string, err error) {
		_, _ = os.Stderr.WriteString("crash helper: " + step + ": " + err.Error() + "\n")
		os.Exit(2)
	}

	s, err := OpenPath(dbPath)
	if err != nil {
		abort("OpenPath", err)
	}
	if err := s.UpsertProject("crash-test", dbPath); err != nil {
		abort("UpsertProject", err)
	}

	s.BeginBulkWrite(context.Background())

	// Insert a node to create write activity before the simulated crash.
	if _, err := s.UpsertNode(&Node{
		Project:       "crash-test",
		Label:         "Function",
		Name:          "CrashFunc",
		QualifiedName: "crash.CrashFunc",
		FilePath:      "crash.go",
		StartLine:     1,
		EndLine:       5,
	}); err != nil {
		abort("UpsertNode", err)
	}

	// Simulate crash: exit without calling EndBulkWrite.
	os.Exit(1)
}

// TestBulkWriteCrashRecovery re-executes the test binary as a subprocess that
// runs TestCrashHelper against a real file-backed DB: the helper calls
// BeginBulkWrite, inserts a row, and exits via os.Exit(1). The parent then
// reopens the DB and requires PRAGMA integrity_check to return "ok".
//
// Whether the inserted row survived is logged but never asserted.
func TestBulkWriteCrashRecovery(t *testing.T) {
	dbPath := filepath.Join(t.TempDir(), "crash.db")

	// Pre-create the DB so the schema exists before the subprocess opens it.
	s, err := OpenPath(dbPath)
	if err != nil {
		t.Fatalf("OpenPath (pre-create): %v", err)
	}
	s.Close()

	// Fork subprocess that crashes mid-bulk-write. The -test.run pattern is a
	// regular expression, so it is anchored to select exactly TestCrashHelper
	// and not any other test whose name happens to contain that string.
	cmd := exec.Command(os.Args[0], "-test.run=^TestCrashHelper$", "-test.v")
	cmd.Env = append(os.Environ(), "CRASH_HELPER_DB="+dbPath)
	out, err := cmd.CombinedOutput()
	t.Logf("subprocess output: %s", out)
	// The subprocess must have exited with code 1 (simulated crash via os.Exit(1)).
	// Any other outcome — clean exit (0), setup failure (2), or launch error —
	// means the crash was never exercised and the rest of the test is meaningless.
	if exitErr, ok := err.(*exec.ExitError); !ok || exitErr.ExitCode() != 1 {
		t.Fatalf("subprocess did not crash as expected: err=%v, output=%s", err, out)
	}

	// Reopen the DB — must not error.
	s2, err := OpenPath(dbPath)
	if err != nil {
		t.Fatalf("reopen after crash: %v", err)
	}
	defer s2.Close()

	// Run integrity check — must return "ok".
	ctx := context.Background()
	var result string
	if err := s2.DB().QueryRowContext(ctx, "PRAGMA integrity_check").Scan(&result); err != nil {
		t.Fatalf("integrity_check query: %v", err)
	}
	if result != "ok" {
		t.Errorf("DB corrupted after crash: integrity_check = %q, want \"ok\"", result)
	}

	// Check whether the row inserted by the subprocess survived the crash.
	// With synchronous = OFF, SQLite hands writes to the OS without syncing.
	// A process exit leaves the OS page cache intact, so the row normally
	// survives; an OS crash or power loss could lose it (or, per the SQLite
	// documentation, corrupt the database). That case is not simulated here.
	// Either row outcome is acceptable — it is an intentional trade-off of the
	// bulk-write mode — so the result is logged rather than asserted.
	var rowCount int
	err = s2.DB().QueryRowContext(ctx,
		"SELECT COUNT(*) FROM nodes WHERE qualified_name = 'crash.CrashFunc'",
	).Scan(&rowCount)
	if err != nil {
		// Report the query failure instead of logging a misleading
		// "row survived crash: false" derived from a zero-valued count.
		t.Logf("row survival check failed: %v", err)
		return
	}
	t.Logf("row survived crash: %v (synchronous=OFF means this may legitimately be false on power-loss)", rowCount > 0)
}
10 changes: 4 additions & 6 deletions internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,19 +217,17 @@ func (s *Store) WALSize() int64 {
return fi.Size()
}

// BeginBulkWrite prepares the store for a bulk-write phase by issuing
// PRAGMA synchronous = OFF and raising cache_size to 64 MB.
//
// The journal mode is deliberately left untouched: a database that is in WAL
// mode stays in WAL mode for the whole bulk write, so a process killed
// mid-indexing (e.g. SIGKILL) does not corrupt the DB. With synchronous = OFF
// SQLite does not wait for data to reach disk, so an OS crash or power loss
// during the bulk-write window can still lose or corrupt data.
//
// Both PRAGMAs are best-effort: the method has no error return and a failed
// PRAGMA only costs throughput, so their errors are intentionally discarded.
//
// Call EndBulkWrite when done to restore synchronous = NORMAL and the adaptive
// cache size.
func (s *Store) BeginBulkWrite(ctx context.Context) {
	_, _ = s.db.ExecContext(ctx, "PRAGMA synchronous = OFF")
	// A negative cache_size is expressed in KiB: -65536 KiB = 64 MB.
	_, _ = s.db.ExecContext(ctx, "PRAGMA cache_size = -65536") // 64 MB
}

// EndBulkWrite ends a bulk-write phase. It sets synchronous back to NORMAL,
// (re)asserts WAL journal mode, and then hands off to restoreDefaultCache to
// reset the cache size. PRAGMA errors are discarded; the method has no error
// return.
func (s *Store) EndBulkWrite(ctx context.Context) {
	// Order is preserved: synchronous first, then journal mode.
	restore := [...]string{
		"PRAGMA synchronous = NORMAL",
		"PRAGMA journal_mode = WAL",
	}
	for _, stmt := range restore {
		_, _ = s.db.ExecContext(ctx, stmt)
	}
	s.restoreDefaultCache(ctx)
}

Expand Down