Commit 30ee1cd

Lock-free scheduler with per-priority queues (#193)
## Summary

Rework `Scheduler` / `Pool` / `Group` to remove the global mutex contention that showed up on the existing implementation under load:

- **Per-priority lock-free task queues.** Each `Pool` now owns five independent MPMC `TaskQueue`s (one per priority bucket) instead of a single mutex-protected priority queue. Submitting and dequeuing on different priorities no longer contend at all, and same-priority submits use a block-based lock-free FIFO with a graveyard-style block reclamation scheme to avoid ABA.
- **MPSC specialisation for single-consumer pools.** Pools with `concurrency == 1` (e.g. `MainThread`, `TraceController`) now construct `MPSCQueue` instead of `TaskQueue`, giving a non-atomic consumer side and removing the dequeue CAS entirely for those pools. Both queues sit behind a `Queue<T>` polymorphic interface so `Pool` can hold an array of buckets and dispatch the right type at construction.
- **Counting-semaphore Group fast path.** Single-`Sync` groups now acquire via a signed `std::atomic<int> tokens` plus a lock-free `TaskQueue` per priority for waiters, falling back to the mutex-backed `GroupLock` only for multi-group submissions. Sync ordering inside a group is strict; equal-priority tasks across pools see relaxed global FIFO.
- **Idle bookkeeping off the pool mutex.** Pool idle tracking moved off the main pool mutex so wake-ups no longer block submitters.

New lock-free primitives (`TaskQueue`, `MPSCQueue`, `Semaphore`) and the Group counting fast path are covered by Catch2 BDD-style tests, plus a scheduler microbenchmark (`tests/tests/Benchmark.cpp`, hidden behind the `[.benchmark]` tag).
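The counting fast path described above can be pictured with a signed token counter. The following is a minimal sketch under stated assumptions, not the code from this commit: `TokenCounter`, `acquire`, `release`, and `value` are hypothetical names, and the waiter queue and wake-up logic are left out. A negative count is read as "this many callers are waiting".

```cpp
#include <atomic>

// Hypothetical illustration of a signed token counter; it is not NUClear's
// actual Group implementation and omits the per-priority waiter queues.
class TokenCounter {
public:
    explicit TokenCounter(int initial) : tokens(initial) {}

    // Takes one token. Returns true if a token was free (previous count > 0).
    // Returns false if the caller is now counted as a waiter and must queue.
    bool acquire() {
        return tokens.fetch_sub(1, std::memory_order_acquire) > 0;
    }

    // Returns one token. Returns true if the previous count was negative,
    // meaning a waiter exists and the token should be handed to it.
    bool release() {
        return tokens.fetch_add(1, std::memory_order_release) < 0;
    }

    // Current count; negative values are the number of registered waiters.
    int value() const {
        return tokens.load(std::memory_order_relaxed);
    }

private:
    std::atomic<int> tokens;
};
```

With one initial token, the first `acquire()` succeeds, the second registers a waiter and leaves the count at -1, and the next `release()` reports that a waiter should be run. The appeal of this shape is that the uncontended path is a single atomic read-modify-write with no mutex.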
## TSAN race fixes (pre-existing)

While validating the changes under TSAN on macOS clang and Linux gcc 13, three pre-existing data races surfaced and are fixed in this PR:

- **`IOController_Posix.ipp`: `pollfd::events`.** The `IOFinished` handler mutated `watches[].events` while only holding `tasks_mutex`, and the poll thread read the same field from inside `::poll()` while only holding `notifier.mutex`. `bump()` woke poll but released `notifier.mutex` before the mutation, leaving the race window open. The handler now writes the wake byte inline and holds `notifier.mutex` across the `watches` update and the follow-up `fire_event` call (which can also touch `watches[].events`).
- **`Scheduler::submit`: `Reaction::scheduler_data`.** The cached pool was read/written from any submitting thread without synchronisation. It is now a non-owning raw `Pool*` cached in a `std::atomic<void*>` (release store / acquire load) rather than a `std::shared_ptr<void>` accessed via `std::atomic_load`/`atomic_store`. The shared_ptr atomics fall back to a small global mutex pool on libstdc++ and become a contention point on hot submission paths. Pools outlive all reactions (PowerPlant tears reactors down before the scheduler), so a raw pointer is safe, and all racers resolve the same pool, so the benign store is last-writer-wins.
- **`Watchdog` data store.** The service `time_point` was read by the chrono controller while being mutated by user threads emitting a service event, and the void specialisation returned a reference through a temporary `shared_ptr` (latent dangling reference). Centralised reads/writes through a per-`(WatchdogGroup, RuntimeType)` `std::mutex`, made `get` return by value, and routed `WatchdogServicer::service` through `WatchdogDataStore::service` so writes share the read mutex.

## Other

- `.gitignore` now matches `build-*/` so out-of-tree TSAN/ASAN/Release build directories don't appear in `git status`.
- SonarCloud suppressions for the deliberate lock-free / placement-new idioms live in `sonar-project.properties`.

## Test plan

- [x] macOS clang TSAN: `dsl/IO`, `dsl/Inline`, `dsl/Watchdog` 30/30 clean each; full suite ✓
- [x] macOS clang Release: full suite ✓
- [x] Linux gcc 13 TSAN (Docker): same three tests 30/30 clean; hot-path tests under repeated runs with no TSAN warnings
- [x] Linux gcc 9 Release (Docker): full suite ✓
- [x] Lock-free primitives have BDD-style unit tests (including queue-destructor leak coverage)
- [x] Scheduler microbenchmark exercises submit / dequeue / group acquire under contention

Made with [Cursor](https://cursor.com)

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
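The `Reaction::scheduler_data` fix in the TSAN section amounts to caching a non-owning pointer in an atomic slot. Below is a small sketch of that pattern, assuming stand-in types: `Pool`, `Reaction`, and `cached_pool` here are simplified, hypothetical versions written for illustration, and the resolver callback is an assumption about how the pool would be looked up on a cache miss.

```cpp
#include <atomic>

// Hypothetical stand-ins; the real Pool and Reaction types are far richer.
struct Pool {
    int id;
};

struct Reaction {
    // Non-owning cache slot. The pointee must outlive every Reaction.
    std::atomic<void*> scheduler_data{nullptr};
};

// Returns the cached pool, resolving and publishing it on first use.
// Racing threads may each call resolve(), but they all resolve the same
// pool, so whichever store lands last leaves the same value in place.
template <typename Resolve>
Pool* cached_pool(Reaction& reaction, Resolve&& resolve) {
    void* cached = reaction.scheduler_data.load(std::memory_order_acquire);
    if (cached == nullptr) {
        Pool* pool = resolve();
        reaction.scheduler_data.store(pool, std::memory_order_release);
        return pool;
    }
    return static_cast<Pool*>(cached);
}
```

Calling `cached_pool` twice on the same reaction invokes the resolver only once in the single-threaded case. The design trades ownership tracking for a plain pointer load on the hot path, which is only sound because of the lifetime guarantee stated in the commit message.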
1 parent c8a9185 commit 30ee1cd

36 files changed

Lines changed: 3501 additions & 205 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -14,6 +14,7 @@
 
 # Build & CMake files
 build/
+build-*/
 CMakeCache.txt
 CMakeFiles
 Makefile

CMakeLists.txt

Lines changed: 3 additions & 1 deletion
@@ -81,11 +81,13 @@ if(CI_BUILD)
   endif()
 endif(CI_BUILD)
 
+# Tests must be declared before src/ so NUClear can expose test-only APIs when enabled.
+option(BUILD_TESTS "Builds all of the NUClear unit tests." ON)
+
 # Add the src directory
 add_subdirectory(src)
 
 # Add the tests directory
-option(BUILD_TESTS "Builds all of the NUClear unit tests." ON)
 if(BUILD_TESTS)
   enable_testing()
   add_subdirectory(tests)

cmake/TestRunner.cmake

Lines changed: 2 additions & 1 deletion
@@ -54,7 +54,8 @@ foreach(target ${all_targets})
   list(APPEND report_outputs ${junit_report_file})
   add_custom_command(
     OUTPUT ${junit_report_file} ${raw_coverage}
-    COMMAND ${command_prefix} $<TARGET_FILE:${target}> --reporter console --reporter JUnit::out=${junit_report_file}
+    COMMAND ${command_prefix} $<TARGET_FILE:${target}> --allow-running-no-tests --reporter console
+            --reporter JUnit::out=${junit_report_file}
     WORKING_DIRECTORY ${PROJECT_BINARY_DIR}
     DEPENDS ${target}
     USES_TERMINAL

docs/explanation/index.md

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ If you've already followed the tutorials and know how to use NUClear, this is wh
 | --------------------------------- | --------------------------------------------------------------------------------------------- |
 | [Architecture](architecture.md) | Why NUClear exists, the problems it solves, and the event-driven reactive pattern at its core |
 | [Threading Model](threading.md) | How tasks are scheduled across thread pools, priority queues, and group constraints |
+| [Scheduler](scheduler.md) | Internal design of the lock-free scheduler: pools, queues, groups, idle tasks, and shutdown |
 | [Lifecycle](lifecycle.md) | The three phases of a NUClear system: initialisation, execution, and shutdown |
 | [The DSL System](dsl-system.md) | How `on<>().then()` works from top to bottom — template metaprogramming in action |
 | [Message Flow](message-flow.md) | What happens when you emit data, from call site to reaction execution |

docs/explanation/scheduler.md

Lines changed: 297 additions & 0 deletions
Large diffs are not rendered by default.

docs/explanation/threading.md

Lines changed: 2 additions & 0 deletions
@@ -3,6 +3,8 @@
 NUClear's threading model is designed around a simple goal: **you should never have to write a mutex**.
 The framework handles concurrency for you through immutable messages, thread pools, and a priority-based scheduler.
 
+For the internal design of the scheduler (lock-free queues, group tokens, idle detection, shutdown), see [Scheduler](scheduler.md).
+
 ## Thread Pool Architecture
 
 NUClear uses multiple thread pools, each serving a different purpose:

mkdocs.yml

Lines changed: 1 addition & 0 deletions
@@ -181,6 +181,7 @@ nav:
 - explanation/index.md
 - Architecture: explanation/architecture.md
 - Threading Model: explanation/threading.md
+- Scheduler: explanation/scheduler.md
 - Lifecycle: explanation/lifecycle.md
 - The DSL System: explanation/dsl-system.md
 - Message Flow: explanation/message-flow.md

sonar-project.properties

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+# SonarCloud issue suppressions for deliberate lock-free / placement-new code.
+# projectKey, organization, sources, tests and coverage settings are passed on
+# the scanner CLI in .github/workflows/sonarcloud.yaml; only the ignore rules
+# below are configured here.
+
+sonar.issue.ignore.multicriteria=e1,e2,e3,e4,e5
+
+# S8417: explicit memory_order arguments are intentional in this concurrency
+# framework; the carefully chosen relaxed/acquire/release/acq_rel orderings are
+# required for performance and must not be forced to seq_cst.
+sonar.issue.ignore.multicriteria.e1.ruleKey=cpp:S8417
+sonar.issue.ignore.multicriteria.e1.resourceKey=src/threading/**
+sonar.issue.ignore.multicriteria.e2.ruleKey=cpp:S8417
+sonar.issue.ignore.multicriteria.e2.resourceKey=src/extension/**
+
+# S5025 (manual new/delete), S3630 (reinterpret_cast) and S3432 (explicit
+# destructor call) are unavoidable in the lock-free queues: manual Block
+# lifetime is dictated by the graveyard reclamation scheme and the
+# reinterpret_cast + explicit ~T() are the placement-new idiom for the aligned
+# slot storage. Scope these to the queue files only.
+sonar.issue.ignore.multicriteria.e3.ruleKey=cpp:S5025
+sonar.issue.ignore.multicriteria.e3.resourceKey=**/scheduler/queue/*.hpp
+sonar.issue.ignore.multicriteria.e4.ruleKey=cpp:S3630
+sonar.issue.ignore.multicriteria.e4.resourceKey=**/scheduler/queue/*.hpp
+sonar.issue.ignore.multicriteria.e5.ruleKey=cpp:S3432
+sonar.issue.ignore.multicriteria.e5.resourceKey=**/scheduler/queue/*.hpp

src/Reactor.hpp

Lines changed: 1 addition & 1 deletion
@@ -390,7 +390,7 @@ class Reactor {
 
     public:
         template <typename... Args>
-        Binder(Reactor& r, Args&&... args) : reactor(r), args(std::forward<Args>(args)...) {}
+        Binder(Reactor& r, Args&&... args_) : reactor(r), args(std::forward<Args>(args_)...) {}
 
         template <typename Label, typename Function>
         auto then(Label&& label, Function&& callback) {

src/dsl/word/Watchdog.hpp

Lines changed: 62 additions & 3 deletions
@@ -23,6 +23,7 @@
 #ifndef NUCLEAR_DSL_WORD_WATCHDOG_HPP
 #define NUCLEAR_DSL_WORD_WATCHDOG_HPP
 
+#include <mutex>
 #include <stdexcept>
 
 #include "../../threading/Reaction.hpp"
@@ -52,12 +53,25 @@ namespace dsl {
             using MapType = std::remove_cv_t<RuntimeType>;
             using WatchdogStore = util::TypeMap<WatchdogGroup, MapType, std::map<MapType, NUClear::clock::time_point>>;
 
+            /**
+             * Mutex protecting structural and value updates to the underlying map for this
+             * (WatchdogGroup, RuntimeType) pair. Watchdog timers are read by the chrono controller
+             * thread (via @ref get) while being written by user threads that emit a service event
+             * (via @ref service), and the underlying std::map is also mutated by init/unbind, so a
+             * single shared mutex serialises all of those operations.
+             */
+            static std::mutex& mutex() {
+                static std::mutex m;  // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
+                return m;
+            }
+
             /**
              * Ensures the data store is initialised correctly.
              *
              * @param data The runtime argument for the current watchdog in the WatchdogGroup/RuntimeType group
              */
             static void init(const RuntimeType& data) {
+                const std::lock_guard<std::mutex> lock(mutex());
                 if (WatchdogStore::get() == nullptr) {
                     WatchdogStore::set(std::make_shared<std::map<MapType, NUClear::clock::time_point>>());
                 }
@@ -67,11 +81,15 @@ namespace dsl {
             }
 
             /**
-             * Gets the current service time for the WatchdogGroup/RuntimeType/data watchdog
+             * Gets the current service time for the WatchdogGroup/RuntimeType/data watchdog.
+             *
+             * Returned by value so the caller never holds a reference into the (mutex-protected)
+             * map. The time_point is small and trivially copyable so the copy is essentially free.
              *
              * @param data The runtime argument for the current watchdog in the WatchdogGroup/RuntimeType group
              */
-            static const NUClear::clock::time_point& get(const RuntimeType& data) {
+            static NUClear::clock::time_point get(const RuntimeType& data) {
+                const std::lock_guard<std::mutex> lock(mutex());
                 if (WatchdogStore::get() == nullptr || WatchdogStore::get()->count(data) == 0) {
                     throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name()) + ", "
                                             + util::demangle(typeid(MapType).name())
@@ -80,12 +98,29 @@ namespace dsl {
                 return WatchdogStore::get()->at(data);
             }
 
+            /**
+             * Atomically updates the service time for the WatchdogGroup/RuntimeType/data watchdog.
+             *
+             * Called by @ref emit::WatchdogServicer::service to keep the write under the same
+             * mutex that @ref get uses for reads.
+             */
+            static void service(const RuntimeType& data, const NUClear::clock::time_point& when) {
+                const std::lock_guard<std::mutex> lock(mutex());
+                if (WatchdogStore::get() == nullptr || WatchdogStore::get()->count(data) == 0) {
+                    throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name()) + ", "
+                                            + util::demangle(typeid(MapType).name())
+                                            + "> has not been created yet or no watchdog has been set up");
+                }
+                WatchdogStore::get()->at(data) = when;
+            }
+
             /**
              * Cleans up any allocated storage for the WatchdogGroup/RuntimeType/data watchdog
              *
              * @param data The runtime argument for the current watchdog in the WatchdogGroup/RuntimeType group
              */
             static void unbind(const RuntimeType& data) {
+                const std::lock_guard<std::mutex> lock(mutex());
                 if (WatchdogStore::get() != nullptr) {
                     WatchdogStore::get()->erase(data);
                 }
@@ -105,30 +140,54 @@ namespace dsl {
         struct WatchdogDataStore<WatchdogGroup, void> {
             using WatchdogStore = util::TypeMap<WatchdogGroup, void, NUClear::clock::time_point>;
 
+            /// See the documentation on the runtime-arg specialisation.
+            static std::mutex& mutex() {
+                static std::mutex m;  // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
+                return m;
+            }
+
             /**
              * Ensures the data store is initialised correctly.
              */
             static void init() {
+                const std::lock_guard<std::mutex> lock(mutex());
                 if (WatchdogStore::get() == nullptr) {
                     WatchdogStore::set(std::make_shared<NUClear::clock::time_point>(NUClear::clock::now()));
                 }
             }
 
             /**
              * Gets the current service time for the WatchdogGroup watchdog.
+             *
+             * Returned by value so the caller never reads from the time_point while it is being
+             * mutated by @ref service on another thread.
              */
-            static const NUClear::clock::time_point& get() {
+            static NUClear::clock::time_point get() {
+                const std::lock_guard<std::mutex> lock(mutex());
                 if (WatchdogStore::get() == nullptr) {
                     throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name())
                                             + "> is trying to field a service call for an unknown data type");
                 }
                 return *WatchdogStore::get();
             }
 
+            /**
+             * Atomically updates the service time for the WatchdogGroup watchdog.
+             */
+            static void service(const NUClear::clock::time_point& when) {
+                const std::lock_guard<std::mutex> lock(mutex());
+                if (WatchdogStore::get() == nullptr) {
+                    throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name())
+                                            + "> has not been created yet or no watchdog has been set up");
+                }
+                *WatchdogStore::get() = when;
+            }
+
             /**
              * Cleans up any allocated storage for the WatchdogGroup watchdog.
              */
             static void unbind() {
+                const std::lock_guard<std::mutex> lock(mutex());
                 if (WatchdogStore::get() != nullptr) {
                     WatchdogStore::get().reset();
                 }
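The pattern the Watchdog hunks apply (one mutex shared by readers and writers, with reads returned by value) can be shown in isolation. This is a simplified sketch, not the NUClear code: `ServiceTimes` and its members are hypothetical names, it is keyed by `std::string` instead of a templated runtime type, and it uses `std::chrono::steady_clock` where the real store uses `NUClear::clock`.

```cpp
#include <chrono>
#include <map>
#include <mutex>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a watchdog time store; names are illustrative.
class ServiceTimes {
public:
    using time_point = std::chrono::steady_clock::time_point;

    // Registers a key so that later get/service calls can find it.
    void init(const std::string& key, time_point now) {
        const std::lock_guard<std::mutex> lock(mutex_);
        times_.emplace(key, now);
    }

    // Copies the value out while the lock is held, so the caller never
    // keeps a reference into the map after the lock is released.
    time_point get(const std::string& key) const {
        const std::lock_guard<std::mutex> lock(mutex_);
        return times_.at(key);  // throws std::out_of_range for unknown keys
    }

    // Writes under the same mutex that get() uses for reads.
    void service(const std::string& key, time_point when) {
        const std::lock_guard<std::mutex> lock(mutex_);
        times_.at(key) = when;
    }

private:
    mutable std::mutex mutex_;
    std::map<std::string, time_point> times_;
};
```

Returning a `const&` from `get` would hand the caller a reference that is read after the lock is released, which is the shape of race the commit message describes. Copying a `time_point` is cheap, so returning by value costs little.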
