Skip to content

Commit 3d2ebfd

Browse files
Replace atomic shared_ptr cache with atomic raw pointer; fix clang-tidy
The Reaction::scheduler_data cache previously held an std::shared_ptr<void> read/written via std::atomic_load/atomic_store. On libstdc++ those fall back to a small global pool of mutexes (selected by pointer hash), which becomes a contention point on hot submission paths. Change scheduler_data to std::atomic<void*>{nullptr}. Pools live for the lifetime of the Scheduler and the PowerPlant tears reactors down before the scheduler, so a non-owning raw pointer is safe. Group::try_submit and WaitEntry are switched to Pool* accordingly, and the Scheduler field declaration order is changed so that pools outlive groups on destruction. Also fix the clang-tidy errors that were blocking the lint job: switch the queue Slot/Block backing storage to std::array (avoid-c-arrays, member init), explicit-base BLOCK_SIZE, do-while -> while, use auto with new, RunningLock special members, Semaphore destructor, missing direct includes, unused-and-kept includes, and a couple of small test cleanups (reserve before emplace, explicit lvalue MPSCQueue enqueue overload to work around an MSVC overload-resolution quirk on int(i)). Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent c03ffd2 commit 3d2ebfd

14 files changed

Lines changed: 128 additions & 68 deletions

File tree

src/threading/Reaction.hpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,19 @@ namespace threading {
135135
/// The callback generator function (creates databound callbacks)
136136
TaskGenerator generator;
137137

138-
/// Cached data for this reaction added by the scheduler
139-
std::shared_ptr<void> scheduler_data;
138+
/// Cached scheduler-private pointer for this reaction.
139+
///
140+
/// The scheduler uses this as a fast-path cache for the resolved pool that this reaction's
141+
/// tasks should run on. It is a raw, non-owning `void*` rather than `std::shared_ptr<void>`
142+
/// to avoid the per-submit cost of `std::atomic_load`/`atomic_store` on a `shared_ptr`,
143+
/// which on libstdc++ falls back to a small global pool of mutexes (selected by pointer
144+
/// hash) and can become a contention point on hot submission paths.
145+
///
146+
/// Ownership of whatever this points at lives entirely with the scheduler; those
147+
/// scheduler-side resources outlive reactions because PowerPlant tears reactors down
148+
/// before the scheduler. The first submit resolves the pool and stores it (racing first
149+
/// submits store the same pointer); subsequent submits just load it.
150+
std::atomic<void*> scheduler_data{nullptr};
140151
friend class scheduler::Scheduler; /// Let the scheduler mess with reaction objects
141152
};
142153

src/threading/scheduler/Group.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include "Group.hpp"
2323

2424
#include <algorithm>
25+
#include <atomic>
26+
#include <cstddef>
2527
#include <functional>
2628
#include <memory>
2729
#include <mutex>
@@ -31,7 +33,9 @@
3133
#include "../../id.hpp"
3234
#include "../../util/GroupDescriptor.hpp"
3335
#include "../ReactionTask.hpp"
36+
#include "Lock.hpp"
3437
#include "Pool.hpp"
38+
#include "queue/Priority.hpp"
3539

3640
namespace NUClear {
3741
namespace threading {
@@ -165,9 +169,7 @@ namespace threading {
165169
return nullptr;
166170
}
167171

168-
bool Group::try_submit(std::unique_ptr<ReactionTask>&& task,
169-
const std::shared_ptr<Pool>& pool,
170-
const bool& clear_idle) {
172+
bool Group::try_submit(std::unique_ptr<ReactionTask>&& task, Pool* pool, const bool& clear_idle) {
171173
// Don't jump ahead of multi-group waiters; if any exist, queue ourselves.
172174
if (slow_pending.load(std::memory_order_acquire) == 0) {
173175
int expected = tokens.load(std::memory_order_acquire);
@@ -240,7 +242,7 @@ namespace threading {
240242
WaitEntry entry;
241243
for (std::size_t bucket = 0; bucket < queue::PRIORITY_BUCKETS; ++bucket) {
242244
if (wait_buckets[bucket].try_dequeue(entry)) {
243-
auto pool = entry.pool;
245+
Pool* pool = entry.pool;
244246
pool->submit({std::move(entry.task), make_running_lock()}, entry.clear_idle, /*force=*/true);
245247
pool->unregister_external_waiter();
246248
return true;

src/threading/scheduler/Group.hpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,10 @@ namespace threading {
5757
private:
5858
struct WaitEntry {
5959
std::unique_ptr<ReactionTask> task;
60-
std::shared_ptr<Pool> pool;
60+
/// Non-owning pointer; Pools live for the lifetime of the Scheduler and the
61+
/// Scheduler tears down Groups before Pools, so it is always safe to dereference
62+
/// while this WaitEntry is reachable.
63+
Pool* pool{nullptr};
6164
bool clear_idle{false};
6265
};
6366

@@ -111,6 +114,11 @@ namespace threading {
111114
RunningLock(Group& group, std::shared_ptr<Group> group_keepalive);
112115
~RunningLock() override;
113116

117+
RunningLock(const RunningLock&) = delete;
118+
RunningLock(RunningLock&&) = delete;
119+
RunningLock& operator=(const RunningLock&) = delete;
120+
RunningLock& operator=(RunningLock&&) = delete;
121+
114122
bool lock() override;
115123

116124
private:
@@ -196,14 +204,12 @@ namespace threading {
196204
* Otherwise the task is queued until a token is released.
197205
*
198206
* @param task the reaction task to submit
199-
* @param pool the pool to submit to when runnable
207+
* @param pool the pool to submit to when runnable (non-owning; must stay valid until the task is submitted, which can be after this call returns if the task is queued)
200208
* @param clear_idle if true, clear idle state on submission
201209
*
202210
* @return true if the task was submitted immediately
203211
*/
204-
bool try_submit(std::unique_ptr<ReactionTask>&& task,
205-
const std::shared_ptr<Pool>& pool,
206-
const bool& clear_idle);
212+
bool try_submit(std::unique_ptr<ReactionTask>&& task, Pool* pool, const bool& clear_idle);
207213

208214
/**
209215
* This function will create a new lock for the task and return it.

src/threading/scheduler/Pool.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include "Pool.hpp"
2323

2424
#include <algorithm>
25+
#include <atomic>
26+
#include <cstddef>
2527
#include <memory>
2628
#include <mutex>
2729
#include <set>
@@ -31,11 +33,15 @@
3133

3234
#include "../../dsl/word/MainThread.hpp"
3335
#include "../../dsl/word/Pool.hpp"
36+
#include "../../id.hpp"
3437
#include "../../threading/Reaction.hpp"
3538
#include "../../util/Inline.hpp"
3639
#include "../ReactionTask.hpp"
3740
#include "CountingLock.hpp"
3841
#include "Scheduler.hpp"
42+
#include "queue/MPSCQueue.hpp"
43+
#include "queue/Priority.hpp"
44+
#include "queue/TaskQueue.hpp"
3945

4046
namespace NUClear {
4147
namespace threading {

src/threading/scheduler/Scheduler.cpp

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ namespace threading {
162162
std::unique_ptr<Lock> Scheduler::get_groups_lock(
163163
const NUClear::id_t& task_id,
164164
const int& priority,
165-
const std::shared_ptr<Pool>& pool,
165+
Pool* pool,
166166
const std::set<std::shared_ptr<const util::GroupDescriptor>>& descs) {
167167

168168
// No groups
@@ -188,28 +188,32 @@ namespace threading {
188188
return;
189189
}
190190

191-
// If we have run this task before, we know which pool it should be submitted to and cached it
192-
// on the parent reaction. This avoids every submit having to lock a mutex to find the pool.
191+
// Resolve the Pool for this task.
193192
//
194-
// The cache is read/written from any thread that submits a task for this reaction, so we use
195-
// std::atomic_load/store on the shared_ptr to avoid a data race. The cache lookup is benign
196-
// even under contention: the worst case is two submitters racing both compute the same pool
197-
// pointer and store it; the resulting pool is identical so a "last writer wins" is fine.
198-
std::shared_ptr<Pool> pool;
193+
// The first submit for a reaction does a mutex-protected `get_pool()` lookup; the
194+
// resulting pointer is then cached on the parent Reaction so subsequent submits skip
195+
// the mutex entirely.
196+
//
197+
// The cache is a single `std::atomic<void*>` (see Reaction::scheduler_data). We
198+
// deliberately avoid `std::atomic_load`/`atomic_store` on a `std::shared_ptr<void>`:
199+
// on libstdc++ those fall back to a small global pool of mutexes (~8 chosen by
200+
// pointer hash) and become a contention point on hot submission paths. Pools live
201+
// for the lifetime of the Scheduler (and the Scheduler tears down reactions before
202+
// its own pools), so a non-owning raw pointer is safe.
203+
//
204+
// The cache update is benign-racing: two submitters that miss simultaneously will
205+
// both call `get_pool()` and store the same pointer; last writer wins, identical
206+
// value.
207+
Pool* pool = nullptr;
199208
if (task->parent) {
200-
auto cached = std::atomic_load_explicit(&task->parent->scheduler_data, std::memory_order_acquire);
201-
if (cached) {
202-
pool = std::static_pointer_cast<Pool>(cached);
203-
}
204-
else {
205-
pool = get_pool(task->pool_descriptor);
206-
std::atomic_store_explicit(&task->parent->scheduler_data,
207-
std::static_pointer_cast<void>(pool),
208-
std::memory_order_release);
209+
pool = static_cast<Pool*>(task->parent->scheduler_data.load(std::memory_order_acquire));
210+
if (pool == nullptr) {
211+
pool = get_pool(task->pool_descriptor).get();
212+
task->parent->scheduler_data.store(static_cast<void*>(pool), std::memory_order_release);
209213
}
210214
}
211215
else {
212-
pool = get_pool(task->pool_descriptor);
216+
pool = get_pool(task->pool_descriptor).get();
213217
}
214218

215219
const bool current_pool_idle = Pool::current() != nullptr && Pool::current()->is_idle();

src/threading/scheduler/Scheduler.hpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ namespace threading {
127127
*/
128128
std::unique_ptr<Lock> get_groups_lock(const NUClear::id_t& task_id,
129129
const int& priority,
130-
const std::shared_ptr<Pool>& pool,
130+
Pool* pool,
131131
const std::set<std::shared_ptr<const util::GroupDescriptor>>& descs);
132132

133133
/// The number of threads that will be in the default thread pool
@@ -136,10 +136,9 @@ namespace threading {
136136
/// If running is false this means the scheduler is shutting down and no new pools will be created
137137
std::atomic<bool> running{true};
138138

139-
/// A mutex for when we are modifying groups
140-
std::mutex groups_mutex;
141-
/// A map of group ids to the number of active tasks currently running in that group
142-
std::map<std::shared_ptr<const util::GroupDescriptor>, std::shared_ptr<Group>> groups;
139+
// NB: `pools` is declared before `groups` so that on Scheduler destruction the groups
140+
// (which may hold non-owning Pool* in their waiter buckets) are destroyed first, then
141+
// the pools. This keeps the raw pointers in WaitEntry safe-by-construction.
143142

144143
/// A mutex for when we are modifying pools
145144
std::mutex pools_mutex;
@@ -149,6 +148,11 @@ namespace threading {
149148
/// once start is called future pools will be started immediately
150149
std::atomic<bool> started{false};
151150

151+
/// A mutex for when we are modifying groups
152+
std::mutex groups_mutex;
153+
/// A map of group descriptors to the Group object that manages each group
154+
std::map<std::shared_ptr<const util::GroupDescriptor>, std::shared_ptr<Group>> groups;
155+
152156
/// A mutex to protect the idle tasks list
153157
std::mutex idle_mutex;
154158
/// A list of idle tasks to execute when all pools are idle

src/threading/scheduler/queue/MPSCQueue.hpp

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#define NUCLEAR_THREADING_SCHEDULER_QUEUE_MPSC_QUEUE_HPP
2424

2525
#include <algorithm>
26+
#include <array>
2627
#include <atomic>
2728
#include <cstddef>
2829
#include <new>
@@ -55,15 +56,17 @@ namespace threading {
5556
static_assert(std::is_move_constructible<T>::value, "MPSCQueue requires move constructible T");
5657

5758
private:
58-
enum { BLOCK_SIZE = 64 };
59+
static constexpr std::size_t BLOCK_SIZE = 64;
5960

6061
struct Slot {
6162
std::atomic<bool> committed{false};
62-
alignas(T) unsigned char storage[sizeof(T)];
63+
/// Raw aligned storage for the T payload. Left value-initialised (zeroed) so the
64+
/// constructor fully covers all members; placement-new overwrites it on enqueue.
65+
alignas(T) std::array<unsigned char, sizeof(T)> storage{};
6366
};
6467

6568
struct Block {
66-
Slot slots[BLOCK_SIZE];
69+
std::array<Slot, BLOCK_SIZE> slots{};
6770
/// Producer claim counter, fetched by every enqueuer (atomic, MP-safe).
6871
std::atomic<std::size_t> write{0};
6972
/// Consumer read counter, only touched by the single consumer (non-atomic).
@@ -73,10 +76,10 @@ namespace threading {
7376
};
7477

7578
static T* slot_ptr(Slot& slot) {
76-
return reinterpret_cast<T*>(slot.storage);
79+
return reinterpret_cast<T*>(slot.storage.data());
7780
}
7881

79-
Block* allocate_block() {
82+
static Block* allocate_block() {
8083
return new Block();
8184
}
8285

@@ -87,12 +90,15 @@ namespace threading {
8790
// state the graveyard length is bounded by the peak number of in-flight blocks.
8891
void retire_block(Block* block) {
8992
Block* head_graveyard = graveyard.load(std::memory_order_acquire);
90-
do {
93+
while (true) {
9194
block->graveyard_next = head_graveyard;
92-
} while (!graveyard.compare_exchange_weak(head_graveyard,
93-
block,
94-
std::memory_order_release,
95-
std::memory_order_relaxed));
95+
if (graveyard.compare_exchange_weak(head_graveyard,
96+
block,
97+
std::memory_order_release,
98+
std::memory_order_relaxed)) {
99+
return;
100+
}
101+
}
96102
}
97103

98104
bool link_next_block(Block* block) {
@@ -126,8 +132,8 @@ namespace threading {
126132

127133
public:
128134
MPSCQueue() {
129-
Block* initial = new Block();
130-
head_block = initial;
135+
auto* initial = new Block();
136+
head_block = initial;
131137
tail_block.store(initial, std::memory_order_relaxed);
132138
graveyard.store(nullptr, std::memory_order_relaxed);
133139
}
@@ -153,14 +159,19 @@ namespace threading {
153159
}
154160
}
155161

162+
void enqueue(const T& item) {
163+
T copy(item);
164+
enqueue(std::move(copy));
165+
}
166+
156167
void enqueue(T&& item) override {
157168
while (true) {
158169
Block* block = tail_block.load(std::memory_order_acquire);
159170
const std::size_t index = block->write.fetch_add(1, std::memory_order_relaxed);
160171

161172
if (index < BLOCK_SIZE) {
162173
Slot& slot = block->slots[index];
163-
new (slot.storage) T(std::move(item));
174+
new (slot.storage.data()) T(std::move(item));
164175
slot.committed.store(true, std::memory_order_release);
165176
return;
166177
}

src/threading/scheduler/queue/Semaphore.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ namespace threading {
3939
*/
4040
class Semaphore {
4141
public:
42-
Semaphore() = default;
42+
Semaphore() = default;
43+
~Semaphore() = default;
4344

4445
Semaphore(const Semaphore&) = delete;
4546
Semaphore& operator=(const Semaphore&) = delete;
@@ -49,7 +50,7 @@ namespace threading {
4950
void signal(int n = 1) {
5051
const int previous = count.fetch_add(n, std::memory_order_release);
5152
if (previous < 0) {
52-
std::lock_guard<std::mutex> lock(mutex);
53+
const std::lock_guard<std::mutex> lock(mutex);
5354
const int waiters = std::min(n, -previous);
5455
for (int i = 0; i < waiters; ++i) {
5556
cv.notify_one();

0 commit comments

Comments
 (0)