diff --git a/src/utils/pipeline-manager.test.ts b/src/utils/pipeline-manager.test.ts new file mode 100644 index 00000000..2be8efa5 --- /dev/null +++ b/src/utils/pipeline-manager.test.ts @@ -0,0 +1,105 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; + +import { MemoryPipelineManager, type CapturedMessage, type PipelineConfig } from "./pipeline-manager.js"; + +const config: PipelineConfig = { + everyNConversations: 1, + enableWarmup: false, + l1: { + idleTimeoutSeconds: 60, + }, + l2: { + delayAfterL1Seconds: 1, + minIntervalSeconds: 5, + maxIntervalSeconds: 10, + sessionActiveWindowHours: 24, + }, +}; + +function message(content: string): CapturedMessage { + return { + role: "user", + content, + timestamp: new Date().toISOString(), + }; +} + +async function flushMicrotasks(): Promise { + await Promise.resolve(); + await Promise.resolve(); +} + +describe("MemoryPipelineManager", () => { + afterEach(() => { + vi.useRealTimers(); + }); + + it("stops idle L2 maxInterval polling after the cold-start retry also skips", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-06-11T00:00:00.000Z")); + + const manager = new MemoryPipelineManager(config); + const l1Runner = vi.fn().mockResolvedValue(undefined); + const l2Runner = vi.fn().mockResolvedValue({ skipped: true }); + + manager.setL1Runner(l1Runner); + manager.setL2Runner(l2Runner); + + await manager.notifyConversation("session-a", [message("first")]); + await flushMicrotasks(); + await vi.advanceTimersByTimeAsync(1_000); + await flushMicrotasks(); + + expect(l1Runner).toHaveBeenCalledTimes(1); + expect(l2Runner).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(10_000); + await flushMicrotasks(); + + expect(l2Runner).toHaveBeenCalledTimes(2); + + await vi.advanceTimersByTimeAsync(60_000); + await flushMicrotasks(); + + expect(l2Runner).toHaveBeenCalledTimes(2); + + l2Runner.mockResolvedValueOnce({ skipped: true }); + await manager.notifyConversation("session-a", [message("second")]); + await flushMicrotasks(); + await vi.advanceTimersByTimeAsync(1_000); + await flushMicrotasks(); + + expect(l1Runner).toHaveBeenCalledTimes(2); + expect(l2Runner).toHaveBeenCalledTimes(3); + + await manager.destroy(); + }); + + it("keeps maxInterval polling after L2 has established a cursor", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-06-11T00:00:00.000Z")); + + const manager = new MemoryPipelineManager(config); + const l1Runner = vi.fn().mockResolvedValue(undefined); + const l2Runner = vi.fn() + .mockResolvedValueOnce({ latestCursor: "2026-06-11T00:00:01.000Z" }) + .mockResolvedValue({ skipped: true }); + + manager.setL1Runner(l1Runner); + manager.setL2Runner(l2Runner); + + await manager.notifyConversation("session-b", [message("first")]); + await flushMicrotasks(); + await vi.advanceTimersByTimeAsync(1_000); + await flushMicrotasks(); + + expect(l2Runner).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(10_000); + await flushMicrotasks(); + + expect(l2Runner).toHaveBeenCalledTimes(2); + + await manager.destroy(); + }); +}); diff --git a/src/utils/pipeline-manager.ts b/src/utils/pipeline-manager.ts index bccf2936..1de071e6 100644 --- a/src/utils/pipeline-manager.ts +++ b/src/utils/pipeline-manager.ts @@ -858,7 +858,7 @@ export class MemoryPipelineManager { this.logger?.debug?.(`${TAG} [${sessionKey}] Enqueuing L2 (trigger=${trigger}, queue=${this.l2Queue.name})`); this.l2Queue.add(async () => { - await this.runL2(sessionKey); + await this.runL2(sessionKey, trigger); }).catch((err) => { this.logger?.error( `${TAG} [${sessionKey}] L2 task failed: ${err instanceof Error ? err.stack ?? err.message : String(err)}`, @@ -868,7 +868,7 @@ export class MemoryPipelineManager { }); } - private async runL2(sessionKey: string): Promise { + private async runL2(sessionKey: string, trigger: string): Promise { const state = this.sessionStates.get(sessionKey); if (!state) return; @@ -903,15 +903,26 @@ export class MemoryPipelineManager { // and it was skipped (no new records), do NOT update l2LastRunTime. // This prevents l2MinIntervalSeconds from blocking the next L2 trigger // when the first L1 extraction produces actual memories shortly after. + // + // If the follow-up maxInterval retry still has no cursor, stop polling this + // idle session. The next L1 completion will re-arm L2 via advanceL2Timer(). const isFirstL2 = !this.l2LastRunTime.has(sessionKey); const wasSkipped = result?.skipped === true; + const shouldStopIdleColdStartPolling = + trigger === "timer:max-interval" && !state.last_extraction_updated_time; if (isFirstL2 && wasSkipped) { this.logger?.info?.( `${TAG} [${sessionKey}] L2 cold-start skip: not updating l2LastRunTime ` + `(minInterval won't block next trigger)`, ); - this.armL2MaxInterval(sessionKey); + if (shouldStopIdleColdStartPolling) { + this.logger?.debug?.( + `${TAG} [${sessionKey}] L2 cold-start skip: stopping idle maxInterval polling until next L1 event`, + ); + } else { + this.armL2MaxInterval(sessionKey); + } await this.persistStates(); return; }