From 6dd51941656fd2f021a18829b82a638524a21d45 Mon Sep 17 00:00:00 2001 From: M3gA-Mind Date: Thu, 2 Jul 2026 17:05:32 +0530 Subject: [PATCH 1/3] fix(credentials): re-resolve active-user workspace in long-lived subsystems after store_session (#4398) Long-lived subsystems started from a pre-login Config snapshot (workspace = users/local/) and never re-resolved after credentials::ops::store_session wrote active_user.toml, so they kept reading/writing under users/local/ until a process restart. Route each subsystem's workspace resolution through a re-bindable holder (mirroring memory::global::init) and re-point them all from store_session_inner, right after the existing memory/conversation rebind. - Cron scheduler: process-global ACTIVE_CONFIG holder; the poll loop re-resolves config + SecurityPolicy each tick, so due_jobs reads the cron store under users//. New scheduler::rebind(config). - Channel runtime: ChannelRuntimeContext.workspace_dir (baked Arc) is now a shared Arc> handle read via workspace_dir(); channels::rebind_workspace(path) swaps it. The Telegram busy-state subscriber and PROFILE.md writer share the same handle (fixes a stale-workspace event drop the base change would otherwise introduce). - Security sandbox: live_policy::set_workspace_dir rebuilds the policy via from_config so file-writing tools stay confined to the activated user's workspace. - Channel memory store: ctx.memory is now a swappable handle; channels::rebind_memory(config) rebuilds it (shared build_channel_memory helper with the #3712 keyword-only fallback) so conversation auto-save and memory-context retrieval land in the right workspace. Agent-definition registry re-resolution is deferred to a follow-up: it needs an invasive OnceLock -> RwLock refactor of AgentDefinitionRegistry::GLOBAL with ~8 &'static caller ripples, out of scope for this change. Tests: scheduler re-resolves to the activated workspace after rebind; workspace handle re-resolves + no-op when unregistered; live_policy::set_workspace_dir swaps workspace while preserving autonomy/action_dir; memory rebind no-op guard. Closes #4398 --- src/openhuman/channels/context.rs | 45 +++++- src/openhuman/channels/mod.rs | 2 + .../channels/providers/telegram/bus.rs | 31 +++- .../channels/providers/telegram/bus_tests.rs | 12 +- .../providers/telegram/remote_control.rs | 10 +- src/openhuman/channels/routes.rs | 2 +- src/openhuman/channels/routes_tests.rs | 4 +- .../channels/runtime/dispatch/processor.rs | 15 +- .../channels/runtime/memory_rebind.rs | 148 ++++++++++++++++++ src/openhuman/channels/runtime/mod.rs | 4 + src/openhuman/channels/runtime/startup.rs | 68 +++----- .../channels/runtime/test_support.rs | 6 +- src/openhuman/channels/runtime/workspace.rs | 122 +++++++++++++++ src/openhuman/channels/tests/context.rs | 4 +- .../channels/tests/discord_integration.rs | 4 +- src/openhuman/channels/tests/memory.rs | 6 +- .../channels/tests/runtime_dispatch.rs | 24 +-- .../channels/tests/runtime_tool_calls.rs | 24 +-- .../channels/tests/telegram_integration.rs | 4 +- src/openhuman/credentials/ops.rs | 28 ++++ src/openhuman/cron/scheduler.rs | 97 +++++++++++- src/openhuman/cron/scheduler_tests.rs | 49 +++++- src/openhuman/learning/profile_md_renderer.rs | 28 +++- src/openhuman/security/live_policy.rs | 111 +++++++++++++ 24 files changed, 719 insertions(+), 129 deletions(-) create mode 100644 src/openhuman/channels/runtime/memory_rebind.rs create mode 100644 src/openhuman/channels/runtime/workspace.rs diff --git a/src/openhuman/channels/context.rs b/src/openhuman/channels/context.rs index dc4cee0d4d..94c4b9cf8c 100644 --- a/src/openhuman/channels/context.rs +++ b/src/openhuman/channels/context.rs @@ -6,7 +6,7 @@ use crate::openhuman::tools::Tool; use crate::openhuman::util::truncate_with_ellipsis; use std::collections::HashMap; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; /// Per-sender conversation history for channel messages. pub(crate) type ConversationHistoryMap = Arc>>>; @@ -47,7 +47,10 @@ pub(crate) struct ChannelRuntimeContext { pub(crate) channels_by_name: Arc>>, pub(crate) provider: Arc, pub(crate) default_provider: Arc, - pub(crate) memory: Arc, + /// Re-bindable channel memory store (issue #4398). Held behind a lock so a + /// post-login `channels::rebind_memory` swaps it for the activated user's + /// workspace store. Read via [`ChannelRuntimeContext::memory`]. + pub(crate) memory_handle: Arc>>, pub(crate) tools_registry: Arc>>, pub(crate) system_prompt: Arc, pub(crate) model: Arc, @@ -63,12 +66,42 @@ pub(crate) struct ChannelRuntimeContext { pub(crate) reliability: Arc, pub(crate) provider_runtime_options: crate::openhuman::inference::provider::ProviderRuntimeOptions, - pub(crate) workspace_dir: Arc, + /// Re-bindable workspace handle. Started from the pre-login `Config` + /// snapshot, then re-pointed at the activated user's workspace on login + /// via `channels::runtime::rebind_workspace` (issue #4398). Read through + /// [`ChannelRuntimeContext::workspace_dir`] so every turn re-resolves. + pub(crate) workspace_handle: Arc>, pub(crate) message_timeout_secs: u64, pub(crate) multimodal: crate::openhuman::config::MultimodalConfig, pub(crate) multimodal_files: crate::openhuman::config::MultimodalFileConfig, } +impl ChannelRuntimeContext { + /// Current workspace directory, re-resolved through the re-bindable handle. + /// + /// After a post-login `rebind_workspace`, this returns the activated + /// user's `users//` workspace instead of the pre-login + /// `users/local/` snapshot the runtime was started with (issue #4398). + pub(crate) fn workspace_dir(&self) -> PathBuf { + match self.workspace_handle.read() { + Ok(guard) => guard.clone(), + Err(poisoned) => poisoned.into_inner().clone(), + } + } + + /// Current channel memory store, re-resolved through the re-bindable handle. + /// + /// After a post-login `rebind_memory`, this returns a store rooted at the + /// activated user's workspace instead of the pre-login `users/local/` store + /// the runtime was started with (issue #4398). + pub(crate) fn memory(&self) -> Arc { + match self.memory_handle.read() { + Ok(guard) => Arc::clone(&guard), + Err(poisoned) => Arc::clone(&poisoned.into_inner()), + } + } +} + pub(crate) fn conversation_memory_key(msg: &super::traits::ChannelMessage) -> String { format!("{}_{}_{}", msg.channel, msg.sender, msg.id) } @@ -331,9 +364,9 @@ mod tests { channels_by_name: Arc::new(HashMap::new()), provider: Arc::new(DummyProvider), default_provider: Arc::new("default".into()), - memory: Arc::new(MockMemory { + memory_handle: Arc::new(RwLock::new(Arc::new(MockMemory { entries: Vec::new(), - }), + }))), tools_registry: Arc::new(vec![Box::new(DummyTool) as Box]), system_prompt: Arc::new("prompt".into()), model: Arc::new("model".into()), @@ -349,7 +382,7 @@ mod tests { reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: crate::openhuman::inference::provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(PathBuf::from("/tmp")), + workspace_handle: Arc::new(RwLock::new(PathBuf::from("/tmp"))), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), diff --git a/src/openhuman/channels/mod.rs b/src/openhuman/channels/mod.rs index 6a2c2f1c09..687c3d391d 100644 --- a/src/openhuman/channels/mod.rs +++ b/src/openhuman/channels/mod.rs @@ -68,3 +68,5 @@ pub use controllers::{ChannelAuthMode, ChannelDefinition}; // old `channels::build_system_prompt` path. pub use crate::openhuman::context::channels_prompt::build_system_prompt; pub use runtime::start_channels; +pub use runtime::rebind_workspace; +pub use runtime::rebind_memory; diff --git a/src/openhuman/channels/providers/telegram/bus.rs b/src/openhuman/channels/providers/telegram/bus.rs index b58fb20280..21f10889e7 100644 --- a/src/openhuman/channels/providers/telegram/bus.rs +++ b/src/openhuman/channels/providers/telegram/bus.rs @@ -4,22 +4,37 @@ use crate::core::event_bus::{DomainEvent, EventHandler}; use crate::openhuman::channels::providers::telegram::session_store::with_store; use async_trait::async_trait; use std::path::PathBuf; +use std::sync::{Arc, RwLock}; const LOG_PREFIX: &str = "[telegram-remote]"; /// Tracks Telegram turn lifecycle via channel domain events and exposes busy /// state for `/status`. pub struct TelegramRemoteSubscriber { - workspace_dir: PathBuf, + /// Re-bindable workspace handle (issue #4398). Shared with the channel + /// runtime context so a post-login `rebind_workspace` re-points busy-state + /// persistence at the activated user's workspace. Read at event time so the + /// stale-workspace guard below compares against the CURRENT workspace — the + /// dispatch loop stamps events with `ctx.workspace_dir()`, which after a + /// re-bind is the new path; a baked snapshot here would drop every event. + workspace_handle: Arc>, } impl TelegramRemoteSubscriber { - pub fn new(workspace_dir: PathBuf) -> Self { - Self { workspace_dir } + pub fn new(workspace_handle: Arc>) -> Self { + Self { workspace_handle } + } + + /// Current workspace directory, re-resolved through the shared handle. + fn workspace_dir(&self) -> PathBuf { + match self.workspace_handle.read() { + Ok(guard) => guard.clone(), + Err(poisoned) => poisoned.into_inner().clone(), + } } async fn set_busy(&self, reply_target: &str, busy: bool) { - let workspace_dir = self.workspace_dir.clone(); + let workspace_dir = self.workspace_dir(); let reply_target_owned = reply_target.to_string(); let join_result = tokio::task::spawn_blocking(move || { with_store(&workspace_dir, |store| { @@ -59,12 +74,12 @@ impl EventHandler for TelegramRemoteSubscriber { workspace_dir, .. } if channel == "telegram" => { - if *workspace_dir != self.workspace_dir { + if *workspace_dir != self.workspace_dir() { tracing::debug!( "{LOG_PREFIX} dropping stale-workspace ChannelMessageReceived \ event_ws={} self_ws={}", workspace_dir.display(), - self.workspace_dir.display() + self.workspace_dir().display() ); return; } @@ -79,12 +94,12 @@ impl EventHandler for TelegramRemoteSubscriber { workspace_dir, .. } if channel == "telegram" => { - if *workspace_dir != self.workspace_dir { + if *workspace_dir != self.workspace_dir() { tracing::debug!( "{LOG_PREFIX} dropping stale-workspace ChannelMessageProcessed \ event_ws={} self_ws={}", workspace_dir.display(), - self.workspace_dir.display() + self.workspace_dir().display() ); return; } diff --git a/src/openhuman/channels/providers/telegram/bus_tests.rs b/src/openhuman/channels/providers/telegram/bus_tests.rs index c171ec8f82..5da3824976 100644 --- a/src/openhuman/channels/providers/telegram/bus_tests.rs +++ b/src/openhuman/channels/providers/telegram/bus_tests.rs @@ -5,7 +5,7 @@ use tempfile::tempdir; #[tokio::test] async fn subscriber_marks_busy_on_received_and_clears_on_processed() { let dir = tempdir().expect("tempdir"); - let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(dir.path().to_path_buf()))); assert_eq!(subscriber.name(), "telegram::remote_control"); assert_eq!(subscriber.domains(), Some(&["channel"][..])); @@ -50,7 +50,7 @@ async fn subscriber_marks_busy_on_received_and_clears_on_processed() { #[tokio::test] async fn subscriber_ignores_non_telegram_channel_events() { let dir = tempdir().expect("tempdir"); - let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(dir.path().to_path_buf()))); subscriber .handle(&DomainEvent::ChannelMessageReceived { @@ -75,7 +75,7 @@ async fn subscriber_ignores_non_telegram_channel_events() { #[tokio::test] async fn telegram_received_matching_workspace_sets_busy() { let dir = tempdir().expect("tempdir"); - let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(dir.path().to_path_buf()))); subscriber .handle(&DomainEvent::ChannelMessageReceived { @@ -99,7 +99,7 @@ async fn telegram_received_matching_workspace_sets_busy() { async fn telegram_received_stale_workspace_does_not_set_busy() { let dir = tempdir().expect("tempdir"); let stale = tempdir().expect("stale tempdir"); - let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(dir.path().to_path_buf()))); subscriber .handle(&DomainEvent::ChannelMessageReceived { @@ -122,7 +122,7 @@ async fn telegram_received_stale_workspace_does_not_set_busy() { #[tokio::test] async fn telegram_processed_matching_workspace_clears_busy() { let dir = tempdir().expect("tempdir"); - let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(dir.path().to_path_buf()))); // First mark as busy via a matching received event. subscriber @@ -169,7 +169,7 @@ async fn telegram_processed_matching_workspace_clears_busy() { async fn telegram_processed_stale_workspace_does_not_clear_busy() { let dir = tempdir().expect("tempdir"); let stale = tempdir().expect("stale tempdir"); - let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(dir.path().to_path_buf()))); // Mark as busy via a matching received event. subscriber diff --git a/src/openhuman/channels/providers/telegram/remote_control.rs b/src/openhuman/channels/providers/telegram/remote_control.rs index 93c35c232f..66c1a6ddb9 100644 --- a/src/openhuman/channels/providers/telegram/remote_control.rs +++ b/src/openhuman/channels/providers/telegram/remote_control.rs @@ -101,7 +101,7 @@ async fn build_status_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessage .map(|h| h.len()) .unwrap_or(0); - let workspace = ctx.workspace_dir.clone(); + let workspace = ctx.workspace_dir(); let reply_target = msg.reply_target.clone(); // Use with_store_read (no disk write) and spawn_blocking to keep the async // executor thread unblocked during mutex acquisition + file I/O. @@ -154,7 +154,7 @@ async fn build_status_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessage } async fn build_sessions_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessage) -> String { - let workspace = ctx.workspace_dir.clone(); + let workspace = ctx.workspace_dir(); let reply_target = msg.reply_target.clone(); // Read-only lookup — use with_store_read (no save) wrapped in spawn_blocking. let active_thread_id = tokio::task::spawn_blocking(move || { @@ -166,7 +166,7 @@ async fn build_sessions_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessa .ok() .and_then(|res| res.ok()) .flatten(); - let workspace = ctx.workspace_dir.as_path(); + let workspace = ctx.workspace_dir(); let threads = match conversations::list_threads(workspace.to_path_buf()) { Ok(list) => list, @@ -215,7 +215,7 @@ fn format_session_line(thread: &ConversationThread, active_id: Option<&str>) -> } async fn build_new_session_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessage) -> String { - let workspace = ctx.workspace_dir.as_path(); + let workspace = ctx.workspace_dir(); let sender_key = conversation_history_key(msg); let thread_id = format!("thread-{}", uuid::Uuid::new_v4()); let now = chrono::Utc::now(); @@ -243,7 +243,7 @@ async fn build_new_session_response(ctx: &ChannelRuntimeContext, msg: &ChannelMe clear_sender_history(ctx, &sender_key); - let workspace_dir = ctx.workspace_dir.clone(); + let workspace_dir = ctx.workspace_dir(); let reply_target_owned = msg.reply_target.clone(); let thread_id_owned = thread_id.clone(); let sender_key_owned = sender_key.clone(); diff --git a/src/openhuman/channels/routes.rs b/src/openhuman/channels/routes.rs index e1d42eb2a8..77dae9c59d 100644 --- a/src/openhuman/channels/routes.rs +++ b/src/openhuman/channels/routes.rs @@ -317,7 +317,7 @@ pub(crate) async fn handle_runtime_command_if_needed( } } ChannelRuntimeCommand::ShowModel => { - build_models_help_response(¤t, ctx.workspace_dir.as_path()) + build_models_help_response(¤t, ctx.workspace_dir().as_path()) } ChannelRuntimeCommand::SetModel(raw_model) => { let model = raw_model.trim().trim_matches('`').to_string(); diff --git a/src/openhuman/channels/routes_tests.rs b/src/openhuman/channels/routes_tests.rs index 948fba5753..f11c28ef57 100644 --- a/src/openhuman/channels/routes_tests.rs +++ b/src/openhuman/channels/routes_tests.rs @@ -135,7 +135,7 @@ fn runtime_context(workspace_dir: PathBuf) -> ChannelRuntimeContext { channels_by_name: Arc::new(HashMap::new()), provider: Arc::new(DummyProvider), default_provider: Arc::new("openai".into()), - memory: Arc::new(DummyMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(DummyMemory))), tools_registry: Arc::new(vec![Box::new(DummyTool) as Box]), system_prompt: Arc::new("prompt".into()), model: Arc::new("reasoning-v1".into()), @@ -151,7 +151,7 @@ fn runtime_context(workspace_dir: PathBuf) -> ChannelRuntimeContext { reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: crate::openhuman::inference::provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(workspace_dir), + workspace_handle: Arc::new(std::sync::RwLock::new(workspace_dir)), message_timeout_secs: 60, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), diff --git a/src/openhuman/channels/runtime/dispatch/processor.rs b/src/openhuman/channels/runtime/dispatch/processor.rs index 9648cd8260..8742fbe805 100644 --- a/src/openhuman/channels/runtime/dispatch/processor.rs +++ b/src/openhuman/channels/runtime/dispatch/processor.rs @@ -129,7 +129,7 @@ pub(crate) async fn process_channel_message( reply_target: msg.reply_target.clone(), content: msg.content.clone(), thread_ts: msg.thread_ts.clone(), - workspace_dir: ctx.workspace_dir.as_ref().clone(), + workspace_dir: ctx.workspace_dir(), }); let target_channel = ctx.channels_by_name.get(&msg.channel).cloned(); @@ -214,13 +214,16 @@ pub(crate) async fn process_channel_message( } }; + // Re-resolve the memory store through the re-bindable handle so a + // post-login active-user switch routes channel memory to the activated + // user's workspace (issue #4398). + let memory = ctx.memory(); let memory_context = - build_memory_context(ctx.memory.as_ref(), &msg.content, ctx.min_relevance_score).await; + build_memory_context(memory.as_ref(), &msg.content, ctx.min_relevance_score).await; if ctx.auto_save_memory { let autosave_key = conversation_memory_key(&msg); - let _ = ctx - .memory + let _ = memory .store( "", &autosave_key, @@ -619,7 +622,7 @@ pub(crate) async fn process_channel_message( model: route.model.clone(), elapsed_ms: started_at.elapsed().as_millis() as u64, success: false, - workspace_dir: ctx.workspace_dir.as_ref().clone(), + workspace_dir: ctx.workspace_dir(), }); return; } @@ -760,7 +763,7 @@ pub(crate) async fn process_channel_message( model: response_model, elapsed_ms: started_at.elapsed().as_millis() as u64, success, - workspace_dir: ctx.workspace_dir.as_ref().clone(), + workspace_dir: ctx.workspace_dir(), }); } diff --git a/src/openhuman/channels/runtime/memory_rebind.rs b/src/openhuman/channels/runtime/memory_rebind.rs new file mode 100644 index 0000000000..d67c4cc698 --- /dev/null +++ b/src/openhuman/channels/runtime/memory_rebind.rs @@ -0,0 +1,148 @@ +//! Re-bindable channel-runtime memory store (issue #4398). +//! +//! The channel runtime builds its OWN local memory store at boot +//! (`create_memory_with_local_ai`) and hands it to the dispatch loop via +//! [`ChannelRuntimeContext`](crate::openhuman::channels::context::ChannelRuntimeContext). +//! Channel-level memory writes — conversation auto-save and memory-context +//! retrieval — go through that store (agent *tools* separately use the +//! already-rebound `memory::global` client). When the runtime starts pre-login, +//! that store is rooted at `~/.openhuman/users/local/`, so post-login channel +//! turns keep reading/writing the wrong workspace's memory DB until a restart. +//! +//! This module holds a process-global handle to the context's memory slot +//! (`Arc>>`) registered at boot, plus [`rebind_memory`] +//! which rebuilds the store for the activated user's workspace and swaps it in +//! place — the same shape as the workspace / live-policy re-bind seams. The +//! swap is race-safe: an in-flight turn keeps the old `Arc` it +//! already cloned; new turns pick up the new store. + +use std::sync::{Arc, OnceLock, RwLock}; + +use anyhow::Result; + +use crate::openhuman::config::Config; +use crate::openhuman::memory::Memory; +use crate::openhuman::memory_store; + +/// In-place-swappable memory store held by the live runtime context. +pub(crate) type MemoryHandle = Arc>>; + +type HandleSlot = RwLock>; + +static MEMORY_HANDLE: OnceLock = OnceLock::new(); + +fn handle_slot() -> &'static HandleSlot { + MEMORY_HANDLE.get_or_init(|| RwLock::new(None)) +} + +/// Build the channel-runtime memory store for `config`'s workspace. +/// +/// Mirrors the boot construction, including the keyword-only fallback (#3712): +/// if the configured embedder can't be built, degrade to +/// `embedding_provider = "none"` (NoopEmbedding) rather than failing, so the +/// channel runtime keeps working with reduced (keyword-only) memory. Shared by +/// `start_channels` (boot) and [`rebind_memory`] (post-login) so both paths +/// build the store identically. +pub(crate) fn build_channel_memory(config: &Config) -> Result> { + let local_embedding = config.workload_local_model("embeddings"); + let embedding_api_key = + crate::openhuman::embeddings::resolve_api_key(config, &config.memory.embedding_provider); + match memory_store::create_memory_with_local_ai( + &config.memory, + local_embedding.as_deref(), + &embedding_api_key, + &[], + Some(&config.storage.provider.config), + &config.workspace_dir, + ) { + Ok(mem) => Ok(Arc::from(mem)), + Err(e) => { + tracing::error!( + error = %format!("{e:#}"), + provider = %config.memory.embedding_provider, + "[channels] memory embedder build failed — falling back to keyword-only \ + memory so channels still start" + ); + let mut fallback_memory = config.memory.clone(); + fallback_memory.embedding_provider = "none".to_string(); + Ok(Arc::from(memory_store::create_memory_with_local_ai( + &fallback_memory, + local_embedding.as_deref(), + &embedding_api_key, + &[], + Some(&config.storage.provider.config), + &config.workspace_dir, + )?)) + } + } +} + +fn register_in(slot: &HandleSlot, handle: MemoryHandle) { + match slot.write() { + Ok(mut guard) => *guard = Some(handle), + Err(e) => log::warn!("[channels:runtime] register_memory_handle: slot poisoned: {e}"), + } +} + +fn rebind_in(slot: &HandleSlot, config: &Config) { + let handle = match slot.read() { + Ok(guard) => guard.as_ref().map(Arc::clone), + Err(e) => { + log::warn!("[channels:runtime] rebind_memory: slot read poisoned: {e}"); + return; + } + }; + let Some(handle) = handle else { + log::debug!( + "[channels:runtime] rebind_memory: no live runtime memory handle registered; \ + skipping re-bind to {}", + config.workspace_dir.display() + ); + return; + }; + match build_channel_memory(config) { + Ok(new_mem) => match handle.write() { + Ok(mut current) => { + log::info!( + "[channels:runtime] re-binding channel memory store to workspace {}", + config.workspace_dir.display() + ); + *current = new_mem; + } + Err(e) => log::warn!("[channels:runtime] rebind_memory: handle poisoned: {e}"), + }, + Err(e) => log::warn!( + "[channels:runtime] rebind_memory: rebuild for {} failed; keeping existing store: {e:#}", + config.workspace_dir.display() + ), + } +} + +/// Register the live runtime's memory handle so [`rebind_memory`] can swap it +/// after login. Called once from `start_channels`. +pub(crate) fn register_memory_handle(handle: MemoryHandle) { + register_in(handle_slot(), handle); +} + +/// Rebuild the channel-runtime memory store for `config`'s workspace and swap +/// it into the live context. Invoked from `credentials::ops::store_session` +/// after activation. No-op when the channel runtime is not running (no handle +/// registered) or when the rebuild fails (keeps the existing store). +pub fn rebind_memory(config: &Config) { + rebind_in(handle_slot(), config); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn rebind_without_registered_handle_is_noop() { + // With no handle registered, `rebind_in` must return before it ever + // tries to build a store — so this neither panics nor touches disk. + let slot: HandleSlot = RwLock::new(None); + let config = crate::openhuman::config::Config::default(); + rebind_in(&slot, &config); + assert!(slot.read().unwrap().is_none()); + } +} diff --git a/src/openhuman/channels/runtime/mod.rs b/src/openhuman/channels/runtime/mod.rs index 346da3b010..82a0f24940 100644 --- a/src/openhuman/channels/runtime/mod.rs +++ b/src/openhuman/channels/runtime/mod.rs @@ -1,10 +1,14 @@ //! Channel runtime entry points. mod dispatch; +mod memory_rebind; mod startup; mod supervision; +mod workspace; +pub use memory_rebind::rebind_memory; pub use startup::start_channels; +pub use workspace::rebind_workspace; #[cfg(any(test, debug_assertions))] pub mod test_support; diff --git a/src/openhuman/channels/runtime/startup.rs b/src/openhuman/channels/runtime/startup.rs index 9785dee3b0..f18ffdee9e 100644 --- a/src/openhuman/channels/runtime/startup.rs +++ b/src/openhuman/channels/runtime/startup.rs @@ -39,7 +39,7 @@ use crate::openhuman::security::SecurityPolicy; use crate::openhuman::tools; use anyhow::Result; use std::collections::HashMap; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; /// How the channels runtime should construct its default chat provider. /// @@ -83,6 +83,15 @@ pub async fn start_channels(mut config: Config) -> Result<()> { let bus = event_bus::init_global(DEFAULT_CAPACITY); let _tracing_handle = bus.subscribe(Arc::new(TracingSubscriber)); crate::openhuman::health::bus::register_health_subscriber(); + // Re-bindable workspace handle (issue #4398). The runtime may have started + // pre-login against `users/local/`; register the handle so a later + // `store_session` can re-point it at the activated user's workspace without + // a restart. Created up-front so the workspace-scoped subscribers below + // (ProfileMdRenderer, TelegramRemoteSubscriber) and the runtime context all + // share the SAME `RwLock` — one `rebind_workspace` re-points them + // all at once. The context and the global slot hold clones of this handle. + let workspace_handle = Arc::new(RwLock::new(config.workspace_dir.clone())); + super::workspace::register_workspace_handle(Arc::clone(&workspace_handle)); crate::openhuman::workflows::bus::register_workflow_cleanup_subscriber(); crate::openhuman::memory_conversations::register_conversation_persistence_subscriber( config.workspace_dir.clone(), @@ -182,7 +191,7 @@ pub async fn start_channels(mut config: Config) -> Result<()> { use std::sync::Arc; let cache = Arc::new(FacetCache::new(client.profile_conn())); let renderer = - Arc::new(ProfileMdRenderer::new(cache, config.workspace_dir.clone())); + Arc::new(ProfileMdRenderer::new(cache, Arc::clone(&workspace_handle))); ProfileMdRenderer::subscribe(renderer) } else { tracing::debug!( @@ -296,44 +305,17 @@ pub async fn start_channels(mut config: Config) -> Result<()> { config.workspace_dir.clone(), )?; let temperature = config.default_temperature; - let local_embedding = config.workload_local_model("embeddings"); - let embedding_api_key = - crate::openhuman::embeddings::resolve_api_key(&config, &config.memory.embedding_provider); - // Build the memory store. A misconfigured/removed embedding provider (e.g. a - // stale `embedding_provider = "fastembed"` that the factory no longer knows) - // makes the embedder build fail — but that must NOT take every messaging - // channel offline (issue #3712). Fall back to keyword-only memory - // (`embedding_provider = "none"` → NoopEmbedding) so the channel listeners - // still start; semantic memory degrades gracefully instead of the whole - // runtime aborting. - let mem: Arc = match memory_store::create_memory_with_local_ai( - &config.memory, - local_embedding.as_deref(), - &embedding_api_key, - &[], - Some(&config.storage.provider.config), - &config.workspace_dir, - ) { - Ok(mem) => Arc::from(mem), - Err(e) => { - tracing::error!( - error = %format!("{e:#}"), - provider = %config.memory.embedding_provider, - "[channels] memory embedder build failed — falling back to keyword-only \ - memory so channels still start" - ); - let mut fallback_memory = config.memory.clone(); - fallback_memory.embedding_provider = "none".to_string(); - Arc::from(memory_store::create_memory_with_local_ai( - &fallback_memory, - local_embedding.as_deref(), - &embedding_api_key, - &[], - Some(&config.storage.provider.config), - &config.workspace_dir, - )?) - } - }; + // Build the channel-runtime memory store (with the #3712 keyword-only + // fallback). Extracted into `memory_rebind::build_channel_memory` so the + // post-login re-bind (#4398) rebuilds it identically. + let mem: Arc = super::memory_rebind::build_channel_memory(&config)?; + // Register a re-bindable handle to this store so a later `store_session` can + // swap it for the activated user's workspace without a restart. The context + // holds the same `RwLock`; a turn that already cloned the old store keeps + // using it, new turns pick up the swap. + let memory_handle: std::sync::Arc>> = + Arc::new(RwLock::new(Arc::clone(&mem))); + super::memory_rebind::register_memory_handle(Arc::clone(&memory_handle)); // Build system prompt from workspace identity files + skills let workspace = config.workspace_dir.clone(); let tools_registry = Arc::new(tools::all_tools_with_runtime( @@ -733,7 +715,7 @@ pub async fn start_channels(mut config: Config) -> Result<()> { let _telegram_remote_handle = if channels_by_name.contains_key("telegram") { let handle = bus.subscribe(Arc::new( crate::openhuman::channels::providers::telegram::TelegramRemoteSubscriber::new( - config.workspace_dir.clone(), + Arc::clone(&workspace_handle), ), )); tracing::debug!("[telegram-remote] registered TelegramRemoteSubscriber"); @@ -776,7 +758,7 @@ pub async fn start_channels(mut config: Config) -> Result<()> { channels_by_name, provider: Arc::clone(&provider), default_provider: Arc::new(provider_name), - memory: Arc::clone(&mem), + memory_handle, tools_registry: Arc::clone(&tools_registry), system_prompt: Arc::new(system_prompt), model: Arc::new(model.clone()), @@ -791,7 +773,7 @@ pub async fn start_channels(mut config: Config) -> Result<()> { inference_url: config.inference_url.clone(), reliability: Arc::new(config.reliability.clone()), provider_runtime_options, - workspace_dir: Arc::new(config.workspace_dir.clone()), + workspace_handle, message_timeout_secs, multimodal: config.multimodal.clone(), multimodal_files: config.multimodal_files.clone(), diff --git a/src/openhuman/channels/runtime/test_support.rs b/src/openhuman/channels/runtime/test_support.rs index a7b32ea3de..f9841a2fbf 100644 --- a/src/openhuman/channels/runtime/test_support.rs +++ b/src/openhuman/channels/runtime/test_support.rs @@ -431,13 +431,13 @@ pub async fn run_dispatch_harness(options: DispatchHarnessOptions) -> DispatchHa channels_by_name: Arc::new(channels_by_name), provider, default_provider: Arc::new("harness-provider".to_string()), - memory: Arc::new(HarnessMemory { + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(HarnessMemory { entries: options .memory_entries .into_iter() .map(memory_entry) .collect(), - }), + }))), tools_registry: Arc::new(vec![Box::new(HarnessTool) as Box]), system_prompt: Arc::new("system prompt".to_string()), model: Arc::new("harness-model".to_string()), @@ -452,7 +452,7 @@ pub async fn run_dispatch_harness(options: DispatchHarnessOptions) -> DispatchHa inference_url: None, reliability: Arc::new(ReliabilityConfig::default()), provider_runtime_options: ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(PathBuf::from(std::env::temp_dir())), + workspace_handle: Arc::new(std::sync::RwLock::new(PathBuf::from(std::env::temp_dir()))), message_timeout_secs: options.timeout_secs, multimodal: MultimodalConfig::default(), multimodal_files: MultimodalFileConfig::default(), diff --git a/src/openhuman/channels/runtime/workspace.rs b/src/openhuman/channels/runtime/workspace.rs new file mode 100644 index 0000000000..2a1f3b1699 --- /dev/null +++ b/src/openhuman/channels/runtime/workspace.rs @@ -0,0 +1,122 @@ +//! Process-global, re-bindable workspace handle for the channel runtime. +//! +//! The channel runtime is started **once** with a `Config` snapshot — often +//! *before* any user has signed in, so `config.workspace_dir` resolves to +//! `~/.openhuman/users/local/`. When a later sign-in writes +//! `~/.openhuman/active_user.toml` and creates +//! `~/.openhuman/users//`, `credentials::ops::store_session` must +//! re-point the running runtime at the activated user's workspace — otherwise +//! every channel turn keeps reading/writing under `users/local/` until the +//! process restarts (issue #4398). +//! +//! [`ChannelRuntimeContext`](crate::openhuman::channels::context::ChannelRuntimeContext) +//! holds an `Arc>` handle and registers a clone of it here at +//! boot ([`register_workspace_handle`]). [`rebind_workspace`] swaps the path in +//! place through that shared handle, so the live dispatch loop and every +//! workspace-scoped read/write (`ctx.workspace_dir()`) re-resolves on the next +//! turn without a restart. This mirrors the memory rebind +//! (`memory::global::init`) and the active-channel handle +//! (`channels::proactive::register_active_channel_handle`) precedents. + +use std::path::PathBuf; +use std::sync::{Arc, OnceLock, RwLock}; + +/// Shared, in-place-swappable workspace path held by the live runtime context. +pub(crate) type WorkspaceHandle = Arc>; + +type HandleSlot = RwLock>; + +/// Process-global slot pointing at the live runtime's workspace handle. +static WORKSPACE_HANDLE: OnceLock = OnceLock::new(); + +fn handle_slot() -> &'static HandleSlot { + WORKSPACE_HANDLE.get_or_init(|| RwLock::new(None)) +} + +fn register_in(slot: &HandleSlot, handle: WorkspaceHandle) { + match slot.write() { + Ok(mut guard) => *guard = Some(handle), + Err(e) => log::warn!("[channels:runtime] register_workspace_handle: slot poisoned: {e}"), + } +} + +fn rebind_in(slot: &HandleSlot, workspace_dir: PathBuf) { + let handle = match slot.read() { + Ok(guard) => guard.as_ref().map(Arc::clone), + Err(e) => { + log::warn!("[channels:runtime] rebind_workspace: slot read poisoned: {e}"); + return; + } + }; + let Some(handle) = handle else { + // The channel runtime isn't running in this process (e.g. CLI-only + // paths, or login before `start_channels`). Nothing to re-bind — the + // runtime will resolve the active user from its startup `Config` load. + log::debug!( + "[channels:runtime] rebind_workspace: no live runtime handle registered; \ + skipping re-bind to {}", + workspace_dir.display() + ); + return; + }; + match handle.write() { + Ok(mut current) => { + if *current != workspace_dir { + log::info!( + "[channels:runtime] re-binding channel workspace {} -> {}", + current.display(), + workspace_dir.display() + ); + *current = workspace_dir; + } + } + Err(e) => log::warn!("[channels:runtime] rebind_workspace: handle poisoned: {e}"), + } +} + +/// Register the live runtime's workspace handle so [`rebind_workspace`] can +/// re-point it after login. Called once from `start_channels`. +pub(crate) fn register_workspace_handle(handle: WorkspaceHandle) { + register_in(handle_slot(), handle); +} + +/// Re-point the running channel runtime at `workspace_dir`. +/// +/// Invoked from `credentials::ops::store_session` after activation, against +/// `effective_config.workspace_dir`. No-op when the channel runtime is not +/// running (no handle registered yet). +pub fn rebind_workspace(workspace_dir: PathBuf) { + rebind_in(handle_slot(), workspace_dir); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::path::Path; + + #[test] + fn rebind_reresolves_registered_handle() { + // A fresh, test-local slot — the `memory::global` test pattern — so this + // assertion is deterministic under parallel test execution instead of + // racing the process-global singleton. + let slot: HandleSlot = RwLock::new(None); + // The live `ChannelRuntimeContext` holds a clone of this same handle + // and reads it via `workspace_dir()`, so a re-bind here is what every + // channel turn observes on its next workspace resolution. + let handle: WorkspaceHandle = Arc::new(RwLock::new(PathBuf::from("/users/local"))); + register_in(&slot, Arc::clone(&handle)); + assert_eq!(*handle.read().unwrap(), Path::new("/users/local")); + + rebind_in(&slot, PathBuf::from("/users/user-42")); + + assert_eq!(*handle.read().unwrap(), Path::new("/users/user-42")); + } + + #[test] + fn rebind_without_registered_handle_is_noop() { + let slot: HandleSlot = RwLock::new(None); + // Must not panic when the channel runtime isn't started in-process. + rebind_in(&slot, PathBuf::from("/users/user-7")); + assert!(slot.read().unwrap().is_none()); + } +} diff --git a/src/openhuman/channels/tests/context.rs b/src/openhuman/channels/tests/context.rs index 67ab5517c8..026f83c6c3 100644 --- a/src/openhuman/channels/tests/context.rs +++ b/src/openhuman/channels/tests/context.rs @@ -66,7 +66,7 @@ fn compact_sender_history_keeps_recent_truncated_messages() { channels_by_name: Arc::new(HashMap::new()), provider: Arc::new(DummyProvider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(super::common::NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(super::common::NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("system".to_string()), model: Arc::new("test-model".to_string()), @@ -83,7 +83,7 @@ fn compact_sender_history_keeps_recent_truncated_messages() { multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), provider_runtime_options: crate::openhuman::inference::provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, }; diff --git a/src/openhuman/channels/tests/discord_integration.rs b/src/openhuman/channels/tests/discord_integration.rs index f8ce7050d1..a85a4327d5 100644 --- a/src/openhuman/channels/tests/discord_integration.rs +++ b/src/openhuman/channels/tests/discord_integration.rs @@ -120,7 +120,7 @@ fn make_discord_ctx( channels_by_name: Arc::new(channels), provider, default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -136,7 +136,7 @@ fn make_discord_ctx( reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: crate::openhuman::inference::provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), diff --git a/src/openhuman/channels/tests/memory.rs b/src/openhuman/channels/tests/memory.rs index f255c6d9e3..4a2cfb561a 100644 --- a/src/openhuman/channels/tests/memory.rs +++ b/src/openhuman/channels/tests/memory.rs @@ -140,7 +140,7 @@ async fn process_channel_message_restores_per_sender_history_on_follow_ups() { channels_by_name: Arc::new(channels_by_name), provider: provider_impl.clone(), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -155,7 +155,7 @@ async fn process_channel_message_restores_per_sender_history_on_follow_ups() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -239,7 +239,7 @@ async fn process_channel_message_uses_autosaved_memory_after_history_is_cleared( inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), diff --git a/src/openhuman/channels/tests/runtime_dispatch.rs b/src/openhuman/channels/tests/runtime_dispatch.rs index c02f776168..0537d2b318 100644 --- a/src/openhuman/channels/tests/runtime_dispatch.rs +++ b/src/openhuman/channels/tests/runtime_dispatch.rs @@ -47,7 +47,7 @@ async fn message_dispatch_processes_messages_in_parallel() { delay: Duration::from_millis(5), }), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -62,7 +62,7 @@ async fn message_dispatch_processes_messages_in_parallel() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -119,7 +119,7 @@ async fn process_channel_message_cancels_scoped_typing_task() { delay: Duration::from_millis(20), }), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -134,7 +134,7 @@ async fn process_channel_message_cancels_scoped_typing_task() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -206,7 +206,7 @@ async fn dispatch_routes_through_agent_run_turn_bus_handler() { // handler never invokes it — so a minimal no-op is fine. provider: Arc::new(super::common::DummyProvider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -221,7 +221,7 @@ async fn dispatch_routes_through_agent_run_turn_bus_handler() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -288,7 +288,7 @@ async fn channel_processed_event_records_resolved_agent_route() { channels_by_name: Arc::new(channels_by_name), provider: Arc::new(super::common::DummyProvider), default_provider: Arc::new("requested-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("requested-model".to_string()), @@ -303,7 +303,7 @@ async fn channel_processed_event_records_resolved_agent_route() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -399,7 +399,7 @@ async fn process_channel_message_hardens_multimodal_files_against_smuggled_marke channels_by_name: Arc::new(channels_by_name), provider: Arc::new(super::common::DummyProvider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -414,7 +414,7 @@ async fn process_channel_message_hardens_multimodal_files_against_smuggled_marke inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: permissive_operator_default, @@ -481,7 +481,7 @@ async fn process_channel_message_hardens_against_relative_path_markers() { channels_by_name: Arc::new(channels_by_name), provider: Arc::new(super::common::DummyProvider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -496,7 +496,7 @@ async fn process_channel_message_hardens_against_relative_path_markers() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), diff --git a/src/openhuman/channels/tests/runtime_tool_calls.rs b/src/openhuman/channels/tests/runtime_tool_calls.rs index 257eda6780..5cc1f5d600 100644 --- a/src/openhuman/channels/tests/runtime_tool_calls.rs +++ b/src/openhuman/channels/tests/runtime_tool_calls.rs @@ -25,7 +25,7 @@ async fn process_channel_message_executes_tool_calls_instead_of_sending_raw_json channels_by_name: Arc::new(channels_by_name), provider: Arc::new(ToolCallingProvider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![Box::new(MockPriceTool)]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -40,7 +40,7 @@ async fn process_channel_message_executes_tool_calls_instead_of_sending_raw_json inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -81,7 +81,7 @@ async fn process_channel_message_executes_tool_calls_with_alias_tags() { channels_by_name: Arc::new(channels_by_name), provider: Arc::new(ToolCallingAliasProvider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![Box::new(MockPriceTool)]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -96,7 +96,7 @@ async fn process_channel_message_executes_tool_calls_with_alias_tags() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -146,7 +146,7 @@ async fn process_channel_message_handles_models_command_without_llm_call() { channels_by_name: Arc::new(channels_by_name), provider: Arc::clone(&default_provider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("default-model".to_string()), @@ -161,7 +161,7 @@ async fn process_channel_message_handles_models_command_without_llm_call() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -238,7 +238,7 @@ async fn process_channel_message_uses_route_override_provider_and_model() { channels_by_name: Arc::new(channels_by_name), provider: Arc::clone(&default_provider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("default-model".to_string()), @@ -253,7 +253,7 @@ async fn process_channel_message_uses_route_override_provider_and_model() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -288,7 +288,7 @@ async fn process_channel_message_respects_configured_max_tool_iterations_above_d required_tool_iterations: 11, }), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![Box::new(MockPriceTool)]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -303,7 +303,7 @@ async fn process_channel_message_respects_configured_max_tool_iterations_above_d inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -345,7 +345,7 @@ async fn process_channel_message_reports_configured_max_tool_iterations_limit() required_tool_iterations: 20, }), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![Box::new(MockPriceTool)]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -360,7 +360,7 @@ async fn process_channel_message_reports_configured_max_tool_iterations_limit() inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), diff --git a/src/openhuman/channels/tests/telegram_integration.rs b/src/openhuman/channels/tests/telegram_integration.rs index 11811cffbd..35c2245c9b 100644 --- a/src/openhuman/channels/tests/telegram_integration.rs +++ b/src/openhuman/channels/tests/telegram_integration.rs @@ -96,7 +96,7 @@ fn make_test_context( channels_by_name: Arc::new(channels), provider, default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -112,7 +112,7 @@ fn make_test_context( reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: crate::openhuman::inference::provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), diff --git a/src/openhuman/credentials/ops.rs b/src/openhuman/credentials/ops.rs index 4f41be8bdc..6f50bdd767 100644 --- a/src/openhuman/credentials/ops.rs +++ b/src/openhuman/credentials/ops.rs @@ -438,6 +438,34 @@ async fn store_session_inner( ); logs.push("conversation persistence bound to active workspace".to_string()); + // Re-resolve the two long-lived subsystems that started from a pre-login + // Config snapshot and would otherwise keep writing under users/local/ until + // a process restart (issue #4398, the #2437-E follow-up). Mirrors the + // memory/conversation rebind above: + // * channel runtime — re-point its shared workspace handle so every + // channel turn re-resolves to the activated user's workspace. + // * cron scheduler — re-bind its active config so the next poll reads the + // cron store under users//. + // Both are no-ops when the subsystem isn't running in this process. + crate::openhuman::channels::rebind_workspace(effective_config.workspace_dir.clone()); + crate::openhuman::cron::scheduler::rebind(effective_config.clone()); + // Re-point the remaining workspace-baked channel-runtime state (issue + // #4398 follow-up): the agent sandbox root (so file-writing tools stay + // confined to the activated user's workspace) and the channel memory store + // (conversation auto-save + memory-context retrieval). The Telegram + // busy-state subscriber and the PROFILE.md writer share the workspace + // handle re-bound above, so they re-resolve without a separate call. Both + // calls are no-ops when the channel runtime / live policy aren't installed. + crate::openhuman::security::live_policy::set_workspace_dir( + effective_config.workspace_dir.clone(), + &effective_config.autonomy, + ); + crate::openhuman::channels::rebind_memory(&effective_config); + logs.push(format!( + "channel runtime + scheduler + security + memory re-bound to workspace {}", + effective_config.workspace_dir.display() + )); + // Now that active_user.toml exists and config.workspace_dir resolves to // the per-user path, seed the subconscious defaults and spawn the // heartbeat loop. Idempotent — no-op on subsequent logins of the same diff --git a/src/openhuman/cron/scheduler.rs b/src/openhuman/cron/scheduler.rs index b31cdf5921..2bc1af0ae2 100644 --- a/src/openhuman/cron/scheduler.rs +++ b/src/openhuman/cron/scheduler.rs @@ -11,11 +11,83 @@ use anyhow::Result; use chrono::{DateTime, Utc}; use futures_util::{stream, StreamExt}; use std::process::Stdio; -use std::sync::Arc; +use std::sync::{Arc, OnceLock, RwLock}; use tokio::process::Command; use tokio::time::{self, Duration}; const MIN_POLL_SECONDS: u64 = 5; + +/// Process-global holder for the scheduler's *active* config. +/// +/// The cron scheduler is started once (`run`) with a `Config` snapshot, +/// frequently *before* any user has signed in — so `config.workspace_dir` +/// resolves to `~/.openhuman/users/local/` and every `due_jobs` poll reads the +/// cron store there. After a sign-in, `credentials::ops::store_session` calls +/// [`rebind`] with the activated user's config; the poll loop reads this holder +/// each tick, so the next poll resolves the cron store under +/// `users//` without a process restart (issue #4398). Mirrors the +/// re-bind precedents in `memory::global::init` and +/// `channels::runtime::rebind_workspace`. +type ActiveConfigSlot = RwLock>; + +static ACTIVE_CONFIG: OnceLock = OnceLock::new(); + +fn active_config_slot() -> &'static ActiveConfigSlot { + ACTIVE_CONFIG.get_or_init(ActiveConfigSlot::default) +} + +/// Seed the holder with the scheduler's startup config, but only if nothing is +/// bound yet — a live [`rebind`] that happened before the loop reached its +/// first tick must not be clobbered by the boot snapshot. +fn seed_in(slot: &ActiveConfigSlot, config: Config) { + if let Ok(mut guard) = slot.write() { + if guard.is_none() { + *guard = Some(config); + } + } +} + +fn rebind_in(slot: &ActiveConfigSlot, config: Config) { + match slot.write() { + Ok(mut guard) => { + let changed = guard + .as_ref() + .map(|c| c.workspace_dir != config.workspace_dir) + .unwrap_or(true); + if changed { + log::info!( + "[cron:scheduler] re-binding scheduler workspace -> {}", + config.workspace_dir.display() + ); + } + *guard = Some(config); + } + Err(e) => log::warn!("[cron:scheduler] rebind: active-config lock poisoned: {e}"), + } +} + +fn current_in(slot: &ActiveConfigSlot) -> Option { + slot.read().ok().and_then(|guard| guard.clone()) +} + +/// Seed the scheduler's active-config holder from its startup config. +fn seed_active_config(config: Config) { + seed_in(active_config_slot(), config); +} + +/// Re-bind the scheduler's active config after a post-login active-user switch. +/// +/// Invoked from `credentials::ops::store_session` against `effective_config`. +/// The poll loop picks this up on its next tick. +pub fn rebind(config: Config) { + rebind_in(active_config_slot(), config); +} + +/// The scheduler's current active config, if the loop has started (or a +/// [`rebind`] has landed). +fn current_active_config() -> Option { + current_in(active_config_slot()) +} const SHELL_JOB_TIMEOUT_SECS: u64 = 120; const AGENT_JOB_USER_FAILURE_MESSAGE: &str = "Something went wrong. Please try again.\nThis error has been reported. You can also report it on Discord.\nReport on Discord"; // Actionable, static failure copy for the three permanent cron halt states @@ -167,13 +239,12 @@ pub async fn run(config: Config) -> Result<()> { crate::core::event_bus::init_global(crate::core::event_bus::DEFAULT_CAPACITY); crate::openhuman::health::bus::register_health_subscriber(); + // Seed the re-bindable active-config holder so `store_session` can later + // re-point the poll loop at the activated user's workspace (issue #4398). + seed_active_config(config.clone()); + let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS); let mut interval = time::interval(Duration::from_secs(poll_secs)); - let security = Arc::new(SecurityPolicy::from_config( - &config.autonomy, - &config.workspace_dir, - &config.action_dir, - )); publish_global(DomainEvent::SystemStartup { component: "scheduler".to_string(), @@ -190,7 +261,19 @@ pub async fn run(config: Config) -> Result<()> { loop { interval.tick().await; - tick_once(&config, &security, &mut last_emitted_health).await; + // Re-resolve the active config every tick. After a post-login + // `rebind`, `current` (and its `workspace_dir`) points at the + // activated user's workspace, so `due_jobs`/security below read the + // cron store under `users//` instead of the pre-login + // `users/local/` snapshot. Falls back to the startup snapshot if the + // holder is somehow unset. + let current = current_active_config().unwrap_or_else(|| config.clone()); + let security = Arc::new(SecurityPolicy::from_config( + ¤t.autonomy, + ¤t.workspace_dir, + ¤t.action_dir, + )); + tick_once(¤t, &security, &mut last_emitted_health).await; } } diff --git a/src/openhuman/cron/scheduler_tests.rs b/src/openhuman/cron/scheduler_tests.rs index 98faf9c11d..1eb888ac78 100644 --- a/src/openhuman/cron/scheduler_tests.rs +++ b/src/openhuman/cron/scheduler_tests.rs @@ -6,7 +6,7 @@ use crate::openhuman::security::SecurityPolicy; use chrono::{Duration as ChronoDuration, Timelike, Utc}; #[cfg(not(windows))] use std::os::unix::fs::PermissionsExt; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use tempfile::TempDir; async fn test_config(tmp: &TempDir) -> Config { @@ -134,6 +134,53 @@ async fn deliver_if_configured_alerts_no_delivery_failure() { ); } +// Issue #4398 — the scheduler is started once with a pre-login `Config` +// snapshot (workspace = `users/local/`). After a sign-in, `store_session` +// calls `scheduler::rebind(effective_config)`; the poll loop re-resolves the +// active config each tick, so `due_jobs` must then read the cron store under +// the activated user's workspace instead of the pre-login one. This drives the +// re-bind seam through a test-local slot (the `memory::global` pattern) so it's +// deterministic under parallel execution. +#[tokio::test] +async fn rebind_reresolves_scheduler_workspace_after_store_session() { + let pre_login = TempDir::new().unwrap(); + let active_user = TempDir::new().unwrap(); + let pre_login_config = test_config(&pre_login).await; + let active_user_config = test_config(&active_user).await; + + // A scheduled job created only under the activated user's workspace. + let job = cron::add_job(&active_user_config, "* * * * *", "echo active-user").unwrap(); + let far_future = job.next_run + ChronoDuration::minutes(1); + + let slot: ActiveConfigSlot = RwLock::new(None); + + // Boot: bound to the pre-login (users/local) workspace. Its store is empty, + // so the scheduler would run nothing for the signed-in user. + seed_in(&slot, pre_login_config.clone()); + let before = current_in(&slot).expect("seeded config"); + assert_eq!(before.workspace_dir, pre_login_config.workspace_dir); + assert!( + cron::due_jobs(&before, far_future).unwrap().is_empty(), + "pre-login workspace has no jobs for the signed-in user" + ); + + // Login: store_session re-binds to the activated user's workspace. + rebind_in(&slot, active_user_config.clone()); + + let after = current_in(&slot).expect("rebound config"); + assert_eq!( + after.workspace_dir, active_user_config.workspace_dir, + "scheduler must re-resolve to the activated user's workspace" + ); + let due = cron::due_jobs(&after, far_future).unwrap(); + assert_eq!( + due.len(), + 1, + "after rebind the poll loop must read jobs from users//" + ); + assert_eq!(due[0].command, "echo active-user"); +} + // Negative guard: a successful no-delivery run with no output must NOT alert — // the hoist only surfaces failures + non-empty results, never quiet successes. #[tokio::test] diff --git a/src/openhuman/learning/profile_md_renderer.rs b/src/openhuman/learning/profile_md_renderer.rs index e610815a0d..99c0eb622d 100644 --- a/src/openhuman/learning/profile_md_renderer.rs +++ b/src/openhuman/learning/profile_md_renderer.rs @@ -35,7 +35,7 @@ //! thread pool. use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use async_trait::async_trait; @@ -93,16 +93,28 @@ const BLOCK_SPECS: &[BlockSpec] = &[ /// managed blocks of `PROFILE.md`. pub struct ProfileMdRenderer { cache: Arc, - workspace_dir: PathBuf, + /// Re-bindable workspace handle (issue #4398). Shared with the channel + /// runtime context so a post-login `rebind_workspace` re-points PROFILE.md + /// writes at the activated user's workspace without re-subscribing. + workspace_handle: Arc>, } impl ProfileMdRenderer { /// Create a new renderer backed by `cache`, writing to - /// `workspace_dir/PROFILE.md`. - pub fn new(cache: Arc, workspace_dir: PathBuf) -> Self { + /// `/PROFILE.md` where `` is re-resolved through the + /// shared `workspace_handle` on every render. + pub fn new(cache: Arc, workspace_handle: Arc>) -> Self { Self { cache, - workspace_dir, + workspace_handle, + } + } + + /// Current workspace directory, re-resolved through the shared handle. + fn workspace_dir(&self) -> PathBuf { + match self.workspace_handle.read() { + Ok(guard) => guard.clone(), + Err(poisoned) => poisoned.into_inner().clone(), } } @@ -150,7 +162,7 @@ impl ProfileMdRenderer { lines.join("\n") }; - replace_managed_block(&self.workspace_dir, spec.block_name, spec.heading, body) + replace_managed_block(&self.workspace_dir(), spec.block_name, spec.heading, body) .map_err(|e| { anyhow::anyhow!( "[learning::profile_md_renderer] failed to write block '{}': {e}", @@ -260,7 +272,7 @@ mod tests { let conn = Connection::open_in_memory().unwrap(); conn.execute_batch(PROFILE_INIT_SQL).unwrap(); let cache = make_cache(Arc::new(Mutex::new(conn))); - let renderer = ProfileMdRenderer::new(Arc::clone(&cache), tmp.path().to_path_buf()); + let renderer = ProfileMdRenderer::new(Arc::clone(&cache), Arc::new(RwLock::new(tmp.path().to_path_buf()))); (cache, renderer, tmp) } @@ -502,7 +514,7 @@ mod tests { let conn = Connection::open_in_memory().unwrap(); conn.execute_batch(PROFILE_INIT_SQL).unwrap(); let cache = make_cache(Arc::new(Mutex::new(conn))); - let renderer = Arc::new(ProfileMdRenderer::new(cache, tmp.path().to_path_buf())); + let renderer = Arc::new(ProfileMdRenderer::new(cache, Arc::new(RwLock::new(tmp.path().to_path_buf())))); // subscribe_global requires a running runtime; just verify the type works. let _renderer_ref = Arc::clone(&renderer); // We can't call subscribe_global in a unit test without a tokio runtime, diff --git a/src/openhuman/security/live_policy.rs b/src/openhuman/security/live_policy.rs index eb1933a258..34f834f9fd 100644 --- a/src/openhuman/security/live_policy.rs +++ b/src/openhuman/security/live_policy.rs @@ -117,6 +117,56 @@ pub fn update_action_dir(new_action_dir: PathBuf) -> Result { Ok(gen) } +/// Swap the workspace root on the process-global live policy after a +/// post-login active-user switch (issue #4398). +/// +/// The agent sandbox confines writes to `workspace_dir` (memory DBs, sessions, +/// tokens). When the core started pre-login, that root was +/// `~/.openhuman/users/local/`; after sign-in the agent's writes must be +/// confined to `users//`. Updates the stored `workspace_dir` (so a +/// later [`reload_from`] keeps the new root) and rebuilds the current policy +/// via [`SecurityPolicy::from_config`] against the new workspace + the stored +/// action root — using `from_config` (not a field poke) so the policy's +/// lazily-cached canonical workspace path is rebuilt fresh rather than left +/// pointing at the old root. No-op if nothing has been [`install`]ed yet +/// (e.g. a CLI invocation that never started a session runtime), mirroring +/// [`set_action_dir`]. Tools read [`current`] at call time, so the next tool +/// call sees the new sandbox root. +pub fn set_workspace_dir( + new_workspace: PathBuf, + autonomy_config: &crate::openhuman::config::AutonomyConfig, +) { + let Some(state) = STATE.get() else { + tracing::debug!( + "[security:live_policy] set_workspace_dir called before install; no live policy to swap" + ); + return; + }; + + if let Ok(mut guard) = state.workspace_dir.write() { + *guard = new_workspace.clone(); + } + let action = state + .action_dir + .read() + .map(|g| g.clone()) + .unwrap_or_default(); + let rebuilt = Arc::new(SecurityPolicy::from_config( + autonomy_config, + &new_workspace, + &action, + )); + if let Ok(mut guard) = state.policy.write() { + *guard = rebuilt; + } + let gen = state.generation.fetch_add(1, Ordering::Relaxed) + 1; + tracing::info!( + generation = gen, + workspace_dir = %new_workspace.display(), + "[security:live_policy] SecurityPolicy workspace_dir swapped after active-user switch" + ); +} + /// Rebuild the policy from `autonomy_config` against the stored workspace dir /// and swap it in, bumping the generation counter. No-op if nothing has been /// installed yet (e.g. a CLI invocation that never started a session runtime). @@ -305,4 +355,65 @@ mod tests { "reload after set_action_dir must keep the swapped root" ); } + + // Issue #4398 — after a post-login active-user switch, the agent sandbox + // root must move from the pre-login `users/local/` workspace to the + // activated user's workspace, so file-writing tools (which read `current()` + // at call time) stay confined to the right place. Preserves autonomy and + // action_dir, bumps the generation, and survives a later reload. + #[test] + fn set_workspace_dir_swaps_root_and_preserves_other_access() { + let _env = crate::openhuman::config::TEST_ENV_LOCK + .lock() + .unwrap_or_else(|e| e.into_inner()); + let pre_login = std::env::temp_dir().join("openhuman_set_ws_test_local"); + let action = std::env::temp_dir().join("openhuman_set_ws_test_action"); + let initial = Arc::new(SecurityPolicy { + autonomy: AutonomyLevel::Full, + workspace_dir: pre_login.clone(), + action_dir: action.clone(), + ..SecurityPolicy::default() + }); + install(initial, pre_login.clone(), action.clone()); + assert_eq!( + current().expect("policy installed").workspace_dir, + pre_login, + "precondition: workspace starts at the pre-login root" + ); + + let before = generation(); + let active_user = std::env::temp_dir().join("openhuman_set_ws_test_user_42"); + let cfg = AutonomyConfig { + level: AutonomyLevel::Full, + ..AutonomyConfig::default() + }; + set_workspace_dir(active_user.clone(), &cfg); + + assert!( + generation() > before, + "generation must increase on workspace_dir swap" + ); + let now = current().expect("policy still installed"); + assert_eq!( + now.workspace_dir, active_user, + "live policy must reflect the activated user's workspace" + ); + assert_eq!( + now.action_dir, action, + "action_dir must survive a workspace_dir swap" + ); + assert_eq!( + now.autonomy, + AutonomyLevel::Full, + "autonomy level must survive a workspace_dir swap" + ); + + // The stored workspace is updated, so a later reload keeps the new root. + reload_from(&cfg); + assert_eq!( + current().expect("policy still installed").workspace_dir, + active_user, + "reload after set_workspace_dir must keep the swapped workspace" + ); + } } From de1fc967904004ab3bf5907d857724cbbfbfa48c Mon Sep 17 00:00:00 2001 From: M3gA-Mind Date: Thu, 2 Jul 2026 18:31:18 +0530 Subject: [PATCH 2/3] fix(channels): satisfy fmt + clippy in workspace rebind path - rebind_in: make the RwLock write match a statement so the write guard temporary drops before the handle Arc (fixes E0597 borrow-lifetime error) - apply cargo fmt to touched channels/learning files --- src/openhuman/channels/mod.rs | 4 ++-- .../channels/providers/telegram/bus_tests.rs | 24 ++++++++++++++----- src/openhuman/channels/runtime/workspace.rs | 2 +- src/openhuman/learning/profile_md_renderer.rs | 10 ++++++-- 4 files changed, 29 insertions(+), 11 deletions(-) diff --git a/src/openhuman/channels/mod.rs b/src/openhuman/channels/mod.rs index 687c3d391d..3725f4f23d 100644 --- a/src/openhuman/channels/mod.rs +++ b/src/openhuman/channels/mod.rs @@ -67,6 +67,6 @@ pub use controllers::{ChannelAuthMode, ChannelDefinition}; // the prompt-building code. Re-exported here for callers that used the // old `channels::build_system_prompt` path. pub use crate::openhuman::context::channels_prompt::build_system_prompt; -pub use runtime::start_channels; -pub use runtime::rebind_workspace; pub use runtime::rebind_memory; +pub use runtime::rebind_workspace; +pub use runtime::start_channels; diff --git a/src/openhuman/channels/providers/telegram/bus_tests.rs b/src/openhuman/channels/providers/telegram/bus_tests.rs index 5da3824976..fdcc4f929a 100644 --- a/src/openhuman/channels/providers/telegram/bus_tests.rs +++ b/src/openhuman/channels/providers/telegram/bus_tests.rs @@ -5,7 +5,9 @@ use tempfile::tempdir; #[tokio::test] async fn subscriber_marks_busy_on_received_and_clears_on_processed() { let dir = tempdir().expect("tempdir"); - let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(dir.path().to_path_buf()))); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new( + dir.path().to_path_buf(), + ))); assert_eq!(subscriber.name(), "telegram::remote_control"); assert_eq!(subscriber.domains(), Some(&["channel"][..])); @@ -50,7 +52,9 @@ async fn subscriber_marks_busy_on_received_and_clears_on_processed() { #[tokio::test] async fn subscriber_ignores_non_telegram_channel_events() { let dir = tempdir().expect("tempdir"); - let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(dir.path().to_path_buf()))); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new( + dir.path().to_path_buf(), + ))); subscriber .handle(&DomainEvent::ChannelMessageReceived { @@ -75,7 +79,9 @@ async fn subscriber_ignores_non_telegram_channel_events() { #[tokio::test] async fn telegram_received_matching_workspace_sets_busy() { let dir = tempdir().expect("tempdir"); - let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(dir.path().to_path_buf()))); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new( + dir.path().to_path_buf(), + ))); subscriber .handle(&DomainEvent::ChannelMessageReceived { @@ -99,7 +105,9 @@ async fn telegram_received_matching_workspace_sets_busy() { async fn telegram_received_stale_workspace_does_not_set_busy() { let dir = tempdir().expect("tempdir"); let stale = tempdir().expect("stale tempdir"); - let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(dir.path().to_path_buf()))); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new( + dir.path().to_path_buf(), + ))); subscriber .handle(&DomainEvent::ChannelMessageReceived { @@ -122,7 +130,9 @@ async fn telegram_received_stale_workspace_does_not_set_busy() { #[tokio::test] async fn telegram_processed_matching_workspace_clears_busy() { let dir = tempdir().expect("tempdir"); - let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(dir.path().to_path_buf()))); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new( + dir.path().to_path_buf(), + ))); // First mark as busy via a matching received event. subscriber @@ -169,7 +179,9 @@ async fn telegram_processed_matching_workspace_clears_busy() { async fn telegram_processed_stale_workspace_does_not_clear_busy() { let dir = tempdir().expect("tempdir"); let stale = tempdir().expect("stale tempdir"); - let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(dir.path().to_path_buf()))); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new( + dir.path().to_path_buf(), + ))); // Mark as busy via a matching received event. subscriber diff --git a/src/openhuman/channels/runtime/workspace.rs b/src/openhuman/channels/runtime/workspace.rs index 2a1f3b1699..81772cba3f 100644 --- a/src/openhuman/channels/runtime/workspace.rs +++ b/src/openhuman/channels/runtime/workspace.rs @@ -71,7 +71,7 @@ fn rebind_in(slot: &HandleSlot, workspace_dir: PathBuf) { } } Err(e) => log::warn!("[channels:runtime] rebind_workspace: handle poisoned: {e}"), - } + }; } /// Register the live runtime's workspace handle so [`rebind_workspace`] can diff --git a/src/openhuman/learning/profile_md_renderer.rs b/src/openhuman/learning/profile_md_renderer.rs index 99c0eb622d..92c31ce90a 100644 --- a/src/openhuman/learning/profile_md_renderer.rs +++ b/src/openhuman/learning/profile_md_renderer.rs @@ -272,7 +272,10 @@ mod tests { let conn = Connection::open_in_memory().unwrap(); conn.execute_batch(PROFILE_INIT_SQL).unwrap(); let cache = make_cache(Arc::new(Mutex::new(conn))); - let renderer = ProfileMdRenderer::new(Arc::clone(&cache), Arc::new(RwLock::new(tmp.path().to_path_buf()))); + let renderer = ProfileMdRenderer::new( + Arc::clone(&cache), + Arc::new(RwLock::new(tmp.path().to_path_buf())), + ); (cache, renderer, tmp) } @@ -514,7 +517,10 @@ mod tests { let conn = Connection::open_in_memory().unwrap(); conn.execute_batch(PROFILE_INIT_SQL).unwrap(); let cache = make_cache(Arc::new(Mutex::new(conn))); - let renderer = Arc::new(ProfileMdRenderer::new(cache, Arc::new(RwLock::new(tmp.path().to_path_buf())))); + let renderer = Arc::new(ProfileMdRenderer::new( + cache, + Arc::new(RwLock::new(tmp.path().to_path_buf())), + )); // subscribe_global requires a running runtime; just verify the type works. let _renderer_ref = Arc::clone(&renderer); // We can't call subscribe_global in a unit test without a tokio runtime, From 1455ff961884eb3f005b7a6347b235b658078050 Mon Sep 17 00:00:00 2001 From: M3gA-Mind Date: Thu, 2 Jul 2026 20:24:02 +0530 Subject: [PATCH 3/3] test(channels,learning): update test call sites to rebindable handle signatures The #4398 change migrated three constructors to shared handles but left three test call sites on the old signatures, breaking test compilation (caught by the Rust Core Coverage lane, which builds tests; plain clippy -p openhuman does not): - channels/routes_tests.rs: TelegramRemoteSubscriber::new now takes Arc> (workspace handle), not PathBuf. - channels/tests/memory.rs: ChannelRuntimeContext.memory renamed to memory_handle: Arc>>. - tests/learning_phase4_integration_test.rs: ProfileMdRenderer::new now takes Arc> (workspace handle), not PathBuf. Values unchanged (wrapped in the new handle types); behaviour identical. --- src/openhuman/channels/routes_tests.rs | 4 +++- src/openhuman/channels/tests/memory.rs | 2 +- tests/learning_phase4_integration_test.rs | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/openhuman/channels/routes_tests.rs b/src/openhuman/channels/routes_tests.rs index f11c28ef57..ecac826c58 100644 --- a/src/openhuman/channels/routes_tests.rs +++ b/src/openhuman/channels/routes_tests.rs @@ -517,7 +517,9 @@ async fn handle_runtime_command_telegram_new_status_and_sessions_round_trip() { }, ); - let subscriber = TelegramRemoteSubscriber::new(tempdir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(Arc::new(std::sync::RwLock::new( + tempdir.path().to_path_buf(), + ))); subscriber .handle(&DomainEvent::ChannelMessageReceived { channel: "telegram".into(), diff --git a/src/openhuman/channels/tests/memory.rs b/src/openhuman/channels/tests/memory.rs index 4a2cfb561a..018da9066f 100644 --- a/src/openhuman/channels/tests/memory.rs +++ b/src/openhuman/channels/tests/memory.rs @@ -224,7 +224,7 @@ async fn process_channel_message_uses_autosaved_memory_after_history_is_cleared( channels_by_name: Arc::new(channels_by_name), provider: provider_impl.clone(), default_provider: Arc::new("test-provider".to_string()), - memory, + memory_handle: Arc::new(std::sync::RwLock::new(memory)), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), diff --git a/tests/learning_phase4_integration_test.rs b/tests/learning_phase4_integration_test.rs index 687cf52b11..6754f17b76 100644 --- a/tests/learning_phase4_integration_test.rs +++ b/tests/learning_phase4_integration_test.rs @@ -77,7 +77,7 @@ impl TestHarness { let workspace = TempDir::new().unwrap(); let renderer = Arc::new(ProfileMdRenderer::new( Arc::clone(&cache), - workspace.path().to_path_buf(), + Arc::new(std::sync::RwLock::new(workspace.path().to_path_buf())), )); // Drain any stale candidates from prior tests so they don't affect