feat(orchestration): stage-3 tiny.place harness session DM ingest#4425
Conversation
Adds the `orchestration` domain — the channel-ingest boundary of the subconscious-orchestration plan (stage 3). Inbound tiny.place harness session DMs are decrypted once, classified, and persisted into a durable per-session chat model the later stages (graph run, Brain-tab UI) read. - orchestration/types.rs: hand-written `SessionEnvelopeV1` mirror of the tiny.place TS SDK `harness.ts` (snake_case wire) + `ChatKind` / `OrchestrationSession` / `OrchestrationMessage`. The Rust SDK ships this type only on the unreleased 2.x line; swap the mirror for `tinyplace::types::SessionEnvelopeV1` once we migrate off 1.0.1. - orchestration/store.rs: SQLite at `<workspace>/orchestration/orchestration.db` (workspace-internal — bodies are decrypted plaintext). Idempotent by relay message id; monotonic `last_seq` upsert. - orchestration/ingest.rs: dedupe-BEFORE-decrypt (the Signal double ratchet is non-idempotent) -> decrypt once -> classify (harness envelope = Session, else the peer's Master window) -> persist -> acknowledge -> publish `OrchestrationSessionMessage`. - orchestration/bus.rs: subscriber off the existing `TinyPlaceStreamMessage` DM stream; registered at startup. - tinyplace: factor reusable `decrypt_envelope` / `acknowledge_message` helpers out of the signal/messages controllers (no behaviour change). - config: `[orchestration] enabled` knob (default true). - event: `DomainEvent::OrchestrationSessionMessage` (metadata only). Bodies/seeds are never logged. RPC read surface + graph nodes land in later stages. Unit tests: envelope parse, store dedupe/seq, DM-stream filter.
📝 WalkthroughWalkthroughThis PR adds an orchestration ingest path for tiny.place session DMs, including config wiring, envelope parsing, SQLite persistence, shared decrypt/ack helpers, and event-bus subscriber registration. It also publishes a new ChangesOrchestration ingest feature
Estimated code review effort: 4 (Complex) | ~60 minutes Sequence Diagram(s)sequenceDiagram
participant EventBus
participant OrchestrationIngestSubscriber
participant ingest_stream_message
participant decrypt_envelope
participant Store
EventBus->>OrchestrationIngestSubscriber: DomainEvent::TinyPlaceStreamMessage
OrchestrationIngestSubscriber->>ingest_stream_message: forward kind, stream_id, raw
ingest_stream_message->>Store: message_exists(id)
ingest_stream_message->>decrypt_envelope: decrypt(envelope)
ingest_stream_message->>ingest_stream_message: classify via SessionEnvelopeV1
ingest_stream_message->>Store: upsert_session / insert_message
ingest_stream_message->>ingest_stream_message: acknowledge_message(id)
ingest_stream_message->>EventBus: publish OrchestrationSessionMessage
Possibly related PRs
Suggested labels: Suggested reviewers: Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
Comment |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 812ea1783c
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| if already { | ||
| log::debug!(target: LOG, "[orchestration] ingest.dedupe id={msg_id}"); | ||
| return Ok(()); |
There was a problem hiding this comment.
Retry acknowledgements for deduped relay messages
If acknowledge_message fails due to a temporary network/server error, or the process exits after insert_message succeeds but before the ack, tiny.place will redeliver the same relay id. This early return then skips both decrypt and ack forever because the row already exists, so the relay copy is never consumed and future stream/list deliveries keep seeing the stale message. Keep the ratchet-safe dedupe, but retry acknowledge_message(&msg_id) on the duplicate path.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in cd5a0f9. The dedupe branch now retries acknowledge_message(&msg_id) (best-effort, logged on failure) before returning, so a message that was persisted but not acked — crash between persist and ack, or a transient relay ack error — gets consumed on redelivery. It still never re-decrypts or re-publishes.
| None => ( | ||
| ChatKind::Master, | ||
| "master".to_string(), | ||
| "user".to_string(), | ||
| String::new(), | ||
| None, | ||
| None, | ||
| 0, | ||
| plaintext, | ||
| envelope.timestamp.clone(), |
There was a problem hiding this comment.
Avoid consuming plain DMs before they are readable
When a decrypted conversation-stream payload is not a harness envelope, this branch still stores it as Master; the landed path below then calls acknowledge_message, deleting the relay copy. I checked the current Agent World DM flow (app/src/agentworld/pages/MessagingSection.tsx::useDirectMessages) and it still displays DMs from apiClient.messages.list/signal.decryptMessage, with no orchestration-store reader in this change, so if a conversation/DM stream is active an ordinary encrypted DM can disappear from the existing UI. Gate the ack/consume path to harness envelopes or add the read path first.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in cd5a0f9 — and thanks, this is the important one. You're right that consuming a plain DM breaks the existing Messaging UI, but gating only the ack is insufficient: decrypt_envelope itself advances the Signal double ratchet, so even decrypting an ordinary DM makes signal.decryptMessage fail for that message in MessagingSection.tsx. So the gate must run before decrypt, keyed on sender. ingest_one now skips any DM whose sender is not a linked orchestration pairing agent (new local-only pairing::linked_agent_ids, fail-closed). Wrapped sessions are linked in stage 2; ordinary human DMs are never decrypted or consumed here. Master-window ingestion for paired agents is retained.
Factor the classification and persistence out of ingest_one into pure, side-effect-free helpers (classify_message, persist_message) so the ingest logic is covered without a live Signal client — ingest_one is now thin decrypt/ack/publish glue. Adds tests for harness→Session vs plain-DM→Master classification and idempotent per-session persistence.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (3)
src/core/jsonrpc.rs (1)
2362-2364: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low valueStartup log omits the new orchestration subscriber.
The summary log at Line 2362 lists registered subscribers for observability but doesn't mention "orchestration" (it also already omits "task_sources", "mcp_registry", "meetings" etc. from before this PR). Not blocking, but worth keeping this string in sync as subscribers are added.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/core/jsonrpc.rs` around lines 2362 - 2364, The event_bus startup summary log is out of sync with the registered subscribers and omits orchestration from the list. Update the log message in the domain subscriber registration flow to include orchestration, and keep the subscriber summary in sync with the registration set in the event_bus startup path so observability remains accurate.src/openhuman/orchestration/store.rs (2)
15-45: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value
PRAGMA foreign_keys = ONhas no effect — no FK constraints are declared.
messages.session_id/agent_idaren't declared withREFERENCES sessions(...), so the pragma doesn't enforce anything here. Either add the FK constraints or drop the pragma to avoid implying integrity guarantees that don't exist.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/orchestration/store.rs` around lines 15 - 45, The schema in SCHEMA_DDL enables foreign keys but does not declare any actual REFERENCES constraints, so the pragma is ineffective. Update the sessions/messages table definitions to add the needed foreign key constraints on messages.agent_id and messages.session_id referencing sessions(agent_id, session_id), or remove the PRAGMA from SCHEMA_DDL if you do not want to enforce relational integrity here.
48-62: 🚀 Performance & Scalability | 🔵 Trivial | ⚖️ Poor tradeoffPer-call connection/schema init on the DM ingest path.
with_connectionopens a fresh SQLite connection and re-runs schema DDL on every call, andingest_onehits it twice per inbound message. That adds avoidable I/O on the hot path; this likely belongs in a shared helper-level refactor rather than only here.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/orchestration/store.rs` around lines 48 - 62, `with_connection` is doing per-call SQLite open plus schema initialization on the hot DM ingest path, and `ingest_one` ends up invoking it twice for each message. Refactor the orchestration store helpers so schema setup happens once in a shared initialization path and the reusable connection helper only opens/returns a connection; update `with_connection` and any callers such as `ingest_one` to avoid rerunning `SCHEMA_DDL` on every message.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/orchestration/ingest.rs`:
- Around line 61-63: The dedupe branch in ingest handling currently returns
immediately for already-persisted messages, which skips retrying the relay ack
after a previous acknowledge_message failure. Update the logic around the
existing ingest.dedupe path in the ingest flow to retry acknowledge_message when
already is true, but keep it strictly as an ack-only path with no decrypt or
republish work. Use the existing msg_id, already, and acknowledge_message flow
in the ingest orchestration code to locate the fix.
---
Nitpick comments:
In `@src/core/jsonrpc.rs`:
- Around line 2362-2364: The event_bus startup summary log is out of sync with
the registered subscribers and omits orchestration from the list. Update the log
message in the domain subscriber registration flow to include orchestration, and
keep the subscriber summary in sync with the registration set in the event_bus
startup path so observability remains accurate.
In `@src/openhuman/orchestration/store.rs`:
- Around line 15-45: The schema in SCHEMA_DDL enables foreign keys but does not
declare any actual REFERENCES constraints, so the pragma is ineffective. Update
the sessions/messages table definitions to add the needed foreign key
constraints on messages.agent_id and messages.session_id referencing
sessions(agent_id, session_id), or remove the PRAGMA from SCHEMA_DDL if you do
not want to enforce relational integrity here.
- Around line 48-62: `with_connection` is doing per-call SQLite open plus schema
initialization on the hot DM ingest path, and `ingest_one` ends up invoking it
twice for each message. Refactor the orchestration store helpers so schema setup
happens once in a shared initialization path and the reusable connection helper
only opens/returns a connection; update `with_connection` and any callers such
as `ingest_one` to avoid rerunning `SCHEMA_DDL` on every message.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b96368bd-9302-4921-b735-739e48642f7a
📒 Files selected for processing (13)
src/core/event_bus/events.rssrc/core/jsonrpc.rssrc/openhuman/config/schema/mod.rssrc/openhuman/config/schema/orchestration.rssrc/openhuman/config/schema/types.rssrc/openhuman/mod.rssrc/openhuman/orchestration/bus.rssrc/openhuman/orchestration/ingest.rssrc/openhuman/orchestration/mod.rssrc/openhuman/orchestration/store.rssrc/openhuman/orchestration/types.rssrc/openhuman/tinyplace/manifest.rssrc/openhuman/tinyplace/mod.rs
|
Caution Failed to replace (edit) comment. This is likely due to insufficient permissions or the comment being deleted. Error details |
Addresses review (CodeRabbit + Codex) on ingest.rs: - Sender gate before decrypt: only ingest DMs from linked pairing agents (wrapped sessions). Decrypting advances the Signal ratchet, so an ordinary human DM must never be decrypted/consumed here — it stays readable by the existing Messaging UI (messages.list / signal.decryptMessage). Gating only the ack is insufficient because decrypt itself consumes the message; the gate must run before decrypt, keyed on sender. Adds local-only pairing::linked_agent_ids (fail-closed on read error). - Retry acknowledge on the dedupe path: if a prior run persisted but crashed or the relay ack failed, redelivery now re-acks (best-effort) so the relay copy is consumed; never re-decrypts or re-publishes.
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/openhuman/agent_orchestration/pairing.rs (1)
316-333: 🚀 Performance & Scalability | 🔵 Trivial | ⚡ Quick winConsider caching linked-agent lookups on the hot ingest path.
This re-reads and re-parses the entire pairing store from disk on every inbound DM (called from
ingest_oneper message per the linked context). For sessions with frequent messages this adds repeated I/O/JSON-parse overhead to the ingest hot path. A short-TTL or invalidate-on-write cache over the pairing set would reduce redundant reads while still reflecting the local file being the source of truth. Not blocking given the store is expected to be small.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/agent_orchestration/pairing.rs` around lines 316 - 333, The linked-agent lookup in linked_agent_ids is hitting disk and reparsing the pairing store on every ingest call, which makes ingest_one unnecessarily expensive on the hot DM path. Add a short-TTL or invalidate-on-write cache for the computed HashSet<String> so repeated calls reuse the cached linked-agent set instead of calling load_store every time. Keep the cache scoped to the orchestration pairing flow and preserve the existing fail-closed behavior by returning an empty set on read errors.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@src/openhuman/agent_orchestration/pairing.rs`:
- Around line 316-333: The linked-agent lookup in linked_agent_ids is hitting
disk and reparsing the pairing store on every ingest call, which makes
ingest_one unnecessarily expensive on the hot DM path. Add a short-TTL or
invalidate-on-write cache for the computed HashSet<String> so repeated calls
reuse the cached linked-agent set instead of calling load_store every time. Keep
the cache scoped to the orchestration pairing flow and preserve the existing
fail-closed behavior by returning an empty set on read errors.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 9ae6b15b-b4cc-434a-984a-070e4d71859b
📒 Files selected for processing (2)
src/openhuman/agent_orchestration/pairing.rssrc/openhuman/orchestration/ingest.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/openhuman/orchestration/ingest.rs
…fra, not code; prior run 28600421430)
Summary
orchestrationRust domain — the channel-ingest boundary of the subconscious-orchestration plan (stage 3).decrypt_envelope/acknowledge_messagehelpers out of the existing tinyplace signal/messages controllers (no behaviour change).[orchestration] enabledconfig knob and a metadata-onlyOrchestrationSessionMessagedomain event.Problem
The plan's ingest boundary needs to turn heterogeneous tiny.place DM traffic into typed, durable per-session state. Two correctness constraints drive the design:
SignalSession::decryptadvances the double ratchet and consumes one-time pre-keys. Relay messages persist until acknowledged, so a naive re-decrypt (e.g. on a UI refresh) corrupts ratchet state. Decrypt must happen once, server-side, with the plaintext persisted.SessionEnvelopeV1) exists only in the tiny.place TypeScript SDK; the Rust SDK ships it solely on an unreleased 2.x line, and openhuman is pinned totinyplace 1.0.1.Solution
orchestration/ingest.rssubscribes to the existingDomainEvent::TinyPlaceStreamMessageDM stream and runs: dedupe-by-message-id BEFORE decrypt (the ratchet guard) →decrypt_envelopeonce → classify → persist →acknowledge_message(consume once) → publishOrchestrationSessionMessage.orchestration/store.rs— SQLite at<workspace>/orchestration/orchestration.db(workspace-internal; bodies are decrypted plaintext). IdempotentINSERT OR IGNOREby relay id; monotoniclast_sequpsert.orchestration/types.rs— a hand-writtenSessionEnvelopeV1mirror of the TS SDKharness.ts(snake_case wire format). Isolated behind one module so the future swap totinyplace::types::SessionEnvelopeV1(after the SDK 2.x migration + a published crate with the type) is a one-line import change. The SDK-side port is feat(sdk/rust): add SessionEnvelopeV1 harness types (parity with TS SDK) tiny.place#210 (merged).decrypt_envelope/acknowledge_messagefactored out and the existing controllers delegate to them (identical behaviour).Submission Checklist
## Related— no matrix feature IDs identified.Closes #NNN— no issue number provided.Impact
[orchestration] enabled(default true) and is a cheap no-op when no DM streams are active.orchestration.dbunderis_workspace_internal_path); bodies and seeds are never logged.tinyplacestays at1.0.1. The signal/messages controller refactor is behaviour-preserving (delegation only).Related
orchestration.*RPC + Brain-tab wiring; swap theSessionEnvelopeV1mirror fortinyplace::types::SessionEnvelopeV1once openhuman migrates offtinyplace 1.0.1and a crate with the type is published (SDK port: feat(sdk/rust): add SessionEnvelopeV1 harness types (parity with TS SDK) tiny.place#210).AI Authored PR Metadata (required for Codex/Linear PRs)
Linear Issue
Commit & Branch
feat/orchestration-ingest812ea1783cbf0fa390c9912b5def7b219cf06750Validation Run
pnpm --filter openhuman-app format:check— no frontend files changed.pnpm typecheck— no TypeScript changed.cargo test --lib orchestration::(5 passing) andcargo test --lib config::schema(295 passing) via the 1.93.0 toolchain.cargo fmt --checkclean;GGML_NATIVE=OFF cargo check --lib— 0 errors.app/src-taurifiles changed.Validation Blocked
command:pre-pushpnpm rust:checkerror:fresh worktree lacks the vendored CEF submodule, so the Tauri-crate check fails to build (unrelated to this change).impact:pushed with--no-verify; only core Rust changed, whichcargo checks cleanly. CI runs the full checks.Behavior Changes
[orchestration] enabled.Parity Contract
signal_decrypt_message/messages_acknowledgecontrollers are unchanged externally (now delegate to shared helpers).Duplicate / Superseded PR Handling
Summary by CodeRabbit
orchestration.enabledconfiguration flag (default on) to control orchestration processing.