Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions src/utils/pipeline-manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import { afterEach, describe, expect, it, vi } from "vitest";

import { MemoryPipelineManager, type CapturedMessage, type PipelineConfig } from "./pipeline-manager.js";

const config: PipelineConfig = {
everyNConversations: 1,
enableWarmup: false,
l1: {
idleTimeoutSeconds: 60,
},
l2: {
delayAfterL1Seconds: 1,
minIntervalSeconds: 5,
maxIntervalSeconds: 10,
sessionActiveWindowHours: 24,
},
};

function message(content: string): CapturedMessage {
return {
role: "user",
content,
timestamp: new Date().toISOString(),
};
}

async function flushMicrotasks(): Promise<void> {
await Promise.resolve();
await Promise.resolve();
}

describe("MemoryPipelineManager", () => {
afterEach(() => {
vi.useRealTimers();
});

it("stops idle L2 maxInterval polling after the cold-start retry also skips", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-06-11T00:00:00.000Z"));

const manager = new MemoryPipelineManager(config);
const l1Runner = vi.fn().mockResolvedValue(undefined);
const l2Runner = vi.fn().mockResolvedValue({ skipped: true });

manager.setL1Runner(l1Runner);
manager.setL2Runner(l2Runner);

await manager.notifyConversation("session-a", [message("first")]);
await flushMicrotasks();
await vi.advanceTimersByTimeAsync(1_000);
await flushMicrotasks();

expect(l1Runner).toHaveBeenCalledTimes(1);
expect(l2Runner).toHaveBeenCalledTimes(1);

await vi.advanceTimersByTimeAsync(10_000);
await flushMicrotasks();

expect(l2Runner).toHaveBeenCalledTimes(2);

await vi.advanceTimersByTimeAsync(60_000);
await flushMicrotasks();

expect(l2Runner).toHaveBeenCalledTimes(2);

l2Runner.mockResolvedValueOnce({ skipped: true });
await manager.notifyConversation("session-a", [message("second")]);
await flushMicrotasks();
await vi.advanceTimersByTimeAsync(1_000);
await flushMicrotasks();

expect(l1Runner).toHaveBeenCalledTimes(2);
expect(l2Runner).toHaveBeenCalledTimes(3);

await manager.destroy();
});

it("keeps maxInterval polling after L2 has established a cursor", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-06-11T00:00:00.000Z"));

const manager = new MemoryPipelineManager(config);
const l1Runner = vi.fn().mockResolvedValue(undefined);
const l2Runner = vi.fn()
.mockResolvedValueOnce({ latestCursor: "2026-06-11T00:00:01.000Z" })
.mockResolvedValue({ skipped: true });

manager.setL1Runner(l1Runner);
manager.setL2Runner(l2Runner);

await manager.notifyConversation("session-b", [message("first")]);
await flushMicrotasks();
await vi.advanceTimersByTimeAsync(1_000);
await flushMicrotasks();

expect(l2Runner).toHaveBeenCalledTimes(1);

await vi.advanceTimersByTimeAsync(10_000);
await flushMicrotasks();

expect(l2Runner).toHaveBeenCalledTimes(2);

await manager.destroy();
});
});
17 changes: 14 additions & 3 deletions src/utils/pipeline-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ export class MemoryPipelineManager {
this.logger?.debug?.(`${TAG} [${sessionKey}] Enqueuing L2 (trigger=${trigger}, queue=${this.l2Queue.name})`);

this.l2Queue.add(async () => {
await this.runL2(sessionKey);
await this.runL2(sessionKey, trigger);
}).catch((err) => {
this.logger?.error(
`${TAG} [${sessionKey}] L2 task failed: ${err instanceof Error ? err.stack ?? err.message : String(err)}`,
Expand All @@ -868,7 +868,7 @@ export class MemoryPipelineManager {
});
}

private async runL2(sessionKey: string): Promise<void> {
private async runL2(sessionKey: string, trigger: string): Promise<void> {
const state = this.sessionStates.get(sessionKey);
if (!state) return;

Expand Down Expand Up @@ -903,15 +903,26 @@ export class MemoryPipelineManager {
// and it was skipped (no new records), do NOT update l2LastRunTime.
// This prevents l2MinIntervalSeconds from blocking the next L2 trigger
// when the first L1 extraction produces actual memories shortly after.
//
// If the follow-up maxInterval retry still has no cursor, stop polling this
// idle session. The next L1 completion will re-arm L2 via advanceL2Timer().
const isFirstL2 = !this.l2LastRunTime.has(sessionKey);
const wasSkipped = result?.skipped === true;
const shouldStopIdleColdStartPolling =
trigger === "timer:max-interval" && !state.last_extraction_updated_time;

if (isFirstL2 && wasSkipped) {
this.logger?.info?.(
`${TAG} [${sessionKey}] L2 cold-start skip: not updating l2LastRunTime ` +
`(minInterval won't block next trigger)`,
);
this.armL2MaxInterval(sessionKey);
if (shouldStopIdleColdStartPolling) {
this.logger?.debug?.(
`${TAG} [${sessionKey}] L2 cold-start skip: stopping idle maxInterval polling until next L1 event`,
);
} else {
this.armL2MaxInterval(sessionKey);
}
await this.persistStates();
return;
}
Expand Down
Loading