diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ad6ed09..a3f620a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ ## [Unreleased] +### ๐Ÿ› Bug ไฟฎๅค + +- **Checkpoint ่ฎกๆ•ฐๅ™จๆผ‚็งป** ([#157](https://github.com/Tencent/TencentDB-Agent-Memory/issues/157))๏ผš`total_memories_extracted`ใ€`l0_conversations_count`ใ€`total_processed`ใ€`memories_since_last_persona` ๅ››ไธชๅ…จๅฑ€่ฎกๆ•ฐๅ™จๅชๅขžไธๅ‡๏ผŒ`memory-cleaner` ๆธ…็†ๆˆ–ไบบๅทฅไฟฎๅ‰ช JSONL ๅŽๆฐธไน…้ซ˜ไผฐๅฎž้™…ๆ•ฐๆฎใ€‚ๆ–ฐๅขž `CheckpointManager.recalibrate()`๏ผŒๅœจ Gateway ๅฏๅŠจๆ—ถ็”จ็œŸๅฎžๆ•ฐๆฎ๏ผˆL0/L1 JSONL ่กŒๆ•ฐใ€store `countL0()`๏ผ‰้‡็ฎ—่ฟ™ๅ››ไธช่ฎกๆ•ฐๅ™จ๏ผŒ็บ ๆญฃๆผ‚็งปใ€‚ๅ…ถไธญ `memories_since_last_persona` ๆ กๅ‡†ๅŽไธๅ†่™š้ซ˜่ฏฏ่งฆๅ‘ persona ็”Ÿๆˆใ€‚ๅขž้‡ๆๅ–้—จๆŽงไฝฟ็”จ per-session ๆธธๆ ‡๏ผŒไธ่ฏป่ฟ™ไบ›ๅ…จๅฑ€่ฎกๆ•ฐๅ™จ๏ผŒๆ•…ไธไผšๅ› ๆผ‚็งป่ทณ่ฟ‡่ฎฐๅฝ•ใ€‚ + ### โœจ ๆ–ฐๅŠŸ่ƒฝ - **ๆ—ถๅŒบๅฏ้…็ฝฎ** ([#75](https://github.com/Tencent/TencentDB-Agent-Memory/issues/75) / [#87](https://github.com/Tencent/TencentDB-Agent-Memory/issues/87))๏ผšๆ–ฐๅขž้กถๅฑ‚ `timezone` ้…็ฝฎ้กน๏ผŒๆ”ฏๆŒ IANA ๆ—ถๅŒบๅ๏ผˆ`Asia/Shanghai`ใ€`Europe/Berlin`๏ผ‰ๅ’Œ UTC ๅ็งปไธฒ๏ผˆ`+08:00`ใ€`-05:30`๏ผ‰ใ€‚้ป˜่ฎค `"system"`๏ผˆ่ทŸ้š่ฟ›็จ‹็ณป็ปŸๆ—ถๅŒบ๏ผ‰๏ผŒๅ‡็บง้›ถๆ„Ÿใ€‚ diff --git a/index.ts b/index.ts index 868a7701..06ad6cab 100644 --- a/index.ts +++ b/index.ts @@ -35,6 +35,9 @@ import { registerMemoryTdaiCli } from "./src/cli/index.js"; import { initDataDirectories, resetStores } from "./src/utils/pipeline-factory.js"; import { getOrCreateInstanceId, initReporter, report, resetReporter } from "./src/core/report/reporter.js"; import { ensureL2L3Local } from "./src/core/profile/profile-sync.js"; +import { CheckpointManager } from "./src/utils/checkpoint.js"; +import { countL1JsonlLines, countL1JsonlLinesSince } from "./src/core/record/l1-reader.js"; +import { countL0JsonlStats } from "./src/core/conversation/l0-recorder.js"; // Core abstractions (host-neutral) import { OpenClawHostAdapter } from "./src/adapters/openclaw/host-adapter.js"; @@ -271,7 +274,7 @@ export default function register(api: OpenClawPluginApi) { }); // Initialize TdaiCore (async โ€” store init, pipeline wiring) - const coreReady = core.initialize().then(() => { + const coreReady = core.initialize().then(async () => { // Keep cleaner's SQLite handle updated after store init memoryCleaner?.setVectorStore(core.getVectorStore()); @@ -282,6 +285,49 @@ export default function register(api: OpenClawPluginApi) { api.logger.warn(`${TAG} Startup L2/L3 pull failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`); }); } + + // Startup recalibration: recompute the four global counters against + // authoritative data (issue #157). Must be awaited inside the coreReady + // chain โ€” `agent_end` does `await coreReady` (see below), so the first + // agent_end cannot overtake recalibration. This keeps recalibration + // free of concurrency with the L2 repair path in pipeline-factory + // (which would otherwise Math.max the counters back to stale values). + try { + const cpManager = new CheckpointManager(pluginDataDir, api.logger); + const cp = await cpManager.read(); + const lastPersonaTime = cp.last_persona_time ?? ""; + const [totalMemoriesExtracted, l0Stats, memoriesSinceLastPersona] = await Promise.all([ + countL1JsonlLines(pluginDataDir), + countL0JsonlStats(pluginDataDir), + countL1JsonlLinesSince(pluginDataDir, lastPersonaTime), + ]); + // Degraded fallback: when the store is unavailable, countL0() returns 0 + // and would erase the real value โ€” fall back to JSONL line count. + const vectorStore = core.getVectorStore(); + const totalProcessed = vectorStore && !vectorStore.isDegraded() + ? await vectorStore.countL0() + : l0Stats.lines; + const { was } = await cpManager.recalibrate({ + totalMemoriesExtracted, + l0ConversationsCount: l0Stats.captures, + totalProcessed, + memoriesSinceLastPersona, + }); + if (was.total_memories_extracted !== totalMemoriesExtracted + || was.l0_conversations_count !== l0Stats.captures + || was.total_processed !== totalProcessed + || was.memories_since_last_persona !== memoriesSinceLastPersona) { + api.logger.info( + `${TAG} Checkpoint recalibrated: ` + + `total_memories_extracted ${was.total_memories_extracted}โ†’${totalMemoriesExtracted}, ` + + `l0_conversations_count ${was.l0_conversations_count}โ†’${l0Stats.captures}, ` + + `total_processed ${was.total_processed}โ†’${totalProcessed}, ` + + `memories_since_last_persona ${was.memories_since_last_persona}โ†’${memoriesSinceLastPersona}`, + ); + } + } catch (err) { + api.logger.warn(`${TAG} Checkpoint recalibrate failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`); + } }).catch((err) => { api.logger.error(`${TAG} Core init failed: ${err instanceof Error ? err.message : String(err)}`); }); diff --git a/src/core/conversation/l0-recorder.test.ts b/src/core/conversation/l0-recorder.test.ts new file mode 100644 index 00000000..71776556 --- /dev/null +++ b/src/core/conversation/l0-recorder.test.ts @@ -0,0 +1,164 @@ +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { countL0JsonlStats } from "./l0-recorder.js"; + +/** + * Builds an L0MessageRecord JSON line with the given recordedAt (and optional + * role/content). Fields mirror the L0MessageRecord shape from l0-recorder.ts. + */ +function line(recordedAt: string, opts: { role?: string; content?: string } = {}): string { + return JSON.stringify({ + sessionKey: "s1", + sessionId: "sess1", + recordedAt, + id: `msg_${Math.random().toString(36).slice(2)}`, + role: opts.role ?? "user", + content: opts.content ?? "hello", + timestamp: Date.now(), + }); +} + +/** + * Writes the given raw string content to /conversations/. + * Creates the conversations directory (and baseDir) as needed. + */ +async function writeFile(baseDir: string, filename: string, content: string): Promise { + const dir = path.join(baseDir, "conversations"); + await fs.mkdir(dir, { recursive: true }); + await fs.writeFile(path.join(dir, filename), content, "utf-8"); +} + +describe("countL0JsonlStats", () => { + let baseDir: string; + + beforeEach(async () => { + baseDir = await fs.mkdtemp(path.join(os.tmpdir(), "l0-stats-")); + }); + + afterEach(async () => { + await fs.rm(baseDir, { recursive: true, force: true }); + }); + + // ============================ + // Tracer bullet: basic single-file counting + // ============================ + it("counts one capture and all physical lines from a single file", async () => { + // Two messages sharing one recordedAt => 1 capture, 2 lines + const ts = "2026-06-24T10:00:00.000Z"; + await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts)}\n${line(ts)}\n`); + + const stats = await countL0JsonlStats(baseDir); + expect(stats.captures).toBe(1); + expect(stats.lines).toBe(2); + }); + + // ============================ + // Multiple captures + // ============================ + it("counts distinct recordedAt values as separate captures", async () => { + const ts1 = "2026-06-24T10:00:00.000Z"; + const ts2 = "2026-06-24T11:00:00.000Z"; + const ts3 = "2026-06-24T12:00:00.000Z"; + // 3 different recordedAt => 3 captures, 4 lines + await writeFile(baseDir, "2026-06-24.jsonl", + `${line(ts1)}\n${line(ts2)}\n${line(ts2)}\n${line(ts3)}\n`); + + const stats = await countL0JsonlStats(baseDir); + expect(stats.captures).toBe(3); + expect(stats.lines).toBe(4); + }); + + // ============================ + // Multi-shard (multiple daily files) + // ============================ + it("aggregates across multiple daily shard files", async () => { + const ts1 = "2026-06-23T09:00:00.000Z"; + const ts2 = "2026-06-24T10:00:00.000Z"; + await writeFile(baseDir, "2026-06-23.jsonl", `${line(ts1)}\n${line(ts1)}\n`); + await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts2)}\n`); + + const stats = await countL0JsonlStats(baseDir); + expect(stats.captures).toBe(2); + expect(stats.lines).toBe(3); + }); + + // ============================ + // Invariant: a bad line (missing recordedAt) must not inflate captures + // ============================ + it("counts a line missing recordedAt toward lines but not captures", async () => { + const ts = "2026-06-24T10:00:00.000Z"; + const badLine = JSON.stringify({ sessionKey: "s1", role: "user", content: "no-timestamp", id: "x" }); + await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts)}\n${badLine}\n`); + + const stats = await countL0JsonlStats(baseDir); + expect(stats.lines).toBe(2); + expect(stats.captures).toBe(1); + }); + + it("counts a line with non-string recordedAt toward lines but not captures", async () => { + const ts = "2026-06-24T10:00:00.000Z"; + const badLine = JSON.stringify({ sessionKey: "s1", recordedAt: 12345, role: "user", content: "x", id: "y" }); + await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts)}\n${badLine}\n`); + + const stats = await countL0JsonlStats(baseDir); + expect(stats.lines).toBe(2); + expect(stats.captures).toBe(1); + }); + + it("counts a malformed (unparseable JSON) line toward lines but not captures", async () => { + const ts = "2026-06-24T10:00:00.000Z"; + const garbage = "{not valid json"; + await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts)}\n${garbage}\n`); + + const stats = await countL0JsonlStats(baseDir); + expect(stats.lines).toBe(2); + expect(stats.captures).toBe(1); + }); + + // ============================ + // Non-shard filename filtering + // ============================ + it("ignores files not matching the YYYY-MM-DD.jsonl pattern", async () => { + const ts = "2026-06-24T10:00:00.000Z"; + await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts)}\n`); + // These should be ignored: + await writeFile(baseDir, "2026-6-24.jsonl", `${line(ts)}\n`); // non-zero-padded + await writeFile(baseDir, "notes.txt", `${line(ts)}\n`); + await writeFile(baseDir, "2026-06-24.jsonl.bak", `${line(ts)}\n`); + await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts)}\n`); // overwrite legit file + + const stats = await countL0JsonlStats(baseDir); + expect(stats.lines).toBe(1); + expect(stats.captures).toBe(1); + }); + + // ============================ + // Blank lines are neither lines nor captures + // ============================ + it("skips whitespace-only lines entirely", async () => { + const ts = "2026-06-24T10:00:00.000Z"; + await writeFile(baseDir, "2026-06-24.jsonl", + `${line(ts)}\n \n\t\n\n${line(ts)}\n`); + + const stats = await countL0JsonlStats(baseDir); + expect(stats.lines).toBe(2); + expect(stats.captures).toBe(1); + }); + + // ============================ + // Missing / empty directory + // ============================ + it("returns zeros when the conversations directory does not exist", async () => { + const stats = await countL0JsonlStats(baseDir); + expect(stats).toEqual({ captures: 0, lines: 0 }); + }); + + it("returns zeros when the conversations directory is empty", async () => { + await fs.mkdir(path.join(baseDir, "conversations"), { recursive: true }); + const stats = await countL0JsonlStats(baseDir); + expect(stats).toEqual({ captures: 0, lines: 0 }); + }); +}); diff --git a/src/core/conversation/l0-recorder.ts b/src/core/conversation/l0-recorder.ts index 50e904c4..d8c8f9ca 100644 --- a/src/core/conversation/l0-recorder.ts +++ b/src/core/conversation/l0-recorder.ts @@ -511,6 +511,86 @@ export async function readConversationMessagesGroupedBySessionId( return groups; } +/** + * Count L0 JSONL capture/line stats across all daily shard files. + * + * Walks every `/conversations/YYYY-MM-DD.jsonl` file once and returns: + * - `captures`: number of **distinct** `recordedAt` values (one per recording event) + * - `lines`: total physical non-empty line count (message count) + * + * No fallback timestamp: a line that is missing `recordedAt`, has a + * non-string `recordedAt`, or is otherwise unparseable still counts toward + * `lines` (it is a physical message) but is **never** added to the distinct + * `recordedAt` Set, so it cannot inflate `captures`. This deliberately differs + * from readConversationRecords (which backfills `new Date().toISOString()`); + * backfilling would manufacture phantom capture events. + * + * Files are filtered by the same `dateFilePattern` as readConversationRecords + * (`/^\d{4}-\d{2}-\d{2}\.jsonl$/`). Empty lines (whitespace-only after trim) + * are skipped โ€” they are neither a message nor a capture. + * + * Returns `{captures:0, lines:0}` when the conversations directory does not + * exist (does not throw โ€” mirrors readConversationRecords' first-run behavior). + */ +export async function countL0JsonlStats( + baseDir: string, +): Promise<{ captures: number; lines: number }> { + const conversationsDir = path.join(baseDir, "conversations"); + const dateFilePattern = /^\d{4}-\d{2}-\d{2}\.jsonl$/; + + let entries: string[]; + try { + const dirEntries = await fs.readdir(conversationsDir, { withFileTypes: true }); + entries = dirEntries + .filter((entry) => entry.isFile()) + .map((entry) => entry.name); + } catch { + // Directory doesn't exist yet โ€” normal for first conversation + return { captures: 0, lines: 0 }; + } + + const targetFiles = entries + .filter((name) => dateFilePattern.test(name)) + .sort(); + + const recordedAtSet = new Set(); + let lines = 0; + + for (const fileName of targetFiles) { + const filePath = path.join(conversationsDir, fileName); + + let raw: string; + try { + raw = await fs.readFile(filePath, "utf-8"); + } catch { + // Unreadable file: skip without aborting the whole scan + continue; + } + + const fileLines = raw.split("\n"); + for (const fileLine of fileLines) { + // Whitespace-only lines are not physical messages โ€” skip entirely. + if (!fileLine.trim()) continue; + + lines++; + + try { + const parsed = JSON.parse(fileLine) as Record; + // Only a genuine string recordedAt counts toward distinct captures. + // Missing / non-string / unparseable โ†’ still counted as a line above, + // but never added to the Set (no fallback timestamp). + if (typeof parsed.recordedAt === "string" && parsed.recordedAt) { + recordedAtSet.add(parsed.recordedAt); + } + } catch { + // Malformed JSON: counted as a physical line, not a capture. + } + } + } + + return { captures: recordedAtSet.size, lines }; +} + // ============================ // Helpers // ============================ diff --git a/src/core/record/l1-reader.test.ts b/src/core/record/l1-reader.test.ts new file mode 100644 index 00000000..df31ea1d --- /dev/null +++ b/src/core/record/l1-reader.test.ts @@ -0,0 +1,235 @@ +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { countL1JsonlLines, countL1JsonlLinesSince } from "./l1-reader.js"; + +/** + * Helper: build a temp baseDir with a `records/` subdirectory and write the + * given files into it. Returns the baseDir path. Caller is responsible for + * cleanup via `afterEach`. + */ +async function makeBaseDir(files: Record): Promise { + const baseDir = await fs.mkdtemp(path.join(os.tmpdir(), "l1-reader-test-")); + const recordsDir = path.join(baseDir, "records"); + await fs.mkdir(recordsDir, { recursive: true }); + for (const [name, content] of Object.entries(files)) { + await fs.writeFile(path.join(recordsDir, name), content, "utf-8"); + } + return baseDir; +} + +/** Minimal valid record line with a given updatedAt. */ +function line(updatedAt: string, extra: string = ""): string { + return JSON.stringify({ + id: "r1", + content: "c", + type: "persona", + priority: 50, + scene_name: "s", + source_message_ids: [], + metadata: {}, + timestamps: [], + createdAt: "2026-01-01T00:00:00.000Z", + updatedAt, + sessionKey: "sk", + sessionId: "sid", + ...JSON.parse(extra || "{}"), + }); +} + +describe("countL1JsonlLines", () => { + let baseDirs: string[] = []; + beforeEach(() => { + baseDirs = []; + }); + afterEach(async () => { + await Promise.all(baseDirs.map((d) => fs.rm(d, { recursive: true, force: true }))); + }); + + it("counts total lines across multiple shard files", async () => { + const dir = await makeBaseDir({ + "2026-01-01.jsonl": line("2026-01-01T10:00:00.000Z") + "\n" + line("2026-01-01T11:00:00.000Z") + "\n", + "2026-01-02.jsonl": line("2026-01-02T09:00:00.000Z") + "\n", + }); + baseDirs.push(dir); + expect(await countL1JsonlLines(dir)).toBe(3); + }); + + it("skips empty/whitespace-only lines", async () => { + const dir = await makeBaseDir({ + "2026-01-01.jsonl": line("2026-01-01T10:00:00.000Z") + "\n\n \n" + line("2026-01-01T11:00:00.000Z") + "\n", + }); + baseDirs.push(dir); + expect(await countL1JsonlLines(dir)).toBe(2); + }); + + it("ignores non-shard files (.json, backups, temp)", async () => { + const dir = await makeBaseDir({ + "2026-01-01.jsonl": line("2026-01-01T10:00:00.000Z") + "\n", + "2026-01-01.json": line("2026-01-01T10:00:00.000Z") + "\n", + "2026-01-01.jsonl.bak": line("2026-01-01T10:00:00.000Z") + "\n", + "2026-01-01.jsonl.tmp": line("2026-01-01T10:00:00.000Z") + "\n", + "notes.txt": "irrelevant\n", + }); + baseDirs.push(dir); + expect(await countL1JsonlLines(dir)).toBe(1); + }); + + it("returns 0 when records/ directory does not exist", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "l1-reader-test-")); + baseDirs.push(dir); + // no records/ subdir created + expect(await countL1JsonlLines(dir)).toBe(0); + }); + + it("returns 0 for empty records/ directory", async () => { + const dir = await makeBaseDir({}); + baseDirs.push(dir); + expect(await countL1JsonlLines(dir)).toBe(0); + }); + + it("skips malformed JSON lines (only parseable lines are counted)", async () => { + // countL1JsonlLines counts non-empty parseable lines; malformed JSON lines + // are skipped (not counted), per spec "parse each line". + const dir = await makeBaseDir({ + "2026-01-01.jsonl": + line("2026-01-01T10:00:00.000Z") + "\n" + "{not valid json}\n" + line("2026-01-01T11:00:00.000Z") + "\n", + }); + baseDirs.push(dir); + // 2 parseable lines, 1 malformed skipped + expect(await countL1JsonlLines(dir)).toBe(2); + }); +}); + +describe("countL1JsonlLinesSince", () => { + let baseDirs: string[] = []; + beforeEach(() => { + baseDirs = []; + }); + afterEach(async () => { + await Promise.all(baseDirs.map((d) => fs.rm(d, { recursive: true, force: true }))); + }); + + it("counts only lines with updatedAt > sinceIso", async () => { + const dir = await makeBaseDir({ + "2026-01-01.jsonl": + line("2026-01-01T10:00:00.000Z") + + "\n" + + line("2026-01-01T11:00:00.000Z") + + "\n" + + line("2026-01-02T09:00:00.000Z") + + "\n", + }); + baseDirs.push(dir); + expect(await countL1JsonlLinesSince(dir, "2026-01-01T11:00:00.000Z")).toBe(1); + }); + + it("returns all lines when sinceIso is empty string", async () => { + const dir = await makeBaseDir({ + "2026-01-01.jsonl": line("2026-01-01T10:00:00.000Z") + "\n" + line("2026-01-02T09:00:00.000Z") + "\n", + }); + baseDirs.push(dir); + expect(await countL1JsonlLinesSince(dir, "")).toBe(2); + }); + + it("equal updatedAt is not counted (strict >)", async () => { + const dir = await makeBaseDir({ + "2026-01-01.jsonl": line("2026-01-01T10:00:00.000Z") + "\n" + line("2026-01-01T10:00:00.001Z") + "\n", + }); + baseDirs.push(dir); + expect(await countL1JsonlLinesSince(dir, "2026-01-01T10:00:00.000Z")).toBe(1); + }); + + it("skips lines with missing updatedAt field", async () => { + const noUpdatedAt = JSON.stringify({ + id: "r1", + content: "c", + type: "persona", + priority: 50, + scene_name: "s", + source_message_ids: [], + metadata: {}, + timestamps: [], + createdAt: "2026-01-01T00:00:00.000Z", + sessionKey: "sk", + sessionId: "sid", + }); + const dir = await makeBaseDir({ + "2026-01-01.jsonl": + noUpdatedAt + "\n" + line("2026-06-01T10:00:00.000Z") + "\n", + }); + baseDirs.push(dir); + expect(await countL1JsonlLinesSince(dir, "2026-01-01T00:00:00.000Z")).toBe(1); + }); + + it("skips lines with malformed/non-string updatedAt", async () => { + const numUpdatedAt = JSON.stringify({ + id: "r1", + content: "c", + type: "persona", + priority: 50, + scene_name: "s", + source_message_ids: [], + metadata: {}, + timestamps: [], + createdAt: "2026-01-01T00:00:00.000Z", + updatedAt: 12345, + sessionKey: "sk", + sessionId: "sid", + }); + const dir = await makeBaseDir({ + "2026-01-01.jsonl": + numUpdatedAt + "\n" + line("2026-06-01T10:00:00.000Z") + "\n", + }); + baseDirs.push(dir); + expect(await countL1JsonlLinesSince(dir, "2026-01-01T00:00:00.000Z")).toBe(1); + }); + + it("skips lines with malformed-string updatedAt (e.g. zzz / not-a-time)", async () => { + // updatedAt is a string but not a parseable ISO timestamp โ€” must be + // skipped (not silently compared as a string). + const dir = await makeBaseDir({ + "2026-01-01.jsonl": + line("zzz") + "\n" + line("not-a-time") + "\n" + line("2026-06-01T10:00:00.000Z") + "\n", + }); + baseDirs.push(dir); + // Only the one valid ISO line is > cutoff; the two malformed-string lines are skipped. + expect(await countL1JsonlLinesSince(dir, "2026-01-01T00:00:00.000Z")).toBe(1); + }); + + it("skips malformed JSON lines entirely", async () => { + const dir = await makeBaseDir({ + "2026-01-01.jsonl": + "{broken\n" + line("2026-06-01T10:00:00.000Z") + "\n", + }); + baseDirs.push(dir); + expect(await countL1JsonlLinesSince(dir, "2026-01-01T00:00:00.000Z")).toBe(1); + }); + + it("returns 0 when records/ directory does not exist", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "l1-reader-test-")); + baseDirs.push(dir); + expect(await countL1JsonlLinesSince(dir, "2026-01-01T00:00:00.000Z")).toBe(0); + }); + + it("counts across multiple shard files", async () => { + const dir = await makeBaseDir({ + "2026-01-01.jsonl": line("2026-05-01T10:00:00.000Z") + "\n", + "2026-06-01.jsonl": line("2026-06-01T10:00:00.000Z") + "\n" + line("2026-06-02T10:00:00.000Z") + "\n", + }); + baseDirs.push(dir); + expect(await countL1JsonlLinesSince(dir, "2026-06-01T00:00:00.000Z")).toBe(2); + }); + + it("ignores non-shard files", async () => { + const dir = await makeBaseDir({ + "2026-06-01.jsonl": line("2026-06-01T10:00:00.000Z") + "\n", + "2026-06-01.json": line("2026-06-01T10:00:00.000Z") + "\n", + "2026-06-01.jsonl.bak": line("2026-06-01T10:00:00.000Z") + "\n", + }); + baseDirs.push(dir); + expect(await countL1JsonlLinesSince(dir, "2026-01-01T00:00:00.000Z")).toBe(1); + }); +}); diff --git a/src/core/record/l1-reader.ts b/src/core/record/l1-reader.ts index 15611618..acdbccb9 100644 --- a/src/core/record/l1-reader.ts +++ b/src/core/record/l1-reader.ts @@ -203,6 +203,144 @@ export async function readAllMemoryRecords( } } +// ============================ +// JSONL line counters (for checkpoint recalibration) +// ============================ + +// Same shard pattern as readMemoryRecords โ€” only YYYY-MM-DD.jsonl, no .json/.bak/.tmp. +const l1DateFilePattern = /^\d{4}-\d{2}-\d{2}\.jsonl$/; + +/** + * Count the total number of record lines across all daily-shard JSONL files + * in `/records/`. Mirrors readMemoryRecords' file selection and + * line-parsing style: only `YYYY-MM-DD.jsonl` files are matched, empty lines + * are skipped, and malformed (unparseable) lines are skipped (not counted). + * + * Returns 0 when the records directory does not exist (does not throw), + * matching readMemoryRecords' try/catch fallback. + */ +export async function countL1JsonlLines(baseDir: string): Promise { + const recordsDir = path.join(baseDir, "records"); + + let entries: import("node:fs").Dirent[]; + try { + entries = await fs.readdir(recordsDir, { withFileTypes: true }); + } catch { + // Directory doesn't exist yet + return 0; + } + + const targetFiles = entries + .filter((entry) => entry.isFile() && l1DateFilePattern.test(entry.name)) + .map((entry) => entry.name) + .sort(); + + let count = 0; + for (const fileName of targetFiles) { + const filePath = path.join(recordsDir, fileName); + + let raw: string; + try { + raw = await fs.readFile(filePath, "utf-8"); + } catch { + continue; + } + + const lines = raw.split("\n").filter((line) => line.trim()); + for (const line of lines) { + try { + JSON.parse(line); + count++; + } catch { + // malformed JSON line โ€” skip (not counted) + } + } + } + + return count; +} + +/** + * Count record lines whose `updatedAt` field is strictly greater than + * `sinceIso` across all daily-shard JSONL files in `/records/`. + * + * Semantics: + * - Only `YYYY-MM-DD.jsonl` shard files are matched (same pattern as + * readMemoryRecords). + * - Each non-empty line is JSON.parsed; the `updatedAt` field is compared + * via string ordering against `sinceIso` (both expected to be canonical + * ISO 8601 UTC strings, e.g. `2026-06-01T10:00:00.000Z`). + * - Lines with malformed JSON, a missing `updatedAt`, or a non-string + * `updatedAt` are skipped (not counted). + * - When `sinceIso === ""`, returns the total line count (equivalent to + * countL1JsonlLines). + * + * Returns 0 when the records directory does not exist (does not throw). + */ +export async function countL1JsonlLinesSince( + baseDir: string, + sinceIso: string, +): Promise { + if (sinceIso === "") { + return countL1JsonlLines(baseDir); + } + + const recordsDir = path.join(baseDir, "records"); + + let entries: import("node:fs").Dirent[]; + try { + entries = await fs.readdir(recordsDir, { withFileTypes: true }); + } catch { + // Directory doesn't exist yet + return 0; + } + + const targetFiles = entries + .filter((entry) => entry.isFile() && l1DateFilePattern.test(entry.name)) + .map((entry) => entry.name) + .sort(); + + let count = 0; + for (const fileName of targetFiles) { + const filePath = path.join(recordsDir, fileName); + + let raw: string; + try { + raw = await fs.readFile(filePath, "utf-8"); + } catch { + continue; + } + + const lines = raw.split("\n").filter((line) => line.trim()); + for (const line of lines) { + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + // malformed JSON line โ€” skip + continue; + } + + const updatedAt = (parsed as Partial)?.updatedAt; + if (typeof updatedAt !== "string") { + // missing or non-string updatedAt โ€” skip + continue; + } + if (Number.isNaN(new Date(updatedAt).getTime())) { + // malformed ISO timestamp (e.g. "zzz", "not-a-time") โ€” skip, do not + // participate in the string ordering comparison. + continue; + } + + if (updatedAt > sinceIso) { + count++; + } + } + } + + return count; +} + // ============================ // Helpers // ============================ diff --git a/src/utils/checkpoint.test.ts b/src/utils/checkpoint.test.ts new file mode 100644 index 00000000..3239a61c --- /dev/null +++ b/src/utils/checkpoint.test.ts @@ -0,0 +1,629 @@ +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { CheckpointManager } from "./checkpoint.js"; +import { countL1JsonlLines, countL1JsonlLinesSince } from "../core/record/l1-reader.js"; +import { countL0JsonlStats } from "../core/conversation/l0-recorder.js"; + +describe("CheckpointManager.recalibrate", () => { + let dataDirs: string[] = []; + + beforeEach(() => { + dataDirs = []; + }); + + afterEach(async () => { + await Promise.all(dataDirs.map((d) => fs.rm(d, { recursive: true, force: true }))); + }); + + async function makeDataDir(): Promise { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "checkpoint-test-")); + dataDirs.push(dir); + return dir; + } + + it("overwrites the four calibrated counters with the supplied actual values", async () => { + const dir = await makeDataDir(); + const mgr = new CheckpointManager(dir); + + // Seed some non-zero initial state via existing public API so the file + // exists with real values, then recalibrate to authoritative counts. + await mgr.markL1ExtractionComplete("sess-a", 5, 1000, "scene-x"); + + const result = await mgr.recalibrate({ + totalMemoriesExtracted: 42, + l0ConversationsCount: 7, + totalProcessed: 100, + memoriesSinceLastPersona: 13, + }); + + // Four calibrated fields reflect the supplied actual values. + expect(result.was.total_memories_extracted).toBe(5); + expect(result.was.l0_conversations_count).toBe(0); + expect(result.was.total_processed).toBe(0); + expect(result.was.memories_since_last_persona).toBe(5); + + const cp = await mgr.read(); + expect(cp.total_memories_extracted).toBe(42); + expect(cp.l0_conversations_count).toBe(7); + expect(cp.total_processed).toBe(100); + expect(cp.memories_since_last_persona).toBe(13); + }); + + it("returns the pre-recalibration values in `was` for caller logging", async () => { + const dir = await makeDataDir(); + const mgr = new CheckpointManager(dir); + + // Establish known prior values via captureAtomically (sets total_processed + // and l0_conversations_count) and markL1ExtractionComplete (sets the + // memory counters). + await mgr.captureAtomically("sess-b", undefined, async () => ({ + maxTimestamp: 2000, + messageCount: 10, + })); + await mgr.markL1ExtractionComplete("sess-b", 3, 2000, "scene-y"); + + const result = await mgr.recalibrate({ + totalMemoriesExtracted: 999, + l0ConversationsCount: 999, + totalProcessed: 999, + memoriesSinceLastPersona: 999, + }); + + // was must snapshot the values that existed BEFORE this recalibrate call. + expect(result.was).toEqual({ + total_memories_extracted: 3, + l0_conversations_count: 1, + total_processed: 10, + memories_since_last_persona: 3, + }); + }); + + it("is idempotent: repeated calls with the same values leave state unchanged", async () => { + const dir = await makeDataDir(); + const mgr = new CheckpointManager(dir); + + const actual = { + totalMemoriesExtracted: 50, + l0ConversationsCount: 8, + totalProcessed: 200, + memoriesSinceLastPersona: 20, + }; + + await mgr.recalibrate(actual); + const first = await mgr.recalibrate(actual); + + // Second call's `was` equals the values set by the first call. + expect(first.was).toEqual({ + total_memories_extracted: 50, + l0_conversations_count: 8, + total_processed: 200, + memories_since_last_persona: 20, + }); + + const cp = await mgr.read(); + expect(cp.total_memories_extracted).toBe(50); + expect(cp.l0_conversations_count).toBe(8); + expect(cp.total_processed).toBe(200); + expect(cp.memories_since_last_persona).toBe(20); + }); + + it("does not touch unrelated fields (persona, scenes, cursors, per-session state)", async () => { + const dir = await makeDataDir(); + const mgr = new CheckpointManager(dir); + + // Populate unrelated fields via existing public methods. + await mgr.setPersonaUpdateRequest("drift-detected"); + await mgr.incrementScenesProcessed(); + await mgr.markL1ExtractionComplete("sess-c", 4, 3000, "scene-z"); + await mgr.captureAtomically("sess-c", undefined, async () => ({ + maxTimestamp: 3500, + messageCount: 6, + })); + + // Snapshot the unrelated fields before recalibrate. + const before = await mgr.read(); + const beforePersonaAt = before.last_persona_at; + const beforePersonaTime = before.last_persona_time; + const beforeRequestPersona = before.request_persona_update; + const beforePersonaReason = before.persona_update_reason; + const beforeScenes = before.scenes_processed; + const beforeLastCapturedTs = before.last_captured_timestamp; + const beforeRunner = before.runner_states["sess-c"]; + const beforePipeline = before.pipeline_states["sess-c"]; + + await mgr.recalibrate({ + totalMemoriesExtracted: 77, + l0ConversationsCount: 9, + totalProcessed: 300, + memoriesSinceLastPersona: 0, + }); + + const after = await mgr.read(); + // Unrelated global fields are untouched. + expect(after.last_persona_at).toBe(beforePersonaAt); + expect(after.last_persona_time).toBe(beforePersonaTime); + expect(after.request_persona_update).toBe(beforeRequestPersona); + expect(after.persona_update_reason).toBe(beforePersonaReason); + expect(after.scenes_processed).toBe(beforeScenes); + expect(after.last_captured_timestamp).toBe(beforeLastCapturedTs); + // Per-session split state is untouched. + expect(after.runner_states["sess-c"]).toEqual(beforeRunner); + expect(after.pipeline_states["sess-c"]).toEqual(beforePipeline); + // Only the four calibrated fields changed. + expect(after.total_memories_extracted).toBe(77); + expect(after.l0_conversations_count).toBe(9); + expect(after.total_processed).toBe(300); + expect(after.memories_since_last_persona).toBe(0); + }); +}); + +// ============================================================ +// Integration tests: real JSONL files โ†’ drift reproduction โ†’ recalibrate +// Covers drift reproduction, stale-row counting, persona consistency, +// degraded fallback, and empty-last_persona_time semantics. +// Uses the real filesystem (temp dirs) โ€” no store mocks โ€” so the +// recalibrate path is exercised end-to-end against actual JSONL shards. +// ============================================================ + +/** Build a minimal valid L1 MemoryRecord JSONL line with the given updatedAt. */ +function l1Line(opts: { + id?: string; + updatedAt: string; + createdAt?: string; + sessionKey?: string; + sessionId?: string; +}): string { + return JSON.stringify({ + id: opts.id ?? "mem-1", + content: "some memory content", + type: "episodic", + priority: 50, + scene_name: "scene-default", + source_message_ids: [], + metadata: {}, + timestamps: [], + createdAt: opts.createdAt ?? opts.updatedAt, + updatedAt: opts.updatedAt, + sessionKey: opts.sessionKey ?? "sess-int", + sessionId: opts.sessionId ?? "sid-int", + }); +} + +/** Build a minimal valid L0MessageRecord JSONL line with the given recordedAt. */ +function l0Line(opts: { + recordedAt: string; + id?: string; + role?: "user" | "assistant"; + sessionKey?: string; + sessionId?: string; +}): string { + return JSON.stringify({ + sessionKey: opts.sessionKey ?? "sess-int", + sessionId: opts.sessionId ?? "sid-int", + recordedAt: opts.recordedAt, + id: opts.id ?? "msg-1", + role: opts.role ?? "user", + content: "hello", + timestamp: Date.parse(opts.recordedAt) || 0, + }); +} + +describe("CheckpointManager integration: drift โ†’ recalibrate", () => { + let dataDirs: string[] = []; + + beforeEach(() => { + dataDirs = []; + }); + + afterEach(async () => { + await Promise.all(dataDirs.map((d) => fs.rm(d, { recursive: true, force: true }))); + }); + + async function makeDataDir(): Promise { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "checkpoint-int-")); + dataDirs.push(dir); + return dir; + } + + /** Write the given string content to /records/. */ + async function writeRecords(baseDir: string, fileName: string, content: string): Promise { + const dir = path.join(baseDir, "records"); + await fs.mkdir(dir, { recursive: true }); + await fs.writeFile(path.join(dir, fileName), content, "utf-8"); + } + + /** Write the given string content to /conversations/. */ + async function writeConversations(baseDir: string, fileName: string, content: string): Promise { + const dir = path.join(baseDir, "conversations"); + await fs.mkdir(dir, { recursive: true }); + await fs.writeFile(path.join(dir, fileName), content, "utf-8"); + } + + // โ”€โ”€ Acceptance 1: drift end-to-end reproduction + correction โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + it("reproduces counter drift and corrects all four fields to authoritative JSONL counts", async () => { + const dir = await makeDataDir(); + const mgr = new CheckpointManager(dir); + + // --- Seed real JSONL data --- + // records/*.jsonl: 3 L1 memory lines across two daily shards. + await writeRecords( + dir, + "2026-06-24.jsonl", + l1Line({ id: "m1", updatedAt: "2026-06-24T10:00:00.000Z" }) + + "\n" + + l1Line({ id: "m2", updatedAt: "2026-06-24T11:00:00.000Z" }) + + "\n", + ); + await writeRecords( + dir, + "2026-06-25.jsonl", + l1Line({ id: "m3", updatedAt: "2026-06-25T09:00:00.000Z" }) + "\n", + ); + + // conversations/*.jsonl: 2 distinct capture events (distinct recordedAt), + // 4 physical message lines. + await writeConversations( + dir, + "2026-06-24.jsonl", + l0Line({ recordedAt: "2026-06-24T10:00:00.000Z" }) + + "\n" + + l0Line({ recordedAt: "2026-06-24T10:00:00.000Z" }) + + "\n" + + l0Line({ recordedAt: "2026-06-24T11:30:00.000Z" }) + + "\n" + + l0Line({ recordedAt: "2026-06-24T11:30:00.000Z" }) + + "\n", + ); + + // --- Simulate drift: inflate counters far above actual JSONL reality --- + // Use the public mutating API to push the counters to virtual-high values + // (mimicking a real drift where cleaners deleted data but counters never + // decremented). markL1ExtractionComplete += memoriesExtracted and + // memories_since_last_persona; captureAtomically += total_processed and + // l0_conversations_count. + await mgr.markL1ExtractionComplete("sess-int", 999, 0, "scene-x"); + await mgr.captureAtomically("sess-int", undefined, async () => ({ + maxTimestamp: 999999, + messageCount: 999, + })); + + const drifted = await mgr.read(); + // Sanity: drift is in place before recalibrate. + expect(drifted.total_memories_extracted).toBe(999); + expect(drifted.memories_since_last_persona).toBe(999); + expect(drifted.total_processed).toBe(999); + expect(drifted.l0_conversations_count).toBe(1); // one captureAtomically call + + // --- Compute authoritative truth from the real JSONL files --- + const totalMemoriesExtracted = await countL1JsonlLines(dir); + const l0Stats = await countL0JsonlStats(dir); + const memoriesSinceLastPersona = await countL1JsonlLinesSince( + dir, + drifted.last_persona_time ?? "", + ); + // total_processed authoritative source is store.countL0(); store is not + // available in this test, so we use the JSONL-lines fallback (same value + // the startup degraded path in index.ts picks). + const totalProcessed = l0Stats.lines; + + expect(totalMemoriesExtracted).toBe(3); + expect(l0Stats.captures).toBe(2); + expect(l0Stats.lines).toBe(4); + expect(memoriesSinceLastPersona).toBe(3); // last_persona_time is "" โ†’ all lines + + // --- Recalibrate against the authoritative truth --- + const { was } = await mgr.recalibrate({ + totalMemoriesExtracted, + l0ConversationsCount: l0Stats.captures, + totalProcessed, + memoriesSinceLastPersona, + }); + + // `was` reflects the drifted (pre-recalibrate) values. + expect(was).toEqual({ + total_memories_extracted: 999, + l0_conversations_count: 1, + total_processed: 999, + memories_since_last_persona: 999, + }); + + // After recalibrate, all four fields equal the JSONL truth โ€” no longer 999. + const cp = await mgr.read(); + expect(cp.total_memories_extracted).toBe(3); + expect(cp.l0_conversations_count).toBe(2); + expect(cp.total_processed).toBe(4); + expect(cp.memories_since_last_persona).toBe(3); + }); + + // โ”€โ”€ Acceptance 2: update/merge stale rows are counted (not deduped) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + it("counts update/merge stale rows in total_memories_extracted (JSONL is append-only, not deduped)", async () => { + const dir = await makeDataDir(); + const mgr = new CheckpointManager(dir); + + // Simulate the JSONL append-only trail left by an update/merge sequence: + // the SAME id appears multiple times (the older rows are "stale rows" + // that store-side dedup would remove, but JSONL keeps them because it is + // append-only). countL1JsonlLines must count every physical line. + await writeRecords( + dir, + "2026-06-24.jsonl", + // initial store + l1Line({ id: "mem-x", updatedAt: "2026-06-24T10:00:00.000Z" }) + + "\n" + + // update of mem-x appends a new line (same id, newer updatedAt) + l1Line({ id: "mem-x", updatedAt: "2026-06-24T11:00:00.000Z" }) + + "\n" + + // merge of mem-x + mem-y appends another line (same id mem-x) + l1Line({ id: "mem-x", updatedAt: "2026-06-24T12:00:00.000Z" }) + + "\n" + + // a fresh store, unrelated + l1Line({ id: "mem-y", updatedAt: "2026-06-24T12:30:00.000Z" }) + + "\n", + ); + + const totalCount = await countL1JsonlLines(dir); + // 4 physical lines โ€” the 3 stale rows for mem-x are NOT deduped. + expect(totalCount).toBe(4); + + // Recalibrate pushes the (dedup-agnostic) physical line count into the + // counter, so the counter reflects the JSONL append-only semantics + // (matching the field's accumulation semantics โ€” each append counts once). + await mgr.recalibrate({ + totalMemoriesExtracted: totalCount, + l0ConversationsCount: 0, + totalProcessed: 0, + memoriesSinceLastPersona: totalCount, + }); + + const cp = await mgr.read(); + expect(cp.total_memories_extracted).toBe(4); + expect(cp.memories_since_last_persona).toBe(4); + }); + + // โ”€โ”€ Acceptance 7: persona consistency โ€” memories_since_last_persona matches JSONL โ”€โ”€ + it("keeps memories_since_last_persona consistent with the JSONL truth after recalibrate", async () => { + const dir = await makeDataDir(); + const mgr = new CheckpointManager(dir); + + const lastPersonaTime = "2026-06-24T12:00:00.000Z"; + // 2 rows older than (or equal to) last_persona_time, 3 rows strictly after. + await writeRecords( + dir, + "2026-06-24.jsonl", + l1Line({ id: "old-1", updatedAt: "2026-06-24T10:00:00.000Z" }) + + "\n" + + l1Line({ id: "old-2", updatedAt: "2026-06-24T11:00:00.000Z" }) + + "\n" + + l1Line({ id: "eq", updatedAt: "2026-06-24T12:00:00.000Z" }) + // equal, not counted (strict >) + "\n" + + l1Line({ id: "new-1", updatedAt: "2026-06-24T13:00:00.000Z" }) + + "\n" + + l1Line({ id: "new-2", updatedAt: "2026-06-24T14:00:00.000Z" }) + + "\n" + + l1Line({ id: "new-3", updatedAt: "2026-06-25T09:00:00.000Z" }) + + "\n", + ); + + // Set checkpoint.last_persona_time via the public persona API so the + // stored value is realistic. markPersonaGenerated also zeroes + // memories_since_last_persona โ€” we then re-inflate it to simulate drift. + await mgr.markPersonaGenerated(100); + const afterPersona = await mgr.read(); + expect(afterPersona.last_persona_time).toBeTruthy(); + expect(afterPersona.memories_since_last_persona).toBe(0); + + // Simulate drift: memories_since_last_persona was inflated to a virtual-high + // value (e.g. counter never decremented after cleaner deleted old rows). + await mgr.recalibrate({ + totalMemoriesExtracted: 999, + l0ConversationsCount: 0, + totalProcessed: 0, + memoriesSinceLastPersona: 999, + }); + const drifted = await mgr.read(); + expect(drifted.memories_since_last_persona).toBe(999); + + // Authoritative truth: rows strictly newer than last_persona_time. + const since = await countL1JsonlLinesSince(dir, lastPersonaTime); + expect(since).toBe(3); // new-1, new-2, new-3 + + // Recalibrate memories_since_last_persona to the JSONL truth. + await mgr.recalibrate({ + totalMemoriesExtracted: 999, // unchanged for this test's focus + l0ConversationsCount: 0, + totalProcessed: 0, + memoriesSinceLastPersona: since, + }); + + const cp = await mgr.read(); + // memories_since_last_persona now equals the JSONL ground truth, not the + // inflated 999. PersonaTrigger reads this field for its P3/P4 threshold + // checks; because it matches reality, a persona interval check against + // this value will not spuriously fire due to pre-cleanup inflation. + expect(cp.memories_since_last_persona).toBe(3); + }); + + // โ”€โ”€ Acceptance 8: degraded fallback uses JSONL lines, not store's 0 โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + it("degraded fallback: total_processed uses countL0JsonlStats().lines, not store.countL0()=0", async () => { + const dir = await makeDataDir(); + + // 5 physical message lines across two shards. The store is degraded in + // this scenario, so the authoritative source for total_processed falls + // back to the JSONL line count (startup recalibrate degraded fallback). + await writeConversations( + dir, + "2026-06-24.jsonl", + l0Line({ recordedAt: "2026-06-24T10:00:00.000Z" }) + + "\n" + + l0Line({ recordedAt: "2026-06-24T10:00:00.000Z" }) + + "\n" + + l0Line({ recordedAt: "2026-06-24T11:00:00.000Z" }) + + "\n", + ); + await writeConversations( + dir, + "2026-06-25.jsonl", + l0Line({ recordedAt: "2026-06-25T09:00:00.000Z" }) + + "\n" + + l0Line({ recordedAt: "2026-06-25T10:00:00.000Z" }) + + "\n", + ); + + const l0Stats = await countL0JsonlStats(dir); + // lines is the physical message count โ€” the degraded-fallback data source. + expect(l0Stats.lines).toBe(5); + // 4 distinct recordedAt values: 06-24T10:00, 06-24T11:00, 06-25T09:00, 06-25T10:00. + expect(l0Stats.captures).toBe(4); + + // Simulate the index.ts degraded-fallback selection logic: + // vectorStore && !vectorStore.isDegraded() ? store.countL0() : l0Stats.lines + // Here the store reports degraded (countL0() would return 0), so the + // chosen totalProcessed must be l0Stats.lines, NOT 0. + const storeCountL0 = 0; // degraded store returns 0 + const isDegraded = true; + const totalProcessed = !isDegraded ? storeCountL0 : l0Stats.lines; + expect(totalProcessed).toBe(5); // lines, not 0 + + // Recalibrate would then write this authoritative value. + const mgr = new CheckpointManager(dir); + await mgr.recalibrate({ + totalMemoriesExtracted: 0, + l0ConversationsCount: l0Stats.captures, + totalProcessed, + memoriesSinceLastPersona: 0, + }); + const cp = await mgr.read(); + expect(cp.total_processed).toBe(5); + expect(cp.l0_conversations_count).toBe(4); + }); + + // โ”€โ”€ Acceptance 5: last_persona_time === "" counts all lines โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + it("treats empty last_persona_time as 'never generated persona' and counts all L1 lines", async () => { + const dir = await makeDataDir(); + const mgr = new CheckpointManager(dir); + + // A fresh checkpoint has last_persona_time === "" (DEFAULT_CHECKPOINT). + const fresh = await mgr.read(); + expect(fresh.last_persona_time).toBe(""); + + // Seed 3 L1 lines. + await writeRecords( + dir, + "2026-06-24.jsonl", + l1Line({ id: "a", updatedAt: "2026-06-24T10:00:00.000Z" }) + + "\n" + + l1Line({ id: "b", updatedAt: "2026-06-24T11:00:00.000Z" }) + + "\n" + + l1Line({ id: "c", updatedAt: "2026-06-25T09:00:00.000Z" }) + + "\n", + ); + + // With last_persona_time === "", countL1JsonlLinesSince must equal + // countL1JsonlLines (all lines) โ€” i.e. every memory counts as "since + // last persona" when no persona has ever been generated. + const totalLines = await countL1JsonlLines(dir); + const sinceEmpty = await countL1JsonlLinesSince(dir, ""); + expect(sinceEmpty).toBe(totalLines); + expect(sinceEmpty).toBe(3); + + // Recalibrate memories_since_last_persona to the full count. + await mgr.recalibrate({ + totalMemoriesExtracted: totalLines, + l0ConversationsCount: 0, + totalProcessed: 0, + memoriesSinceLastPersona: sinceEmpty, + }); + + const cp = await mgr.read(); + // All lines are "since last persona" โ€” memories_since_last_persona equals + // the total L1 line count, reflecting the JSONL truth (not a virtual 0 or + // an inflated value). + expect(cp.memories_since_last_persona).toBe(3); + expect(cp.total_memories_extracted).toBe(3); + }); + + // โ”€โ”€ Issue #157 repro: manual JSONL pruning (delete lines) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + it("issue repro: manual JSONL pruning โ€” counter inflated, recalibrate drops to surviving lines", async () => { + const dir = await makeDataDir(); + const mgr = new CheckpointManager(dir); + + // Seed records/*.jsonl with 5 L1 lines. + await writeRecords( + dir, + "2026-06-24.jsonl", + l1Line({ id: "m1", updatedAt: "2026-06-24T10:00:00.000Z" }) + "\n" + + l1Line({ id: "m2", updatedAt: "2026-06-24T11:00:00.000Z" }) + "\n" + + l1Line({ id: "m3", updatedAt: "2026-06-24T12:00:00.000Z" }) + "\n" + + l1Line({ id: "m4", updatedAt: "2026-06-24T13:00:00.000Z" }) + "\n" + + l1Line({ id: "m5", updatedAt: "2026-06-24T14:00:00.000Z" }) + "\n", + ); + + // Counter was inflated to 5 via normal extraction flow. + await mgr.markL1ExtractionComplete("sess-int", 5); + expect((await mgr.read()).total_memories_extracted).toBe(5); + + // Manual pruning: rewrite the shard with only 2 surviving lines (delete 3). + await writeRecords( + dir, + "2026-06-24.jsonl", + l1Line({ id: "m4", updatedAt: "2026-06-24T13:00:00.000Z" }) + "\n" + + l1Line({ id: "m5", updatedAt: "2026-06-24T14:00:00.000Z" }) + "\n", + ); + + // Counter still says 5 (never decremented) โ€” drift reproduced. + expect((await mgr.read()).total_memories_extracted).toBe(5); + + // Recalibrate against the surviving JSONL truth. + const surviving = await countL1JsonlLines(dir); + expect(surviving).toBe(2); + await mgr.recalibrate({ + totalMemoriesExtracted: surviving, + l0ConversationsCount: 0, + totalProcessed: 0, + memoriesSinceLastPersona: 0, + }); + + // Counter now reflects actual data (5 โ†’ 2). + expect((await mgr.read()).total_memories_extracted).toBe(2); + }); + + // โ”€โ”€ Issue #157 case: memory-cleaner deletes an expired shard file โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + it("cleaner-style: deleting an expired shard file drops the counter via recalibrate", async () => { + const dir = await makeDataDir(); + const mgr = new CheckpointManager(dir); + + // Two daily shards: 3 lines + 2 lines = 5 total. + await writeRecords( + dir, + "2026-06-23.jsonl", + l1Line({ id: "old1", updatedAt: "2026-06-23T10:00:00.000Z" }) + "\n" + + l1Line({ id: "old2", updatedAt: "2026-06-23T11:00:00.000Z" }) + "\n" + + l1Line({ id: "old3", updatedAt: "2026-06-23T12:00:00.000Z" }) + "\n", + ); + await writeRecords( + dir, + "2026-06-24.jsonl", + l1Line({ id: "new1", updatedAt: "2026-06-24T10:00:00.000Z" }) + "\n" + + l1Line({ id: "new2", updatedAt: "2026-06-24T11:00:00.000Z" }) + "\n", + ); + await mgr.markL1ExtractionComplete("sess-int", 5); + expect((await mgr.read()).total_memories_extracted).toBe(5); + + // Cleaner removes the expired 2026-06-23 shard file entirely. + await fs.unlink(path.join(dir, "records", "2026-06-23.jsonl")); + + // Counter still 5 (drift). Recalibrate to surviving shard. + const surviving = await countL1JsonlLines(dir); + expect(surviving).toBe(2); + await mgr.recalibrate({ + totalMemoriesExtracted: surviving, + l0ConversationsCount: 0, + totalProcessed: 0, + memoriesSinceLastPersona: 0, + }); + expect((await mgr.read()).total_memories_extracted).toBe(2); + }); +}); diff --git a/src/utils/checkpoint.ts b/src/utils/checkpoint.ts index 301fc0df..755a626c 100644 --- a/src/utils/checkpoint.ts +++ b/src/utils/checkpoint.ts @@ -431,6 +431,51 @@ export class CheckpointManager { ); } + // ============================ + // Recalibration (authoritative re-sync of derived counters) + // ============================ + + /** + * Re-synchronize the four derived counters against authoritative source-of- + * truth counts (e.g. re-counted L0/L1 files at startup). Only these four + * fields are overwritten; persona/scene/cursor/per-session state is left + * untouched. Returns the pre-call values in `was` so the caller can log the + * beforeโ†’after delta. + */ + async recalibrate(actual: { + totalMemoriesExtracted: number; + l0ConversationsCount: number; + totalProcessed: number; + memoriesSinceLastPersona: number; + }): Promise<{ + was: { + total_memories_extracted: number; + l0_conversations_count: number; + total_processed: number; + memories_since_last_persona: number; + }; + }> { + let was!: { + total_memories_extracted: number; + l0_conversations_count: number; + total_processed: number; + memories_since_last_persona: number; + }; + await this.mutate((cp) => { + was = { + total_memories_extracted: cp.total_memories_extracted, + l0_conversations_count: cp.l0_conversations_count, + total_processed: cp.total_processed, + memories_since_last_persona: cp.memories_since_last_persona, + }; + cp.total_memories_extracted = actual.totalMemoriesExtracted; + cp.l0_conversations_count = actual.l0ConversationsCount; + cp.total_processed = actual.totalProcessed; + cp.memories_since_last_persona = actual.memoriesSinceLastPersona; + }); + return { was }; + } + // ============================ // Atomic capture (race-condition fix) // ============================