Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

## [Unreleased]

### 🐛 Bug 修复

- **Checkpoint 计数器漂移** ([#157](https://github.com/Tencent/TencentDB-Agent-Memory/issues/157)):`total_memories_extracted`、`l0_conversations_count`、`total_processed`、`memories_since_last_persona` 四个全局计数器只增不减,`memory-cleaner` 清理或人工修剪 JSONL 后永久高估实际数据。新增 `CheckpointManager.recalibrate()`,在 Gateway 启动时用真实数据(L0/L1 JSONL 行数、store `countL0()`)重算这四个计数器,纠正漂移。其中 `memories_since_last_persona` 校准后不再虚高误触发 persona 生成。增量提取门控使用 per-session 游标,不读这些全局计数器,故不会因漂移跳过记录。

### ✨ 新功能

- **时区可配置** ([#75](https://github.com/Tencent/TencentDB-Agent-Memory/issues/75) / [#87](https://github.com/Tencent/TencentDB-Agent-Memory/issues/87)):新增顶层 `timezone` 配置项,支持 IANA 时区名(`Asia/Shanghai`、`Europe/Berlin`)和 UTC 偏移串(`+08:00`、`-05:30`)。默认 `"system"`(跟随进程系统时区),升级零感。
Expand Down
48 changes: 47 additions & 1 deletion index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import { registerMemoryTdaiCli } from "./src/cli/index.js";
import { initDataDirectories, resetStores } from "./src/utils/pipeline-factory.js";
import { getOrCreateInstanceId, initReporter, report, resetReporter } from "./src/core/report/reporter.js";
import { ensureL2L3Local } from "./src/core/profile/profile-sync.js";
import { CheckpointManager } from "./src/utils/checkpoint.js";
import { countL1JsonlLines, countL1JsonlLinesSince } from "./src/core/record/l1-reader.js";
import { countL0JsonlStats } from "./src/core/conversation/l0-recorder.js";

// Core abstractions (host-neutral)
import { OpenClawHostAdapter } from "./src/adapters/openclaw/host-adapter.js";
Expand Down Expand Up @@ -271,7 +274,7 @@ export default function register(api: OpenClawPluginApi) {
});

// Initialize TdaiCore (async — store init, pipeline wiring)
const coreReady = core.initialize().then(() => {
const coreReady = core.initialize().then(async () => {
// Keep cleaner's SQLite handle updated after store init
memoryCleaner?.setVectorStore(core.getVectorStore());

Expand All @@ -282,6 +285,49 @@ export default function register(api: OpenClawPluginApi) {
api.logger.warn(`${TAG} Startup L2/L3 pull failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`);
});
}

// Startup recalibration: recompute the four global counters against
// authoritative data (issue #157). Must be awaited inside the coreReady
// chain — `agent_end` does `await coreReady` (see below), so the first
// agent_end cannot overtake recalibration. This keeps recalibration
// free of concurrency with the L2 repair path in pipeline-factory
// (which would otherwise Math.max the counters back to stale values).
try {
const cpManager = new CheckpointManager(pluginDataDir, api.logger);
const cp = await cpManager.read();
const lastPersonaTime = cp.last_persona_time ?? "";
const [totalMemoriesExtracted, l0Stats, memoriesSinceLastPersona] = await Promise.all([
countL1JsonlLines(pluginDataDir),
countL0JsonlStats(pluginDataDir),
countL1JsonlLinesSince(pluginDataDir, lastPersonaTime),
]);
// Degraded fallback: when the store is unavailable, countL0() returns 0
// and would erase the real value — fall back to JSONL line count.
const vectorStore = core.getVectorStore();
const totalProcessed = vectorStore && !vectorStore.isDegraded()
? await vectorStore.countL0()
: l0Stats.lines;
const { was } = await cpManager.recalibrate({
totalMemoriesExtracted,
l0ConversationsCount: l0Stats.captures,
totalProcessed,
memoriesSinceLastPersona,
});
if (was.total_memories_extracted !== totalMemoriesExtracted
|| was.l0_conversations_count !== l0Stats.captures
|| was.total_processed !== totalProcessed
|| was.memories_since_last_persona !== memoriesSinceLastPersona) {
api.logger.info(
`${TAG} Checkpoint recalibrated: ` +
`total_memories_extracted ${was.total_memories_extracted}→${totalMemoriesExtracted}, ` +
`l0_conversations_count ${was.l0_conversations_count}→${l0Stats.captures}, ` +
`total_processed ${was.total_processed}→${totalProcessed}, ` +
`memories_since_last_persona ${was.memories_since_last_persona}→${memoriesSinceLastPersona}`,
);
}
} catch (err) {
api.logger.warn(`${TAG} Checkpoint recalibrate failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`);
}
}).catch((err) => {
api.logger.error(`${TAG} Core init failed: ${err instanceof Error ? err.message : String(err)}`);
});
Expand Down
164 changes: 164 additions & 0 deletions src/core/conversation/l0-recorder.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";

import { countL0JsonlStats } from "./l0-recorder.js";

/**
* Builds an L0MessageRecord JSON line with the given recordedAt (and optional
* role/content). Fields mirror the L0MessageRecord shape from l0-recorder.ts.
*/
function line(recordedAt: string, opts: { role?: string; content?: string } = {}): string {
return JSON.stringify({
sessionKey: "s1",
sessionId: "sess1",
recordedAt,
id: `msg_${Math.random().toString(36).slice(2)}`,
role: opts.role ?? "user",
content: opts.content ?? "hello",
timestamp: Date.now(),
});
}

/**
* Writes the given raw string content to <baseDir>/conversations/<filename>.
* Creates the conversations directory (and baseDir) as needed.
*/
async function writeFile(baseDir: string, filename: string, content: string): Promise<void> {
const dir = path.join(baseDir, "conversations");
await fs.mkdir(dir, { recursive: true });
await fs.writeFile(path.join(dir, filename), content, "utf-8");
}

describe("countL0JsonlStats", () => {
let baseDir: string;

beforeEach(async () => {
baseDir = await fs.mkdtemp(path.join(os.tmpdir(), "l0-stats-"));
});

afterEach(async () => {
await fs.rm(baseDir, { recursive: true, force: true });
});

// ============================
// Tracer bullet: basic single-file counting
// ============================
it("counts one capture and all physical lines from a single file", async () => {
// Two messages sharing one recordedAt => 1 capture, 2 lines
const ts = "2026-06-24T10:00:00.000Z";
await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts)}\n${line(ts)}\n`);

const stats = await countL0JsonlStats(baseDir);
expect(stats.captures).toBe(1);
expect(stats.lines).toBe(2);
});

// ============================
// Multiple captures
// ============================
it("counts distinct recordedAt values as separate captures", async () => {
const ts1 = "2026-06-24T10:00:00.000Z";
const ts2 = "2026-06-24T11:00:00.000Z";
const ts3 = "2026-06-24T12:00:00.000Z";
// 3 different recordedAt => 3 captures, 4 lines
await writeFile(baseDir, "2026-06-24.jsonl",
`${line(ts1)}\n${line(ts2)}\n${line(ts2)}\n${line(ts3)}\n`);

const stats = await countL0JsonlStats(baseDir);
expect(stats.captures).toBe(3);
expect(stats.lines).toBe(4);
});

// ============================
// Multi-shard (multiple daily files)
// ============================
it("aggregates across multiple daily shard files", async () => {
const ts1 = "2026-06-23T09:00:00.000Z";
const ts2 = "2026-06-24T10:00:00.000Z";
await writeFile(baseDir, "2026-06-23.jsonl", `${line(ts1)}\n${line(ts1)}\n`);
await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts2)}\n`);

const stats = await countL0JsonlStats(baseDir);
expect(stats.captures).toBe(2);
expect(stats.lines).toBe(3);
});

// ============================
// Invariant: a bad line (missing recordedAt) must not inflate captures
// ============================
it("counts a line missing recordedAt toward lines but not captures", async () => {
const ts = "2026-06-24T10:00:00.000Z";
const badLine = JSON.stringify({ sessionKey: "s1", role: "user", content: "no-timestamp", id: "x" });
await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts)}\n${badLine}\n`);

const stats = await countL0JsonlStats(baseDir);
expect(stats.lines).toBe(2);
expect(stats.captures).toBe(1);
});

it("counts a line with non-string recordedAt toward lines but not captures", async () => {
const ts = "2026-06-24T10:00:00.000Z";
const badLine = JSON.stringify({ sessionKey: "s1", recordedAt: 12345, role: "user", content: "x", id: "y" });
await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts)}\n${badLine}\n`);

const stats = await countL0JsonlStats(baseDir);
expect(stats.lines).toBe(2);
expect(stats.captures).toBe(1);
});

it("counts a malformed (unparseable JSON) line toward lines but not captures", async () => {
const ts = "2026-06-24T10:00:00.000Z";
const garbage = "{not valid json";
await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts)}\n${garbage}\n`);

const stats = await countL0JsonlStats(baseDir);
expect(stats.lines).toBe(2);
expect(stats.captures).toBe(1);
});

// ============================
// Non-shard filename filtering
// ============================
it("ignores files not matching the YYYY-MM-DD.jsonl pattern", async () => {
const ts = "2026-06-24T10:00:00.000Z";
await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts)}\n`);
// These should be ignored:
await writeFile(baseDir, "2026-6-24.jsonl", `${line(ts)}\n`); // non-zero-padded
await writeFile(baseDir, "notes.txt", `${line(ts)}\n`);
await writeFile(baseDir, "2026-06-24.jsonl.bak", `${line(ts)}\n`);
await writeFile(baseDir, "2026-06-24.jsonl", `${line(ts)}\n`); // overwrite legit file

const stats = await countL0JsonlStats(baseDir);
expect(stats.lines).toBe(1);
expect(stats.captures).toBe(1);
});

// ============================
// Blank lines are neither lines nor captures
// ============================
it("skips whitespace-only lines entirely", async () => {
const ts = "2026-06-24T10:00:00.000Z";
await writeFile(baseDir, "2026-06-24.jsonl",
`${line(ts)}\n \n\t\n\n${line(ts)}\n`);

const stats = await countL0JsonlStats(baseDir);
expect(stats.lines).toBe(2);
expect(stats.captures).toBe(1);
});

// ============================
// Missing / empty directory
// ============================
it("returns zeros when the conversations directory does not exist", async () => {
const stats = await countL0JsonlStats(baseDir);
expect(stats).toEqual({ captures: 0, lines: 0 });
});

it("returns zeros when the conversations directory is empty", async () => {
await fs.mkdir(path.join(baseDir, "conversations"), { recursive: true });
const stats = await countL0JsonlStats(baseDir);
expect(stats).toEqual({ captures: 0, lines: 0 });
});
});
80 changes: 80 additions & 0 deletions src/core/conversation/l0-recorder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,86 @@ export async function readConversationMessagesGroupedBySessionId(
return groups;
}

/**
* Count L0 JSONL capture/line stats across all daily shard files.
*
* Walks every `<baseDir>/conversations/YYYY-MM-DD.jsonl` file once and returns:
* - `captures`: number of **distinct** `recordedAt` values (one per recording event)
* - `lines`: total physical non-empty line count (message count)
*
* No fallback timestamp: a line that is missing `recordedAt`, has a
* non-string `recordedAt`, or is otherwise unparseable still counts toward
* `lines` (it is a physical message) but is **never** added to the distinct
* `recordedAt` Set, so it cannot inflate `captures`. This deliberately differs
* from readConversationRecords (which backfills `new Date().toISOString()`);
* backfilling would manufacture phantom capture events.
*
* Files are filtered by the same `dateFilePattern` as readConversationRecords
* (`/^\d{4}-\d{2}-\d{2}\.jsonl$/`). Empty lines (whitespace-only after trim)
* are skipped — they are neither a message nor a capture.
*
* Returns `{captures:0, lines:0}` when the conversations directory does not
* exist (does not throw — mirrors readConversationRecords' first-run behavior).
*/
export async function countL0JsonlStats(
baseDir: string,
): Promise<{ captures: number; lines: number }> {
const conversationsDir = path.join(baseDir, "conversations");
const dateFilePattern = /^\d{4}-\d{2}-\d{2}\.jsonl$/;

let entries: string[];
try {
const dirEntries = await fs.readdir(conversationsDir, { withFileTypes: true });
entries = dirEntries
.filter((entry) => entry.isFile())
.map((entry) => entry.name);
} catch {
// Directory doesn't exist yet — normal for first conversation
return { captures: 0, lines: 0 };
}

const targetFiles = entries
.filter((name) => dateFilePattern.test(name))
.sort();

const recordedAtSet = new Set<string>();
let lines = 0;

for (const fileName of targetFiles) {
const filePath = path.join(conversationsDir, fileName);

let raw: string;
try {
raw = await fs.readFile(filePath, "utf-8");
} catch {
// Unreadable file: skip without aborting the whole scan
continue;
}

const fileLines = raw.split("\n");
for (const fileLine of fileLines) {
// Whitespace-only lines are not physical messages — skip entirely.
if (!fileLine.trim()) continue;

lines++;

try {
const parsed = JSON.parse(fileLine) as Record<string, unknown>;
// Only a genuine string recordedAt counts toward distinct captures.
// Missing / non-string / unparseable → still counted as a line above,
// but never added to the Set (no fallback timestamp).
if (typeof parsed.recordedAt === "string" && parsed.recordedAt) {
recordedAtSet.add(parsed.recordedAt);
}
} catch {
// Malformed JSON: counted as a physical line, not a capture.
}
}
}

return { captures: recordedAtSet.size, lines };
}

// ============================
// Helpers
// ============================
Expand Down
Loading