Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 39 additions & 6 deletions src/openhuman/channels/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::openhuman::tools::Tool;
use crate::openhuman::util::truncate_with_ellipsis;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};

/// Per-sender conversation history for channel messages.
pub(crate) type ConversationHistoryMap = Arc<Mutex<HashMap<String, Vec<ChatMessage>>>>;
Expand Down Expand Up @@ -47,7 +47,10 @@ pub(crate) struct ChannelRuntimeContext {
pub(crate) channels_by_name: Arc<HashMap<String, Arc<dyn super::Channel>>>,
pub(crate) provider: Arc<dyn Provider>,
pub(crate) default_provider: Arc<String>,
pub(crate) memory: Arc<dyn Memory>,
/// Re-bindable channel memory store (issue #4398). Held behind a lock so a
/// post-login `channels::rebind_memory` swaps it for the activated user's
/// workspace store. Read via [`ChannelRuntimeContext::memory`].
pub(crate) memory_handle: Arc<RwLock<Arc<dyn Memory>>>,
pub(crate) tools_registry: Arc<Vec<Box<dyn Tool>>>,
pub(crate) system_prompt: Arc<String>,
pub(crate) model: Arc<String>,
Expand All @@ -63,12 +66,42 @@ pub(crate) struct ChannelRuntimeContext {
pub(crate) reliability: Arc<crate::openhuman::config::ReliabilityConfig>,
pub(crate) provider_runtime_options:
crate::openhuman::inference::provider::ProviderRuntimeOptions,
pub(crate) workspace_dir: Arc<PathBuf>,
/// Re-bindable workspace handle. Started from the pre-login `Config`
/// snapshot, then re-pointed at the activated user's workspace on login
/// via `channels::runtime::rebind_workspace` (issue #4398). Read through
/// [`ChannelRuntimeContext::workspace_dir`] so every turn re-resolves.
pub(crate) workspace_handle: Arc<RwLock<PathBuf>>,
pub(crate) message_timeout_secs: u64,
pub(crate) multimodal: crate::openhuman::config::MultimodalConfig,
pub(crate) multimodal_files: crate::openhuman::config::MultimodalFileConfig,
}

impl ChannelRuntimeContext {
/// Current workspace directory, re-resolved through the re-bindable handle.
///
/// After a post-login `rebind_workspace`, this returns the activated
/// user's `users/<user_id>/` workspace instead of the pre-login
/// `users/local/` snapshot the runtime was started with (issue #4398).
pub(crate) fn workspace_dir(&self) -> PathBuf {
match self.workspace_handle.read() {
Ok(guard) => guard.clone(),
Err(poisoned) => poisoned.into_inner().clone(),
}
}

/// Current channel memory store, re-resolved through the re-bindable handle.
///
/// After a post-login `rebind_memory`, this returns a store rooted at the
/// activated user's workspace instead of the pre-login `users/local/` store
/// the runtime was started with (issue #4398).
pub(crate) fn memory(&self) -> Arc<dyn Memory> {
match self.memory_handle.read() {
Ok(guard) => Arc::clone(&guard),
Err(poisoned) => Arc::clone(&poisoned.into_inner()),
}
}
}

pub(crate) fn conversation_memory_key(msg: &super::traits::ChannelMessage) -> String {
format!("{}_{}_{}", msg.channel, msg.sender, msg.id)
}
Expand Down Expand Up @@ -331,9 +364,9 @@ mod tests {
channels_by_name: Arc::new(HashMap::new()),
provider: Arc::new(DummyProvider),
default_provider: Arc::new("default".into()),
memory: Arc::new(MockMemory {
memory_handle: Arc::new(RwLock::new(Arc::new(MockMemory {
entries: Vec::new(),
}),
}))),
tools_registry: Arc::new(vec![Box::new(DummyTool) as Box<dyn Tool>]),
system_prompt: Arc::new("prompt".into()),
model: Arc::new("model".into()),
Expand All @@ -349,7 +382,7 @@ mod tests {
reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()),
provider_runtime_options:
crate::openhuman::inference::provider::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(PathBuf::from("/tmp")),
workspace_handle: Arc::new(RwLock::new(PathBuf::from("/tmp"))),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
multimodal: crate::openhuman::config::MultimodalConfig::default(),
multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(),
Expand Down
2 changes: 2 additions & 0 deletions src/openhuman/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,6 @@ pub use controllers::{ChannelAuthMode, ChannelDefinition};
// the prompt-building code. Re-exported here for callers that used the
// old `channels::build_system_prompt` path.
pub use crate::openhuman::context::channels_prompt::build_system_prompt;
pub use runtime::rebind_memory;
pub use runtime::rebind_workspace;
pub use runtime::start_channels;
31 changes: 23 additions & 8 deletions src/openhuman/channels/providers/telegram/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,37 @@ use crate::core::event_bus::{DomainEvent, EventHandler};
use crate::openhuman::channels::providers::telegram::session_store::with_store;
use async_trait::async_trait;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

const LOG_PREFIX: &str = "[telegram-remote]";

/// Tracks Telegram turn lifecycle via channel domain events and exposes busy
/// state for `/status`.
pub struct TelegramRemoteSubscriber {
workspace_dir: PathBuf,
/// Re-bindable workspace handle (issue #4398). Shared with the channel
/// runtime context so a post-login `rebind_workspace` re-points busy-state
/// persistence at the activated user's workspace. Read at event time so the
/// stale-workspace guard below compares against the CURRENT workspace — the
/// dispatch loop stamps events with `ctx.workspace_dir()`, which after a
/// re-bind is the new path; a baked snapshot here would drop every event.
workspace_handle: Arc<RwLock<PathBuf>>,
}

impl TelegramRemoteSubscriber {
pub fn new(workspace_dir: PathBuf) -> Self {
Self { workspace_dir }
pub fn new(workspace_handle: Arc<RwLock<PathBuf>>) -> Self {
Self { workspace_handle }
}

/// Current workspace directory, re-resolved through the shared handle.
fn workspace_dir(&self) -> PathBuf {
match self.workspace_handle.read() {
Ok(guard) => guard.clone(),
Err(poisoned) => poisoned.into_inner().clone(),
}
}

async fn set_busy(&self, reply_target: &str, busy: bool) {
let workspace_dir = self.workspace_dir.clone();
let workspace_dir = self.workspace_dir();
let reply_target_owned = reply_target.to_string();
let join_result = tokio::task::spawn_blocking(move || {
with_store(&workspace_dir, |store| {
Expand Down Expand Up @@ -59,12 +74,12 @@ impl EventHandler for TelegramRemoteSubscriber {
workspace_dir,
..
} if channel == "telegram" => {
if *workspace_dir != self.workspace_dir {
if *workspace_dir != self.workspace_dir() {
tracing::debug!(
"{LOG_PREFIX} dropping stale-workspace ChannelMessageReceived \
event_ws={} self_ws={}",
workspace_dir.display(),
self.workspace_dir.display()
self.workspace_dir().display()
);
return;
}
Expand All @@ -79,12 +94,12 @@ impl EventHandler for TelegramRemoteSubscriber {
workspace_dir,
..
} if channel == "telegram" => {
if *workspace_dir != self.workspace_dir {
if *workspace_dir != self.workspace_dir() {
tracing::debug!(
"{LOG_PREFIX} dropping stale-workspace ChannelMessageProcessed \
event_ws={} self_ws={}",
workspace_dir.display(),
self.workspace_dir.display()
self.workspace_dir().display()
);
return;
}
Expand Down
24 changes: 18 additions & 6 deletions src/openhuman/channels/providers/telegram/bus_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use tempfile::tempdir;
#[tokio::test]
async fn subscriber_marks_busy_on_received_and_clears_on_processed() {
let dir = tempdir().expect("tempdir");
let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf());
let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(
dir.path().to_path_buf(),
)));
assert_eq!(subscriber.name(), "telegram::remote_control");
assert_eq!(subscriber.domains(), Some(&["channel"][..]));

Expand Down Expand Up @@ -50,7 +52,9 @@ async fn subscriber_marks_busy_on_received_and_clears_on_processed() {
#[tokio::test]
async fn subscriber_ignores_non_telegram_channel_events() {
let dir = tempdir().expect("tempdir");
let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf());
let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(
dir.path().to_path_buf(),
)));

subscriber
.handle(&DomainEvent::ChannelMessageReceived {
Expand All @@ -75,7 +79,9 @@ async fn subscriber_ignores_non_telegram_channel_events() {
#[tokio::test]
async fn telegram_received_matching_workspace_sets_busy() {
let dir = tempdir().expect("tempdir");
let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf());
let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(
dir.path().to_path_buf(),
)));

subscriber
.handle(&DomainEvent::ChannelMessageReceived {
Expand All @@ -99,7 +105,9 @@ async fn telegram_received_matching_workspace_sets_busy() {
async fn telegram_received_stale_workspace_does_not_set_busy() {
let dir = tempdir().expect("tempdir");
let stale = tempdir().expect("stale tempdir");
let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf());
let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(
dir.path().to_path_buf(),
)));

subscriber
.handle(&DomainEvent::ChannelMessageReceived {
Expand All @@ -122,7 +130,9 @@ async fn telegram_received_stale_workspace_does_not_set_busy() {
#[tokio::test]
async fn telegram_processed_matching_workspace_clears_busy() {
let dir = tempdir().expect("tempdir");
let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf());
let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(
dir.path().to_path_buf(),
)));

// First mark as busy via a matching received event.
subscriber
Expand Down Expand Up @@ -169,7 +179,9 @@ async fn telegram_processed_matching_workspace_clears_busy() {
async fn telegram_processed_stale_workspace_does_not_clear_busy() {
let dir = tempdir().expect("tempdir");
let stale = tempdir().expect("stale tempdir");
let subscriber = TelegramRemoteSubscriber::new(dir.path().to_path_buf());
let subscriber = TelegramRemoteSubscriber::new(std::sync::Arc::new(std::sync::RwLock::new(
dir.path().to_path_buf(),
)));

// Mark as busy via a matching received event.
subscriber
Expand Down
10 changes: 5 additions & 5 deletions src/openhuman/channels/providers/telegram/remote_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async fn build_status_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessage
.map(|h| h.len())
.unwrap_or(0);

let workspace = ctx.workspace_dir.clone();
let workspace = ctx.workspace_dir();
let reply_target = msg.reply_target.clone();
// Use with_store_read (no disk write) and spawn_blocking to keep the async
// executor thread unblocked during mutex acquisition + file I/O.
Expand Down Expand Up @@ -154,7 +154,7 @@ async fn build_status_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessage
}

async fn build_sessions_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessage) -> String {
let workspace = ctx.workspace_dir.clone();
let workspace = ctx.workspace_dir();
let reply_target = msg.reply_target.clone();
// Read-only lookup — use with_store_read (no save) wrapped in spawn_blocking.
let active_thread_id = tokio::task::spawn_blocking(move || {
Expand All @@ -166,7 +166,7 @@ async fn build_sessions_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessa
.ok()
.and_then(|res| res.ok())
.flatten();
let workspace = ctx.workspace_dir.as_path();
let workspace = ctx.workspace_dir();

let threads = match conversations::list_threads(workspace.to_path_buf()) {
Ok(list) => list,
Expand Down Expand Up @@ -215,7 +215,7 @@ fn format_session_line(thread: &ConversationThread, active_id: Option<&str>) ->
}

async fn build_new_session_response(ctx: &ChannelRuntimeContext, msg: &ChannelMessage) -> String {
let workspace = ctx.workspace_dir.as_path();
let workspace = ctx.workspace_dir();
let sender_key = conversation_history_key(msg);
let thread_id = format!("thread-{}", uuid::Uuid::new_v4());
let now = chrono::Utc::now();
Expand Down Expand Up @@ -243,7 +243,7 @@ async fn build_new_session_response(ctx: &ChannelRuntimeContext, msg: &ChannelMe

clear_sender_history(ctx, &sender_key);

let workspace_dir = ctx.workspace_dir.clone();
let workspace_dir = ctx.workspace_dir();
let reply_target_owned = msg.reply_target.clone();
let thread_id_owned = thread_id.clone();
let sender_key_owned = sender_key.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/openhuman/channels/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ pub(crate) async fn handle_runtime_command_if_needed(
}
}
ChannelRuntimeCommand::ShowModel => {
build_models_help_response(&current, ctx.workspace_dir.as_path())
build_models_help_response(&current, ctx.workspace_dir().as_path())
}
ChannelRuntimeCommand::SetModel(raw_model) => {
let model = raw_model.trim().trim_matches('`').to_string();
Expand Down
8 changes: 5 additions & 3 deletions src/openhuman/channels/routes_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ fn runtime_context(workspace_dir: PathBuf) -> ChannelRuntimeContext {
channels_by_name: Arc::new(HashMap::new()),
provider: Arc::new(DummyProvider),
default_provider: Arc::new("openai".into()),
memory: Arc::new(DummyMemory),
memory_handle: Arc::new(std::sync::RwLock::new(Arc::new(DummyMemory))),
tools_registry: Arc::new(vec![Box::new(DummyTool) as Box<dyn Tool>]),
system_prompt: Arc::new("prompt".into()),
model: Arc::new("reasoning-v1".into()),
Expand All @@ -151,7 +151,7 @@ fn runtime_context(workspace_dir: PathBuf) -> ChannelRuntimeContext {
reliability: Arc::new(crate::openhuman::config::ReliabilityConfig::default()),
provider_runtime_options:
crate::openhuman::inference::provider::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(workspace_dir),
workspace_handle: Arc::new(std::sync::RwLock::new(workspace_dir)),
message_timeout_secs: 60,
multimodal: crate::openhuman::config::MultimodalConfig::default(),
multimodal_files: crate::openhuman::config::MultimodalFileConfig::default(),
Expand Down Expand Up @@ -517,7 +517,9 @@ async fn handle_runtime_command_telegram_new_status_and_sessions_round_trip() {
},
);

let subscriber = TelegramRemoteSubscriber::new(tempdir.path().to_path_buf());
let subscriber = TelegramRemoteSubscriber::new(Arc::new(std::sync::RwLock::new(
tempdir.path().to_path_buf(),
)));
subscriber
.handle(&DomainEvent::ChannelMessageReceived {
channel: "telegram".into(),
Expand Down
15 changes: 9 additions & 6 deletions src/openhuman/channels/runtime/dispatch/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub(crate) async fn process_channel_message(
reply_target: msg.reply_target.clone(),
content: msg.content.clone(),
thread_ts: msg.thread_ts.clone(),
workspace_dir: ctx.workspace_dir.as_ref().clone(),
workspace_dir: ctx.workspace_dir(),
});

let target_channel = ctx.channels_by_name.get(&msg.channel).cloned();
Expand Down Expand Up @@ -214,13 +214,16 @@ pub(crate) async fn process_channel_message(
}
};

// Re-resolve the memory store through the re-bindable handle so a
// post-login active-user switch routes channel memory to the activated
// user's workspace (issue #4398).
let memory = ctx.memory();
let memory_context =
build_memory_context(ctx.memory.as_ref(), &msg.content, ctx.min_relevance_score).await;
build_memory_context(memory.as_ref(), &msg.content, ctx.min_relevance_score).await;

if ctx.auto_save_memory {
let autosave_key = conversation_memory_key(&msg);
let _ = ctx
.memory
let _ = memory
.store(
"",
&autosave_key,
Expand Down Expand Up @@ -619,7 +622,7 @@ pub(crate) async fn process_channel_message(
model: route.model.clone(),
elapsed_ms: started_at.elapsed().as_millis() as u64,
success: false,
workspace_dir: ctx.workspace_dir.as_ref().clone(),
workspace_dir: ctx.workspace_dir(),
});
return;
}
Expand Down Expand Up @@ -760,7 +763,7 @@ pub(crate) async fn process_channel_message(
model: response_model,
elapsed_ms: started_at.elapsed().as_millis() as u64,
success,
workspace_dir: ctx.workspace_dir.as_ref().clone(),
workspace_dir: ctx.workspace_dir(),
});
}

Expand Down
Loading
Loading