diff --git a/src/core/tdai-core.ts b/src/core/tdai-core.ts index 9185e51d..00a5a0d5 100644 --- a/src/core/tdai-core.ts +++ b/src/core/tdai-core.ts @@ -392,6 +392,51 @@ export class TdaiCore { return this.schedulerStartPromise !== undefined; } + /** Read-only runtime diagnostics for status/debug endpoints. */ + getDiagnostics(sessionKey?: string): { + stores: { vectorStore: boolean; embeddingService: boolean }; + scheduler: { + enabled: boolean; + started: boolean; + destroyed: boolean; + queues: ReturnType; + }; + sessions: ReturnType; + } { + const emptyQueues: ReturnType = { + l1: 0, + l2: 0, + l3: 0, + l1Pending: false, + l2Pending: false, + l3Pending: false, + l1Idle: true, + l2Idle: true, + l3Idle: true, + }; + + const scheduler = this.scheduler; + const sessions = scheduler + ? sessionKey + ? [scheduler.getSessionDiagnostics(sessionKey)] + : scheduler.getAllSessionDiagnostics() + : []; + + return { + stores: { + vectorStore: !!this.vectorStore, + embeddingService: !!this.embeddingService, + }, + scheduler: { + enabled: !!scheduler, + started: this.isSchedulerStarted(), + destroyed: scheduler?.isDestroyed ?? false, + queues: scheduler?.getQueueSizes() ?? emptyQueues, + }, + sessions, + }; + } + /** Set the instance ID for metrics (may be resolved asynchronously). */ setInstanceId(id: string): void { this.instanceId = id; diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 1da5592e..5f573e14 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -3,6 +3,7 @@ * * Exposes TDAI Core capabilities as HTTP endpoints: * GET /health — Health check + * GET /status — Read-only pipeline diagnostics * POST /recall — Memory recall (prefetch) * POST /capture — Conversation capture (sync_turn) * POST /search/memories — L1 memory search @@ -25,6 +26,7 @@ import { initDataDirectories } from "../utils/pipeline-factory.js"; import { SessionFilter } from "../utils/session-filter.js"; import type { HealthResponse, + StatusResponse, RecallRequest, RecallResponse, CaptureRequest, @@ -260,6 +262,8 @@ export class TdaiGateway { if (!this.checkAuth(req, res)) return; switch (`${method} ${pathname}`) { + case "GET /status": + return this.handleStatus(url, res); case "POST /recall": return await this.handleRecall(req, res); case "POST /capture": @@ -368,6 +372,12 @@ export class TdaiGateway { sendJson(res, 200, response); } + private handleStatus(url: URL, res: http.ServerResponse): void { + const sessionKey = url.searchParams.get("session_key") ?? undefined; + const response: StatusResponse = this.core.getDiagnostics(sessionKey); + sendJson(res, 200, response); + } + private async handleRecall(req: http.IncomingMessage, res: http.ServerResponse): Promise { const body = await parseJsonBody(req); diff --git a/src/gateway/types.ts b/src/gateway/types.ts index 50b2ff4c..f0785348 100644 --- a/src/gateway/types.ts +++ b/src/gateway/types.ts @@ -25,6 +25,58 @@ export interface HealthResponse { }; } +// ============================ +// /status +// ============================ + +export interface StatusResponse { + stores: { + vectorStore: boolean; + embeddingService: boolean; + }; + scheduler: { + enabled: boolean; + started: boolean; + destroyed: boolean; + queues: { + l1: number; + l2: number; + l3: number; + l1Pending: boolean; + l2Pending: boolean; + l3Pending: boolean; + l1Idle: boolean; + l2Idle: boolean; + l3Idle: boolean; + }; + }; + sessions: Array<{ + sessionKey: string; + known: boolean; + state?: { + conversation_count: number; + last_extraction_time: string; + last_extraction_updated_time: string; + last_active_time: number; + l2_pending_l1_count: number; + warmup_threshold: number; + l2_last_extraction_time: string; + }; + effectiveL1Threshold?: number; + bufferedMessageCount: number; + timers: { + l1IdlePending: boolean; + l1IdleScheduledTime: number; + l2SchedulePending: boolean; + l2ScheduleScheduledTime: number; + l1Queued: boolean; + l2Queued: boolean; + l1RetryCount: number; + }; + queues: StatusResponse["scheduler"]["queues"]; + }>; +} + // ============================ // /recall // ============================ diff --git a/src/utils/pipeline-manager.test.ts b/src/utils/pipeline-manager.test.ts new file mode 100644 index 00000000..7cbf0a00 --- /dev/null +++ b/src/utils/pipeline-manager.test.ts @@ -0,0 +1,52 @@ +import { describe, expect, it } from "vitest"; + +import { MemoryPipelineManager } from "./pipeline-manager.js"; + +const config = { + everyNConversations: 5, + enableWarmup: false, + l1: { idleTimeoutSeconds: 60 }, + l2: { + delayAfterL1Seconds: 90, + minIntervalSeconds: 300, + maxIntervalSeconds: 1800, + sessionActiveWindowHours: 24, + }, +}; + +describe("MemoryPipelineManager diagnostics", () => { + it("reports per-session L0/L1 waiting state without mutating queues", async () => { + const manager = new MemoryPipelineManager(config); + + await manager.notifyConversation("session-a", [ + { role: "user", content: "remember I prefer Chinese replies", timestamp: "2026-06-26T00:00:00.000Z" }, + { role: "assistant", content: "好的", timestamp: "2026-06-26T00:00:01.000Z" }, + ]); + + const diagnostic = manager.getSessionDiagnostics("session-a"); + + expect(diagnostic.known).toBe(true); + expect(diagnostic.sessionKey).toBe("session-a"); + expect(diagnostic.state?.conversation_count).toBe(1); + expect(diagnostic.effectiveL1Threshold).toBe(5); + expect(diagnostic.bufferedMessageCount).toBe(2); + expect(diagnostic.timers.l1IdlePending).toBe(true); + expect(diagnostic.timers.l2SchedulePending).toBe(false); + expect(diagnostic.queues.l1Idle).toBe(true); + + await manager.destroy(); + }); + + it("reports unknown sessions explicitly", () => { + const manager = new MemoryPipelineManager(config); + + const diagnostic = manager.getSessionDiagnostics("missing-session"); + + expect(diagnostic).toMatchObject({ + sessionKey: "missing-session", + known: false, + bufferedMessageCount: 0, + }); + expect(diagnostic.state).toBeUndefined(); + }); +}); diff --git a/src/utils/pipeline-manager.ts b/src/utils/pipeline-manager.ts index bccf2936..33f6956f 100644 --- a/src/utils/pipeline-manager.ts +++ b/src/utils/pipeline-manager.ts @@ -170,6 +170,24 @@ export type L3Runner = () => Promise; /** Callback to persist session states to checkpoint. */ export type PipelineStatePersister = (states: Record) => Promise; +export interface PipelineSessionDiagnostics { + sessionKey: string; + known: boolean; + state?: PipelineSessionState; + effectiveL1Threshold?: number; + bufferedMessageCount: number; + timers: { + l1IdlePending: boolean; + l1IdleScheduledTime: number; + l2SchedulePending: boolean; + l2ScheduleScheduledTime: number; + l1Queued: boolean; + l2Queued: boolean; + l1RetryCount: number; + }; + queues: ReturnType; +} + const TAG = "[memory-tdai] [pipeline]"; // ============================ @@ -1152,6 +1170,35 @@ export class MemoryPipelineManager { return Array.from(this.sessionStates.keys()); } + /** Read-only diagnostics for one tracked or requested session. */ + getSessionDiagnostics(sessionKey: string): PipelineSessionDiagnostics { + const state = this.sessionStates.get(sessionKey); + const timers = this.sessionTimers.get(sessionKey); + + return { + sessionKey, + known: !!state, + state: state ? { ...state } : undefined, + effectiveL1Threshold: state ? this.getEffectiveThreshold(state) : undefined, + bufferedMessageCount: this.getBufferedMessageCount(sessionKey), + timers: { + l1IdlePending: timers?.l1Idle.pending ?? false, + l1IdleScheduledTime: timers?.l1Idle.scheduledTime ?? 0, + l2SchedulePending: timers?.l2Schedule.pending ?? false, + l2ScheduleScheduledTime: timers?.l2Schedule.scheduledTime ?? 0, + l1Queued: timers?.l1Queued ?? false, + l2Queued: timers?.l2Queued ?? false, + l1RetryCount: timers?.l1RetryCount ?? 0, + }, + queues: this.getQueueSizes(), + }; + } + + /** Read-only diagnostics for every tracked session. */ + getAllSessionDiagnostics(): PipelineSessionDiagnostics[] { + return this.getSessionKeys().map((sessionKey) => this.getSessionDiagnostics(sessionKey)); + } + /** Whether the pipeline has been destroyed. */ get isDestroyed(): boolean { return this.destroyed;