diff --git a/db/migrations/002_resumable_queue_jobs.sql b/db/migrations/002_resumable_queue_jobs.sql
new file mode 100644
index 0000000..b521486
--- /dev/null
+++ b/db/migrations/002_resumable_queue_jobs.sql
@@ -0,0 +1,30 @@
+ALTER TABLE jobs ADD COLUMN IF NOT EXISTS check_run_completed_at TIMESTAMPTZ; -- every ADD COLUMN / CREATE INDEX here uses IF NOT EXISTS, so re-running the migration skips what already exists
+ALTER TABLE jobs ADD COLUMN IF NOT EXISTS lease_owner TEXT;
+ALTER TABLE jobs ADD COLUMN IF NOT EXISTS lease_expires_at TIMESTAMPTZ;
+ALTER TABLE jobs ADD COLUMN IF NOT EXISTS heartbeat_at TIMESTAMPTZ;
+ALTER TABLE jobs ADD COLUMN IF NOT EXISTS recovery_count INTEGER NOT NULL DEFAULT 0;
+ALTER TABLE jobs ADD COLUMN IF NOT EXISTS last_queue_message_at TIMESTAMPTZ;
+ALTER TABLE file_reviews ADD COLUMN IF NOT EXISTS transient_error_count INTEGER NOT NULL DEFAULT 0;
+-- Partial index: running jobs that hold a lease, keyed by when that lease expires.
+CREATE INDEX IF NOT EXISTS jobs_lease_expiry_idx
+ ON jobs (lease_expires_at)
+ WHERE status = 'running' AND lease_expires_at IS NOT NULL;
+-- Partial index: jobs that have a check run but no completion timestamp. NOTE(review): check_run_completed_at is always NULL under this predicate, so it adds nothing as a key column.
+CREATE INDEX IF NOT EXISTS jobs_terminal_check_idx
+ ON jobs (status, check_run_completed_at)
+ WHERE check_run_id IS NOT NULL AND check_run_completed_at IS NULL;
+-- Partial index: running jobs with no lease at all, keyed by last queue message time and heartbeat time.
+CREATE INDEX IF NOT EXISTS jobs_unleased_running_idx
+ ON jobs (last_queue_message_at, heartbeat_at)
+ WHERE status = 'running' AND lease_expires_at IS NULL;
+-- Delete duplicate (job_id, file_path) rows, keeping the earliest by created_at then id, so the unique index that follows can be built. NOTE(review): confirm the oldest row is the one worth keeping.
+DELETE FROM file_reviews fr
+USING (
+ SELECT id, ROW_NUMBER() OVER (PARTITION BY job_id, file_path ORDER BY created_at ASC, id ASC) AS row_number
+ FROM file_reviews
+) ranked
+WHERE fr.id = ranked.id
+ AND ranked.row_number > 1;
+-- Enforce at most one file_reviews row per (job_id, file_path).
+CREATE UNIQUE INDEX IF NOT EXISTS file_reviews_job_file_path_key
+ ON file_reviews (job_id, file_path);
diff --git a/scripts/test.mjs b/scripts/test.mjs
index 0eeed33..804fba1 100644
--- a/scripts/test.mjs
+++ b/scripts/test.mjs
@@ -71,7 +71,7 @@ if (!usableEnvValue(process.env.TEST_DATABASE_URL)) {
process.exit(1);
}
-process.env.DATABASE_URL = usableEnvValue(process.env.DATABASE_URL) ?? process.env.TEST_DATABASE_URL;
+process.env.DATABASE_URL = process.env.TEST_DATABASE_URL; // always point DATABASE_URL at the test database, replacing any value already set, before the migrate and vitest commands below
run(process.execPath, ['scripts/migrate.mjs']);
run(process.execPath, ['node_modules/vitest/vitest.mjs', 'run']);
diff --git a/src/client/components/features/job-detail/job-meta-cards.tsx b/src/client/components/features/job-detail/job-meta-cards.tsx
index 56ae254..acdae17 100644
--- a/src/client/components/features/job-detail/job-meta-cards.tsx
+++ b/src/client/components/features/job-detail/job-meta-cards.tsx
@@ -9,6 +9,8 @@ interface JobMetaCardsProps {
}
export function JobMetaCards({ job }: JobMetaCardsProps) {
+ const isPartialReview = job.status === 'done' && job.errorMessage?.startsWith('Partial review:'); // done job whose message carries the "Partial review:" prefix; switches the card heading below from "Error" to "Partial review"
+
return (
{/* Details */}
@@ -75,10 +77,16 @@ export function JobMetaCards({ job }: JobMetaCardsProps) {
{job.errorMessage && (
-
Error
-
{job.errorMessage}
+
+ {isPartialReview ? 'Partial review' : 'Error'}
+
+
{job.errorMessage}
)}
diff --git a/src/client/components/ui/badge.tsx b/src/client/components/ui/badge.tsx
index 58a5bb3..dff9c95 100644
--- a/src/client/components/ui/badge.tsx
+++ b/src/client/components/ui/badge.tsx
@@ -58,6 +58,14 @@ function StatusBadge({ label, job }: { label: string; job?: JobSummary }) {
return
;
}
+ if (job && label === 'done' && job.errorMessage?.startsWith('Partial review:')) { // only the "Partial review:" prefix marks a done job as partial, the same test JobMetaCards applies
+ return (
+
+ partial
+
+ );
+ }
+
return (
{label.replace(/_/g, ' ')}
diff --git a/src/client/pages/settings.tsx b/src/client/pages/settings.tsx
index 5394c67..cc75810 100644
--- a/src/client/pages/settings.tsx
+++ b/src/client/pages/settings.tsx
@@ -39,12 +39,12 @@ const DEFAULT_GLOBAL_CONFIG: ModelRouteConfig = {
],
};
-function normalizeGlobalConfig(config: any): ModelRouteConfig {
+export function normalizeGlobalConfig(config: any): ModelRouteConfig { // returns DEFAULT_GLOBAL_CONFIG itself when config or config.main is falsy
if (!config || !config.main) return DEFAULT_GLOBAL_CONFIG;
return {
main: config.main,
- fallbacks: config.fallbacks?.length ? config.fallbacks : DEFAULT_GLOBAL_CONFIG.fallbacks,
- size_overrides: config.size_overrides ?? DEFAULT_GLOBAL_CONFIG.size_overrides,
+ fallbacks: Array.isArray(config.fallbacks) ? config.fallbacks : DEFAULT_GLOBAL_CONFIG.fallbacks, // any array is kept, including []; defaults apply only when the value is not an array
+ size_overrides: Array.isArray(config.size_overrides) ? config.size_overrides : DEFAULT_GLOBAL_CONFIG.size_overrides, // same rule: only a non-array value falls back to the defaults
};
}
diff --git a/src/server/core/diff.ts b/src/server/core/diff.ts
index 153706f..64a5641 100644
--- a/src/server/core/diff.ts
+++ b/src/server/core/diff.ts
@@ -255,12 +255,23 @@ export function truncateFileDiff(file: FileDiff, maxLines: number): FileDiff {
const keptHunks: DiffHunk[] = [];
for (const hunk of file.hunks) {
- if (currentLines + hunk.lines.length > maxLines && keptHunks.length > 0) {
+ const remainingLines = maxLines - currentLines; // line budget still available under maxLines
+ if (remainingLines <= 0) {
break;
}
- keptHunks.push(hunk);
- currentLines += hunk.lines.length;
- if (currentLines > maxLines) break;
+ // The whole hunk fits in the remaining budget: keep it as-is and move on to the next one.
+ if (hunk.lines.length <= remainingLines) {
+ keptHunks.push(hunk);
+ currentLines += hunk.lines.length;
+ continue;
+ }
+ // The hunk is larger than the budget: keep only its first remainingLines lines, then stop.
+ keptHunks.push({
+ ...hunk, // NOTE(review): every other hunk field is copied unchanged; confirm none of them records the original line count
+ lines: hunk.lines.slice(0, remainingLines),
+ });
+ currentLines += remainingLines;
+ break;
}
return {
diff --git a/src/server/core/job-recovery.ts b/src/server/core/job-recovery.ts
new file mode 100644
index 0000000..9b3a80a
--- /dev/null
+++ b/src/server/core/job-recovery.ts
@@ -0,0 +1,61 @@
+import type { AppBindings } from '@server/env';
+import { getTerminalJobsNeedingCheckRunCompletion, markJobCheckRunCompleted, recoverExpiredJobLeases } from '@server/db/jobs';
+import { logger } from '@server/core/logger';
+import { GitHubService } from '@server/services/github';
+
+const MAX_RECOVERY_COUNT = 3; // handed to recoverExpiredJobLeases below; presumably the recovery_count ceiling before a job is failed rather than requeued — confirm in @server/db/jobs
+/** Recovers jobs whose lease expired and re-enqueues the review phase of each requeued job; queue-send failures are isolated per job and nothing is rethrown. */
+export async function recoverJobs(env: AppBindings) {
+  try {
+    const recovered = await recoverExpiredJobLeases(env, MAX_RECOVERY_COUNT);
+    let requeued = 0;
+    for (const jobId of recovered.requeuedJobIds) {
+      try {
+        await env.REVIEW_QUEUE.send({ jobId, deliveryId: crypto.randomUUID(), phase: 'review' });
+        requeued += 1;
+      } catch (sendError) {
+        // One failed send must not stop the remaining recovered jobs from being re-enqueued.
+        logger.error(`Failed to requeue recovered job ${jobId}`, sendError instanceof Error ? sendError : new Error(String(sendError)));
+      }
+    }
+
+    if (recovered.requeuedJobIds.length > 0 || recovered.failedJobs.length > 0) {
+      logger.warn('Expired job leases recovered', { requeued, failed: recovered.failedJobs.length });
+    }
+  } catch (err) {
+    logger.error('Failed to recover expired job leases', err instanceof Error ? err : new Error(String(err)));
+  }
+}
+/** Completes the GitHub check run of each job returned by getTerminalJobsNeedingCheckRunCompletion ('superseded' is reported as neutral, any other status as failure), then records the completion; per-job errors are logged and skipped. */
+export async function completeTerminalCheckRuns(env: AppBindings) {
+  for (const job of await getTerminalJobsNeedingCheckRunCompletion(env)) {
+    if (!job.check_run_id) continue;
+    const superseded = job.status === 'superseded';
+    const fallbackSummary = superseded ? 'Superseded by a newer commit or job.' : 'Review failed.';
+    try {
+      const github = new GitHubService(env, job.installation_id);
+      await github.updateCheckRun(job.owner, job.repo, job.check_run_id, {
+        status: 'completed',
+        conclusion: superseded ? 'neutral' : 'failure',
+        title: superseded ? 'Review superseded' : 'Review failed',
+        summary: job.error_msg ?? fallbackSummary,
+      });
+      await markJobCheckRunCompleted(env, job.id);
+    } catch (error) {
+      logger.error(`Failed to complete terminal check run for job ${job.id}`, error instanceof Error ? error : new Error(String(error)));
+    }
+  }
+}
+/** Runs lease recovery, then terminal check-run completion, in that order; an error thrown by completeTerminalCheckRuns propagates to the caller. */
+export async function runOpportunisticJobMaintenance(env: AppBindings) {
+ await recoverJobs(env);
+ await completeTerminalCheckRuns(env);
+}
+/** Same work as runOpportunisticJobMaintenance, but any thrown error is logged instead of being rethrown. */
+export async function runBestEffortJobMaintenance(env: AppBindings) {
+ try {
+ await runOpportunisticJobMaintenance(env);
+ } catch (error) {
+ logger.error('Opportunistic job maintenance failed', error instanceof Error ? error : new Error(String(error)));
+ }
+}
diff --git a/src/server/core/model-output.ts b/src/server/core/model-output.ts
index fc8ebc9..856912e 100644
--- a/src/server/core/model-output.ts
+++ b/src/server/core/model-output.ts
@@ -5,6 +5,13 @@ import { findClosestValidLine, findPositionForLine, getValidNewLines, getValidPo
import type { FileDiff } from './diff';
import { jsonrepair } from 'jsonrepair';
+const MAX_LOGGED_MODEL_OUTPUT_CHARS = 2_000;
+/** Returns `value` unchanged when it fits within MAX_LOGGED_MODEL_OUTPUT_CHARS; otherwise its first MAX_LOGGED_MODEL_OUTPUT_CHARS characters plus a note of how many were cut. */
+function truncateForLog(value: string) {
+  const overflow = value.length - MAX_LOGGED_MODEL_OUTPUT_CHARS;
+  return overflow > 0 ? `${value.slice(0, MAX_LOGGED_MODEL_OUTPUT_CHARS)}... [truncated ${overflow} chars]` : value;
+}
+
function hasReviewKeys(input: string) {
return /"(findings|overall_explanation|overall_correctness|overall_confidence_score|summary)"\s*:/.test(input);
}
@@ -253,7 +260,7 @@ export function parseFileReviewResponse(raw: string, file: FileDiff): {
throw new Error('Model response did not contain review JSON keys.');
}
} catch (e) {
- logger.error('Failed to extract JSON from model response', { raw, error: e });
+ logger.error('Failed to extract JSON from model response', { raw: truncateForLog(raw), error: e });
throw new Error('Could not find JSON root in model response.');
}
@@ -269,14 +276,14 @@ export function parseFileReviewResponse(raw: string, file: FileDiff): {
try {
repaired = jsonrepair(preprocessed);
} catch (e) {
- logger.warn('jsonrepair failed to fix model output, using preprocessed text', { preprocessed, error: e });
+ logger.warn('jsonrepair failed to fix model output, using preprocessed text', { preprocessed: truncateForLog(preprocessed), error: e });
}
let parsedJson: any;
try {
parsedJson = JSON.parse(repaired);
} catch (e) {
- logger.error('Critical JSON parse error after extraction and repair', { repaired, error: e });
+ logger.error('Critical JSON parse error after extraction and repair', { repaired: truncateForLog(repaired), error: e });
throw new Error(`Invalid JSON format: ${e instanceof Error ? e.message : 'Unknown error'}`);
}
diff --git a/src/server/core/review.ts b/src/server/core/review.ts
index de56f0b..57187fa 100644
--- a/src/server/core/review.ts
+++ b/src/server/core/review.ts
@@ -1,14 +1,14 @@
import { logger } from './logger';
import { isSupportedGitHubWebhookEvent, type GitHubWebhookEventName, type GitHubWebhookPayload, type IssueCommentWebhookPayload, type PullRequestWebhookPayload } from '@shared/github';
-import { defaultRepoConfig, type ParsedReviewComment, type RepoConfig, type ReviewJobMessage } from '@shared/schema';
+import { defaultRepoConfig, normalizeModelId, type ParsedReviewComment, type RepoConfig, type ReviewJobMessage } from '@shared/schema';
import type { AppBindings } from '@server/env';
-import { getFileReviewsForJobs } from '@server/db/file-reviews';
-import { completeJob, failJob, findExistingJobForHead, getJobForProcessing, insertJob, mapJob, startJobProcessing, completePreparationStep, supersedeOlderJobs, updateJobCheckRun, updateJobStep } from '@server/db/jobs';
+import { getFileReviewsForJobs, recordRetryableFileReviewFailure, upsertFileReview } from '@server/db/file-reviews';
+import { claimJobLease, completeJob, completePreparationStep, failJob, findExistingJobForHead, getJobForProcessing, heartbeatJobLease, insertJob, mapJob, markJobCheckRunCompleted, markJobContinuationQueued, releaseJobLease, supersedeOlderJobs, updateJobCheckRun, updateJobStep } from '@server/db/jobs';
import { filterReviewableFiles, parseUnifiedDiff } from './diff';
import { GitHubService } from '../services/github';
import { GitHubClient } from './github';
-import { ModelService } from '../services/model';
+import { isRetryableModelError, ModelService } from '../services/model';
import { FormatterService } from '../services/formatter';
import { TokenTracker } from './token-tracker';
import { loadRepoConfig } from './config';
@@ -16,6 +16,67 @@ import { getWebhookDelivery } from '@server/db/webhook-deliveries';
type PersistedReviewJob = ReturnType;
+export type ReviewJobRunResult = { action: 'ack' } | { action: 'retry'; delaySeconds: number };
+
+const REVIEW_CHUNK_FILE_LIMIT = 2; // presumably the max files reviewed per queue delivery — usage is outside this view, confirm
+const REVIEW_CHUNK_WALL_CLOCK_MS = 8 * 60 * 1000; // 8 minutes, in milliseconds
+const JOB_LEASE_SECONDS = 10 * 60; // 10 minutes; the lease duration passed to claimJobLease and heartbeatJobLease
+const BUSY_RETRY_SECONDS = 60; // cap on the retry delay returned when claimJobLease reports the job as busy
+const RETRYABLE_MODEL_FAILURE_RETRY_SECONDS = 60; // delay before the same phase is re-enqueued after a retryable model error
+const MAX_RETRYABLE_FILE_REVIEW_FAILURES = 3; // presumably compared against transient_error_count — usage is outside this view, confirm
+/** Reports whether an error message, matched case-insensitively, contains any known transient-failure phrase; null, undefined and '' yield false. */
+function isRetryableFileReviewErrorMessage(message: string | null | undefined) {
+  if (!message) return false;
+  const haystack = message.toLowerCase();
+  return [
+    'all configured review models failed',
+    'retrying later',
+    'google request failed with 5',
+    'cloudflare',
+    'timeout',
+    'timed out',
+    'internal error',
+    'unavailable',
+    'high demand',
+    'temporary',
+    '[redacted]',
+    'returned no review content',
+    'empty response',
+  ].some((phrase) => haystack.includes(phrase));
+}
+/** True when a stored file review has status 'failed' and its error message is classified as retryable by isRetryableFileReviewErrorMessage. */
+function shouldRetryExistingFileReview(review: { file_status: string; error_msg: string | null }) {
+ return review.file_status === 'failed' && isRetryableFileReviewErrorMessage(review.error_msg);
+}
+/** Inverse of shouldRetryExistingFileReview: every review except a retryable failure counts as handled, including non-retryable failures. */
+function countsAsHandledFileReview(review: { file_status: string; error_msg: string | null }) {
+ return !shouldRetryExistingFileReview(review);
+}
+/** Collects the normalized id of every model the config can route to: the main model (default 'gemma-4-31b-it'), its fallbacks, and each size-override tier's model and fallbacks. */
+function configuredModelSet(config: RepoConfig): Set<string> {
+  const models = new Set<string>();
+  const addModel = (model: string | null | undefined) => {
+    if (model) models.add(normalizeModelId(model));
+  };
+
+  addModel(config.model?.main ?? 'gemma-4-31b-it');
+  for (const fallback of config.model?.fallbacks ?? []) {
+    addModel(fallback);
+  }
+  for (const tier of config.model?.size_overrides ?? []) {
+    addModel(tier.model);
+    for (const fallback of tier.fallbacks ?? []) {
+      addModel(fallback);
+    }
+  }
+
+  return models;
+}
+/** True when the parent review's model, after normalizeModelId, is one of the models in configuredModelSet(config); the set is rebuilt on every call. */
+function canInheritParentFileReview(config: RepoConfig, review: { model_used: string }) {
+ return configuredModelSet(config).has(normalizeModelId(review.model_used));
+}
+
function shouldTriggerFromPullRequest(action: PullRequestWebhookPayload['action'], config: RepoConfig['review']) {
return (config.on as string[]).includes(action);
}
@@ -94,341 +155,396 @@ export function extractReviewRequest(input: {
return null;
}
-export async function runReviewJob(env: AppBindings, message: ReviewJobMessage) {
- let job: PersistedReviewJob;
-
- if (message.jobId) {
- const row = await getJobForProcessing(env, message.jobId);
- if (!row) {
- logger.warn(`Job not found for processing: ${message.jobId}`);
- return;
- }
-
- job = mapJob(row);
- if (job.status === 'superseded') {
- logger.info(`Job ${job.id} is superseded, skipping processing.`);
- return;
- }
- if (job.status === 'running') {
- logger.info(`Job ${job.id} is already running, skipping duplicate queue delivery.`);
- return;
- }
- } else {
- if (!message.eventName) {
- logger.warn('Queue message ignored: missing eventName');
- return;
- }
+export async function runReviewJob(env: AppBindings, message: ReviewJobMessage): Promise {
+ const resolved = await resolveQueuedJob(env, message);
+ if (!resolved) {
+ return { action: 'ack' };
+ }
- let eventName = message.eventName;
- let payload = message.payload as GitHubWebhookPayload | undefined;
+ const leaseOwner = crypto.randomUUID();
+ const claim = await claimJobLease(env, resolved.job.id, leaseOwner, JOB_LEASE_SECONDS);
+ if (claim.status === 'missing') {
+ logger.warn(`Job not found for processing: ${resolved.job.id}`);
+ return { action: 'ack' };
+ }
+ if (claim.status === 'terminal') {
+ logger.info(`Job ${resolved.job.id} is already terminal (${claim.row.status}), acking queue delivery.`);
+ return { action: 'ack' };
+ }
+ if (claim.status === 'busy') {
+ logger.info(`Job ${resolved.job.id} has a fresh lease; retrying queue delivery later.`);
+ return { action: 'retry', delaySeconds: Math.min(BUSY_RETRY_SECONDS, claim.retryAfterSeconds) };
+ }
- if (payload === undefined) {
- const delivery = await getWebhookDelivery(env, message.deliveryId);
- if (!delivery) {
- logger.warn(`Queue message ignored: webhook delivery not found: ${message.deliveryId}`);
- return;
- }
+ const job = mapJob(claim.row);
+ const phase = resolved.phase;
+ const tracker = new TokenTracker();
+ const github = new GitHubService(env, job.installationId, tracker);
+ const model = new ModelService(env, tracker, { jobId: job.id });
+ const formatter = new FormatterService(env.APP_URL);
- eventName = delivery.event_name;
- payload = delivery.payload as GitHubWebhookPayload;
+ try {
+ if (phase === 'prepare') {
+ await runPreparePhase(env, job, leaseOwner, github);
+ } else if (phase === 'finalize') {
+ await runFinalizePhase(env, job, leaseOwner, github, model, formatter);
+ } else {
+ await runReviewPhase(env, job, leaseOwner, github, model);
}
- if (!isSupportedGitHubWebhookEvent(eventName)) {
- logger.info(`Queue message ignored: unsupported GitHub event ${eventName}`);
- return;
+ await releaseJobLease(env, job.id, leaseOwner);
+ return { action: 'ack' };
+ } catch (error) {
+ const messageText = error instanceof Error ? error.message : 'Unknown review failure';
+ if (messageText === 'JOB_SUPERSEDED') {
+ logger.info(`Job ${job.id} was superseded during execution, stopping.`);
+ await releaseJobLease(env, job.id, leaseOwner);
+ return { action: 'ack' };
}
- const installationId = String(payload.installation?.id ?? '');
- if (!installationId || !('repository' in payload) || !payload.repository) {
- logger.info('Queue message ignored: missing installation or repository info');
- return;
+ if (isRetryableModelError(error)) {
+ logger.warn(`Review job hit transient model/provider failure; scheduling delayed continuation: ${job.owner}/${job.repo} PR #${job.prNumber}`, {
+ error: messageText,
+ phase,
+ delaySeconds: RETRYABLE_MODEL_FAILURE_RETRY_SECONDS,
+ });
+ await enqueueJobPhase(env, job.id, phase, RETRYABLE_MODEL_FAILURE_RETRY_SECONDS);
+ await releaseJobLease(env, job.id, leaseOwner);
+ return { action: 'ack' };
}
- // 1. Load Repo Config
- const repoConfig = await loadRepoConfig(env, {
- installationId,
- owner: payload.repository.owner.login,
- repo: payload.repository.name,
- });
+ logger.error(`Review job failed: ${job.owner}/${job.repo} PR #${job.prNumber}`, error);
+ await failJobAndCheckRun(env, job, github, messageText);
+ return { action: 'ack' };
+ }
+}
- if (repoConfig.enabled === false) {
- logger.info(`Job ignored: repository ${payload.repository.owner.login}/${payload.repository.name} is disabled`);
- return;
- }
+async function resolveQueuedJob(
+ env: AppBindings,
+ message: ReviewJobMessage,
+): Promise<{ job: PersistedReviewJob; phase: 'prepare' | 'review' | 'finalize' } | null> {
+ if (message.jobId) {
+ const row = await getJobForProcessing(env, message.jobId);
+ return row ? { job: mapJob(row), phase: message.phase ?? 'review' } : null;
+ }
- // 2. Extract Review Request
- const extracted = extractReviewRequest({
- eventName,
- payload,
- botUsername: env.BOT_USERNAME,
- config: repoConfig.parsedJson,
- });
+ if (!message.eventName) {
+ logger.warn('Queue message ignored: missing eventName');
+ return null;
+ }
- if (!extracted) {
- // Handle specific PR closed events if needed (cleanup)
- if (eventName === 'pull_request') {
- const prPayload = payload as PullRequestWebhookPayload;
- if (prPayload.action === 'closed' && repoConfig.parsedJson.review.labels !== false) {
- const labels = repoConfig.parsedJson.review.labels;
- const gh = new GitHubClient(env, installationId);
- await gh.removeIssueLabel(prPayload.repository.owner.login, prPayload.repository.name, prPayload.pull_request.number, labels.p1);
- await gh.removeIssueLabel(prPayload.repository.owner.login, prPayload.repository.name, prPayload.pull_request.number, labels.p2);
- await gh.removeIssueLabel(prPayload.repository.owner.login, prPayload.repository.name, prPayload.pull_request.number, labels.p3);
- }
- }
- return;
- }
+ let eventName = message.eventName;
+ let payload = message.payload as GitHubWebhookPayload | undefined;
- // 3. Resolve full PR info for mentions
- let resolved = extracted;
- const githubClient = new GitHubClient(env, installationId);
- if (eventName === 'issue_comment') {
- const pr = await githubClient.getPullRequest(extracted.owner, extracted.repo, extracted.prNumber);
- resolved = {
- ...extracted,
- prTitle: pr.title,
- prAuthor: pr.user.login,
- commitSha: pr.head.sha,
- baseSha: pr.base.sha,
- headRef: pr.head.ref,
- baseRef: pr.base.ref,
- };
+ if (payload === undefined) {
+ const delivery = await getWebhookDelivery(env, message.deliveryId);
+ if (!delivery) {
+ logger.warn(`Queue message ignored: webhook delivery not found: ${message.deliveryId}`);
+ return null;
}
- // 4. Duplicate Check
- const duplicateJob = await findExistingJobForHead(env, {
- owner: resolved.owner,
- repo: resolved.repo,
- prNumber: resolved.prNumber,
- commitSha: resolved.commitSha,
- trigger: resolved.trigger,
- });
- if (duplicateJob) {
- if (duplicateJob.status === 'running') {
- logger.info(`Duplicate in-flight job ${duplicateJob.id} is already running for ${resolved.owner}/${resolved.repo} PR #${resolved.prNumber}.`);
- return;
- }
- if (duplicateJob.status === 'queued') {
- logger.info(`Resuming duplicate in-flight job ${duplicateJob.id} for ${resolved.owner}/${resolved.repo} PR #${resolved.prNumber}.`);
- job = duplicateJob;
- } else {
- logger.info(`Duplicate terminal job found for ${resolved.owner}/${resolved.repo} PR #${resolved.prNumber}, skipping.`);
- return;
- }
- } else {
- // 5. Insert Job
- job = await insertJob(env, {
- installationId: resolved.installationId,
- owner: resolved.owner,
- repo: resolved.repo,
- prNumber: resolved.prNumber,
- prTitle: resolved.prTitle,
- prAuthor: resolved.prAuthor,
- commitSha: resolved.commitSha,
- baseSha: resolved.baseSha,
- trigger: resolved.trigger,
- headRef: resolved.headRef,
- baseRef: resolved.baseRef,
- configSnapshot: repoConfig.parsedJson,
- });
-
- // 6. Supersede older jobs
- await supersedeOlderJobs(env, {
- installationId: resolved.installationId,
- owner: resolved.owner,
- repo: resolved.repo,
- prNumber: resolved.prNumber,
- newJobId: job.id,
- });
- }
+ eventName = delivery.event_name;
+ payload = delivery.payload as GitHubWebhookPayload;
}
- const tracker = new TokenTracker();
- const github = new GitHubService(env, job.installationId, tracker);
- const model = new ModelService(env, tracker);
- const formatter = new FormatterService(env.APP_URL);
-
- let checkRunId = job.checkRunId;
+ if (!isSupportedGitHubWebhookEvent(eventName)) {
+ logger.info(`Queue message ignored: unsupported GitHub event ${eventName}`);
+ return null;
+ }
- try {
- tracker.incrementSubrequests(1);
- const claimed = await startJobProcessing(env, job.id, 'Preparation');
- if (!claimed) {
- logger.info(`Job ${job.id} was already claimed or no longer queued, skipping duplicate queue delivery.`);
- return;
- }
+ const installationId = String(payload.installation?.id ?? '');
+ if (!installationId || !('repository' in payload) || !payload.repository) {
+ logger.info('Queue message ignored: missing installation or repository info');
+ return null;
+ }
- const pr = await github.getPullRequest(job.owner, job.repo, job.prNumber);
- const config = (job.configSnapshot ?? defaultRepoConfig) as RepoConfig;
+ const repoConfig = await loadRepoConfig(env, {
+ installationId,
+ owner: payload.repository.owner.login,
+ repo: payload.repository.name,
+ });
- if (!checkRunId) {
- const checkRun = await github.createCheckRun(job.owner, job.repo, {
- headSha: pr.head.sha,
- title: 'Review queued',
- summary: 'Codra has started reviewing this pull request.',
- });
- checkRunId = checkRun.id;
+ if (repoConfig.enabled === false) {
+ logger.info(`Job ignored: repository ${payload.repository.owner.login}/${payload.repository.name} is disabled`);
+ return null;
+ }
- tracker.incrementSubrequests(1);
- await updateJobCheckRun(env, job.id, checkRun.id);
+ const extracted = extractReviewRequest({
+ eventName,
+ payload,
+ botUsername: env.BOT_USERNAME,
+ config: repoConfig.parsedJson,
+ });
+
+ if (!extracted) {
+ if (eventName === 'pull_request') {
+ const prPayload = payload as PullRequestWebhookPayload;
+ if (prPayload.action === 'closed' && repoConfig.parsedJson.review.labels !== false) {
+ const labels = repoConfig.parsedJson.review.labels;
+ const gh = new GitHubClient(env, installationId);
+ await gh.removeIssueLabel(prPayload.repository.owner.login, prPayload.repository.name, prPayload.pull_request.number, labels.p1);
+ await gh.removeIssueLabel(prPayload.repository.owner.login, prPayload.repository.name, prPayload.pull_request.number, labels.p2);
+ await gh.removeIssueLabel(prPayload.repository.owner.login, prPayload.repository.name, prPayload.pull_request.number, labels.p3);
+ }
}
+ return null;
+ }
- const rawDiff = await github.getPullRequestDiff(job.owner, job.repo, job.prNumber);
- const files = filterReviewableFiles(parseUnifiedDiff(rawDiff), config.review);
-
- tracker.incrementSubrequests(1);
- await completePreparationStep(env, job.id, files.length);
+ let resolved = extracted;
+ const githubClient = new GitHubClient(env, installationId);
+ if (eventName === 'issue_comment') {
+ const pr = await githubClient.getPullRequest(extracted.owner, extracted.repo, extracted.prNumber);
+ resolved = {
+ ...extracted,
+ prTitle: pr.title,
+ prAuthor: pr.user.login,
+ commitSha: pr.head.sha,
+ baseSha: pr.base.sha,
+ headRef: pr.head.ref,
+ baseRef: pr.base.ref,
+ };
+ }
- tracker.incrementSubrequests(1);
- const preparedJob = await getJobForProcessing(env, job.id);
- if (preparedJob?.status === 'superseded') {
- throw new Error('JOB_SUPERSEDED');
+ const duplicateJob = await findExistingJobForHead(env, {
+ owner: resolved.owner,
+ repo: resolved.repo,
+ prNumber: resolved.prNumber,
+ commitSha: resolved.commitSha,
+ trigger: resolved.trigger,
+ });
+ if (duplicateJob) {
+ if (duplicateJob.status === 'queued' || duplicateJob.status === 'running') {
+ logger.info(`Resuming duplicate in-flight job ${duplicateJob.id} for ${resolved.owner}/${resolved.repo} PR #${resolved.prNumber}.`);
+ return { job: duplicateJob, phase: message.phase ?? 'prepare' };
}
- tracker.incrementSubrequests(1);
- await updateJobStep(env, job.id, 'Reviewing Files', { status: 'running' });
- const reviewedComments: ParsedReviewComment[] = [];
- const fileSummaries: Array<{ path: string; summary: string; verdict: string }> = [];
- const newReviewsToInsert: any[] = [];
- let stoppedBeforeAllFiles = false;
-
- const jobIdsToQuery = [job.id];
- if (job.retryOfJobId) jobIdsToQuery.push(job.retryOfJobId);
- const allExistingReviews = await getFileReviewsForJobs(env, jobIdsToQuery);
-
- const currentJobReviews = allExistingReviews.filter(r => r.job_id === job.id);
- const existingReviews = [...currentJobReviews];
- for (const r of allExistingReviews) {
- if (r.job_id !== job.id && !existingReviews.some(er => er.file_path === r.file_path)) {
- existingReviews.push(r);
- }
- }
+ logger.info(`Duplicate terminal job found for ${resolved.owner}/${resolved.repo} PR #${resolved.prNumber}, skipping.`);
+ return null;
+ }
- const totalLineCount = files.reduce((sum, f) => sum + f.lineCount, 0);
- for (const [index, file] of files.entries()) {
- // Safety break to avoid hitting Cloudflare 50-subrequest limit
- if (!tracker.hasRemainingSubrequests(5)) {
- logger.warn(`Approaching subrequest limit (${tracker.getSubrequestCount()}), stopping review loop at file ${index + 1}/${files.length}`);
- stoppedBeforeAllFiles = true;
- break;
- }
+ const job = await insertJob(env, {
+ installationId: resolved.installationId,
+ owner: resolved.owner,
+ repo: resolved.repo,
+ prNumber: resolved.prNumber,
+ prTitle: resolved.prTitle,
+ prAuthor: resolved.prAuthor,
+ commitSha: resolved.commitSha,
+ baseSha: resolved.baseSha,
+ trigger: resolved.trigger,
+ headRef: resolved.headRef,
+ baseRef: resolved.baseRef,
+ configSnapshot: repoConfig.parsedJson,
+ });
+
+ await supersedeOlderJobs(env, {
+ installationId: resolved.installationId,
+ owner: resolved.owner,
+ repo: resolved.repo,
+ prNumber: resolved.prNumber,
+ newJobId: job.id,
+ });
+
+ return { job, phase: 'prepare' };
+}
- // Periodic check for supersession (every 50 files - reduced frequency to save subrequests)
- if (index % 50 === 0 && index > 0) {
- tracker.incrementSubrequests(1);
- const currentJob = await getJobForProcessing(env, job.id);
- if (currentJob?.status === 'superseded') {
- throw new Error('JOB_SUPERSEDED');
- }
- }
+async function runPreparePhase(
+ env: AppBindings,
+ job: PersistedReviewJob,
+ leaseOwner: string,
+ github: GitHubService,
+) {
+ await updateJobStep(env, job.id, 'Preparation', { status: 'running' });
+ const pr = await github.getPullRequest(job.owner, job.repo, job.prNumber);
+ const config = (job.configSnapshot ?? defaultRepoConfig) as RepoConfig;
- const existing = existingReviews.find((r) => r.file_path === file.path && r.file_status === 'done');
+ let checkRunId = job.checkRunId;
+ if (!checkRunId) {
+ const checkRun = await github.createCheckRun(job.owner, job.repo, {
+ headSha: pr.head.sha,
+ title: 'Review queued',
+ summary: 'Codra has started reviewing this pull request.',
+ });
+ checkRunId = checkRun.id;
+ await updateJobCheckRun(env, job.id, checkRun.id);
+ }
- if (existing) {
- reviewedComments.push(...(existing.parsed_comments as ParsedReviewComment[]));
- fileSummaries.push({
- path: file.path,
- summary: existing.file_summary ?? '',
- verdict: existing.verdict ?? 'comment',
- });
+ const rawDiff = await github.getPullRequestDiff(job.owner, job.repo, job.prNumber);
+ const files = filterReviewableFiles(parseUnifiedDiff(rawDiff), config.review);
+ await completePreparationStep(env, job.id, files.length);
+ await heartbeatJobLease(env, job.id, leaseOwner, JOB_LEASE_SECONDS);
- if (existing.model_used && (existing.input_tokens || existing.output_tokens)) {
- tracker.record(existing.model_used, existing.input_tokens ?? 0, existing.output_tokens ?? 0);
- }
-
- // If this review was from a parent job, we'll include it in our batch insert for the current job
- if (!currentJobReviews.some((r) => r.file_path === file.path)) {
- newReviewsToInsert.push({
- filePath: file.path,
- fileStatus: 'done',
- modelUsed: existing.model_used,
- modelProvider: (existing as any).model_provider,
- diffLineCount: existing.diff_line_count,
- diffInput: existing.diff_input,
- rawAiOutput: existing.raw_ai_output,
- parsedComments: existing.parsed_comments as ParsedReviewComment[],
- inputTokens: existing.input_tokens,
- outputTokens: existing.output_tokens,
- durationMs: existing.duration_ms,
- verdict: existing.verdict,
- fileSummary: existing.file_summary,
- overallCorrectness: existing.overall_correctness,
- confidenceScore: existing.confidence_score,
- errorMessage: null,
- });
- }
- continue;
- }
+ if (files.length === 0) {
+ await updateJobStep(env, job.id, 'Reviewing Files', { status: 'done' });
+ await enqueueJobPhase(env, job.id, 'finalize');
+ return;
+ }
- // Update check run less frequently (every 50 files)
- if ((index > 0 && index % 50 === 0) || index === files.length - 1) {
- await github.updateCheckRun(job.owner, job.repo, checkRunId, {
- title: `Reviewing (${index + 1}/${files.length})`,
- summary: `Analyzing ${file.path}`,
- });
- }
+ if (checkRunId) {
+ await github.updateCheckRun(job.owner, job.repo, checkRunId, {
+ title: `Reviewing (0/${files.length})`,
+ summary: 'Codra is analyzing changed files.',
+ });
+ }
+ await enqueueJobPhase(env, job.id, 'review');
+}
- const startedAt = Date.now();
- try {
- // AI call (ModelService handles its own subrequest incrementing)
- const response = await model.reviewFile({
- file,
- prTitle: pr.title ?? null,
- prDescription: pr.body ?? null,
- config: config,
- totalLineCount,
- });
+async function runReviewPhase(
+ env: AppBindings,
+ job: PersistedReviewJob,
+ leaseOwner: string,
+ github: GitHubService,
+ model: ModelService,
+) {
+ if (!hasCompletedStep(job, 'Preparation')) {
+ await runPreparePhase(env, job, leaseOwner, github);
+ return;
+ }
- reviewedComments.push(...response.parsed.comments);
- fileSummaries.push({
- path: file.path,
- summary: response.parsed.fileSummary,
- verdict: response.parsed.verdict,
- });
+ await updateJobStep(env, job.id, 'Reviewing Files', { status: 'running' });
+
+ const pr = await github.getPullRequest(job.owner, job.repo, job.prNumber);
+ const config = (job.configSnapshot ?? defaultRepoConfig) as RepoConfig;
+ const rawDiff = await github.getPullRequestDiff(job.owner, job.repo, job.prNumber);
+ const files = filterReviewableFiles(parseUnifiedDiff(rawDiff), config.review);
+ const totalLineCount = files.reduce((sum, file) => sum + file.lineCount, 0);
+ const startedAt = Date.now();
+ let processedThisChunk = 0;
+
+ const jobIdsToQuery = [job.id];
+ if (job.retryOfJobId) jobIdsToQuery.push(job.retryOfJobId);
+ const allExistingReviews = await getFileReviewsForJobs(env, jobIdsToQuery);
+ const currentReviews = new Map(allExistingReviews.filter((review) => review.job_id === job.id).map((review) => [review.file_path, review]));
+ const parentReviews = new Map(allExistingReviews.filter((review) => review.job_id !== job.id && review.file_status === 'done').map((review) => [review.file_path, review]));
+
+ for (const file of files) {
+ const existingReview = currentReviews.get(file.path);
+ if (existingReview && countsAsHandledFileReview(existingReview)) {
+ continue;
+ }
- newReviewsToInsert.push({
+ const inherited = parentReviews.get(file.path);
+ if (inherited) {
+ if (!canInheritParentFileReview(config, inherited)) {
+ logger.info(`Ignoring inherited review for ${file.path}; parent model ${inherited.model_used} is not in the current model strategy`);
+ await reviewAndPersistFile(env, job, file, pr, config, totalLineCount, model, existingReview);
+ processedThisChunk += 1;
+ await heartbeatAndCheckSuperseded(env, job.id, leaseOwner);
+ } else {
+ await upsertFileReview(env, job.id, {
filePath: file.path,
fileStatus: 'done',
- modelUsed: response.modelUsed,
- modelProvider: response.provider,
- diffLineCount: file.lineCount,
- diffInput: response.userPrompt,
- rawAiOutput: response.rawText,
- parsedComments: response.parsed.comments,
- inputTokens: response.inputTokens,
- outputTokens: response.outputTokens,
- durationMs: Date.now() - startedAt,
- verdict: response.parsed.verdict,
- fileSummary: response.parsed.fileSummary,
- overallCorrectness: response.parsed.overallCorrectness,
- confidenceScore: response.parsed.confidenceScore,
+ modelUsed: inherited.model_used,
+ modelProvider: inherited.model_provider,
+ diffLineCount: inherited.diff_line_count,
+ diffInput: inherited.diff_input,
+ rawAiOutput: inherited.raw_ai_output,
+ parsedComments: inherited.parsed_comments as ParsedReviewComment[],
+ inputTokens: inherited.input_tokens,
+ outputTokens: inherited.output_tokens,
+ durationMs: inherited.duration_ms,
+ verdict: inherited.verdict,
+ fileSummary: inherited.file_summary,
+ overallCorrectness: inherited.overall_correctness,
+ confidenceScore: inherited.confidence_score,
errorMessage: null,
});
- } catch (error) {
- const errorMessage = error instanceof Error ? error.message : 'Unknown file review error';
- logger.error(`File review failed for ${file.path}`, { error });
-
- // If we hit a hard limit (subrequests or neuron quota), STOP EVERYTHING.
- const isHardLimit =
- errorMessage.toLowerCase().includes('subrequest') ||
- errorMessage.includes('4006') ||
- errorMessage.toLowerCase().includes('allocation');
-
- if (isHardLimit) {
- throw error;
- }
-
- fileSummaries.push({
- path: file.path,
- summary: `Review failed: ${errorMessage}`,
- verdict: 'failed',
- });
+ currentReviews.set(file.path, inherited);
+ processedThisChunk += 1;
+ await heartbeatAndCheckSuperseded(env, job.id, leaseOwner);
+ }
+ } else {
+ await reviewAndPersistFile(env, job, file, pr, config, totalLineCount, model, existingReview);
+ processedThisChunk += 1;
+ await heartbeatAndCheckSuperseded(env, job.id, leaseOwner);
+ }
+
+ if (processedThisChunk >= REVIEW_CHUNK_FILE_LIMIT || Date.now() - startedAt >= REVIEW_CHUNK_WALL_CLOCK_MS) {
+ break;
+ }
+ }
+
+ const latestReviews = await getFileReviewsForJobs(env, [job.id]);
+ const reviewedPaths = new Set(latestReviews.filter(countsAsHandledFileReview).map((review) => review.file_path));
+ const completedCount = files.filter((file) => reviewedPaths.has(file.path)).length;
+
+ if (completedCount >= files.length) {
+ await updateJobStep(env, job.id, 'Reviewing Files', { status: 'done' });
+ await enqueueJobPhase(env, job.id, 'finalize');
+ return;
+ }
- newReviewsToInsert.push({
+ if (job.checkRunId) {
+ await github.updateCheckRun(job.owner, job.repo, job.checkRunId, {
+ title: `Reviewing (${completedCount}/${files.length})`,
+ summary: 'Codra is continuing this review in the next queue chunk.',
+ });
+ }
+ await enqueueJobPhase(env, job.id, 'review');
+}
+
+async function reviewAndPersistFile(
+ env: AppBindings,
+ job: PersistedReviewJob,
+ file: ReturnType<typeof filterReviewableFiles>[number],
+ pr: Awaited<ReturnType<GitHubService['getPullRequest']>>,
+ config: RepoConfig,
+ totalLineCount: number,
+ model: ModelService,
+ previousReview?: { transient_error_count: number },
+) {
+ const startedAt = Date.now();
+ const compactPrompt = (previousReview?.transient_error_count ?? 0) > 0;
+ try {
+ const response = await model.reviewFile({
+ file,
+ prTitle: pr.title ?? null,
+ prDescription: pr.body ?? null,
+ config,
+ totalLineCount,
+ compactPrompt,
+ });
+
+ await upsertFileReview(env, job.id, {
+ filePath: file.path,
+ fileStatus: 'done',
+ modelUsed: response.modelUsed,
+ modelProvider: response.provider,
+ diffLineCount: file.lineCount,
+ diffInput: response.userPrompt,
+ rawAiOutput: response.rawText,
+ parsedComments: response.parsed.comments,
+ inputTokens: response.inputTokens,
+ outputTokens: response.outputTokens,
+ durationMs: Date.now() - startedAt,
+ verdict: response.parsed.verdict,
+ fileSummary: response.parsed.fileSummary,
+ overallCorrectness: response.parsed.overallCorrectness,
+ confidenceScore: response.parsed.confidenceScore,
+ errorMessage: null,
+ });
+ } catch (error) {
+ const errorMessage = error instanceof Error ? error.message : 'Unknown file review error';
+
+ if (isRetryableModelError(error)) {
+ const modelId = config.model?.main ?? 'gemma-4-31b-it';
+ const failureCount = await recordRetryableFileReviewFailure(env, job.id, {
+ filePath: file.path,
+ modelUsed: modelId,
+ modelProvider: modelId.startsWith('@cf/') ? 'cloudflare' : 'google',
+ diffLineCount: file.lineCount,
+ diffInput: '',
+ durationMs: Date.now() - startedAt,
+ errorMessage,
+ });
+
+ if (failureCount >= MAX_RETRYABLE_FILE_REVIEW_FAILURES) {
+ const finalError = `Review skipped after ${failureCount} repeated model provider outages.`;
+ await upsertFileReview(env, job.id, {
filePath: file.path,
fileStatus: 'failed',
- modelUsed: config.model?.main ?? 'gemma-4-31b-it',
- modelProvider: (config.model?.main ?? 'gemma-4-31b-it').startsWith('@cf/') ? 'cloudflare' : 'google',
+ modelUsed: modelId,
+ modelProvider: modelId.startsWith('@cf/') ? 'cloudflare' : 'google',
diffLineCount: file.lineCount,
diffInput: '',
rawAiOutput: null,
@@ -438,137 +554,211 @@ export async function runReviewJob(env: AppBindings, message: ReviewJobMessage)
durationMs: Date.now() - startedAt,
verdict: null,
fileSummary: null,
- errorMessage,
+ errorMessage: finalError,
});
+ logger.error(`File review failed permanently for ${file.path} after transient retries`, {
+ attempts: failureCount,
+ error: errorMessage,
+ });
+ return;
}
- }
- // Batch insert all NEW or parent-inherited reviews at once (1 subrequest for reviews, 1 for comments)
- if (newReviewsToInsert.length > 0) {
- const { batchInsertFileReviews } = await import('@server/db/file-reviews');
- tracker.incrementSubrequests(2); // 1 for reviews, 1 for comments
- await batchInsertFileReviews(env, job.id, newReviewsToInsert);
- }
-
- if (stoppedBeforeAllFiles) {
- tracker.incrementSubrequests(1);
- await updateJobStep(env, job.id, 'Reviewing Files', {
- status: 'failed',
- error: 'Review stopped before all files were analyzed due to subrequest limits.',
+ logger.warn(`File review deferred for ${file.path}; transient model/provider failure will retry later`, {
+ error: errorMessage,
+ attempts: failureCount,
});
- throw new Error('Review stopped before all files were analyzed due to subrequest limits.');
+ throw error;
}
- if (fileSummaries.length > 0 && fileSummaries.every((f) => f.verdict === 'failed')) {
- tracker.incrementSubrequests(1);
- await updateJobStep(env, job.id, 'Reviewing Files', { status: 'failed', error: 'All files failed to review' });
- throw new Error('All files failed to review');
- }
-
- tracker.incrementSubrequests(1);
- await updateJobStep(env, job.id, 'Reviewing Files', { status: 'done' });
+ logger.error(`File review failed for ${file.path}`, { error });
- tracker.incrementSubrequests(1);
- await updateJobStep(env, job.id, 'Generating Summary', { status: 'running' });
- const hasFailures = fileSummaries.some((f) => f.verdict === 'failed');
- const verdictSummary = formatter.summarizeVerdict(reviewedComments, hasFailures);
+ const isHardLimit =
+ errorMessage.toLowerCase().includes('subrequest') ||
+ errorMessage.includes('4006') ||
+ errorMessage.toLowerCase().includes('allocation');
- // Final check before generating summary and posting review
- const finalJobCheck = await getJobForProcessing(env, job.id);
- if (finalJobCheck?.status === 'superseded') {
- throw new Error('JOB_SUPERSEDED');
+ if (isHardLimit) {
+ throw error;
}
- const summaryResponse = await model.generateSummary({
- prTitle: pr.title ?? null,
- verdict: verdictSummary.verdict,
- fileSummaries,
- config,
+ const modelId = config.model?.main ?? 'gemma-4-31b-it';
+ await upsertFileReview(env, job.id, {
+ filePath: file.path,
+ fileStatus: 'failed',
+ modelUsed: modelId,
+ modelProvider: modelId.startsWith('@cf/') ? 'cloudflare' : 'google',
+ diffLineCount: file.lineCount,
+ diffInput: '',
+ rawAiOutput: null,
+ parsedComments: [],
+ inputTokens: null,
+ outputTokens: null,
+ durationMs: Date.now() - startedAt,
+ verdict: null,
+ fileSummary: null,
+ errorMessage,
});
+ }
+}
- await updateJobStep(env, job.id, 'Generating Summary', { status: 'done' });
-
- const formattedSummary = formatter.formatReviewOverview(pr.head.sha, env.BOT_USERNAME);
+async function runFinalizePhase(
+ env: AppBindings,
+ job: PersistedReviewJob,
+ leaseOwner: string,
+ github: GitHubService,
+ model: ModelService,
+ formatter: FormatterService,
+) {
+ await updateJobStep(env, job.id, 'Generating Summary', { status: 'running' });
+
+ const pr = await github.getPullRequest(job.owner, job.repo, job.prNumber);
+ const config = (job.configSnapshot ?? defaultRepoConfig) as RepoConfig;
+ const rawDiff = await github.getPullRequestDiff(job.owner, job.repo, job.prNumber);
+ const files = filterReviewableFiles(parseUnifiedDiff(rawDiff), config.review);
+ const reviews = await getFileReviewsForJobs(env, [job.id]);
+
+ if (reviews.length < files.length) {
+ await updateJobStep(env, job.id, 'Reviewing Files', { status: 'running' });
+ await enqueueJobPhase(env, job.id, 'review');
+ return;
+ }
- await updateJobStep(env, job.id, 'Completing', { status: 'running' });
- const review = await github.createReview(job.owner, job.repo, job.prNumber, {
- commitSha: pr.head.sha,
- event: formatter.toReviewEvent(verdictSummary.verdict),
- body: formattedSummary,
- comments: reviewedComments.map(c => ({
- path: c.path,
- position: c.position ?? undefined,
- body: formatter.formatInlineComment(c)
- })),
- });
+ const reviewedComments = reviews.flatMap((review) => review.parsed_comments as ParsedReviewComment[]);
+ const fileSummaries = reviews.map((review) => ({
+ path: review.file_path,
+ summary: review.file_status === 'failed'
+ ? `Review failed: ${review.error_msg ?? 'Unknown file review error'}`
+ : (review.file_summary ?? ''),
+ verdict: review.file_status === 'failed' ? 'failed' : (review.verdict ?? 'comment'),
+ }));
+
+ if (fileSummaries.length > 0 && fileSummaries.every((file) => file.verdict === 'failed')) {
+ await updateJobStep(env, job.id, 'Generating Summary', { status: 'failed', error: 'All files failed to review' });
+ throw new Error('All files failed to review');
+ }
- if (config.review.labels !== false) {
- const labels = config.review.labels;
- const labelMap = {
- comment: { name: labels.p1, color: 'f79009' },
- approve: { name: labels.p2, color: '027a48' },
- } as const;
- const label = labelMap[verdictSummary.verdict];
-
- // Remove other verdict labels if they exist
- const allPotentialLabels = [labels.p1, labels.p2, labels.p3];
- for (const l of allPotentialLabels) {
- if (l !== label.name) {
- await github.removeIssueLabel(job.owner, job.repo, job.prNumber, l);
- }
+ const hasFailures = fileSummaries.some((file) => file.verdict === 'failed');
+ const failedFileCount = fileSummaries.filter((file) => file.verdict === 'failed').length;
+ const verdictSummary = formatter.summarizeVerdict(reviewedComments, hasFailures);
+ const summaryResponse = await model.generateSummary({
+ prTitle: pr.title ?? null,
+ verdict: verdictSummary.verdict,
+ fileSummaries,
+ config,
+ });
+ await updateJobStep(env, job.id, 'Generating Summary', { status: 'done' });
+ await heartbeatAndCheckSuperseded(env, job.id, leaseOwner);
+
+ const formattedSummary = formatter.formatReviewOverview(pr.head.sha, env.BOT_USERNAME);
+
+ await updateJobStep(env, job.id, 'Completing', { status: 'running' });
+ const review = await github.createReview(job.owner, job.repo, job.prNumber, {
+ commitSha: pr.head.sha,
+ event: formatter.toReviewEvent(verdictSummary.verdict),
+ body: formattedSummary,
+ comments: reviewedComments.map(comment => ({
+ path: comment.path,
+ position: comment.position ?? undefined,
+ body: formatter.formatInlineComment(comment),
+ })),
+ });
+
+ if (config.review.labels !== false) {
+ const labels = config.review.labels;
+ const labelMap = {
+ comment: { name: labels.p1, color: 'f79009' },
+ approve: { name: labels.p2, color: '027a48' },
+ } as const;
+ const label = labelMap[verdictSummary.verdict];
+
+ for (const possibleLabel of [labels.p1, labels.p2, labels.p3]) {
+ if (possibleLabel !== label.name) {
+ await github.removeIssueLabel(job.owner, job.repo, job.prNumber, possibleLabel);
}
-
- await github.ensureLabel(job.owner, job.repo, label.name, label.color);
- await github.addIssueLabels(job.owner, job.repo, job.prNumber, [label.name]);
}
- await github.updateCheckRun(job.owner, job.repo, checkRunId, {
+ await github.ensureLabel(job.owner, job.repo, label.name, label.color);
+ await github.addIssueLabels(job.owner, job.repo, job.prNumber, [label.name]);
+ }
+
+ if (job.checkRunId) {
+ await github.updateCheckRun(job.owner, job.repo, job.checkRunId, {
status: 'completed',
conclusion: hasFailures ? 'failure' : (verdictSummary.verdict === 'approve' ? 'success' : 'neutral'),
title: hasFailures ? 'Review partially failed' : (verdictSummary.verdict === 'approve' ? 'LGTM' : 'Comments posted'),
- summary: `${reviewedComments.length} inline comments across ${files.length} files.${hasFailures ? ' Some files failed to parse.' : ''}`,
+ summary: `${reviewedComments.length} inline comments across ${files.length} files.${hasFailures ? ` ${failedFileCount} file${failedFileCount === 1 ? '' : 's'} could not be reviewed after repeated provider outages.` : ''}`,
});
+ }
- const finalUsage = tracker.getTotalUsage();
- logger.info(`Final token usage for job ${job.id}:`, {
- total: finalUsage,
- breakdown: tracker.getBreakdown()
- });
+ const fileInputTokens = reviews.reduce((sum, review) => sum + (review.input_tokens ?? 0), 0);
+ const fileOutputTokens = reviews.reduce((sum, review) => sum + (review.output_tokens ?? 0), 0);
+ const partialErrorMessage = hasFailures
+ ? `Partial review: ${failedFileCount} of ${files.length} file${files.length === 1 ? '' : 's'} could not be reviewed after repeated model/provider outages.`
+ : null;
+ await completeJob(env, job.id, {
+ verdict: verdictSummary.verdict,
+ fileCount: files.length,
+ commentCount: reviewedComments.length,
+ totalInputTokens: fileInputTokens + (summaryResponse.inputTokens ?? 0),
+ totalOutputTokens: fileOutputTokens + (summaryResponse.outputTokens ?? 0),
+ summaryMarkdown: formattedSummary,
+ reviewId: review.id,
+ summaryModel: summaryResponse.modelUsed,
+ errorMessage: partialErrorMessage,
+ });
+ await updateJobStep(env, job.id, 'Completing', { status: 'done' });
+ logger.info(`Review job completed: ${job.owner}/${job.repo} PR #${job.prNumber}`);
+}
- await completeJob(env, job.id, {
- verdict: verdictSummary.verdict,
- fileCount: files.length,
- commentCount: reviewedComments.length,
- totalInputTokens: finalUsage.input,
- totalOutputTokens: finalUsage.output,
- summaryMarkdown: formattedSummary,
- reviewId: review.id,
- summaryModel: summaryResponse.modelUsed,
- });
- await updateJobStep(env, job.id, 'Completing', { status: 'done' });
- logger.info(`Review job completed: ${job.owner}/${job.repo} PR #${job.prNumber}`);
- } catch (error) {
- const message = error instanceof Error ? error.message : 'Unknown review failure';
- if (message === 'JOB_SUPERSEDED') {
- logger.info(`Job ${job.id} was superseded during execution, stopping.`);
- return;
- }
+async function heartbeatAndCheckSuperseded(env: AppBindings, jobId: string, leaseOwner: string) {
+ await heartbeatJobLease(env, jobId, leaseOwner, JOB_LEASE_SECONDS);
+ const currentJob = await getJobForProcessing(env, jobId);
+ if (currentJob?.status === 'superseded') {
+ throw new Error('JOB_SUPERSEDED');
+ }
+}
- logger.error(`Review job failed: ${job.owner}/${job.repo} PR #${job.prNumber}`, error);
+async function enqueueJobPhase(
+ env: AppBindings,
+ jobId: string,
+ phase: 'prepare' | 'review' | 'finalize',
+ delaySeconds = 0,
+) {
+ await markJobContinuationQueued(env, jobId);
+ await env.REVIEW_QUEUE.send(
+ {
+ jobId,
+ deliveryId: crypto.randomUUID(),
+ phase,
+ },
+ delaySeconds > 0 ? { delaySeconds } : undefined,
+ );
+}
- // Attempt to record failure, but don't crash if we are out of subrequests
- try {
- await failJob(env, job.id, message);
- if (checkRunId) {
- await github.updateCheckRun(job.owner, job.repo, checkRunId, {
- status: 'completed',
- conclusion: 'failure',
- title: 'Review failed',
- summary: message,
- });
- }
- } catch (innerError) {
- logger.error('Failed to record job failure in DB/GitHub (likely subrequest limit reached)', innerError);
+function hasCompletedStep(job: PersistedReviewJob, stepName: string) {
+ return job.steps.some((step) => step.name === stepName && step.status === 'done');
+}
+
+async function failJobAndCheckRun(
+ env: AppBindings,
+ job: PersistedReviewJob,
+ github: GitHubService,
+ message: string,
+) {
+ try {
+ await failJob(env, job.id, message);
+ const latest = await getJobForProcessing(env, job.id);
+ const checkRunId = latest?.check_run_id ?? job.checkRunId;
+ if (checkRunId) {
+ await github.updateCheckRun(job.owner, job.repo, checkRunId, {
+ status: 'completed',
+ conclusion: 'failure',
+ title: 'Review failed',
+ summary: message,
+ });
+ await markJobCheckRunCompleted(env, job.id);
}
+ } catch (innerError) {
+ logger.error('Failed to record job failure in DB/GitHub', innerError);
}
}
diff --git a/src/server/db/client.ts b/src/server/db/client.ts
index 934d487..f37a6fd 100644
--- a/src/server/db/client.ts
+++ b/src/server/db/client.ts
@@ -13,13 +13,13 @@ function createDbClient(env: DbEnv): DbClient {
const sql = postgres(env.HYPERDRIVE.connectionString, {
max: 5,
fetch_types: false,
- prepare: true,
+ prepare: false,
onnotice: () => {},
});
return {
async query<T>(sqlText: string, params: unknown[] = []) {
- return (await sql.unsafe(sqlText, params.map(normalizeParam) as any[], { prepare: true })) as T[];
+ return (await sql.unsafe(sqlText, params.map(normalizeParam) as any[], { prepare: false })) as T[];
},
};
}
diff --git a/src/server/db/file-reviews.ts b/src/server/db/file-reviews.ts
index bc99aac..19d2ff1 100644
--- a/src/server/db/file-reviews.ts
+++ b/src/server/db/file-reviews.ts
@@ -93,6 +93,184 @@ export async function insertFileReview(
}
}
+export async function upsertFileReview(
+ env: Pick,
+ jobId: string,
+ input: {
+ filePath: string;
+ fileStatus: 'pending' | 'done' | 'skipped' | 'failed';
+ modelUsed: string;
+ modelProvider?: string | null;
+ diffLineCount: number;
+ diffInput: string | null;
+ rawAiOutput: string | null;
+ parsedComments: ParsedReviewComment[];
+ inputTokens: number | null;
+ outputTokens: number | null;
+ durationMs: number | null;
+ verdict: 'approve' | 'comment' | null;
+ fileSummary: string | null;
+ overallCorrectness?: string | null;
+ confidenceScore?: number | null;
+ errorMessage: string | null;
+ },
+) {
+ const [review] = await queryRows<{ id: string }>(
+ env,
+ `
+ INSERT INTO file_reviews (
+ job_id,
+ file_path,
+ file_status,
+ model_used,
+ diff_line_count,
+ diff_input,
+ raw_ai_output,
+ input_tokens,
+ output_tokens,
+ duration_ms,
+ verdict,
+ file_summary,
+ overall_correctness,
+ confidence_score,
+ error_msg,
+ model_provider
+ )
+ VALUES ($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
+ ON CONFLICT (job_id, file_path) DO UPDATE SET
+ file_status = EXCLUDED.file_status,
+ model_used = EXCLUDED.model_used,
+ diff_line_count = EXCLUDED.diff_line_count,
+ diff_input = EXCLUDED.diff_input,
+ raw_ai_output = EXCLUDED.raw_ai_output,
+ input_tokens = EXCLUDED.input_tokens,
+ output_tokens = EXCLUDED.output_tokens,
+ duration_ms = EXCLUDED.duration_ms,
+ verdict = EXCLUDED.verdict,
+ file_summary = EXCLUDED.file_summary,
+ overall_correctness = EXCLUDED.overall_correctness,
+ confidence_score = EXCLUDED.confidence_score,
+ error_msg = EXCLUDED.error_msg,
+ model_provider = EXCLUDED.model_provider,
+ transient_error_count = 0
+ RETURNING id
+ `,
+ [
+ jobId,
+ input.filePath,
+ input.fileStatus,
+ input.modelUsed,
+ input.diffLineCount,
+ input.diffInput,
+ input.rawAiOutput,
+ input.inputTokens,
+ input.outputTokens,
+ input.durationMs,
+ input.verdict,
+ input.fileSummary,
+ input.overallCorrectness ?? null,
+ input.confidenceScore ?? null,
+ input.errorMessage,
+ input.modelProvider ?? null,
+ ],
+ );
+
+ await queryRows(env, 'DELETE FROM review_comments WHERE file_review_id = $1::uuid', [review.id]);
+
+ if (input.parsedComments.length > 0) {
+ await queryRows(
+ env,
+ `
+ INSERT INTO review_comments (
+ file_review_id, path, line, position, severity, category, title, body, code_suggestion
+ )
+ SELECT $1::uuid, * FROM UNNEST($2::text[], $3::int[], $4::int[], $5::text[], $6::text[], $7::text[], $8::text[], $9::text[])
+ `,
+ [
+ review.id,
+ input.parsedComments.map(c => c.path),
+ input.parsedComments.map(c => c.line ?? null),
+ input.parsedComments.map(c => c.position ?? null),
+ input.parsedComments.map(c => c.severity),
+ input.parsedComments.map(c => c.category),
+ input.parsedComments.map(c => c.title),
+ input.parsedComments.map(c => c.body),
+ input.parsedComments.map(c => c.codeSuggestion ?? null),
+ ],
+ );
+ }
+}
+
+export async function recordRetryableFileReviewFailure(
+ env: Pick,
+ jobId: string,
+ input: {
+ filePath: string;
+ modelUsed: string;
+ modelProvider?: string | null;
+ diffLineCount: number;
+ diffInput: string | null;
+ durationMs: number | null;
+ errorMessage: string;
+ },
+) {
+ const [review] = await queryRows<{ id: string; transient_error_count: number }>(
+ env,
+ `
+ INSERT INTO file_reviews (
+ job_id,
+ file_path,
+ file_status,
+ model_used,
+ model_provider,
+ diff_line_count,
+ diff_input,
+ raw_ai_output,
+ input_tokens,
+ output_tokens,
+ duration_ms,
+ verdict,
+ file_summary,
+ overall_correctness,
+ confidence_score,
+ error_msg,
+ transient_error_count
+ )
+ VALUES ($1::uuid, $2, 'failed', $3, $4, $5, $6, NULL, NULL, NULL, $7, NULL, NULL, NULL, NULL, $8, 1)
+ ON CONFLICT (job_id, file_path) DO UPDATE SET
+ file_status = 'failed',
+ model_used = EXCLUDED.model_used,
+ model_provider = EXCLUDED.model_provider,
+ diff_line_count = EXCLUDED.diff_line_count,
+ diff_input = EXCLUDED.diff_input,
+ raw_ai_output = NULL,
+ input_tokens = NULL,
+ output_tokens = NULL,
+ duration_ms = EXCLUDED.duration_ms,
+ verdict = NULL,
+ file_summary = NULL,
+ overall_correctness = NULL,
+ confidence_score = NULL,
+ error_msg = EXCLUDED.error_msg,
+ transient_error_count = file_reviews.transient_error_count + 1
+ RETURNING id, transient_error_count
+ `,
+ [
+ jobId,
+ input.filePath,
+ input.modelUsed,
+ input.modelProvider ?? null,
+ input.diffLineCount,
+ input.diffInput,
+ input.durationMs,
+ input.errorMessage,
+ ],
+ );
+
+ await queryRows(env, 'DELETE FROM review_comments WHERE file_review_id = $1::uuid', [review.id]);
+ return review.transient_error_count;
+}
+
export async function getModelUsageStats(env: Pick) {
return queryRows<{
model_used: string;
@@ -262,6 +440,7 @@ export async function getFileReviewsForJobs(env: Pick
confidence_score: number | null;
error_msg: string | null;
model_provider: string | null;
+ transient_error_count: number;
}>(
env,
`
diff --git a/src/server/db/jobs.ts b/src/server/db/jobs.ts
index 76e1944..ffe3cbd 100644
--- a/src/server/db/jobs.ts
+++ b/src/server/db/jobs.ts
@@ -3,7 +3,7 @@ import { parseJsonColumn, queryRows } from './client';
import { defaultRepoConfig, jobDetailSchema, jobSummarySchema, repoConfigSchema, type RepoConfig } from '@shared/schema';
import { getOrCreateRepository } from './repositories';
-type JobRow = {
+export type JobRow = {
id: string;
installation_id: string;
owner: string;
@@ -17,9 +17,15 @@ type JobRow = {
status: 'queued' | 'running' | 'done' | 'failed' | 'superseded';
config_snapshot: { review?: RepoConfig['review']; model?: RepoConfig['model'] } | string | null;
check_run_id: number | null;
+ check_run_completed_at: string | null;
created_at: string;
started_at: string | null;
finished_at: string | null;
+ lease_owner: string | null;
+ lease_expires_at: string | null;
+ heartbeat_at: string | null;
+ recovery_count: number | null;
+ last_queue_message_at: string | null;
total_input_tokens: number | null;
total_output_tokens: number | null;
verdict: 'approve' | 'comment' | null;
@@ -50,6 +56,12 @@ type JobDetailRow = JobRow & {
type ByteaValue = ArrayBuffer | ArrayBufferView | string;
+export type JobLeaseClaim =
+ | { status: 'claimed'; row: JobRow }
+ | { status: 'busy'; row: JobRow; retryAfterSeconds: number }
+ | { status: 'terminal'; row: JobRow }
+ | { status: 'missing' };
+
function hexToBytes(hex: string) {
const bytes = new Uint8Array(hex.length / 2);
for (let index = 0; index < bytes.length; index += 1) {
@@ -366,6 +378,108 @@ export async function startJobProcessing(env: Pick, j
return rows.length > 0;
}
+export async function claimJobLease(
+ env: Pick,
+ jobId: string,
+ leaseOwner: string,
+ leaseSeconds: number,
+): Promise<JobLeaseClaim> {
+ const [claimed] = await queryRows<JobRow>(
+ env,
+ `
+ WITH claimed AS (
+ UPDATE jobs
+ SET status = CASE WHEN status = 'queued' THEN 'running' ELSE status END,
+ started_at = COALESCE(started_at, now()),
+ lease_owner = $2,
+ lease_expires_at = now() + ($3 || ' seconds')::interval,
+ heartbeat_at = now(),
+ last_queue_message_at = now()
+ WHERE id = $1
+ AND status IN ('queued', 'running')
+ AND (
+ lease_expires_at IS NULL
+ OR lease_expires_at < now()
+ OR lease_owner = $2
+ )
+ RETURNING *
+ )
+ SELECT c.*, r.owner, r.repo, r.installation_id
+ FROM claimed c
+ JOIN repositories r ON c.repository_id = r.id
+ `,
+ [jobId, leaseOwner, String(leaseSeconds)],
+ );
+
+ if (claimed) {
+ return { status: 'claimed', row: claimed };
+ }
+
+ const row = await getJobForProcessing(env, jobId);
+ if (!row) {
+ return { status: 'missing' };
+ }
+
+ if (!['queued', 'running'].includes(row.status)) {
+ return { status: 'terminal', row };
+ }
+
+ const expiresAt = row.lease_expires_at ? new Date(row.lease_expires_at).getTime() : 0;
+ const secondsUntilExpiry = Math.ceil((expiresAt - Date.now()) / 1000);
+ return {
+ status: 'busy',
+ row,
+ retryAfterSeconds: Math.max(15, Math.min(60, Number.isFinite(secondsUntilExpiry) ? secondsUntilExpiry : 60)),
+ };
+}
+
+export async function heartbeatJobLease(
+ env: Pick,
+ jobId: string,
+ leaseOwner: string,
+ leaseSeconds: number,
+) {
+ await queryRows(
+ env,
+ `
+ UPDATE jobs
+ SET heartbeat_at = now(),
+ lease_expires_at = now() + ($3 || ' seconds')::interval
+ WHERE id = $1
+ AND lease_owner = $2
+ AND status = 'running'
+ `,
+ [jobId, leaseOwner, String(leaseSeconds)],
+ );
+}
+
+export async function releaseJobLease(env: Pick, jobId: string, leaseOwner: string) {
+ await queryRows(
+ env,
+ `
+ UPDATE jobs
+ SET lease_owner = NULL,
+ lease_expires_at = NULL
+ WHERE id = $1
+ AND lease_owner = $2
+ `,
+ [jobId, leaseOwner],
+ );
+}
+
+export async function markJobContinuationQueued(env: Pick, jobId: string) {
+ await queryRows(
+ env,
+ `
+ UPDATE jobs
+ SET last_queue_message_at = now()
+ WHERE id = $1
+ AND status = 'running'
+ `,
+ [jobId],
+ );
+}
+
export async function updateJobCheckRun(env: Pick, jobId: string, checkRunId: number) {
await queryRows(
env,
@@ -391,6 +505,7 @@ export async function completeJob(
reviewId: number | null;
summaryModel: string | null;
overallConfidenceScore?: number | null;
+ errorMessage?: string | null;
},
) {
await queryRows(
@@ -399,6 +514,9 @@ export async function completeJob(
UPDATE jobs
SET status = 'done',
finished_at = now(),
+ check_run_completed_at = now(),
+ lease_owner = NULL,
+ lease_expires_at = NULL,
verdict = $2,
file_count = $3,
comment_count = $4,
@@ -408,7 +526,7 @@ export async function completeJob(
review_id = $8,
summary_model = $9,
overall_confidence_score = $10,
- error_msg = NULL
+ error_msg = $11
WHERE id = $1
`,
[
@@ -421,7 +539,8 @@ export async function completeJob(
input.summaryMarkdown,
input.reviewId,
input.summaryModel,
- input.overallConfidenceScore ?? null
+ input.overallConfidenceScore ?? null,
+ input.errorMessage ?? null
],
);
}
@@ -433,6 +552,8 @@ export async function failJob(env: Pick, jobId: strin
UPDATE jobs
SET status = 'failed',
finished_at = now(),
+ lease_owner = NULL,
+ lease_expires_at = NULL,
error_msg = $2,
steps = CASE
WHEN steps IS NOT NULL THEN (
@@ -452,6 +573,18 @@ export async function failJob(env: Pick, jobId: strin
);
}
+export async function markJobCheckRunCompleted(env: Pick, jobId: string) {
+ await queryRows(
+ env,
+ `
+ UPDATE jobs
+ SET check_run_completed_at = now()
+ WHERE id = $1
+ `,
+ [jobId],
+ );
+}
+
export async function updateJobFileCount(env: Pick, jobId: string, fileCount: number) {
await queryRows(
env,
@@ -580,6 +713,126 @@ export async function recoverStaleJobs(
return rows.length;
}
+export async function recoverExpiredJobLeases(
+ env: Pick,
+ maxRecoveryCount = 3,
+ unleasedGraceSeconds = 300,
+) {
+ const requeued = await queryRows<{ id: string }>(
+ env,
+ `
+ WITH expired AS (
+ SELECT id
+ FROM jobs
+ WHERE status = 'running'
+ AND (
+ (
+ lease_expires_at IS NOT NULL
+ AND lease_expires_at < now()
+ )
+ OR (
+ lease_expires_at IS NULL
+ AND COALESCE(last_queue_message_at, heartbeat_at, started_at, created_at) < now() - ($2 || ' seconds')::interval
+ )
+ )
+ AND recovery_count < $1
+ ORDER BY COALESCE(lease_expires_at, last_queue_message_at, heartbeat_at, started_at, created_at) ASC
+ LIMIT 25
+ FOR UPDATE SKIP LOCKED
+ )
+ UPDATE jobs j
+ SET lease_owner = NULL,
+ lease_expires_at = NULL,
+ heartbeat_at = NULL,
+ recovery_count = recovery_count + 1,
+ last_queue_message_at = now(),
+ error_msg = NULL
+ FROM expired
+ WHERE j.id = expired.id
+ RETURNING j.id
+ `,
+ [maxRecoveryCount, String(unleasedGraceSeconds)],
+ );
+
+ const failed = await queryRows<JobRow>(
+ env,
+ `
+ WITH expired AS (
+ SELECT id
+ FROM jobs
+ WHERE status = 'running'
+ AND (
+ (
+ lease_expires_at IS NOT NULL
+ AND lease_expires_at < now()
+ )
+ OR (
+ lease_expires_at IS NULL
+ AND COALESCE(last_queue_message_at, heartbeat_at, started_at, created_at) < now() - ($2 || ' seconds')::interval
+ )
+ )
+ AND recovery_count >= $1
+ ORDER BY COALESCE(lease_expires_at, last_queue_message_at, heartbeat_at, started_at, created_at) ASC
+ LIMIT 25
+ FOR UPDATE SKIP LOCKED
+ ),
+ updated AS (
+ UPDATE jobs j
+ SET status = 'failed',
+ finished_at = now(),
+ lease_owner = NULL,
+ lease_expires_at = NULL,
+ heartbeat_at = NULL,
+ error_msg = 'Job timed out: worker crashed or was evicted.',
+ steps = CASE
+ WHEN steps IS NOT NULL THEN (
+ SELECT jsonb_agg(
+ CASE
+ WHEN s->>'status' = 'running'
+ THEN s || jsonb_build_object('status', 'failed', 'finishedAt', now(), 'error', 'Job timed out: worker crashed or was evicted.')
+ ELSE s
+ END
+ ) FROM jsonb_array_elements(steps) s
+ )
+ ELSE steps
+ END
+ FROM expired
+ WHERE j.id = expired.id
+ RETURNING j.*
+ )
+ SELECT u.*, r.owner, r.repo, r.installation_id
+ FROM updated u
+ JOIN repositories r ON u.repository_id = r.id
+ `,
+ [maxRecoveryCount, String(unleasedGraceSeconds)],
+ );
+
+ return {
+ requeuedJobIds: requeued.map((row) => row.id),
+ failedJobs: failed,
+ };
+}
+
+export async function getTerminalJobsNeedingCheckRunCompletion(
+ env: Pick,
+ limit = 25,
+) {
+ return queryRows<JobRow>(
+ env,
+ `
+ SELECT j.*, r.owner, r.repo, r.installation_id
+ FROM jobs j
+ JOIN repositories r ON j.repository_id = r.id
+ WHERE j.status IN ('failed', 'superseded')
+ AND j.check_run_id IS NOT NULL
+ AND j.check_run_completed_at IS NULL
+ ORDER BY COALESCE(j.finished_at, j.started_at, j.created_at) ASC
+ LIMIT $1
+ `,
+ [limit],
+ );
+}
+
export async function supersedeOlderJobs(
env: Pick,
input: {
@@ -596,6 +849,8 @@ export async function supersedeOlderJobs(
UPDATE jobs j
SET status = 'superseded',
finished_at = now(),
+ lease_owner = NULL,
+ lease_expires_at = NULL,
error_msg = 'Superseded by a newer commit or job.'
FROM repositories r
WHERE j.repository_id = r.id
diff --git a/src/server/env.ts b/src/server/env.ts
index d591cb6..03a765b 100644
--- a/src/server/env.ts
+++ b/src/server/env.ts
@@ -5,7 +5,7 @@ export interface WorkersAiBinding {
}
export interface QueueProducer {
- send(message: T): Promise;
+ send(message: T, options?: { delaySeconds?: number }): Promise;
}
export interface AssetsBinding {
diff --git a/src/server/index.ts b/src/server/index.ts
index df53f8e..17bfd70 100644
--- a/src/server/index.ts
+++ b/src/server/index.ts
@@ -2,19 +2,12 @@ import { createApp } from './app';
import { runReviewJob } from './core/review';
import type { AppBindings } from './env';
import { reviewJobMessageSchema } from '@shared/schema';
-import { recoverStaleJobs } from '@server/db/jobs';
import { logger } from '@server/core/logger';
import { runWithDb } from '@server/db/client';
+import { runBestEffortJobMaintenance } from '@server/core/job-recovery';
const app = createApp();
-/**
- * Jobs left in 'running' after a worker crash must be recovered before the
- * next batch is processed. The threshold is set to 20 minutes — well above
- * the longest expected review job but below Cloudflare's 30-minute CPU limit.
- */
-const STALE_JOB_THRESHOLD_MINUTES = 20;
-
export default {
fetch(request: Request, env: AppBindings, ctx: ExecutionContext) {
return runWithDb(env, () => app.fetch(request, env, ctx));
@@ -22,44 +15,34 @@ export default {
async queue(batch: MessageBatch, env: AppBindings, _ctx: ExecutionContext) {
return runWithDb(env, async () => {
- // ── Stale-job recovery ──────────────────────────────────────────────────
- // Run once per batch. Any job that was 'running' for > threshold is a
- // leftover from a previous crashed invocation; mark it failed now so the
- // dashboard and future retries see an accurate state.
- try {
- const recovered = await recoverStaleJobs(env, STALE_JOB_THRESHOLD_MINUTES);
- if (recovered > 0) {
- logger.warn('Stale jobs recovered', { count: recovered, thresholdMinutes: STALE_JOB_THRESHOLD_MINUTES });
- }
- } catch (err) {
- // Non-fatal: log and continue processing the batch.
- logger.error('Failed to recover stale jobs', err instanceof Error ? err : new Error(String(err)));
- }
+ await runBestEffortJobMaintenance(env);
- // ── Process messages ────────────────────────────────────────────────────
- for (const message of batch.messages) {
- const parseResult = reviewJobMessageSchema.safeParse(message.body);
+ for (const message of batch.messages) {
+ const parseResult = reviewJobMessageSchema.safeParse(message.body);
- if (!parseResult.success) {
- // Malformed message — cannot be retried meaningfully. Ack it so
- // Cloudflare delivers it to the DLQ for inspection instead of burning
- // retries on something that will never be valid.
- logger.error('Invalid queue message schema; discarding message', {
- body: message.body,
- error: parseResult.error.flatten(),
- });
- message.ack();
- continue;
- }
+ if (!parseResult.success) {
+ logger.error('Invalid queue message schema; retrying so it can reach the DLQ', {
+ body: message.body,
+ error: parseResult.error.flatten(),
+ });
+ message.retry();
+ continue;
+ }
- try {
- await runReviewJob(env, parseResult.data);
- message.ack();
- } catch (error) {
- logger.error('Queue message processing failed; retrying', error instanceof Error ? error : new Error(String(error)));
- message.retry();
+ try {
+ const result = await runReviewJob(env, parseResult.data);
+ if (result.action === 'retry') {
+ message.retry({ delaySeconds: result.delaySeconds });
+ } else {
+ message.ack();
+ }
+ } catch (error) {
+ logger.error('Queue message processing failed; retrying', error instanceof Error ? error : new Error(String(error)));
+ message.retry();
+ }
}
- }
+
+ await runBestEffortJobMaintenance(env);
});
},
} satisfies ExportedHandler;
diff --git a/src/server/models/cloudflare.ts b/src/server/models/cloudflare.ts
index b0f20cc..4814f25 100644
--- a/src/server/models/cloudflare.ts
+++ b/src/server/models/cloudflare.ts
@@ -3,8 +3,89 @@ import type { AppBindings } from '@server/env';
import { TimeoutError } from '@server/core/timeout';
import type { ModelResponse } from './types';
-/** Max wall-clock time allowed for a single Workers-AI call (600 s). */
-const CLOUDFLARE_TIMEOUT_MS = 600_000;
+/** Max wall-clock time allowed for a single Workers-AI call. */
+const CLOUDFLARE_TIMEOUT_MS = 45_000;
+/**
+ * Number of retries after the first attempt. reviewWithCloudflare loops while
+ * `attempt <= maxRetries`, so a value of 1 allows at most two calls per model.
+ */
+const CLOUDFLARE_MAX_RETRIES = 1;
+
+type UnknownRecord = Record;
+
+/** Type guard: true for any non-null value whose `typeof` is 'object' (arrays included). */
+function isRecord(value: unknown): value is UnknownRecord {
+  return value !== null && typeof value === 'object';
+}
+
+/** Type guard: true only for strings that still contain characters after trimming. */
+function isText(value: unknown): value is string {
+  if (typeof value !== 'string') return false;
+  return value.trim() !== '';
+}
+
+/**
+ * Reads `value[key]` and returns it only when both `value` and the property
+ * are non-null objects; every other shape yields null.
+ */
+function getRecord(value: unknown, key: string): UnknownRecord | null {
+  if (isRecord(value)) {
+    const nested = value[key];
+    if (isRecord(nested)) return nested;
+  }
+  return null;
+}
+
+/**
+ * Reads `value[key]` and returns it trimmed when it is a non-blank string;
+ * returns null when `value` is not an object or the property is missing/blank.
+ */
+function getText(value: unknown, key: string): string | null {
+  if (isRecord(value)) {
+    const candidate = value[key];
+    if (isText(candidate)) return candidate.trim();
+  }
+  return null;
+}
+
+/**
+ * Reads `value[key]` and returns it when it is a finite number.
+ *
+ * Returns null when `value` is not an object, the property is not a number,
+ * or the number is NaN / ±Infinity. Rejecting non-finite values matters
+ * because callers fall back with `?? 0`, which would let NaN through.
+ */
+function getNumber(value: unknown, key: string): number | null {
+  if (!isRecord(value)) return null;
+  const child = value[key];
+  return typeof child === 'number' && Number.isFinite(child) ? child : null;
+}
+
+/**
+ * Normalizes a chat message `content` field to plain text.
+ *
+ * A non-blank string is returned trimmed. An array is flattened by
+ * concatenating string parts and the `text` property of object parts (other
+ * parts contribute nothing); the joined text is trimmed. Returns null when no
+ * text remains or when `content` has any other shape.
+ */
+function extractMessageContent(content: unknown): string | null {
+  if (isText(content)) return content.trim();
+  if (!Array.isArray(content)) return null;
+
+  let joined = '';
+  for (const part of content) {
+    if (isText(part)) {
+      joined += part;
+    } else if (isRecord(part) && isText(part.text)) {
+      joined += part.text;
+    }
+  }
+
+  const trimmed = joined.trim();
+  return trimmed.length > 0 ? trimmed : null;
+}
+
+/**
+ * Pulls the review text out of a Workers-AI result, trying in order:
+ * the result itself when it is a string, `result.response`,
+ * `result.result.response`, then `result.choices[0].message.content`.
+ *
+ * @param result Raw value returned by the model call.
+ * @param model  Model id, used only in error messages.
+ * @returns The trimmed, non-empty text.
+ * @throws Error when no text is found. The message distinguishes a reported
+ *   finish/stop reason, a reasoning-only message, and a fully empty response.
+ */
+function extractCloudflareText(result: unknown, model: string): string {
+  if (isText(result)) {
+    return result.trim();
+  }
+
+  // Completion-style envelopes: top level first, then nested under `result`.
+  const directText = getText(result, 'response') ?? getText(getRecord(result, 'result'), 'response');
+  if (directText) {
+    return directText;
+  }
+
+  // Chat-style envelope: only the first choice is inspected.
+  const firstChoice = isRecord(result) && Array.isArray(result.choices) ? result.choices[0] : undefined;
+  const message = getRecord(firstChoice, 'message');
+  const chatText = extractMessageContent(message?.content);
+  if (chatText) {
+    return chatText;
+  }
+
+  // No usable text: pick the most specific error available.
+  const finishReason = isRecord(firstChoice) ? firstChoice.finish_reason ?? firstChoice.stop_reason : null;
+  if (finishReason) {
+    throw new Error(`Cloudflare model ${model} returned no review content (finish_reason=${finishReason}).`);
+  }
+
+  const hasReasoningOnly = isText(message?.reasoning) || isText(message?.reasoning_content);
+  if (hasReasoningOnly) {
+    throw new Error(`Cloudflare model ${model} returned reasoning without review content.`);
+  }
+
+  throw new Error(`Cloudflare model ${model} returned an empty response.`);
+}
+
+/**
+ * Reads token usage from `result.usage`, falling back to `result.result.usage`.
+ * Counts that are missing or not numbers are reported as 0.
+ */
+function extractCloudflareUsage(result: unknown) {
+  const topLevelUsage = getRecord(result, 'usage');
+  const usage = topLevelUsage !== null ? topLevelUsage : getRecord(getRecord(result, 'result'), 'usage');
+  const inputTokens = getNumber(usage, 'prompt_tokens') ?? 0;
+  const outputTokens = getNumber(usage, 'completion_tokens') ?? 0;
+  return { inputTokens, outputTokens };
+}
export async function reviewWithCloudflare(
env: Pick,
@@ -12,8 +93,8 @@ export async function reviewWithCloudflare(
input: { systemPrompt: string; userPrompt: string },
tracker?: { incrementSubrequests(count?: number): void },
): Promise {
- const maxRetries = 2;
- let lastError: any;
+ const maxRetries = CLOUDFLARE_MAX_RETRIES;
+ let lastError: unknown;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
let timer: ReturnType | undefined;
@@ -39,22 +120,20 @@ export async function reviewWithCloudflare(
{ role: 'user', content: input.userPrompt },
],
max_completion_tokens: 4096,
+ temperature: 0,
}),
timeoutPromise,
]);
const durationMs = Date.now() - startTime;
logger.info(`AI model ${model} responded in ${durationMs}ms`);
- const rawText =
- result?.response ??
- result?.result?.response ??
- result?.choices?.[0]?.message?.content ??
- (typeof result === 'string' ? result : JSON.stringify(result));
+ const rawText = extractCloudflareText(result, model);
+ const usage = extractCloudflareUsage(result);
return {
rawText,
- inputTokens: result?.usage?.prompt_tokens ?? result?.result?.usage?.prompt_tokens ?? 0,
- outputTokens: result?.usage?.completion_tokens ?? result?.result?.usage?.completion_tokens ?? 0,
+ inputTokens: usage.inputTokens,
+ outputTokens: usage.outputTokens,
modelUsed: model,
provider: 'cloudflare',
};
diff --git a/src/server/models/google.ts b/src/server/models/google.ts
index bdd1d30..b742768 100644
--- a/src/server/models/google.ts
+++ b/src/server/models/google.ts
@@ -3,8 +3,10 @@ import type { AppBindings } from '@server/env';
import { withTimeout } from '@server/core/timeout';
import type { ModelResponse } from './types';
-/** Max wall-clock time allowed for a single Google AI Studio call (120 s). */
-const GOOGLE_TIMEOUT_MS = 120_000;
+/** Max wall-clock time allowed for a single Google AI Studio call. */
+const GOOGLE_TIMEOUT_MS = 45_000;
+/** Retries after the first attempt; reviewWithGoogle loops while `attempt <= maxRetries`. */
+const GOOGLE_MAX_RETRIES = 1;
+/** Sent as `generationConfig.maxOutputTokens` in the generateContent request body. */
+const GOOGLE_MAX_OUTPUT_TOKENS = 3072;
export async function reviewWithGoogle(
env: Pick,
@@ -15,8 +17,8 @@ export async function reviewWithGoogle(
logger.info(`Calling Google AI model: ${model}`);
const startTime = Date.now();
const url = `https://generativelanguage.googleapis.com/v1beta/models/${model}:generateContent?key=${env.GEMINI_API_KEY}`;
- const maxRetries = 2;
- let lastError: any;
+ const maxRetries = GOOGLE_MAX_RETRIES;
+ let lastError: unknown;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
@@ -46,7 +48,7 @@ export async function reviewWithGoogle(
],
generationConfig: {
responseMimeType: 'application/json',
- maxOutputTokens: 4096,
+ maxOutputTokens: GOOGLE_MAX_OUTPUT_TOKENS,
},
}),
}),
diff --git a/src/server/prompts/file-review.ts b/src/server/prompts/file-review.ts
index 9a2846f..b2f1b28 100644
--- a/src/server/prompts/file-review.ts
+++ b/src/server/prompts/file-review.ts
@@ -12,6 +12,8 @@ Your goal is to identify bugs, security vulnerabilities, performance bottlenecks
4. Output EXACTLY ONE JSON object matching the schema below.
5. Focus on identifying critical issues (P0-P2). Nits (P3) should be minimized.
6. For each finding, provide a clear 'title', a 'body' explaining the issue, and 'code_location' (line or line_range).
+7. Return at most 10 findings. Keep each body under 160 words.
+8. If there are no material issues, return an empty findings array and a short explanation.
### SCHEMA FORMAT:
{
@@ -58,6 +60,8 @@ export function buildFileReviewPrompts(input: {
`File path: ${input.file.path}`,
languageGuidelines,
`Custom rules:\n${rules}`,
+ 'Review only the diff shown below. If the diff note says it was truncated, do not infer issues from omitted lines.',
+ 'Prioritize correctness, security, and production-impacting bugs. Avoid speculative style feedback.',
'',
`## Output JSON Schema (STRICTLY REQUIRED)`,
`{
diff --git a/src/server/routes/api/jobs.ts b/src/server/routes/api/jobs.ts
index 1125d79..88c2c2d 100644
--- a/src/server/routes/api/jobs.ts
+++ b/src/server/routes/api/jobs.ts
@@ -1,13 +1,17 @@
import { Hono } from 'hono';
-import { defaultRepoConfig, jobsQuerySchema } from '@shared/schema';
+import { jobsQuerySchema } from '@shared/schema';
import type { AppEnv } from '@server/env';
import { bytesToHex, getJobDetail, getJobForProcessing, insertJob, listJobs, mapJob, supersedeOlderJobs } from '@server/db/jobs';
import { jsonError } from '@server/core/http';
+import { runBestEffortJobMaintenance } from '@server/core/job-recovery';
+import { loadRepoConfig } from '@server/core/config';
export function createJobsRouter() {
const app = new Hono();
app.get('/', async (c) => {
+ await runBestEffortJobMaintenance(c.env);
+
const rawQuery = c.req.query();
const query = jobsQuerySchema.parse(rawQuery);
@@ -16,6 +20,8 @@ export function createJobsRouter() {
});
app.get('/:id', async (c) => {
+ await runBestEffortJobMaintenance(c.env);
+
const job = await getJobDetail(c.env, c.req.param('id'));
if (!job) {
return jsonError('Job not found.', 404);
@@ -30,6 +36,11 @@ export function createJobsRouter() {
return jsonError('Job not found.', 404);
}
const source = mapJob(rawSource);
+ const currentConfig = await loadRepoConfig(c.env, {
+ installationId: source.installationId,
+ owner: source.owner,
+ repo: source.repo,
+ });
const job = await insertJob(c.env, {
installationId: source.installationId,
@@ -43,7 +54,7 @@ export function createJobsRouter() {
trigger: 'retry',
headRef: rawSource.head_ref,
baseRef: rawSource.base_ref,
- configSnapshot: source.configSnapshot ?? defaultRepoConfig,
+ configSnapshot: currentConfig.parsedJson,
retryOfJobId: source.id,
});
@@ -60,6 +71,7 @@ export function createJobsRouter() {
await c.env.REVIEW_QUEUE.send({
jobId: job.id,
deliveryId: crypto.randomUUID(),
+ phase: 'prepare',
requestId: c.get('requestId'),
});
diff --git a/src/server/routes/webhook.ts b/src/server/routes/webhook.ts
index f1f7dc6..3241b73 100644
--- a/src/server/routes/webhook.ts
+++ b/src/server/routes/webhook.ts
@@ -115,6 +115,7 @@ export function createWebhookRouter() {
await c.env.REVIEW_QUEUE.send({
jobId: job.id,
deliveryId,
+ phase: 'prepare',
requestId: c.get('requestId'),
});
diff --git a/src/server/services/model.ts b/src/server/services/model.ts
index afff3eb..649a744 100644
--- a/src/server/services/model.ts
+++ b/src/server/services/model.ts
@@ -4,6 +4,7 @@ import { reviewWithCloudflare } from '../models/cloudflare';
import { buildFileReviewPrompts } from '../prompts/file-review';
import { buildSummaryPrompt, SUMMARY_SYSTEM_PROMPT } from '../prompts/summary';
import { parseFileReviewResponse } from '../core/model-output';
+import { truncateFileDiff } from '../core/diff';
import type { RepoConfig } from '@shared/schema';
import type { TokenTracker } from '../core/token-tracker';
import type { ModelResponse } from '../models/types';
@@ -11,10 +12,33 @@ import { logger } from '../core/logger';
import { normalizeModelId } from '@shared/schema';
const DEFAULT_GOOGLE_FALLBACK = 'gemma-4-31b-it';
+/** TTL (24 hours) passed as `expirationTtl` when a provider-unavailable marker is written to APP_KV. */
+const PROVIDER_UNAVAILABLE_TTL_SECONDS = 24 * 60 * 60;
+/** Upper bound on diff lines kept for the prompt when the review params set `compactPrompt`. */
+const COMPACT_REVIEW_PROMPT_LINE_CAP = 400;
const MODEL_ALIASES: Record = {
'gemma-4-31b': 'gemma-4-31b-it',
'gemma-4-26b': 'gemma-4-26b-a4b-it',
};
+type ModelProvider = 'cloudflare';
+
+/**
+ * Error raised when a model call failed for a reason worth retrying later.
+ * The literal `retryable = true` field is what `isRetryableModelError` checks.
+ */
+export class RetryableModelError extends Error {
+  readonly retryable = true;
+
+  /**
+   * @param message Human-readable description.
+   * @param cause   Optional underlying error; attached as `cause` when provided.
+   */
+  constructor(message: string, cause?: unknown) {
+    super(message);
+    this.name = 'RetryableModelError';
+    if (cause === undefined) return;
+
+    // Defined explicitly instead of via the Error constructor options.
+    Object.defineProperty(this, 'cause', { value: cause, writable: true, configurable: true });
+  }
+}
+
+/** True when `error` is an object carrying `retryable === true` (duck-typed, no instanceof). */
+export function isRetryableModelError(error: unknown) {
+  if (typeof error !== 'object' || error === null) return false;
+  return 'retryable' in error && error.retryable === true;
+}
function isCloudflareModel(model: string) {
return model.startsWith('@cf/');
@@ -38,8 +62,73 @@ function isGoogleRateLimitError(error: unknown) {
return message.includes('429') || message.includes('RESOURCE_EXHAUSTED') || message.toLowerCase().includes('quota exceeded');
}
+/**
+ * Classifies a model failure as transient (worth retrying later) or not.
+ *
+ * Errors already flagged retryable are transient. Cloudflare allocation
+ * errors never are. Everything else is matched by message: Google rate
+ * limits, a standalone 500-509 number, or one of the substrings below.
+ *
+ * NOTE(review): the "returned reasoning without review content" error from the
+ * Cloudflare adapter matches none of these markers; confirm that is intended.
+ */
+function isTransientModelFailure(error: unknown) {
+  if (isRetryableModelError(error)) return true;
+  if (isCloudflareAllocationError(error)) return false;
+
+  const message = error instanceof Error ? error.message : String(error);
+  const lowered = message.toLowerCase();
+  const transientMarkers = [
+    'internal error',
+    'unavailable',
+    'high demand',
+    'timeout',
+    'timed out',
+    'fetch failed',
+    'network',
+    'temporar',
+    'returned no review content',
+    'empty response',
+    '[redacted]',
+  ];
+
+  if (isGoogleRateLimitError(error)) return true;
+  if (/\b50[0-9]\b/.test(message)) return true;
+  return transientMarkers.some((marker) => lowered.includes(marker));
+}
+
export class ModelService {
- constructor(private env: AppBindings, private tracker?: TokenTracker) {}
+ constructor(
+ private env: AppBindings,
+ private tracker?: TokenTracker,
+ private options: { jobId?: string } = {},
+ ) {}
+
+ /** Builds the per-job KV key for a provider's unavailable marker, or null when no job id was supplied. */
+ private providerUnavailableKey(provider: ModelProvider) {
+   const { jobId } = this.options;
+   if (!jobId) return null;
+   return `jobs:${jobId}:provider-unavailable:${provider}`;
+ }
+
+ /**
+  * Checks APP_KV for this job's unavailable marker for `provider`.
+  * Resolves to false when there is no job id or when the KV read throws
+  * (the failure is logged as a warning), so a KV problem never blocks a model.
+  */
+ private async isProviderUnavailable(provider: ModelProvider) {
+   const markerKey = this.providerUnavailableKey(provider);
+   if (!markerKey) return false;
+
+   try {
+     const marker = await this.env.APP_KV.get(markerKey);
+     return marker !== null;
+   } catch (error) {
+     const detail = error instanceof Error ? error.message : String(error);
+     logger.warn(`Failed to read unavailable provider marker for ${provider}`, { error: detail });
+     return false;
+   }
+ }
+
+ /**
+  * Writes this job's unavailable marker for `provider` to APP_KV, storing
+  * `reason` and the current ISO timestamp with a TTL of
+  * PROVIDER_UNAVAILABLE_TTL_SECONDS. Does nothing without a job id; a failed
+  * write is logged as a warning and otherwise ignored.
+  */
+ private async markProviderUnavailable(provider: ModelProvider, reason: string) {
+   const markerKey = this.providerUnavailableKey(provider);
+   if (!markerKey) return;
+
+   try {
+     const payload = JSON.stringify({
+       reason,
+       markedAt: new Date().toISOString(),
+     });
+     await this.env.APP_KV.put(markerKey, payload, { expirationTtl: PROVIDER_UNAVAILABLE_TTL_SECONDS });
+   } catch (error) {
+     const detail = error instanceof Error ? error.message : String(error);
+     logger.warn(`Failed to write unavailable provider marker for ${provider}`, { error: detail });
+   }
+ }
private selectModel(params: {
totalLineCount: number;
@@ -97,9 +186,16 @@ export class ModelService {
prDescription: string | null;
config: RepoConfig;
totalLineCount: number;
+ compactPrompt?: boolean;
}) {
+ const configuredLineCap = params.config.review.max_diff_lines_per_file;
+ const modelLineCap = params.compactPrompt
+ ? Math.min(configuredLineCap, COMPACT_REVIEW_PROMPT_LINE_CAP)
+ : configuredLineCap;
+ const reviewFile = truncateFileDiff(params.file, modelLineCap);
const { systemPrompt, userPrompt } = buildFileReviewPrompts({
...params,
+ file: reviewFile,
config: params.config.review,
});
@@ -109,16 +205,17 @@ export class ModelService {
});
const modelsToTry = [primary, ...fallbacks];
- let lastError: any;
- const unavailableProviders = new Set();
+ let lastError: unknown;
+ let lastTransientError: unknown;
+ let sawTransientFailure = false;
for (const currentModel of modelsToTry) {
- if (isCloudflareModel(currentModel) && unavailableProviders.has('cloudflare')) {
- logger.warn(`Skipping Cloudflare model ${currentModel} because Cloudflare AI allocation is unavailable`);
+ if (isCloudflareModel(currentModel) && await this.isProviderUnavailable('cloudflare')) {
+ logger.warn(`Skipping Cloudflare model ${currentModel} because Cloudflare AI allocation is unavailable for job ${this.options.jobId ?? 'unknown'}`);
continue;
}
let attempts = 0;
- const maxAttempts = 2;
+ const maxAttempts = 1;
while (attempts < maxAttempts) {
try {
@@ -133,19 +230,26 @@ export class ModelService {
...response,
parsed,
userPrompt,
+ reviewedLineCount: reviewFile.lineCount,
+ wasPromptTruncated: reviewFile.isTruncated === true,
};
- } catch (error: any) {
+ } catch (error) {
lastError = error;
+ if (isTransientModelFailure(error)) {
+ sawTransientFailure = true;
+ lastTransientError = error;
+ }
attempts++;
if (isCloudflareModel(currentModel) && isCloudflareAllocationError(error)) {
- unavailableProviders.add('cloudflare');
+ await this.markProviderUnavailable('cloudflare', error instanceof Error ? error.message : String(error));
}
const isRateLimit = isGoogleRateLimitError(error);
const isRetryable = false;
+ const errorMessage = error instanceof Error ? error.message : String(error);
logger.warn(`Model ${currentModel} failed for ${params.file.path} (attempt ${attempts}/${maxAttempts})`, {
- error: error.message || error,
+ error: errorMessage,
rateLimited: isRateLimit,
willRetrySameModel: isRetryable,
willTryFallback: !isRetryable && modelsToTry.indexOf(currentModel) < modelsToTry.length - 1
@@ -159,6 +263,15 @@ export class ModelService {
}
}
+ if (sawTransientFailure) {
+ const retryCause = lastTransientError ?? lastError;
+ const lastMessage = retryCause instanceof Error ? retryCause.message : String(retryCause ?? 'Unknown model error');
+ throw new RetryableModelError(
+ `All configured review models failed for ${params.file.path}; retrying later. Last error: ${lastMessage}`,
+ retryCause,
+ );
+ }
+
throw lastError;
}
@@ -171,11 +284,12 @@ export class ModelService {
const { primary, fallbacks } = this.selectModel({ totalLineCount: 0, config: params.config });
const modelsToTry = [primary, ...fallbacks];
- let lastError: any;
- const unavailableProviders = new Set();
+ let lastError: unknown;
+ let lastTransientError: unknown;
+ let sawTransientFailure = false;
for (const currentModel of modelsToTry) {
- if (isCloudflareModel(currentModel) && unavailableProviders.has('cloudflare')) {
- logger.warn(`Skipping Cloudflare summary model ${currentModel} because Cloudflare AI allocation is unavailable`);
+ if (isCloudflareModel(currentModel) && await this.isProviderUnavailable('cloudflare')) {
+ logger.warn(`Skipping Cloudflare summary model ${currentModel} because Cloudflare AI allocation is unavailable for job ${this.options.jobId ?? 'unknown'}`);
continue;
}
@@ -190,15 +304,28 @@ export class ModelService {
}
return response;
- } catch (error: any) {
+ } catch (error) {
lastError = error;
+ if (isTransientModelFailure(error)) {
+ sawTransientFailure = true;
+ lastTransientError = error;
+ }
if (isCloudflareModel(currentModel) && isCloudflareAllocationError(error)) {
- unavailableProviders.add('cloudflare');
+ await this.markProviderUnavailable('cloudflare', error instanceof Error ? error.message : String(error));
}
- logger.warn(`Summary model ${currentModel} failed`, { error: error.message || error });
+ logger.warn(`Summary model ${currentModel} failed`, { error: error instanceof Error ? error.message : String(error) });
}
}
+ if (sawTransientFailure) {
+ const retryCause = lastTransientError ?? lastError;
+ const lastMessage = retryCause instanceof Error ? retryCause.message : String(retryCause ?? 'Unknown model error');
+ throw new RetryableModelError(
+ `All configured summary models failed; retrying later. Last error: ${lastMessage}`,
+ retryCause,
+ );
+ }
+
throw lastError;
}
}
diff --git a/src/shared/schema.ts b/src/shared/schema.ts
index cf12a90..63bc503 100644
--- a/src/shared/schema.ts
+++ b/src/shared/schema.ts
@@ -145,6 +145,7 @@ export const repoConfigSchema = z.object({
export const reviewJobMessageSchema = z.object({
jobId: z.string().uuid().optional(),
deliveryId: z.string().min(1),
+ phase: z.enum(['prepare', 'review', 'finalize']).optional(),
eventName: z.string().min(1).optional(),
payload: z.any().optional(),
installationId: z.string().min(1).optional(),
diff --git a/test/api.spec.ts b/test/api.spec.ts
index 1b0ccd5..ce29724 100644
--- a/test/api.spec.ts
+++ b/test/api.spec.ts
@@ -1,5 +1,5 @@
import { createApp } from '@server/app';
-import { insertJob } from '@server/db/jobs';
+import { getJobForProcessing, insertJob } from '@server/db/jobs';
import { insertFileReview } from '@server/db/file-reviews';
import { getRepoConfigRecord } from '@server/db/repo-configs';
import { loadRepoConfig, updateGlobalConfig } from '@server/core/config';
@@ -545,6 +545,7 @@ describe('Dashboard API Suite', () => {
});
expect(loaded.parsedJson.model.main).toBe('@cf/zai-org/glm-4.7-flash');
+ expect(loaded.parsedJson.model.fallbacks).toEqual([]);
const record = await getRepoConfigRecord(env, 'api-test-owner', repo);
expect(record?.mainModel).toBeNull();
@@ -566,6 +567,73 @@ describe('Dashboard API Suite', () => {
expect(reloaded.parsedJson.model.main).toBe('gemma-4-26b-a4b-it');
});
+ it('uses the current global model strategy when retrying an older job', async () => {
+ const env = createTestEnv();
+ const token = await getAuthCookie(env);
+ const repo = `retry-current-config-${Date.now()}`;
+
+ const source = await insertJob(env, {
+ installationId: '123',
+ owner: 'api-test-owner',
+ repo,
+ prNumber: 12,
+ prTitle: 'Retry Current Config',
+ prAuthor: 'author',
+ commitSha: 'a'.repeat(40),
+ baseSha: 'b'.repeat(40),
+ trigger: 'auto',
+ headRef: 'feature',
+ baseRef: 'main',
+ configSnapshot: {
+ ...defaultRepoConfig,
+ model: {
+ main: 'gemma-4-31b-it',
+ fallbacks: ['gemma-4-26b-a4b-it', '@cf/zai-org/glm-4.7-flash'],
+ size_overrides: [],
+ },
+ },
+ });
+
+ await updateGlobalConfig(env, {
+ main: 'gemma-4-31b-it',
+ fallbacks: ['gemma-4-26b-a4b-it'],
+ size_overrides: [
+ {
+ max_lines: 300,
+ model: 'gemma-4-31b-it',
+ fallbacks: ['gemma-4-26b-a4b-it'],
+ },
+ ],
+ });
+
+ const response = await app.request(`/api/jobs/${source.id}/retry`, {
+ method: 'POST',
+ headers: {
+ Cookie: `codra_session=${token}`,
+ 'x-requested-with': 'XMLHttpRequest',
+ },
+ }, env);
+
+ expect(response.status).toBe(202);
+ const body = await response.json() as { job: { id: string } };
+ const retry = await getJobForProcessing(env, body.job.id);
+ const snapshot = typeof retry?.config_snapshot === 'string'
+ ? JSON.parse(retry.config_snapshot)
+ : retry?.config_snapshot;
+
+ expect(snapshot.model).toEqual({
+ main: 'gemma-4-31b-it',
+ fallbacks: ['gemma-4-26b-a4b-it'],
+ size_overrides: [
+ {
+ max_lines: 300,
+ model: 'gemma-4-31b-it',
+ fallbacks: ['gemma-4-26b-a4b-it'],
+ },
+ ],
+ });
+ });
+
it('accepts legacy jobId-only queue messages during schema transition', () => {
const parsed = reviewJobMessageSchema.safeParse({
jobId: crypto.randomUUID(),
diff --git a/test/diff.spec.ts b/test/diff.spec.ts
index d02313f..71060e5 100644
--- a/test/diff.spec.ts
+++ b/test/diff.spec.ts
@@ -106,8 +106,29 @@ Binary files a/image.png and b/image.png differ
const truncated = truncateFileDiff(largeFile, 60);
expect(truncated.isTruncated).toBe(true);
- expect(truncated.hunks).toHaveLength(1); // Second hunk exceeds limit
- expect(truncated.lineCount).toBe(50);
+ expect(truncated.hunks).toHaveLength(2);
+ expect(truncated.hunks[1].lines).toHaveLength(10);
+ expect(truncated.lineCount).toBe(60);
+ });
+
+ it('slices a single oversized hunk to the line limit', () => {
+ const largeFile = {
+ path: 'large.ts',
+ previousPath: null,
+ isNew: false,
+ isDeleted: false,
+ isBinary: false,
+ lineCount: 500,
+ hunks: [
+ { header: '@@ -1,500 +1,500 @@', lines: Array(500).fill({ kind: 'add', content: 'line', position: 1 }) },
+ ],
+ } as any;
+
+ const truncated = truncateFileDiff(largeFile, 300);
+ expect(truncated.isTruncated).toBe(true);
+ expect(truncated.hunks).toHaveLength(1);
+ expect(truncated.hunks[0].lines).toHaveLength(300);
+ expect(truncated.lineCount).toBe(300);
});
});
diff --git a/test/helpers.ts b/test/helpers.ts
index 251b8b7..440fd50 100644
--- a/test/helpers.ts
+++ b/test/helpers.ts
@@ -49,8 +49,8 @@ export class MockAssets {
export class MockQueue {
public readonly sent: any[] = [];
- async send(message: any) {
- this.sent.push(message);
+ async send(message: any, options?: { delaySeconds?: number }) {
+ this.sent.push({ ...message, options });
}
}
@@ -74,6 +74,10 @@ export function getTestDatabaseUrl() {
return requiredEnv('TEST_DATABASE_URL');
}
+/** Reports whether `usableEnvValue` yields a truthy value for TEST_DATABASE_URL. */
+export function hasConfiguredTestDatabaseUrl() {
+  const configured = usableEnvValue(process.env.TEST_DATABASE_URL);
+  return Boolean(configured);
+}
+
export function createTestEnv(overrides: Partial = {}): AppBindings {
return {
AI: {
diff --git a/test/model-service.spec.ts b/test/model-service.spec.ts
index 22f522d..dc5aed9 100644
--- a/test/model-service.spec.ts
+++ b/test/model-service.spec.ts
@@ -1,8 +1,14 @@
-import { describe, expect, it } from 'vitest';
-import { ModelService } from '@server/services/model';
+import { afterEach, describe, expect, it, vi } from 'vitest';
+import { isRetryableModelError, ModelService } from '@server/services/model';
+import { reviewWithCloudflare } from '@server/models/cloudflare';
import { createTestEnv } from './helpers';
+import { defaultRepoConfig } from '@shared/schema';
describe('ModelService', () => {
+ afterEach(() => {
+ vi.restoreAllMocks();
+ });
+
it('routes legacy Kimi K2.5 ids to Kimi K2.6 for new Cloudflare requests', async () => {
let requestedModel = '';
const env = createTestEnv({
@@ -23,4 +29,300 @@ describe('ModelService', () => {
expect(requestedModel).toBe('@cf/moonshotai/kimi-k2.6');
expect(response.modelUsed).toBe('@cf/moonshotai/kimi-k2.6');
});
+
+ it('preserves an explicitly empty fallback chain', () => {
+ const service = new ModelService(createTestEnv());
+ const selected = (service as any).selectModel({
+ totalLineCount: 500,
+ config: {
+ ...defaultRepoConfig,
+ model: {
+ main: 'gemma-4-31b-it',
+ fallbacks: [],
+ size_overrides: [],
+ },
+ },
+ });
+
+ expect(selected).toEqual({
+ primary: 'gemma-4-31b-it',
+ fallbacks: [],
+ });
+ });
+
+ // Regression guard: the first choice has `content: null` plus a `reasoning`
+ // string and `finish_reason: 'length'`. The adapter must reject with the
+ // "returned no review content" error rather than returning any text.
+ it('rejects Cloudflare reasoning-only responses instead of trying to parse the response envelope', async () => {
+ const env = createTestEnv({
+ AI: {
+ async run() {
+ return {
+ choices: [
+ {
+ message: {
+ content: null,
+ reasoning: 'Long reasoning that consumed the completion budget.',
+ },
+ finish_reason: 'length',
+ },
+ ],
+ usage: { prompt_tokens: 1, completion_tokens: 4096 },
+ };
+ },
+ } as any,
+ });
+
+ await expect(
+ reviewWithCloudflare(env, '@cf/moonshotai/kimi-k2.6', {
+ systemPrompt: 'system',
+ userPrompt: 'user',
+ }),
+ ).rejects.toThrow('returned no review content');
+ });
+
+ // AI.run always throws, so the expected count of 2 is the initial call plus
+ // exactly one retry before the last error is surfaced to the caller.
+ it('retries the same Cloudflare model once before failing it', async () => {
+ let attempts = 0;
+ const env = createTestEnv({
+ AI: {
+ async run() {
+ attempts++;
+ throw new Error('temporary provider error');
+ },
+ } as any,
+ });
+
+ await expect(
+ reviewWithCloudflare(env, '@cf/zai-org/glm-4.7-flash', {
+ systemPrompt: 'system',
+ userPrompt: 'user',
+ }),
+ ).rejects.toThrow('temporary provider error');
+ expect(attempts).toBe(2);
+ });
+
+ it('marks exhausted transient provider failures as retryable for the queue', async () => {
+ const env = createTestEnv({
+ AI: {
+ async run() {
+ throw new Error('[REDACTED]');
+ },
+ } as any,
+ });
+
+ const service = new ModelService(env);
+ await expect(
+ service.reviewFile({
+ file: {
+ path: 'test/setup.ts',
+ lineCount: 1,
+ hunks: [],
+ isDeleted: false,
+ isBinary: false,
+ isNew: false,
+ previousPath: null,
+ },
+ prTitle: 'Test',
+ prDescription: null,
+ config: {
+ review: {
+ on: ['opened'],
+ ignore_drafts: true,
+ mention_trigger: '@codra-app',
+ skip_files: [],
+ max_files: 15,
+ large_file_threshold_lines: 200,
+ max_diff_lines_per_file: 800,
+ max_total_diff_chars: 150_000,
+ focus: ['quality'],
+ custom_rules: [],
+ labels: false,
+ exec: {
+ enabled: false,
+ on_file_types: ['.ts'],
+ command: 'npm run lint',
+ },
+ },
+ model: {
+ main: '@cf/zai-org/glm-4.7-flash',
+ fallbacks: [],
+ size_overrides: [],
+ },
+ },
+ totalLineCount: 1,
+ }),
+ ).rejects.toSatisfy(isRetryableModelError);
+ });
+
+ it('skips Cloudflare for the rest of a job after allocation is exhausted', async () => {
+ let cloudflareCalls = 0;
+ const fetchMock = vi.spyOn(globalThis, 'fetch').mockImplementation(async () =>
+ new Response(
+ JSON.stringify({
+ candidates: [{ content: { parts: [{ text: '{"findings":[]}' }] } }],
+ usageMetadata: { promptTokenCount: 1, candidatesTokenCount: 1 },
+ }),
+ { status: 200, headers: { 'content-type': 'application/json' } },
+ ),
+ );
+ const env = createTestEnv({
+ AI: {
+ async run() {
+ cloudflareCalls++;
+ throw new Error('Cloudflare daily free allocation exhausted (4006)');
+ },
+ } as any,
+ GEMINI_API_KEY: 'test-key',
+ });
+ const service = new ModelService(env, undefined, { jobId: 'job-provider-skip' });
+ const file = {
+ path: 'src/app.ts',
+ lineCount: 1,
+ hunks: [],
+ isDeleted: false,
+ isBinary: false,
+ isNew: false,
+ previousPath: null,
+ };
+ const config = {
+ ...defaultRepoConfig,
+ model: {
+ main: '@cf/zai-org/glm-4.7-flash',
+ fallbacks: ['gemma-4-31b-it'],
+ size_overrides: [],
+ },
+ };
+
+ await service.reviewFile({
+ file,
+ prTitle: 'Test',
+ prDescription: null,
+ config,
+ totalLineCount: 1,
+ });
+ await service.reviewFile({
+ file: { ...file, path: 'src/other.ts' },
+ prTitle: 'Test',
+ prDescription: null,
+ config,
+ totalLineCount: 1,
+ });
+
+ expect(cloudflareCalls).toBe(1);
+ expect(fetchMock).toHaveBeenCalledTimes(2);
+ });
+
+ it('uses the configured Gemma prompt cap and output token budget on the first attempt', async () => {
+ let requestBody: any = null;
+ const fetchMock = vi.spyOn(globalThis, 'fetch').mockImplementation(async (_url, init) => {
+ requestBody = JSON.parse(String(init?.body));
+ return new Response(
+ JSON.stringify({
+ candidates: [{ content: { parts: [{ text: '{"findings":[],"overall_correctness":"patch is correct","overall_explanation":"ok","overall_confidence_score":0.9}' }] } }],
+ usageMetadata: { promptTokenCount: 1, candidatesTokenCount: 1 },
+ }),
+ { status: 200, headers: { 'content-type': 'application/json' } },
+ );
+ });
+ const env = createTestEnv({ GEMINI_API_KEY: 'test-key' });
+ const service = new ModelService(env);
+ const largeFile = {
+ path: 'src/large.ts',
+ previousPath: null,
+ isNew: false,
+ isDeleted: false,
+ isBinary: false,
+ lineCount: 900,
+ hunks: [
+ {
+ header: '@@ -1,900 +1,900 @@',
+ lines: Array.from({ length: 900 }, (_, index) => ({
+ kind: 'add' as const,
+ content: `const value${index} = ${index};`,
+ newLineNumber: index + 1,
+ position: index + 1,
+ })),
+ },
+ ],
+ };
+
+ const response = await service.reviewFile({
+ file: largeFile,
+ prTitle: 'Test',
+ prDescription: null,
+ config: {
+ ...defaultRepoConfig,
+ model: {
+ main: 'gemma-4-31b-it',
+ fallbacks: [],
+ size_overrides: [],
+ },
+ },
+ totalLineCount: 500,
+ });
+
+ const userPrompt = requestBody.contents[0].parts[0].text as string;
+ expect(fetchMock).toHaveBeenCalledOnce();
+ expect(requestBody.generationConfig.maxOutputTokens).toBe(3072);
+ expect(userPrompt).toContain('[NOTE: This diff has been truncated from 900 lines to 800 lines for brevity.]');
+ expect(userPrompt).toContain('const value799 = 799;');
+ expect(userPrompt).not.toContain('const value800 = 800;');
+ expect(response.reviewedLineCount).toBe(800);
+ expect(response.wasPromptTruncated).toBe(true);
+ });
+
+ it('uses a compact Gemma prompt only after a prior transient failure', async () => {
+ let requestBody: any = null;
+ vi.spyOn(globalThis, 'fetch').mockImplementation(async (_url, init) => {
+ requestBody = JSON.parse(String(init?.body));
+ return new Response(
+ JSON.stringify({
+ candidates: [{ content: { parts: [{ text: '{"findings":[],"overall_correctness":"patch is correct","overall_explanation":"ok","overall_confidence_score":0.9}' }] } }],
+ usageMetadata: { promptTokenCount: 1, candidatesTokenCount: 1 },
+ }),
+ { status: 200, headers: { 'content-type': 'application/json' } },
+ );
+ });
+ const env = createTestEnv({ GEMINI_API_KEY: 'test-key' });
+ const service = new ModelService(env);
+ const largeFile = {
+ path: 'src/large.ts',
+ previousPath: null,
+ isNew: false,
+ isDeleted: false,
+ isBinary: false,
+ lineCount: 900,
+ hunks: [
+ {
+ header: '@@ -1,900 +1,900 @@',
+ lines: Array.from({ length: 900 }, (_, index) => ({
+ kind: 'add' as const,
+ content: `const value${index} = ${index};`,
+ newLineNumber: index + 1,
+ position: index + 1,
+ })),
+ },
+ ],
+ };
+
+ const response = await service.reviewFile({
+ file: largeFile,
+ prTitle: 'Test',
+ prDescription: null,
+ config: {
+ ...defaultRepoConfig,
+ model: {
+ main: 'gemma-4-31b-it',
+ fallbacks: [],
+ size_overrides: [],
+ },
+ },
+ totalLineCount: 900,
+ compactPrompt: true,
+ });
+
+ const userPrompt = requestBody.contents[0].parts[0].text as string;
+ expect(userPrompt).toContain('[NOTE: This diff has been truncated from 900 lines to 400 lines for brevity.]');
+ expect(userPrompt).toContain('const value399 = 399;');
+ expect(userPrompt).not.toContain('const value400 = 400;');
+ expect(response.reviewedLineCount).toBe(400);
+ expect(response.wasPromptTruncated).toBe(true);
+ });
});
diff --git a/test/resumable-queue.spec.ts b/test/resumable-queue.spec.ts
new file mode 100644
index 0000000..c494817
--- /dev/null
+++ b/test/resumable-queue.spec.ts
@@ -0,0 +1,293 @@
+import worker from '@server/index';
+import { claimJobLease, getJobForProcessing, insertJob, markJobContinuationQueued, recoverExpiredJobLeases, releaseJobLease } from '@server/db/jobs';
+import { getFileReviewsForJobs, recordRetryableFileReviewFailure, upsertFileReview } from '@server/db/file-reviews';
+import { getDb } from '@server/db/client';
+import { createTestEnv, hasConfiguredTestDatabaseUrl } from './helpers';
+
+const sha = (char: string) => char.repeat(40);
+const dbDescribe = hasConfiguredTestDatabaseUrl() ? describe : describe.skip;
+
+dbDescribe('resumable queue primitives', () => {
+ const env = createTestEnv();
+
+ it('sets a fresh lease when claiming a queued job', async () => {
+ const job = await insertJob(env, {
+ installationId: '123',
+ owner: 'test-owner',
+ repo: `lease-${Date.now()}`,
+ prNumber: 1,
+ prTitle: 'Lease Test',
+ prAuthor: 'author',
+ commitSha: sha('a'),
+ baseSha: sha('b'),
+ trigger: 'auto',
+ headRef: 'feature',
+ baseRef: 'main',
+ });
+
+ const claim = await claimJobLease(env, job.id, 'lease-a', 600);
+ expect(claim.status).toBe('claimed');
+
+ const row = await getJobForProcessing(env, job.id);
+ expect(row?.status).toBe('running');
+ expect(row?.lease_owner).toBe('lease-a');
+ expect(row?.lease_expires_at).toBeTruthy();
+ expect(row?.heartbeat_at).toBeTruthy();
+ });
+
+ it('reports busy for a fresh duplicate delivery instead of reclaiming', async () => {
+ const job = await insertJob(env, {
+ installationId: '123',
+ owner: 'test-owner',
+ repo: `busy-${Date.now()}`,
+ prNumber: 1,
+ prTitle: 'Busy Test',
+ prAuthor: 'author',
+ commitSha: sha('c'),
+ baseSha: sha('d'),
+ trigger: 'auto',
+ headRef: 'feature',
+ baseRef: 'main',
+ });
+
+ await claimJobLease(env, job.id, 'lease-a', 600);
+ const duplicate = await claimJobLease(env, job.id, 'lease-b', 600);
+ expect(duplicate.status).toBe('busy');
+ });
+
+ it('reclaims an expired lease', async () => {
+ const job = await insertJob(env, {
+ installationId: '123',
+ owner: 'test-owner',
+ repo: `expired-${Date.now()}`,
+ prNumber: 1,
+ prTitle: 'Expired Test',
+ prAuthor: 'author',
+ commitSha: sha('e'),
+ baseSha: sha('f'),
+ trigger: 'auto',
+ headRef: 'feature',
+ baseRef: 'main',
+ });
+
+ await claimJobLease(env, job.id, 'lease-a', 600);
+ await getDb(env).query(`UPDATE jobs SET lease_expires_at = now() - interval '1 minute' WHERE id = $1`, [job.id]);
+
+ const reclaimed = await claimJobLease(env, job.id, 'lease-b', 600);
+ expect(reclaimed.status).toBe('claimed');
+
+ const row = await getJobForProcessing(env, job.id);
+ expect(row?.lease_owner).toBe('lease-b');
+ });
+
+ it('fails repeatedly expired jobs after the recovery limit', async () => {
+ const job = await insertJob(env, {
+ installationId: '123',
+ owner: 'test-owner',
+ repo: `recovery-${Date.now()}`,
+ prNumber: 1,
+ prTitle: 'Recovery Test',
+ prAuthor: 'author',
+ commitSha: sha('1'),
+ baseSha: sha('2'),
+ trigger: 'auto',
+ headRef: 'feature',
+ baseRef: 'main',
+ });
+
+ await claimJobLease(env, job.id, 'lease-a', 600);
+ await getDb(env).query(
+ `UPDATE jobs SET lease_expires_at = now() - interval '1 minute', recovery_count = 3 WHERE id = $1`,
+ [job.id],
+ );
+
+ const recovered = await recoverExpiredJobLeases(env, 3);
+ expect(recovered.failedJobs.map((row) => row.id)).toContain(job.id);
+
+ const row = await getJobForProcessing(env, job.id);
+ expect(row?.status).toBe('failed');
+ });
+
+ it('requeues running jobs that have no lease and an old continuation handoff', async () => {
+ const job = await insertJob(env, {
+ installationId: '123',
+ owner: 'test-owner',
+ repo: `unleased-${Date.now()}`,
+ prNumber: 1,
+ prTitle: 'Unleased Test',
+ prAuthor: 'author',
+ commitSha: sha('5'),
+ baseSha: sha('6'),
+ trigger: 'auto',
+ headRef: 'feature',
+ baseRef: 'main',
+ });
+
+ await claimJobLease(env, job.id, 'lease-a', 600);
+ await getDb(env).query(
+ `
+ UPDATE jobs
+ SET lease_owner = NULL,
+ lease_expires_at = NULL,
+ heartbeat_at = now() - interval '5 minutes',
+ last_queue_message_at = now() - interval '5 minutes'
+ WHERE id = $1
+ `,
+ [job.id],
+ );
+
+ const recovered = await recoverExpiredJobLeases(env, 3, 120);
+ expect(recovered.requeuedJobIds).toContain(job.id);
+
+ const row = await getJobForProcessing(env, job.id);
+ expect(row?.status).toBe('running');
+ expect(row?.lease_owner).toBeNull();
+ expect(row?.recovery_count).toBe(1);
+ expect(row?.error_msg).toBeNull();
+ });
+
+ it('does not recover an unleased job that just scheduled a retry continuation', async () => {
+ const job = await insertJob(env, {
+ installationId: '123',
+ owner: 'test-owner',
+ repo: `retry-handoff-${Date.now()}`,
+ prNumber: 1,
+ prTitle: 'Retry Handoff Test',
+ prAuthor: 'author',
+ commitSha: sha('7'),
+ baseSha: sha('8'),
+ trigger: 'auto',
+ headRef: 'feature',
+ baseRef: 'main',
+ });
+
+ await claimJobLease(env, job.id, 'lease-a', 600);
+ await getDb(env).query(
+ `
+ UPDATE jobs
+ SET heartbeat_at = now() - interval '10 minutes',
+ last_queue_message_at = now() - interval '10 minutes'
+ WHERE id = $1
+ `,
+ [job.id],
+ );
+
+ await markJobContinuationQueued(env, job.id);
+ await releaseJobLease(env, job.id, 'lease-a');
+
+ const recovered = await recoverExpiredJobLeases(env, 3, 120);
+ expect(recovered.requeuedJobIds).not.toContain(job.id);
+
+ const row = await getJobForProcessing(env, job.id);
+ expect(row?.status).toBe('running');
+ expect(row?.lease_owner).toBeNull();
+ expect(row?.recovery_count).toBe(0);
+ });
+
+ it('upserts file reviews without duplicating the same file', async () => {
+ const job = await insertJob(env, {
+ installationId: '123',
+ owner: 'test-owner',
+ repo: `upsert-${Date.now()}`,
+ prNumber: 1,
+ prTitle: 'Upsert Test',
+ prAuthor: 'author',
+ commitSha: sha('3'),
+ baseSha: sha('4'),
+ trigger: 'auto',
+ headRef: 'feature',
+ baseRef: 'main',
+ });
+
+ const baseReview = {
+ filePath: 'src/app.ts',
+ fileStatus: 'done' as const,
+ modelUsed: 'test-model',
+ modelProvider: 'test-provider',
+ diffLineCount: 1,
+ diffInput: 'diff',
+ rawAiOutput: '{}',
+ parsedComments: [],
+ inputTokens: 1,
+ outputTokens: 1,
+ durationMs: 1,
+ verdict: 'approve' as const,
+ fileSummary: 'ok',
+ errorMessage: null,
+ };
+
+ await upsertFileReview(env, job.id, baseReview);
+ await upsertFileReview(env, job.id, { ...baseReview, fileSummary: 'updated' });
+
+ const reviews = await getFileReviewsForJobs(env, [job.id]);
+ expect(reviews).toHaveLength(1);
+ expect(reviews[0].file_summary).toBe('updated');
+ });
+
+ it('tracks retryable file review failures and resets the count after success', async () => {
+ const job = await insertJob(env, {
+ installationId: '123',
+ owner: 'test-owner',
+ repo: `transient-file-${Date.now()}`,
+ prNumber: 1,
+ prTitle: 'Transient File Test',
+ prAuthor: 'author',
+ commitSha: sha('9'),
+ baseSha: sha('0'),
+ trigger: 'auto',
+ headRef: 'feature',
+ baseRef: 'main',
+ });
+
+ const failureInput = {
+ filePath: 'src/app.ts',
+ modelUsed: 'gemma-4-31b-it',
+ modelProvider: 'google',
+ diffLineCount: 1,
+ diffInput: 'diff',
+ durationMs: 1,
+ errorMessage: 'All configured review models failed; retrying later.',
+ };
+
+ await expect(recordRetryableFileReviewFailure(env, job.id, failureInput)).resolves.toBe(1);
+ await expect(recordRetryableFileReviewFailure(env, job.id, failureInput)).resolves.toBe(2);
+
+ await upsertFileReview(env, job.id, {
+ filePath: 'src/app.ts',
+ fileStatus: 'done',
+ modelUsed: 'gemma-4-31b-it',
+ modelProvider: 'google',
+ diffLineCount: 1,
+ diffInput: 'diff',
+ rawAiOutput: '{}',
+ parsedComments: [],
+ inputTokens: 1,
+ outputTokens: 1,
+ durationMs: 1,
+ verdict: 'approve',
+ fileSummary: 'ok',
+ errorMessage: null,
+ });
+
+ const reviews = await getFileReviewsForJobs(env, [job.id]);
+ expect(reviews).toHaveLength(1);
+ expect(reviews[0].file_status).toBe('done');
+ expect(reviews[0].transient_error_count).toBe(0);
+ });
+});
+
+describe('queue handler', () => {
+ it('retries invalid messages instead of acknowledging them', async () => {
+ const env = createTestEnv();
+ const message = {
+ body: { bad: true },
+ ack: vi.fn(),
+ retry: vi.fn(),
+ };
+
+ await worker.queue({ messages: [message] } as any, env, {} as ExecutionContext);
+
+ expect(message.retry).toHaveBeenCalledTimes(1);
+ expect(message.ack).not.toHaveBeenCalled();
+ });
+});
diff --git a/test/review-flow.spec.ts b/test/review-flow.spec.ts
index e022d3b..7da7c31 100644
--- a/test/review-flow.spec.ts
+++ b/test/review-flow.spec.ts
@@ -1,8 +1,10 @@
import { runReviewJob } from '@server/core/review';
-import { createTestEnv, generateMockDiff } from './helpers';
+import { createTestEnv, generateMockDiff, hasConfiguredTestDatabaseUrl } from './helpers';
import { vi } from 'vitest';
-import { findExistingJobForHead, getJobForProcessing, insertJob } from '@server/db/jobs';
+import { findExistingJobForHead, getJobForProcessing, insertJob, updateJobFileCount, updateJobStep } from '@server/db/jobs';
+import { getFileReviewsForJobs, upsertFileReview } from '@server/db/file-reviews';
import { defaultRepoConfig } from '@shared/schema';
+import { runWithDb } from '@server/db/client';
const sha = (char: string) => char.repeat(40);
@@ -65,18 +67,36 @@ vi.mock('@server/services/model', () => {
};
}
}
- return { ModelService: MockModelService };
+ return {
+ ModelService: MockModelService,
+ isRetryableModelError: (error: unknown) => Boolean(error && typeof error === 'object' && (error as any).retryable === true),
+ };
});
-describe('Review Flow Lifecycle', () => {
+const dbDescribe = hasConfiguredTestDatabaseUrl() ? describe : describe.skip;
+const REVIEW_FLOW_TIMEOUT_MS = 60_000;
+
+dbDescribe('Review Flow Lifecycle', () => {
const env = createTestEnv();
+ async function runAndDrain(message: Parameters<typeof runReviewJob>[1]) {
+ await runWithDb(env, async () => {
+ (env.REVIEW_QUEUE as any).sent.length = 0;
+ await runReviewJob(env, message);
+ const queue = env.REVIEW_QUEUE as any;
+ while (queue.sent.length > 0) {
+ const next = queue.sent.shift();
+ await runReviewJob(env, next);
+ }
+ });
+ }
+
it('completes a full review from pending job to finished', async () => {
const repo = `test-repo-${Date.now()}-full`;
const headSha = sha('a');
const baseSha = sha('b');
- await runReviewJob(env, {
+ await runAndDrain({
deliveryId: 'delivery-123',
eventName: 'pull_request',
payload: {
@@ -102,7 +122,7 @@ describe('Review Flow Lifecycle', () => {
trigger: 'auto',
});
expect(finalJob?.status).toBe('done');
- });
+ }, REVIEW_FLOW_TIMEOUT_MS);
it('stops processing if the job is superseded mid-way', async () => {
const { GitHubService } = await import('@server/services/github');
@@ -131,7 +151,7 @@ describe('Review Flow Lifecycle', () => {
return generateMockDiff([{ path: 'test.ts', content: 'a' }]);
});
- await runReviewJob(env, {
+ await runAndDrain({
deliveryId: 'delivery-456',
eventName: 'pull_request',
payload: {
@@ -158,7 +178,7 @@ describe('Review Flow Lifecycle', () => {
});
expect(finalJob?.status).toBe('superseded');
expect(finalJob?.verdict).toBeNull();
- });
+ }, REVIEW_FLOW_TIMEOUT_MS);
it('processes a pre-created retry job from a queue message', async () => {
const repo = `test-repo-${Date.now()}-retry`;
@@ -197,14 +217,95 @@ describe('Review Flow Lifecycle', () => {
retryOfJobId: source.id,
});
- await runReviewJob(env, {
+ await runAndDrain({
jobId: retry.id,
deliveryId: 'delivery-retry',
});
const finalJob = await getJobForProcessing(env, retry.id);
expect(finalJob?.status).toBe('done');
- });
+ }, REVIEW_FLOW_TIMEOUT_MS);
+
+ it('does not inherit parent file reviews from models outside the current retry strategy', async () => {
+ const { ModelService } = await import('@server/services/model');
+ const reviewSpy = vi.spyOn(ModelService.prototype, 'reviewFile');
+ const repo = `test-repo-${Date.now()}-retry-model-filter`;
+ const sourceHeadSha = sha('8');
+ const retryHeadSha = sha('9');
+ const baseSha = sha('0');
+
+ const source = await insertJob(env, {
+ installationId: '123',
+ owner: 'test-owner',
+ repo,
+ prNumber: 6,
+ prTitle: 'Retry Model Filter',
+ prAuthor: 'author',
+ commitSha: sourceHeadSha,
+ baseSha,
+ trigger: 'auto',
+ headRef: 'feature',
+ baseRef: 'main',
+ configSnapshot: {
+ ...defaultRepoConfig,
+ model: {
+ main: 'gemma-4-31b-it',
+ fallbacks: ['gemma-4-26b-a4b-it', '@cf/zai-org/glm-4.7-flash'],
+ size_overrides: [],
+ },
+ },
+ });
+
+ await upsertFileReview(env, source.id, {
+ filePath: 'src/app.ts',
+ fileStatus: 'done',
+ modelUsed: '@cf/zai-org/glm-4.7-flash',
+ modelProvider: 'cloudflare',
+ diffLineCount: 1,
+ diffInput: 'old diff',
+ rawAiOutput: '{}',
+ parsedComments: [],
+ inputTokens: 1,
+ outputTokens: 1,
+ durationMs: 1,
+ verdict: 'approve',
+ fileSummary: 'old',
+ errorMessage: null,
+ });
+
+ const retry = await insertJob(env, {
+ installationId: '123',
+ owner: 'test-owner',
+ repo,
+ prNumber: 6,
+ prTitle: 'Retry Model Filter',
+ prAuthor: 'author',
+ commitSha: retryHeadSha,
+ baseSha,
+ trigger: 'retry',
+ headRef: 'feature',
+ baseRef: 'main',
+ configSnapshot: {
+ ...defaultRepoConfig,
+ model: {
+ main: 'gemma-4-31b-it',
+ fallbacks: ['gemma-4-26b-a4b-it'],
+ size_overrides: [],
+ },
+ },
+ retryOfJobId: source.id,
+ });
+
+ await runAndDrain({
+ jobId: retry.id,
+ deliveryId: 'delivery-retry-model-filter',
+ });
+
+ expect(reviewSpy).toHaveBeenCalled();
+ const reviews = await getFileReviewsForJobs(env, [retry.id]);
+ expect(reviews.find((review) => review.file_path === 'src/app.ts')?.model_used).toBe('test-model');
+ reviewSpy.mockRestore();
+ }, REVIEW_FLOW_TIMEOUT_MS);
it('resumes an existing queued duplicate job instead of stranding it', async () => {
const repo = `test-repo-${Date.now()}-duplicate`;
@@ -226,7 +327,7 @@ describe('Review Flow Lifecycle', () => {
configSnapshot: defaultRepoConfig,
});
- await runReviewJob(env, {
+ await runAndDrain({
deliveryId: 'delivery-duplicate',
eventName: 'pull_request',
payload: {
@@ -246,5 +347,133 @@ describe('Review Flow Lifecycle', () => {
const finalJob = await getJobForProcessing(env, existing.id);
expect(finalJob?.status).toBe('done');
- });
+ }, REVIEW_FLOW_TIMEOUT_MS);
+
+ it('schedules a delayed continuation instead of spending queue retries on transient model failures', async () => {
+ const { ModelService } = await import('@server/services/model');
+ const retryableError = Object.assign(new Error('Google API timed out after 45000ms'), { retryable: true });
+ const reviewSpy = vi.spyOn(ModelService.prototype, 'reviewFile').mockRejectedValue(retryableError);
+ const repo = `test-repo-${Date.now()}-transient`;
+ const headSha = sha('6');
+ const baseSha = sha('7');
+
+ const job = await insertJob(env, {
+ installationId: '123',
+ owner: 'test-owner',
+ repo,
+ prNumber: 5,
+ prTitle: 'Transient Test',
+ prAuthor: 'author',
+ commitSha: headSha,
+ baseSha,
+ trigger: 'auto',
+ headRef: 'feature',
+ baseRef: 'main',
+ configSnapshot: defaultRepoConfig,
+ });
+ await updateJobFileCount(env, job.id, 1);
+ await updateJobStep(env, job.id, 'Preparation', { status: 'done' });
+
+ await runWithDb(env, async () => {
+ (env.REVIEW_QUEUE as any).sent.length = 0;
+ const result = await runReviewJob(env, {
+ jobId: job.id,
+ deliveryId: 'delivery-transient',
+ phase: 'review',
+ });
+
+ expect(result).toEqual({ action: 'ack' });
+ expect(reviewSpy).toHaveBeenCalled();
+ expect((env.REVIEW_QUEUE as any).sent).toHaveLength(1);
+ expect((env.REVIEW_QUEUE as any).sent[0]).toMatchObject({
+ jobId: job.id,
+ phase: 'review',
+ options: { delaySeconds: 60 },
+ });
+ });
+
+ const finalJob = await getJobForProcessing(env, job.id);
+ expect(finalJob?.status).toBe('running');
+ expect(finalJob?.lease_owner).toBeNull();
+
+ reviewSpy.mockRestore();
+ }, REVIEW_FLOW_TIMEOUT_MS);
+
+ it('marks completed jobs with skipped files as partial reviews', async () => {
+ const { GitHubService } = await import('@server/services/github');
+ const repo = `test-repo-${Date.now()}-partial`;
+ const headSha = sha('e');
+ const baseSha = sha('f');
+ const getDiffSpy = vi.spyOn(GitHubService.prototype, 'getPullRequestDiff').mockResolvedValue(
+ generateMockDiff([
+ { path: 'src/app.ts', content: 'console.log(1);' },
+ { path: 'src/failed.ts', content: 'console.log(2);' },
+ ]),
+ );
+
+ const job = await insertJob(env, {
+ installationId: '123',
+ owner: 'test-owner',
+ repo,
+ prNumber: 7,
+ prTitle: 'Partial Test',
+ prAuthor: 'author',
+ commitSha: headSha,
+ baseSha,
+ trigger: 'auto',
+ headRef: 'feature',
+ baseRef: 'main',
+ configSnapshot: defaultRepoConfig,
+ });
+ await updateJobFileCount(env, job.id, 2);
+ await updateJobStep(env, job.id, 'Preparation', { status: 'done' });
+ await updateJobStep(env, job.id, 'Reviewing Files', { status: 'done' });
+ await upsertFileReview(env, job.id, {
+ filePath: 'src/app.ts',
+ fileStatus: 'done',
+ modelUsed: 'test-model',
+ modelProvider: 'test-provider',
+ diffLineCount: 1,
+ diffInput: 'diff',
+ rawAiOutput: '{}',
+ parsedComments: [],
+ inputTokens: 1,
+ outputTokens: 1,
+ durationMs: 1,
+ verdict: 'approve',
+ fileSummary: 'ok',
+ errorMessage: null,
+ });
+ await upsertFileReview(env, job.id, {
+ filePath: 'src/failed.ts',
+ fileStatus: 'failed',
+ modelUsed: 'gemma-4-31b-it',
+ modelProvider: 'google',
+ diffLineCount: 1,
+ diffInput: '',
+ rawAiOutput: null,
+ parsedComments: [],
+ inputTokens: null,
+ outputTokens: null,
+ durationMs: 1,
+ verdict: null,
+ fileSummary: null,
+ errorMessage: 'Review skipped after 3 repeated model provider outages.',
+ });
+
+ await runWithDb(env, async () => {
+ (env.REVIEW_QUEUE as any).sent.length = 0;
+ const result = await runReviewJob(env, {
+ jobId: job.id,
+ deliveryId: 'delivery-partial',
+ phase: 'finalize',
+ });
+ expect(result).toEqual({ action: 'ack' });
+ });
+
+ const finalJob = await getJobForProcessing(env, job.id);
+ expect(finalJob?.status).toBe('done');
+ expect(finalJob?.error_msg).toContain('Partial review: 1 of 2 files');
+ getDiffSpy.mockRestore();
+ }, REVIEW_FLOW_TIMEOUT_MS);
});
diff --git a/test/settings.spec.ts b/test/settings.spec.ts
new file mode 100644
index 0000000..fbbe084
--- /dev/null
+++ b/test/settings.spec.ts
@@ -0,0 +1,21 @@
+import { describe, expect, it } from 'vitest';
+import { normalizeGlobalConfig } from '@client/pages/settings';
+
+describe('settings model strategy', () => {
+ it('preserves an explicit empty global fallback list', () => {
+ const config = normalizeGlobalConfig({
+ main: 'gemma-4-31b-it',
+ fallbacks: [],
+ size_overrides: [
+ {
+ max_lines: 300,
+ model: 'gemma-4-31b-it',
+ fallbacks: [],
+ },
+ ],
+ });
+
+ expect(config.fallbacks).toEqual([]);
+ expect(config.size_overrides[0].fallbacks).toEqual([]);
+ });
+});
diff --git a/test/webhook-handling.spec.ts b/test/webhook-handling.spec.ts
index 86c86c9..66eb5ff 100644
--- a/test/webhook-handling.spec.ts
+++ b/test/webhook-handling.spec.ts
@@ -114,6 +114,7 @@ describe('Webhook Handling Suite', () => {
expect(queue.sent).toHaveLength(1);
expect(queue.sent[0].jobId).toBe(json.job.id);
expect(queue.sent[0].deliveryId).toBeDefined();
+ expect(queue.sent[0].phase).toBe('prepare');
expect(queue.sent[0].eventName).toBeUndefined();
expect(queue.sent[0].payload).toBeUndefined();
});