Skip to content

Commit f48f24e

Browse files
authored
refactor(core): simplify session input promotion (anomalyco#33443)
1 parent 34b3d59 commit f48f24e

21 files changed

Lines changed: 160 additions & 493 deletions

packages/core/src/database/migration.gen.ts

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { Effect } from "effect"
2+
import type { DatabaseMigration } from "../migration"
3+
4+
export default {
5+
id: "20260622202450_simplify_session_input",
6+
up(tx) {
7+
return Effect.gen(function* () {
8+
yield* tx.run(`DELETE FROM \`session_context_epoch\`;`)
9+
yield* tx.run(`DELETE FROM \`session_input\`;`)
10+
yield* tx.run(`DELETE FROM \`session_message\`;`)
11+
yield* tx.run(`DELETE FROM \`event\`;`)
12+
yield* tx.run(`DELETE FROM \`event_sequence\`;`)
13+
yield* tx.run(`UPDATE \`session\` SET \`workspace_id\` = NULL WHERE \`workspace_id\` IS NOT NULL;`)
14+
yield* tx.run(`DELETE FROM \`workspace\`;`)
15+
})
16+
},
17+
} satisfies DatabaseMigration.Migration

packages/core/src/event.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,19 @@ export type Payload<D extends Definition = Definition> = {
4646
export type Subscriber<D extends Definition = Definition> = (event: Payload<D>) => Effect.Effect<void>
4747
export type Unsubscribe = Effect.Effect<void>
4848

49+
export const latestSequence = Effect.fn("EventV2.latestSequence")(function* (
50+
db: Database.Interface["db"],
51+
aggregateID: string,
52+
) {
53+
const row = yield* db
54+
.select({ seq: EventSequenceTable.seq })
55+
.from(EventSequenceTable)
56+
.where(eq(EventSequenceTable.aggregate_id, aggregateID))
57+
.get()
58+
.pipe(Effect.orDie)
59+
return row?.seq ?? -1
60+
})
61+
4962
export type SerializedEvent = {
5063
readonly id: ID
5164
readonly type: string

packages/core/src/session/context-epoch.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ const prepareOnce = Effect.fnUntraced(function* (
6464
return { baseline: stored.baseline, baselineSeq: stored.baseline_seq }
6565
}
6666
if (result._tag === "ReplacementReady") {
67-
const baselineSeq = replacementSeq ?? (yield* SessionInput.latestSeq(db, sessionID))
67+
const baselineSeq = replacementSeq ?? (yield* EventV2.latestSequence(db, sessionID))
6868
yield* replace(db, sessionID, baselineSeq, result.generation)
6969
return { baseline: result.generation.baseline, baselineSeq }
7070
}
@@ -124,7 +124,7 @@ const insert = Effect.fnUntraced(function* (
124124
sessionID: SessionSchema.ID,
125125
generation: SystemContext.Generation,
126126
) {
127-
const baselineSeq = yield* SessionInput.latestSeq(db, sessionID)
127+
const baselineSeq = yield* EventV2.latestSequence(db, sessionID)
128128
yield* db
129129
.insert(SessionContextEpochTable)
130130
.values({

packages/core/src/session/event.ts

Lines changed: 14 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ const Base = {
2525
timestamp: V2Schema.DateTimeUtcFromMillis,
2626
sessionID: SessionSchema.ID,
2727
}
28+
const PromptFields = {
29+
...Base,
30+
messageID: SessionMessageID.ID,
31+
prompt: Prompt,
32+
delivery: Schema.Literals(["steer", "queue"]),
33+
}
2834

2935
const options = {
3036
durable: {
@@ -83,40 +89,16 @@ export type Moved = typeof Moved.Type
8389
export const Prompted = EventV2.define({
8490
type: "session.next.prompted",
8591
...options,
86-
schema: {
87-
...Base,
88-
messageID: SessionMessageID.ID,
89-
prompt: Prompt,
90-
delivery: Schema.Literals(["steer", "queue"]),
91-
},
92+
schema: PromptFields,
9293
})
9394
export type Prompted = typeof Prompted.Type
9495

95-
export namespace PromptLifecycle {
96-
export const Admitted = EventV2.define({
97-
type: "session.next.prompt.admitted",
98-
...options,
99-
schema: {
100-
...Base,
101-
messageID: SessionMessageID.ID,
102-
prompt: Prompt,
103-
delivery: Schema.Literals(["steer", "queue"]),
104-
},
105-
})
106-
export type Admitted = typeof Admitted.Type
107-
108-
export const Promoted = EventV2.define({
109-
type: "session.next.prompt.promoted",
110-
...options,
111-
schema: {
112-
...Base,
113-
messageID: SessionMessageID.ID,
114-
prompt: Prompt,
115-
timeCreated: V2Schema.DateTimeUtcFromMillis,
116-
},
117-
})
118-
export type Promoted = typeof Promoted.Type
119-
}
96+
export const PromptAdmitted = EventV2.define({
97+
type: "session.next.prompt.admitted",
98+
...options,
99+
schema: PromptFields,
100+
})
101+
export type PromptAdmitted = typeof PromptAdmitted.Type
120102

121103
export const ContextUpdated = EventV2.define({
122104
type: "session.next.context.updated",
@@ -455,8 +437,7 @@ const DurableDefinitions = [
455437
ModelSwitched,
456438
Moved,
457439
Prompted,
458-
PromptLifecycle.Admitted,
459-
PromptLifecycle.Promoted,
440+
PromptAdmitted,
460441
ContextUpdated,
461442
Synthetic,
462443
Shell.Started,

packages/core/src/session/input.ts

Lines changed: 48 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { and, asc, eq, isNull, lte } from "drizzle-orm"
44
import { DateTime, Effect, Schema } from "effect"
55
import type { Database } from "../database/database"
66
import type { EventV2 } from "../event"
7-
import { EventSequenceTable } from "../event/sql"
87
import { NonNegativeInt } from "../schema"
98
import { V2Schema } from "../v2-schema"
109
import { SessionEvent } from "./event"
@@ -65,7 +64,7 @@ export const admit = Effect.fn("SessionInput.admit")(function* (
6564
if (existing !== undefined) return existing
6665
const timestamp = yield* DateTime.now
6766
return yield* events
68-
.publish(SessionEvent.PromptLifecycle.Admitted, {
67+
.publish(SessionEvent.PromptAdmitted, {
6968
messageID: input.id,
7069
sessionID: input.sessionID,
7170
timestamp,
@@ -93,19 +92,6 @@ export const admit = Effect.fn("SessionInput.admit")(function* (
9392
)
9493
})
9594

96-
export const latestSeq = Effect.fn("SessionInput.latestSeq")(function* (
97-
db: DatabaseService,
98-
sessionID: SessionSchema.ID,
99-
) {
100-
const row = yield* db
101-
.select({ seq: EventSequenceTable.seq })
102-
.from(EventSequenceTable)
103-
.where(eq(EventSequenceTable.aggregate_id, sessionID))
104-
.get()
105-
.pipe(Effect.orDie)
106-
return row?.seq ?? -1
107-
})
108-
10995
export const projectAdmitted = Effect.fn("SessionInput.projectAdmitted")(function* (
11096
db: DatabaseService,
11197
input: {
@@ -117,6 +103,13 @@ export const projectAdmitted = Effect.fn("SessionInput.projectAdmitted")(functio
117103
readonly timeCreated: DateTime.Utc
118104
},
119105
) {
106+
const message = yield* db
107+
.select({ id: SessionMessageTable.id })
108+
.from(SessionMessageTable)
109+
.where(eq(SessionMessageTable.id, input.id))
110+
.get()
111+
.pipe(Effect.orDie)
112+
if (message !== undefined) return yield* Effect.die(new LifecycleConflict({ id: input.id }))
120113
const stored = yield* db
121114
.insert(SessionInputTable)
122115
.values({
@@ -134,12 +127,13 @@ export const projectAdmitted = Effect.fn("SessionInput.projectAdmitted")(functio
134127
if (!stored) return yield* Effect.die(new LifecycleConflict({ id: input.id }))
135128
})
136129

137-
export const projectPromoted = Effect.fn("SessionInput.projectPromoted")(function* (
130+
export const projectPrompted = Effect.fn("SessionInput.projectPrompted")(function* (
138131
db: DatabaseService,
139132
input: {
140133
readonly id: SessionMessage.ID
141134
readonly sessionID: SessionSchema.ID
142135
readonly prompt: Prompt
136+
readonly delivery: Delivery
143137
readonly timeCreated: DateTime.Utc
144138
readonly promotedSeq: number
145139
},
@@ -157,14 +151,32 @@ export const projectPromoted = Effect.fn("SessionInput.projectPromoted")(functio
157151
.returning()
158152
.get()
159153
.pipe(Effect.orDie)
160-
if (!updated) return yield* Effect.die(new LifecycleConflict({ id: input.id }))
161-
const stored = fromRow(updated)
162-
if (
163-
!matchesPrompt(stored, input) ||
164-
DateTime.toEpochMillis(stored.timeCreated) !== DateTime.toEpochMillis(input.timeCreated)
165-
)
166-
return yield* Effect.die(new LifecycleConflict({ id: input.id }))
167-
return toMessage(stored)
154+
if (updated) {
155+
const stored = fromRow(updated)
156+
if (!matchesProjection(stored, input)) return yield* Effect.die(new LifecycleConflict({ id: input.id }))
157+
return
158+
}
159+
160+
const stored = yield* find(db, input.id)
161+
if (stored) {
162+
if (!matchesProjection(stored, input) || stored.promotedSeq !== input.promotedSeq)
163+
return yield* Effect.die(new LifecycleConflict({ id: input.id }))
164+
return
165+
}
166+
167+
yield* db
168+
.insert(SessionInputTable)
169+
.values({
170+
id: input.id,
171+
session_id: input.sessionID,
172+
prompt: encodePrompt(input.prompt),
173+
delivery: input.delivery,
174+
admitted_seq: input.promotedSeq,
175+
promoted_seq: input.promotedSeq,
176+
time_created: DateTime.toEpochMillis(input.timeCreated),
177+
})
178+
.run()
179+
.pipe(Effect.orDie)
168180
})
169181

170182
export const hasPending = Effect.fn("SessionInput.hasPending")(function* (
@@ -201,35 +213,17 @@ const matchesPrompt = (input: Admitted, expected: { readonly sessionID: SessionS
201213
input.sessionID === expected.sessionID &&
202214
JSON.stringify(encodePrompt(input.prompt)) === JSON.stringify(encodePrompt(expected.prompt))
203215

204-
export const projectLegacyPrompted = Effect.fn("SessionInput.projectLegacyPrompted")(function* (
205-
db: DatabaseService,
206-
input: {
207-
readonly id: SessionMessage.ID
216+
const matchesProjection = (
217+
input: Admitted,
218+
expected: {
208219
readonly sessionID: SessionSchema.ID
209220
readonly prompt: Prompt
210221
readonly delivery: Delivery
211222
readonly timeCreated: DateTime.Utc
212-
readonly promotedSeq: number
213223
},
214-
) {
215-
const inserted = yield* db
216-
.insert(SessionInputTable)
217-
.values({
218-
id: input.id,
219-
session_id: input.sessionID,
220-
admitted_seq: input.promotedSeq,
221-
prompt: encodePrompt(input.prompt),
222-
delivery: input.delivery,
223-
promoted_seq: input.promotedSeq,
224-
time_created: DateTime.toEpochMillis(input.timeCreated),
225-
})
226-
.onConflictDoNothing()
227-
.returning()
228-
.get()
229-
.pipe(Effect.orDie)
230-
if (!inserted) return yield* Effect.die("Prompt projection conflicts with admitted input")
231-
return fromRow(inserted)
232-
})
224+
) =>
225+
equivalent(input, expected) &&
226+
DateTime.toEpochMillis(input.timeCreated) === DateTime.toEpochMillis(expected.timeCreated)
233227

234228
const publish = Effect.fn("SessionInput.publish")(function* (
235229
db: DatabaseService,
@@ -238,18 +232,19 @@ const publish = Effect.fn("SessionInput.publish")(function* (
238232
rows: ReadonlyArray<typeof SessionInputTable.$inferSelect>,
239233
) {
240234
for (const row of rows) {
235+
const id = SessionMessage.ID.make(row.id)
241236
yield* events
242-
.publish(SessionEvent.PromptLifecycle.Promoted, {
237+
.publish(SessionEvent.Prompted, {
243238
sessionID,
244-
timestamp: yield* DateTime.now,
245-
messageID: SessionMessage.ID.make(row.id),
239+
timestamp: DateTime.makeUnsafe(row.time_created),
240+
messageID: id,
246241
prompt: decodePrompt(row.prompt),
247-
timeCreated: DateTime.makeUnsafe(row.time_created),
242+
delivery: row.delivery,
248243
})
249244
.pipe(
250245
Effect.catchDefect((defect) =>
251246
defect instanceof LifecycleConflict
252-
? find(db, SessionMessage.ID.make(row.id)).pipe(
247+
? find(db, id).pipe(
253248
Effect.flatMap((stored) => (stored?.promotedSeq === undefined ? Effect.die(defect) : Effect.void)),
254249
)
255250
: Effect.die(defect),
@@ -303,13 +298,3 @@ export const promoteNextQueued = Effect.fn("SessionInput.promoteNextQueued")(fun
303298
.pipe(Effect.orDie)
304299
return row === undefined ? false : yield* publish(db, events, sessionID, [row]).pipe(Effect.as(true))
305300
})
306-
307-
const toMessage = (input: Admitted) =>
308-
new SessionMessage.User({
309-
id: input.id,
310-
type: "user",
311-
text: input.prompt.text,
312-
files: input.prompt.files,
313-
agents: input.prompt.agents,
314-
time: { created: input.timeCreated },
315-
})

packages/core/src/session/message-updater.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ export function update(adapter: Adapter, event: SessionEvent.Event) {
137137
)
138138
},
139139
"session.next.prompt.admitted": () => Effect.void,
140-
"session.next.prompt.promoted": () => Effect.void,
141140
"session.next.context.updated": (event) =>
142141
adapter.appendMessage(
143142
new SessionMessage.System({

packages/core/src/session/projector.ts

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ type DatabaseService = Database.Interface["db"]
2121
const decodeMessage = Schema.decodeUnknownSync(SessionMessage.Message)
2222
const encodeMessage = Schema.encodeSync(SessionMessage.Message)
2323

24-
class PromptAlreadyProjected extends Error {}
2524
export class SessionAlreadyProjected extends Error {}
2625

2726
type Usage = {
@@ -350,27 +349,19 @@ export const layer = Layer.effectDiscard(
350349
)
351350
yield* events.project(SessionEvent.Prompted, (event) =>
352351
Effect.gen(function* () {
353-
const messageID = event.data.messageID
354-
const existing = yield* db
355-
.select({ id: SessionMessageTable.id })
356-
.from(SessionMessageTable)
357-
.where(eq(SessionMessageTable.id, messageID))
358-
.get()
359-
.pipe(Effect.orDie)
360-
if (existing) return yield* Effect.die(new PromptAlreadyProjected())
361-
yield* run(db, event)
362352
if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence")
363-
yield* SessionInput.projectLegacyPrompted(db, {
364-
id: messageID,
353+
yield* SessionInput.projectPrompted(db, {
354+
id: event.data.messageID,
365355
sessionID: event.data.sessionID,
366356
prompt: event.data.prompt,
367357
delivery: event.data.delivery,
368358
timeCreated: event.data.timestamp,
369359
promotedSeq: event.durable.seq,
370360
})
361+
yield* run(db, event)
371362
}),
372363
)
373-
yield* events.project(SessionEvent.PromptLifecycle.Admitted, (event) =>
364+
yield* events.project(SessionEvent.PromptAdmitted, (event) =>
374365
Effect.gen(function* () {
375366
if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence")
376367
yield* SessionInput.projectAdmitted(db, {
@@ -383,22 +374,6 @@ export const layer = Layer.effectDiscard(
383374
})
384375
}),
385376
)
386-
yield* events.project(SessionEvent.PromptLifecycle.Promoted, (event) =>
387-
Effect.gen(function* () {
388-
if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence")
389-
yield* insertMessage(
390-
db,
391-
event,
392-
yield* SessionInput.projectPromoted(db, {
393-
id: event.data.messageID,
394-
sessionID: event.data.sessionID,
395-
prompt: event.data.prompt,
396-
timeCreated: event.data.timeCreated,
397-
promotedSeq: event.durable.seq,
398-
}),
399-
)
400-
}),
401-
)
402377
yield* events.project(SessionEvent.ContextUpdated, (event) => run(db, event))
403378
yield* events.project(SessionEvent.Synthetic, (event) => run(db, event))
404379
yield* events.project(SessionEvent.Shell.Started, (event) => run(db, event))

0 commit comments

Comments
 (0)