From b99792c124ed2b3334d5a490f4e722f105690054 Mon Sep 17 00:00:00 2001 From: Jeremy HERGAULT Date: Sun, 7 Jun 2026 18:55:00 +0200 Subject: [PATCH 1/3] feat: dynamic config Signed-off-by: Jeremy HERGAULT --- cargo-prosa/assets/build.rs.j2 | 20 +-- cargo-prosa/assets/main.rs.j2 | 40 ++++- prosa/Cargo.toml | 1 + prosa/examples/my_prosa_settings.yml | 14 +- prosa/examples/proc.rs | 118 +++++++++++--- prosa/src/core/error.rs | 6 + prosa/src/core/main.rs | 87 ++++++++++- prosa/src/core/msg.rs | 9 +- prosa/src/core/proc.rs | 10 +- prosa/src/core/settings.rs | 199 +++++++++++++++++++++++- prosa/src/inj/proc.rs | 7 +- prosa/src/stub/proc.rs | 29 +++- prosa_book/src/ch03-01-settings.md | 15 ++ prosa_book/src/ch03-02-creation.md | 7 +- prosa_book/src/ch03-05-service.md | 25 +-- prosa_book/src/ch03-06-events.md | 5 +- prosa_utils/src/config/observability.rs | 9 +- prosa_utils/src/config/tracing.rs | 92 +++++++---- 18 files changed, 594 insertions(+), 99 deletions(-) diff --git a/cargo-prosa/assets/build.rs.j2 b/cargo-prosa/assets/build.rs.j2 index b445cde..7e206f8 100644 --- a/cargo-prosa/assets/build.rs.j2 +++ b/cargo-prosa/assets/build.rs.j2 @@ -25,7 +25,7 @@ fn write_settings_rs(out_dir: &OsString, desc: &Desc, metadata: &HashMap<&str, M if let Some(processors) = &desc.proc {{ '{' }} writeln!(f, "\n/// ProSA Run settings")?; writeln!(f, "#[settings]")?; - writeln!(f, "#[derive(Default, Debug, Deserialize, Serialize)]")?; + writeln!(f, "#[derive(Default, Debug, Clone, Deserialize, Serialize)]")?; writeln!(f, "pub struct RunSettings {{ '{{' }}")?; for processor in processors {{ '{' }} let proc_metadata = metadata.get(processor.proc_name.as_str()).unwrap_or_else(|| panic!("Can't get the processor {{ '{}' }} metadata ({{ '{:?}' }})", processor.proc, processor.name)); @@ -90,18 +90,12 @@ fn write_config_rs(out_dir: &OsString, desc: &Desc, cargo_metadata: &CargoMetada writeln!(f, " .arg(::clap::arg!(-t --worker_threads \"Number of worker threads to use for the main\").value_parser(clap::value_parser!(u32).range(1..)).default_value(\"1\"))")?; writeln!(f, "{{ '}}' }}\n")?; - writeln!(f, "fn prosa_config(matches: &::clap::ArgMatches) -> Result<::config::Config, ::config::ConfigError> {{ '{{' }}")?; - writeln!(f, " prosa::core::settings::get_config_builder(")?; - writeln!(f, " matches.get_one::(\"config\").unwrap().as_str(),")?; - writeln!(f, " )")?; - writeln!(f, " .map_err(|e| ::config::ConfigError::Foreign(Box::new(e)))?")?; - writeln!(f, " .add_source(")?; - writeln!(f, " ::config::Environment::with_prefix(\"PROSA\")")?; - writeln!(f, " .try_parsing(true)")?; - writeln!(f, " .separator(\"_\")")?; - writeln!(f, " .list_separator(\" \"),")?; - writeln!(f, " )")?; - writeln!(f, " .build()")?; + writeln!(f, "fn prosa_config(matches: &::clap::ArgMatches) -> Result<::prosa::core::settings::ProsaConfig, ::config::ConfigError> {{ '{{' }}")?; + writeln!(f, " let config_path = matches")?; + writeln!(f, " .get_one::(\"config\")")?; + writeln!(f, " .ok_or_else(|| ::config::ConfigError::NotFound(\"config\".to_string()))?;")?; + writeln!(f, "")?; + writeln!(f, " ::prosa::core::settings::ProsaConfig::from_path(config_path)")?; writeln!(f, "{{ '}}' }}\n") {{ '}' }} diff --git a/cargo-prosa/assets/main.rs.j2 b/cargo-prosa/assets/main.rs.j2 index b0d8125..0044441 100644 --- a/cargo-prosa/assets/main.rs.j2 +++ b/cargo-prosa/assets/main.rs.j2 @@ -53,7 +53,8 @@ async fn prosa_main(matches: clap::ArgMatches) -> Result<(), Box()?; + let prosa_config_cache = prosa_config(&matches)?; + let mut prosa_settings = prosa_config_cache.try_deserialize::()?; // Provide ProSA name if set in command line if let Some(name) = matches.get_one::("name") {{ '{' }} @@ -69,6 +70,43 @@ async fn prosa_main(matches: clap::ArgMatches) -> Result<(), Box("config") + .ok_or_else(|| ::config::ConfigError::NotFound("config".to_string()))? + .clone(); + let reload_name = matches.get_one::("name").cloned(); + tokio::spawn(async move {{ '{' }} + prosa::core::settings::watch_config_reload::( + reload_config_path, + prosa_config_cache, + ::prosa::core::settings::ProsaConfig::from_path, + move |mut settings, new_config| {{ '{' }} + let reload_bus = reload_bus.clone(); + let reload_filter = reload_filter.clone(); + let reload_name = reload_name.clone(); + + async move {{ '{' }} + if let Some(name) = &reload_name {{ '{' }} + settings.set_prosa_name(name.clone()); + {{ '}' }} + + reload_filter.set_level(settings.get_observability().get_level().into()); + if let Err(err) = reload_bus.update_config(std::sync::Arc::new(new_config)).await {{ '{' }} + log::warn!("Can't notify processors of configuration reload: {{ '{err}' }}"); + false + {{ '}' }} else {{ '{' }} + true + {{ '}' }} + {{ '}' }} + {{ '}' }}, + ) + .await; + {{ '}' }}); // Launch the main task let main_task = main.run(); diff --git a/prosa/Cargo.toml b/prosa/Cargo.toml index 978345f..69251d4 100644 --- a/prosa/Cargo.toml +++ b/prosa/Cargo.toml @@ -60,6 +60,7 @@ async-http-proxy = { version = "1", optional = true, features = ["runtime-tokio" serde.workspace = true config = { version = "0.15", default-features = false, features = ["toml", "yaml", "json", "convert-case"] } +notify = "8" glob = { version = "0.3" } toml.workspace = true serde_yaml = "0.9" diff --git a/prosa/examples/my_prosa_settings.yml b/prosa/examples/my_prosa_settings.yml index 184c876..6e2607a 100644 --- a/prosa/examples/my_prosa_settings.yml +++ b/prosa/examples/my_prosa_settings.yml @@ -1,10 +1,22 @@ name: my-prosa observability: - level: DEBUG + level: INFO metrics: prometheus: endpoint: 0.0.0.0:9100 traces: stdout: level: debug + +stub_proc: + service_names: + - STUB_TEST + +proc_1: + service_name: PROC_TEST + tick_secs: 4 + +proc_2: + service_name: PROC_TEST_2 + tick_secs: 4 diff --git a/prosa/examples/proc.rs b/prosa/examples/proc.rs index 17a2d55..12bb586 100644 --- a/prosa/examples/proc.rs +++ b/prosa/examples/proc.rs @@ -1,18 +1,18 @@ -use config::Config; use prosa::core::adaptor::Adaptor; use prosa::core::error::ProcError; use prosa::core::main::{MainProc, MainRunnable}; use prosa::core::msg::{InternalMsg, Msg, RequestMsg}; use prosa::core::proc::{Proc, ProcBusParam, ProcConfig, proc}; -use prosa::core::settings::Settings; -use prosa::core::settings::settings; +use prosa::core::settings::{ProsaConfig, Settings, settings}; use prosa::core::settings::tracing::TelemetryFilter; use prosa::event::pending::PendingMsgs; use prosa::stub::adaptor::StubParotAdaptor; use prosa::stub::proc::{StubProc, StubSettings}; use prosa::tracing::{debug, info, warn}; +use prosa_macros::proc_settings; use prosa_utils::msg::simple_string_tvf::SimpleStringTvf; use serde::{Deserialize, Serialize}; +use std::sync::Arc; use std::time::Duration; use tokio::time; use tracing::metadata::LevelFilter; @@ -20,7 +20,30 @@ use tracing::metadata::LevelFilter; #[derive(Default, Adaptor)] struct MyAdaptor {} -#[proc] +#[proc_settings] +#[derive(Debug, Deserialize, Serialize, Clone)] +struct MyProcSettings { + service_name: String, + tick_secs: u64, +} + +impl MyProcSettings { + fn interval(&self) -> time::Interval { + time::interval(time::Duration::from_secs(self.tick_secs.max(1))) + } +} + +#[proc_settings] +impl Default for MyProcSettings { + fn default() -> Self { + MyProcSettings { + service_name: String::from("PROC_TEST"), + tick_secs: 4, + } + } +} + +#[proc(settings = MyProcSettings)] struct MyProcClass {} #[proc] @@ -32,9 +55,9 @@ where let adaptor = A::default(); self.proc.add_proc().await?; self.proc - .add_service_proc(vec![String::from("PROC_TEST")]) + .add_service_proc(vec![self.settings.service_name.clone()]) .await?; - let mut interval = time::interval(time::Duration::from_secs(4)); + let mut interval = self.settings.interval(); let mut pending_msgs: PendingMsgs, M> = Default::default(); loop { tokio::select! { @@ -43,7 +66,6 @@ where InternalMsg::Request(msg) => { info!("Proc {} receive a request: {:?}", self.get_proc_id(), msg); - // Push in the pending message pending_msgs.push(msg, Duration::from_millis(200)); //msg.return_to_sender(tvf).await.unwrap(); @@ -56,8 +78,25 @@ where let _enter = err.enter_span(); info!("Proc {} receive an error: {:?}", self.get_proc_id(), err); }, - InternalMsg::Command(_) => todo!(), - InternalMsg::Config => todo!(), + InternalMsg::Config(config) => { + let settings = config.get_proc::(self.proc.as_ref())?; + + if self.settings.service_name != settings.service_name { + self.proc + .remove_service_proc(vec![self.settings.service_name.clone()]) + .await?; + self.proc + .add_service_proc(vec![settings.service_name.clone()]) + .await?; + } + + if self.settings.tick_secs != settings.tick_secs { + interval = settings.interval(); + } + + info!("Proc {} reloaded settings: {:?}", self.get_proc_id(), settings); + self.settings = settings; + }, InternalMsg::Service(table) => { debug!("New service table received:\n{}\n", table); self.service = table; @@ -81,10 +120,9 @@ where let _ = service.proc_queue.send(InternalMsg::Request(RequestMsg::new(stub_service_name, tvf.clone(), self.proc.get_service_queue()))).await; } - let proc_service_name = String::from("PROC_TEST"); - if let Some(service) = self.service.get_proc_service(&proc_service_name) { + if let Some(service) = self.service.get_proc_service(&self.settings.service_name) { debug!("The service is find: {:?}", service); - let _ = service.proc_queue.send(InternalMsg::Request(RequestMsg::new(proc_service_name, tvf, self.proc.get_service_queue()))).await; + let _ = service.proc_queue.send(InternalMsg::Request(RequestMsg::new(self.settings.service_name.clone(), tvf, self.proc.get_service_queue()))).await; } }, Some(msg) = pending_msgs.pull(), if !pending_msgs.is_empty() => { @@ -106,17 +144,18 @@ where #[settings] #[derive(Default, Debug, Deserialize, Serialize)] struct MySettings { - // Can add parameters here + stub_proc: StubSettings, + proc_1: MyProcSettings, + proc_2: MyProcSettings, } #[allow(clippy::needless_return)] #[tokio::main] async fn main() -> Result<(), Box> { + let config_path = "examples/my_prosa_settings.yml"; + // load the configuration - let config = Config::builder() - .add_source(config::File::with_name("examples/my_prosa_settings.yml")) - .add_source(config::Environment::with_prefix("PROSA")) - .build()?; + let config = ProsaConfig::from_path(config_path)?; let my_settings = config.try_deserialize::()?; println!("My ProSA settings: {my_settings:?}"); @@ -129,26 +168,61 @@ async fn main() -> Result<(), Box> { // Create bus and main processor let (bus, main) = MainProc::::create(&my_settings, Some(3)); + bus.update_config(Arc::new(config.clone())).await?; + + let reload_bus = bus.clone(); + let reload_filter = telemetry_filter.clone(); + tokio::spawn(async move { + prosa::core::settings::watch_config_reload::( + String::from(config_path), + config, + ProsaConfig::from_path, + move |settings, new_config| { + let reload_bus = reload_bus.clone(); + let reload_filter = reload_filter.clone(); + async move { + reload_filter.set_level(settings.get_observability().get_level().into()); + + if let Err(err) = reload_bus.update_config(Arc::new(new_config)).await { + warn!("Can't notify processors of configuration reload: {err}"); + false + } else { + true + } + } + }, + ) + .await; + }); // Launch a stub processor - let stub_settings = StubSettings::new(vec![String::from("STUB_TEST")]); let stub_proc = StubProc::::create( 1, - String::from("STUB_PROC"), + String::from("stub_proc"), bus.clone(), - stub_settings, + my_settings.stub_proc.clone(), ); Proc::::run(stub_proc)?; // Launch the test processor - let proc = MyProcClass::::create_raw(2, String::from("proc_1"), bus.clone()); + let proc = MyProcClass::::create( + 2, + String::from("proc_1"), + bus.clone(), + my_settings.proc_1.clone(), + ); Proc::::run(proc)?; // Wait before launch the second processor std::thread::sleep(time::Duration::from_secs(2)); // Launch the second test processor - let proc2 = MyProcClass::::create_raw(3, String::from("proc_2"), bus.clone()); + let proc2 = MyProcClass::::create( + 3, + String::from("proc_2"), + bus.clone(), + my_settings.proc_2.clone(), + ); Proc::::run(proc2)?; // Wait on main task diff --git a/prosa/src/core/error.rs b/prosa/src/core/error.rs index c8485f0..77e9086 100644 --- a/prosa/src/core/error.rs +++ b/prosa/src/core/error.rs @@ -36,6 +36,12 @@ impl ProcError for ConfigError { } } +impl ProcError for ::config::ConfigError { + fn recoverable(&self) -> bool { + false + } +} + impl ProcError for tokio::task::JoinError { fn recoverable(&self) -> bool { self.is_cancelled() diff --git a/prosa/src/core/main.rs b/prosa/src/core/main.rs index b29a707..bf809fd 100644 --- a/prosa/src/core/main.rs +++ b/prosa/src/core/main.rs @@ -14,7 +14,7 @@ use super::{ msg::{InternalMainMsg, InternalMsg, Tvf}, proc::ProcBusParam, service::{ProcService, ServiceTable}, - settings::Settings, + settings::{ProsaConfig, Settings}, }; use crate::otel::metrics::{Meter, MeterProvider as _}; use crate::otel::trace::TracerProvider as _; @@ -228,6 +228,17 @@ where .await?) } + /// Method to notify processors that the configuration changed + pub async fn update_config( + &self, + config: Arc, + ) -> Result<(), SendError>> { + Ok(self + .internal_tx_queue + .send(InternalMainMsg::Config(config)) + .await?) + } + /// Provide the ProSA name based on ProSA settings pub fn name(&self) -> &String { &self.name @@ -256,6 +267,7 @@ where name: String, processors: HashMap>>, services: Arc>, + config: Option>, internal_rx_queue: mpsc::Receiver>, meter: Meter, stop: Arc, @@ -329,6 +341,58 @@ where Ok(()) } + /// Method to notify processors whose configuration has changed + async fn notify_config_proc_queue(&mut self, config: Arc) -> Result<(), BusError> { + let current = self.config.as_deref(); + for proc in self.processors.values() { + for proc_service in proc.values() { + if current.is_some_and(|current_config| { + !current_config.has_proc_changed(&config, &proc_service.get_proc_config_key()) + }) { + continue; + } + + if let Err(e) = proc_service + .proc_queue + .send(InternalMsg::Config(config.clone())) + .await + { + return Err(BusError::ProcComm( + proc_service.get_proc_id(), + proc_service.get_queue_id(), + e.to_string(), + )); + } + } + } + + self.config = Some(config); + + Ok(()) + } + + /// Method to notify one processor queue with the current configuration + async fn notify_config_proc_service( + &self, + proc_service: &ProcService, + ) -> Result<(), BusError> { + if let Some(config) = &self.config { + proc_service + .proc_queue + .send(InternalMsg::Config(config.clone())) + .await + .map_err(|e| { + BusError::ProcComm( + proc_service.get_proc_id(), + proc_service.get_queue_id(), + e.to_string(), + ) + })?; + } + + Ok(()) + } + /// Method to notify all processor that the service table have changed async fn notify_srv_proc(&mut self) -> bool { if let Err(BusError::ProcComm(proc_id, queue_id, _)) = self.notify_srv_proc_queue().await { @@ -392,6 +456,7 @@ where name, processors, services: Arc::new(ServiceTable::default()), + config: None, internal_rx_queue, meter, stop, @@ -520,6 +585,7 @@ where let proc_id = proc.get_proc_id(); let queue_id = proc.get_queue_id(); let proc_queue = proc.proc_queue.clone(); + let proc_config = proc.clone(); if let Some(proc_service) = self.processors.get_mut(&proc_id) { proc_service.insert(queue_id, proc); } else { @@ -538,6 +604,14 @@ where } } + if self.notify_config_proc_service(&proc_config).await.is_err() { + if let Some(proc_service) = self.processors.get_mut(&proc_id) { + let _ = proc_service.remove(&queue_id); + } else { + let _ = self.processors.remove(&proc_id); + } + } + prosa_main_record_proc!(); }, InternalMainMsg::DeleteProc(proc_id, proc_err) => { @@ -608,8 +682,15 @@ where let _ = service_update.send(self.services.clone()); prosa_main_update_srv!(); }, - InternalMainMsg::Command(cmd)=> { - info!("Wan't to execute the command {}", cmd); + InternalMainMsg::Config(config) => { + info!("Reload ProSA configuration"); + if let Err(BusError::ProcComm(proc_id, queue_id, _)) = self.notify_config_proc_queue(config).await { + if queue_id > 0 { + self.remove_proc_queue(proc_id, queue_id).await; + } else { + self.remove_proc(proc_id).await; + } + } }, InternalMainMsg::Shutdown(reason) => { warn!("ProSA need to stop: {}", reason); diff --git a/prosa/src/core/msg.rs b/prosa/src/core/msg.rs index 1546ba9..a6525bd 100644 --- a/prosa/src/core/msg.rs +++ b/prosa/src/core/msg.rs @@ -7,6 +7,7 @@ use super::{ error::{BusError, ProcError}, queue::{InternalMsgQueue, SendError}, service::{ProcService, ServiceError, ServiceTable}, + settings::ProsaConfig, }; use tracing::{Level, Span, event, info_span, span}; @@ -33,8 +34,8 @@ where DeleteProcService(Vec, u32), /// Message to unregister service(s) for a processor queue. Message that contain service(s) name(s), the processor id, and the queue id DeleteService(Vec, u32, u32), - /// Command to ask an action or a status to the main processor - Command(String), + /// Message to notify processors that the configuration changed + Config(Arc), /// Internal call for shutdown (with a reason) Shutdown(String), } @@ -51,10 +52,8 @@ where Response(ResponseMsg), /// Response of a data request message by an error Error(ErrorMsg), - /// Command to ask an actiion or a status to the processor - Command(String), /// Message to ask the processor to reload its configuration - Config, + Config(Arc), /// Message to ask the processor to reload its service table Service(Arc>), /// Message to ask the processor to shutdown diff --git a/prosa/src/core/proc.rs b/prosa/src/core/proc.rs index e9f72bb..63de268 100644 --- a/prosa/src/core/proc.rs +++ b/prosa/src/core/proc.rs @@ -115,8 +115,9 @@ //! InternalMsg::Error(err) => { //! // TODO process the error //! }, -//! InternalMsg::Command(_) => todo!(), -//! InternalMsg::Config => todo!(), +//! InternalMsg::Config(config) => { +//! self.settings = config.get_proc(&self.proc)?; +//! }, //! InternalMsg::Service(table) => self.service = table, //! InternalMsg::Shutdown => { //! adaptor.terminate(); @@ -217,6 +218,11 @@ pub trait ProcBusParam { /// Provide the processor name fn name(&self) -> &str; + + /// Provide the processor configuration key + fn get_proc_config_key(&self) -> String { + self.name().replace('-', "_") + } } impl Debug for dyn ProcBusParam { diff --git a/prosa/src/core/settings.rs b/prosa/src/core/settings.rs index a5dc472..c7587a1 100644 --- a/prosa/src/core/settings.rs +++ b/prosa/src/core/settings.rs @@ -6,12 +6,19 @@ use std::{ ffi::OsStr, fs, + future::Future, io::{self, Write}, + path::Path, }; -use config::{Config, ConfigBuilder, builder::DefaultState}; +use config::{Config, ConfigBuilder, ValueKind, builder::DefaultState}; +use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use prosa_utils::config::observability::Observability; use serde::Serialize; +use serde::de::DeserializeOwned; +use tokio::sync::mpsc; + +use super::proc::ProcBusParam; /// Re-export of prosa_utils for observability config pub use prosa_utils::config::{observability, tracing}; @@ -154,6 +161,171 @@ pub fn get_config_builder(path: &str) -> io::Result> } } +/// Loaded ProSA configuration. +#[derive(Clone, Debug)] +pub struct ProsaConfig { + config: Config, +} + +impl ProsaConfig { + /// Create a ProSA configuration wrapper from a loaded [`Config`]. + pub fn new(config: Config) -> Self { + Self { config } + } + + /// Load a ProSA configuration from a file or directory path. + pub fn from_path(config_path: &str) -> Result { + get_config_builder(config_path) + .map_err(|e| config::ConfigError::Foreign(Box::new(e)))? + .add_source( + config::Environment::with_prefix("PROSA") + .try_parsing(true) + .separator("_") + .list_separator(" "), + ) + .build() + .map(Self::new) + } + + /// Access the underlying loaded configuration. + pub fn config(&self) -> &Config { + &self.config + } + + /// Deserialize the full configuration. + pub fn try_deserialize(&self) -> Result + where + C: DeserializeOwned, + { + self.config.clone().try_deserialize() + } + + /// Deserialize a processor configuration from its processor name. + pub fn get_proc(&self, proc: &impl ProcBusParam) -> Result + where + C: DeserializeOwned, + { + self.config.get::(&proc.get_proc_config_key()) + } + + /// Check if this configuration differs from another loaded configuration. + pub fn has_changed(&self, new: &Self) -> bool { + self.config.cache != new.config.cache + } + + /// Check if one processor configuration differs from another loaded configuration. + pub fn has_proc_changed(&self, new: &Self, proc_config_key: &str) -> bool { + if let (ValueKind::Table(current_table), ValueKind::Table(new_table)) = + (&self.config.cache.kind, &new.config.cache.kind) + { + current_table.get(proc_config_key) != new_table.get(proc_config_key) + } else { + false + } + } +} + +impl From for ProsaConfig { + fn from(config: Config) -> Self { + Self::new(config) + } +} + +impl From for Config { + fn from(prosa_config: ProsaConfig) -> Self { + prosa_config.config + } +} + +/// Watches a configuration path and exposes native file change events. +pub struct ConfigWatcher { + _watcher: RecommendedWatcher, + events: mpsc::UnboundedReceiver>, +} + +impl ConfigWatcher { + /// Wait for the next configuration file system event. + pub async fn changed(&mut self) -> Option> { + self.events.recv().await + } +} + +/// Create a watcher for the configuration file or directory. +pub fn watch_config_path(config_path: &str) -> notify::Result { + let (tx, events) = mpsc::unbounded_channel(); + let mut watcher = notify::recommended_watcher(move |event| { + let _ = tx.send(event); + })?; + + watcher.watch(Path::new(config_path), RecursiveMode::NonRecursive)?; + + Ok(ConfigWatcher { + _watcher: watcher, + events, + }) +} + +/// Filter out file system events that cannot affect configuration content. +pub fn is_config_reload_event(event: &Event) -> bool { + matches!( + event.kind, + EventKind::Any | EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) + ) +} + +/// Watch and reload a configuration path on file system changes. +pub async fn watch_config_reload( + config_path: String, + mut current_config: ProsaConfig, + mut load_config: LoadConfig, + mut apply_config: ApplyConfig, +) where + S: DeserializeOwned, + LoadConfig: FnMut(&str) -> Result, + ApplyConfig: FnMut(S, ProsaConfig) -> ApplyFuture, + ApplyFuture: Future, +{ + let mut config_watcher = match watch_config_path(&config_path) { + Ok(config_watcher) => config_watcher, + Err(err) => { + log::warn!("Can't watch configuration {config_path}: {err}"); + return; + } + }; + + loop { + match config_watcher.changed().await { + Some(Ok(event)) if is_config_reload_event(&event) => {} + Some(Ok(_)) => continue, + Some(Err(err)) => { + log::warn!("Error watching configuration {config_path}: {err}"); + continue; + } + None => { + log::warn!("Configuration watcher stopped for {config_path}"); + return; + } + } + + match load_config(&config_path) { + Ok(new_config) if current_config.has_changed(&new_config) => { + match new_config.try_deserialize::() { + Ok(settings) => { + if apply_config(settings, new_config.clone()).await { + current_config = new_config; + } + } + Err(err) => { + log::error!("Configuration changed but can't be deserialized: {err}") + } + } + } + Ok(_) => {} + Err(err) => log::warn!("Can't reload configuration {config_path}: {err}"), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -189,4 +361,29 @@ mod tests { assert_eq!("test", test_settings.name_test); assert_eq!("test2", test_settings.name_test2); } + + #[test] + fn test_proc_config_hash_change() -> Result<(), config::ConfigError> { + let current = ProsaConfig::new( + Config::builder() + .set_override("proc_1.service_name", "PROC_TEST")? + .set_override("proc_1.tick_secs", 4)? + .set_override("proc_2.service_name", "PROC_TEST_2")? + .set_override("proc_2.tick_secs", 4)? + .build()?, + ); + let new = ProsaConfig::new( + Config::builder() + .set_override("proc_1.service_name", "PROC_TEST_UPDATED")? + .set_override("proc_1.tick_secs", 4)? + .set_override("proc_2.service_name", "PROC_TEST_2")? + .set_override("proc_2.tick_secs", 4)? + .build()?, + ); + + assert!(current.has_proc_changed(&new, "proc_1")); + assert!(!current.has_proc_changed(&new, "proc_2")); + + Ok(()) + } } diff --git a/prosa/src/inj/proc.rs b/prosa/src/inj/proc.rs index b32890b..e56283c 100644 --- a/prosa/src/inj/proc.rs +++ b/prosa/src/inj/proc.rs @@ -197,8 +197,11 @@ impl InjProc { // Build the next transaction let _ = next_transaction.get_or_insert(adaptor.build_transaction()); } - InternalMsg::Command(_) => todo!(), - InternalMsg::Config => todo!(), + InternalMsg::Config(config) => { + let settings = config.get_proc::(self.proc.as_ref())?; + *regulator = settings.get_regulator(); + self.settings = settings; + } InternalMsg::Service(table) => self.service = table, InternalMsg::Shutdown => { adaptor.terminate(); diff --git a/prosa/src/stub/proc.rs b/prosa/src/stub/proc.rs index c106a42..8a3bc6f 100644 --- a/prosa/src/stub/proc.rs +++ b/prosa/src/stub/proc.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use crate::tracing::debug; use prosa_macros::proc_settings; @@ -117,8 +117,31 @@ where self.get_proc_id(), err ), - InternalMsg::Command(_) => todo!(), - InternalMsg::Config => todo!(), + InternalMsg::Config(config) => { + let settings = config.get_proc::(self.proc.as_ref())?; + + let current_services = + self.settings.service_names.iter().collect::>(); + let new_services = settings.service_names.iter().collect::>(); + + let services_to_remove = current_services + .difference(&new_services) + .map(|service| (*service).clone()) + .collect::>(); + if !services_to_remove.is_empty() { + self.proc.remove_service_proc(services_to_remove).await?; + } + + let services_to_add = new_services + .difference(¤t_services) + .map(|service| (*service).clone()) + .collect::>(); + if !services_to_add.is_empty() { + self.proc.add_service_proc(services_to_add).await?; + } + + self.settings = settings; + } InternalMsg::Service(table) => self.service = table, InternalMsg::Shutdown => { adaptor.terminate(); diff --git a/prosa_book/src/ch03-01-settings.md b/prosa_book/src/ch03-01-settings.md index 72a0422..b2a5b6f 100644 --- a/prosa_book/src/ch03-01-settings.md +++ b/prosa_book/src/ch03-01-settings.md @@ -5,6 +5,21 @@ You'll specify your processor settings object when you create your processor in > `Settings` is the top-level configuration object, while `ProcSettings` is specific to processors. +## Loading + +Use [`ProsaConfig`](https://docs.rs/prosa/latest/prosa/core/settings/struct.ProsaConfig.html) to load a ProSA configuration from a file or a directory: + +```rust,noplayground +use prosa::core::settings::ProsaConfig; + +let config = ProsaConfig::from_path("prosa.yml")?; +let settings = config.try_deserialize::()?; +``` + +`ProsaConfig::from_path()` uses the same configuration loading rules as ProSA itself: the path can be a single configuration file or a directory containing `yml`, `yaml`, or `toml` files, and `PROSA_*` environment variables are applied on top of the file sources. + +When the main task notifies a processor about a configuration change, the message contains the same `ProsaConfig` wrapper. Processors should reload their own section with `config.get_proc(self.proc.as_ref())?`. + ## Creation To create a processor settings, declare a `struct` and use the [`proc_settings`](https://docs.rs/prosa/latest/prosa/core/proc/attr.proc_settings.html) macro. diff --git a/prosa_book/src/ch03-02-creation.md b/prosa_book/src/ch03-02-creation.md index 78e8bfa..020f7f8 100644 --- a/prosa_book/src/ch03-02-creation.md +++ b/prosa_book/src/ch03-02-creation.md @@ -57,8 +57,9 @@ where InternalMsg::Error(err) => { // TODO: process the error } - InternalMsg::Command(_) => todo!(), - InternalMsg::Config => todo!(), + InternalMsg::Config(config) => { + self.settings = config.get_proc(self.proc.as_ref())?; + }, InternalMsg::Service(table) => self.service = table, InternalMsg::Shutdown => { adaptor.terminate(); @@ -72,6 +73,8 @@ where } ``` +When receiving `InternalMsg::Config(config)`, call `config.get_proc(self.proc.as_ref())?` to deserialize the section matching the processor configuration key (the processor name with `-` replaced by `_`) into the processor settings type. The main task only sends this message to processors whose own configuration section changed. + The generic parameter `A` represents the adaptor type your processor uses. Specify in the _where_ clause which traits your adaptor must implement (commonly, [`Adaptor`](https://docs.rs/prosa/latest/prosa/core/adaptor/trait.Adaptor.html) plus `Send` and `Sync`) diff --git a/prosa_book/src/ch03-05-service.md b/prosa_book/src/ch03-05-service.md index 305c071..f68f8df 100644 --- a/prosa_book/src/ch03-05-service.md +++ b/prosa_book/src/ch03-05-service.md @@ -41,8 +41,9 @@ To start listening to a specific service, call [`add_service_proc()`](https://do InternalMsg::Error(err) => { // Handle errors as if they were responses }, - InternalMsg::Command(_) => todo!(), - InternalMsg::Config => todo!(), + InternalMsg::Config(config) => { + self.settings = config.get_proc(self.proc.as_ref())?; + }, InternalMsg::Service(table) => self.service = table, InternalMsg::Shutdown => { self.proc.remove_proc(None).await?; @@ -107,8 +108,9 @@ In this case, you can declare multiple listener subtasks, each of which subscrib InternalMsg::Error(err) => { // Handle errors as if they were responses }, - InternalMsg::Command(_) => todo!(), - InternalMsg::Config => todo!(), + InternalMsg::Config(config) => { + self.settings = config.get_proc(self.proc.as_ref())?; + }, InternalMsg::Service(table) => service = table, InternalMsg::Shutdown => { self.proc.remove_proc(None).await?; @@ -131,8 +133,9 @@ In this case, you can declare multiple listener subtasks, each of which subscrib InternalMsg::Error(err) => { // Handle errors as if they were responses }, - InternalMsg::Command(_) => todo!(), - InternalMsg::Config => todo!(), + InternalMsg::Config(config) => { + self.settings = config.get_proc(self.proc.as_ref())?; + }, InternalMsg::Service(table) => self.service = table, InternalMsg::Shutdown => { self.proc.remove_proc(None).await?; @@ -178,8 +181,9 @@ After that, you are free to call any services. InternalMsg::Error(err) => { // Handle errors }, - InternalMsg::Command(_) => todo!(), - InternalMsg::Config => todo!(), + InternalMsg::Config(config) => { + self.settings = config.get_proc(self.proc.as_ref())?; + }, InternalMsg::Service(table) => self.service = table, InternalMsg::Shutdown => { self.proc.remove_proc(None).await?; @@ -244,8 +248,9 @@ The logic is similar to single senders, but you specify the queue when sending m InternalMsg::Error(err) => { // Handle errors for this subtask }, - InternalMsg::Command(_) => todo!(), - InternalMsg::Config => todo!(), + InternalMsg::Config(config) => { + self.settings = config.get_proc(self.proc.as_ref())?; + }, InternalMsg::Service(table) => self.service = table, InternalMsg::Shutdown => { self.proc.remove_proc(None).await?; diff --git a/prosa_book/src/ch03-06-events.md b/prosa_book/src/ch03-06-events.md index f6b74f7..d5c7767 100644 --- a/prosa_book/src/ch03-06-events.md +++ b/prosa_book/src/ch03-06-events.md @@ -54,8 +54,9 @@ There are three important methods you need to use for this object: let _enter = err.enter_span(); info!("Proc {} receive an error: {:?}", self.get_proc_id(), err); }, - InternalMsg::Command(_) => todo!(), - InternalMsg::Config => todo!(), + InternalMsg::Config(config) => { + self.settings = config.get_proc(self.proc.as_ref())?; + }, InternalMsg::Service(table) => { debug!("New service table received:\n{}\n", table); self.service = table; diff --git a/prosa_utils/src/config/observability.rs b/prosa_utils/src/config/observability.rs index e6d3c43..002fb8a 100644 --- a/prosa_utils/src/config/observability.rs +++ b/prosa_utils/src/config/observability.rs @@ -523,6 +523,11 @@ impl Observability { } } + /// Getter of the global telemetry level + pub fn get_level(&self) -> TelemetryLevel { + self.level + } + /// Meter provider builder #[cfg(feature = "config-observability-prometheus")] pub fn build_meter_provider(&self, registry: &prometheus::Registry) -> SdkMeterProvider { @@ -607,8 +612,8 @@ impl Observability { /// Method to init `tracing` pub fn tracing_init(&self, filter: &TelemetryFilter) -> Result<(), TryInitError> { - let global_level: filter::LevelFilter = self.level.into(); - let subscriber = tracing_subscriber::registry().with(global_level); + filter.set_level(self.level.into()); + let subscriber = tracing_subscriber::registry().with(filter::LevelFilter::TRACE); if let Some(traces) = &self.traces { if let Some(otlp) = &traces.otlp { diff --git a/prosa_utils/src/config/tracing.rs b/prosa_utils/src/config/tracing.rs index 177492d..d5f9f57 100644 --- a/prosa_utils/src/config/tracing.rs +++ b/prosa_utils/src/config/tracing.rs @@ -8,8 +8,9 @@ use serde::de::Unexpected; use serde::de::Visitor; use std::collections::HashMap; use std::fmt; +use std::sync::{Arc, RwLock}; use tracing_core::Event; -use tracing_core::{Metadata, subscriber::Interest}; +use tracing_core::Metadata; use tracing_subscriber::filter; use tracing_subscriber::layer; @@ -154,54 +155,93 @@ impl<'de> Deserialize<'de> for TelemetryLevel { /// ``` #[derive(Debug, Clone)] pub struct TelemetryFilter { + inner: Arc>, + max_level: Option, +} + +#[derive(Debug, Clone)] +struct TelemetryFilterInner { proc_levels: HashMap, - pub(crate) level: filter::LevelFilter, + level: filter::LevelFilter, } impl TelemetryFilter { /// Method to create a new telemetry filter pub fn new(level: filter::LevelFilter) -> TelemetryFilter { TelemetryFilter { - proc_levels: HashMap::new(), - level, + inner: Arc::new(RwLock::new(TelemetryFilterInner { + proc_levels: HashMap::new(), + level, + })), + max_level: None, } } /// Method to clone the telemetry filter and change its default level if it's less verbose pub fn clone_with_level(&self, level: TelemetryLevel) -> TelemetryFilter { - let mut filter = self.clone(); - let level: filter::LevelFilter = level.into(); - if level < filter.level { - filter.level = level; + TelemetryFilter { + inner: self.inner.clone(), + max_level: Some(level.into()), + } + } + + /// Method to update the dynamic default telemetry level + pub fn set_level(&self, level: filter::LevelFilter) { + if let Ok(mut inner) = self.inner.write() { + inner.level = level; + } + } + + /// Getter of the current dynamic default telemetry level + pub fn level(&self) -> filter::LevelFilter { + let mut level = self + .inner + .read() + .map(|inner| inner.level) + .unwrap_or(filter::LevelFilter::OFF); + + if let Some(max_level) = self.max_level + && max_level < level + { + level = max_level; } - filter + level } /// Method to add a filter on a specific processor - pub fn add_proc_filter(&mut self, proc_name: String, level: filter::LevelFilter) { - self.proc_levels.insert(proc_name, level); + pub fn add_proc_filter(&self, proc_name: String, level: filter::LevelFilter) { + if let Ok(mut inner) = self.inner.write() { + inner.proc_levels.insert(proc_name, level); + } } fn is_enabled(&self, metadata: &Metadata<'_>) -> bool { - let level = if let Some(value) = self.proc_levels.get(metadata.name()) { - value - } else if let Some(value) = self.proc_levels.get(metadata.target()) { - value + let Ok(inner) = self.inner.read() else { + return false; + }; + + let mut level = if let Some(value) = inner.proc_levels.get(metadata.name()) { + *value + } else if let Some(value) = inner.proc_levels.get(metadata.target()) { + *value } else { - &self.level + inner.level }; - metadata.level() <= level + if let Some(max_level) = self.max_level + && max_level < level + { + level = max_level; + } + + metadata.level() <= &level } } impl Default for TelemetryFilter { fn default() -> TelemetryFilter { - TelemetryFilter { - proc_levels: HashMap::new(), - level: filter::LevelFilter::TRACE, - } + TelemetryFilter::new(filter::LevelFilter::TRACE) } } @@ -210,20 +250,12 @@ impl layer::Filter for TelemetryFilter { self.is_enabled(metadata) } - fn callsite_enabled(&self, metadata: &'static Metadata<'static>) -> Interest { - if self.is_enabled(metadata) { - Interest::always() - } else { - Interest::never() - } - } - fn event_enabled(&self, event: &Event<'_>, _: &layer::Context<'_, S>) -> bool { self.is_enabled(event.metadata()) } fn max_level_hint(&self) -> Option { - Some(self.level) + Some(self.level()) } } From 39b5fdf3cd08a974bbb70313d7e2c8076c8fa14e Mon Sep 17 00:00:00 2001 From: Jeremy HERGAULT Date: Sat, 27 Jun 2026 18:48:16 +0200 Subject: [PATCH 2/3] feat: dynamic config for adaptor and global improvements Signed-off-by: Jeremy HERGAULT --- Cargo.toml | 2 +- cargo-prosa/Cargo.toml | 2 +- deny.toml | 1 + prosa/Cargo.toml | 2 +- prosa/examples/my_prosa_settings.yml | 1 + prosa/examples/proc.rs | 18 +- prosa/examples/stub_async_parot_adaptor.yml | 1 + prosa/src/core/adaptor.rs | 30 ++ prosa/src/core/main.rs | 91 +++-- prosa/src/core/msg.rs | 4 + prosa/src/core/proc.rs | 34 +- prosa/src/core/settings.rs | 373 +++++++++++++++++--- prosa/src/inj/proc.rs | 21 +- prosa/src/stub/adaptor.rs | 94 ++++- prosa/src/stub/proc.rs | 25 +- prosa_book/src/ch03-01-settings.md | 3 +- prosa_book/src/ch03-09-builtin.md | 17 +- prosa_utils/Cargo.toml | 2 +- prosa_utils/src/config/tracing.rs | 6 + 19 files changed, 583 insertions(+), 144 deletions(-) create mode 100644 prosa/examples/stub_async_parot_adaptor.yml diff --git a/Cargo.toml b/Cargo.toml index c471067..7b2375d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ include = [ ] [workspace.dependencies] -prosa-utils = { version = "0.4.3", path = "prosa_utils" } +prosa-utils = { version = "0.5.0", path = "prosa_utils" } prosa-macros = { version = "0.4.2", path = "prosa_macros" } thiserror = "2" simple-mermaid = "0.2" diff --git a/cargo-prosa/Cargo.toml b/cargo-prosa/Cargo.toml index 9a2c726..f33039d 100644 --- a/cargo-prosa/Cargo.toml +++ b/cargo-prosa/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cargo-prosa" -version = "0.4.2" +version = "0.5.0" authors.workspace = true description = "ProSA utility to package and deliver a builded ProSA" homepage.workspace = true diff --git a/deny.toml b/deny.toml index 1e7295a..94d8294 100644 --- a/deny.toml +++ b/deny.toml @@ -11,6 +11,7 @@ allow = [ "MIT", "BSD-2-Clause", "BSD-3-Clause", + "CC0-1.0", "CDLA-Permissive-2.0", "ISC", "Zlib", diff --git a/prosa/Cargo.toml b/prosa/Cargo.toml index 69251d4..a4a7e35 100644 --- a/prosa/Cargo.toml +++ b/prosa/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "prosa" -version = "0.4.4" +version = "0.5.0" authors.workspace = true description = "ProSA core" homepage.workspace = true diff --git a/prosa/examples/my_prosa_settings.yml b/prosa/examples/my_prosa_settings.yml index 6e2607a..ff920bf 100644 --- a/prosa/examples/my_prosa_settings.yml +++ b/prosa/examples/my_prosa_settings.yml @@ -10,6 +10,7 @@ observability: level: debug stub_proc: + adaptor_config_path: examples/stub_async_parot_adaptor.yml service_names: - STUB_TEST diff --git a/prosa/examples/proc.rs b/prosa/examples/proc.rs index 12bb586..708a167 100644 --- a/prosa/examples/proc.rs +++ b/prosa/examples/proc.rs @@ -3,10 +3,10 @@ use prosa::core::error::ProcError; use prosa::core::main::{MainProc, MainRunnable}; use prosa::core::msg::{InternalMsg, Msg, RequestMsg}; use prosa::core::proc::{Proc, ProcBusParam, ProcConfig, proc}; -use prosa::core::settings::{ProsaConfig, Settings, settings}; use prosa::core::settings::tracing::TelemetryFilter; +use prosa::core::settings::{ProsaConfig, Settings, settings}; use prosa::event::pending::PendingMsgs; -use prosa::stub::adaptor::StubParotAdaptor; +use prosa::stub::adaptor::StubAsyncParotAdaptor; use prosa::stub::proc::{StubProc, StubSettings}; use prosa::tracing::{debug, info, warn}; use prosa_macros::proc_settings; @@ -64,7 +64,7 @@ where Some(msg) = self.internal_rx_queue.recv() => { match msg { InternalMsg::Request(msg) => { - info!("Proc {} receive a request: {:?}", self.get_proc_id(), msg); + info!("Proc {} received a request: {:?}", self.get_proc_id(), msg); // Push in the pending message pending_msgs.push(msg, Duration::from_millis(200)); @@ -72,11 +72,11 @@ where }, InternalMsg::Response(msg) => { let _enter = msg.enter_span(); - info!("Proc {} receive a response: {:?}", self.get_proc_id(), msg); + info!("Proc {} received a response: {:?}", self.get_proc_id(), msg); }, InternalMsg::Error(err) => { let _enter = err.enter_span(); - info!("Proc {} receive an error: {:?}", self.get_proc_id(), err); + info!("Proc {} received an error: {:?}", self.get_proc_id(), err); }, InternalMsg::Config(config) => { let settings = config.get_proc::(self.proc.as_ref())?; @@ -103,7 +103,7 @@ where }, InternalMsg::Shutdown => { adaptor.terminate(); - warn!("The processor will shut down"); + warn!("The processor is shutting down"); }, } }, @@ -116,12 +116,12 @@ where let stub_service_name = String::from("STUB_TEST"); if let Some(service) = self.service.get_proc_service(&stub_service_name) { - debug!("The service is find: {:?}", service); + debug!("Service found: {:?}", service); let _ = service.proc_queue.send(InternalMsg::Request(RequestMsg::new(stub_service_name, tvf.clone(), self.proc.get_service_queue()))).await; } if let Some(service) = self.service.get_proc_service(&self.settings.service_name) { - debug!("The service is find: {:?}", service); + debug!("Service found: {:?}", service); let _ = service.proc_queue.send(InternalMsg::Request(RequestMsg::new(self.settings.service_name.clone(), tvf, self.proc.get_service_queue()))).await; } }, @@ -202,7 +202,7 @@ async fn main() -> Result<(), Box> { bus.clone(), my_settings.stub_proc.clone(), ); - Proc::::run(stub_proc)?; + Proc::::run(stub_proc)?; // Launch the test processor let proc = MyProcClass::::create( diff --git a/prosa/examples/stub_async_parot_adaptor.yml b/prosa/examples/stub_async_parot_adaptor.yml new file mode 100644 index 0000000..f9b1e46 --- /dev/null +++ b/prosa/examples/stub_async_parot_adaptor.yml @@ -0,0 +1 @@ +sleep_ms: 100 diff --git a/prosa/src/core/adaptor.rs b/prosa/src/core/adaptor.rs index c190843..28cc399 100644 --- a/prosa/src/core/adaptor.rs +++ b/prosa/src/core/adaptor.rs @@ -55,8 +55,38 @@ pub use prosa_macros::Adaptor; /// } /// ``` pub trait Adaptor { + /// Method call when the adaptor configuration is loaded or reloaded. + /// + /// Adaptors that need a dedicated configuration file can deserialize this + /// config and update their internal state. The default implementation keeps + /// existing adaptors compatible by ignoring the configuration. + /// + /// Processor implementations should call this from their loop when they + /// receive [`InternalMsg::Config`](crate::core::msg::InternalMsg::Config): + /// + /// ```rust,ignore + /// InternalMsg::Config(config) => { + /// self.settings = config.get_proc(self.proc.as_ref())?; + /// adaptor.reload_config(config.get_adaptor_config(self.proc.as_ref()))?; + /// } + /// ``` + fn reload_config(&self, _config: Option<&config::Config>) -> Result<(), config::ConfigError> { + Ok(()) + } + /// Method call when the ProSA need to shut down. /// This method is call only once so the processing will be thread safe. + /// + /// Processor implementations should call this from their loop when they + /// receive [`InternalMsg::Shutdown`](crate::core::msg::InternalMsg::Shutdown): + /// + /// ```rust,ignore + /// InternalMsg::Shutdown => { + /// adaptor.terminate(); + /// self.proc.remove_proc(None).await?; + /// return Ok(()); + /// } + /// ``` fn terminate(&self); } diff --git a/prosa/src/core/main.rs b/prosa/src/core/main.rs index bf809fd..4a35b4a 100644 --- a/prosa/src/core/main.rs +++ b/prosa/src/core/main.rs @@ -320,7 +320,8 @@ where } /// Method to notify all processor that the service table have changed - async fn notify_srv_proc_queue(&self) -> Result<(), BusError> { + async fn notify_srv_proc_queue(&self) -> Vec { + let mut errors = Vec::new(); for proc in self.processors.values() { for proc_service in proc.values() { if let Err(e) = proc_service @@ -328,8 +329,7 @@ where .send(InternalMsg::Service(self.services.clone())) .await { - // FIXME match the error. If it's a capacity error, don't drop the processor do something else - return Err(BusError::ProcComm( + errors.push(BusError::ProcComm( proc_service.get_proc_id(), proc_service.get_queue_id(), e.to_string(), @@ -338,26 +338,23 @@ where } } - Ok(()) + errors } /// Method to notify processors whose configuration has changed - async fn notify_config_proc_queue(&mut self, config: Arc) -> Result<(), BusError> { + async fn notify_config_proc_queue(&mut self, config: Arc) -> Vec { let current = self.config.as_deref(); + let mut errors = Vec::new(); for proc in self.processors.values() { for proc_service in proc.values() { - if current.is_some_and(|current_config| { - !current_config.has_proc_changed(&config, &proc_service.get_proc_config_key()) - }) { - continue; - } - - if let Err(e) = proc_service + if current.is_none_or(|current_config| { + current_config.has_proc_changed(&config, &proc_service.get_proc_config_key()) + }) && let Err(e) = proc_service .proc_queue .send(InternalMsg::Config(config.clone())) .await { - return Err(BusError::ProcComm( + errors.push(BusError::ProcComm( proc_service.get_proc_id(), proc_service.get_queue_id(), e.to_string(), @@ -368,7 +365,7 @@ where self.config = Some(config); - Ok(()) + errors } /// Method to notify one processor queue with the current configuration @@ -394,18 +391,17 @@ where } /// Method to notify all processor that the service table have changed - async fn notify_srv_proc(&mut self) -> bool { - if let Err(BusError::ProcComm(proc_id, queue_id, _)) = self.notify_srv_proc_queue().await { + async fn notify_srv_proc(&mut self) { + for error in self.notify_srv_proc_queue().await { // The processor doesn't exist anymore so remove it - if queue_id > 0 { - self.remove_proc_queue(proc_id, queue_id).await; - } else { - self.remove_proc(proc_id).await; + if let BusError::ProcComm(proc_id, queue_id, _) = error { + if queue_id > 0 { + self.remove_proc_queue(proc_id, queue_id).await; + } else { + warn!("Processor {proc_id} stopped during service table reload"); + self.remove_proc(proc_id).await; + } } - - false - } else { - true } } @@ -416,7 +412,10 @@ where for proc in self.processors.values() { for proc_service in proc.values() { if let Err(e) = proc_service.proc_queue.send(InternalMsg::Shutdown).await { - debug!("The {:?} seems already stopped: {}", proc_service, e); + debug!( + "Processor service {:?} seems to have already stopped: {}", + proc_service, e + ); } else { is_stopped = false; } @@ -520,15 +519,6 @@ where .with_description("Processors declared to the main task") .build(); - /// Macro to notify processors for a change about service list - macro_rules! prosa_main_update_srv { - ( ) => { - if !self.notify_srv_proc().await { - self.notify_srv_proc().await; - } - }; - } - /// Macro to record a change to the processors macro_rules! prosa_main_record_proc { ( ) => { @@ -600,6 +590,7 @@ where if let Some(proc_service) = self.processors.get_mut(&proc_id) { let _ = proc_service.remove(&queue_id); } else { + warn!("Processor {proc_id} stopped while loading the service table"); let _ = self.processors.remove(&proc_id); } } @@ -608,6 +599,7 @@ where if let Some(proc_service) = self.processors.get_mut(&proc_id) { let _ = proc_service.remove(&queue_id); } else { + warn!("Processor {proc_id} stopped while loading configuration"); let _ = self.processors.remove(&proc_id); } } @@ -616,7 +608,7 @@ where }, InternalMainMsg::DeleteProc(proc_id, proc_err) => { if self.remove_proc(proc_id).await.is_some() { - prosa_main_update_srv!(); + self.notify_srv_proc().await; } if let Some(err) = proc_err { @@ -635,7 +627,7 @@ where }, InternalMainMsg::DeleteProcQueue(proc_id, queue_id) => { if self.remove_proc_queue(proc_id, queue_id).await.is_some() { - prosa_main_update_srv!(); + self.notify_srv_proc().await; } prosa_main_record_proc!(); @@ -650,7 +642,7 @@ where } self.services = Arc::new(new_services); let _ = service_update.send(self.services.clone()); - prosa_main_update_srv!(); + self.notify_srv_proc().await; } }, InternalMainMsg::NewService(names, proc_id, queue_id) => { @@ -661,7 +653,7 @@ where } self.services = Arc::new(new_services); let _ = service_update.send(self.services.clone()); - prosa_main_update_srv!(); + self.notify_srv_proc().await; } }, InternalMainMsg::DeleteProcService(names, proc_id) => { @@ -671,7 +663,7 @@ where } self.services = Arc::new(new_services); let _ = service_update.send(self.services.clone()); - prosa_main_update_srv!(); + self.notify_srv_proc().await; }, InternalMainMsg::DeleteService(names, proc_id, queue_id) => { let mut new_services = (*self.services).clone(); @@ -680,20 +672,23 @@ where } self.services = Arc::new(new_services); let _ = service_update.send(self.services.clone()); - prosa_main_update_srv!(); + self.notify_srv_proc().await; }, InternalMainMsg::Config(config) => { - info!("Reload ProSA configuration"); - if let Err(BusError::ProcComm(proc_id, queue_id, _)) = self.notify_config_proc_queue(config).await { - if queue_id > 0 { - self.remove_proc_queue(proc_id, queue_id).await; - } else { - self.remove_proc(proc_id).await; + info!("Reloading ProSA configuration"); + for error in self.notify_config_proc_queue(config).await { + if let BusError::ProcComm(proc_id, queue_id, _) = error { + if queue_id > 0 { + self.remove_proc_queue(proc_id, queue_id).await; + } else { + warn!("Processor {proc_id} stopped during configuration reload"); + self.remove_proc(proc_id).await; + } } } }, InternalMainMsg::Shutdown(reason) => { - warn!("ProSA need to stop: {}", reason); + warn!("ProSA is stopping: {}", reason); self.stop().await; // The shutdown mecanism will be implemented later @@ -702,7 +697,7 @@ where } }, _ = signal::ctrl_c() => { - warn!("ProSA need to stop"); + warn!("ProSA is stopping"); self.stop().await; // The shutdown mecanism will be implemented later diff --git a/prosa/src/core/msg.rs b/prosa/src/core/msg.rs index a6525bd..f6b3afe 100644 --- a/prosa/src/core/msg.rs +++ b/prosa/src/core/msg.rs @@ -14,6 +14,10 @@ use tracing::{Level, Span, event, info_span, span}; /// Expose Tvf trait pub use prosa_utils::msg::tvf::Tvf; +// re-export types used by TVF trait +pub use prosa_utils::msg::bytes; +pub use prosa_utils::msg::chrono; + /// Internal ProSA message that define all message type that can be received by the main ProSA processor #[derive(Debug)] pub enum InternalMainMsg diff --git a/prosa/src/core/proc.rs b/prosa/src/core/proc.rs index 63de268..b1bc55f 100644 --- a/prosa/src/core/proc.rs +++ b/prosa/src/core/proc.rs @@ -95,7 +95,7 @@ //! { //! async fn internal_run(&mut self) -> Result<(), Box> { //! // Initiate an adaptor -//! let mut adaptor = A::new(self)?; +//! let adaptor = A::new(self)?; //! //! // Declare the processor //! self.proc.add_proc().await?; @@ -116,7 +116,8 @@ //! // TODO process the error //! }, //! InternalMsg::Config(config) => { -//! self.settings = config.get_proc(&self.proc)?; +//! self.settings = config.get_proc(self.proc.as_ref())?; +//! adaptor.reload_config(config.get_adaptor_config(self.proc.as_ref()))?; //! }, //! InternalMsg::Service(table) => self.service = table, //! InternalMsg::Shutdown => { @@ -138,13 +139,9 @@ use super::{ msg::{InternalMsg, Tvf}, service::ProcService, }; -use config::{Config, ConfigError, File}; -use glob::glob; +use config::ConfigError; use log::{error, info, warn}; -use std::borrow::Cow; -use std::fmt::Debug; -use std::io; -use std::time::Duration; +use std::{borrow::Cow, fmt::Debug, io, time::Duration}; use tokio::sync::mpsc; use tokio::time::sleep; use tokio::{runtime, spawn}; @@ -190,19 +187,8 @@ pub trait ProcSettings { C: serde::de::Deserialize<'static>, { if let Some(config_path) = &self.get_adaptor_config_path() { - Config::builder() - .add_source( - glob(config_path) - .map_err(|e| { - ConfigError::Message(format!( - "Wrong adaptor config path pattern `{config_path}`: `{e}`" - )) - })? - .filter_map(|path| path.ok().map(File::from)) - .collect::>(), - ) - .build()? - .try_deserialize() + let (config, _) = crate::core::settings::ProsaConfig::load_adaptor_config(config_path)?; + config.try_deserialize() } else { Err(ConfigError::NotFound( "No configuration set for processor's adaptor".to_string(), @@ -480,7 +466,7 @@ where macro_rules! proc_run { ( $self:ident ) => { info!( - "Run processor[{}] {} on {} threads", + "Running processor[{}] {} on {} threads", $self.get_proc_id(), $self.name(), $self.get_proc_threads() @@ -502,7 +488,7 @@ macro_rules! proc_run { // Log and restart if needed if proc_err.recoverable() { warn!( - "Processor[{}] {} encounter an error `{}`. Will restart after {}ms", + "Processor[{}] {} encountered an error `{}`. Restarting after {}ms", $self.get_proc_id(), $self.name(), proc_err, @@ -515,7 +501,7 @@ macro_rules! proc_run { } } else { error!( - "Processor[{}] {} encounter a fatal error `{}`", + "Processor[{}] {} encountered a fatal error `{}`", $self.get_proc_id(), $self.name(), proc_err diff --git a/prosa/src/core/settings.rs b/prosa/src/core/settings.rs index c7587a1..4c0e2a6 100644 --- a/prosa/src/core/settings.rs +++ b/prosa/src/core/settings.rs @@ -4,14 +4,16 @@ //! use std::{ + collections::{HashMap, HashSet}, ffi::OsStr, fs, future::Future, io::{self, Write}, - path::Path, + path::{Path, PathBuf}, }; -use config::{Config, ConfigBuilder, ValueKind, builder::DefaultState}; +use config::{Config, ConfigBuilder, File, ValueKind, builder::DefaultState}; +use glob::glob; use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use prosa_utils::config::observability::Observability; use serde::Serialize; @@ -130,25 +132,29 @@ pub trait Settings: Serialize { /// - a folder with multiple configuration files in it /// - a file with the entire configuration in it pub fn get_config_builder(path: &str) -> io::Result> { - let mut builder = Config::builder(); + add_config_path(Config::builder(), Path::new(path)) +} - let mut path_attr = std::fs::metadata(path)?; - if path_attr.is_symlink() { - path_attr = std::fs::metadata(fs::read_link(path)?)?; - } +fn add_config_path( + mut builder: ConfigBuilder, + path: &Path, +) -> io::Result> { + let path_attr = fs::metadata(path)?; if path_attr.is_file() { - Ok(builder.add_source(config::File::with_name(path))) + Ok(builder.add_source(File::from(path.to_path_buf()))) } else if path_attr.is_dir() { - for entry in fs::read_dir(path)? { - let path_subdir = entry?.path(); - if path_subdir.is_file() + for path_subdir in sorted_dir_entries(path)? { + let path_attr = fs::metadata(&path_subdir)?; + if path_attr.is_dir() { + builder = add_config_path(builder, &path_subdir)?; + } else if path_attr.is_file() && path_subdir .extension() .and_then(OsStr::to_str) .is_some_and(|ext| matches!(ext, "yml" | "yaml" | "toml")) { - builder = builder.add_source(config::File::from(path_subdir)); + builder = builder.add_source(File::from(path_subdir)); } } @@ -156,26 +162,83 @@ pub fn get_config_builder(path: &str) -> io::Result> } else { Err(io::Error::new( io::ErrorKind::Unsupported, - format!("Unrecognize filetype for path `{path}`"), + format!("Unrecognize filetype for path `{}`", path.display()), )) } } +fn sorted_dir_entries(path: &Path) -> io::Result> { + let mut paths = fs::read_dir(path)? + .map(|entry| entry.map(|entry| entry.path())) + .collect::>>()?; + paths.sort(); + Ok(paths) +} + +fn sorted_paths(paths: HashSet) -> Vec { + let mut paths = paths.into_iter().collect::>(); + paths.sort(); + paths +} + +fn config_watch_paths(path: &Path) -> Vec { + let mut paths = Vec::new(); + + if path.is_dir() || path.is_file() { + paths.push(path.to_path_buf()); + } + + if path.is_file() + && let Some(parent) = path.parent().filter(|parent| parent.exists()) + { + paths.push(parent.to_path_buf()); + } + + paths +} + +fn fallback_watch_paths(config_path: &str) -> Vec { + let fallback = if has_glob_pattern(config_path) { + let mut parent = PathBuf::new(); + + for component in Path::new(config_path).components() { + let component = component.as_os_str(); + if component.to_str().is_some_and(has_glob_pattern) { + break; + } + parent.push(component); + } + + if parent.as_os_str().is_empty() { + Some(PathBuf::from(".")) + } else if parent.exists() && parent.is_file() { + parent.parent().map(Path::to_path_buf) + } else { + Some(parent) + } + } else { + Path::new(config_path).parent().map(Path::to_path_buf) + }; + + fallback.filter(|path| path.exists()).into_iter().collect() +} + +fn has_glob_pattern(config_path: &str) -> bool { + config_path.chars().any(|c| matches!(c, '*' | '?' | '[')) +} + /// Loaded ProSA configuration. #[derive(Clone, Debug)] pub struct ProsaConfig { config: Config, + adaptor_configs: HashMap, + adaptor_config_watch_paths: Vec, } impl ProsaConfig { - /// Create a ProSA configuration wrapper from a loaded [`Config`]. - pub fn new(config: Config) -> Self { - Self { config } - } - /// Load a ProSA configuration from a file or directory path. pub fn from_path(config_path: &str) -> Result { - get_config_builder(config_path) + let config = get_config_builder(config_path) .map_err(|e| config::ConfigError::Foreign(Box::new(e)))? .add_source( config::Environment::with_prefix("PROSA") @@ -183,8 +246,56 @@ impl ProsaConfig { .separator("_") .list_separator(" "), ) - .build() - .map(Self::new) + .build()?; + + Self::from_config(config) + } + + /// Create a ProSA configuration wrapper and load all processor adaptor configs. + pub fn from_config(config: Config) -> Result { + let mut adaptor_configs = HashMap::new(); + let mut adaptor_config_watch_paths = HashSet::new(); + + for (proc_config_key, config_path) in get_proc_adaptor_config_paths(&config) { + let (adaptor_config, watch_paths) = Self::load_adaptor_config(&config_path)?; + adaptor_configs.insert(proc_config_key, adaptor_config); + adaptor_config_watch_paths.extend(watch_paths); + } + + Ok(Self { + config, + adaptor_configs, + adaptor_config_watch_paths: sorted_paths(adaptor_config_watch_paths), + }) + } + + /// Load an adaptor config path or glob pattern and return its watcher paths. + pub(crate) fn load_adaptor_config( + config_path: &str, + ) -> Result<(Config, Vec), config::ConfigError> { + let mut builder = Config::builder(); + let mut watch_paths = HashSet::new(); + let mut matched = false; + + for path in glob(config_path) + .map_err(|e| { + config::ConfigError::Message(format!( + "Wrong config path pattern `{config_path}`: `{e}`" + )) + })? + .filter_map(Result::ok) + { + matched = true; + watch_paths.extend(config_watch_paths(&path)); + builder = add_config_path(builder, &path) + .map_err(|e| config::ConfigError::Foreign(Box::new(e)))?; + } + + if !matched { + watch_paths.extend(fallback_watch_paths(config_path)); + } + + Ok((builder.build()?, sorted_paths(watch_paths))) } /// Access the underlying loaded configuration. @@ -208,27 +319,85 @@ impl ProsaConfig { self.config.get::(&proc.get_proc_config_key()) } - /// Check if this configuration differs from another loaded configuration. - pub fn has_changed(&self, new: &Self) -> bool { - self.config.cache != new.config.cache + /// Access a processor adaptor configuration from its processor name. + pub fn get_adaptor_config(&self, proc: &impl ProcBusParam) -> Option<&Config> { + self.adaptor_configs.get(&proc.get_proc_config_key()) + } + + /// Return every configuration path watched to maintain this configuration. + pub fn watch_paths(&self, config_path: &str) -> Vec { + let mut watch_paths = config_watch_paths(Path::new(config_path)) + .into_iter() + .collect::>(); + + watch_paths.extend(self.adaptor_config_watch_paths.iter().cloned()); + + sorted_paths(watch_paths) } /// Check if one processor configuration differs from another loaded configuration. pub fn has_proc_changed(&self, new: &Self, proc_config_key: &str) -> bool { - if let (ValueKind::Table(current_table), ValueKind::Table(new_table)) = - (&self.config.cache.kind, &new.config.cache.kind) - { - current_table.get(proc_config_key) != new_table.get(proc_config_key) - } else { - false - } + let proc_config_changed = + if let (ValueKind::Table(current_table), ValueKind::Table(new_table)) = + (&self.config.cache.kind, &new.config.cache.kind) + { + current_table.get(proc_config_key) != new_table.get(proc_config_key) + } else { + false + }; + + proc_config_changed + || self + .adaptor_configs + .get(proc_config_key) + .map(|config| &config.cache) + != new + .adaptor_configs + .get(proc_config_key) + .map(|config| &config.cache) + } +} + +impl PartialEq for ProsaConfig { + fn eq(&self, other: &Self) -> bool { + self.config.cache == other.config.cache + && self.adaptor_configs.len() == other.adaptor_configs.len() + && self + .adaptor_configs + .iter() + .all(|(proc_config_key, current_config)| { + other + .adaptor_configs + .get(proc_config_key) + .is_some_and(|new_config| current_config.cache == new_config.cache) + }) } } -impl From for ProsaConfig { - fn from(config: Config) -> Self { - Self::new(config) +impl Eq for ProsaConfig {} + +fn get_proc_adaptor_config_paths(config: &Config) -> HashMap { + let mut adaptor_config_paths = HashMap::new(); + + if let ValueKind::Table(config_table) = &config.cache.kind { + for (proc_config_key, proc_config) in config_table { + if let ValueKind::Table(proc_config_table) = &proc_config.kind + && let Some(adaptor_config_path) = proc_config_table + .get("adaptor_config_path") + .and_then(|value| { + if let ValueKind::String(path) = &value.kind { + Some(path) + } else { + None + } + }) + { + adaptor_config_paths.insert(proc_config_key.clone(), adaptor_config_path.clone()); + } + } } + + adaptor_config_paths } impl From for Config { @@ -239,8 +408,9 @@ impl From for Config { /// Watches a configuration path and exposes native file change events. pub struct ConfigWatcher { - _watcher: RecommendedWatcher, + watcher: RecommendedWatcher, events: mpsc::UnboundedReceiver>, + watched_paths: HashSet, } impl ConfigWatcher { @@ -248,20 +418,57 @@ impl ConfigWatcher { pub async fn changed(&mut self) -> Option> { self.events.recv().await } + + /// Replace the set of paths watched for configuration changes. + pub fn set_paths(&mut self, paths: Vec) -> notify::Result<()> { + let paths = paths.into_iter().collect::>(); + + for path in self.watched_paths.difference(&paths) { + if let Err(err) = self.watcher.unwatch(path) { + log::warn!( + "Can't stop watching configuration {}: {err}", + path.display() + ); + } + } + + for path in paths.difference(&self.watched_paths) { + let recursive_mode = if path.is_dir() { + RecursiveMode::Recursive + } else { + RecursiveMode::NonRecursive + }; + self.watcher.watch(path, recursive_mode)?; + } + + self.watched_paths = paths; + + Ok(()) + } } -/// Create a watcher for the configuration file or directory. -pub fn watch_config_path(config_path: &str) -> notify::Result { +/// Create a watcher for multiple configuration files or directories. +pub fn watch_config_paths(paths: Vec) -> notify::Result { let (tx, events) = mpsc::unbounded_channel(); let mut watcher = notify::recommended_watcher(move |event| { let _ = tx.send(event); })?; - watcher.watch(Path::new(config_path), RecursiveMode::NonRecursive)?; + let mut watched_paths = HashSet::new(); + for path in paths { + let recursive_mode = if path.is_dir() { + RecursiveMode::Recursive + } else { + RecursiveMode::NonRecursive + }; + watcher.watch(&path, recursive_mode)?; + watched_paths.insert(path); + } Ok(ConfigWatcher { - _watcher: watcher, + watcher, events, + watched_paths, }) } @@ -285,7 +492,7 @@ pub async fn watch_config_reload( ApplyConfig: FnMut(S, ProsaConfig) -> ApplyFuture, ApplyFuture: Future, { - let mut config_watcher = match watch_config_path(&config_path) { + let mut config_watcher = match watch_config_paths(current_config.watch_paths(&config_path)) { Ok(config_watcher) => config_watcher, Err(err) => { log::warn!("Can't watch configuration {config_path}: {err}"); @@ -308,10 +515,15 @@ pub async fn watch_config_reload( } match load_config(&config_path) { - Ok(new_config) if current_config.has_changed(&new_config) => { + Ok(new_config) if current_config != new_config => { match new_config.try_deserialize::() { Ok(settings) => { if apply_config(settings, new_config.clone()).await { + if let Err(err) = + config_watcher.set_paths(new_config.watch_paths(&config_path)) + { + log::warn!("Can't update watched configuration paths: {err}"); + } current_config = new_config; } } @@ -330,6 +542,7 @@ pub async fn watch_config_reload( mod tests { use super::*; use prosa_macros::settings; + use std::time::{SystemTime, UNIX_EPOCH}; extern crate self as prosa; @@ -364,26 +577,96 @@ mod tests { #[test] fn test_proc_config_hash_change() -> Result<(), config::ConfigError> { - let current = ProsaConfig::new( + let current = ProsaConfig::from_config( Config::builder() .set_override("proc_1.service_name", "PROC_TEST")? .set_override("proc_1.tick_secs", 4)? .set_override("proc_2.service_name", "PROC_TEST_2")? .set_override("proc_2.tick_secs", 4)? .build()?, - ); - let new = ProsaConfig::new( + )?; + let new = ProsaConfig::from_config( Config::builder() .set_override("proc_1.service_name", "PROC_TEST_UPDATED")? .set_override("proc_1.tick_secs", 4)? .set_override("proc_2.service_name", "PROC_TEST_2")? .set_override("proc_2.tick_secs", 4)? .build()?, - ); + )?; assert!(current.has_proc_changed(&new, "proc_1")); assert!(!current.has_proc_changed(&new, "proc_2")); Ok(()) } + + #[test] + fn test_config_folder_is_loaded_recursively() -> Result<(), Box> { + let config_path = unique_test_dir("prosa-recursive-config"); + let nested_path = config_path.join("nested"); + fs::create_dir_all(&nested_path)?; + fs::write( + config_path.join("main.yml"), + "proc_1:\n service_name: PROC_TEST\n", + )?; + fs::write( + nested_path.join("override.yml"), + "proc_1:\n tick_secs: 4\n", + )?; + + let config = ProsaConfig::from_path(config_path.to_str().ok_or("invalid temp path")?)?; + + assert_eq!( + "PROC_TEST", + config.config().get_string("proc_1.service_name")? + ); + assert_eq!(4, config.config().get_int("proc_1.tick_secs")?); + + fs::remove_dir_all(config_path)?; + + Ok(()) + } + + #[test] + fn test_adaptor_config_change_is_proc_change() -> Result<(), Box> { + let config_path = unique_test_dir("prosa-adaptor-config"); + fs::create_dir_all(&config_path)?; + let adaptor_config_path = config_path.join("adaptor.yml"); + fs::write(&adaptor_config_path, "sleep_ms: 100\n")?; + fs::write( + config_path.join("main.yml"), + format!( + "proc_1:\n service_name: PROC_TEST\n adaptor_config_path: {}\n", + adaptor_config_path.display() + ), + )?; + + let current = ProsaConfig::from_path(config_path.to_str().ok_or("invalid temp path")?)?; + assert_eq!( + 100, + current + .adaptor_configs + .get("proc_1") + .ok_or("missing adaptor config")? + .get_int("sleep_ms")? + ); + + fs::write(&adaptor_config_path, "sleep_ms: 200\n")?; + let new = ProsaConfig::from_path(config_path.to_str().ok_or("invalid temp path")?)?; + + assert_ne!(current, new); + assert!(current.has_proc_changed(&new, "proc_1")); + + fs::remove_dir_all(config_path)?; + + Ok(()) + } + + fn unique_test_dir(prefix: &str) -> PathBuf { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + std::env::temp_dir().join(format!("{prefix}-{}-{timestamp}", std::process::id())) + } } diff --git a/prosa/src/inj/proc.rs b/prosa/src/inj/proc.rs index e56283c..8ce9d91 100644 --- a/prosa/src/inj/proc.rs +++ b/prosa/src/inj/proc.rs @@ -1,7 +1,7 @@ use std::time::Duration; use crate::otel::{KeyValue, metrics::Histogram}; -use crate::tracing::debug; +use crate::tracing::{debug, warn}; use prosa_macros::{proc, proc_settings}; use serde::{Deserialize, Serialize}; @@ -198,7 +198,24 @@ impl InjProc { let _ = next_transaction.get_or_insert(adaptor.build_transaction()); } InternalMsg::Config(config) => { - let settings = config.get_proc::(self.proc.as_ref())?; + let settings = match config.get_proc::(self.proc.as_ref()) { + Ok(settings) => settings, + Err(err) => { + warn!("Can't reload settings for processor {}: {err}", self.name()); + return Ok(()); + } + }; + + if let Err(err) = + adaptor.reload_config(config.get_adaptor_config(self.proc.as_ref())) + { + warn!( + "Can't reload adaptor configuration for processor {}: {err}", + self.name() + ); + return Ok(()); + } + *regulator = settings.get_regulator(); self.settings = settings; } diff --git a/prosa/src/stub/adaptor.rs b/prosa/src/stub/adaptor.rs index 0aabb08..524141f 100644 --- a/prosa/src/stub/adaptor.rs +++ b/prosa/src/stub/adaptor.rs @@ -4,13 +4,38 @@ use crate::{ adaptor::{Adaptor, MaybeAsync}, error::ProcError, msg::Tvf, - proc::ProcConfig, + proc::{ProcConfig, ProcSettings}, service::ServiceError, }, maybe_async, }; extern crate self as prosa; use crate::otel::metrics::Meter; +use serde::Deserialize; +use std::{ + sync::atomic::{AtomicU64, Ordering}, + time::Duration, +}; + +const DEFAULT_STUB_ASYNC_PAROT_SLEEP_MS: u64 = 100; + +fn default_stub_async_parot_sleep_ms() -> u64 { + DEFAULT_STUB_ASYNC_PAROT_SLEEP_MS +} + +#[derive(Debug, Deserialize)] +struct StubAsyncParotConfig { + #[serde(default = "default_stub_async_parot_sleep_ms", alias = "sleep_millis")] + sleep_ms: u64, +} + +impl Default for StubAsyncParotConfig { + fn default() -> Self { + StubAsyncParotConfig { + sleep_ms: DEFAULT_STUB_ASYNC_PAROT_SLEEP_MS, + } + } +} /// Adaptator trait for the stub processor /// @@ -146,9 +171,28 @@ where } } -/// Parot adaptor for the stub processor. Use to respond to a request with the same message -#[derive(Adaptor)] -pub struct StubAsyncParotAdaptor {} +/// Parot adaptor for the stub processor. Use to respond asynchronously after a configurable delay. +/// +/// The adaptor configuration accepts `sleep_ms` (or `sleep_millis`) and defaults to 100ms. +pub struct StubAsyncParotAdaptor { + sleep_ms: AtomicU64, +} + +impl Adaptor for StubAsyncParotAdaptor { + fn reload_config(&self, config: Option<&config::Config>) -> Result<(), config::ConfigError> { + let config = if let Some(config) = config { + config.clone().try_deserialize::()? + } else { + StubAsyncParotConfig { + sleep_ms: DEFAULT_STUB_ASYNC_PAROT_SLEEP_MS, + } + }; + self.sleep_ms.store(config.sleep_ms, Ordering::Relaxed); + Ok(()) + } + + fn terminate(&self) {} +} impl StubAdaptor for StubAsyncParotAdaptor where @@ -161,8 +205,22 @@ where + Tvf + std::default::Default, { - fn new(_proc: &StubProc) -> Result> { - Ok(Self {}) + fn new(proc: &StubProc) -> Result> { + let sleep_ms = match proc.settings.get_adaptor_config::() { + Ok(config) => config.sleep_ms, + Err(err) => { + if proc.settings.get_adaptor_config_path().is_some() { + log::warn!( + "Can't load StubAsyncParotAdaptor configuration: {err}. Using default sleep of {DEFAULT_STUB_ASYNC_PAROT_SLEEP_MS}ms" + ); + } + DEFAULT_STUB_ASYNC_PAROT_SLEEP_MS + } + }; + + Ok(Self { + sleep_ms: AtomicU64::new(sleep_ms), + }) } fn process_request( @@ -170,9 +228,31 @@ where _service_name: &str, request: M, ) -> MaybeAsync> { + let sleep_duration = Duration::from_millis(self.sleep_ms.load(Ordering::Relaxed)); maybe_async!(async move { - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(sleep_duration).await; Ok(request) }) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn stub_async_parot_reload_config_updates_sleep() -> Result<(), config::ConfigError> { + let adaptor = StubAsyncParotAdaptor { + sleep_ms: AtomicU64::new(DEFAULT_STUB_ASYNC_PAROT_SLEEP_MS), + }; + let config = config::Config::builder() + .set_override("sleep_ms", 25_u64)? + .build()?; + + adaptor.reload_config(Some(&config))?; + + assert_eq!(25, adaptor.sleep_ms.load(Ordering::Relaxed)); + + Ok(()) + } +} diff --git a/prosa/src/stub/proc.rs b/prosa/src/stub/proc.rs index 8a3bc6f..8e09832 100644 --- a/prosa/src/stub/proc.rs +++ b/prosa/src/stub/proc.rs @@ -1,6 +1,6 @@ use std::{collections::HashSet, sync::Arc}; -use crate::tracing::debug; +use crate::tracing::{debug, info, warn}; use prosa_macros::proc_settings; use serde::{Deserialize, Serialize}; @@ -118,7 +118,23 @@ where err ), InternalMsg::Config(config) => { - let settings = config.get_proc::(self.proc.as_ref())?; + let settings = match config.get_proc::(self.proc.as_ref()) { + Ok(settings) => settings, + Err(err) => { + warn!("Can't reload settings for processor {}: {err}", self.name()); + continue; + } + }; + + if let Err(err) = + adaptor.reload_config(config.get_adaptor_config(self.proc.as_ref())) + { + warn!( + "Can't reload adaptor configuration for processor {}: {err}", + self.name() + ); + continue; + } let current_services = self.settings.service_names.iter().collect::>(); @@ -140,6 +156,11 @@ where self.proc.add_service_proc(services_to_add).await?; } + info!( + "{} reloaded settings for services: {}", + self.name(), + settings.service_names.join(", ") + ); self.settings = settings; } InternalMsg::Service(table) => self.service = table, diff --git a/prosa_book/src/ch03-01-settings.md b/prosa_book/src/ch03-01-settings.md index b2a5b6f..931e81c 100644 --- a/prosa_book/src/ch03-01-settings.md +++ b/prosa_book/src/ch03-01-settings.md @@ -16,9 +16,10 @@ let config = ProsaConfig::from_path("prosa.yml")?; let settings = config.try_deserialize::()?; ``` -`ProsaConfig::from_path()` uses the same configuration loading rules as ProSA itself: the path can be a single configuration file or a directory containing `yml`, `yaml`, or `toml` files, and `PROSA_*` environment variables are applied on top of the file sources. +`ProsaConfig::from_path()` uses the same configuration loading rules as ProSA itself: the path can be a single configuration file or a directory recursively containing `yml`, `yaml`, or `toml` files, and `PROSA_*` environment variables are applied on top of the file sources. When the main task notifies a processor about a configuration change, the message contains the same `ProsaConfig` wrapper. Processors should reload their own section with `config.get_proc(self.proc.as_ref())?`. +If the processor section has `adaptor_config_path`, `ProsaConfig` also loads that adaptor configuration and watches it as part of the global configuration reload flow. ## Creation diff --git a/prosa_book/src/ch03-09-builtin.md b/prosa_book/src/ch03-09-builtin.md index 17ba230..62175d1 100644 --- a/prosa_book/src/ch03-09-builtin.md +++ b/prosa_book/src/ch03-09-builtin.md @@ -175,12 +175,25 @@ fn process_request(&self, _service_name: &str, request: M) -> MaybeAsync MaybeAsync> { + let sleep_duration = std::time::Duration::from_millis( + self.sleep_ms.load(std::sync::atomic::Ordering::Relaxed), + ); maybe_async!(async move { - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(sleep_duration).await; Ok(request) }) } diff --git a/prosa_utils/Cargo.toml b/prosa_utils/Cargo.toml index 6238b87..b4210d5 100644 --- a/prosa_utils/Cargo.toml +++ b/prosa_utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "prosa-utils" -version = "0.4.3" +version = "0.5.0" authors.workspace = true description = "ProSA utils" homepage.workspace = true diff --git a/prosa_utils/src/config/tracing.rs b/prosa_utils/src/config/tracing.rs index d5f9f57..27a65ed 100644 --- a/prosa_utils/src/config/tracing.rs +++ b/prosa_utils/src/config/tracing.rs @@ -187,9 +187,15 @@ impl TelemetryFilter { /// Method to update the dynamic default telemetry level pub fn set_level(&self, level: filter::LevelFilter) { + let mut level_changed = false; if let Ok(mut inner) = self.inner.write() { + level_changed = inner.level != level; inner.level = level; } + + if level_changed { + tracing_core::callsite::rebuild_interest_cache(); + } } /// Getter of the current dynamic default telemetry level From 53ff45440813f09ee8b2d86beeed9848bb450844 Mon Sep 17 00:00:00 2001 From: Jeremy HERGAULT Date: Sat, 27 Jun 2026 18:59:20 +0200 Subject: [PATCH 3/3] feat: update all version for 0.5.0 release Signed-off-by: Jeremy HERGAULT --- Cargo.toml | 2 +- prosa_macros/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7b2375d..feda398 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ include = [ [workspace.dependencies] prosa-utils = { version = "0.5.0", path = "prosa_utils" } -prosa-macros = { version = "0.4.2", path = "prosa_macros" } +prosa-macros = { version = "0.5.0", path = "prosa_macros" } thiserror = "2" simple-mermaid = "0.2" bytes = "1" diff --git a/prosa_macros/Cargo.toml b/prosa_macros/Cargo.toml index b94abea..f0f8b05 100644 --- a/prosa_macros/Cargo.toml +++ b/prosa_macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "prosa-macros" -version = "0.4.2" +version = "0.5.0" authors.workspace = true description = "ProSA macros" homepage.workspace = true