diff --git a/src/openhuman/channels/context.rs b/src/openhuman/channels/context.rs index dc4cee0d4d..94c4b9cf8c 100644 --- a/src/openhuman/channels/context.rs +++ b/src/openhuman/channels/context.rs @@ -6,7 +6,7 @@ use crate::openhuman::tools::Tool; use crate::openhuman::util::truncate_with_ellipsis; use std::collections::HashMap; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; /// Per-sender conversation history for channel messages. pub(crate) type ConversationHistoryMap = Arc>>>; @@ -47,7 +47,10 @@ pub(crate) struct ChannelRuntimeContext { pub(crate) channels_by_name: Arc>>, pub(crate) provider: Arc, pub(crate) default_provider: Arc, - pub(crate) memory: Arc, + /// Re-bindable channel memory store (issue #4398). Held behind a lock so a + /// post-login `channels::rebind_memory` swaps it for the activated user's + /// workspace store. Read via [`ChannelRuntimeContext::memory`]. + pub(crate) memory_handle: Arc>>, pub(crate) tools_registry: Arc>>, pub(crate) system_prompt: Arc, pub(crate) model: Arc, @@ -63,12 +66,42 @@ pub(crate) struct ChannelRuntimeContext { pub(crate) reliability: Arc, pub(crate) provider_runtime_options: crate::openhuman::inference::provider::ProviderRuntimeOptions, - pub(crate) workspace_dir: Arc, + /// Re-bindable workspace handle. Started from the pre-login `Config` + /// snapshot, then re-pointed at the activated user's workspace on login + /// via `channels::runtime::rebind_workspace` (issue #4398). Read through + /// [`ChannelRuntimeContext::workspace_dir`] so every turn re-resolves. + pub(crate) workspace_handle: Arc>, pub(crate) message_timeout_secs: u64, pub(crate) multimodal: crate::openhuman::config::MultimodalConfig, pub(crate) multimodal_files: crate::openhuman::config::MultimodalFileConfig, } +impl ChannelRuntimeContext { + /// Current workspace directory, re-resolved through the re-bindable handle. + /// + /// After a post-login `rebind_workspace`, this returns the activated + /// user's `users//` workspace instead of the pre-login + /// `users/local/` snapshot the runtime was started with (issue #4398). + pub(crate) fn workspace_dir(&self) -> PathBuf { + match self.workspace_handle.read() { + Ok(guard) => guard.clone(), + Err(poisoned) => poisoned.into_inner().clone(), + } + } + + /// Current channel memory store, re-resolved through the re-bindable handle. + /// + /// After a post-login `rebind_memory`, this returns a store rooted at the + /// activated user's workspace instead of the pre-login `users/local/` store + /// the runtime was started with (issue #4398). + pub(crate) fn memory(&self) -> Arc { + match self.memory_handle.read() { + Ok(guard) => Arc::clone(&guard), + Err(poisoned) => Arc::clone(&poisoned.into_inner()), + } + } +} + pub(crate) fn conversation_memory_key(msg: &super::traits::ChannelMessage) -> String { format!("{}_{}_{}", msg.channel, msg.sender, msg.id) } @@ -331,9 +364,9 @@ mod tests { channels_by_name: Arc::new(HashMap::new()), provider: Arc::new(DummyProvider), default_provider: Arc::new("default".into()), - memory: Arc::new(MockMemory { + memory_handle: Arc::new(RwLock::new(Arc::new(MockMemory { entries: Vec::new(), - }), + }))), tools_registry: Arc::new(vec![Box::new(DummyTool) as Box]), system_prompt: Arc::new("prompt".into()), model: Arc::new("model".into()), @@ -349,7 +382,7 @@ mod tests { reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: crate::openhuman::inference::provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(PathBuf::from("/tmp")), + workspace_handle: Arc::new(RwLock::new(PathBuf::from("/tmp"))), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), diff --git a/src/openhuman/channels/mod.rs b/src/openhuman/channels/mod.rs index 6a2c2f1c09..3725f4f23d 100644 --- a/src/openhuman/channels/mod.rs +++ b/src/openhuman/channels/mod.rs @@ -67,4 +67,6 @@ pub use controllers::{ChannelAuthMode, ChannelDefinition}; // the prompt-building code. Re-exported here for callers that used the // old `channels::build_system_prompt` path. pub use crate::openhuman::context::channels_prompt::build_system_prompt; +pub use runtime::rebind_memory; +pub use runtime::rebind_workspace; pub use runtime::start_channels; diff --git a/src/openhuman/channels/providers/telegram/bus.rs b/src/openhuman/channels/providers/telegram/bus.rs index b58fb20280..21f10889e7 100644 --- a/src/openhuman/channels/providers/telegram/bus.rs +++ b/src/openhuman/channels/providers/telegram/bus.rs @@ -4,22 +4,37 @@ use crate::core::event_bus::{DomainEvent, EventHandler}; use crate::openhuman::channels::providers::telegram::session_store::with_store; use async_trait::async_trait; use std::path::PathBuf; +use std::sync::{Arc, RwLock}; const LOG_PREFIX: &str = "[telegram-remote]"; /// Tracks Telegram turn lifecycle via channel domain events and exposes busy /// state for `/status`. pub struct TelegramRemoteSubscriber { - workspace_dir: PathBuf, + /// Re-bindable workspace handle (issue #4398). Shared with the channel + /// runtime context so a post-login `rebind_workspace` re-points busy-state + /// persistence at the activated user's workspace. Read at event time so the + /// stale-workspace guard below compares against the CURRENT workspace — the + /// dispatch loop stamps events with `ctx.workspace_dir()`, which after a + /// re-bind is the new path; a baked snapshot here would drop every event. + workspace_handle: Arc>, } impl TelegramRemoteSubscriber { - pub fn new(workspace_dir: PathBuf) -> Self { - Self { workspace_dir } + pub fn new(workspace_handle: Arc>) -> Self { + Self { workspace_handle } + } + + /// Current workspace directory, re-resolved through the shared handle. + fn workspace_dir(&self) -> PathBuf { + match self.workspace_handle.read() { + Ok(guard) => guard.clone(), + Err(poisoned) => poisoned.into_inner().clone(), + } } async fn set_busy(&self, reply_target: &str, busy: bool) { - let workspace_dir = self.workspace_dir.clone(); + let workspace_dir = self.workspace_dir(); let reply_target_owned = reply_target.to_string(); let join_result = tokio::task::spawn_blocking(move || { with_store(&workspace_dir, |store| { @@ -59,12 +74,12 @@ impl EventHandler for TelegramRemoteSubscriber { workspace_dir, .. } if channel == "telegram" => { - if *workspace_dir != self.workspace_dir { + if *workspace_dir != self.workspace_dir() { tracing::debug!( "{LOG_PREFIX} dropping stale-workspace ChannelMessageReceived \ event_ws={} self_ws={}", workspace_dir.display(), - self.workspace_dir.display() + self.workspace_dir().display() ); return; } @@ -79,12 +94,12 @@ impl EventHandler for TelegramRemoteSubscriber { workspace_dir, .. } if channel == "telegram" => { - if *workspace_dir != self.workspace_dir { + if *workspace_dir != self.workspace_dir() { tracing::debug!( "{LOG_PREFIX} dropping stale-workspace ChannelMessageProcessed \ event_ws={} self_ws={}", workspace_dir.display(), - self.workspace_dir.display() + self.workspace_dir().display() ); return; } diff --git a/src/openhuman/channels/providers/telegram/bus_tests.rs b/src/openhuman/channels/providers/telegram/bus_tests.rs index c171ec8f82..fdcc4f929a 100644 --- a/src/openhuman/channels/providers/telegram/bus_tests.rs +++ b/src/openhuman/channels/providers/telegram/bus_tests.rs @@ -5,7 +5,9 @@ use tempfile::tempdir; #[tokio::test] async fn subscriber_marks_busy_on_received_and_clears_on_processed() { let dir = tempdir().expect("tempdir"); - let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new( + dir.path().to_path_buf(), + ))); assert_eq!(subscriber.name(), "telegram::remote_control"); assert_eq!(subscriber.domains(), Some(&["channel"][..])); @@ -50,7 +52,9 @@ async fn subscriber_marks_busy_on_received_and_clears_on_processed() { #[tokio::test] async fn subscriber_ignores_non_telegram_channel_events() { let dir = tempdir().expect("tempdir"); - let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new( + dir.path().to_path_buf(), + ))); subscriber .handle(&DomainEvent::ChannelMessageReceived { @@ -75,7 +79,9 @@ async fn subscriber_ignores_non_telegram_channel_events() { #[tokio::test] async fn telegram_received_matching_workspace_sets_busy() { let dir = tempdir().expect("tempdir"); - let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new( + dir.path().to_path_buf(), + ))); subscriber .handle(&DomainEvent::ChannelMessageReceived { @@ -99,7 +105,9 @@ async fn telegram_received_matching_workspace_sets_busy() { async fn telegram_received_stale_workspace_does_not_set_busy() { let dir = tempdir().expect("tempdir"); let stale = tempdir().expect("stale tempdir"); - let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new( + dir.path().to_path_buf(), + ))); subscriber .handle(&DomainEvent::ChannelMessageReceived { @@ -122,7 +130,9 @@ async fn telegram_received_stale_workspace_does_not_set_busy() { #[tokio::test] async fn telegram_processed_matching_workspace_clears_busy() { let dir = tempdir().expect("tempdir"); - let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new( + dir.path().to_path_buf(), + ))); // First mark as busy via a matching received event. subscriber @@ -169,7 +179,9 @@ async fn telegram_processed_matching_workspace_clears_busy() { async fn telegram_processed_stale_workspace_does_not_clear_busy() { let dir = tempdir().expect("tempdir"); let stale = tempdir().expect("stale tempdir"); - let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new( + dir.path().to_path_buf(), + ))); // Mark as busy via a matching received event. subscriber diff --git a/src/openhuman/channels/providers/telegram/remote_control.rs b/src/openhuman/channels/providers/telegram/remote_control.rs index 93c35c232f..66c1a6ddb9 100644 --- a/src/openhuman/channels/providers/telegram/remote_control.rs +++ b/src/openhuman/channels/providers/telegram/remote_control.rs @@ -101,7 +101,7 @@ async fn build_status_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessage .map(|h| h.len()) .unwrap_or(0); - let workspace = ctx.workspace_dir.clone(); + let workspace = ctx.workspace_dir(); let reply_target = msg.reply_target.clone(); // Use with_store_read (no disk write) and spawn_blocking to keep the async // executor thread unblocked during mutex acquisition + file I/O. @@ -154,7 +154,7 @@ async fn build_status_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessage } async fn build_sessions_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessage) -> String { - let workspace = ctx.workspace_dir.clone(); + let workspace = ctx.workspace_dir(); let reply_target = msg.reply_target.clone(); // Read-only lookup — use with_store_read (no save) wrapped in spawn_blocking. let active_thread_id = tokio::task::spawn_blocking(move || { @@ -166,7 +166,7 @@ async fn build_sessions_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessa .ok() .and_then(|res| res.ok()) .flatten(); - let workspace = ctx.workspace_dir.as_path(); + let workspace = ctx.workspace_dir(); let threads = match conversations::list_threads(workspace.to_path_buf()) { Ok(list) => list, @@ -215,7 +215,7 @@ fn format_session_line(thread: &ConversationThread, active_id: Option<&str>) -> } async fn build_new_session_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessage) -> String { - let workspace = ctx.workspace_dir.as_path(); + let workspace = ctx.workspace_dir(); let sender_key = conversation_history_key(msg); let thread_id = format!("thread-{}", uuid::Uuid::new_v4()); let now = chrono::Utc::now(); @@ -243,7 +243,7 @@ async fn build_new_session_response(ctx: &ChannelRuntimeContext, msg: &ChannelMe clear_sender_history(ctx, &sender_key); - let workspace_dir = ctx.workspace_dir.clone(); + let workspace_dir = ctx.workspace_dir(); let reply_target_owned = msg.reply_target.clone(); let thread_id_owned = thread_id.clone(); let sender_key_owned = sender_key.clone(); diff --git a/src/openhuman/channels/routes.rs b/src/openhuman/channels/routes.rs index e1d42eb2a8..77dae9c59d 100644 --- a/src/openhuman/channels/routes.rs +++ b/src/openhuman/channels/routes.rs @@ -317,7 +317,7 @@ pub(crate) async fn handle_runtime_command_if_needed( } } ChannelRuntimeCommand::ShowModel => { - build_models_help_response(¤t, ctx.workspace_dir.as_path()) + build_models_help_response(¤t, ctx.workspace_dir().as_path()) } ChannelRuntimeCommand::SetModel(raw_model) => { let model = raw_model.trim().trim_matches('`').to_string(); diff --git a/src/openhuman/channels/routes_tests.rs b/src/openhuman/channels/routes_tests.rs index 948fba5753..ecac826c58 100644 --- a/src/openhuman/channels/routes_tests.rs +++ b/src/openhuman/channels/routes_tests.rs @@ -135,7 +135,7 @@ fn runtime_context(workspace_dir: PathBuf) -> ChannelRuntimeContext { channels_by_name: Arc::new(HashMap::new()), provider: Arc::new(DummyProvider), default_provider: Arc::new("openai".into()), - memory: Arc::new(DummyMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(DummyMemory))), tools_registry: Arc::new(vec![Box::new(DummyTool) as Box]), system_prompt: Arc::new("prompt".into()), model: Arc::new("reasoning-v1".into()), @@ -151,7 +151,7 @@ fn runtime_context(workspace_dir: PathBuf) -> ChannelRuntimeContext { reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: crate::openhuman::inference::provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(workspace_dir), + workspace_handle: Arc::new(std::sync::RwLock::new(workspace_dir)), message_timeout_secs: 60, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -517,7 +517,9 @@ async fn handle_runtime_command_telegram_new_status_and_sessions_round_trip() { }, ); - let subscriber = TelegramRemoteSubscriber::new(tempdir.path().to_path_buf()); + let subscriber = TelegramRemoteSubscriber::new(Arc::new(std::sync::RwLock::new( + tempdir.path().to_path_buf(), + ))); subscriber .handle(&DomainEvent::ChannelMessageReceived { channel: "telegram".into(), diff --git a/src/openhuman/channels/runtime/dispatch/processor.rs b/src/openhuman/channels/runtime/dispatch/processor.rs index 9648cd8260..8742fbe805 100644 --- a/src/openhuman/channels/runtime/dispatch/processor.rs +++ b/src/openhuman/channels/runtime/dispatch/processor.rs @@ -129,7 +129,7 @@ pub(crate) async fn process_channel_message( reply_target: msg.reply_target.clone(), content: msg.content.clone(), thread_ts: msg.thread_ts.clone(), - workspace_dir: ctx.workspace_dir.as_ref().clone(), + workspace_dir: ctx.workspace_dir(), }); let target_channel = ctx.channels_by_name.get(&msg.channel).cloned(); @@ -214,13 +214,16 @@ pub(crate) async fn process_channel_message( } }; + // Re-resolve the memory store through the re-bindable handle so a + // post-login active-user switch routes channel memory to the activated + // user's workspace (issue #4398). + let memory = ctx.memory(); let memory_context = - build_memory_context(ctx.memory.as_ref(), &msg.content, ctx.min_relevance_score).await; + build_memory_context(memory.as_ref(), &msg.content, ctx.min_relevance_score).await; if ctx.auto_save_memory { let autosave_key = conversation_memory_key(&msg); - let _ = ctx - .memory + let _ = memory .store( "", &autosave_key, @@ -619,7 +622,7 @@ pub(crate) async fn process_channel_message( model: route.model.clone(), elapsed_ms: started_at.elapsed().as_millis() as u64, success: false, - workspace_dir: ctx.workspace_dir.as_ref().clone(), + workspace_dir: ctx.workspace_dir(), }); return; } @@ -760,7 +763,7 @@ pub(crate) async fn process_channel_message( model: response_model, elapsed_ms: started_at.elapsed().as_millis() as u64, success, - workspace_dir: ctx.workspace_dir.as_ref().clone(), + workspace_dir: ctx.workspace_dir(), }); } diff --git a/src/openhuman/channels/runtime/memory_rebind.rs b/src/openhuman/channels/runtime/memory_rebind.rs new file mode 100644 index 0000000000..d67c4cc698 --- /dev/null +++ b/src/openhuman/channels/runtime/memory_rebind.rs @@ -0,0 +1,148 @@ +//! Re-bindable channel-runtime memory store (issue #4398). +//! +//! The channel runtime builds its OWN local memory store at boot +//! (`create_memory_with_local_ai`) and hands it to the dispatch loop via +//! [`ChannelRuntimeContext`](crate::openhuman::channels::context::ChannelRuntimeContext). +//! Channel-level memory writes — conversation auto-save and memory-context +//! retrieval — go through that store (agent *tools* separately use the +//! already-rebound `memory::global` client). When the runtime starts pre-login, +//! that store is rooted at `~/.openhuman/users/local/`, so post-login channel +//! turns keep reading/writing the wrong workspace's memory DB until a restart. +//! +//! This module holds a process-global handle to the context's memory slot +//! (`Arc>>`) registered at boot, plus [`rebind_memory`] +//! which rebuilds the store for the activated user's workspace and swaps it in +//! place — the same shape as the workspace / live-policy re-bind seams. The +//! swap is race-safe: an in-flight turn keeps the old `Arc` it +//! already cloned; new turns pick up the new store. + +use std::sync::{Arc, OnceLock, RwLock}; + +use anyhow::Result; + +use crate::openhuman::config::Config; +use crate::openhuman::memory::Memory; +use crate::openhuman::memory_store; + +/// In-place-swappable memory store held by the live runtime context. +pub(crate) type MemoryHandle = Arc>>; + +type HandleSlot = RwLock>; + +static MEMORY_HANDLE: OnceLock = OnceLock::new(); + +fn handle_slot() -> &'static HandleSlot { + MEMORY_HANDLE.get_or_init(|| RwLock::new(None)) +} + +/// Build the channel-runtime memory store for `config`'s workspace. +/// +/// Mirrors the boot construction, including the keyword-only fallback (#3712): +/// if the configured embedder can't be built, degrade to +/// `embedding_provider = "none"` (NoopEmbedding) rather than failing, so the +/// channel runtime keeps working with reduced (keyword-only) memory. Shared by +/// `start_channels` (boot) and [`rebind_memory`] (post-login) so both paths +/// build the store identically. +pub(crate) fn build_channel_memory(config: &Config) -> Result> { + let local_embedding = config.workload_local_model("embeddings"); + let embedding_api_key = + crate::openhuman::embeddings::resolve_api_key(config, &config.memory.embedding_provider); + match memory_store::create_memory_with_local_ai( + &config.memory, + local_embedding.as_deref(), + &embedding_api_key, + &[], + Some(&config.storage.provider.config), + &config.workspace_dir, + ) { + Ok(mem) => Ok(Arc::from(mem)), + Err(e) => { + tracing::error!( + error = %format!("{e:#}"), + provider = %config.memory.embedding_provider, + "[channels] memory embedder build failed — falling back to keyword-only \ + memory so channels still start" + ); + let mut fallback_memory = config.memory.clone(); + fallback_memory.embedding_provider = "none".to_string(); + Ok(Arc::from(memory_store::create_memory_with_local_ai( + &fallback_memory, + local_embedding.as_deref(), + &embedding_api_key, + &[], + Some(&config.storage.provider.config), + &config.workspace_dir, + )?)) + } + } +} + +fn register_in(slot: &HandleSlot, handle: MemoryHandle) { + match slot.write() { + Ok(mut guard) => *guard = Some(handle), + Err(e) => log::warn!("[channels:runtime] register_memory_handle: slot poisoned: {e}"), + } +} + +fn rebind_in(slot: &HandleSlot, config: &Config) { + let handle = match slot.read() { + Ok(guard) => guard.as_ref().map(Arc::clone), + Err(e) => { + log::warn!("[channels:runtime] rebind_memory: slot read poisoned: {e}"); + return; + } + }; + let Some(handle) = handle else { + log::debug!( + "[channels:runtime] rebind_memory: no live runtime memory handle registered; \ + skipping re-bind to {}", + config.workspace_dir.display() + ); + return; + }; + match build_channel_memory(config) { + Ok(new_mem) => match handle.write() { + Ok(mut current) => { + log::info!( + "[channels:runtime] re-binding channel memory store to workspace {}", + config.workspace_dir.display() + ); + *current = new_mem; + } + Err(e) => log::warn!("[channels:runtime] rebind_memory: handle poisoned: {e}"), + }, + Err(e) => log::warn!( + "[channels:runtime] rebind_memory: rebuild for {} failed; keeping existing store: {e:#}", + config.workspace_dir.display() + ), + } +} + +/// Register the live runtime's memory handle so [`rebind_memory`] can swap it +/// after login. Called once from `start_channels`. +pub(crate) fn register_memory_handle(handle: MemoryHandle) { + register_in(handle_slot(), handle); +} + +/// Rebuild the channel-runtime memory store for `config`'s workspace and swap +/// it into the live context. Invoked from `credentials::ops::store_session` +/// after activation. No-op when the channel runtime is not running (no handle +/// registered) or when the rebuild fails (keeps the existing store). +pub fn rebind_memory(config: &Config) { + rebind_in(handle_slot(), config); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn rebind_without_registered_handle_is_noop() { + // With no handle registered, `rebind_in` must return before it ever + // tries to build a store — so this neither panics nor touches disk. + let slot: HandleSlot = RwLock::new(None); + let config = crate::openhuman::config::Config::default(); + rebind_in(&slot, &config); + assert!(slot.read().unwrap().is_none()); + } +} diff --git a/src/openhuman/channels/runtime/mod.rs b/src/openhuman/channels/runtime/mod.rs index 346da3b010..82a0f24940 100644 --- a/src/openhuman/channels/runtime/mod.rs +++ b/src/openhuman/channels/runtime/mod.rs @@ -1,10 +1,14 @@ //! Channel runtime entry points. mod dispatch; +mod memory_rebind; mod startup; mod supervision; +mod workspace; +pub use memory_rebind::rebind_memory; pub use startup::start_channels; +pub use workspace::rebind_workspace; #[cfg(any(test, debug_assertions))] pub mod test_support; diff --git a/src/openhuman/channels/runtime/startup.rs b/src/openhuman/channels/runtime/startup.rs index 9785dee3b0..f18ffdee9e 100644 --- a/src/openhuman/channels/runtime/startup.rs +++ b/src/openhuman/channels/runtime/startup.rs @@ -39,7 +39,7 @@ use crate::openhuman::security::SecurityPolicy; use crate::openhuman::tools; use anyhow::Result; use std::collections::HashMap; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; /// How the channels runtime should construct its default chat provider. /// @@ -83,6 +83,15 @@ pub async fn start_channels(mut config: Config) -> Result<()> { let bus = event_bus::init_global(DEFAULT_CAPACITY); let _tracing_handle = bus.subscribe(Arc::new(TracingSubscriber)); crate::openhuman::health::bus::register_health_subscriber(); + // Re-bindable workspace handle (issue #4398). The runtime may have started + // pre-login against `users/local/`; register the handle so a later + // `store_session` can re-point it at the activated user's workspace without + // a restart. Created up-front so the workspace-scoped subscribers below + // (ProfileMdRenderer, TelegramRemoteSubscriber) and the runtime context all + // share the SAME `RwLock` — one `rebind_workspace` re-points them + // all at once. The context and the global slot hold clones of this handle. + let workspace_handle = Arc::new(RwLock::new(config.workspace_dir.clone())); + super::workspace::register_workspace_handle(Arc::clone(&workspace_handle)); crate::openhuman::workflows::bus::register_workflow_cleanup_subscriber(); crate::openhuman::memory_conversations::register_conversation_persistence_subscriber( config.workspace_dir.clone(), @@ -182,7 +191,7 @@ pub async fn start_channels(mut config: Config) -> Result<()> { use std::sync::Arc; let cache = Arc::new(FacetCache::new(client.profile_conn())); let renderer = - Arc::new(ProfileMdRenderer::new(cache, config.workspace_dir.clone())); + Arc::new(ProfileMdRenderer::new(cache, Arc::clone(&workspace_handle))); ProfileMdRenderer::subscribe(renderer) } else { tracing::debug!( @@ -296,44 +305,17 @@ pub async fn start_channels(mut config: Config) -> Result<()> { config.workspace_dir.clone(), )?; let temperature = config.default_temperature; - let local_embedding = config.workload_local_model("embeddings"); - let embedding_api_key = - crate::openhuman::embeddings::resolve_api_key(&config, &config.memory.embedding_provider); - // Build the memory store. A misconfigured/removed embedding provider (e.g. a - // stale `embedding_provider = "fastembed"` that the factory no longer knows) - // makes the embedder build fail — but that must NOT take every messaging - // channel offline (issue #3712). Fall back to keyword-only memory - // (`embedding_provider = "none"` → NoopEmbedding) so the channel listeners - // still start; semantic memory degrades gracefully instead of the whole - // runtime aborting. - let mem: Arc = match memory_store::create_memory_with_local_ai( - &config.memory, - local_embedding.as_deref(), - &embedding_api_key, - &[], - Some(&config.storage.provider.config), - &config.workspace_dir, - ) { - Ok(mem) => Arc::from(mem), - Err(e) => { - tracing::error!( - error = %format!("{e:#}"), - provider = %config.memory.embedding_provider, - "[channels] memory embedder build failed — falling back to keyword-only \ - memory so channels still start" - ); - let mut fallback_memory = config.memory.clone(); - fallback_memory.embedding_provider = "none".to_string(); - Arc::from(memory_store::create_memory_with_local_ai( - &fallback_memory, - local_embedding.as_deref(), - &embedding_api_key, - &[], - Some(&config.storage.provider.config), - &config.workspace_dir, - )?) - } - }; + // Build the channel-runtime memory store (with the #3712 keyword-only + // fallback). Extracted into `memory_rebind::build_channel_memory` so the + // post-login re-bind (#4398) rebuilds it identically. + let mem: Arc = super::memory_rebind::build_channel_memory(&config)?; + // Register a re-bindable handle to this store so a later `store_session` can + // swap it for the activated user's workspace without a restart. The context + // holds the same `RwLock`; a turn that already cloned the old store keeps + // using it, new turns pick up the swap. + let memory_handle: std::sync::Arc>> = + Arc::new(RwLock::new(Arc::clone(&mem))); + super::memory_rebind::register_memory_handle(Arc::clone(&memory_handle)); // Build system prompt from workspace identity files + skills let workspace = config.workspace_dir.clone(); let tools_registry = Arc::new(tools::all_tools_with_runtime( @@ -733,7 +715,7 @@ pub async fn start_channels(mut config: Config) -> Result<()> { let _telegram_remote_handle = if channels_by_name.contains_key("telegram") { let handle = bus.subscribe(Arc::new( crate::openhuman::channels::providers::telegram::TelegramRemoteSubscriber::new( - config.workspace_dir.clone(), + Arc::clone(&workspace_handle), ), )); tracing::debug!("[telegram-remote] registered TelegramRemoteSubscriber"); @@ -776,7 +758,7 @@ pub async fn start_channels(mut config: Config) -> Result<()> { channels_by_name, provider: Arc::clone(&provider), default_provider: Arc::new(provider_name), - memory: Arc::clone(&mem), + memory_handle, tools_registry: Arc::clone(&tools_registry), system_prompt: Arc::new(system_prompt), model: Arc::new(model.clone()), @@ -791,7 +773,7 @@ pub async fn start_channels(mut config: Config) -> Result<()> { inference_url: config.inference_url.clone(), reliability: Arc::new(config.reliability.clone()), provider_runtime_options, - workspace_dir: Arc::new(config.workspace_dir.clone()), + workspace_handle, message_timeout_secs, multimodal: config.multimodal.clone(), multimodal_files: config.multimodal_files.clone(), diff --git a/src/openhuman/channels/runtime/test_support.rs b/src/openhuman/channels/runtime/test_support.rs index a7b32ea3de..f9841a2fbf 100644 --- a/src/openhuman/channels/runtime/test_support.rs +++ b/src/openhuman/channels/runtime/test_support.rs @@ -431,13 +431,13 @@ pub async fn run_dispatch_harness(options: DispatchHarnessOptions) -> DispatchHa channels_by_name: Arc::new(channels_by_name), provider, default_provider: Arc::new("harness-provider".to_string()), - memory: Arc::new(HarnessMemory { + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(HarnessMemory { entries: options .memory_entries .into_iter() .map(memory_entry) .collect(), - }), + }))), tools_registry: Arc::new(vec![Box::new(HarnessTool) as Box]), system_prompt: Arc::new("system prompt".to_string()), model: Arc::new("harness-model".to_string()), @@ -452,7 +452,7 @@ pub async fn run_dispatch_harness(options: DispatchHarnessOptions) -> DispatchHa inference_url: None, reliability: Arc::new(ReliabilityConfig::default()), provider_runtime_options: ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(PathBuf::from(std::env::temp_dir())), + workspace_handle: Arc::new(std::sync::RwLock::new(PathBuf::from(std::env::temp_dir()))), message_timeout_secs: options.timeout_secs, multimodal: MultimodalConfig::default(), multimodal_files: MultimodalFileConfig::default(), diff --git a/src/openhuman/channels/runtime/workspace.rs b/src/openhuman/channels/runtime/workspace.rs new file mode 100644 index 0000000000..81772cba3f --- /dev/null +++ b/src/openhuman/channels/runtime/workspace.rs @@ -0,0 +1,122 @@ +//! Process-global, re-bindable workspace handle for the channel runtime. +//! +//! The channel runtime is started **once** with a `Config` snapshot — often +//! *before* any user has signed in, so `config.workspace_dir` resolves to +//! `~/.openhuman/users/local/`. When a later sign-in writes +//! `~/.openhuman/active_user.toml` and creates +//! `~/.openhuman/users//`, `credentials::ops::store_session` must +//! re-point the running runtime at the activated user's workspace — otherwise +//! every channel turn keeps reading/writing under `users/local/` until the +//! process restarts (issue #4398). +//! +//! [`ChannelRuntimeContext`](crate::openhuman::channels::context::ChannelRuntimeContext) +//! holds an `Arc>` handle and registers a clone of it here at +//! boot ([`register_workspace_handle`]). [`rebind_workspace`] swaps the path in +//! place through that shared handle, so the live dispatch loop and every +//! workspace-scoped read/write (`ctx.workspace_dir()`) re-resolves on the next +//! turn without a restart. This mirrors the memory rebind +//! (`memory::global::init`) and the active-channel handle +//! (`channels::proactive::register_active_channel_handle`) precedents. + +use std::path::PathBuf; +use std::sync::{Arc, OnceLock, RwLock}; + +/// Shared, in-place-swappable workspace path held by the live runtime context. +pub(crate) type WorkspaceHandle = Arc>; + +type HandleSlot = RwLock>; + +/// Process-global slot pointing at the live runtime's workspace handle. +static WORKSPACE_HANDLE: OnceLock = OnceLock::new(); + +fn handle_slot() -> &'static HandleSlot { + WORKSPACE_HANDLE.get_or_init(|| RwLock::new(None)) +} + +fn register_in(slot: &HandleSlot, handle: WorkspaceHandle) { + match slot.write() { + Ok(mut guard) => *guard = Some(handle), + Err(e) => log::warn!("[channels:runtime] register_workspace_handle: slot poisoned: {e}"), + } +} + +fn rebind_in(slot: &HandleSlot, workspace_dir: PathBuf) { + let handle = match slot.read() { + Ok(guard) => guard.as_ref().map(Arc::clone), + Err(e) => { + log::warn!("[channels:runtime] rebind_workspace: slot read poisoned: {e}"); + return; + } + }; + let Some(handle) = handle else { + // The channel runtime isn't running in this process (e.g. CLI-only + // paths, or login before `start_channels`). Nothing to re-bind — the + // runtime will resolve the active user from its startup `Config` load. + log::debug!( + "[channels:runtime] rebind_workspace: no live runtime handle registered; \ + skipping re-bind to {}", + workspace_dir.display() + ); + return; + }; + match handle.write() { + Ok(mut current) => { + if *current != workspace_dir { + log::info!( + "[channels:runtime] re-binding channel workspace {} -> {}", + current.display(), + workspace_dir.display() + ); + *current = workspace_dir; + } + } + Err(e) => log::warn!("[channels:runtime] rebind_workspace: handle poisoned: {e}"), + }; +} + +/// Register the live runtime's workspace handle so [`rebind_workspace`] can +/// re-point it after login. Called once from `start_channels`. +pub(crate) fn register_workspace_handle(handle: WorkspaceHandle) { + register_in(handle_slot(), handle); +} + +/// Re-point the running channel runtime at `workspace_dir`. +/// +/// Invoked from `credentials::ops::store_session` after activation, against +/// `effective_config.workspace_dir`. No-op when the channel runtime is not +/// running (no handle registered yet). +pub fn rebind_workspace(workspace_dir: PathBuf) { + rebind_in(handle_slot(), workspace_dir); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::path::Path; + + #[test] + fn rebind_reresolves_registered_handle() { + // A fresh, test-local slot — the `memory::global` test pattern — so this + // assertion is deterministic under parallel test execution instead of + // racing the process-global singleton. + let slot: HandleSlot = RwLock::new(None); + // The live `ChannelRuntimeContext` holds a clone of this same handle + // and reads it via `workspace_dir()`, so a re-bind here is what every + // channel turn observes on its next workspace resolution. + let handle: WorkspaceHandle = Arc::new(RwLock::new(PathBuf::from("/users/local"))); + register_in(&slot, Arc::clone(&handle)); + assert_eq!(*handle.read().unwrap(), Path::new("/users/local")); + + rebind_in(&slot, PathBuf::from("/users/user-42")); + + assert_eq!(*handle.read().unwrap(), Path::new("/users/user-42")); + } + + #[test] + fn rebind_without_registered_handle_is_noop() { + let slot: HandleSlot = RwLock::new(None); + // Must not panic when the channel runtime isn't started in-process. + rebind_in(&slot, PathBuf::from("/users/user-7")); + assert!(slot.read().unwrap().is_none()); + } +} diff --git a/src/openhuman/channels/tests/context.rs b/src/openhuman/channels/tests/context.rs index 67ab5517c8..026f83c6c3 100644 --- a/src/openhuman/channels/tests/context.rs +++ b/src/openhuman/channels/tests/context.rs @@ -66,7 +66,7 @@ fn compact_sender_history_keeps_recent_truncated_messages() { channels_by_name: Arc::new(HashMap::new()), provider: Arc::new(DummyProvider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(super::common::NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(super::common::NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("system".to_string()), model: Arc::new("test-model".to_string()), @@ -83,7 +83,7 @@ fn compact_sender_history_keeps_recent_truncated_messages() { multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), provider_runtime_options: crate::openhuman::inference::provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, }; diff --git a/src/openhuman/channels/tests/discord_integration.rs b/src/openhuman/channels/tests/discord_integration.rs index f8ce7050d1..a85a4327d5 100644 --- a/src/openhuman/channels/tests/discord_integration.rs +++ b/src/openhuman/channels/tests/discord_integration.rs @@ -120,7 +120,7 @@ fn make_discord_ctx( channels_by_name: Arc::new(channels), provider, default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -136,7 +136,7 @@ fn make_discord_ctx( reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: crate::openhuman::inference::provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), diff --git a/src/openhuman/channels/tests/memory.rs b/src/openhuman/channels/tests/memory.rs index f255c6d9e3..018da9066f 100644 --- a/src/openhuman/channels/tests/memory.rs +++ b/src/openhuman/channels/tests/memory.rs @@ -140,7 +140,7 @@ async fn process_channel_message_restores_per_sender_history_on_follow_ups() { channels_by_name: Arc::new(channels_by_name), provider: provider_impl.clone(), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -155,7 +155,7 @@ async fn process_channel_message_restores_per_sender_history_on_follow_ups() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -224,7 +224,7 @@ async fn process_channel_message_uses_autosaved_memory_after_history_is_cleared( channels_by_name: Arc::new(channels_by_name), provider: provider_impl.clone(), default_provider: Arc::new("test-provider".to_string()), - memory, + memory_handle: Arc::new(std::sync::RwLock::new(memory)), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -239,7 +239,7 @@ async fn process_channel_message_uses_autosaved_memory_after_history_is_cleared( inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), diff --git a/src/openhuman/channels/tests/runtime_dispatch.rs b/src/openhuman/channels/tests/runtime_dispatch.rs index c02f776168..0537d2b318 100644 --- a/src/openhuman/channels/tests/runtime_dispatch.rs +++ b/src/openhuman/channels/tests/runtime_dispatch.rs @@ -47,7 +47,7 @@ async fn message_dispatch_processes_messages_in_parallel() { delay: Duration::from_millis(5), }), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -62,7 +62,7 @@ async fn message_dispatch_processes_messages_in_parallel() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -119,7 +119,7 @@ async fn process_channel_message_cancels_scoped_typing_task() { delay: Duration::from_millis(20), }), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -134,7 +134,7 @@ async fn process_channel_message_cancels_scoped_typing_task() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -206,7 +206,7 @@ async fn dispatch_routes_through_agent_run_turn_bus_handler() { // handler never invokes it — so a minimal no-op is fine. provider: Arc::new(super::common::DummyProvider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -221,7 +221,7 @@ async fn dispatch_routes_through_agent_run_turn_bus_handler() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -288,7 +288,7 @@ async fn channel_processed_event_records_resolved_agent_route() { channels_by_name: Arc::new(channels_by_name), provider: Arc::new(super::common::DummyProvider), default_provider: Arc::new("requested-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("requested-model".to_string()), @@ -303,7 +303,7 @@ async fn channel_processed_event_records_resolved_agent_route() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -399,7 +399,7 @@ async fn process_channel_message_hardens_multimodal_files_against_smuggled_marke channels_by_name: Arc::new(channels_by_name), provider: Arc::new(super::common::DummyProvider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -414,7 +414,7 @@ async fn process_channel_message_hardens_multimodal_files_against_smuggled_marke inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: permissive_operator_default, @@ -481,7 +481,7 @@ async fn process_channel_message_hardens_against_relative_path_markers() { channels_by_name: Arc::new(channels_by_name), provider: Arc::new(super::common::DummyProvider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -496,7 +496,7 @@ async fn process_channel_message_hardens_against_relative_path_markers() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), diff --git a/src/openhuman/channels/tests/runtime_tool_calls.rs b/src/openhuman/channels/tests/runtime_tool_calls.rs index 257eda6780..5cc1f5d600 100644 --- a/src/openhuman/channels/tests/runtime_tool_calls.rs +++ b/src/openhuman/channels/tests/runtime_tool_calls.rs @@ -25,7 +25,7 @@ async fn process_channel_message_executes_tool_calls_instead_of_sending_raw_json channels_by_name: Arc::new(channels_by_name), provider: Arc::new(ToolCallingProvider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![Box::new(MockPriceTool)]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -40,7 +40,7 @@ async fn process_channel_message_executes_tool_calls_instead_of_sending_raw_json inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -81,7 +81,7 @@ async fn process_channel_message_executes_tool_calls_with_alias_tags() { channels_by_name: Arc::new(channels_by_name), provider: Arc::new(ToolCallingAliasProvider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![Box::new(MockPriceTool)]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -96,7 +96,7 @@ async fn process_channel_message_executes_tool_calls_with_alias_tags() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -146,7 +146,7 @@ async fn process_channel_message_handles_models_command_without_llm_call() { channels_by_name: Arc::new(channels_by_name), provider: Arc::clone(&default_provider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("default-model".to_string()), @@ -161,7 +161,7 @@ async fn process_channel_message_handles_models_command_without_llm_call() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -238,7 +238,7 @@ async fn process_channel_message_uses_route_override_provider_and_model() { channels_by_name: Arc::new(channels_by_name), provider: Arc::clone(&default_provider), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("default-model".to_string()), @@ -253,7 +253,7 @@ async fn process_channel_message_uses_route_override_provider_and_model() { inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -288,7 +288,7 @@ async fn process_channel_message_respects_configured_max_tool_iterations_above_d required_tool_iterations: 11, }), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![Box::new(MockPriceTool)]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -303,7 +303,7 @@ async fn process_channel_message_respects_configured_max_tool_iterations_above_d inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), @@ -345,7 +345,7 @@ async fn process_channel_message_reports_configured_max_tool_iterations_limit() required_tool_iterations: 20, }), default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![Box::new(MockPriceTool)]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -360,7 +360,7 @@ async fn process_channel_message_reports_configured_max_tool_iterations_limit() inference_url: None, reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), diff --git a/src/openhuman/channels/tests/telegram_integration.rs b/src/openhuman/channels/tests/telegram_integration.rs index 11811cffbd..35c2245c9b 100644 --- a/src/openhuman/channels/tests/telegram_integration.rs +++ b/src/openhuman/channels/tests/telegram_integration.rs @@ -96,7 +96,7 @@ fn make_test_context( channels_by_name: Arc::new(channels), provider, default_provider: Arc::new("test-provider".to_string()), - memory: Arc::new(NoopMemory), + memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(NoopMemory))), tools_registry: Arc::new(vec![]), system_prompt: Arc::new("test-system-prompt".to_string()), model: Arc::new("test-model".to_string()), @@ -112,7 +112,7 @@ fn make_test_context( reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()), provider_runtime_options: crate::openhuman::inference::provider::ProviderRuntimeOptions::default(), - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_handle: Arc::new(std::sync::RwLock::new(std::env::temp_dir())), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, multimodal: crate::openhuman::config::MultimodalConfig::default(), multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(), diff --git a/src/openhuman/credentials/ops.rs b/src/openhuman/credentials/ops.rs index 4f41be8bdc..6f50bdd767 100644 --- a/src/openhuman/credentials/ops.rs +++ b/src/openhuman/credentials/ops.rs @@ -438,6 +438,34 @@ async fn store_session_inner( ); logs.push("conversation persistence bound to active workspace".to_string()); + // Re-resolve the two long-lived subsystems that started from a pre-login + // Config snapshot and would otherwise keep writing under users/local/ until + // a process restart (issue #4398, the #2437-E follow-up). Mirrors the + // memory/conversation rebind above: + // * channel runtime — re-point its shared workspace handle so every + // channel turn re-resolves to the activated user's workspace. + // * cron scheduler — re-bind its active config so the next poll reads the + // cron store under users//. + // Both are no-ops when the subsystem isn't running in this process. + crate::openhuman::channels::rebind_workspace(effective_config.workspace_dir.clone()); + crate::openhuman::cron::scheduler::rebind(effective_config.clone()); + // Re-point the remaining workspace-baked channel-runtime state (issue + // #4398 follow-up): the agent sandbox root (so file-writing tools stay + // confined to the activated user's workspace) and the channel memory store + // (conversation auto-save + memory-context retrieval). The Telegram + // busy-state subscriber and the PROFILE.md writer share the workspace + // handle re-bound above, so they re-resolve without a separate call. Both + // calls are no-ops when the channel runtime / live policy aren't installed. + crate::openhuman::security::live_policy::set_workspace_dir( + effective_config.workspace_dir.clone(), + &effective_config.autonomy, + ); + crate::openhuman::channels::rebind_memory(&effective_config); + logs.push(format!( + "channel runtime + scheduler + security + memory re-bound to workspace {}", + effective_config.workspace_dir.display() + )); + // Now that active_user.toml exists and config.workspace_dir resolves to // the per-user path, seed the subconscious defaults and spawn the // heartbeat loop. Idempotent — no-op on subsequent logins of the same diff --git a/src/openhuman/cron/scheduler.rs b/src/openhuman/cron/scheduler.rs index b31cdf5921..2bc1af0ae2 100644 --- a/src/openhuman/cron/scheduler.rs +++ b/src/openhuman/cron/scheduler.rs @@ -11,11 +11,83 @@ use anyhow::Result; use chrono::{DateTime, Utc}; use futures_util::{stream, StreamExt}; use std::process::Stdio; -use std::sync::Arc; +use std::sync::{Arc, OnceLock, RwLock}; use tokio::process::Command; use tokio::time::{self, Duration}; const MIN_POLL_SECONDS: u64 = 5; + +/// Process-global holder for the scheduler's *active* config. +/// +/// The cron scheduler is started once (`run`) with a `Config` snapshot, +/// frequently *before* any user has signed in — so `config.workspace_dir` +/// resolves to `~/.openhuman/users/local/` and every `due_jobs` poll reads the +/// cron store there. After a sign-in, `credentials::ops::store_session` calls +/// [`rebind`] with the activated user's config; the poll loop reads this holder +/// each tick, so the next poll resolves the cron store under +/// `users//` without a process restart (issue #4398). Mirrors the +/// re-bind precedents in `memory::global::init` and +/// `channels::runtime::rebind_workspace`. +type ActiveConfigSlot = RwLock>; + +static ACTIVE_CONFIG: OnceLock = OnceLock::new(); + +fn active_config_slot() -> &'static ActiveConfigSlot { + ACTIVE_CONFIG.get_or_init(ActiveConfigSlot::default) +} + +/// Seed the holder with the scheduler's startup config, but only if nothing is +/// bound yet — a live [`rebind`] that happened before the loop reached its +/// first tick must not be clobbered by the boot snapshot. +fn seed_in(slot: &ActiveConfigSlot, config: Config) { + if let Ok(mut guard) = slot.write() { + if guard.is_none() { + *guard = Some(config); + } + } +} + +fn rebind_in(slot: &ActiveConfigSlot, config: Config) { + match slot.write() { + Ok(mut guard) => { + let changed = guard + .as_ref() + .map(|c| c.workspace_dir != config.workspace_dir) + .unwrap_or(true); + if changed { + log::info!( + "[cron:scheduler] re-binding scheduler workspace -> {}", + config.workspace_dir.display() + ); + } + *guard = Some(config); + } + Err(e) => log::warn!("[cron:scheduler] rebind: active-config lock poisoned: {e}"), + } +} + +fn current_in(slot: &ActiveConfigSlot) -> Option { + slot.read().ok().and_then(|guard| guard.clone()) +} + +/// Seed the scheduler's active-config holder from its startup config. +fn seed_active_config(config: Config) { + seed_in(active_config_slot(), config); +} + +/// Re-bind the scheduler's active config after a post-login active-user switch. +/// +/// Invoked from `credentials::ops::store_session` against `effective_config`. +/// The poll loop picks this up on its next tick. +pub fn rebind(config: Config) { + rebind_in(active_config_slot(), config); +} + +/// The scheduler's current active config, if the loop has started (or a +/// [`rebind`] has landed). +fn current_active_config() -> Option { + current_in(active_config_slot()) +} const SHELL_JOB_TIMEOUT_SECS: u64 = 120; const AGENT_JOB_USER_FAILURE_MESSAGE: &str = "Something went wrong. Please try again.\nThis error has been reported. You can also report it on Discord.\nReport on Discord"; // Actionable, static failure copy for the three permanent cron halt states @@ -167,13 +239,12 @@ pub async fn run(config: Config) -> Result<()> { crate::core::event_bus::init_global(crate::core::event_bus::DEFAULT_CAPACITY); crate::openhuman::health::bus::register_health_subscriber(); + // Seed the re-bindable active-config holder so `store_session` can later + // re-point the poll loop at the activated user's workspace (issue #4398). + seed_active_config(config.clone()); + let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS); let mut interval = time::interval(Duration::from_secs(poll_secs)); - let security = Arc::new(SecurityPolicy::from_config( - &config.autonomy, - &config.workspace_dir, - &config.action_dir, - )); publish_global(DomainEvent::SystemStartup { component: "scheduler".to_string(), @@ -190,7 +261,19 @@ pub async fn run(config: Config) -> Result<()> { loop { interval.tick().await; - tick_once(&config, &security, &mut last_emitted_health).await; + // Re-resolve the active config every tick. After a post-login + // `rebind`, `current` (and its `workspace_dir`) points at the + // activated user's workspace, so `due_jobs`/security below read the + // cron store under `users//` instead of the pre-login + // `users/local/` snapshot. Falls back to the startup snapshot if the + // holder is somehow unset. + let current = current_active_config().unwrap_or_else(|| config.clone()); + let security = Arc::new(SecurityPolicy::from_config( + ¤t.autonomy, + ¤t.workspace_dir, + ¤t.action_dir, + )); + tick_once(¤t, &security, &mut last_emitted_health).await; } } diff --git a/src/openhuman/cron/scheduler_tests.rs b/src/openhuman/cron/scheduler_tests.rs index 98faf9c11d..1eb888ac78 100644 --- a/src/openhuman/cron/scheduler_tests.rs +++ b/src/openhuman/cron/scheduler_tests.rs @@ -6,7 +6,7 @@ use crate::openhuman::security::SecurityPolicy; use chrono::{Duration as ChronoDuration, Timelike, Utc}; #[cfg(not(windows))] use std::os::unix::fs::PermissionsExt; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use tempfile::TempDir; async fn test_config(tmp: &TempDir) -> Config { @@ -134,6 +134,53 @@ async fn deliver_if_configured_alerts_no_delivery_failure() { ); } +// Issue #4398 — the scheduler is started once with a pre-login `Config` +// snapshot (workspace = `users/local/`). After a sign-in, `store_session` +// calls `scheduler::rebind(effective_config)`; the poll loop re-resolves the +// active config each tick, so `due_jobs` must then read the cron store under +// the activated user's workspace instead of the pre-login one. This drives the +// re-bind seam through a test-local slot (the `memory::global` pattern) so it's +// deterministic under parallel execution. +#[tokio::test] +async fn rebind_reresolves_scheduler_workspace_after_store_session() { + let pre_login = TempDir::new().unwrap(); + let active_user = TempDir::new().unwrap(); + let pre_login_config = test_config(&pre_login).await; + let active_user_config = test_config(&active_user).await; + + // A scheduled job created only under the activated user's workspace. + let job = cron::add_job(&active_user_config, "* * * * *", "echo active-user").unwrap(); + let far_future = job.next_run + ChronoDuration::minutes(1); + + let slot: ActiveConfigSlot = RwLock::new(None); + + // Boot: bound to the pre-login (users/local) workspace. Its store is empty, + // so the scheduler would run nothing for the signed-in user. + seed_in(&slot, pre_login_config.clone()); + let before = current_in(&slot).expect("seeded config"); + assert_eq!(before.workspace_dir, pre_login_config.workspace_dir); + assert!( + cron::due_jobs(&before, far_future).unwrap().is_empty(), + "pre-login workspace has no jobs for the signed-in user" + ); + + // Login: store_session re-binds to the activated user's workspace. + rebind_in(&slot, active_user_config.clone()); + + let after = current_in(&slot).expect("rebound config"); + assert_eq!( + after.workspace_dir, active_user_config.workspace_dir, + "scheduler must re-resolve to the activated user's workspace" + ); + let due = cron::due_jobs(&after, far_future).unwrap(); + assert_eq!( + due.len(), + 1, + "after rebind the poll loop must read jobs from users//" + ); + assert_eq!(due[0].command, "echo active-user"); +} + // Negative guard: a successful no-delivery run with no output must NOT alert — // the hoist only surfaces failures + non-empty results, never quiet successes. #[tokio::test] diff --git a/src/openhuman/learning/profile_md_renderer.rs b/src/openhuman/learning/profile_md_renderer.rs index e610815a0d..92c31ce90a 100644 --- a/src/openhuman/learning/profile_md_renderer.rs +++ b/src/openhuman/learning/profile_md_renderer.rs @@ -35,7 +35,7 @@ //! thread pool. use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use async_trait::async_trait; @@ -93,16 +93,28 @@ const BLOCK_SPECS: &[BlockSpec] = &[ /// managed blocks of `PROFILE.md`. pub struct ProfileMdRenderer { cache: Arc, - workspace_dir: PathBuf, + /// Re-bindable workspace handle (issue #4398). Shared with the channel + /// runtime context so a post-login `rebind_workspace` re-points PROFILE.md + /// writes at the activated user's workspace without re-subscribing. + workspace_handle: Arc>, } impl ProfileMdRenderer { /// Create a new renderer backed by `cache`, writing to - /// `workspace_dir/PROFILE.md`. - pub fn new(cache: Arc, workspace_dir: PathBuf) -> Self { + /// `/PROFILE.md` where `` is re-resolved through the + /// shared `workspace_handle` on every render. + pub fn new(cache: Arc, workspace_handle: Arc>) -> Self { Self { cache, - workspace_dir, + workspace_handle, + } + } + + /// Current workspace directory, re-resolved through the shared handle. + fn workspace_dir(&self) -> PathBuf { + match self.workspace_handle.read() { + Ok(guard) => guard.clone(), + Err(poisoned) => poisoned.into_inner().clone(), } } @@ -150,7 +162,7 @@ impl ProfileMdRenderer { lines.join("\n") }; - replace_managed_block(&self.workspace_dir, spec.block_name, spec.heading, body) + replace_managed_block(&self.workspace_dir(), spec.block_name, spec.heading, body) .map_err(|e| { anyhow::anyhow!( "[learning::profile_md_renderer] failed to write block '{}': {e}", @@ -260,7 +272,10 @@ mod tests { let conn = Connection::open_in_memory().unwrap(); conn.execute_batch(PROFILE_INIT_SQL).unwrap(); let cache = make_cache(Arc::new(Mutex::new(conn))); - let renderer = ProfileMdRenderer::new(Arc::clone(&cache), tmp.path().to_path_buf()); + let renderer = ProfileMdRenderer::new( + Arc::clone(&cache), + Arc::new(RwLock::new(tmp.path().to_path_buf())), + ); (cache, renderer, tmp) } @@ -502,7 +517,10 @@ mod tests { let conn = Connection::open_in_memory().unwrap(); conn.execute_batch(PROFILE_INIT_SQL).unwrap(); let cache = make_cache(Arc::new(Mutex::new(conn))); - let renderer = Arc::new(ProfileMdRenderer::new(cache, tmp.path().to_path_buf())); + let renderer = Arc::new(ProfileMdRenderer::new( + cache, + Arc::new(RwLock::new(tmp.path().to_path_buf())), + )); // subscribe_global requires a running runtime; just verify the type works. let _renderer_ref = Arc::clone(&renderer); // We can't call subscribe_global in a unit test without a tokio runtime, diff --git a/src/openhuman/security/live_policy.rs b/src/openhuman/security/live_policy.rs index eb1933a258..34f834f9fd 100644 --- a/src/openhuman/security/live_policy.rs +++ b/src/openhuman/security/live_policy.rs @@ -117,6 +117,56 @@ pub fn update_action_dir(new_action_dir: PathBuf) -> Result { Ok(gen) } +/// Swap the workspace root on the process-global live policy after a +/// post-login active-user switch (issue #4398). +/// +/// The agent sandbox confines writes to `workspace_dir` (memory DBs, sessions, +/// tokens). When the core started pre-login, that root was +/// `~/.openhuman/users/local/`; after sign-in the agent's writes must be +/// confined to `users//`. Updates the stored `workspace_dir` (so a +/// later [`reload_from`] keeps the new root) and rebuilds the current policy +/// via [`SecurityPolicy::from_config`] against the new workspace + the stored +/// action root — using `from_config` (not a field poke) so the policy's +/// lazily-cached canonical workspace path is rebuilt fresh rather than left +/// pointing at the old root. No-op if nothing has been [`install`]ed yet +/// (e.g. a CLI invocation that never started a session runtime), mirroring +/// [`set_action_dir`]. Tools read [`current`] at call time, so the next tool +/// call sees the new sandbox root. +pub fn set_workspace_dir( + new_workspace: PathBuf, + autonomy_config: &crate::openhuman::config::AutonomyConfig, +) { + let Some(state) = STATE.get() else { + tracing::debug!( + "[security:live_policy] set_workspace_dir called before install; no live policy to swap" + ); + return; + }; + + if let Ok(mut guard) = state.workspace_dir.write() { + *guard = new_workspace.clone(); + } + let action = state + .action_dir + .read() + .map(|g| g.clone()) + .unwrap_or_default(); + let rebuilt = Arc::new(SecurityPolicy::from_config( + autonomy_config, + &new_workspace, + &action, + )); + if let Ok(mut guard) = state.policy.write() { + *guard = rebuilt; + } + let gen = state.generation.fetch_add(1, Ordering::Relaxed) + 1; + tracing::info!( + generation = gen, + workspace_dir = %new_workspace.display(), + "[security:live_policy] SecurityPolicy workspace_dir swapped after active-user switch" + ); +} + /// Rebuild the policy from `autonomy_config` against the stored workspace dir /// and swap it in, bumping the generation counter. No-op if nothing has been /// installed yet (e.g. a CLI invocation that never started a session runtime). @@ -305,4 +355,65 @@ mod tests { "reload after set_action_dir must keep the swapped root" ); } + + // Issue #4398 — after a post-login active-user switch, the agent sandbox + // root must move from the pre-login `users/local/` workspace to the + // activated user's workspace, so file-writing tools (which read `current()` + // at call time) stay confined to the right place. Preserves autonomy and + // action_dir, bumps the generation, and survives a later reload. + #[test] + fn set_workspace_dir_swaps_root_and_preserves_other_access() { + let _env = crate::openhuman::config::TEST_ENV_LOCK + .lock() + .unwrap_or_else(|e| e.into_inner()); + let pre_login = std::env::temp_dir().join("openhuman_set_ws_test_local"); + let action = std::env::temp_dir().join("openhuman_set_ws_test_action"); + let initial = Arc::new(SecurityPolicy { + autonomy: AutonomyLevel::Full, + workspace_dir: pre_login.clone(), + action_dir: action.clone(), + ..SecurityPolicy::default() + }); + install(initial, pre_login.clone(), action.clone()); + assert_eq!( + current().expect("policy installed").workspace_dir, + pre_login, + "precondition: workspace starts at the pre-login root" + ); + + let before = generation(); + let active_user = std::env::temp_dir().join("openhuman_set_ws_test_user_42"); + let cfg = AutonomyConfig { + level: AutonomyLevel::Full, + ..AutonomyConfig::default() + }; + set_workspace_dir(active_user.clone(), &cfg); + + assert!( + generation() > before, + "generation must increase on workspace_dir swap" + ); + let now = current().expect("policy still installed"); + assert_eq!( + now.workspace_dir, active_user, + "live policy must reflect the activated user's workspace" + ); + assert_eq!( + now.action_dir, action, + "action_dir must survive a workspace_dir swap" + ); + assert_eq!( + now.autonomy, + AutonomyLevel::Full, + "autonomy level must survive a workspace_dir swap" + ); + + // The stored workspace is updated, so a later reload keeps the new root. + reload_from(&cfg); + assert_eq!( + current().expect("policy still installed").workspace_dir, + active_user, + "reload after set_workspace_dir must keep the swapped workspace" + ); + } } diff --git a/tests/learning_phase4_integration_test.rs b/tests/learning_phase4_integration_test.rs index 687cf52b11..6754f17b76 100644 --- a/tests/learning_phase4_integration_test.rs +++ b/tests/learning_phase4_integration_test.rs @@ -77,7 +77,7 @@ impl TestHarness { let workspace = TempDir::new().unwrap(); let renderer = Arc::new(ProfileMdRenderer::new( Arc::clone(&cache), - workspace.path().to_path_buf(), + Arc::new(std::sync::RwLock::new(workspace.path().to_path_buf())), )); // Drain any stale candidates from prior tests so they don't affect