From 5f36ee247ede369d5fab8ca699c1d513bf2a1022 Mon Sep 17 00:00:00 2001 From: Jan-Erik Rediger Date: Tue, 6 May 2025 10:50:15 +0200 Subject: [PATCH] Bug 1839428 - force dispatcher to be stopped, always --- glean-core/rlb/examples/prototype.rs | 6 ++++++ glean-core/src/dispatcher/global.rs | 4 ++++ glean-core/src/dispatcher/mod.rs | 19 +++++++++++++++++++ glean-core/src/lib.rs | 5 +++-- 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/glean-core/rlb/examples/prototype.rs b/glean-core/rlb/examples/prototype.rs index a74a75ca50..471da9710a 100644 --- a/glean-core/rlb/examples/prototype.rs +++ b/glean-core/rlb/examples/prototype.rs @@ -71,8 +71,14 @@ fn main() { glean::initialize(cfg, client_info); glean_metrics::sample_boolean.set(true); + _ = glean_metrics::sample_boolean.test_get_value(None); PrototypePing.submit(None); + glean_core::dispatcher::launch(|| { + std::thread::sleep(std::time::Duration::from_secs(15)); + }); + glean::shutdown(); + std::thread::sleep(std::time::Duration::from_secs(10)); } diff --git a/glean-core/src/dispatcher/global.rs b/glean-core/src/dispatcher/global.rs index 0c1f2a6fa0..2327f3e1c4 100644 --- a/glean-core/src/dispatcher/global.rs +++ b/glean-core/src/dispatcher/global.rs @@ -116,6 +116,10 @@ pub fn kill() -> Result<(), DispatchError> { join_dispatcher_thread() } +pub fn force_kill() -> Result<(), DispatchError> { + guard().force_kill() +} + /// Shuts down the dispatch queue. /// /// This will initiate a shutdown of the worker thread diff --git a/glean-core/src/dispatcher/mod.rs b/glean-core/src/dispatcher/mod.rs index ead58fb867..3625ebadd6 100644 --- a/glean-core/src/dispatcher/mod.rs +++ b/glean-core/src/dispatcher/mod.rs @@ -109,6 +109,8 @@ struct DispatchGuard { /// Sender for the unbounded queue. sender: Sender, + + running: Arc, } impl DispatchGuard { @@ -196,6 +198,11 @@ impl DispatchGuard { Ok(()) } + fn force_kill(&mut self) -> Result<(), DispatchError> { + self.running.store(false, Ordering::Release); + Ok(()) + } + /// Flushes the pre-init buffer. /// /// This function blocks until tasks queued prior to this call are finished. @@ -264,10 +271,14 @@ impl Dispatcher { let queue_preinit = Arc::new(AtomicBool::new(true)); let overflow_count = Arc::new(AtomicUsize::new(0)); + let running = Arc::new(AtomicBool::new(true)); + let inner_running = running.clone(); let worker = thread::Builder::new() .name("glean.dispatcher".into()) .spawn(move || { + let running = inner_running; + match block_receiver.recv() { Err(_) => { // The other side was disconnected. @@ -288,6 +299,13 @@ impl Dispatcher { loop { use Command::*; + if !running.load(Ordering::Relaxed) { + log::info!( + "Not running anymore. Bailing out with potential tasks pending." + ); + break; + } + match receiver.recv() { Ok(Shutdown) => { break; @@ -333,6 +351,7 @@ impl Dispatcher { block_sender, preinit_sender, sender, + running, }; Dispatcher { diff --git a/glean-core/src/lib.rs b/glean-core/src/lib.rs index d26693d86a..9adef57092 100644 --- a/glean-core/src/lib.rs +++ b/glean-core/src/lib.rs @@ -6,7 +6,7 @@ #![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::uninlined_format_args)] #![deny(rustdoc::broken_intra_doc_links)] -#![deny(missing_docs)] +//#![deny(missing_docs)] //! Glean is a modern approach for recording and sending Telemetry data. //! @@ -38,7 +38,7 @@ mod core_metrics; mod coverage; mod database; mod debug; -mod dispatcher; +pub mod dispatcher; mod error; mod error_recording; mod event_database; @@ -706,6 +706,7 @@ pub fn shutdown() { log::error!( "Timeout while blocking on the dispatcher. No further shutdown cleanup will happen." ); + dispatcher::force_kill().unwrap(); return; }