Add two-phase sampling API to DistServer #578
…r.py Move create_dist_sampler(), SamplerInput, and SamplerRuntime out of dist_sampling_producer.py into a shared utils module so they can be reused by the upcoming SharedDistSamplingBackend. Also rename `w` -> `worker` in DistSamplingProducer.init() for clarity. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Introduce SharedDistSamplingBackend which manages a pool of worker processes servicing multiple compute-rank channels through a fair-queued round-robin scheduler. This replaces the per-channel producer model in graph-store mode with a shared backend + lightweight per-channel state. Includes tests for pure business logic helpers (_compute_num_batches, _epoch_batch_indices, _compute_worker_seeds_ranges), shuffle behavior, and completion reporting. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
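A minimal sketch of what a fair-queued round-robin scheduler over channels could look like. This is an illustration only, not the SharedDistSamplingBackend implementation: the class `RoundRobinScheduler`, its methods `submit` and `next_task`, and the helper `drain` are hypothetical names, and the real backend dispatches to worker processes rather than returning tuples.

```python
from collections import deque
from typing import Deque, Dict, Hashable, Iterable, List, Optional, Tuple


class RoundRobinScheduler:
    """Hypothetical fair queue: one pending-work deque per channel."""

    def __init__(self) -> None:
        self._pending: Dict[Hashable, Deque[int]] = {}
        self._order: Deque[Hashable] = deque()  # rotation order of channels

    def submit(self, channel_id: Hashable, batch_indices: Iterable[int]) -> None:
        """Queue batch indices for a channel, registering it on first use."""
        if channel_id not in self._pending:
            self._pending[channel_id] = deque()
            self._order.append(channel_id)
        self._pending[channel_id].extend(batch_indices)

    def next_task(self) -> Optional[Tuple[Hashable, int]]:
        """Return the next (channel_id, batch_index), or None if all queues are empty."""
        for _ in range(len(self._order)):
            channel_id = self._order[0]
            self._order.rotate(-1)  # move this channel to the back of the rotation
            queue = self._pending[channel_id]
            if queue:
                return channel_id, queue.popleft()
        return None


def drain(scheduler: RoundRobinScheduler) -> List[Tuple[Hashable, int]]:
    """Pull tasks until the scheduler is empty (helper for demonstration)."""
    tasks = []
    while (task := scheduler.next_task()) is not None:
        tasks.append(task)
    return tasks
```

With this shape, a channel with a long backlog cannot starve a channel with a short one: each pass over the rotation hands out at most one batch per channel.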
Replace the single-step create_sampling_producer with a two-phase API:
- init_sampling_backend: creates/reuses a SharedDistSamplingBackend
- register_sampling_input: registers a lightweight per-channel input

The existing create_sampling_producer/destroy_sampling_producer methods are preserved as bridge methods that delegate to the new API, keeping existing loaders working without changes.

Also adds InitSamplingBackendRequest and RegisterBackendRequest message dataclasses, and per-channel fetch stats logging.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
/all_test |
GiGL Automation@ 21:11:56UTC : 🔄 @ 21:19:11UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 21:11:56UTC : 🔄 @ 21:19:07UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 21:11:58UTC : 🔄 @ 22:35:09UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 21:12:01UTC : 🔄 @ 22:24:16UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 21:12:02UTC : 🔄 @ 22:30:05UTC : ✅ Workflow completed successfully. |
…r module docstring Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…sses Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…into kmonte/shared-backend-decomp-3
|
/all_test |
GiGL Automation@ 23:13:27UTC : 🔄 @ 24:24:24UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 23:13:28UTC : 🔄 @ 23:17:18UTC : ❌ Workflow failed. |
GiGL Automation@ 23:13:29UTC : 🔄 @ 23:22:27UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 23:13:31UTC : 🔄 @ 24:20:52UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 23:13:31UTC : 🔄 @ 24:43:22UTC : ✅ Workflow completed successfully. |
|
/all_test |
GiGL Automation@ 24:19:20UTC : 🔄 @ 01:26:03UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 24:19:21UTC : 🔄 @ 24:26:20UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 24:19:21UTC : 🔄 @ 24:26:53UTC : ✅ Workflow completed successfully. |
|
/all_test |
GiGL Automation@ 02:50:48UTC : 🔄 @ 04:15:20UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 02:50:49UTC : 🔄 @ 02:58:50UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 02:50:51UTC : 🔄 @ 02:58:14UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 02:50:51UTC : 🔄 @ 04:05:30UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 02:50:52UTC : 🔄 @ 03:47:46UTC : ✅ Workflow completed successfully. |
mkolodner-sc
left a comment
Thanks Kyle! Did a pass here
- Fix typo "patition" → "partition" in docstrings.
- Clear _fetch_stats_by_channel_id on shutdown.
- Fix init_sampling_backend race via lock-overlap on backend_state.lock; second callers block on that lock and observe init_complete / init_error instead of racing to a rolled-back map entry.
- Snapshot active_channels length under self._lock in register_sampling_input so the log reflects state at registration.
- Serialize destroy_sampling_input and start_new_epoch_sampling under channel_state.lock, restoring the pre-two-phase destroy/start_epoch invariant. Move _fetch_stats_by_channel_id.pop inside self._lock.
- Document idempotency of start_new_epoch_sampling and the expected QueueTimeoutError case in fetch_one_sampled_message.

Adds three concurrency regression tests: two for the init_sampling_backend race (success and failure-propagation via __cause__), and one verifying monotonic dispatch under the channel_state.lock wrap on start_new_epoch_sampling.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
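The lock-overlap fix for the init race can be sketched roughly as follows. This is a simplified illustration under assumptions, not the code in this PR: `BackendState`, `BackendRegistry`, `init_backend`, and the `factory` callable are hypothetical names, and only `init_complete`, `init_error`, and the per-backend lock mirror terms from the commit message.

```python
import threading
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional


@dataclass
class BackendState:
    lock: threading.Lock = field(default_factory=threading.Lock)
    init_complete: bool = False
    init_error: Optional[BaseException] = None
    backend: Optional[object] = None


class BackendRegistry:
    """Hypothetical registry: at most one initialization per key."""

    def __init__(self, factory: Callable[[], object]) -> None:
        self._lock = threading.Lock()
        self._states: Dict[str, BackendState] = {}
        self._factory = factory

    def init_backend(self, key: str) -> object:
        with self._lock:
            state = self._states.get(key)
            is_creator = state is None
            if is_creator:
                state = BackendState()
                self._states[key] = state
                # Lock overlap: take the per-backend lock BEFORE releasing the
                # map lock, so later callers cannot observe a half-built state.
                state.lock.acquire()
        if is_creator:
            try:
                state.backend = self._factory()
                state.init_complete = True
                return state.backend
            except BaseException as exc:
                state.init_error = exc
                with self._lock:
                    self._states.pop(key, None)  # roll back the map entry
                raise
            finally:
                state.lock.release()
        # Later callers block here until the creator finishes either way.
        with state.lock:
            if state.init_error is not None:
                raise RuntimeError("backend init failed") from state.init_error
            return state.backend
```

Because waiters hold a reference to the same state object, they see the recorded error (chained via `__cause__`) even after the map entry has been rolled back, and a fresh call after the rollback simply retries initialization.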
|
/all_test |
GiGL Automation@ 15:16:59UTC : 🔄 @ 16:22:15UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 15:17:00UTC : 🔄 @ 15:25:02UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 15:17:00UTC : 🔄 @ 15:24:40UTC : ✅ Workflow completed successfully. |
GiGL Automation@ 15:17:01UTC : 🔄 @ 16:51:04UTC : ✅ Workflow completed successfully. |
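Replace the single-step create_sampling_producer with a two-phase API:
- init_sampling_backend: creates/reuses a SharedDistSamplingBackend
- register_sampling_input: registers a lightweight per-channel input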
We do this so that sampler backends can be reused across the storage cluster. This greatly improves cluster stability and cuts the time we spend on process_start_gap_seconds.
I will have a follow-up PR (#579) where BaseDistLoader uses the new two-phase API; for now, DistServer just delegates to the two phases.
Note: we really do need that follow-up, because with this approach we still create one dist sampling process tree per input, which is what we are trying to avoid. It should be fine as a temporary stand-in that makes the reviews easier.
The existing create_sampling_producer/destroy_sampling_producer methods
are preserved as bridge methods that delegate to the new API, keeping
existing loaders working without changes.
Also adds InitSamplingBackendRequest and RegisterBackendRequest message
dataclasses, and per-channel fetch stats logging.
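To make the shape of the two-phase API concrete, here is a toy sketch. It is not the DistServer code: `ToyDistServer`, all dataclass fields (`backend_key`, `num_workers`, `backend_id`), and the integer ids are assumptions for illustration, and no processes are started. Only the method and dataclass names come from this PR.

```python
from dataclasses import dataclass
from typing import Dict, Set


@dataclass(frozen=True)
class InitSamplingBackendRequest:
    backend_key: str  # hypothetical field
    num_workers: int  # hypothetical field


@dataclass(frozen=True)
class RegisterBackendRequest:
    backend_id: int  # hypothetical field


class ToyDistServer:
    """In-memory stand-in; real backends own worker processes."""

    def __init__(self) -> None:
        self._backend_id_by_key: Dict[str, int] = {}
        self._channels_by_backend: Dict[int, Set[int]] = {}
        self._next_backend_id = 0
        self._next_channel_id = 0
        self._bridge_calls = 0

    def init_sampling_backend(self, request: InitSamplingBackendRequest) -> int:
        """Phase 1: create a backend for this key, or reuse the existing one."""
        backend_id = self._backend_id_by_key.get(request.backend_key)
        if backend_id is None:
            backend_id = self._next_backend_id
            self._next_backend_id += 1
            self._backend_id_by_key[request.backend_key] = backend_id
            self._channels_by_backend[backend_id] = set()
        return backend_id

    def register_sampling_input(self, request: RegisterBackendRequest) -> int:
        """Phase 2: attach a lightweight per-channel input to a backend."""
        channel_id = self._next_channel_id
        self._next_channel_id += 1
        self._channels_by_backend[request.backend_id].add(channel_id)
        return channel_id

    def create_sampling_producer(self, num_workers: int) -> int:
        """Bridge: delegates to both phases with a unique key per call,
        so each input still gets its own backend (the temporary state
        described above)."""
        self._bridge_calls += 1
        key = f"bridge-{self._bridge_calls}"
        backend_id = self.init_sampling_backend(
            InitSamplingBackendRequest(key, num_workers)
        )
        return self.register_sampling_input(RegisterBackendRequest(backend_id))
```

In this sketch, two callers that pass the same `backend_key` share one backend and only pay for a cheap registration, while the bridge path keeps the old one-backend-per-input behavior until loaders call the two phases directly.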