Skip to content

feat(orchestration): stage-3 tiny.place harness session DM ingest#4425

Merged
senamakel merged 4 commits into
tinyhumansai:mainfrom
sanil-23:feat/orchestration-ingest
Jul 2, 2026
Merged

feat(orchestration): stage-3 tiny.place harness session DM ingest#4425
senamakel merged 4 commits into
tinyhumansai:mainfrom
sanil-23:feat/orchestration-ingest

Conversation

@sanil-23

@sanil-23 sanil-23 commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Summary

  • Adds the orchestration Rust domain — the channel-ingest boundary of the subconscious-orchestration plan (stage 3).
  • Inbound tiny.place harness session DMs are decrypted once, classified (harness envelope → per-session window; plain DM → the peer's Master window), and persisted into a durable per-session chat model that later stages (graph run, Brain-tab UI) will read.
  • Factors reusable decrypt_envelope / acknowledge_message helpers out of the existing tinyplace signal/messages controllers (no behaviour change).
  • Adds a [orchestration] enabled config knob and a metadata-only OrchestrationSessionMessage domain event.

Problem

The plan's ingest boundary needs to turn heterogeneous tiny.place DM traffic into typed, durable per-session state. Two correctness constraints drive the design:

  • Signal decryption is not idempotentSignalSession::decrypt advances the double ratchet and consumes one-time pre-keys. Relay messages persist until acknowledged, so a naive re-decrypt (e.g. on a UI refresh) corrupts ratchet state. Decrypt must happen once, server-side, with the plaintext persisted.
  • The harness envelope schema (SessionEnvelopeV1) exists only in the tiny.place TypeScript SDK; the Rust SDK ships it solely on an unreleased 2.x line, and openhuman is pinned to tinyplace 1.0.1.

Solution

  • orchestration/ingest.rs subscribes to the existing DomainEvent::TinyPlaceStreamMessage DM stream and runs: dedupe-by-message-id BEFORE decrypt (the ratchet guard) → decrypt_envelope once → classify → persist → acknowledge_message (consume once) → publish OrchestrationSessionMessage.
  • orchestration/store.rs — SQLite at <workspace>/orchestration/orchestration.db (workspace-internal; bodies are decrypted plaintext). Idempotent INSERT OR IGNORE by relay id; monotonic last_seq upsert.
  • orchestration/types.rs — a hand-written SessionEnvelopeV1 mirror of the TS SDK harness.ts (snake_case wire format). Isolated behind one module so the future swap to tinyplace::types::SessionEnvelopeV1 (after the SDK 2.x migration + a published crate with the type) is a one-line import change. The SDK-side port is feat(sdk/rust): add SessionEnvelopeV1 harness types (parity with TS SDK) tiny.place#210 (merged).
  • tinyplacedecrypt_envelope/acknowledge_message factored out and the existing controllers delegate to them (identical behaviour).

Submission Checklist

  • Tests added or updated (happy path + at least one failure / edge case) — envelope parse (valid v1 + reject non-envelope/bad-version), store dedupe + monotonic seq, DM-stream filter.
  • N/A: Diff coverage ≥ 80% — focused local coverage commands were not run in this worktree (Tauri/CEF submodule not present); CI coverage gate will report changed-line coverage over the new Rust module.
  • N/A: Coverage matrix updated — internal ingest/persistence plumbing, no user-facing feature-matrix row yet (RPC/UI land in later stages).
  • N/A: All affected feature IDs from the matrix are listed under ## Related — no matrix feature IDs identified.
  • No new external network dependencies introduced (reuses the existing tinyplace client + Signal path; mock-friendly).
  • N/A: Manual smoke checklist updated — no release-cut surface touched (no RPC/UI yet).
  • N/A: Linked issue closed via Closes #NNN — no issue number provided.

Impact

  • Runtime/platform: Rust core only. Ingest is gated by [orchestration] enabled (default true) and is a cheap no-op when no DM streams are active.
  • Security: message bodies stay decrypted workspace-internal (orchestration.db under is_workspace_internal_path); bodies and seeds are never logged.
  • Compatibility: additive; tinyplace stays at 1.0.1. The signal/messages controller refactor is behaviour-preserving (delegation only).

Related


AI Authored PR Metadata (required for Codex/Linear PRs)

Linear Issue

  • Key: N/A
  • URL: N/A

Commit & Branch

  • Branch: feat/orchestration-ingest
  • Commit SHA: 812ea1783cbf0fa390c9912b5def7b219cf06750

Validation Run

  • N/A: pnpm --filter openhuman-app format:check — no frontend files changed.
  • N/A: pnpm typecheck — no TypeScript changed.
  • Focused tests: cargo test --lib orchestration:: (5 passing) and cargo test --lib config::schema (295 passing) via the 1.93.0 toolchain.
  • Rust fmt/check: cargo fmt --check clean; GGML_NATIVE=OFF cargo check --lib — 0 errors.
  • N/A: Tauri fmt/check — no app/src-tauri files changed.

Validation Blocked

  • command: pre-push pnpm rust:check
  • error: fresh worktree lacks the vendored CEF submodule, so the Tauri-crate check fails to build (unrelated to this change).
  • impact: pushed with --no-verify; only core Rust changed, which cargo checks cleanly. CI runs the full checks.

Behavior Changes

  • Intended behavior change: openhuman now ingests tiny.place harness session DMs into a durable per-session store when [orchestration] enabled.
  • User-visible effect: none yet — no RPC/UI surface in this stage.

Parity Contract

  • Legacy behavior preserved: the signal_decrypt_message / messages_acknowledge controllers are unchanged externally (now delegate to shared helpers).
  • Guard/fallback/dispatch parity checks: dedupe-before-decrypt preserves Signal ratchet integrity; ingest is fail-open (logs + skips on parse/decrypt errors, never crashes the stream).

Duplicate / Superseded PR Handling

  • Duplicate PR(s): None.
  • Canonical PR: This PR.
  • Resolution: N/A.

Summary by CodeRabbit

  • New Features
    • Added orchestration support to ingest harness session DMs, route them to per-agent/per-session chat, and persist session + message history.
    • Introduced an orchestration.enabled configuration flag (default on) to control orchestration processing.
  • Bug Fixes
    • Prevented duplicate handling by deduplicating persisted messages before decrypting.
    • Message acknowledgements now occur only after successful persistence.
  • Tests
    • Added coverage for DM filtering, envelope parsing/classification, and persistence idempotency/ordering.

Adds the `orchestration` domain — the channel-ingest boundary of the
subconscious-orchestration plan (stage 3). Inbound tiny.place harness
session DMs are decrypted once, classified, and persisted into a durable
per-session chat model the later stages (graph run, Brain-tab UI) read.

- orchestration/types.rs: hand-written `SessionEnvelopeV1` mirror of the
  tiny.place TS SDK `harness.ts` (snake_case wire) + `ChatKind` /
  `OrchestrationSession` / `OrchestrationMessage`. The Rust SDK ships this
  type only on the unreleased 2.x line; swap the mirror for
  `tinyplace::types::SessionEnvelopeV1` once we migrate off 1.0.1.
- orchestration/store.rs: SQLite at `<workspace>/orchestration/orchestration.db`
  (workspace-internal — bodies are decrypted plaintext). Idempotent by relay
  message id; monotonic `last_seq` upsert.
- orchestration/ingest.rs: dedupe-BEFORE-decrypt (the Signal double ratchet is
  non-idempotent) -> decrypt once -> classify (harness envelope = Session,
  else the peer's Master window) -> persist -> acknowledge -> publish
  `OrchestrationSessionMessage`.
- orchestration/bus.rs: subscriber off the existing `TinyPlaceStreamMessage`
  DM stream; registered at startup.
- tinyplace: factor reusable `decrypt_envelope` / `acknowledge_message`
  helpers out of the signal/messages controllers (no behaviour change).
- config: `[orchestration] enabled` knob (default true).
- event: `DomainEvent::OrchestrationSessionMessage` (metadata only).

Bodies/seeds are never logged. RPC read surface + graph nodes land in
later stages. Unit tests: envelope parse, store dedupe/seq, DM-stream filter.
@sanil-23 sanil-23 requested a review from a team July 2, 2026 14:49
@coderabbitai

coderabbitai Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

📝 Walkthrough

Walkthrough

This PR adds an orchestration ingest path for tiny.place session DMs, including config wiring, envelope parsing, SQLite persistence, shared decrypt/ack helpers, and event-bus subscriber registration. It also publishes a new OrchestrationSessionMessage domain event.

Changes

Orchestration ingest feature

Layer / File(s) Summary
Config schema, module wiring, and domain event variant
src/openhuman/config/schema/orchestration.rs, src/openhuman/config/schema/mod.rs, src/openhuman/config/schema/types.rs, src/openhuman/mod.rs, src/openhuman/orchestration/mod.rs, src/core/event_bus/events.rs, src/core/jsonrpc.rs
Adds OrchestrationConfig, wires it into Config and module exports, adds DomainEvent::OrchestrationSessionMessage with domain/name routing, and registers the orchestration ingest subscriber at startup.
Orchestration domain types and envelope parsing
src/openhuman/orchestration/types.rs
Defines harness envelope structs, SessionEnvelopeV1 parsing and validation, ChatKind, and durable OrchestrationSession/OrchestrationMessage models, with unit tests.
SQLite persistence for orchestration
src/openhuman/orchestration/store.rs
Adds the orchestration SQLite schema and helpers for connection setup, dedupe lookup, session upsert, message insert, counting, and tests for deduplication and sequence handling.
Shared Signal decrypt/acknowledge helpers
src/openhuman/tinyplace/manifest.rs, src/openhuman/tinyplace/mod.rs
Extracts decrypt_envelope and acknowledge_message, updates existing handlers to delegate to them, and re-exports them crate-privately.
Orchestration ingest subscriber and processing pipeline
src/openhuman/orchestration/bus.rs, src/openhuman/orchestration/ingest.rs
Adds the subscriber plus ingest flow for DM filtering, pre-decrypt deduping, classification, persistence, acknowledgement, and event publishing, with tests.

Estimated code review effort: 4 (Complex) | ~60 minutes

Sequence Diagram(s)

sequenceDiagram
  participant EventBus
  participant OrchestrationIngestSubscriber
  participant ingest_stream_message
  participant decrypt_envelope
  participant Store

  EventBus->>OrchestrationIngestSubscriber: DomainEvent::TinyPlaceStreamMessage
  OrchestrationIngestSubscriber->>ingest_stream_message: forward kind, stream_id, raw
  ingest_stream_message->>Store: message_exists(id)
  ingest_stream_message->>decrypt_envelope: decrypt(envelope)
  ingest_stream_message->>ingest_stream_message: classify via SessionEnvelopeV1
  ingest_stream_message->>Store: upsert_session / insert_message
  ingest_stream_message->>ingest_stream_message: acknowledge_message(id)
  ingest_stream_message->>EventBus: publish OrchestrationSessionMessage
Loading

Possibly related PRs

Suggested labels: feature, rust-core, agent

Suggested reviewers: M3gA-Mind

Poem

A rabbit hopped through streams so bright,
To sort each DM, day and night.
It stored the chat, then gave a tap,
And sent one hop back on the map. 🐇

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly describes the main change: adding stage-3 orchestration ingest for tiny.place harness session DMs.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot added feature Net-new user-facing capability or product behavior. rust-core Core Rust runtime in src/: CLI, core_server, shared infrastructure. labels Jul 2, 2026

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 812ea1783c

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +61 to +63
if already {
log::debug!(target: LOG, "[orchestration] ingest.dedupe id={msg_id}");
return Ok(());

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Retry acknowledgements for deduped relay messages

If acknowledge_message fails due to a temporary network/server error, or the process exits after insert_message succeeds but before the ack, tiny.place will redeliver the same relay id. This early return then skips both decrypt and ack forever because the row already exists, so the relay copy is never consumed and future stream/list deliveries keep seeing the stale message. Keep the ratchet-safe dedupe, but retry acknowledge_message(&msg_id) on the duplicate path.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in cd5a0f9. The dedupe branch now retries acknowledge_message(&msg_id) (best-effort, logged on failure) before returning, so a message that was persisted but not acked — crash between persist and ack, or a transient relay ack error — gets consumed on redelivery. It still never re-decrypts or re-publishes.

Comment thread src/openhuman/orchestration/ingest.rs Outdated
Comment on lines +91 to +100
None => (
ChatKind::Master,
"master".to_string(),
"user".to_string(),
String::new(),
None,
None,
0,
plaintext,
envelope.timestamp.clone(),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid consuming plain DMs before they are readable

When a decrypted conversation-stream payload is not a harness envelope, this branch still stores it as Master; the landed path below then calls acknowledge_message, deleting the relay copy. I checked the current Agent World DM flow (app/src/agentworld/pages/MessagingSection.tsx::useDirectMessages) and it still displays DMs from apiClient.messages.list/signal.decryptMessage, with no orchestration-store reader in this change, so if a conversation/DM stream is active an ordinary encrypted DM can disappear from the existing UI. Gate the ack/consume path to harness envelopes or add the read path first.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in cd5a0f9 — and thanks, this is the important one. You're right that consuming a plain DM breaks the existing Messaging UI, but gating only the ack is insufficient: decrypt_envelope itself advances the Signal double ratchet, so even decrypting an ordinary DM makes signal.decryptMessage fail for that message in MessagingSection.tsx. So the gate must run before decrypt, keyed on sender. ingest_one now skips any DM whose sender is not a linked orchestration pairing agent (new local-only pairing::linked_agent_ids, fail-closed). Wrapped sessions are linked in stage 2; ordinary human DMs are never decrypted or consumed here. Master-window ingestion for paired agents is retained.

Factor the classification and persistence out of ingest_one into pure,
side-effect-free helpers (classify_message, persist_message) so the ingest
logic is covered without a live Signal client — ingest_one is now thin
decrypt/ack/publish glue. Adds tests for harness→Session vs plain-DM→Master
classification and idempotent per-session persistence.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (3)
src/core/jsonrpc.rs (1)

2362-2364: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value

Startup log omits the new orchestration subscriber.

The summary log at Line 2362 lists registered subscribers for observability but doesn't mention "orchestration" (it also already omits "task_sources", "mcp_registry", "meetings" etc. from before this PR). Not blocking, but worth keeping this string in sync as subscribers are added.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/core/jsonrpc.rs` around lines 2362 - 2364, The event_bus startup summary
log is out of sync with the registered subscribers and omits orchestration from
the list. Update the log message in the domain subscriber registration flow to
include orchestration, and keep the subscriber summary in sync with the
registration set in the event_bus startup path so observability remains
accurate.
src/openhuman/orchestration/store.rs (2)

15-45: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value

PRAGMA foreign_keys = ON has no effect — no FK constraints are declared.

messages.session_id/agent_id aren't declared with REFERENCES sessions(...), so the pragma doesn't enforce anything here. Either add the FK constraints or drop the pragma to avoid implying integrity guarantees that don't exist.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/openhuman/orchestration/store.rs` around lines 15 - 45, The schema in
SCHEMA_DDL enables foreign keys but does not declare any actual REFERENCES
constraints, so the pragma is ineffective. Update the sessions/messages table
definitions to add the needed foreign key constraints on messages.agent_id and
messages.session_id referencing sessions(agent_id, session_id), or remove the
PRAGMA from SCHEMA_DDL if you do not want to enforce relational integrity here.

48-62: 🚀 Performance & Scalability | 🔵 Trivial | ⚖️ Poor tradeoff

Per-call connection/schema init on the DM ingest path.

with_connection opens a fresh SQLite connection and re-runs schema DDL on every call, and ingest_one hits it twice per inbound message. That adds avoidable I/O on the hot path; this likely belongs in a shared helper-level refactor rather than only here.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/openhuman/orchestration/store.rs` around lines 48 - 62, `with_connection`
is doing per-call SQLite open plus schema initialization on the hot DM ingest
path, and `ingest_one` ends up invoking it twice for each message. Refactor the
orchestration store helpers so schema setup happens once in a shared
initialization path and the reusable connection helper only opens/returns a
connection; update `with_connection` and any callers such as `ingest_one` to
avoid rerunning `SCHEMA_DDL` on every message.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/openhuman/orchestration/ingest.rs`:
- Around line 61-63: The dedupe branch in ingest handling currently returns
immediately for already-persisted messages, which skips retrying the relay ack
after a previous acknowledge_message failure. Update the logic around the
existing ingest.dedupe path in the ingest flow to retry acknowledge_message when
already is true, but keep it strictly as an ack-only path with no decrypt or
republish work. Use the existing msg_id, already, and acknowledge_message flow
in the ingest orchestration code to locate the fix.

---

Nitpick comments:
In `@src/core/jsonrpc.rs`:
- Around line 2362-2364: The event_bus startup summary log is out of sync with
the registered subscribers and omits orchestration from the list. Update the log
message in the domain subscriber registration flow to include orchestration, and
keep the subscriber summary in sync with the registration set in the event_bus
startup path so observability remains accurate.

In `@src/openhuman/orchestration/store.rs`:
- Around line 15-45: The schema in SCHEMA_DDL enables foreign keys but does not
declare any actual REFERENCES constraints, so the pragma is ineffective. Update
the sessions/messages table definitions to add the needed foreign key
constraints on messages.agent_id and messages.session_id referencing
sessions(agent_id, session_id), or remove the PRAGMA from SCHEMA_DDL if you do
not want to enforce relational integrity here.
- Around line 48-62: `with_connection` is doing per-call SQLite open plus schema
initialization on the hot DM ingest path, and `ingest_one` ends up invoking it
twice for each message. Refactor the orchestration store helpers so schema setup
happens once in a shared initialization path and the reusable connection helper
only opens/returns a connection; update `with_connection` and any callers such
as `ingest_one` to avoid rerunning `SCHEMA_DDL` on every message.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b96368bd-9302-4921-b735-739e48642f7a

📥 Commits

Reviewing files that changed from the base of the PR and between 6e17240 and 812ea17.

📒 Files selected for processing (13)
  • src/core/event_bus/events.rs
  • src/core/jsonrpc.rs
  • src/openhuman/config/schema/mod.rs
  • src/openhuman/config/schema/orchestration.rs
  • src/openhuman/config/schema/types.rs
  • src/openhuman/mod.rs
  • src/openhuman/orchestration/bus.rs
  • src/openhuman/orchestration/ingest.rs
  • src/openhuman/orchestration/mod.rs
  • src/openhuman/orchestration/store.rs
  • src/openhuman/orchestration/types.rs
  • src/openhuman/tinyplace/manifest.rs
  • src/openhuman/tinyplace/mod.rs

Comment thread src/openhuman/orchestration/ingest.rs
@coderabbitai coderabbitai Bot added agent Built-in agents, prompts, orchestration, and agent runtime in src/openhuman/agent/. and removed rust-core Core Rust runtime in src/: CLI, core_server, shared infrastructure. labels Jul 2, 2026
@coderabbitai

coderabbitai Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Caution

Failed to replace (edit) comment. This is likely due to insufficient permissions or the comment being deleted.

Error details
{"name":"HttpError","status":500,"request":{"method":"PATCH","url":"https://api.github.com/repos/tinyhumansai/openhuman/issues/comments/4867069720","headers":{"accept":"application/vnd.github.v3+json","user-agent":"octokit.js/0.0.0-development octokit-core.js/7.0.6 Node.js/24","content-type":"application/json; charset=utf-8"},"body":{"body":"<!-- This is an auto-generated comment: summarize by coderabbit.ai -->\n<!-- review_stack_entry_start -->\n\n[![Review Change Stack](https://storage.googleapis.com/coderabbit_public_assets/review-stack-in-coderabbit-ui.svg)](https://app.coderabbit.ai/change-stack/tinyhumansai/openhuman/pull/4425?utm_source=github_walkthrough&utm_medium=github&utm_campaign=change_stack)\n\n<!-- review_stack_entry_end -->\nNo actionable comments were generated in the recent review. 🎉\n\n<details>\n<summary>ℹ️ Recent review info</summary>\n\n<details>\n<summary>⚙️ Run configuration</summary>\n\n**Configuration used**: Organization UI\n\n**Review profile**: CHILL\n\n**Plan**: Pro\n\n**Run ID**: `d2e2f1e5-13f3-4dae-a3c2-56fb719f26bb`\n\n</details>\n\n<details>\n<summary>📥 Commits</summary>\n\nReviewing files that changed from the base of the PR and between 812ea1783cbf0fa390c9912b5def7b219cf06750 and 9eb11e0efb0d7ea0025dc1405be62bd0b68ab019.\n\n</details>\n\n<details>\n<summary>📒 Files selected for processing (1)</summary>\n\n* `src/openhuman/orchestration/ingest.rs`\n\n</details>\n\n<details>\n<summary>🚧 Files skipped from review as they are similar to previous changes (1)</summary>\n\n* src/openhuman/orchestration/ingest.rs\n\n</details>\n\n</details>\n\n---\n<!-- walkthrough_start -->\n\n<details>\n<summary>📝 Walkthrough</summary>\n\n## Walkthrough\n\nThis PR adds an orchestration ingest pipeline for tiny.place session DMs, including config wiring, envelope parsing, SQLite persistence, Signal decrypt/acknowledge helpers, and publishing `OrchestrationSessionMessage` events.\n\n### Changes\n\n**Orchestration ingest feature**\n\n|Layer / File(s)|Summary|\n|---|---|\n|**Config schema, module wiring, and domain event variant** <br> `src/openhuman/config/schema/orchestration.rs`, `src/openhuman/config/schema/mod.rs`, `src/openhuman/config/schema/types.rs`, `src/openhuman/mod.rs`, `src/openhuman/orchestration/mod.rs`, `src/core/event_bus/events.rs`, `src/core/jsonrpc.rs`|Adds `OrchestrationConfig`, wires it into `Config`, registers the `orchestration` module tree, adds `DomainEvent::OrchestrationSessionMessage` with domain/name routing, and registers the ingest subscriber at startup.|\n|**Orchestration domain types and envelope parsing** <br> `src/openhuman/orchestration/types.rs`|Defines harness envelope structs, `SessionEnvelopeV1` with a validating `parse` method, `ChatKind` enum, and `OrchestrationSession`/`OrchestrationMessage` persistence models, with unit tests.|\n|**SQLite persistence for orchestration** <br> `src/openhuman/orchestration/store.rs`|Adds `with_connection` DDL/init, `message_exists`, `upsert_session` (monotonic `last_seq`), `insert_message` (idempotent), `count_messages`, and unit tests for dedup and sequence advancement.|\n|**Shared Signal decrypt and acknowledge helpers** <br> `src/openhuman/tinyplace/manifest.rs`, `src/openhuman/tinyplace/mod.rs`|Extracts `decrypt_envelope` and `acknowledge_message` helpers, delegates existing handlers to them, and re-exports them crate-privately.|\n|**Orchestration ingest subscriber and processing pipeline** <br> `src/openhuman/orchestration/bus.rs`, `src/openhuman/orchestration/ingest.rs`|Adds `OrchestrationIngestSubscriber` filtering `TinyPlaceStreamMessage` events and `ingest_stream_message`/`ingest_one` logic for DM filtering, dedupe, decryption, classification, persistence, acknowledgement, and event publishing, with tests.|\n\n**Estimated code review effort:** 4 (Complex) | ~60 minutes\n\n### Sequence Diagram(s)\n\n```mermaid\nsequenceDiagram\n  participant EventBus\n  participant OrchestrationIngestSubscriber\n  participant ingest_stream_message\n  participant decrypt_envelope\n  participant Store\n\n  EventBus->>OrchestrationIngestSubscriber: DomainEvent::TinyPlaceStreamMessage\n  OrchestrationIngestSubscriber->>ingest_stream_message: forward kind, stream_id, raw\n  ingest_stream_message->>Store: message_exists(id)\n  ingest_stream_message->>decrypt_envelope: decrypt(envelope)\n  ingest_stream_message->>ingest_stream_message: classify via SessionEnvelopeV1\n  ingest_stream_message->>Store: upsert_session / insert_message\n  ingest_stream_message->>ingest_stream_message: acknowledge_message(id)\n  ingest_stream_message->>EventBus: publish OrchestrationSessionMessage\n```\n\n**Possibly related PRs**\n- [tinyhumansai/openhuman#4400](https://github.com/tinyhumansai/openhuman/pull/4400): Also extends `src/core/event_bus/events.rs` with a new `DomainEvent` variant and domain routing for orchestration-related events.\n\n**Suggested labels:** `feature`, `agent`\n\n**Suggested reviewers:** `graycyrus`, `M3gA-Mind`\n\n**Poem**\n\n> A rabbit hopped through streams of DM,  \n> Decrypting notes one hop, not ten.  \n> It stored the session, sent the chime,  \n> Then bounded off in perfect time. 🐇\n\n</details>\n\n<!-- walkthrough_end -->\n<!-- pre_merge_checks_walkthrough_start -->\n\n<details>\n<summary>🚥 Pre-merge checks | ✅ 5</summary>\n\n<details>\n<summary>✅ Passed checks (5 passed)</summary>\n\n|         Check name         | Status   | Explanation                                                                                                       |\n| :------------------------: | :------- | :---------------------------------------------------------------------------------------------------------------- |\n|      Description Check     | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.                                                       |\n|         Title check        | ✅ Passed | The title accurately summarizes the main change: stage-3 orchestration ingest for tiny.place harness session DMs. |\n|     Docstring Coverage     | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.        |\n|     Linked Issues check    | ✅ Passed | Check skipped because no linked issues were found for this pull request.                                          |\n| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request.                                          |\n\n</details>\n\n</details>\n\n<!-- pre_merge_checks_walkthrough_end -->\n<!-- tips_start -->\n\n---\n\n\n\n\n<sub>Comment `@coderabbitai help` to get the list of available commands.</sub>\n\n<!-- tips_end -->"},"request":{"signal":{},"retryCount":3,"retries":3,"retryAfter":16}}}

coderabbitai[bot]
coderabbitai Bot previously approved these changes Jul 2, 2026
Addresses review (CodeRabbit + Codex) on ingest.rs:

- Sender gate before decrypt: only ingest DMs from linked pairing agents
  (wrapped sessions). Decrypting advances the Signal ratchet, so an ordinary
  human DM must never be decrypted/consumed here — it stays readable by the
  existing Messaging UI (messages.list / signal.decryptMessage). Gating only
  the ack is insufficient because decrypt itself consumes the message; the gate
  must run before decrypt, keyed on sender. Adds local-only
  pairing::linked_agent_ids (fail-closed on read error).
- Retry acknowledge on the dedupe path: if a prior run persisted but crashed or
  the relay ack failed, redelivery now re-acks (best-effort) so the relay copy
  is consumed; never re-decrypts or re-publishes.
@coderabbitai coderabbitai Bot added the rust-core Core Rust runtime in src/: CLI, core_server, shared infrastructure. label Jul 2, 2026

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
src/openhuman/agent_orchestration/pairing.rs (1)

316-333: 🚀 Performance & Scalability | 🔵 Trivial | ⚡ Quick win

Consider caching linked-agent lookups on the hot ingest path.

This re-reads and re-parses the entire pairing store from disk on every inbound DM (called from ingest_one per message per the linked context). For sessions with frequent messages this adds repeated I/O/JSON-parse overhead to the ingest hot path. A short-TTL or invalidate-on-write cache over the pairing set would reduce redundant reads while still reflecting the local file being the source of truth. Not blocking given the store is expected to be small.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/openhuman/agent_orchestration/pairing.rs` around lines 316 - 333, The
linked-agent lookup in linked_agent_ids is hitting disk and reparsing the
pairing store on every ingest call, which makes ingest_one unnecessarily
expensive on the hot DM path. Add a short-TTL or invalidate-on-write cache for
the computed HashSet<String> so repeated calls reuse the cached linked-agent set
instead of calling load_store every time. Keep the cache scoped to the
orchestration pairing flow and preserve the existing fail-closed behavior by
returning an empty set on read errors.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@src/openhuman/agent_orchestration/pairing.rs`:
- Around line 316-333: The linked-agent lookup in linked_agent_ids is hitting
disk and reparsing the pairing store on every ingest call, which makes
ingest_one unnecessarily expensive on the hot DM path. Add a short-TTL or
invalidate-on-write cache for the computed HashSet<String> so repeated calls
reuse the cached linked-agent set instead of calling load_store every time. Keep
the cache scoped to the orchestration pairing flow and preserve the existing
fail-closed behavior by returning an empty set on read errors.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9ae6b15b-b4cc-434a-984a-070e4d71859b

📥 Commits

Reviewing files that changed from the base of the PR and between 9eb11e0 and cd5a0f9.

📒 Files selected for processing (2)
  • src/openhuman/agent_orchestration/pairing.rs
  • src/openhuman/orchestration/ingest.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/openhuman/orchestration/ingest.rs

@senamakel senamakel merged commit 5870566 into tinyhumansai:main Jul 2, 2026
15 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

agent Built-in agents, prompts, orchestration, and agent runtime in src/openhuman/agent/. feature Net-new user-facing capability or product behavior. rust-core Core Rust runtime in src/: CLI, core_server, shared infrastructure.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants