[Feature][Connector-V2] Maxcompute Source Round-Robin Split Assignment #11131
zhiliang-wu wants to merge 2 commits into
c9eed42 to 83ccf17
DanielLeens
left a comment
Thanks for working on this. I reviewed the current diff from the MaxCompute source enumerator down to the actual split assignment path instead of only looking at the unit test update. The goal of the change makes sense: the old logic gave each reader a contiguous range, which can leave the tail split distribution uneven when split_row is much smaller than the per-reader range.
Runtime path I checked:
Job startup
-> MaxcomputeSourceSplitEnumerator.run()
-> discoverySplits()
-> build MaxcomputeSourceSplit(start, count, tablePath, ownerReader)
-> addSplitChangeToPendingAssignments()
-> assignPendingSplits()
-> enumeratorContext.assignSplit(reader, splits)
Blocking item:
- seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java:115-139
  The round-robin counter is reset for every `sourceTableInfo` because `chunkIndex` is declared inside the per-table loop. That means the first split of every table still goes to reader 0, the second split of every table still goes to reader 1, and so on. On the normal multi-table path this can still skew badly. A simple example is 2 readers + many small tables that each produce 1 split: every table will assign its only split to reader 0, so reader 1 stays idle. In other words, the PR only round-robins within one table, not across the full enumerator workload that is actually assigned in `allSplit`.
  I would treat this as a blocker because the advertised behavior is global round-robin assignment, but the current implementation still produces concentrated load on the multi-table path. The test at .../MaxcomputeSourceSplitTest.java:30-72 only covers a single table, so it misses the real gap.
Suggested fix:
- keep the counter outside the table loop, or
- derive the owner from the total split ordinal after aggregating all tables,
- then add a regression test with multiple tables where each table produces 1 or 2 splits.
Merge conclusion: fix required before merge. Happy to re-review once you push an update.
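To make the blocker concrete, here is a minimal sketch of the two counter scopes. It is not the connector's code: the class and method names are hypothetical, and each table is reduced to the number of splits it produces. Only `chunkIndex`, the per-table loop, and the modulo over the reader count come from the review above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class, not part of the connector.
class RoundRobinCounterScope {

    // Counter declared inside the table loop: it restarts at 0 for every table,
    // so the first split of each table always lands on reader 0.
    static int[] assignWithPerTableCounter(List<Integer> splitsPerTable, int numReaders) {
        int[] splitsPerReader = new int[numReaders];
        for (int tableSplits : splitsPerTable) {
            int chunkIndex = 0; // reset per table
            for (int i = 0; i < tableSplits; i++) {
                splitsPerReader[chunkIndex % numReaders]++;
                chunkIndex++;
            }
        }
        return splitsPerReader;
    }

    // Counter declared outside the table loop: it keeps counting across tables,
    // so consecutive splits go to consecutive readers regardless of table.
    static int[] assignWithGlobalCounter(List<Integer> splitsPerTable, int numReaders) {
        int[] splitsPerReader = new int[numReaders];
        int chunkIndex = 0; // shared by all tables
        for (int tableSplits : splitsPerTable) {
            for (int i = 0; i < tableSplits; i++) {
                splitsPerReader[chunkIndex % numReaders]++;
                chunkIndex++;
            }
        }
        return splitsPerReader;
    }

    public static void main(String[] args) {
        // 2 readers, 4 small tables that each produce 1 split.
        List<Integer> tables = Arrays.asList(1, 1, 1, 1);
        System.out.println(Arrays.toString(assignWithPerTableCounter(tables, 2))); // [4, 0]
        System.out.println(Arrays.toString(assignWithGlobalCounter(tables, 2)));   // [2, 2]
    }
}
```

With the per-table counter reader 1 receives nothing, which is the idle-reader case described in the blocking item.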
83ccf17 to cf0d14f
Thank you for your review. I've just implemented your exact suggested fix:

Thanks for the update, and the intended fix sounds aligned with the blocker I called out. I checked the PR activity before replying here, and I still do not see a new commit on the current head yet. Since the code itself has not changed on GitHub, I am keeping this as a reply-only follow-up for now rather than starting a new full review round on the same version. Once you push the updated commit with the moved

It is the same commit but I have updated it with the moved chunkIndex scope and the multi-table regression test.
DanielLeens
left a comment
Thanks for the ping. I do see the updated head now, and I re-reviewed the latest commit from scratch against the real MaxCompute split enumeration path.
The blocker from my earlier review is resolved on this revision: `chunkIndex` is now defined outside the per-table loop in MaxcomputeSourceSplitEnumerator.java:115, so the round-robin assignment no longer restarts from reader 0 for every table.
What this PR fixes
- Pain point: the old split assignment could become uneven when `split_row` is small, especially on the multi-table path where many small tables could still keep sending their first split to the same reader.
- Approach: assign each chunk by a global `chunkIndex % numReaders`, and keep that counter across all tables instead of resetting it inside each table loop.
- Simple example: with 5 tables, 2 splits per table, and 3 readers, the current head distributes them as `4 / 3 / 3` instead of restarting each table from reader 0.
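The `4 / 3 / 3` figure can be checked without simulating the loop: when split ordinals 0..N-1 are dealt out by `ordinal % numReaders`, each reader gets `N / numReaders` splits and the first `N % numReaders` readers get one extra. A small sketch of that arithmetic follows; the class and method names are hypothetical and exist only for this illustration.

```java
// Hypothetical helper, not part of the connector.
class RoundRobinCounts {

    // Number of splits owned by `reader` when `totalSplits` splits are assigned
    // by (ordinal % numReaders), with ordinals starting at 0.
    static int splitsOwnedBy(int reader, int totalSplits, int numReaders) {
        int base = totalSplits / numReaders;      // every reader gets at least this many
        int remainder = totalSplits % numReaders; // the first `remainder` readers get one more
        return base + (reader < remainder ? 1 : 0);
    }

    public static void main(String[] args) {
        int totalSplits = 5 * 2; // 5 tables, 2 splits per table
        for (int reader = 0; reader < 3; reader++) {
            System.out.println("reader " + reader + ": " + splitsOwnedBy(reader, totalSplits, 3));
        }
        // prints 4, 3, 3 for readers 0, 1, 2
    }
}
```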
Full runtime path I reviewed
Job startup
-> MaxcomputeSourceSplitEnumerator.run()
-> discoverySplits() [MaxcomputeSourceSplitEnumerator.java:112-143]
-> iterate all SourceTableInfo
-> ownerReader = chunkIndex % numReaders
-> remove already assigned splits
-> addSplitChangeToPendingAssignments(allSplit)
-> assignPendingSplits() [MaxcomputeSourceSplitEnumerator.java:153-170]
-> enumeratorContext.assignSplit(reader, splits)
Findings
- The normal path definitely hits this change; this is the shared split enumeration path, not a side helper.
- The previous cross-table round-robin blocker is fixed on the current head because the counter now persists across all tables.
- `MaxcomputeSourceSplitTest.java:75-112` now adds the missing multi-table regression case that I asked for in the previous round.
- I did not find a new correctness, compatibility, or test-stability blocker on the current head.
Compatibility / side effects
- No API/config/protocol change here.
- CPU and memory impact should be negligible.
- The practical behavior improvement is better load distribution on the real multi-table path.
Test coverage / stability
- The updated tests cover both the single-table and multi-table round-robin cases.
- These are deterministic in-memory tests with no timing or environment sensitivity. I would rate them as stable.
- The current `Build` is green on this head.
Merge conclusion
Conclusion: can merge
- Blockers
- None on the current head.
- Non-blocking suggestions
- None.
Overall, the current revision closes the exact blocker from the previous round, and it looks good to merge from my side.
Thanks @zhiliang-wu, The whole point of the change is preserving order, but The tests also re-implement the

Thanks for calling this out, @zhangshenghang. I rechecked the current head and I agree your concern is a useful one. My earlier approval was based on the production logic itself: the current That said, your testing point is fair: the current test would be stronger if it asserted the per-reader Since there is still no new commit on this PR, I’m treating this as a reply-only follow-up rather than opening a new full review round on the same revision. If @zhiliang-wu pushes a small test-only follow-up for the ordering assertion / production-path coverage, I’m happy to re-review the updated head.

Thanks for the great feedback! You made excellent points regarding the tests only verifying a copy of the logic
DanielLeens
left a comment
Thanks for the update. I re-reviewed the latest head from split discovery through assignment, and this looks good to merge from the current source state.
What this PR fixes
- User pain: The old MaxCompute split discovery grouped chunks by table order, so the first readers could get a skewed run of chunks from large tables.
- Fix approach: The PR extracts split computation into `computeSplits()` and uses a global `chunkIndex` to assign owners round-robin across all tables.
- One-line summary: Split assignment is now interleaved across tables instead of being clustered per table.
Runtime path I traced
Split discovery
-> `discoverySplits()` computes record counts
-> `computeSplits()` walks all tables and increments one global `chunkIndex`
-> `ownerReader = chunkIndex % numReaders`
-> pending assignments are filled from that interleaved split set
Result
-> ownership is distributed across tables on the normal enumerator path
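The traced path can be sketched as follows. This is an illustration under assumptions, not the PR's `computeSplits()`: the method signature, the `Split` holder class, and the map of record counts are hypothetical. Only the field names (start, count, tablePath, ownerReader), the `split_row` chunk size, and the `chunkIndex % numReaders` owner rule are taken from the review.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class, not part of the connector.
class SplitSketch {

    // Simplified stand-in for MaxcomputeSourceSplit.
    static final class Split {
        final long start;
        final long count;
        final String tablePath;
        final int ownerReader;

        Split(long start, long count, String tablePath, int ownerReader) {
            this.start = start;
            this.count = count;
            this.tablePath = tablePath;
            this.ownerReader = ownerReader;
        }
    }

    // Cuts each table into chunks of at most splitRow rows and assigns owners
    // with one counter that keeps running across all tables.
    static List<Split> computeSplits(Map<String, Long> recordCounts, long splitRow, int numReaders) {
        if (splitRow <= 0 || numReaders <= 0) {
            throw new IllegalArgumentException("splitRow and numReaders must be positive");
        }
        List<Split> splits = new ArrayList<>();
        int chunkIndex = 0; // global: never reset between tables
        for (Map.Entry<String, Long> table : recordCounts.entrySet()) {
            long total = table.getValue();
            for (long start = 0; start < total; start += splitRow) {
                long count = Math.min(splitRow, total - start); // last chunk may be short
                splits.add(new Split(start, count, table.getKey(), chunkIndex % numReaders));
                chunkIndex++;
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        Map<String, Long> recordCounts = new LinkedHashMap<>(); // keeps table order stable
        recordCounts.put("project.table_a", 250L);
        recordCounts.put("project.table_b", 100L);
        for (Split s : computeSplits(recordCounts, 100L, 2)) {
            System.out.println(s.tablePath + " start=" + s.start + " count=" + s.count
                    + " owner=" + s.ownerReader);
        }
    }
}
```

In this example table_a yields three chunks owned by readers 0, 1, 0, and the single chunk of table_b goes to reader 1 rather than back to reader 0.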
Key findings
- The normal enumerator path hits this change directly.
- Ownership is no longer implicitly tied to per-table chunk grouping.
- The updated test now checks the behavior that actually matters.
Blocking items
None at the source-code level on the current head.
Other review notes
- No additional non-blocking source-level comments from this round.
Compatibility and side effects
- Compatibility: Fully backward compatible. This only changes split distribution strategy.
- Side effects: No meaningful resource-model downside from the new owner calculation.
- Error handling / logging: No new error-handling concern from this diff.
- Tests: The updated test now exercises ordering / distribution semantics instead of only counting splits.
- Docs: No docs change is required.
- CI: The current head build is green.
Merge verdict
- I do not see a blocker on the current head. The round-robin split assignment now looks deterministic and correctly exercised by the updated test.
Purpose of this pull request
Currently, the MaxcomputeSource divides the total record count into massive contiguous blocks sequentially (e.g., Reader 0 gets the first 25% of the table, Reader 1 gets the next 25%). Furthermore, it stores these splits internally using a HashSet, which completely scrambles the split assignment order before they are processed by the readers.
Because of this, if a user uses ORDER BY or RANGE CLUSTERED BY in their MaxCompute table to prioritize data, running SeaTunnel with parallelism > 1 destroys this ordering entirely.
This PR introduces a Round-Robin Split Assignment logic:
- Replaces the massive contiguous block chunking with striped/round-robin chunking (e.g. Reader 0 gets chunk 1, Reader 1 gets chunk 2). This allows all parallel readers to pull from the "top" of the table simultaneously, preserving a high-level priority order across multiple threads.
- Replaces `HashSet` with `LinkedHashSet` in `MaxcomputeSourceSplitEnumerator` to guarantee that the insertion order of splits is strictly preserved when assigning them to the readers.
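The difference between the two set types can be seen in isolation. The sketch below is a standalone illustration with hypothetical class, method, and split-id names. It relies only on documented JDK behavior: `LinkedHashSet` iterates in insertion order, while `HashSet` makes no guarantee about iteration order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration class, not part of the connector.
class SplitOrderSketch {

    // Adds the ids to the given set, then returns them in the set's iteration order.
    static List<String> insertThenIterate(Set<String> pending, List<String> splitIds) {
        pending.addAll(splitIds);
        return new ArrayList<>(pending);
    }

    public static void main(String[] args) {
        // Made-up split ids in the order the enumerator would discover them.
        List<String> discovered = Arrays.asList("chunk-0", "chunk-1", "chunk-2", "chunk-3", "chunk-4");

        // Always comes back in discovery order.
        System.out.println(insertThenIterate(new LinkedHashSet<>(), discovered));

        // May come back in any order; it depends on hash codes and table size.
        System.out.println(insertThenIterate(new HashSet<>(), discovered));
    }
}
```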
Does this PR introduce any user-facing change?
No. This PR only modifies the internal split enumeration and distribution logic for the MaxCompute Source. It introduces no new configuration options and does not break any existing contracts. It purely optimizes the reading order to be more evenly distributed across parallel readers.
How was this patch tested?
- Ran `mvn clean install -pl seatunnel-connectors-v2/connector-maxcompute -am -DskipTests` to verify compilation and dependency integrity.
- Manually tested the execution graph locally to verify that splits are correctly striped and assigned in round-robin order without hash scrambling.