Adding multisample feature along with testcases #740
VijayVignesh1 wants to merge 29 commits into Lightning-AI:main from
Conversation
Force-pushed from 1b01b6f to 6a77302
@tchaton @deependujha @bhimrazy Can you verify the approach once? I can then make changes to the README.
Codecov Report
❌ Patch coverage is

```
@@           Coverage Diff           @@
##            main     #740   +/-   ##
======================================
- Coverage     81%      81%    -0%
======================================
  Files         54       54
  Lines       7613     7642   +29
======================================
+ Hits        6140     6163   +23
- Misses      1473     1479    +6
```
Closing this PR due to inactivity. Please feel free to reopen or recreate it whenever convenient.
Hi @bhimrazy,
@deependujha The pipeline is failing on test_wav_deserialization in VideoSerializer, even though my PR didn’t touch this file. Do you know why it might be failing?

UPDATE: The latest TorchVision release (v0.26) has deprecated
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
Comments suppressed due to low confidence (1)
src/litdata/streaming/dataset.py:499
- Resume logic appears inconsistent with the new inflated indexing.
`num_samples_yielded` counts yielded samples (now multiplied by `sample_count`), but `_replay_chunks_sampling` subtracts raw interval sizes (not scaled by `sample_count`), so `chunks_index`/`indexes` will be wrong when `sample_count > 1`. This will cause `_resume()` to restart from the wrong chunk/offset (repeats or skips). The replay math needs to incorporate `sample_count` (and ideally persist/validate `sample_count` in the state_dict).
```python
# TODO: Implement elastic sampling where the number of workers, ranks can change.
num_samples_yielded = self._state_dict["num_samples_yielded"]
worker_start = self.distributed_env.global_rank * num_workers
worker_end = worker_start + num_workers

# replay sampling from each worker / chunks using the batch size
indexes = _replay_sampling(num_samples_yielded, batch_size, num_workers)
chunks_index, indexes = _replay_chunks_sampling(
    workers_intervals={i: workers_intervals[j] for i, j in enumerate(range(worker_start, worker_end))},
    indexes=indexes,
)

# select the chunks and intervals associated to this worker
worker_rank = self.distributed_env.global_rank * self.worker_env.world_size + self.worker_env.rank
worker_local_rank = self.worker_env.rank

self.num_chunks = len(workers_intervals[worker_rank])
self.worker_next_chunk_index = chunks_index[worker_local_rank]
self.worker_chunks = workers_chunks[worker_rank]
self.worker_intervals = workers_intervals[worker_rank]

if self.worker_next_chunk_index >= self.num_chunks:
    # This can happen when interrupting and resuming after some but not all workers are done.
    # Proceeding would result in an indexing error when attempting to access the next chunk.
    # To prevent this we exit early and let the worker raise a StopIteration in __next__.
    return

# replay the indexes for the current chunks
interval = self.worker_intervals[self.worker_next_chunk_index]

# multiply the interval by the sample_count for multisample case
current_indexes = np.arange(interval[1] * self.sample_count, interval[2] * self.sample_count)

# re-shuffle the indexes
current_indexes = self.shuffler(
    current_indexes, self.num_chunks, self.current_epoch, self.worker_next_chunk_index
)

# skip any indexes already consumed
current_indexes = current_indexes[indexes[worker_local_rank] :]
self.upcoming_indexes = current_indexes

self.global_index = indexes[worker_local_rank]
```
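The reviewer's point can be illustrated with a minimal, self-contained simulation. The helper below is a hypothetical stand-in for the replay logic, not litdata's actual `_replay_chunks_sampling`; it only shows why subtracting unscaled interval sizes lands in the wrong chunk when `sample_count > 1`:

```python
# Hypothetical sketch of the resume mismatch described above; this is a
# stand-in, not litdata's real _replay_chunks_sampling implementation.

def replay_chunks(num_yielded, intervals, sample_count=1):
    """Walk the chunk intervals, subtracting each chunk's size (in yielded
    samples) until the remainder points inside a chunk."""
    chunk_index = 0
    remaining = num_yielded
    for start, end in intervals:
        size = (end - start) * sample_count  # scale by sub-samples per item
        if remaining < size:
            return chunk_index, remaining
        remaining -= size
        chunk_index += 1
    return chunk_index, remaining

intervals = [(0, 10), (10, 20)]  # two chunks of 10 base items each

# With sample_count=2, 15 yielded samples are still inside chunk 0
# (which yields 20 virtual samples)...
print(replay_chunks(15, intervals, sample_count=2))  # (0, 15)

# ...but replaying with unscaled chunk sizes skips ahead to chunk 1:
# this is the wrong-chunk/offset bug the review describes.
print(replay_chunks(15, intervals, sample_count=1))  # (1, 5)
```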
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Doesn't support resuming with

Also,
deependujha left a comment
Thanks for the cool work. 🥳
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.
Comments suppressed due to low confidence (1)
src/litdata/streaming/dataset.py:505
- When `sample_count > 1`, resume logic will miscompute which chunk to resume from because `_replay_chunks_sampling(...)` subtracts chunk sizes in base-item units (`interval[2] - interval[1]`), while `num_samples_yielded` is counted in yielded (virtual) samples. This can cause resuming in the wrong chunk / offset. The replay logic needs to account for `sample_count` (e.g., treat each interval size as `(interval[2] - interval[1]) * sample_count`).
```python
# replay the indexes for the current chunks
interval = self.worker_intervals[self.worker_next_chunk_index]

# multiply the interval by the sample_count for multisample case
current_indexes = np.arange(interval[1] * self.sample_count, interval[2] * self.sample_count)

# re-shuffle the indexes
current_indexes = self.shuffler(
    current_indexes, self.num_chunks, self.current_epoch, self.worker_next_chunk_index
)
```
requires #806
What does this PR do?
Fixes #317
Added support for multisample items: a new sample_count parameter creates a batch of sub-samples for each sample, given a single transform function.
Sample code:

Before submitting
If we didn't discuss your PR in GitHub issues, there's a high chance it will not be merged.

PR review
Anyone in the community is free to review the PR once the tests have passed.

Did you have fun?
Make sure you had fun coding 🙃
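The PR's sample code was not captured in this thread. As a minimal stand-in (not litdata's actual implementation or API), the indexing idea behind `sample_count` can be sketched like this: a dataset of N stored items is exposed as N * sample_count samples, where index `i` maps to item `i // sample_count` and sub-sample `i % sample_count` of a single transform's output:

```python
# Hypothetical sketch of the multisample idea; MultisampleView is an
# illustrative class, not part of litdata.
class MultisampleView:
    def __init__(self, items, sample_count, transform):
        self.items = items
        self.sample_count = sample_count
        self.transform = transform  # maps one stored item -> list of sub-samples

    def __len__(self):
        # each stored item is inflated into sample_count virtual samples
        return len(self.items) * self.sample_count

    def __getitem__(self, index):
        item = self.items[index // self.sample_count]
        subs = self.transform(item)  # e.g. crops of an image, audio windows
        return subs[index % self.sample_count]

view = MultisampleView(["a", "b"], 2, lambda x: [x + "0", x + "1"])
print(len(view))                     # 4
print([view[i] for i in range(4)])   # ['a0', 'a1', 'b0', 'b1']
```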