Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions packages/core/src/session/context-epoch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type DatabaseService = Database.Interface["db"]

class RevisionMismatch extends Error {}
class LocationMismatch extends Error {}
export class AgentMismatch extends Error {}
export class AgentMismatch extends Schema.TaggedErrorClass<AgentMismatch>()("SessionContextEpoch.AgentMismatch", {}) {}
export class AgentReplacementBlocked extends Schema.TaggedErrorClass<AgentReplacementBlocked>()(
"SessionContextEpoch.AgentReplacementBlocked",
{ sessionID: SessionSchema.ID, previous: AgentV2.ID, current: AgentV2.ID },
Expand All @@ -45,7 +45,7 @@ export function initialize(
sessionID: SessionSchema.ID,
location: Location.Ref,
agent: AgentV2.ID,
): Effect.Effect<Prepared | undefined, SystemContext.InitializationBlocked> {
): Effect.Effect<Prepared | undefined, SystemContext.InitializationBlocked | AgentMismatch> {
return retryRevisionMismatch(() => initializeOnce(db, context, sessionID, location, agent)).pipe(
Effect.withSpan("SessionContextEpoch.initialize"),
)
Expand All @@ -58,7 +58,10 @@ export function prepare(
sessionID: SessionSchema.ID,
location: Location.Ref,
agent: AgentV2.ID,
): Effect.Effect<Prepared, SystemContext.InitializationBlocked | ContextSnapshotDecodeError | AgentReplacementBlocked> {
): Effect.Effect<
Prepared,
SystemContext.InitializationBlocked | ContextSnapshotDecodeError | AgentMismatch | AgentReplacementBlocked
> {
return retryRevisionMismatch(() => prepareOnce(db, events, context, sessionID, location, agent)).pipe(
Effect.withSpan("SessionContextEpoch.prepare"),
)
Expand Down Expand Up @@ -153,7 +156,7 @@ const requireAgentSelection = Effect.fnUntraced(function* (
.where(eq(SessionTable.id, sessionID))
.get()
.pipe(Effect.orDie)
if (!selected || (selected.agent !== null && selected.agent !== agent)) return yield* Effect.die(new AgentMismatch())
if (!selected || (selected.agent !== null && selected.agent !== agent)) return yield* new AgentMismatch({})
})

export const requestReplacement = Effect.fn("SessionContextEpoch.requestReplacement")(function* (
Expand Down Expand Up @@ -212,7 +215,7 @@ const insert = Effect.fnUntraced(function* (
.get()
.pipe(Effect.orDie)
if (!placed) return yield* Effect.die(new LocationMismatch())
if (placed.agent !== null && placed.agent !== agent) return yield* Effect.die(new AgentMismatch())
if (placed.agent !== null && placed.agent !== agent) return yield* new AgentMismatch({})
const baselineSeq = yield* SessionInput.latestSeq(db, sessionID)
yield* db
.insert(SessionContextEpochTable)
Expand All @@ -235,7 +238,7 @@ const insert = Effect.fnUntraced(function* (
}),
{ behavior: "immediate" },
)
.pipe(Effect.orDie)
.pipe(Effect.catch((error) => (error instanceof AgentMismatch ? Effect.fail(error) : Effect.die(error))))
})

const replace = Effect.fnUntraced(function* (
Expand Down Expand Up @@ -274,7 +277,7 @@ const replace = Effect.fnUntraced(function* (
}),
{ behavior: "immediate" },
)
.pipe(Effect.orDie)
.pipe(Effect.catch((error) => (error instanceof AgentMismatch ? Effect.fail(error) : Effect.die(error))))
})

const fence = Effect.fnUntraced(function* (
Expand All @@ -290,8 +293,7 @@ const fence = Effect.fnUntraced(function* (
.where(eq(SessionContextEpochTable.session_id, sessionID))
.get()
.pipe(Effect.orDie)
if (!current || (current.selected !== null && current.selected !== agent))
return yield* Effect.die(new AgentMismatch())
if (!current || (current.selected !== null && current.selected !== agent)) return yield* new AgentMismatch({})
if (current.revision !== expectedRevision) return yield* Effect.die(new RevisionMismatch())
})

Expand Down
138 changes: 63 additions & 75 deletions packages/core/src/session/runner/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
isContextOverflowFailure,
type ProviderErrorEvent,
} from "@opencode-ai/llm"
import { Cause, DateTime, Effect, FiberSet, Layer, Option, Schema, Semaphore, Stream } from "effect"
import { Cause, Data, DateTime, Effect, FiberSet, Layer, Option, Schema, Semaphore, Stream } from "effect"
import { AgentV2 } from "../../agent"
import { Config } from "../../config"
import { Database } from "../../database/database"
Expand Down Expand Up @@ -36,6 +36,18 @@ import { createLLMEventPublisher } from "./publish-llm-event"
import { toLLMMessages } from "./to-llm-message"
import { MAX_STEPS_PROMPT } from "./max-steps"

type TurnState = Data.TaggedEnum<{
OverflowAvailable: { readonly promotion: SessionInput.Delivery | undefined }
OverflowExhausted: {}
}>
const TurnState = Data.taggedEnum<TurnState>()

type TurnResult = Data.TaggedEnum<{
Retry: { readonly state: TurnState }
Complete: { readonly needsContinuation: boolean }
}>
const TurnResult = Data.taggedEnum<TurnResult>()

/**
* Runs one durable coding-agent Session until it settles.
*
Expand Down Expand Up @@ -140,29 +152,12 @@ export const layer = Layer.effect(
const isQuestionRejected = (cause: Cause.Cause<unknown>) =>
cause.reasons.some((reason) => Cause.isDieReason(reason) && reason.defect instanceof QuestionV2.RejectedError)

type TurnTransition =
// Request preparation observed a concurrent Session change and must restart from durable state.
| { readonly _tag: "RebuildPreparedTurn"; readonly promotion?: SessionInput.Delivery }
// Overflow compaction completed; rebuild once through the path without overflow recovery.
| { readonly _tag: "ContinueAfterOverflowCompaction" }

class TurnTransitionError extends Error {
constructor(readonly transition: TurnTransition) {
super()
}
}

const rebuildPreparedTurn = (promotion?: SessionInput.Delivery) =>
new TurnTransitionError({ _tag: "RebuildPreparedTurn", promotion })
const continueAfterOverflowCompaction = new TurnTransitionError({
_tag: "ContinueAfterOverflowCompaction",
})

const retryAgentMismatch = (promotion: SessionInput.Delivery | undefined) =>
Effect.catchDefect((defect) =>
defect instanceof SessionContextEpoch.AgentMismatch
? Effect.die(rebuildPreparedTurn(promotion))
: Effect.die(defect),
const optionOnAgentMismatch = <A, E, R>(
effect: Effect.Effect<A, E | SessionContextEpoch.AgentMismatch, R>,
): Effect.Effect<Option.Option<A>, E, R> =>
effect.pipe(
Effect.asSome,
Effect.catchTag("SessionContextEpoch.AgentMismatch", () => Effect.succeedNone),
)

const sameModel = Schema.toEquivalence(Schema.UndefinedOr(ModelV2.Ref))
Expand All @@ -171,23 +166,27 @@ export const layer = Layer.effect(
concurrency: "unbounded",
}).pipe(Effect.map(SystemContext.combine))

const runTurnAttempt = Effect.fn("SessionRunner.runTurn")(function* (
type RunTurnAttempt = (
sessionID: SessionSchema.ID,
promotion: SessionInput.Delivery | undefined,
state: TurnState,
step: number,
) => Effect.Effect<TurnResult, RunError>

const runTurnAttempt: RunTurnAttempt = Effect.fn("SessionRunner.runTurn")(function* (
sessionID: SessionSchema.ID,
state: TurnState,
step: number,
recoverOverflow?: typeof compaction.compactAfterOverflow,
) {
const promotion = state._tag === "OverflowAvailable" ? state.promotion : undefined
const session = yield* getSession(sessionID)
if (session.location.directory !== location.directory || session.location.workspaceID !== location.workspaceID)
return yield* Effect.interrupt
const agent = yield* agents.select(session.agent)
const initialized = yield* SessionContextEpoch.initialize(
db,
loadSystemContext(agent),
session.id,
session.location,
agent.id,
).pipe(retryAgentMismatch(promotion))
const initialization = yield* optionOnAgentMismatch(
SessionContextEpoch.initialize(db, loadSystemContext(agent), session.id, session.location, agent.id),
)
if (Option.isNone(initialization)) return TurnResult.Retry({ state })
const initialized = initialization.value
const toolFibers = yield* FiberSet.make<void, ToolOutputStore.Error>()
let needsContinuation = false
if (promotion) {
Expand All @@ -198,19 +197,19 @@ export const layer = Layer.effect(
yield* SessionInput.promoteSteers(db, events, session.id, cutoff)
}
}
const system =
initialized ??
(yield* SessionContextEpoch.prepare(
db,
events,
loadSystemContext(agent),
session.id,
session.location,
agent.id,
).pipe(retryAgentMismatch(undefined)))
const preparation =
initialized === undefined
? yield* optionOnAgentMismatch(
SessionContextEpoch.prepare(db, events, loadSystemContext(agent), session.id, session.location, agent.id),
)
: Option.some(initialized)
const nextState =
state._tag === "OverflowAvailable" ? TurnState.OverflowAvailable({ promotion: undefined }) : state
if (Option.isNone(preparation)) return TurnResult.Retry({ state: nextState })
const system = preparation.value
const current = yield* getSession(sessionID)
if ((yield* agents.select(current.agent)).id !== agent.id || !sameModel(current.model, session.model))
return yield* Effect.die(rebuildPreparedTurn())
return TurnResult.Retry({ state: nextState })
const model = yield* models.resolve(session)
const entries = yield* SessionHistory.entriesForRunner(db, session.id, system.baselineSeq)
const context = entries.map((entry) => entry.message)
Expand All @@ -228,7 +227,7 @@ export const layer = Layer.effect(
toolChoice: isLastStep ? "none" : undefined,
})
if (yield* compaction.compactIfNeeded({ sessionID: session.id, entries, model, request }))
return yield* Effect.die(rebuildPreparedTurn())
return TurnResult.Retry({ state: nextState })
const publisher = createLLMEventPublisher(events, {
sessionID: session.id,
agent: agent.id,
Expand All @@ -243,7 +242,7 @@ export const layer = Layer.effect(
withPublication(publisher.publish(event, outputPaths))
let overflowFailure: ProviderErrorEvent | undefined
if (!(yield* SessionContextEpoch.current(db, session.id, agent.id, system.revision)))
return yield* Effect.die(rebuildPreparedTurn())
return TurnResult.Retry({ state: nextState })
const providerStream = llm.stream(request).pipe(
Stream.runForEach((event) =>
Effect.gen(function* () {
Expand Down Expand Up @@ -295,12 +294,12 @@ export const layer = Layer.effect(
const failure =
stream._tag === "Failure" ? Option.getOrUndefined(Cause.findErrorOption(stream.cause)) : undefined
if (
recoverOverflow &&
state._tag === "OverflowAvailable" &&
!publisher.hasAssistantStarted() &&
isContextOverflowFailure(overflowFailure ?? failure) &&
(yield* restore(recoverOverflow({ sessionID: session.id, entries, model, request })))
(yield* restore(compaction.compactAfterOverflow({ sessionID: session.id, entries, model, request })))
)
return yield* Effect.die(continueAfterOverflowCompaction)
return TurnResult.Retry({ state: TurnState.OverflowExhausted() })
if (overflowFailure) yield* publish(overflowFailure)
const llmFailure = failure instanceof LLMError ? failure : undefined
if (llmFailure && !publisher.hasProviderError()) {
Expand Down Expand Up @@ -334,7 +333,7 @@ export const layer = Layer.effect(
yield* withPublication(publisher.failUnsettledTools("Provider did not return a tool result", true))
if (stream._tag === "Failure") return yield* Effect.failCause(stream.cause)
if (settled._tag === "Failure") return yield* Effect.failCause(settled.cause)
return !publisher.hasProviderError() && needsContinuation
return TurnResult.Complete({ needsContinuation: !publisher.hasProviderError() && needsContinuation })
}),
)
}, Effect.scoped)
Expand All @@ -344,32 +343,21 @@ export const layer = Layer.effect(
step: number,
) => Effect.Effect<boolean, RunError>

const runAfterOverflowCompaction: RunTurn = Effect.fnUntraced(function* (sessionID, promotion, step) {
return yield* runTurnAttempt(sessionID, promotion, step).pipe(
Effect.catchDefect(
Effect.fnUntraced(function* (defect) {
if (!(defect instanceof TurnTransitionError)) return yield* Effect.die(defect)
if (defect.transition._tag === "ContinueAfterOverflowCompaction")
return yield* Effect.die("Post-compaction provider attempt cannot recover another overflow")
yield* Effect.yieldNow
return yield* runAfterOverflowCompaction(sessionID, defect.transition.promotion, step)
}),
),
)
})

const runTurn: RunTurn = Effect.fnUntraced(function* (sessionID, promotion, step) {
return yield* runTurnAttempt(sessionID, promotion, step, compaction.compactAfterOverflow).pipe(
Effect.catchDefect(
Effect.fnUntraced(function* (defect) {
if (!(defect instanceof TurnTransitionError)) return yield* Effect.die(defect)
let state: TurnState = TurnState.OverflowAvailable({ promotion })
while (true) {
const result: TurnResult = yield* runTurnAttempt(sessionID, state, step)
switch (result._tag) {
case "Complete":
return result.needsContinuation
case "Retry":
state = result.state
yield* Effect.yieldNow
if (defect.transition._tag === "ContinueAfterOverflowCompaction")
return yield* runAfterOverflowCompaction(sessionID, undefined, step)
return yield* runTurn(sessionID, defect.transition.promotion, step)
}),
),
)
break
default:
return result satisfies never
}
}
})

const run = Effect.fn("SessionRunner.run")(function* (input: {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/test/session-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@ describe("SessionRunnerLLM", () => {
sessionID,
location,
AgentV2.defaultID,
).pipe(Effect.catchDefect(Effect.succeed)),
).pipe(Effect.flip),
).toBeInstanceOf(SessionContextEpoch.AgentMismatch)

expect(
Expand Down
Loading