Switch loaders to two-phase init and remove bridge methods #579
kmontemayor2-sc wants to merge 26 commits into main
…r.py

Move create_dist_sampler(), SamplerInput, and SamplerRuntime out of dist_sampling_producer.py into a shared utils module so they can be reused by the upcoming SharedDistSamplingBackend. Also rename `w` -> `worker` in DistSamplingProducer.init() for clarity.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
Introduce SharedDistSamplingBackend, which manages a pool of worker processes servicing multiple compute-rank channels through a fair-queued round-robin scheduler. This replaces the per-channel producer model in graph-store mode with a shared backend + lightweight per-channel state.

Includes tests for pure business logic helpers (_compute_num_batches, _epoch_batch_indices, _compute_worker_seeds_ranges), shuffle behavior, and completion reporting.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
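The fair-queued round-robin idea in this commit can be illustrated with a small sketch. This is not the SharedDistSamplingBackend code: `RoundRobinScheduler`, `submit`, and `next_task` are hypothetical names, and the sketch assumes that "fair" means each channel with pending work gets one batch dispatched per turn.

```python
from collections import deque
from typing import Deque, Dict, Hashable, List, Optional, Tuple


class RoundRobinScheduler:
    """Hypothetical scheduler: hands out one pending batch per channel in turn."""

    def __init__(self) -> None:
        # Pending batch indices for each channel that still has work.
        self._pending: Dict[Hashable, Deque[int]] = {}
        # Rotation order of channels that currently have pending work.
        self._order: Deque[Hashable] = deque()

    def submit(self, channel_id: Hashable, batch_indices: List[int]) -> None:
        """Queue batches for a channel; new channels join the end of the rotation."""
        if not batch_indices:
            return
        if channel_id not in self._pending:
            self._pending[channel_id] = deque()
            self._order.append(channel_id)
        self._pending[channel_id].extend(batch_indices)

    def next_task(self) -> Optional[Tuple[Hashable, int]]:
        """Return (channel_id, batch_index) for the next turn, or None if idle."""
        if not self._order:
            return None
        channel_id = self._order.popleft()
        queue = self._pending[channel_id]
        batch_index = queue.popleft()
        if queue:
            # Channel still has work: send it to the back of the rotation.
            self._order.append(channel_id)
        else:
            del self._pending[channel_id]
        return channel_id, batch_index


scheduler = RoundRobinScheduler()
scheduler.submit("rank0", [0, 1, 2])
scheduler.submit("rank1", [0])
dispatched = []
while (task := scheduler.next_task()) is not None:
    dispatched.append(task)
```

With this rotation a channel that submits many batches cannot starve a channel that submits few, which is the property a shared worker pool needs when it serves several compute ranks.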
Replace the single-step create_sampling_producer with a two-phase API:
- init_sampling_backend: creates/reuses a SharedDistSamplingBackend
- register_sampling_input: registers a lightweight per-channel input

The existing create_sampling_producer/destroy_sampling_producer methods are preserved as bridge methods that delegate to the new API, keeping existing loaders working without changes.

Also adds InitSamplingBackendRequest and RegisterBackendRequest message dataclasses, and per-channel fetch stats logging.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
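A rough sketch of the two-phase shape described above, for readers skimming the thread. Everything here is a toy stand-in: `ToySamplingServer`, `ToyBackend`, and the argument lists are assumptions made for illustration. The real methods take the InitSamplingBackendRequest and RegisterBackendRequest dataclasses, whose fields are not shown in this conversation.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyBackend:
    """Hypothetical stand-in for a shared backend: one per backend key."""

    backend_id: int
    # channel_id -> seeds registered for that channel (lightweight per-channel state)
    channels: Dict[int, List[int]] = field(default_factory=dict)


class ToySamplingServer:
    """Hypothetical server showing create-or-reuse followed by per-channel registration."""

    def __init__(self) -> None:
        self._backend_id_by_key: Dict[str, int] = {}
        self._backends: Dict[int, ToyBackend] = {}
        self._next_channel_id = 0

    def init_sampling_backend(self, backend_key: str) -> int:
        """Phase 1: create the backend for this key, or reuse the existing one."""
        if backend_key not in self._backend_id_by_key:
            backend_id = len(self._backends)
            self._backends[backend_id] = ToyBackend(backend_id=backend_id)
            self._backend_id_by_key[backend_key] = backend_id
        return self._backend_id_by_key[backend_key]

    def register_sampling_input(self, backend_id: int, seeds: List[int]) -> int:
        """Phase 2: attach one channel's input to an already initialized backend."""
        channel_id = self._next_channel_id
        self._next_channel_id += 1
        self._backends[backend_id].channels[channel_id] = list(seeds)
        return channel_id


server = ToySamplingServer()
# Two compute ranks of the same loader share one backend but get distinct channels.
backend_a = server.init_sampling_backend("ToyLoader_0")
backend_b = server.init_sampling_backend("ToyLoader_0")
channel_a = server.register_sampling_input(backend_a, [1, 2, 3])
channel_b = server.register_sampling_input(backend_b, [4, 5])
```

Splitting creation from registration is what lets the expensive part (the worker pool) be shared while each compute rank keeps only a small per-channel record.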
Refactor BaseDistLoader to use the two-phase sampling API directly:
- init_sampling_backend (shared across all ranks per loader instance)
- register_sampling_input (unique per compute rank)

Key changes:
- Add GroupLeaderInfo, _compute_group_leader, _dispatch_grouped_graph_store_phase for generic leader-elected grouped RPC dispatch
- Add _init_graph_store_sampling_backends and _register_graph_store_sampling_inputs
- Replace _producer_id_list with _backend_id_list + _channel_id_list
- Remove create_sampling_producer/destroy_sampling_producer bridge methods
- Keep per-class _counter in each loader (not a global counter) since type-prefixed _backend_key already prevents cross-type collisions
- Fix test_multiple_loaders_in_graph_store to use num_compute_nodes=2 so backend-sharing assertions are exercised across ranks

Co-Authored-By: Claude Opus 4.6 <[email protected]>
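One way to picture leader-elected grouped dispatch is the sketch below. It is an illustration only: `ToyGroupLeaderInfo` and `compute_group_leaders` are hypothetical, the fields of the real GroupLeaderInfo are not shown in this thread, and the rule "lowest rank in a group is the leader" is an assumption rather than something the commit message states.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class ToyGroupLeaderInfo:
    """Hypothetical per-rank view of its group (not the real GroupLeaderInfo)."""

    leader_rank: int
    is_leader: bool
    group_ranks: Tuple[int, ...]


def compute_group_leaders(key_by_rank: Dict[int, str]) -> Dict[int, ToyGroupLeaderInfo]:
    """Group ranks that share a key and elect the lowest rank as leader (assumed rule)."""
    ranks_by_key: Dict[str, List[int]] = {}
    for rank in sorted(key_by_rank):
        ranks_by_key.setdefault(key_by_rank[rank], []).append(rank)

    info_by_rank: Dict[int, ToyGroupLeaderInfo] = {}
    for ranks in ranks_by_key.values():
        leader = ranks[0]  # ranks were visited in sorted order, so this is the minimum
        for rank in ranks:
            info_by_rank[rank] = ToyGroupLeaderInfo(
                leader_rank=leader,
                is_leader=(rank == leader),
                group_ranks=tuple(ranks),
            )
    return info_by_rank


# Ranks 0, 1, and 3 share one backend key; rank 2 uses another.
info = compute_group_leaders({0: "ToyLoader_0", 1: "ToyLoader_0", 2: "ToyLoader_1", 3: "ToyLoader_0"})
leaders = sorted(rank for rank, group in info.items() if group.is_leader)
```

In a scheme like this only the leader of each group would issue the shared RPC (for example backend initialization), and the other ranks in the group would wait for and reuse its result.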
/all_test
GiGL Automation @ 21:22:55 UTC : 🔄 @ 22:40:25 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 21:22:57 UTC : 🔄 @ 22:32:01 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 21:22:59 UTC : 🔄 @ 21:32:06 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 21:22:59 UTC : 🔄 @ 22:47:31 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 21:23:00 UTC : 🔄 @ 21:31:11 UTC : ✅ Workflow completed successfully.
…r module docstring Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
…sses Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
Co-Authored-By: Claude Opus 4.6 <[email protected]>
…into kmonte/shared-backend-decomp-3
- Fix typo "patition" → "partition" in docstrings.
- Clear _fetch_stats_by_channel_id on shutdown.
- Fix init_sampling_backend race via lock-overlap on backend_state.lock; second callers block on that lock and observe init_complete / init_error instead of racing to a rolled-back map entry.
- Snapshot active_channels length under self._lock in register_sampling_input so the log reflects state at registration.
- Serialize destroy_sampling_input and start_new_epoch_sampling under channel_state.lock, restoring the pre-two-phase destroy/start_epoch invariant. Move _fetch_stats_by_channel_id.pop inside self._lock.
- Document idempotency of start_new_epoch_sampling and the expected QueueTimeoutError case in fetch_one_sampled_message.

Adds three concurrency regression tests: two for the init_sampling_backend race (success and failure-propagation via __cause__), and one verifying monotonic dispatch under the channel_state.lock wrap on start_new_epoch_sampling.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
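The lock-overlap fix for the init race can be sketched roughly as follows. This is a simplified model and not the code in this PR: `ToyBackendState`, `ToyServer`, and the `init_fn` callback are hypothetical, and the choice of RuntimeError for the propagated failure is an assumption. Only the idea is taken from the commit message: the creator takes the per-backend lock before releasing the map lock, so later callers block on it and then read init_complete or init_error.

```python
import threading
from typing import Callable, Dict, Optional


class ToyBackendState:
    """Hypothetical per-backend state guarded by its own lock."""

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.init_complete = False
        self.init_error: Optional[BaseException] = None


class ToyServer:
    def __init__(self, init_fn: Callable[[str], None]) -> None:
        self._lock = threading.Lock()  # guards the key -> state map
        self._states: Dict[str, ToyBackendState] = {}
        self._init_fn = init_fn  # stands in for the expensive backend start-up

    def init_sampling_backend(self, key: str) -> ToyBackendState:
        with self._lock:
            state = self._states.get(key)
            is_creator = state is None
            if is_creator:
                state = ToyBackendState()
                self._states[key] = state
                # Lock overlap: take the state lock before the map lock is
                # released, so no other caller can see a half-built entry.
                state.lock.acquire()
        if is_creator:
            try:
                self._init_fn(key)
                state.init_complete = True
            except BaseException as exc:
                state.init_error = exc
                with self._lock:
                    self._states.pop(key, None)  # roll back the map entry
                raise
            finally:
                state.lock.release()
            return state
        # Second callers block here until the creator finishes or fails.
        with state.lock:
            if state.init_error is not None:
                raise RuntimeError("backend init failed") from state.init_error
            return state


calls = []
server = ToyServer(lambda key: calls.append(key))
results = []
threads = [
    threading.Thread(target=lambda: results.append(server.init_sampling_backend("k")))
    for _ in range(4)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Waiters never hold the map lock while they wait on the state lock, so the creator can still take the map lock to roll back on failure without deadlocking.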
…pchat/GiGL into kmonte/shared-backend-decomp-3
/all_test
GiGL Automation @ 15:58:09 UTC : 🔄 @ 17:27:53 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 15:58:10 UTC : 🔄 @ 16:05:37 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 15:58:11 UTC : 🔄 @ 17:02:30 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 15:58:12 UTC : 🔄 @ 16:07:48 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 15:58:13 UTC : 🔄 @ 17:14:12 UTC : ✅ Workflow completed successfully.
mkolodner-sc left a comment
Thanks Kyle! Left a couple of small comments, but generally LGTM.
/all_test
GiGL Automation @ 18:44:29 UTC : 🔄 @ 19:51:33 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 18:44:31 UTC : 🔄 @ 18:51:23 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 18:44:31 UTC : 🔄 @ 20:11:01 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 18:44:32 UTC : 🔄 @ 20:16:00 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 18:44:32 UTC : 🔄 @ 18:52:39 UTC : ✅ Workflow completed successfully.
/all_test
GiGL Automation @ 18:54:50 UTC : 🔄 @ 20:15:53 UTC : ❌ Workflow failed.
GiGL Automation @ 18:54:51 UTC : 🔄 @ 19:04:03 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 18:54:51 UTC : 🔄 @ 19:01:29 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 18:54:52 UTC : 🔄 @ 19:51:55 UTC : ✅ Workflow completed successfully.
GiGL Automation @ 18:54:54 UTC : 🔄 @ 20:15:44 UTC : ✅ Workflow completed successfully.