From 812ea1783cbf0fa390c9912b5def7b219cf06750 Mon Sep 17 00:00:00 2001 From: sanil-23 Date: Thu, 2 Jul 2026 20:18:19 +0530 Subject: [PATCH 1/4] feat(orchestration): stage-3 tiny.place harness session DM ingest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the `orchestration` domain — the channel-ingest boundary of the subconscious-orchestration plan (stage 3). Inbound tiny.place harness session DMs are decrypted once, classified, and persisted into a durable per-session chat model the later stages (graph run, Brain-tab UI) read. - orchestration/types.rs: hand-written `SessionEnvelopeV1` mirror of the tiny.place TS SDK `harness.ts` (snake_case wire) + `ChatKind` / `OrchestrationSession` / `OrchestrationMessage`. The Rust SDK ships this type only on the unreleased 2.x line; swap the mirror for `tinyplace::types::SessionEnvelopeV1` once we migrate off 1.0.1. - orchestration/store.rs: SQLite at `/orchestration/orchestration.db` (workspace-internal — bodies are decrypted plaintext). Idempotent by relay message id; monotonic `last_seq` upsert. - orchestration/ingest.rs: dedupe-BEFORE-decrypt (the Signal double ratchet is non-idempotent) -> decrypt once -> classify (harness envelope = Session, else the peer's Master window) -> persist -> acknowledge -> publish `OrchestrationSessionMessage`. - orchestration/bus.rs: subscriber off the existing `TinyPlaceStreamMessage` DM stream; registered at startup. - tinyplace: factor reusable `decrypt_envelope` / `acknowledge_message` helpers out of the signal/messages controllers (no behaviour change). - config: `[orchestration] enabled` knob (default true). - event: `DomainEvent::OrchestrationSessionMessage` (metadata only). Bodies/seeds are never logged. RPC read surface + graph nodes land in later stages. Unit tests: envelope parse, store dedupe/seq, DM-stream filter. --- src/core/event_bus/events.rs | 11 + src/core/jsonrpc.rs | 2 + src/openhuman/config/schema/mod.rs | 2 + src/openhuman/config/schema/orchestration.rs | 28 +++ src/openhuman/config/schema/types.rs | 6 + src/openhuman/mod.rs | 1 + src/openhuman/orchestration/bus.rs | 59 ++++++ src/openhuman/orchestration/ingest.rs | 165 +++++++++++++++ src/openhuman/orchestration/mod.rs | 17 ++ src/openhuman/orchestration/store.rs | 193 ++++++++++++++++++ src/openhuman/orchestration/types.rs | 199 +++++++++++++++++++ src/openhuman/tinyplace/manifest.rs | 122 +++++++----- src/openhuman/tinyplace/mod.rs | 1 + 13 files changed, 755 insertions(+), 51 deletions(-) create mode 100644 src/openhuman/config/schema/orchestration.rs create mode 100644 src/openhuman/orchestration/bus.rs create mode 100644 src/openhuman/orchestration/ingest.rs create mode 100644 src/openhuman/orchestration/mod.rs create mode 100644 src/openhuman/orchestration/store.rs create mode 100644 src/openhuman/orchestration/types.rs diff --git a/src/core/event_bus/events.rs b/src/core/event_bus/events.rs index b1ee0ae5e5..ff6811ef06 100644 --- a/src/core/event_bus/events.rs +++ b/src/core/event_bus/events.rs @@ -138,6 +138,15 @@ pub enum DomainEvent { source: String, }, + /// A tiny.place harness session DM was ingested and persisted. Metadata only + /// — bodies stay in the workspace-internal orchestration store. Consumed by + /// later stages (graph run, UI socket push). + OrchestrationSessionMessage { + agent_id: String, + session_id: String, + chat_kind: String, + }, + // ── Subconscious orchestrator ─────────────────────────────────────── /// A subconscious trigger finished gate evaluation (promote or drop). /// Observability only — lets dashboards see ingestion volume and the @@ -1226,6 +1235,7 @@ impl DomainEvent { | Self::AgentOrchestrationFailed { .. } | Self::AgentOrchestrationClosed { .. } | Self::OrchestrationPairingChanged { .. } + | Self::OrchestrationSessionMessage { .. } | Self::RunQueueMessageQueued { .. } | Self::RunQueueMessageDelivered { .. } | Self::RunQueueFollowupDispatched { .. } @@ -1383,6 +1393,7 @@ impl DomainEvent { Self::AgentOrchestrationFailed { .. } => "AgentOrchestrationFailed", Self::AgentOrchestrationClosed { .. } => "AgentOrchestrationClosed", Self::OrchestrationPairingChanged { .. } => "OrchestrationPairingChanged", + Self::OrchestrationSessionMessage { .. } => "OrchestrationSessionMessage", Self::SubconsciousTriggerProcessed { .. } => "SubconsciousTriggerProcessed", Self::RunQueueMessageQueued { .. } => "RunQueueMessageQueued", Self::RunQueueMessageDelivered { .. } => "RunQueueMessageDelivered", diff --git a/src/core/jsonrpc.rs b/src/core/jsonrpc.rs index 81999797ef..d23b55ce82 100644 --- a/src/core/jsonrpc.rs +++ b/src/core/jsonrpc.rs @@ -2242,6 +2242,8 @@ fn register_domain_subscribers( // only walks Composio connections, so without this they only sync // on manual "Sync now" and silently go stale. crate::openhuman::memory_sync::workspace::start_workspace_periodic_sync(); + // Orchestration: ingest tiny.place harness session DMs off the stream bus. + crate::openhuman::orchestration::register_orchestration_ingest_subscriber(); // Task-sources proactive ingestion: connection-created hook + poll. crate::openhuman::task_sources::bus::register_task_sources_subscriber(); crate::openhuman::task_sources::start_periodic_poll(); diff --git a/src/openhuman/config/schema/mod.rs b/src/openhuman/config/schema/mod.rs index d9968ef1ae..7bbb8b6799 100644 --- a/src/openhuman/config/schema/mod.rs +++ b/src/openhuman/config/schema/mod.rs @@ -34,6 +34,7 @@ mod local_ai; mod meet; mod node; mod observability; +mod orchestration; mod proxy; mod routes; mod runtime; @@ -69,6 +70,7 @@ pub use local_ai::{LocalAiConfig, LocalAiUsage}; pub use meet::{AutoJoinPolicy, AutoSummarizePolicy, CalendarProvider, MeetConfig}; pub use node::NodeConfig; pub use observability::{AgentTracingBackend, AgentTracingConfig, ObservabilityConfig}; +pub use orchestration::OrchestrationConfig; pub use proxy::{ apply_runtime_proxy_to_builder, build_runtime_proxy_client, build_runtime_proxy_client_with_timeouts, runtime_proxy_config, set_runtime_proxy_config, diff --git a/src/openhuman/config/schema/orchestration.rs b/src/openhuman/config/schema/orchestration.rs new file mode 100644 index 0000000000..80182520e7 --- /dev/null +++ b/src/openhuman/config/schema/orchestration.rs @@ -0,0 +1,28 @@ +//! Orchestration configuration — controls the tiny.place harness session +//! ingest layer. +//! +//! Consumed by [`crate::openhuman::orchestration`]. + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +fn default_enabled() -> bool { + true +} + +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(default)] +pub struct OrchestrationConfig { + /// Ingest inbound tiny.place harness session DMs into the orchestration + /// store. Default: `true`. + #[serde(default = "default_enabled")] + pub enabled: bool, +} + +impl Default for OrchestrationConfig { + fn default() -> Self { + Self { + enabled: default_enabled(), + } + } +} diff --git a/src/openhuman/config/schema/types.rs b/src/openhuman/config/schema/types.rs index 51eaa456f1..df3cfc9a63 100644 --- a/src/openhuman/config/schema/types.rs +++ b/src/openhuman/config/schema/types.rs @@ -164,6 +164,11 @@ pub struct Config { #[serde(default)] pub scheduler_gate: SchedulerGateConfig, + /// tiny.place harness session-DM ingest layer. See + /// [`crate::openhuman::orchestration`]. + #[serde(default)] + pub orchestration: OrchestrationConfig, + /// User-facing activity-level knob (0–4) controlling how proactive /// background AI work is. Maps into scheduler_gate mode, periodic sync /// cadence, heartbeat/subconscious toggles. See issue #3117. @@ -742,6 +747,7 @@ impl Default for Config { reliability: ReliabilityConfig::default(), scheduler: SchedulerConfig::default(), scheduler_gate: SchedulerGateConfig::default(), + orchestration: OrchestrationConfig::default(), agent_activity_level: AgentActivityLevel::default(), memory_sync_interval_secs: None, agent: AgentConfig::default(), diff --git a/src/openhuman/mod.rs b/src/openhuman/mod.rs index 741ef5ea67..246a85af1b 100644 --- a/src/openhuman/mod.rs +++ b/src/openhuman/mod.rs @@ -87,6 +87,7 @@ pub mod migrations; pub mod model_council; pub mod monitor; pub mod notifications; +pub mod orchestration; pub mod overlay; pub mod people; pub mod plan_review; diff --git a/src/openhuman/orchestration/bus.rs b/src/openhuman/orchestration/bus.rs new file mode 100644 index 0000000000..901073b4e0 --- /dev/null +++ b/src/openhuman/orchestration/bus.rs @@ -0,0 +1,59 @@ +//! Event-bus subscriber that drives orchestration ingest off inbound tiny.place +//! DM stream messages. + +use std::sync::{Arc, OnceLock}; + +use async_trait::async_trait; + +use crate::core::event_bus::{subscribe_global, DomainEvent, EventHandler, SubscriptionHandle}; + +static INGEST_HANDLE: OnceLock = OnceLock::new(); + +/// Register the orchestration ingest subscriber on the global event bus. +pub fn register_orchestration_ingest_subscriber() { + if INGEST_HANDLE.get().is_some() { + return; + } + match subscribe_global(Arc::new(OrchestrationIngestSubscriber)) { + Some(handle) => { + let _ = INGEST_HANDLE.set(handle); + } + None => { + log::warn!( + "[orchestration] failed to register ingest subscriber — bus not initialized" + ); + } + } +} + +pub struct OrchestrationIngestSubscriber; + +#[async_trait] +impl EventHandler for OrchestrationIngestSubscriber { + fn name(&self) -> &str { + "orchestration::ingest" + } + + fn domains(&self) -> Option<&[&str]> { + Some(&["tinyplace"]) + } + + async fn handle(&self, event: &DomainEvent) { + let DomainEvent::TinyPlaceStreamMessage { + stream_id, + kind, + message, + } = event + else { + return; + }; + let config = match crate::openhuman::config::Config::load_or_init().await { + Ok(config) => config, + Err(e) => { + log::warn!("[orchestration] ingest.config_load_failed: {e}"); + return; + } + }; + super::ingest::ingest_stream_message(&config, kind, stream_id, message).await; + } +} diff --git a/src/openhuman/orchestration/ingest.rs b/src/openhuman/orchestration/ingest.rs new file mode 100644 index 0000000000..d514ce9392 --- /dev/null +++ b/src/openhuman/orchestration/ingest.rs @@ -0,0 +1,165 @@ +//! DM ingest: decrypt-once → classify → persist → acknowledge. +//! +//! Driven by the existing `DomainEvent::TinyPlaceStreamMessage` (the tinyplace +//! websocket recv loop), filtered to conversation/DM streams. Never logs message +//! bodies or seeds. + +use crate::core::event_bus::{publish_global, DomainEvent}; +use crate::openhuman::config::Config; +use crate::openhuman::tinyplace::{acknowledge_message, decrypt_envelope}; + +use super::store; +use super::types::{ChatKind, OrchestrationMessage, OrchestrationSession, SessionEnvelopeV1}; + +const LOG: &str = "orchestration"; + +/// True for streams that carry ciphertext DM envelopes worth ingesting. +fn is_dm_stream(kind: &str, stream_id: &str) -> bool { + kind.eq_ignore_ascii_case("conversation") + || kind.eq_ignore_ascii_case("dm") + || stream_id.starts_with("conversation:") +} + +/// Entry point from the bus subscriber. Cheap no-op when orchestration is +/// disabled or the stream is not a DM stream. +pub async fn ingest_stream_message( + config: &Config, + kind: &str, + stream_id: &str, + raw: &serde_json::Value, +) { + if !config.orchestration.enabled { + return; + } + if !is_dm_stream(kind, stream_id) { + return; + } + let envelope: tinyplace::types::MessageEnvelope = match serde_json::from_value(raw.clone()) { + Ok(env) => env, + Err(e) => { + log::debug!(target: LOG, "[orchestration] ingest.skip stream={stream_id} not-an-envelope err={e}"); + return; + } + }; + if let Err(e) = ingest_one(config, envelope).await { + log::warn!(target: LOG, "[orchestration] ingest.error stream={stream_id}: {e}"); + } +} + +async fn ingest_one( + config: &Config, + envelope: tinyplace::types::MessageEnvelope, +) -> Result<(), String> { + let msg_id = envelope.id.clone(); + let agent_id = envelope.from.clone(); + log::debug!(target: LOG, "[orchestration] ingest.entry id={msg_id} from={agent_id}"); + + // 1. Dedupe BEFORE decrypt — protects the non-idempotent Signal ratchet. + let workspace_dir = config.workspace_dir.clone(); + let already = store::with_connection(&workspace_dir, |c| store::message_exists(c, &msg_id)) + .map_err(|e| format!("store lookup: {e}"))?; + if already { + log::debug!(target: LOG, "[orchestration] ingest.dedupe id={msg_id}"); + return Ok(()); + } + + // 2. Decrypt exactly once. + let plaintext = decrypt_envelope(&envelope).await?; + + // 3. Classify: harness envelope → Session, else the peer's Master window. + let (chat_kind, session_id, role, source, label, workspace, seq, body, ts) = + match SessionEnvelopeV1::parse(&plaintext) { + Some(env) => { + let label = (env.scope.scope_type == "folder").then(|| env.scope.key.clone()); + let workspace = (!env.scope.cwd.is_empty()).then(|| env.scope.cwd.clone()); + ( + ChatKind::Session, + env.scope.harness_session_id, + env.message.role, + env.harness.provider, + label, + workspace, + env.message.line, + env.message.text, + if env.message.timestamp.is_empty() { + envelope.timestamp.clone() + } else { + env.message.timestamp + }, + ) + } + None => ( + ChatKind::Master, + "master".to_string(), + "user".to_string(), + String::new(), + None, + None, + 0, + plaintext, + envelope.timestamp.clone(), + ), + }; + + // 4. Persist (idempotent). + let now = chrono::Utc::now().to_rfc3339(); + let session_id_for_event = session_id.clone(); + let agent_id_for_event = agent_id.clone(); + let chat_kind_str = chat_kind.as_str().to_string(); + let landed = store::with_connection(&workspace_dir, |c| { + store::upsert_session( + c, + &OrchestrationSession { + session_id: session_id.clone(), + agent_id: agent_id.clone(), + source, + label, + workspace, + last_seq: seq, + created_at: now.clone(), + last_message_at: ts.clone(), + }, + )?; + store::insert_message( + c, + &OrchestrationMessage { + id: msg_id.clone(), + agent_id: agent_id.clone(), + session_id: session_id.clone(), + chat_kind, + role, + body, + timestamp: ts.clone(), + seq, + }, + ) + }) + .map_err(|e| format!("persist: {e}"))?; + + // 5. Acknowledge (consume once) + fan out for stages 4/7. + if landed { + if let Err(e) = acknowledge_message(&msg_id).await { + log::warn!(target: LOG, "[orchestration] ingest.ack_failed id={msg_id}: {e}"); + } + publish_global(DomainEvent::OrchestrationSessionMessage { + agent_id: agent_id_for_event, + session_id: session_id_for_event, + chat_kind: chat_kind_str, + }); + } + log::debug!(target: LOG, "[orchestration] ingest.exit id={msg_id} landed={landed}"); + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn dm_stream_filter() { + assert!(is_dm_stream("conversation", "conversation:abc")); + assert!(is_dm_stream("DM", "x")); + assert!(is_dm_stream("other", "conversation:abc")); + assert!(!is_dm_stream("inbox", "inbox")); + } +} diff --git a/src/openhuman/orchestration/mod.rs b/src/openhuman/orchestration/mod.rs new file mode 100644 index 0000000000..e01f084a03 --- /dev/null +++ b/src/openhuman/orchestration/mod.rs @@ -0,0 +1,17 @@ +//! Orchestration domain — ingests tiny.place harness session DMs (stage 3 of the +//! subconscious-orchestration plan) into a durable per-session chat model. +//! +//! - [`types`]: the harness `SessionEnvelopeV1` mirror + persisted session/message model. +//! - [`store`]: SQLite persistence at `/orchestration/orchestration.db`. +//! - [`ingest`]: decrypt-once → classify → persist → acknowledge. +//! - [`bus`]: subscriber wiring off `TinyPlaceStreamMessage`. +//! +//! The JSON-RPC read surface (`orchestration.*`) and graph nodes land in later +//! stages; this module is transport/ingest only. + +pub mod bus; +pub mod ingest; +pub mod store; +pub mod types; + +pub use bus::register_orchestration_ingest_subscriber; diff --git a/src/openhuman/orchestration/store.rs b/src/openhuman/orchestration/store.rs new file mode 100644 index 0000000000..6f384ed444 --- /dev/null +++ b/src/openhuman/orchestration/store.rs @@ -0,0 +1,193 @@ +//! SQLite persistence for the orchestration domain. +//! +//! Lives at `/orchestration/orchestration.db`. Message bodies are +//! decrypted plaintext, so this path is workspace-internal (protected by +//! `is_workspace_internal_path`). Follows the subconscious/cron `with_connection` +//! pattern. + +use std::path::Path; + +use anyhow::{Context, Result}; +use rusqlite::{params, Connection, OptionalExtension}; + +use super::types::{OrchestrationMessage, OrchestrationSession}; + +const SCHEMA_DDL: &str = " + PRAGMA foreign_keys = ON; + + CREATE TABLE IF NOT EXISTS sessions ( + session_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + source TEXT NOT NULL, + label TEXT, + workspace TEXT, + last_seq INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL, + last_message_at TEXT NOT NULL, + PRIMARY KEY (agent_id, session_id) + ); + + CREATE TABLE IF NOT EXISTS messages ( + id TEXT PRIMARY KEY, + agent_id TEXT NOT NULL, + session_id TEXT NOT NULL, + chat_kind TEXT NOT NULL, + role TEXT NOT NULL, + body TEXT NOT NULL, + timestamp TEXT NOT NULL, + seq INTEGER NOT NULL DEFAULT 0 + ); + + CREATE INDEX IF NOT EXISTS idx_messages_session + ON messages (agent_id, session_id, timestamp); + + CREATE TABLE IF NOT EXISTS kv (k TEXT PRIMARY KEY, v TEXT NOT NULL); +"; + +/// Open the orchestration DB, initialise the schema, and run `f`. +pub fn with_connection( + workspace_dir: &Path, + f: impl FnOnce(&Connection) -> Result, +) -> Result { + let db_path = workspace_dir.join("orchestration").join("orchestration.db"); + if let Some(parent) = db_path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("create orchestration dir: {}", parent.display()))?; + } + let conn = Connection::open(&db_path) + .with_context(|| format!("open orchestration DB: {}", db_path.display()))?; + conn.execute_batch(SCHEMA_DDL) + .context("initialise orchestration schema")?; + f(&conn) +} + +/// True if a relay message id is already persisted. This guard MUST run before +/// decryption so the non-idempotent Signal double-ratchet is never advanced +/// twice for the same message. +pub fn message_exists(conn: &Connection, id: &str) -> Result { + Ok(conn + .query_row("SELECT 1 FROM messages WHERE id = ?1", params![id], |_| { + Ok(()) + }) + .optional()? + .is_some()) +} + +/// Insert or update the session row (keyed by agent + session). +pub fn upsert_session(conn: &Connection, s: &OrchestrationSession) -> Result<()> { + conn.execute( + "INSERT INTO sessions + (session_id, agent_id, source, label, workspace, last_seq, created_at, last_message_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) + ON CONFLICT(agent_id, session_id) DO UPDATE SET + last_seq = MAX(sessions.last_seq, excluded.last_seq), + last_message_at = excluded.last_message_at, + label = COALESCE(excluded.label, sessions.label), + workspace = COALESCE(excluded.workspace, sessions.workspace)", + params![ + s.session_id, + s.agent_id, + s.source, + s.label, + s.workspace, + s.last_seq, + s.created_at, + s.last_message_at, + ], + )?; + Ok(()) +} + +/// Insert a message, idempotent by relay id. Returns true if a new row landed. +pub fn insert_message(conn: &Connection, m: &OrchestrationMessage) -> Result { + let changed = conn.execute( + "INSERT OR IGNORE INTO messages + (id, agent_id, session_id, chat_kind, role, body, timestamp, seq) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + params![ + m.id, + m.agent_id, + m.session_id, + m.chat_kind.as_str(), + m.role, + m.body, + m.timestamp, + m.seq, + ], + )?; + Ok(changed > 0) +} + +/// Count persisted messages for a session (test/observability helper). +pub fn count_messages(conn: &Connection, agent_id: &str, session_id: &str) -> Result { + Ok(conn.query_row( + "SELECT COUNT(*) FROM messages WHERE agent_id = ?1 AND session_id = ?2", + params![agent_id, session_id], + |row| row.get(0), + )?) +} + +#[cfg(test)] +mod tests { + use super::super::types::ChatKind; + use super::*; + + fn msg(id: &str, agent: &str, session: &str, seq: i64) -> OrchestrationMessage { + OrchestrationMessage { + id: id.into(), + agent_id: agent.into(), + session_id: session.into(), + chat_kind: ChatKind::Session, + role: "agent".into(), + body: "hi".into(), + timestamp: "2026-07-02T00:00:00Z".into(), + seq, + } + } + + fn session(agent: &str, session: &str, seq: i64) -> OrchestrationSession { + OrchestrationSession { + session_id: session.into(), + agent_id: agent.into(), + source: "claude".into(), + label: None, + workspace: None, + last_seq: seq, + created_at: "2026-07-02T00:00:00Z".into(), + last_message_at: "2026-07-02T00:00:00Z".into(), + } + } + + #[test] + fn persists_and_dedupes_by_message_id() { + let tmp = tempfile::tempdir().unwrap(); + with_connection(tmp.path(), |conn| { + upsert_session(conn, &session("@a", "h1", 1))?; + assert!(!message_exists(conn, "m1")?); + assert!(insert_message(conn, &msg("m1", "@a", "h1", 1))?); + // Replay of the same id is a no-op and stays deduped. + assert!(!insert_message(conn, &msg("m1", "@a", "h1", 1))?); + assert!(message_exists(conn, "m1")?); + assert_eq!(count_messages(conn, "@a", "h1")?, 1); + Ok(()) + }) + .unwrap(); + } + + #[test] + fn upsert_advances_last_seq_monotonically() { + let tmp = tempfile::tempdir().unwrap(); + with_connection(tmp.path(), |conn| { + upsert_session(conn, &session("@a", "h1", 5))?; + upsert_session(conn, &session("@a", "h1", 2))?; // lower seq must not regress + let seq: i64 = conn.query_row( + "SELECT last_seq FROM sessions WHERE agent_id='@a' AND session_id='h1'", + [], + |r| r.get(0), + )?; + assert_eq!(seq, 5); + Ok(()) + }) + .unwrap(); + } +} diff --git a/src/openhuman/orchestration/types.rs b/src/openhuman/orchestration/types.rs new file mode 100644 index 0000000000..92e9fa9bc0 --- /dev/null +++ b/src/openhuman/orchestration/types.rs @@ -0,0 +1,199 @@ +//! Orchestration domain types. +//! +//! [`SessionEnvelopeV1`] is a hand-written mirror of the tiny.place TypeScript +//! SDK schema `sdk/typescript/src/types/harness.ts`. The Rust SDK does not ship +//! this type on the `1.0.x` line we depend on (it was added on the unreleased +//! `2.x` line — see the SDK port in tinyhumansai/tiny.place#210). Once a crate +//! version that includes it is published *and* openhuman migrates off +//! `tinyplace 1.0.1`, replace this block with +//! `use tinyplace::types::SessionEnvelopeV1;` — field names already match. +//! +//! Wire format is **snake_case** (the CLI wrapper emits a literal snake_case +//! object), unlike the camelCase tiny.place API — so these structs deliberately +//! omit `#[serde(rename_all = "camelCase")]`. + +use serde::{Deserialize, Serialize}; + +/// `envelope_version` discriminator for v1 harness envelopes. +pub const SESSION_ENVELOPE_VERSION_V1: &str = "tinyplace.harness.session.v1"; + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct HarnessBucket { + #[serde(default)] + pub unit: String, + #[serde(default)] + pub start: String, + #[serde(default)] + pub end: String, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct HarnessScope { + #[serde(rename = "type", default)] + pub scope_type: String, + #[serde(default)] + pub key: String, + #[serde(default)] + pub cwd: String, + #[serde(default)] + pub wrapper_session_id: String, + #[serde(default)] + pub harness_session_id: String, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct HarnessInfo { + #[serde(default)] + pub provider: String, + #[serde(default)] + pub command: String, + #[serde(default)] + pub argv: Vec, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct HarnessEnvelopeMessage { + #[serde(default)] + pub id: String, + #[serde(default)] + pub line: i64, + #[serde(default)] + pub role: String, + #[serde(default)] + pub text: String, + #[serde(default)] + pub timestamp: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub phase: Option, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct HarnessSource { + #[serde(default)] + pub path: String, + #[serde(default)] + pub record_type: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source_role: Option, +} + +/// Mirror of the TS `SessionEnvelopeV1` interface. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct SessionEnvelopeV1 { + #[serde(default)] + pub envelope_version: String, + #[serde(default)] + pub version: u32, + #[serde(default)] + pub bucket: HarnessBucket, + #[serde(default)] + pub scope: HarnessScope, + #[serde(default)] + pub harness: HarnessInfo, + #[serde(default)] + pub message: HarnessEnvelopeMessage, + #[serde(default)] + pub source: HarnessSource, +} + +impl SessionEnvelopeV1 { + fn is_valid_v1(&self) -> bool { + self.envelope_version == SESSION_ENVELOPE_VERSION_V1 + && !self.scope.harness_session_id.is_empty() + } + + /// Parse a decrypted DM body as a v1 session envelope. Returns `None` for + /// any non-envelope payload (a plain DM) so callers route it to Master. + pub fn parse(body: &str) -> Option { + let envelope: Self = serde_json::from_str(body).ok()?; + envelope.is_valid_v1().then_some(envelope) + } +} + +/// Which pinned/session window a persisted message belongs to. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ChatKind { + Master, + Subconscious, + Session, +} + +impl ChatKind { + pub fn as_str(&self) -> &'static str { + match self { + ChatKind::Master => "master", + ChatKind::Subconscious => "subconscious", + ChatKind::Session => "session", + } + } +} + +/// Durable per-session record. `session_id` is the harness session id for +/// [`ChatKind::Session`], or the literal `"master"` for a peer's Master window. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OrchestrationSession { + pub session_id: String, + pub agent_id: String, + pub source: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub label: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub workspace: Option, + pub last_seq: i64, + pub created_at: String, + pub last_message_at: String, +} + +/// A single persisted message. `body` is DECRYPTED plaintext and therefore +/// workspace-internal only. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OrchestrationMessage { + pub id: String, + pub agent_id: String, + pub session_id: String, + pub chat_kind: ChatKind, + pub role: String, + pub body: String, + pub timestamp: String, + pub seq: i64, +} + +#[cfg(test)] +mod tests { + use super::*; + + const SAMPLE: &str = r#"{ + "envelope_version": "tinyplace.harness.session.v1", + "version": 1, + "scope": { "type": "session", "key": "repo", "cwd": "/w", + "wrapper_session_id": "w1", "harness_session_id": "h1" }, + "harness": { "provider": "claude", "command": "claude", "argv": [] }, + "message": { "id": "m1", "line": 2, "role": "agent", "text": "hi", + "timestamp": "2026-07-02T00:00:00Z" }, + "source": { "path": "p", "record_type": "assistant" } + }"#; + + #[test] + fn parses_valid_v1_envelope() { + let env = SessionEnvelopeV1::parse(SAMPLE).expect("valid v1"); + assert_eq!(env.scope.harness_session_id, "h1"); + assert_eq!(env.message.role, "agent"); + assert_eq!(env.harness.provider, "claude"); + } + + #[test] + fn rejects_non_envelope_and_bad_version() { + assert!(SessionEnvelopeV1::parse("a plain message").is_none()); + assert!(SessionEnvelopeV1::parse( + r#"{"envelope_version":"x","scope":{"harness_session_id":"h"}}"# + ) + .is_none()); + assert!(SessionEnvelopeV1::parse( + r#"{"envelope_version":"tinyplace.harness.session.v1","scope":{"harness_session_id":""}}"# + ) + .is_none()); + } +} diff --git a/src/openhuman/tinyplace/manifest.rs b/src/openhuman/tinyplace/manifest.rs index 259ca3f46d..f4c0d98179 100644 --- a/src/openhuman/tinyplace/manifest.rs +++ b/src/openhuman/tinyplace/manifest.rs @@ -3351,6 +3351,75 @@ pub(crate) fn handle_tinyplace_signal_send_message(params: Map) - }) } +/// Decrypt a single inbound Signal DM envelope to its plaintext body. +/// +/// Shared by the `signal_decrypt_message` controller and the orchestration +/// ingest path. NOTE: `SignalSession::decrypt` advances the double ratchet and +/// consumes one-time pre-keys — it is NOT idempotent. Callers that may re-see +/// the same envelope must dedupe by message id BEFORE calling this. +pub(crate) async fn decrypt_envelope( + envelope: &tinyplace::types::MessageEnvelope, +) -> Result { + log::debug!( + "{LOG_PREFIX} decrypt_envelope from={} type={} id={}", + envelope.from, + envelope.envelope_type, + envelope.id + ); + + // Obtain our identity public key and an Arc-wrapped store for SignalSession. + let store = crate::openhuman::tinyplace::signal_store::global_signal_store_arc().await?; + let client = global_state().client().await?; + let our_identity_pub = store + .identity_x25519_key_pair() + .await + .map_err(|e| format!("identity key: {e}"))? + .public_key; + + let sender = envelope.from.clone(); + + // Fetch sender's published key bundle to obtain their X25519 identity key. + // Ed25519 -> X25519 conversion via decode_identity_key — must be preserved. + let sender_bundle = client.keys.get_bundle(&sender).await.map_err(map_err)?; + let sender_x25519_identity = decode_identity_key(&sender_bundle.identity_key)?; + + // Decrypt via SDK SignalSession. + // + // SignalSession::decrypt handles both PREKEY_BUNDLE and CIPHERTEXT paths + // internally (via process_pre_key_message), including one-time pre-key + // consumption, x3dh_respond, ratchet_decrypt, and store_session. + let signal_session = SignalSession::new( + Arc::clone(&store) as Arc, + our_identity_pub, + ); + let plaintext_bytes = signal_session + .decrypt(&sender, &sender_x25519_identity, envelope) + .await + .map_err(|e| format!("decryption failed: {e}"))?; + + let plaintext = String::from_utf8(plaintext_bytes) + .map_err(|e| format!("plaintext is not valid UTF-8: {e}"))?; + log::info!( + "{LOG_PREFIX} decrypt_envelope decrypted from={sender} id={} len={}", + envelope.id, + plaintext.len() + ); + Ok(plaintext) +} + +/// Acknowledge (delete) a delivered message from the relay mailbox. Shared by +/// the `messages_acknowledge` controller and orchestration ingest. +pub(crate) async fn acknowledge_message(message_id: &str) -> Result<(), String> { + let client = global_state().client().await?; + let signer = require_signer(client)?; + client + .messages + .acknowledge(message_id, &signer.agent_id()) + .await + .map_err(map_err)?; + Ok(()) +} + pub(crate) fn handle_tinyplace_signal_decrypt_message( params: Map, ) -> ControllerFuture { @@ -3361,50 +3430,7 @@ pub(crate) fn handle_tinyplace_signal_decrypt_message( let envelope: tinyplace::types::MessageEnvelope = serde_json::from_value(envelope_val.clone()) .map_err(|e| format!("invalid envelope: {e}"))?; - log::debug!( - "{LOG_PREFIX} signal_decrypt_message from={} type={} id={}", - envelope.from, - envelope.envelope_type, - envelope.id - ); - - // Obtain our identity public key and an Arc-wrapped store for SignalSession. - let store = crate::openhuman::tinyplace::signal_store::global_signal_store_arc().await?; - let client = global_state().client().await?; - let our_identity_pub = store - .identity_x25519_key_pair() - .await - .map_err(|e| format!("identity key: {e}"))? - .public_key; - - let sender = envelope.from.clone(); - - // Fetch sender's published key bundle to obtain their X25519 identity key. - // Ed25519 -> X25519 conversion via decode_identity_key — must be preserved. - let sender_bundle = client.keys.get_bundle(&sender).await.map_err(map_err)?; - let sender_x25519_identity = decode_identity_key(&sender_bundle.identity_key)?; - - // Decrypt via SDK SignalSession. - // - // SignalSession::decrypt handles both PREKEY_BUNDLE and CIPHERTEXT paths - // internally (via process_pre_key_message), including one-time pre-key - // consumption, x3dh_respond, ratchet_decrypt, and store_session. - let signal_session = SignalSession::new( - Arc::clone(&store) as Arc, - our_identity_pub, - ); - let plaintext_bytes = signal_session - .decrypt(&sender, &sender_x25519_identity, &envelope) - .await - .map_err(|e| format!("decryption failed: {e}"))?; - - let plaintext = String::from_utf8(plaintext_bytes) - .map_err(|e| format!("plaintext is not valid UTF-8: {e}"))?; - log::info!( - "{LOG_PREFIX} signal_decrypt_message decrypted from={sender} id={} len={}", - envelope.id, - plaintext.len() - ); + let plaintext = decrypt_envelope(&envelope).await?; to_value(serde_json::json!({ "plaintext": plaintext, "from": envelope.from, @@ -3451,13 +3477,7 @@ pub(crate) fn handle_tinyplace_messages_acknowledge( Box::pin(async move { let message_id = req_str(¶ms, "messageId")?.to_string(); log::debug!("{LOG_PREFIX} messages_acknowledge id={message_id}"); - let client = global_state().client().await?; - let signer = require_signer(client)?; - client - .messages - .acknowledge(&message_id, &signer.agent_id()) - .await - .map_err(map_err)?; + acknowledge_message(&message_id).await?; to_value(serde_json::json!({ "ok": true })) }) } diff --git a/src/openhuman/tinyplace/mod.rs b/src/openhuman/tinyplace/mod.rs index 983d02641f..e06df159d8 100644 --- a/src/openhuman/tinyplace/mod.rs +++ b/src/openhuman/tinyplace/mod.rs @@ -28,6 +28,7 @@ pub(crate) mod agent; mod agent_tools; mod manifest; +pub(crate) use manifest::{acknowledge_message, decrypt_envelope}; pub(crate) mod ops; mod payment; mod schemas; From 9eb11e0efb0d7ea0025dc1405be62bd0b68ab019 Mon Sep 17 00:00:00 2001 From: sanil-23 Date: Thu, 2 Jul 2026 20:24:10 +0530 Subject: [PATCH 2/4] test(orchestration): make ingest classify/persist pure + unit-tested MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Factor the classification and persistence out of ingest_one into pure, side-effect-free helpers (classify_message, persist_message) so the ingest logic is covered without a live Signal client — ingest_one is now thin decrypt/ack/publish glue. Adds tests for harness→Session vs plain-DM→Master classification and idempotent per-session persistence. --- src/openhuman/orchestration/ingest.rs | 232 ++++++++++++++++++-------- 1 file changed, 158 insertions(+), 74 deletions(-) diff --git a/src/openhuman/orchestration/ingest.rs b/src/openhuman/orchestration/ingest.rs index d514ce9392..7c48a27202 100644 --- a/src/openhuman/orchestration/ingest.rs +++ b/src/openhuman/orchestration/ingest.rs @@ -4,6 +4,8 @@ //! websocket recv loop), filtered to conversation/DM streams. Never logs message //! bodies or seeds. +use std::path::Path; + use crate::core::event_bus::{publish_global, DomainEvent}; use crate::openhuman::config::Config; use crate::openhuman::tinyplace::{acknowledge_message, decrypt_envelope}; @@ -13,6 +15,21 @@ use super::types::{ChatKind, OrchestrationMessage, OrchestrationSession, Session const LOG: &str = "orchestration"; +/// A decrypted DM turned into the fields we persist. Pure result of +/// [`classify_message`] — no IO, so it is unit-testable without a live client. +#[derive(Debug, Clone, PartialEq, Eq)] +struct ClassifiedMessage { + chat_kind: ChatKind, + session_id: String, + role: String, + source: String, + label: Option, + workspace: Option, + seq: i64, + body: String, + timestamp: String, +} + /// True for streams that carry ciphertext DM envelopes worth ingesting. fn is_dm_stream(kind: &str, stream_id: &str) -> bool { kind.eq_ignore_ascii_case("conversation") @@ -20,6 +37,84 @@ fn is_dm_stream(kind: &str, stream_id: &str) -> bool { || stream_id.starts_with("conversation:") } +/// Classify a decrypted DM: a harness envelope becomes a per-session message, +/// anything else becomes a message in the peer's Master window. Pure. +fn classify_message(plaintext: String, fallback_timestamp: &str) -> ClassifiedMessage { + match SessionEnvelopeV1::parse(&plaintext) { + Some(env) => { + let label = (env.scope.scope_type == "folder").then(|| env.scope.key.clone()); + let workspace = (!env.scope.cwd.is_empty()).then(|| env.scope.cwd.clone()); + let timestamp = if env.message.timestamp.is_empty() { + fallback_timestamp.to_string() + } else { + env.message.timestamp + }; + ClassifiedMessage { + chat_kind: ChatKind::Session, + session_id: env.scope.harness_session_id, + role: env.message.role, + source: env.harness.provider, + label, + workspace, + seq: env.message.line, + body: env.message.text, + timestamp, + } + } + None => ClassifiedMessage { + chat_kind: ChatKind::Master, + session_id: "master".to_string(), + role: "user".to_string(), + source: String::new(), + label: None, + workspace: None, + seq: 0, + body: plaintext, + timestamp: fallback_timestamp.to_string(), + }, + } +} + +/// Persist a classified message + its session row. Idempotent by `msg_id`; +/// returns true if a new message row landed. Testable with a tempdir DB. +fn persist_message( + workspace_dir: &Path, + msg_id: &str, + agent_id: &str, + classified: &ClassifiedMessage, + now: &str, +) -> Result { + store::with_connection(workspace_dir, |c| { + store::upsert_session( + c, + &OrchestrationSession { + session_id: classified.session_id.clone(), + agent_id: agent_id.to_string(), + source: classified.source.clone(), + label: classified.label.clone(), + workspace: classified.workspace.clone(), + last_seq: classified.seq, + created_at: now.to_string(), + last_message_at: classified.timestamp.clone(), + }, + )?; + store::insert_message( + c, + &OrchestrationMessage { + id: msg_id.to_string(), + agent_id: agent_id.to_string(), + session_id: classified.session_id.clone(), + chat_kind: classified.chat_kind, + role: classified.role.clone(), + body: classified.body.clone(), + timestamp: classified.timestamp.clone(), + seq: classified.seq, + }, + ) + }) + .map_err(|e| format!("persist: {e}")) +} + /// Entry point from the bus subscriber. Cheap no-op when orchestration is /// disabled or the stream is not a DM stream. pub async fn ingest_stream_message( @@ -63,88 +158,21 @@ async fn ingest_one( return Ok(()); } - // 2. Decrypt exactly once. + // 2. Decrypt exactly once, then classify + persist. let plaintext = decrypt_envelope(&envelope).await?; - - // 3. Classify: harness envelope → Session, else the peer's Master window. - let (chat_kind, session_id, role, source, label, workspace, seq, body, ts) = - match SessionEnvelopeV1::parse(&plaintext) { - Some(env) => { - let label = (env.scope.scope_type == "folder").then(|| env.scope.key.clone()); - let workspace = (!env.scope.cwd.is_empty()).then(|| env.scope.cwd.clone()); - ( - ChatKind::Session, - env.scope.harness_session_id, - env.message.role, - env.harness.provider, - label, - workspace, - env.message.line, - env.message.text, - if env.message.timestamp.is_empty() { - envelope.timestamp.clone() - } else { - env.message.timestamp - }, - ) - } - None => ( - ChatKind::Master, - "master".to_string(), - "user".to_string(), - String::new(), - None, - None, - 0, - plaintext, - envelope.timestamp.clone(), - ), - }; - - // 4. Persist (idempotent). + let classified = classify_message(plaintext, &envelope.timestamp); let now = chrono::Utc::now().to_rfc3339(); - let session_id_for_event = session_id.clone(); - let agent_id_for_event = agent_id.clone(); - let chat_kind_str = chat_kind.as_str().to_string(); - let landed = store::with_connection(&workspace_dir, |c| { - store::upsert_session( - c, - &OrchestrationSession { - session_id: session_id.clone(), - agent_id: agent_id.clone(), - source, - label, - workspace, - last_seq: seq, - created_at: now.clone(), - last_message_at: ts.clone(), - }, - )?; - store::insert_message( - c, - &OrchestrationMessage { - id: msg_id.clone(), - agent_id: agent_id.clone(), - session_id: session_id.clone(), - chat_kind, - role, - body, - timestamp: ts.clone(), - seq, - }, - ) - }) - .map_err(|e| format!("persist: {e}"))?; + let landed = persist_message(&workspace_dir, &msg_id, &agent_id, &classified, &now)?; - // 5. Acknowledge (consume once) + fan out for stages 4/7. + // 3. Acknowledge (consume once) + fan out for stages 4/7. if landed { if let Err(e) = acknowledge_message(&msg_id).await { log::warn!(target: LOG, "[orchestration] ingest.ack_failed id={msg_id}: {e}"); } publish_global(DomainEvent::OrchestrationSessionMessage { - agent_id: agent_id_for_event, - session_id: session_id_for_event, - chat_kind: chat_kind_str, + agent_id, + session_id: classified.session_id, + chat_kind: classified.chat_kind.as_str().to_string(), }); } log::debug!(target: LOG, "[orchestration] ingest.exit id={msg_id} landed={landed}"); @@ -155,6 +183,17 @@ async fn ingest_one( mod tests { use super::*; + const ENVELOPE: &str = r#"{ + "envelope_version": "tinyplace.harness.session.v1", + "version": 1, + "scope": { "type": "folder", "key": "my-repo", "cwd": "/w", + "wrapper_session_id": "w1", "harness_session_id": "h1" }, + "harness": { "provider": "codex", "command": "codex", "argv": [] }, + "message": { "id": "m1", "line": 7, "role": "user", "text": "hello", + "timestamp": "2026-07-02T01:00:00Z" }, + "source": { "path": "p", "record_type": "user" } + }"#; + #[test] fn dm_stream_filter() { assert!(is_dm_stream("conversation", "conversation:abc")); @@ -162,4 +201,49 @@ mod tests { assert!(is_dm_stream("other", "conversation:abc")); assert!(!is_dm_stream("inbox", "inbox")); } + + #[test] + fn classifies_harness_envelope_as_session() { + let c = classify_message(ENVELOPE.to_string(), "2026-07-02T09:00:00Z"); + assert_eq!(c.chat_kind, ChatKind::Session); + assert_eq!(c.session_id, "h1"); + assert_eq!(c.role, "user"); + assert_eq!(c.source, "codex"); + assert_eq!(c.label.as_deref(), Some("my-repo")); // folder scope → label + assert_eq!(c.workspace.as_deref(), Some("/w")); + assert_eq!(c.seq, 7); + assert_eq!(c.body, "hello"); + assert_eq!(c.timestamp, "2026-07-02T01:00:00Z"); // envelope ts wins + } + + #[test] + fn classifies_plain_dm_as_master_with_fallback_timestamp() { + let c = classify_message("just chatting".to_string(), "2026-07-02T09:00:00Z"); + assert_eq!(c.chat_kind, ChatKind::Master); + assert_eq!(c.session_id, "master"); + assert_eq!(c.role, "user"); + assert!(c.label.is_none()); + assert_eq!(c.seq, 0); + assert_eq!(c.body, "just chatting"); + assert_eq!(c.timestamp, "2026-07-02T09:00:00Z"); // fallback used + } + + #[test] + fn persist_message_is_idempotent_and_buckets_by_session() { + let tmp = tempfile::tempdir().unwrap(); + let session = classify_message(ENVELOPE.to_string(), "2026-07-02T09:00:00Z"); + let master = classify_message("hi".to_string(), "2026-07-02T09:00:00Z"); + + assert!(persist_message(tmp.path(), "m1", "@peer", &session, "now").unwrap()); + // Replay of the same relay id does not double-insert. + assert!(!persist_message(tmp.path(), "m1", "@peer", &session, "now").unwrap()); + assert!(persist_message(tmp.path(), "m2", "@peer", &master, "now").unwrap()); + + store::with_connection(tmp.path(), |c| { + assert_eq!(store::count_messages(c, "@peer", "h1")?, 1); + assert_eq!(store::count_messages(c, "@peer", "master")?, 1); + Ok(()) + }) + .unwrap(); + } } From cd5a0f96eda57b38cf04ece62f657b27f203350f Mon Sep 17 00:00:00 2001 From: sanil-23 Date: Thu, 2 Jul 2026 20:34:33 +0530 Subject: [PATCH 3/4] fix(orchestration): gate ingest to paired senders + retry ack on dedupe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review (CodeRabbit + Codex) on ingest.rs: - Sender gate before decrypt: only ingest DMs from linked pairing agents (wrapped sessions). Decrypting advances the Signal ratchet, so an ordinary human DM must never be decrypted/consumed here — it stays readable by the existing Messaging UI (messages.list / signal.decryptMessage). Gating only the ack is insufficient because decrypt itself consumes the message; the gate must run before decrypt, keyed on sender. Adds local-only pairing::linked_agent_ids (fail-closed on read error). - Retry acknowledge on the dedupe path: if a prior run persisted but crashed or the relay ack failed, redelivery now re-acks (best-effort) so the relay copy is consumed; never re-decrypts or re-publishes. --- src/openhuman/agent_orchestration/pairing.rs | 19 +++++++++++++++++++ src/openhuman/orchestration/ingest.rs | 20 +++++++++++++++++++- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/src/openhuman/agent_orchestration/pairing.rs b/src/openhuman/agent_orchestration/pairing.rs index 40264338d3..e2d22ae8b9 100644 --- a/src/openhuman/agent_orchestration/pairing.rs +++ b/src/openhuman/agent_orchestration/pairing.rs @@ -313,6 +313,25 @@ async fn load_store(workspace_dir: &Path) -> Result { } } +/// Agent ids that hold an accepted (linked) local pairing record. Local-only +/// read (no network) — used to gate which DM senders the orchestration layer is +/// allowed to decrypt/ingest, so ordinary human DMs are never consumed. Returns +/// an empty set on any read error (fail-closed: nothing is ingested). +pub(crate) async fn linked_agent_ids(workspace_dir: &Path) -> std::collections::HashSet { + match load_store(workspace_dir).await { + Ok(store) => store + .records + .into_iter() + .filter(|record| record.status == PairingStatus::Linked) + .map(|record| record.agent_id) + .collect(), + Err(e) => { + log::warn!(target: LOG_TARGET, "[orchestration_pairing] linked_agent_ids read failed: {e}"); + std::collections::HashSet::new() + } + } +} + async fn save_store(workspace_dir: &Path, store: &PairingStore) -> Result<(), String> { let path = store_path(workspace_dir); let parent = path diff --git a/src/openhuman/orchestration/ingest.rs b/src/openhuman/orchestration/ingest.rs index 7c48a27202..f475ff03c2 100644 --- a/src/openhuman/orchestration/ingest.rs +++ b/src/openhuman/orchestration/ingest.rs @@ -148,13 +148,31 @@ async fn ingest_one( let msg_id = envelope.id.clone(); let agent_id = envelope.from.clone(); log::debug!(target: LOG, "[orchestration] ingest.entry id={msg_id} from={agent_id}"); + let workspace_dir = config.workspace_dir.clone(); + + // 0. Sender gate: only ingest DMs from linked (accepted) pairing agents — + // i.e. wrapped Codex/Claude sessions. Decrypting advances the Signal + // ratchet, so an unpaired sender's DM (an ordinary human message) must + // never be decrypted or consumed here; it stays readable by the existing + // Messaging UI via messages.list / signal.decryptMessage. + let linked = + crate::openhuman::agent_orchestration::pairing::linked_agent_ids(&workspace_dir).await; + if !linked.contains(&agent_id) { + log::debug!(target: LOG, "[orchestration] ingest.skip_unpaired from={agent_id}"); + return Ok(()); + } // 1. Dedupe BEFORE decrypt — protects the non-idempotent Signal ratchet. - let workspace_dir = config.workspace_dir.clone(); let already = store::with_connection(&workspace_dir, |c| store::message_exists(c, &msg_id)) .map_err(|e| format!("store lookup: {e}"))?; if already { + // The row already exists but a prior run may have crashed (or the relay + // ack failed) after persist. Retry the ack so the relay copy is + // consumed; never re-decrypt or re-publish. log::debug!(target: LOG, "[orchestration] ingest.dedupe id={msg_id}"); + if let Err(e) = acknowledge_message(&msg_id).await { + log::warn!(target: LOG, "[orchestration] ingest.ack_failed_dedupe id={msg_id}: {e}"); + } return Ok(()); } From d9bb2136e54e3b79bd3fa197406dd9c52510830b Mon Sep 17 00:00:00 2001 From: sanil-23 Date: Thu, 2 Jul 2026 21:55:25 +0530 Subject: [PATCH 4/4] =?UTF-8?q?ci:=20re-trigger=20PR=20CI=20(Rust=20Core?= =?UTF-8?q?=20Coverage=20hit=20the=2055m=20job=20timeout=20=E2=80=94=20inf?= =?UTF-8?q?ra,=20not=20code;=20prior=20run=2028600421430)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit