diff --git a/src/http-server.ts b/src/http-server.ts index 5db4c9a..b537877 100644 --- a/src/http-server.ts +++ b/src/http-server.ts @@ -49,8 +49,9 @@ const refreshOnStart = process.env.ADVISORY_REFRESH_ON_START !== 'false'; // Def const refreshIntervalMs = parseInt(process.env.ADVISORY_REFRESH_INTERVAL_MS || '0'); // Default: disabled (0) // Store transports by session ID -const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; -const sessionTimeouts: Record = {}; +const transports = new Map(); +const sessionTimeouts = new Map(); +const cleanupInProgress = new Set(); const SESSION_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes // Periodic refresh cleanup function @@ -60,19 +61,31 @@ let stopPeriodicRefresh: (() => void) | null = null; * Clean up expired session */ function cleanupSession(sessionId: string): void { + if (cleanupInProgress.has(sessionId)) { + return; + } + + cleanupInProgress.add(sessionId); logger.info('Cleaning up session', { sessionId }); - const transport = transports[sessionId]; - if (transport) { - try { - transport.close(); - } catch (error) { - logger.warn('Error closing transport', { sessionId, error: error instanceof Error ? error.message : String(error) }); + try { + const transport = transports.get(sessionId); + if (transport) { + // Remove from registry first and detach onclose to avoid recursive cleanup. + transports.delete(sessionId); + transport.onclose = undefined; + try { + transport.close(); + } catch (error) { + logger.warn('Error closing transport', { sessionId, error: error instanceof Error ? error.message : String(error) }); + } } - delete transports[sessionId]; - } - if (sessionTimeouts[sessionId]) { - clearTimeout(sessionTimeouts[sessionId]); - delete sessionTimeouts[sessionId]; + const timeout = sessionTimeouts.get(sessionId); + if (timeout) { + clearTimeout(timeout); + sessionTimeouts.delete(sessionId); + } + } finally { + cleanupInProgress.delete(sessionId); } } @@ -80,13 +93,14 @@ function cleanupSession(sessionId: string): void { * Reset session timeout */ function resetSessionTimeout(sessionId: string): void { - if (sessionTimeouts[sessionId]) { - clearTimeout(sessionTimeouts[sessionId]); + const existingTimeout = sessionTimeouts.get(sessionId); + if (existingTimeout) { + clearTimeout(existingTimeout); } - sessionTimeouts[sessionId] = setTimeout(() => { + sessionTimeouts.set(sessionId, setTimeout(() => { logger.info('Session timeout', { sessionId }); cleanupSession(sessionId); - }, SESSION_TIMEOUT_MS); + }, SESSION_TIMEOUT_MS)); } // Store local API server instance @@ -154,9 +168,9 @@ app.post("/mcp", async (req: Request, res: Response) => { try { let transport: StreamableHTTPServerTransport; - if (sessionId && transports[sessionId]) { + if (sessionId && transports.has(sessionId)) { // Reuse existing transport and reset timeout - transport = transports[sessionId]; + transport = transports.get(sessionId)!; resetSessionTimeout(sessionId); await transport.handleRequest(req, res, req.body); } else if (!sessionId && isInitializeRequest(req.body)) { @@ -172,7 +186,7 @@ app.post("/mcp", async (req: Request, res: Response) => { }); // Store transport - transports[newSessionId] = transport; + transports.set(newSessionId, transport); // Set up session timeout resetSessionTimeout(newSessionId);