
[Fix][Feat] Fix worker sorting with external pg bundles & Support persistent buffer for update_params #4397

Open
CyCle1024 wants to merge 6 commits into InternLM:main from CyCle1024:update_params_persist_buffer

Conversation

Collaborator

@CyCle1024 CyCle1024 commented Mar 6, 2026

Motivation

This PR addresses two issues related to distributed training parameter update in RLHF scenarios:

  1. Worker Sorting Issue: When using external placement group bundle indices (e.g., in specific Ray cluster configurations), workers could be incorrectly sorted by IP address, breaking the intended worker-to-bundle mapping.

  2. Memory Copy Overhead: The update_params interface used for RL training required cloning tensors for IPC (Inter-Process Communication) on every parameter update, causing unnecessary memory overhead and synchronization costs.

Modification

  1. ray_executor.py:

    • Modified _sort_workers to skip IP-based sorting when external bundle indices are specified via LMDEPLOY_RAY_EXTERNAL_PG_BUNDLES env var
    • Refactored bundle index handling to preserve user-specified order when using external placement groups
    • Removed unused _valid_bundle_id method
  2. agent.py:

    • Added persistent IPC buffer (_update_params_ipc_tensor) and CUDA event (_update_params_ipc_event) for efficient parameter updates
    • Support receiving pre-allocated tensor and IPC event handle from upstream to avoid unnecessary tensor cloning
    • Proper cleanup of IPC resources after parameter updates and during sleep mode
  3. utils.py:

    • Enhanced FlattenedTensorBucket to support optional pre-allocated flattened tensor buffer
    • Made flattened_tensor field optional in serialization to support zero-copy scenarios
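The bucket changes above follow a flatten-with-metadata pattern: concatenate many named tensors into one contiguous buffer (optionally pre-allocated and reused across updates) and rebuild named views from recorded offsets. Below is a minimal plain-Python sketch of the idea; lists stand in for torch tensors, and the helper names are invented, so this is not the actual FlattenedTensorBucket API.

```python
# Minimal sketch of the flatten/reconstruct pattern behind FlattenedTensorBucket.
# Plain-Python stand-in: "tensors" are flat lists of floats; the real code uses
# torch and supports an optional pre-allocated 1-D buffer reused across updates.

def flatten_bucket(named_tensors, out=None):
    """Concatenate tensors into one buffer, recording (name, offset, numel)."""
    total = sum(len(t) for _, t in named_tensors)
    if out is None:
        out = [0.0] * total                      # allocate a fresh buffer
    elif len(out) < total:
        raise ValueError(f'buffer too small: {len(out)} < {total}')
    metadata, idx = [], 0
    for name, t in named_tensors:
        out[idx:idx + len(t)] = t                # copy into the shared buffer
        metadata.append((name, idx, len(t)))
        idx += len(t)
    return out, metadata

def reconstruct(buffer, metadata):
    """Rebuild named tensors by slicing the flat buffer at recorded offsets."""
    return {name: buffer[off:off + n] for name, off, n in metadata}
```

Because a persistent `out` buffer is rewritten in place, repeated update_params calls avoid a fresh allocation and clone per update, which is the crux of this PR's optimization.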

BC-breaking (Optional)

No BC-breaking changes. The modifications are backward compatible:

  • External bundle index handling is opt-in via environment variable
  • The new persistent buffer mechanism gracefully falls back to clone behavior when not provided

Use cases (Optional)

This PR optimizes RL training workflows where:

  • Ray placement groups are pre-configured with specific bundle indices
  • Frequent parameter updates occur between training steps
  • Minimizing memory copy overhead is critical for performance
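Enabling the opt-in path might look like the following. The env var name comes from this PR; the value format (comma-separated bundle indices) is an assumption here, so consult the lmdeploy documentation for the authoritative syntax.

```shell
# Hypothetical usage: the value format (comma-separated bundle indices) is an
# assumption, not confirmed by this PR; check lmdeploy's env-var documentation.
export LMDEPLOY_RAY_EXTERNAL_PG_BUNDLES="2,3"
```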

Checklist

  1. Pre-commit or other linting tools are used to fix the potential lint issues.
  2. The modification is covered by complete unit tests. If not, please add more unit tests to ensure the correctness.
  3. If the modification has a dependency on downstream projects of a newer version, this PR should be tested with all supported versions of downstream projects.
  4. The documentation has been modified accordingly, like docstring or example tutorials.

Copilot AI review requested due to automatic review settings March 6, 2026 04:48
Contributor

Copilot AI left a comment


Pull request overview

This PR improves RLHF/distributed weight-update workflows by (1) preserving Ray worker/bundle ordering when users provide external placement group bundle indices and (2) reducing IPC cloning overhead for frequent update_params calls via a persistent IPC tensor/event, supported by a preallocated flattened-tensor bucket path.

Changes:

  • Ray: skip IP-based worker sorting and preserve user-provided bundle index order when LMDEPLOY_RAY_EXTERNAL_PG_BUNDLES is set.
  • Pytorch agent: add persistent IPC tensor/event handling to avoid per-update cloning when possible.
  • Utils: extend FlattenedTensorBucket and serialization to support optional/preallocated flattened buffers (zero/low-copy paths).

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 7 comments.

Reviewed files:
  • lmdeploy/utils.py: Makes flattened bucket serialization more flexible and adds a preallocated buffer path for concatenation.
  • lmdeploy/pytorch/engine/model_agent/agent.py: Adds persistent IPC tensor/event support in update_params and cleanup during sleep/finalization.
  • lmdeploy/pytorch/engine/executor/ray_executor.py: Adjusts worker sorting and bundle index selection for externally-provided Ray PG bundle indices.
Comments suppressed due to low confidence (1)

lmdeploy/pytorch/engine/model_agent/agent.py:1183

  • The comment says request.serialized_named_tensors is now a dict ..., but the code still treats request.serialized_named_tensors as base64-encoded bytes and the deserialized object (weights) is the dict. Please reword this to avoid misleading API users (especially since UpdateParamsRequest.serialized_named_tensors is typed as Union[str, List[str], Dict]).
            # request.serialized_named_tensors is now a dict with following keys:
            # - metadata: List[FlattenedTensorMetadata]
            # - flattened_tensor: the flattened tensor for weights, optional
            # - event_ipc_handle: the ipc handle of the event
            #   that used to sync stream across processes, optional
            serialized_data = request.serialized_named_tensors
            if isinstance(serialized_data, list):
                serialized_data = serialized_data[self.dist_ctx.tp_group.rank]
            model = self.patched_model.get_model()
            weights = ForkingPickler.loads(pybase64.b64decode(serialized_data))


assert flattened_tensor.numel() >= current_idx, \
    'Provided flattened tensor numel is smaller than ' + \
    f'required numel: {flattened_tensor.numel()} < {current_idx}'
assert sum([t.numel() for t in flattened_tensor_list]) == current_idx

Copilot AI Mar 6, 2026


These new branches add non-trivial behavior (optional flattened_tensor in serialization and the preallocated-buffer out= cat path), but there are no unit tests covering them. Since this module already has tests (tests/test_lmdeploy/test_utils.py), adding focused tests around this initialization path would help prevent regressions.

Suggested change (keep the final assert and append buffer validation):

assert sum([t.numel() for t in flattened_tensor_list]) == current_idx
# Validate that the provided preallocated buffer is compatible with inputs.
# All named_tensors are already verified to share the same dtype.
first_tensor = named_tensors[0][1] if named_tensors else None
if first_tensor is not None:
    if flattened_tensor.dtype != first_tensor.dtype:
        raise ValueError(
            f'flattened_tensor dtype {flattened_tensor.dtype} does not match '
            f'input tensors dtype {first_tensor.dtype}'
        )
    if flattened_tensor.device != first_tensor.device:
        raise ValueError(
            f'flattened_tensor device {flattened_tensor.device} does not match '
            f'input tensors device {first_tensor.device}'
        )
    if not flattened_tensor.is_contiguous():
        raise ValueError('flattened_tensor must be contiguous when used as an output buffer')

Comment on lines +1187 to 1199
if 'flattened_tensor' in weights:
    self._update_params_ipc_tensor = _construct(weights['flattened_tensor'],
                                                require_clone=weights.get('require_clone', True))
if 'event_ipc_handle' in weights and hasattr(torch.cuda.Event, 'from_ipc_handle'):
    self._update_params_ipc_event = torch.cuda.Event.from_ipc_handle(
        device=torch.cuda.current_device(),
        handle=weights['event_ipc_handle'],
    )
flattened_tensor: torch.Tensor = self._update_params_ipc_tensor
if self._update_params_ipc_event is not None:
    self._update_params_ipc_event.wait()
bucket = FlattenedTensorBucket(flattened_tensor=flattened_tensor, metadata=metadata)
weights = bucket.reconstruct_tensors()

Copilot AI Mar 6, 2026


When request.load_format == 'flattened_bucket' and metadata is non-empty, this code will pass flattened_tensor=None into FlattenedTensorBucket if (a) the request omits flattened_tensor and (b) _update_params_ipc_tensor has not been initialized from a prior call. That will raise in FlattenedTensorBucket.__init__ and makes the “flattened_tensor optional” behavior fragile; consider explicitly erroring with a clear message unless a cached _update_params_ipc_tensor already exists (or require flattened_tensor on the first call).

Comment on lines +1190 to +1197
if 'event_ipc_handle' in weights and hasattr(torch.cuda.Event, 'from_ipc_handle'):
    self._update_params_ipc_event = torch.cuda.Event.from_ipc_handle(
        device=torch.cuda.current_device(),
        handle=weights['event_ipc_handle'],
    )
flattened_tensor: torch.Tensor = self._update_params_ipc_tensor
if self._update_params_ipc_event is not None:
    self._update_params_ipc_event.wait()

Copilot AI Mar 6, 2026


If the producer supplies event_ipc_handle but the local PyTorch build lacks torch.cuda.Event.from_ipc_handle, this silently skips cross-process stream synchronization and proceeds to read from the IPC tensor. That can lead to stale/partially-written weights; consider failing fast (or forcing require_clone=True / torch.cuda.synchronize() fallback) when an event handle is provided but cannot be imported.

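The fallback policy this comment asks for can be made explicit with a small helper. This is a hypothetical sketch (the function and its names are invented, not code from the PR); a later revision of the PR adopts the force-clone fallback in-line.

```python
# Hypothetical helper encoding the reviewer's suggestion: never read IPC memory
# without synchronization. If an event handle is supplied but the local torch
# build lacks torch.cuda.Event.from_ipc_handle, force a clone rather than
# silently skipping cross-process stream sync.

def resolve_update_policy(has_event_handle: bool,
                          can_import_event: bool,
                          require_clone: bool):
    """Return (require_clone, use_event) for one update_params call."""
    if has_event_handle and not can_import_event:
        return True, False   # fallback: clone so stale reads are impossible
    return require_clone, has_event_handle
```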
Comment on lines +543 to +547
assert len(flattened_tensor.shape) == 1, 'flattened_tensor must be 1-D tensor'
assert flattened_tensor.numel() >= current_idx, \
    'Provided flattened tensor numel is smaller than ' + \
    f'required numel: {flattened_tensor.numel()} < {current_idx}'
assert sum([t.numel() for t in flattened_tensor_list]) == current_idx

Copilot AI Mar 6, 2026


In the preallocated-buffer path, input validation uses assert and only checks 1-D and numel(). Since this is user-provided input (and asserts can be stripped with -O), it would be safer to raise ValueError with a clear message and also validate dtype/device (and ideally contiguity) match the source tensors to avoid cryptic torch.cat(..., out=...) runtime errors.

Suggested change (replace the asserts above with explicit validation):

# Validate user-provided preallocated buffer.
if flattened_tensor.dim() != 1:
    raise ValueError(
        f'flattened_tensor must be a 1-D tensor, but got shape {tuple(flattened_tensor.shape)}'
    )
if flattened_tensor.numel() < current_idx:
    raise ValueError(
        'Provided flattened tensor numel is smaller than required numel: '
        f'{flattened_tensor.numel()} < {current_idx}'
    )
# Ensure dtype and device are compatible with source tensors.
reference_tensor = named_tensors[0][1]
if flattened_tensor.dtype != reference_tensor.dtype:
    raise ValueError(
        f'flattened_tensor dtype {flattened_tensor.dtype} does not match source '
        f'tensors dtype {reference_tensor.dtype}'
    )
if flattened_tensor.device != reference_tensor.device:
    raise ValueError(
        f'flattened_tensor device {flattened_tensor.device} does not match source '
        f'tensors device {reference_tensor.device}'
    )
if not flattened_tensor.is_contiguous():
    raise ValueError('flattened_tensor must be contiguous')
total_numel = sum(t.numel() for t in flattened_tensor_list)
if total_numel != current_idx:
    raise ValueError(
        'Mismatch between computed and expected flattened size: '
        f'{total_numel} != {current_idx}'
    )

@lvhan028 lvhan028 requested review from RunningLeon and grimoire March 7, 2026 13:58
Collaborator

@grimoire grimoire left a comment


LGTM. @RunningLeon We'd better abstract the weight-update logic out of ModelAgent to make it more maintainable.

@windreamer windreamer force-pushed the update_params_persist_buffer branch from 2cadf8d to 53cc026 Compare March 24, 2026 03:36
@windreamer
Collaborator

Rebased onto the recent main branch, resolved the conflicts, and force-pushed.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.



Comment on lines +37 to +75
# Test 4: Error case - buffer too small
preallocated_small = torch.empty(50, dtype=torch.float32, device='cuda')  # Only 50 elements, need 200
try:
    FlattenedTensorBucket(named_tensors=named_tensors, flattened_tensor=preallocated_small)
    assert False, 'Should have raised ValueError'
except ValueError as e:
    assert 'smaller than required numel' in str(e)

# Test 5: Error case - wrong dtype
preallocated_wrong_dtype = torch.empty(200, dtype=torch.float64, device='cuda')
try:
    FlattenedTensorBucket(named_tensors=named_tensors, flattened_tensor=preallocated_wrong_dtype)
    assert False, 'Should have raised ValueError'
except ValueError as e:
    assert 'dtype' in str(e)

# Test 6: Error case - wrong device (CPU buffer for CUDA tensors)
preallocated_cpu = torch.empty(200, dtype=torch.float32, device='cpu')
try:
    FlattenedTensorBucket(named_tensors=named_tensors, flattened_tensor=preallocated_cpu)
    assert False, 'Should have raised ValueError'
except ValueError as e:
    assert 'device' in str(e)

# Test 7: Error case - non-contiguous tensor
preallocated_non_contig = torch.empty(400, dtype=torch.float32, device='cuda')[::2]  # Strided view
try:
    FlattenedTensorBucket(named_tensors=named_tensors, flattened_tensor=preallocated_non_contig)
    assert False, 'Should have raised ValueError'
except ValueError as e:
    assert 'contiguous' in str(e)

# Test 8: Error case - not 1-D tensor
preallocated_2d = torch.empty(10, 20, dtype=torch.float32, device='cuda')
try:
    FlattenedTensorBucket(named_tensors=named_tensors, flattened_tensor=preallocated_2d)
    assert False, 'Should have raised ValueError'
except ValueError as e:
    assert '1-D tensor' in str(e)

Copilot AI Mar 31, 2026


The error-path assertions use try/except with assert False rather than pytest helpers. Using pytest.raises(...) (optionally with match=) makes the intent clearer and avoids false positives if an unexpected exception type is thrown.

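For example, Test 4 could be rewritten in pytest style. This is a sketch: a hypothetical stand-in reproduces the size check so the snippet does not depend on CUDA or on FlattenedTensorBucket itself.

```python
import pytest

def check_buffer(buf_numel, required=200):
    # Hypothetical stand-in for FlattenedTensorBucket's size validation.
    if buf_numel < required:
        raise ValueError(f'smaller than required numel: {buf_numel} < {required}')

def test_buffer_too_small():
    # pytest.raises fails the test if no ValueError (or a different exception
    # type) is raised, and match= asserts on the message in the same step.
    with pytest.raises(ValueError, match='smaller than required numel'):
        check_buffer(50)
```

Compared with try/except plus `assert False`, this reads as a single statement of intent and never mistakes an unexpected exception type for a pass.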
Comment on lines +1194 to +1206
if 'flattened_tensor' in weights:
    # Determine if clone is required
    require_clone = weights.get('require_clone', True)
    if 'event_ipc_handle' in weights and not hasattr(torch.cuda.Event, 'from_ipc_handle'):
        # Force clone when IPC event is provided but cannot be used
        require_clone = True
    self._update_params_ipc_tensor = _construct(weights['flattened_tensor'],
                                                require_clone=require_clone)
elif self._update_params_ipc_tensor is None:
    raise ValueError(
        'flattened_tensor is not provided in weights and no cached ipc tensor is available. '
        'Please provide flattened_tensor on the first update_params call.')
if 'event_ipc_handle' in weights and hasattr(torch.cuda.Event, 'from_ipc_handle'):

Copilot AI Mar 31, 2026


update_params allows omitting flattened_tensor on subsequent calls as long as _update_params_ipc_tensor is cached. If the cached tensor was created via .clone() (default require_clone=True), this will silently reuse stale weights instead of updated producer memory. Consider tracking whether the cached buffer is an IPC-backed tensor vs a cloned copy, and require flattened_tensor to be present whenever cloning is used (or error with a clearer message).

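The bookkeeping this comment asks for could be sketched as follows. Hypothetical, plain-Python stand-in: the class name is invented and lists replace torch tensors; the point is only the is_ipc_view flag.

```python
# Hypothetical sketch of the suggested bookkeeping: remember whether the cached
# buffer aliases producer (IPC) memory or is a private clone. Reusing a clone on
# a later call without a fresh flattened_tensor would silently serve stale weights.

class CachedParamBuffer:
    def __init__(self):
        self.buffer = None
        self.is_ipc_view = False   # True only if buffer aliases producer memory

    def update(self, flattened_tensor=None, require_clone=True):
        if flattened_tensor is not None:
            # list(...) stands in for tensor.clone(); no clone means we keep a view.
            self.buffer = list(flattened_tensor) if require_clone else flattened_tensor
            self.is_ipc_view = not require_clone
        elif self.buffer is None or not self.is_ipc_view:
            raise ValueError('flattened_tensor required: no IPC-backed cached buffer')
        return self.buffer
```

With this flag, omitting flattened_tensor is only legal when the cache genuinely sees the producer's writes.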
Comment on lines +585 to +592
if not _envs.ray_external_pg_bundles:
    for bundle_id, bundle in enumerate(placement_group.bundle_specs):
        if bundle.get(device_str, 0):
            bundle_indices.append(bundle_id)
else:
    # use external specified bundle indices, keep the order as well
    bundle_indices = _envs.ray_external_pg_bundles.copy()
# validate external bundle indices

Copilot AI Mar 31, 2026


_sort_workers explicitly says external bundle handling should apply only when lmdeploy does not own the placement group, but _init_workers_ray still unconditionally uses _envs.ray_external_pg_bundles whenever it is set. This can override lmdeploy-owned PG bundle selection and potentially break internal worker-to-bundle assumptions. Consider gating the external-bundle branch on not self.ray_ctx.owned_pg as well.

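The suggested gating amounts to a small predicate. Hypothetical helper: the owned_pg attribute comes from the comment above, not from the PR diff.

```python
# Hypothetical helper for the gating Copilot proposes: external bundle indices
# should only take effect when lmdeploy did NOT create the placement group itself.

def use_external_bundles(external_pg_bundles, owned_pg):
    """True iff user-specified bundle indices should override bundle discovery."""
    return bool(external_pg_bundles) and not owned_pg
```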
Collaborator

@RunningLeon RunningLeon left a comment


LGTM
