Skip to content

Commit 4f82a76

Browse files
Simplify IOController pool identity and address PR review feedback.
Move pool constexprs onto IOController, raise poll to NORMAL priority, colocate inline bump reactions with their handlers, inline the poll loop, and bring Windows onto the same scheduler-driven model. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent fd91a44 commit 4f82a76

5 files changed

Lines changed: 95 additions & 140 deletions

File tree

docs/explanation/scheduler.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,14 @@ Persistent pools (`ThreadPoolDescriptor::persistent`) continue accepting tasks d
277277

278278
### Blocking pools and priority ordering
279279

280-
Some extensions use a single-consumer pool with a blocking wait in the worker thread (for example, IOController on POSIX blocks in `::poll()`). Priority does not preempt inside the blocking call; instead, cross-thread control work is enqueued at HIGH priority on the same pool, a notify pipe wakes the poll thread, the LOW poll task finishes its iteration and resubmits, and the worker dequeues the already-queued HIGH task before the resubmitted poll. Control handlers that mutate poll state from another thread pair with separate default-pool bump reactions registered after the HIGH handlers so the bump runs from a thread that is not blocked in `::poll()`.
280+
Some extensions use a single-consumer pool with a blocking wait in the worker thread.
281+
For example, IOController blocks in `::poll()` or `WSAWaitForMultipleEvents()`.
282+
283+
Priority does not preempt inside the blocking call.
284+
Instead, cross-thread control work is enqueued at HIGH priority on the same pool.
285+
An inline bump reaction writes the notify pipe or event from the emitting thread.
286+
The poll task finishes its iteration and resubmits at NORMAL priority.
287+
The worker then dequeues the already-queued HIGH task before the resubmitted poll.
281288

282289
## Design tradeoffs
283290

docs/reference/extensions/built-in-extensions.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,13 @@ Monitors file descriptors for IO readiness events and triggers corresponding rea
9595

9696
**Implementation:**
9797

98-
**Implementation:**
99-
100-
Uses platform-native polling mechanisms:
98+
Uses platform-native polling mechanisms on a dedicated `IOController` pool (`concurrency = 1`, MPSC):
10199

102-
- **POSIX (Linux/macOS):** `poll()` with `pollfd` arrays, driven by a dedicated `IOController` pool (`concurrency = 1`, MPSC). A LOW-priority poll task blocks in `::poll()` and resubmits itself after each iteration. Control handlers (`IOConfiguration`, `IOFinished`, `Unbind`, `Shutdown`) run at HIGH priority on the same pool; separate default-pool bump reactions wake the poll thread via a notify pipe once the HIGH task is queued.
103-
- **Windows:** `WSAWaitForMultipleEvents` with `WSAEVENT` handles, still using `on<Always>` (scheduler-driven parity deferred).
100+
- **POSIX (Linux/macOS):** `poll()` with `pollfd` arrays.
101+
A NORMAL-priority poll task blocks in `::poll()` and resubmits itself after each iteration.
102+
Control handlers (`IOConfiguration`, `IOFinished`, `Unbind`, `Shutdown`) run at HIGH priority on the same pool.
103+
Each control handler is followed by an `Inline::ALWAYS` bump reaction that writes the notify pipe from the emitting thread.
104+
- **Windows:** `WSAWaitForMultipleEvents` with `WSAEVENT` handles, using the same scheduler-driven pool and priority model.
104105

105106
When events are detected on registered file descriptors, the controller creates tasks for the corresponding reactions, passing the event flags through `ThreadStore` so the `get` method can report which specific events occurred.
106107

src/extension/IOController.hpp

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,6 @@
3232
namespace NUClear {
3333
namespace extension {
3434

35-
/// Dedicated single-consumer pool for IOController scheduler-driven polling.
36-
namespace io_pool {
37-
struct IO {
38-
static constexpr const char* name = "IOController";
39-
static constexpr int concurrency = 1;
40-
static constexpr bool counts_for_idle = false;
41-
};
42-
} // namespace io_pool
43-
4435
class IOController : public Reactor {
4536
public:
4637
struct Task;
@@ -104,6 +95,13 @@ namespace extension {
10495
};
10596

10697
private:
98+
/// Dedicated single-consumer pool for scheduler-driven IO polling.
99+
struct IOPool {
100+
static constexpr const char* name = "IOController";
101+
static constexpr int concurrency = 1;
102+
static constexpr bool counts_for_idle = false;
103+
};
104+
107105
/**
108106
* Rebuilds the list of file descriptors to poll.
109107
*
@@ -133,20 +131,6 @@ namespace extension {
133131
// NOLINTNEXTLINE(readability-make-member-function-const) this changes states
134132
void bump();
135133

136-
/**
137-
* Runs one poll iteration on the IO pool and reschedules the poll task when running.
138-
*
139-
* The poll loop is a LOW-priority task on a dedicated single-consumer pool. After each
140-
* iteration it resubmits at LOW; cross-thread control work is queued at HIGH on the same
141-
* pool before a separate bump reaction wakes ::poll(), so the worker dequeues HIGH first.
142-
*/
143-
void poll_iteration();
144-
145-
/**
146-
* Resubmit the poll reaction to the scheduler (same pool, LOW priority).
147-
*/
148-
void reschedule_poll();
149-
150134
public:
151135
explicit IOController(std::unique_ptr<NUClear::Environment> environment);
152136

src/extension/IOController_Posix.ipp

Lines changed: 41 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include "IOController.hpp"
2424

25+
#include "../dsl/word/Inline.hpp"
2526
#include "../dsl/word/Pool.hpp"
2627
#include "../dsl/word/Priority.hpp"
2728
#include "../dsl/word/Startup.hpp"
@@ -30,8 +31,6 @@
3031
namespace NUClear {
3132
namespace extension {
3233

33-
using IOPool = dsl::word::Pool<io_pool::IO>;
34-
3534
namespace {
3635

3736
/**
@@ -97,52 +96,6 @@ namespace extension {
9796

9897
} // namespace
9998

100-
void IOController::reschedule_poll() {
101-
if (!running.load(std::memory_order_acquire)) {
102-
return;
103-
}
104-
if (const auto* current = threading::ReactionTask::get_current_task()) {
105-
powerplant.submit(current->parent->get_task());
106-
}
107-
}
108-
109-
void IOController::poll_iteration() {
110-
// Rebuild the list if something changed
111-
if (dirty.load(std::memory_order_acquire)) {
112-
rebuild_list();
113-
}
114-
115-
// Wait for an event to happen on one of our file descriptors
116-
bool polled = false;
117-
/* mutex scope */ {
118-
const NotifierPollScope poll(notifier);
119-
if (!poll.wake_pending()) {
120-
if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) {
121-
throw std::system_error(network_errno,
122-
std::system_category(),
123-
"There was an IO error while attempting to poll the file descriptors");
124-
}
125-
polled = true;
126-
}
127-
}
128-
129-
if (polled) {
130-
// Get the lock so we don't concurrently modify the list
131-
const std::lock_guard<std::mutex> lock(tasks_mutex);
132-
for (auto& fd : watches) {
133-
134-
// Collect the events that happened into the tasks list
135-
// Something happened
136-
if (fd.revents != 0) {
137-
process_event(fd);
138-
}
139-
}
140-
}
141-
142-
// Yield to the scheduler. Any HIGH control tasks queued before the bump wake are dequeued first.
143-
reschedule_poll();
144-
}
145-
14699
void IOController::rebuild_list() {
147100
// Get the lock so we don't concurrently modify the list
148101
const std::lock_guard<std::mutex> lock(tasks_mutex);
@@ -286,11 +239,7 @@ namespace extension {
286239
// Start by rebuilding the list
287240
rebuild_list();
288241

289-
// Control handlers run on the IO pool at HIGH priority. Separate bump reactions (default pool,
290-
// registered after the HIGH handlers) wake ::poll() once the HIGH task is already queued.
291-
// TypeCallbackStore iteration follows registration order, so HIGH is enqueued before bump.
292-
293-
on<Trigger<dsl::word::IOConfiguration>, IOPool, Priority::HIGH>().then(
242+
on<Trigger<dsl::word::IOConfiguration>, Pool<IOPool>, Priority::HIGH>().then(
294243
"Configure IO Reaction",
295244
[this](const dsl::word::IOConfiguration& config) {
296245
const std::lock_guard<std::mutex> lock(tasks_mutex);
@@ -301,8 +250,9 @@ namespace extension {
301250
std::sort(tasks.begin(), tasks.end());
302251
dirty.store(true, std::memory_order_release);
303252
});
253+
on<Trigger<dsl::word::IOConfiguration>, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); });
304254

305-
on<Trigger<dsl::word::IOFinished>, IOPool, Priority::HIGH>().then("IO Finished", [this](const dsl::word::IOFinished& event) {
255+
on<Trigger<dsl::word::IOFinished>, Pool<IOPool>, Priority::HIGH>().then("IO Finished", [this](const dsl::word::IOFinished& event) {
306256
const std::lock_guard<std::mutex> lock(tasks_mutex);
307257

308258
auto task = std::find_if(tasks.begin(), tasks.end(), [&event](const Task& t) {
@@ -315,7 +265,6 @@ namespace extension {
315265
tasks.erase(task);
316266
}
317267
else {
318-
// Mutate watches[] only while poll is out of ::poll() — wake-then-lock on the IO thread.
319268
NotifierWakeGuard wake(notifier);
320269
wake.signal();
321270
const auto notifier_lock = wake.lock();
@@ -333,8 +282,9 @@ namespace extension {
333282
}
334283
}
335284
});
285+
on<Trigger<dsl::word::IOFinished>, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); });
336286

337-
on<Trigger<dsl::operation::Unbind<IO>>, IOPool, Priority::HIGH>().then(
287+
on<Trigger<dsl::operation::Unbind<IO>>, Pool<IOPool>, Priority::HIGH>().then(
338288
"Unbind IO Reaction",
339289
[this](const dsl::operation::Unbind<IO>& unbind) {
340290
const std::lock_guard<std::mutex> lock(tasks_mutex);
@@ -349,18 +299,46 @@ namespace extension {
349299

350300
dirty.store(true, std::memory_order_release);
351301
});
302+
on<Trigger<dsl::operation::Unbind<IO>>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); });
352303

353-
on<Shutdown, IOPool, Priority::HIGH>().then("Shutdown IO Controller", [this] {
304+
on<Shutdown, Pool<IOPool>, Priority::HIGH>().then("Shutdown IO Controller", [this] {
354305
running.store(false, std::memory_order_release);
355306
});
307+
on<Shutdown, Inline::ALWAYS>().then("Shutdown IO bump", [this] { bump(); });
356308

357-
on<Trigger<dsl::word::IOConfiguration>>().then("Configure IO bump", [this] { bump(); });
358-
on<Trigger<dsl::word::IOFinished>>().then("IO Finished bump", [this] { bump(); });
359-
on<Trigger<dsl::operation::Unbind<IO>>>().then("Unbind IO bump", [this] { bump(); });
360-
on<Shutdown>().then("Shutdown IO bump", [this] { bump(); });
309+
on<Startup, Pool<IOPool>, Priority::NORMAL>().then("IO Poll", [this] {
310+
if (!running.load(std::memory_order_acquire)) {
311+
return;
312+
}
361313

362-
// Scheduler-driven poll: dedicated single-consumer IO pool, LOW priority, self-resubmitting.
363-
on<Startup, IOPool, Priority::LOW>().then("IO Poll", [this] { poll_iteration(); });
314+
if (dirty.load(std::memory_order_acquire)) {
315+
rebuild_list();
316+
}
317+
318+
bool polled = false;
319+
{
320+
const NotifierPollScope poll(notifier);
321+
if (!poll.wake_pending()) {
322+
if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) {
323+
throw std::system_error(network_errno,
324+
std::system_category(),
325+
"There was an IO error while attempting to poll the file descriptors");
326+
}
327+
polled = true;
328+
}
329+
}
330+
331+
if (polled) {
332+
const std::lock_guard<std::mutex> lock(tasks_mutex);
333+
for (auto& fd : watches) {
334+
if (fd.revents != 0) {
335+
process_event(fd);
336+
}
337+
}
338+
}
339+
340+
powerplant.submit(threading::ReactionTask::get_current_task()->parent->get_task());
341+
});
364342
}
365343

366344
} // namespace extension

0 commit comments

Comments
 (0)