Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ include = [
]

[workspace.dependencies]
prosa-utils = { version = "0.4.3", path = "prosa_utils" }
prosa-macros = { version = "0.4.2", path = "prosa_macros" }
prosa-utils = { version = "0.5.0", path = "prosa_utils" }
prosa-macros = { version = "0.5.0", path = "prosa_macros" }
thiserror = "2"
simple-mermaid = "0.2"
bytes = "1"
Expand Down
2 changes: 1 addition & 1 deletion cargo-prosa/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cargo-prosa"
version = "0.4.2"
version = "0.5.0"
authors.workspace = true
description = "ProSA utility to package and deliver a builded ProSA"
homepage.workspace = true
Expand Down
20 changes: 7 additions & 13 deletions cargo-prosa/assets/build.rs.j2
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ fn write_settings_rs(out_dir: &OsString, desc: &Desc, metadata: &HashMap<&str, M
if let Some(processors) = &desc.proc {{ '{' }}
writeln!(f, "\n/// ProSA Run settings")?;
writeln!(f, "#[settings]")?;
writeln!(f, "#[derive(Default, Debug, Deserialize, Serialize)]")?;
writeln!(f, "#[derive(Default, Debug, Clone, Deserialize, Serialize)]")?;
writeln!(f, "pub struct RunSettings {{ '{{' }}")?;
for processor in processors {{ '{' }}
let proc_metadata = metadata.get(processor.proc_name.as_str()).unwrap_or_else(|| panic!("Can't get the processor {{ '{}' }} metadata ({{ '{:?}' }})", processor.proc, processor.name));
Expand Down Expand Up @@ -90,18 +90,12 @@ fn write_config_rs(out_dir: &OsString, desc: &Desc, cargo_metadata: &CargoMetada
writeln!(f, " .arg(::clap::arg!(-t --worker_threads <THREADS> \"Number of worker threads to use for the main\").value_parser(clap::value_parser!(u32).range(1..)).default_value(\"1\"))")?;
writeln!(f, "{{ '}}' }}\n")?;

writeln!(f, "fn prosa_config(matches: &::clap::ArgMatches) -> Result<::config::Config, ::config::ConfigError> {{ '{{' }}")?;
writeln!(f, " prosa::core::settings::get_config_builder(")?;
writeln!(f, " matches.get_one::<String>(\"config\").unwrap().as_str(),")?;
writeln!(f, " )")?;
writeln!(f, " .map_err(|e| ::config::ConfigError::Foreign(Box::new(e)))?")?;
writeln!(f, " .add_source(")?;
writeln!(f, " ::config::Environment::with_prefix(\"PROSA\")")?;
writeln!(f, " .try_parsing(true)")?;
writeln!(f, " .separator(\"_\")")?;
writeln!(f, " .list_separator(\" \"),")?;
writeln!(f, " )")?;
writeln!(f, " .build()")?;
writeln!(f, "fn prosa_config(matches: &::clap::ArgMatches) -> Result<::prosa::core::settings::ProsaConfig, ::config::ConfigError> {{ '{{' }}")?;
writeln!(f, " let config_path = matches")?;
writeln!(f, " .get_one::<String>(\"config\")")?;
writeln!(f, " .ok_or_else(|| ::config::ConfigError::NotFound(\"config\".to_string()))?;")?;
writeln!(f, "")?;
writeln!(f, " ::prosa::core::settings::ProsaConfig::from_path(config_path)")?;
writeln!(f, "{{ '}}' }}\n")
{{ '}' }}

Expand Down
40 changes: 39 additions & 1 deletion cargo-prosa/assets/main.rs.j2
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ async fn prosa_main(matches: clap::ArgMatches) -> Result<(), Box<dyn std::error:
{{ '}' }}
{{ '}' }}
{{ '}' }} else {{ '{' }}
let mut prosa_settings = prosa_config(&matches)?.try_deserialize::<RunSettings>()?;
let prosa_config_cache = prosa_config(&matches)?;
let mut prosa_settings = prosa_config_cache.try_deserialize::<RunSettings>()?;

// Provide ProSA name if set in command line
if let Some(name) = matches.get_one::<String>("name") {{ '{' }}
Expand All @@ -69,6 +70,43 @@ async fn prosa_main(matches: clap::ArgMatches) -> Result<(), Box<dyn std::error:
// Create bus and main processor
log::info!("Starting ProSA {} - {}", env!("CARGO_PKG_NAME"), PROSA_VERSIONS);
let (bus, main) = new_main(&prosa_settings);
bus.update_config(std::sync::Arc::new(prosa_config_cache.clone())).await?;

// Watch effective configuration changes and notify processors only when it changes
let reload_bus = bus.clone();
let reload_filter = filter.clone();
let reload_config_path = matches
.get_one::<String>("config")
.ok_or_else(|| ::config::ConfigError::NotFound("config".to_string()))?
.clone();
let reload_name = matches.get_one::<String>("name").cloned();
tokio::spawn(async move {{ '{' }}
prosa::core::settings::watch_config_reload::<RunSettings, _, _, _>(
reload_config_path,
prosa_config_cache,
::prosa::core::settings::ProsaConfig::from_path,
move |mut settings, new_config| {{ '{' }}
let reload_bus = reload_bus.clone();
let reload_filter = reload_filter.clone();
let reload_name = reload_name.clone();

async move {{ '{' }}
if let Some(name) = &reload_name {{ '{' }}
settings.set_prosa_name(name.clone());
{{ '}' }}

reload_filter.set_level(settings.get_observability().get_level().into());
if let Err(err) = reload_bus.update_config(std::sync::Arc::new(new_config)).await {{ '{' }}
log::warn!("Can't notify processors of configuration reload: {{ '{err}' }}");
false
{{ '}' }} else {{ '{' }}
true
{{ '}' }}
{{ '}' }}
{{ '}' }},
)
.await;
{{ '}' }});

// Launch the main task
let main_task = main.run();
Expand Down
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ allow = [
"MIT",
"BSD-2-Clause",
"BSD-3-Clause",
"CC0-1.0",
"CDLA-Permissive-2.0",
"ISC",
"Zlib",
Expand Down
3 changes: 2 additions & 1 deletion prosa/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "prosa"
version = "0.4.4"
version = "0.5.0"
authors.workspace = true
description = "ProSA core"
homepage.workspace = true
Expand Down Expand Up @@ -60,6 +60,7 @@ async-http-proxy = { version = "1", optional = true, features = ["runtime-tokio"

serde.workspace = true
config = { version = "0.15", default-features = false, features = ["toml", "yaml", "json", "convert-case"] }
notify = "8"
glob = { version = "0.3" }
toml.workspace = true
serde_yaml = "0.9"
Expand Down
15 changes: 14 additions & 1 deletion prosa/examples/my_prosa_settings.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
name: my-prosa

observability:
level: DEBUG
level: INFO
metrics:
prometheus:
endpoint: 0.0.0.0:9100
traces:
stdout:
level: debug

stub_proc:
adaptor_config_path: examples/stub_async_parot_adaptor.yml
service_names:
- STUB_TEST

proc_1:
service_name: PROC_TEST
tick_secs: 4

proc_2:
service_name: PROC_TEST_2
tick_secs: 4
134 changes: 104 additions & 30 deletions prosa/examples/proc.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,49 @@
use config::Config;
use prosa::core::adaptor::Adaptor;
use prosa::core::error::ProcError;
use prosa::core::main::{MainProc, MainRunnable};
use prosa::core::msg::{InternalMsg, Msg, RequestMsg};
use prosa::core::proc::{Proc, ProcBusParam, ProcConfig, proc};
use prosa::core::settings::Settings;
use prosa::core::settings::settings;
use prosa::core::settings::tracing::TelemetryFilter;
use prosa::core::settings::{ProsaConfig, Settings, settings};
use prosa::event::pending::PendingMsgs;
use prosa::stub::adaptor::StubParotAdaptor;
use prosa::stub::adaptor::StubAsyncParotAdaptor;
use prosa::stub::proc::{StubProc, StubSettings};
use prosa::tracing::{debug, info, warn};
use prosa_macros::proc_settings;
use prosa_utils::msg::simple_string_tvf::SimpleStringTvf;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use tracing::metadata::LevelFilter;

#[derive(Default, Adaptor)]
struct MyAdaptor {}

#[proc]
#[proc_settings]
#[derive(Debug, Deserialize, Serialize, Clone)]
struct MyProcSettings {
service_name: String,
tick_secs: u64,
}

impl MyProcSettings {
fn interval(&self) -> time::Interval {
time::interval(time::Duration::from_secs(self.tick_secs.max(1)))
}
}

#[proc_settings]
impl Default for MyProcSettings {
fn default() -> Self {
MyProcSettings {
service_name: String::from("PROC_TEST"),
tick_secs: 4,
}
}
}

#[proc(settings = MyProcSettings)]
struct MyProcClass {}

#[proc]
Expand All @@ -32,39 +55,55 @@ where
let adaptor = A::default();
self.proc.add_proc().await?;
self.proc
.add_service_proc(vec![String::from("PROC_TEST")])
.add_service_proc(vec![self.settings.service_name.clone()])
.await?;
let mut interval = time::interval(time::Duration::from_secs(4));
let mut interval = self.settings.interval();
let mut pending_msgs: PendingMsgs<RequestMsg<M>, M> = Default::default();
loop {
tokio::select! {
Some(msg) = self.internal_rx_queue.recv() => {
match msg {
InternalMsg::Request(msg) => {
info!("Proc {} receive a request: {:?}", self.get_proc_id(), msg);

info!("Proc {} received a request: {:?}", self.get_proc_id(), msg);

// Push in the pending message
pending_msgs.push(msg, Duration::from_millis(200));
//msg.return_to_sender(tvf).await.unwrap();
},
InternalMsg::Response(msg) => {
let _enter = msg.enter_span();
info!("Proc {} receive a response: {:?}", self.get_proc_id(), msg);
info!("Proc {} received a response: {:?}", self.get_proc_id(), msg);
},
InternalMsg::Error(err) => {
let _enter = err.enter_span();
info!("Proc {} receive an error: {:?}", self.get_proc_id(), err);
info!("Proc {} received an error: {:?}", self.get_proc_id(), err);
},
InternalMsg::Config(config) => {
let settings = config.get_proc::<MyProcSettings>(self.proc.as_ref())?;

if self.settings.service_name != settings.service_name {
self.proc
.remove_service_proc(vec![self.settings.service_name.clone()])
.await?;
self.proc
.add_service_proc(vec![settings.service_name.clone()])
.await?;
}

if self.settings.tick_secs != settings.tick_secs {
interval = settings.interval();
}

info!("Proc {} reloaded settings: {:?}", self.get_proc_id(), settings);
self.settings = settings;
},
InternalMsg::Command(_) => todo!(),
InternalMsg::Config => todo!(),
InternalMsg::Service(table) => {
debug!("New service table received:\n{}\n", table);
self.service = table;
},
InternalMsg::Shutdown => {
adaptor.terminate();
warn!("The processor will shut down");
warn!("The processor is shutting down");
},
}
},
Expand All @@ -77,14 +116,13 @@ where

let stub_service_name = String::from("STUB_TEST");
if let Some(service) = self.service.get_proc_service(&stub_service_name) {
debug!("The service is find: {:?}", service);
debug!("Service found: {:?}", service);
let _ = service.proc_queue.send(InternalMsg::Request(RequestMsg::new(stub_service_name, tvf.clone(), self.proc.get_service_queue()))).await;
}

let proc_service_name = String::from("PROC_TEST");
if let Some(service) = self.service.get_proc_service(&proc_service_name) {
debug!("The service is find: {:?}", service);
let _ = service.proc_queue.send(InternalMsg::Request(RequestMsg::new(proc_service_name, tvf, self.proc.get_service_queue()))).await;
if let Some(service) = self.service.get_proc_service(&self.settings.service_name) {
debug!("Service found: {:?}", service);
let _ = service.proc_queue.send(InternalMsg::Request(RequestMsg::new(self.settings.service_name.clone(), tvf, self.proc.get_service_queue()))).await;
}
},
Some(msg) = pending_msgs.pull(), if !pending_msgs.is_empty() => {
Expand All @@ -106,17 +144,18 @@ where
#[settings]
#[derive(Default, Debug, Deserialize, Serialize)]
struct MySettings {
// Can add parameters here
stub_proc: StubSettings,
proc_1: MyProcSettings,
proc_2: MyProcSettings,
}

#[allow(clippy::needless_return)]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config_path = "examples/my_prosa_settings.yml";

// load the configuration
let config = Config::builder()
.add_source(config::File::with_name("examples/my_prosa_settings.yml"))
.add_source(config::Environment::with_prefix("PROSA"))
.build()?;
let config = ProsaConfig::from_path(config_path)?;

let my_settings = config.try_deserialize::<MySettings>()?;
println!("My ProSA settings: {my_settings:?}");
Expand All @@ -129,26 +168,61 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// Create bus and main processor
let (bus, main) = MainProc::<SimpleStringTvf>::create(&my_settings, Some(3));
bus.update_config(Arc::new(config.clone())).await?;

let reload_bus = bus.clone();
let reload_filter = telemetry_filter.clone();
tokio::spawn(async move {
prosa::core::settings::watch_config_reload::<MySettings, _, _, _>(
String::from(config_path),
config,
ProsaConfig::from_path,
move |settings, new_config| {
let reload_bus = reload_bus.clone();
let reload_filter = reload_filter.clone();
async move {
reload_filter.set_level(settings.get_observability().get_level().into());

if let Err(err) = reload_bus.update_config(Arc::new(new_config)).await {
warn!("Can't notify processors of configuration reload: {err}");
false
} else {
true
}
}
},
)
.await;
});

// Launch a stub processor
let stub_settings = StubSettings::new(vec![String::from("STUB_TEST")]);
let stub_proc = StubProc::<SimpleStringTvf>::create(
1,
String::from("STUB_PROC"),
String::from("stub_proc"),
bus.clone(),
stub_settings,
my_settings.stub_proc.clone(),
);
Proc::<StubParotAdaptor>::run(stub_proc)?;
Proc::<StubAsyncParotAdaptor>::run(stub_proc)?;

// Launch the test processor
let proc = MyProcClass::<SimpleStringTvf>::create_raw(2, String::from("proc_1"), bus.clone());
let proc = MyProcClass::<SimpleStringTvf>::create(
2,
String::from("proc_1"),
bus.clone(),
my_settings.proc_1.clone(),
);
Proc::<MyAdaptor>::run(proc)?;

// Wait before launch the second processor
std::thread::sleep(time::Duration::from_secs(2));

// Launch the second test processor
let proc2 = MyProcClass::<SimpleStringTvf>::create_raw(3, String::from("proc_2"), bus.clone());
let proc2 = MyProcClass::<SimpleStringTvf>::create(
3,
String::from("proc_2"),
bus.clone(),
my_settings.proc_2.clone(),
);
Proc::<MyAdaptor>::run(proc2)?;

// Wait on main task
Expand Down
1 change: 1 addition & 0 deletions prosa/examples/stub_async_parot_adaptor.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sleep_ms: 100
Loading
Loading