Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/core/event_bus/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ pub enum DomainEvent {
source: String,
},

/// A tiny.place harness session DM was ingested and persisted. Metadata only
/// — bodies stay in the workspace-internal orchestration store. Consumed by
/// later stages (graph run, UI socket push).
OrchestrationSessionMessage {
agent_id: String,
session_id: String,
chat_kind: String,
},

// ── Subconscious orchestrator ───────────────────────────────────────
/// A subconscious trigger finished gate evaluation (promote or drop).
/// Observability only — lets dashboards see ingestion volume and the
Expand Down Expand Up @@ -1226,6 +1235,7 @@ impl DomainEvent {
| Self::AgentOrchestrationFailed { .. }
| Self::AgentOrchestrationClosed { .. }
| Self::OrchestrationPairingChanged { .. }
| Self::OrchestrationSessionMessage { .. }
| Self::RunQueueMessageQueued { .. }
| Self::RunQueueMessageDelivered { .. }
| Self::RunQueueFollowupDispatched { .. }
Expand Down Expand Up @@ -1383,6 +1393,7 @@ impl DomainEvent {
Self::AgentOrchestrationFailed { .. } => "AgentOrchestrationFailed",
Self::AgentOrchestrationClosed { .. } => "AgentOrchestrationClosed",
Self::OrchestrationPairingChanged { .. } => "OrchestrationPairingChanged",
Self::OrchestrationSessionMessage { .. } => "OrchestrationSessionMessage",
Self::SubconsciousTriggerProcessed { .. } => "SubconsciousTriggerProcessed",
Self::RunQueueMessageQueued { .. } => "RunQueueMessageQueued",
Self::RunQueueMessageDelivered { .. } => "RunQueueMessageDelivered",
Expand Down
2 changes: 2 additions & 0 deletions src/core/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2242,6 +2242,8 @@ fn register_domain_subscribers(
// only walks Composio connections, so without this they only sync
// on manual "Sync now" and silently go stale.
crate::openhuman::memory_sync::workspace::start_workspace_periodic_sync();
// Orchestration: ingest tiny.place harness session DMs off the stream bus.
crate::openhuman::orchestration::register_orchestration_ingest_subscriber();
// Task-sources proactive ingestion: connection-created hook + poll.
crate::openhuman::task_sources::bus::register_task_sources_subscriber();
crate::openhuman::task_sources::start_periodic_poll();
Expand Down
19 changes: 19 additions & 0 deletions src/openhuman/agent_orchestration/pairing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,25 @@ async fn load_store(workspace_dir: &Path) -> Result<PairingStore, String> {
}
}

/// Agent ids that hold an accepted (linked) local pairing record. Local-only
/// read (no network) — used to gate which DM senders the orchestration layer is
/// allowed to decrypt/ingest, so ordinary human DMs are never consumed. Returns
/// an empty set on any read error (fail-closed: nothing is ingested).
pub(crate) async fn linked_agent_ids(workspace_dir: &Path) -> std::collections::HashSet<String> {
match load_store(workspace_dir).await {
Ok(store) => store
.records
.into_iter()
.filter(|record| record.status == PairingStatus::Linked)
.map(|record| record.agent_id)
.collect(),
Err(e) => {
log::warn!(target: LOG_TARGET, "[orchestration_pairing] linked_agent_ids read failed: {e}");
std::collections::HashSet::new()
}
}
}

async fn save_store(workspace_dir: &Path, store: &PairingStore) -> Result<(), String> {
let path = store_path(workspace_dir);
let parent = path
Expand Down
2 changes: 2 additions & 0 deletions src/openhuman/config/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mod local_ai;
mod meet;
mod node;
mod observability;
mod orchestration;
mod proxy;
mod routes;
mod runtime;
Expand Down Expand Up @@ -69,6 +70,7 @@ pub use local_ai::{LocalAiConfig, LocalAiUsage};
pub use meet::{AutoJoinPolicy, AutoSummarizePolicy, CalendarProvider, MeetConfig};
pub use node::NodeConfig;
pub use observability::{AgentTracingBackend, AgentTracingConfig, ObservabilityConfig};
pub use orchestration::OrchestrationConfig;
pub use proxy::{
apply_runtime_proxy_to_builder, build_runtime_proxy_client,
build_runtime_proxy_client_with_timeouts, runtime_proxy_config, set_runtime_proxy_config,
Expand Down
28 changes: 28 additions & 0 deletions src/openhuman/config/schema/orchestration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//! Orchestration configuration — controls the tiny.place harness session
//! ingest layer.
//!
//! Consumed by [`crate::openhuman::orchestration`].

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

fn default_enabled() -> bool {
true
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(default)]
pub struct OrchestrationConfig {
/// Ingest inbound tiny.place harness session DMs into the orchestration
/// store. Default: `true`.
#[serde(default = "default_enabled")]
pub enabled: bool,
}

impl Default for OrchestrationConfig {
fn default() -> Self {
Self {
enabled: default_enabled(),
}
}
}
6 changes: 6 additions & 0 deletions src/openhuman/config/schema/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ pub struct Config {
#[serde(default)]
pub scheduler_gate: SchedulerGateConfig,

/// tiny.place harness session-DM ingest layer. See
/// [`crate::openhuman::orchestration`].
#[serde(default)]
pub orchestration: OrchestrationConfig,

/// User-facing activity-level knob (0–4) controlling how proactive
/// background AI work is. Maps into scheduler_gate mode, periodic sync
/// cadence, heartbeat/subconscious toggles. See issue #3117.
Expand Down Expand Up @@ -742,6 +747,7 @@ impl Default for Config {
reliability: ReliabilityConfig::default(),
scheduler: SchedulerConfig::default(),
scheduler_gate: SchedulerGateConfig::default(),
orchestration: OrchestrationConfig::default(),
agent_activity_level: AgentActivityLevel::default(),
memory_sync_interval_secs: None,
agent: AgentConfig::default(),
Expand Down
1 change: 1 addition & 0 deletions src/openhuman/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub mod migrations;
pub mod model_council;
pub mod monitor;
pub mod notifications;
pub mod orchestration;
pub mod overlay;
pub mod people;
pub mod plan_review;
Expand Down
59 changes: 59 additions & 0 deletions src/openhuman/orchestration/bus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! Event-bus subscriber that drives orchestration ingest off inbound tiny.place
//! DM stream messages.

use std::sync::{Arc, OnceLock};

use async_trait::async_trait;

use crate::core::event_bus::{subscribe_global, DomainEvent, EventHandler, SubscriptionHandle};

static INGEST_HANDLE: OnceLock<SubscriptionHandle> = OnceLock::new();

/// Register the orchestration ingest subscriber on the global event bus.
pub fn register_orchestration_ingest_subscriber() {
if INGEST_HANDLE.get().is_some() {
return;
}
match subscribe_global(Arc::new(OrchestrationIngestSubscriber)) {
Some(handle) => {
let _ = INGEST_HANDLE.set(handle);
}
None => {
log::warn!(
"[orchestration] failed to register ingest subscriber — bus not initialized"
);
}
}
}

pub struct OrchestrationIngestSubscriber;

#[async_trait]
impl EventHandler for OrchestrationIngestSubscriber {
fn name(&self) -> &str {
"orchestration::ingest"
}

fn domains(&self) -> Option<&[&str]> {
Some(&["tinyplace"])
}

async fn handle(&self, event: &DomainEvent) {
let DomainEvent::TinyPlaceStreamMessage {
stream_id,
kind,
message,
} = event
else {
return;
};
let config = match crate::openhuman::config::Config::load_or_init().await {
Ok(config) => config,
Err(e) => {
log::warn!("[orchestration] ingest.config_load_failed: {e}");
return;
}
};
super::ingest::ingest_stream_message(&config, kind, stream_id, message).await;
}
}
Loading
Loading