Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/crawl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify';
import type { Browser } from 'playwright-core';
import { loadEnvNumber } from './env.js';
import { runSerpQuery } from './serp.js';
import { getPool } from './pool.js';
import { getPool, type PooledSession } from './pool.js';
import type { SessionPool } from '@browsercash/pool';
import { CrawlResult, scrapeUrlWithFallback, isPdfUrl, isPdfSupportEnabled } from './scraper.js';
import { createPerfLogger } from './perf.js';

Expand Down Expand Up @@ -75,7 +76,7 @@ export async function registerCrawlRoutes(app: FastifyInstance): Promise<void> {

const indexedQueue = urls.map((url, index) => ({ url, index }));
const results: CrawlResult[] = new Array(urls.length);
const pool = getPool();
const pool = await getPool();

while (indexedQueue.length > 0) {
const { maxSize } = pool.stats();
Expand Down Expand Up @@ -182,11 +183,14 @@ export async function registerCrawlRoutes(app: FastifyInstance): Promise<void> {
});
}

const pool = getPool();
const session = await pool.acquire();
let pool: SessionPool | null = null;
let session: PooledSession | null = null;
let hadError = false;

try {
pool = await getPool();
session = await pool.acquire();

perf.beginStep('Scrape URL');
const result = await scrapeUrlWithFallback(session.browser as Browser, url);
perf.endStep('Scrape URL', { status: result.status });
Expand All @@ -207,7 +211,9 @@ export async function registerCrawlRoutes(app: FastifyInstance): Promise<void> {
message,
});
} finally {
pool.release(session, hadError);
if (pool && session) {
pool.release(session, hadError);
}
}
});
}
Expand Down
26 changes: 18 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import cors from '@fastify/cors';
import { loadEnvNumber, loadEnvString } from './env.js';
import { registerSerpRoutes } from './serp.js';
import { registerCrawlRoutes } from './crawl.js';
import { initPool, shutdownPool } from './pool.js';
import { configurePool, closeAllSessions } from './pool.js';

const DEBUG_LOG = process.env.DEBUG_LOG === '1' || process.env.DEBUG_LOG === 'true';

Expand All @@ -14,6 +14,19 @@ async function buildServer() {
// Health check endpoint
app.get('/health', async () => ({ ok: true }));

// Drain endpoint — closes local pool + all remote Browser.cash sessions
const drainToken = process.env.DRAIN_TOKEN;
app.post('/drain', async (req, reply) => {
if (drainToken) {
const supplied = req.headers['x-drain-token'] as string | undefined;
if (!supplied || supplied !== drainToken) {
return reply.status(403).send({ error: 'forbidden' });
}
}
const result = await closeAllSessions();
return reply.send({ ok: true, ...result });
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Register route handlers
await registerSerpRoutes(app);
await registerCrawlRoutes(app);
Expand All @@ -31,24 +44,21 @@ async function main() {
const HOST = loadEnvString('HOST', '0.0.0.0');
const POOL_SIZE = loadEnvNumber('POOL_SIZE', 1);

// Pre-warm browser session pool
if (DEBUG_LOG) {
console.log(`[crawler] Pre-warming session pool (size=${POOL_SIZE})`);
}
await initPool(POOL_SIZE);
// Configure pool size (lazy — no sessions created until first scrape)
configurePool(POOL_SIZE);

// Start server
const app = await buildServer();
await app.listen({ port: PORT, host: HOST });
app.log.info(`Server listening on http://${HOST}:${PORT}`);

// Graceful shutdown
// Graceful shutdown — close pool + all remote sessions
const shutdown = async () => {
if (DEBUG_LOG) {
console.log('[crawler] Shutting down...');
}
await app.close();
await shutdownPool();
await closeAllSessions();
process.exit(0);
};

Expand Down
134 changes: 108 additions & 26 deletions src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,49 +14,131 @@ export type { PooledSession };

// Singleton pool instance
let pool: SessionPool | null = null;
let poolSize: number = 1;
let poolInitPromise: Promise<void> | null = null;

/**
* Get the initialized pool instance
* @throws Error if pool not initialized
* Get the initialized pool instance (lazy — creates on first call).
* Uses a single-flight promise so concurrent callers share one init.
*/
export function getPool(): SessionPool {
export async function getPool(): Promise<SessionPool> {
if (!pool) {
throw new Error('Pool not initialized - call initPool() first');
pool = new SessionPool({
apiKey: BROWSER_API_KEY,
chromium,
size: poolSize,
maxUses: SESSION_MAX_USES,
maxAgeMs: SESSION_MAX_AGE_MS,
enableHealthCheck: true,
healthCheckIntervalMs: HEALTH_CHECK_INTERVAL_MS,
enableWaitQueue: true,
enableDisconnectHandling: true,
debug: DEBUG_LOG,
});
}

if (!poolInitPromise) {
if (DEBUG_LOG) {
console.log(`[pool] Lazy-initializing session pool (size=${poolSize})`);
}
poolInitPromise = pool.init().catch((err) => {
pool = null;
poolInitPromise = null;
throw err;
});
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

await poolInitPromise;
return pool;
}

/**
* Initialize the browser session pool
* Configure pool size (call before first getPool)
*/
export async function initPool(size: number): Promise<SessionPool> {
if (pool) {
return pool;
export function configurePool(size: number): void {
if (pool || poolInitPromise) {
console.warn('[pool] configurePool called after pool initialization; size change ignored');
return;
}

pool = new SessionPool({
apiKey: BROWSER_API_KEY,
chromium,
size,
maxUses: SESSION_MAX_USES,
maxAgeMs: SESSION_MAX_AGE_MS,
enableHealthCheck: true,
healthCheckIntervalMs: HEALTH_CHECK_INTERVAL_MS,
enableWaitQueue: true,
enableDisconnectHandling: true,
debug: DEBUG_LOG,
});

await pool.init();
return pool;
poolSize = size;
}

/**
* Shutdown the pool and close all sessions
*/
export async function shutdownPool(): Promise<void> {
if (pool) {
await pool.shutdown();
pool = null;
try {
await pool.shutdown();
} catch (err) {
if (DEBUG_LOG) {
console.error('[pool] Error during pool.shutdown():', err);
}
} finally {
pool = null;
poolInitPromise = null;
}
}
}

/**
* Close all active Browser.cash sessions via the REST API
*/
const CLOSE_TIMEOUT_MS = 10_000;

export async function closeAllSessions(): Promise<{ closed: number }> {
try {
await shutdownPool();
} catch (err) {
if (DEBUG_LOG) {
console.error('[pool] shutdownPool failed, continuing with session cleanup:', err);
}
}

let closed = 0;
try {
let page = 1;
let totalPages = 1;
Comment thread
coderabbitai[bot] marked this conversation as resolved.

do {
const listCtrl = AbortSignal.timeout(CLOSE_TIMEOUT_MS);
const res = await fetch(
`https://api.driver.dev/v1/browser/sessions?pageSize=100&page=${page}`,
{ headers: { Authorization: `Bearer ${BROWSER_API_KEY}` }, signal: listCtrl }
);
if (!res.ok) {
throw new Error(`Session list failed: ${res.status} ${res.statusText}`);
}
const data = await res.json() as {
sessions?: Array<{ sessionId: string; status: string }>;
totalPages?: number;
};
Comment thread
ImranJM1425 marked this conversation as resolved.
totalPages = data.totalPages ?? 1;
const sessions = data.sessions ?? [];
const active = sessions.filter((s) => s.status === 'active');

const deletePromises = active.map(async (s) => {
try {
const delCtrl = AbortSignal.timeout(CLOSE_TIMEOUT_MS);
const delRes = await fetch(
`https://api.driver.dev/v1/browser/session?sessionId=${s.sessionId}`,
{ method: 'DELETE', headers: { Authorization: `Bearer ${BROWSER_API_KEY}` }, signal: delCtrl }
);
return delRes.ok ? 1 : 0;
} catch {
return 0;
}
});
const results = await Promise.allSettled(deletePromises);
closed += results.reduce((sum, r) => sum + (r.status === 'fulfilled' ? r.value : 0), 0);

page++;
} while (page <= totalPages);
} catch (err) {
if (DEBUG_LOG) {
console.error('[pool] Error closing sessions via API:', err);
}
}

return { closed };
}