diff --git a/packages/core/src/session/context-epoch.ts b/packages/core/src/session/context-epoch.ts index 1fb8df92e6e1..2c9a1ebeaf02 100644 --- a/packages/core/src/session/context-epoch.ts +++ b/packages/core/src/session/context-epoch.ts @@ -18,7 +18,7 @@ type DatabaseService = Database.Interface["db"] class RevisionMismatch extends Error {} class LocationMismatch extends Error {} -export class AgentMismatch extends Error {} +export class AgentMismatch extends Schema.TaggedErrorClass()("SessionContextEpoch.AgentMismatch", {}) {} export class AgentReplacementBlocked extends Schema.TaggedErrorClass()( "SessionContextEpoch.AgentReplacementBlocked", { sessionID: SessionSchema.ID, previous: AgentV2.ID, current: AgentV2.ID }, @@ -45,7 +45,7 @@ export function initialize( sessionID: SessionSchema.ID, location: Location.Ref, agent: AgentV2.ID, -): Effect.Effect { +): Effect.Effect { return retryRevisionMismatch(() => initializeOnce(db, context, sessionID, location, agent)).pipe( Effect.withSpan("SessionContextEpoch.initialize"), ) @@ -58,7 +58,10 @@ export function prepare( sessionID: SessionSchema.ID, location: Location.Ref, agent: AgentV2.ID, -): Effect.Effect { +): Effect.Effect< + Prepared, + SystemContext.InitializationBlocked | ContextSnapshotDecodeError | AgentMismatch | AgentReplacementBlocked +> { return retryRevisionMismatch(() => prepareOnce(db, events, context, sessionID, location, agent)).pipe( Effect.withSpan("SessionContextEpoch.prepare"), ) @@ -153,7 +156,7 @@ const requireAgentSelection = Effect.fnUntraced(function* ( .where(eq(SessionTable.id, sessionID)) .get() .pipe(Effect.orDie) - if (!selected || (selected.agent !== null && selected.agent !== agent)) return yield* Effect.die(new AgentMismatch()) + if (!selected || (selected.agent !== null && selected.agent !== agent)) return yield* new AgentMismatch({}) }) export const requestReplacement = Effect.fn("SessionContextEpoch.requestReplacement")(function* ( @@ -212,7 +215,7 @@ const insert = Effect.fnUntraced(function* ( .get() .pipe(Effect.orDie) if (!placed) return yield* Effect.die(new LocationMismatch()) - if (placed.agent !== null && placed.agent !== agent) return yield* Effect.die(new AgentMismatch()) + if (placed.agent !== null && placed.agent !== agent) return yield* new AgentMismatch({}) const baselineSeq = yield* SessionInput.latestSeq(db, sessionID) yield* db .insert(SessionContextEpochTable) @@ -235,7 +238,7 @@ const insert = Effect.fnUntraced(function* ( }), { behavior: "immediate" }, ) - .pipe(Effect.orDie) + .pipe(Effect.catch((error) => (error instanceof AgentMismatch ? Effect.fail(error) : Effect.die(error)))) }) const replace = Effect.fnUntraced(function* ( @@ -274,7 +277,7 @@ const replace = Effect.fnUntraced(function* ( }), { behavior: "immediate" }, ) - .pipe(Effect.orDie) + .pipe(Effect.catch((error) => (error instanceof AgentMismatch ? Effect.fail(error) : Effect.die(error)))) }) const fence = Effect.fnUntraced(function* ( @@ -290,8 +293,7 @@ const fence = Effect.fnUntraced(function* ( .where(eq(SessionContextEpochTable.session_id, sessionID)) .get() .pipe(Effect.orDie) - if (!current || (current.selected !== null && current.selected !== agent)) - return yield* Effect.die(new AgentMismatch()) + if (!current || (current.selected !== null && current.selected !== agent)) return yield* new AgentMismatch({}) if (current.revision !== expectedRevision) return yield* Effect.die(new RevisionMismatch()) }) diff --git a/packages/core/src/session/runner/llm.ts b/packages/core/src/session/runner/llm.ts index 5d84e985a620..8eee29459432 100644 --- a/packages/core/src/session/runner/llm.ts +++ b/packages/core/src/session/runner/llm.ts @@ -8,7 +8,7 @@ import { isContextOverflowFailure, type ProviderErrorEvent, } from "@opencode-ai/llm" -import { Cause, DateTime, Effect, FiberSet, Layer, Option, Schema, Semaphore, Stream } from "effect" +import { Cause, Data, DateTime, Effect, FiberSet, Layer, Option, Schema, Semaphore, Stream } from "effect" import { AgentV2 } from "../../agent" import { Config } from "../../config" import { Database } from "../../database/database" @@ -36,6 +36,18 @@ import { createLLMEventPublisher } from "./publish-llm-event" import { toLLMMessages } from "./to-llm-message" import { MAX_STEPS_PROMPT } from "./max-steps" +type TurnState = Data.TaggedEnum<{ + OverflowAvailable: { readonly promotion: SessionInput.Delivery | undefined } + OverflowExhausted: {} +}> +const TurnState = Data.taggedEnum() + +type TurnResult = Data.TaggedEnum<{ + Retry: { readonly state: TurnState } + Complete: { readonly needsContinuation: boolean } +}> +const TurnResult = Data.taggedEnum() + /** * Runs one durable coding-agent Session until it settles. * @@ -140,29 +152,12 @@ export const layer = Layer.effect( const isQuestionRejected = (cause: Cause.Cause) => cause.reasons.some((reason) => Cause.isDieReason(reason) && reason.defect instanceof QuestionV2.RejectedError) - type TurnTransition = - // Request preparation observed a concurrent Session change and must restart from durable state. - | { readonly _tag: "RebuildPreparedTurn"; readonly promotion?: SessionInput.Delivery } - // Overflow compaction completed; rebuild once through the path without overflow recovery. - | { readonly _tag: "ContinueAfterOverflowCompaction" } - - class TurnTransitionError extends Error { - constructor(readonly transition: TurnTransition) { - super() - } - } - - const rebuildPreparedTurn = (promotion?: SessionInput.Delivery) => - new TurnTransitionError({ _tag: "RebuildPreparedTurn", promotion }) - const continueAfterOverflowCompaction = new TurnTransitionError({ - _tag: "ContinueAfterOverflowCompaction", - }) - - const retryAgentMismatch = (promotion: SessionInput.Delivery | undefined) => - Effect.catchDefect((defect) => - defect instanceof SessionContextEpoch.AgentMismatch - ? Effect.die(rebuildPreparedTurn(promotion)) - : Effect.die(defect), + const optionOnAgentMismatch = ( + effect: Effect.Effect, + ): Effect.Effect, E, R> => + effect.pipe( + Effect.asSome, + Effect.catchTag("SessionContextEpoch.AgentMismatch", () => Effect.succeedNone), ) const sameModel = Schema.toEquivalence(Schema.UndefinedOr(ModelV2.Ref)) @@ -171,23 +166,27 @@ export const layer = Layer.effect( concurrency: "unbounded", }).pipe(Effect.map(SystemContext.combine)) - const runTurnAttempt = Effect.fn("SessionRunner.runTurn")(function* ( + type RunTurnAttempt = ( sessionID: SessionSchema.ID, - promotion: SessionInput.Delivery | undefined, + state: TurnState, + step: number, + ) => Effect.Effect + + const runTurnAttempt: RunTurnAttempt = Effect.fn("SessionRunner.runTurn")(function* ( + sessionID: SessionSchema.ID, + state: TurnState, step: number, - recoverOverflow?: typeof compaction.compactAfterOverflow, ) { + const promotion = state._tag === "OverflowAvailable" ? state.promotion : undefined const session = yield* getSession(sessionID) if (session.location.directory !== location.directory || session.location.workspaceID !== location.workspaceID) return yield* Effect.interrupt const agent = yield* agents.select(session.agent) - const initialized = yield* SessionContextEpoch.initialize( - db, - loadSystemContext(agent), - session.id, - session.location, - agent.id, - ).pipe(retryAgentMismatch(promotion)) + const initialization = yield* optionOnAgentMismatch( + SessionContextEpoch.initialize(db, loadSystemContext(agent), session.id, session.location, agent.id), + ) + if (Option.isNone(initialization)) return TurnResult.Retry({ state }) + const initialized = initialization.value const toolFibers = yield* FiberSet.make() let needsContinuation = false if (promotion) { @@ -198,19 +197,19 @@ export const layer = Layer.effect( yield* SessionInput.promoteSteers(db, events, session.id, cutoff) } } - const system = - initialized ?? - (yield* SessionContextEpoch.prepare( - db, - events, - loadSystemContext(agent), - session.id, - session.location, - agent.id, - ).pipe(retryAgentMismatch(undefined))) + const preparation = + initialized === undefined + ? yield* optionOnAgentMismatch( + SessionContextEpoch.prepare(db, events, loadSystemContext(agent), session.id, session.location, agent.id), + ) + : Option.some(initialized) + const nextState = + state._tag === "OverflowAvailable" ? TurnState.OverflowAvailable({ promotion: undefined }) : state + if (Option.isNone(preparation)) return TurnResult.Retry({ state: nextState }) + const system = preparation.value const current = yield* getSession(sessionID) if ((yield* agents.select(current.agent)).id !== agent.id || !sameModel(current.model, session.model)) - return yield* Effect.die(rebuildPreparedTurn()) + return TurnResult.Retry({ state: nextState }) const model = yield* models.resolve(session) const entries = yield* SessionHistory.entriesForRunner(db, session.id, system.baselineSeq) const context = entries.map((entry) => entry.message) @@ -228,7 +227,7 @@ export const layer = Layer.effect( toolChoice: isLastStep ? "none" : undefined, }) if (yield* compaction.compactIfNeeded({ sessionID: session.id, entries, model, request })) - return yield* Effect.die(rebuildPreparedTurn()) + return TurnResult.Retry({ state: nextState }) const publisher = createLLMEventPublisher(events, { sessionID: session.id, agent: agent.id, @@ -243,7 +242,7 @@ export const layer = Layer.effect( withPublication(publisher.publish(event, outputPaths)) let overflowFailure: ProviderErrorEvent | undefined if (!(yield* SessionContextEpoch.current(db, session.id, agent.id, system.revision))) - return yield* Effect.die(rebuildPreparedTurn()) + return TurnResult.Retry({ state: nextState }) const providerStream = llm.stream(request).pipe( Stream.runForEach((event) => Effect.gen(function* () { @@ -295,12 +294,12 @@ export const layer = Layer.effect( const failure = stream._tag === "Failure" ? Option.getOrUndefined(Cause.findErrorOption(stream.cause)) : undefined if ( - recoverOverflow && + state._tag === "OverflowAvailable" && !publisher.hasAssistantStarted() && isContextOverflowFailure(overflowFailure ?? failure) && - (yield* restore(recoverOverflow({ sessionID: session.id, entries, model, request }))) + (yield* restore(compaction.compactAfterOverflow({ sessionID: session.id, entries, model, request }))) ) - return yield* Effect.die(continueAfterOverflowCompaction) + return TurnResult.Retry({ state: TurnState.OverflowExhausted() }) if (overflowFailure) yield* publish(overflowFailure) const llmFailure = failure instanceof LLMError ? failure : undefined if (llmFailure && !publisher.hasProviderError()) { @@ -334,7 +333,7 @@ export const layer = Layer.effect( yield* withPublication(publisher.failUnsettledTools("Provider did not return a tool result", true)) if (stream._tag === "Failure") return yield* Effect.failCause(stream.cause) if (settled._tag === "Failure") return yield* Effect.failCause(settled.cause) - return !publisher.hasProviderError() && needsContinuation + return TurnResult.Complete({ needsContinuation: !publisher.hasProviderError() && needsContinuation }) }), ) }, Effect.scoped) @@ -344,32 +343,21 @@ export const layer = Layer.effect( step: number, ) => Effect.Effect - const runAfterOverflowCompaction: RunTurn = Effect.fnUntraced(function* (sessionID, promotion, step) { - return yield* runTurnAttempt(sessionID, promotion, step).pipe( - Effect.catchDefect( - Effect.fnUntraced(function* (defect) { - if (!(defect instanceof TurnTransitionError)) return yield* Effect.die(defect) - if (defect.transition._tag === "ContinueAfterOverflowCompaction") - return yield* Effect.die("Post-compaction provider attempt cannot recover another overflow") - yield* Effect.yieldNow - return yield* runAfterOverflowCompaction(sessionID, defect.transition.promotion, step) - }), - ), - ) - }) - const runTurn: RunTurn = Effect.fnUntraced(function* (sessionID, promotion, step) { - return yield* runTurnAttempt(sessionID, promotion, step, compaction.compactAfterOverflow).pipe( - Effect.catchDefect( - Effect.fnUntraced(function* (defect) { - if (!(defect instanceof TurnTransitionError)) return yield* Effect.die(defect) + let state: TurnState = TurnState.OverflowAvailable({ promotion }) + while (true) { + const result: TurnResult = yield* runTurnAttempt(sessionID, state, step) + switch (result._tag) { + case "Complete": + return result.needsContinuation + case "Retry": + state = result.state yield* Effect.yieldNow - if (defect.transition._tag === "ContinueAfterOverflowCompaction") - return yield* runAfterOverflowCompaction(sessionID, undefined, step) - return yield* runTurn(sessionID, defect.transition.promotion, step) - }), - ), - ) + break + default: + return result satisfies never + } + } }) const run = Effect.fn("SessionRunner.run")(function* (input: { diff --git a/packages/core/test/session-runner.test.ts b/packages/core/test/session-runner.test.ts index 862bb56d33d6..28915b74411d 100644 --- a/packages/core/test/session-runner.test.ts +++ b/packages/core/test/session-runner.test.ts @@ -1138,7 +1138,7 @@ describe("SessionRunnerLLM", () => { sessionID, location, AgentV2.defaultID, - ).pipe(Effect.catchDefect(Effect.succeed)), + ).pipe(Effect.flip), ).toBeInstanceOf(SessionContextEpoch.AgentMismatch) expect(