Skip to content

[Feature][Connector-V2] Maxcompute Source Round-Robin Split Assignment#11131

Open
zhiliang-wu wants to merge 2 commits into
apache:devfrom
zhiliang-wu:feature-maxcompute-split-order
Open

[Feature][Connector-V2] Maxcompute Source Round-Robin Split Assignment#11131
zhiliang-wu wants to merge 2 commits into
apache:devfrom
zhiliang-wu:feature-maxcompute-split-order

Conversation

@zhiliang-wu

Copy link
Copy Markdown
Contributor

Purpose of this pull request

Currently, the MaxcomputeSource divides the total record count into massive contiguous blocks sequentially (e.g., Reader 0 gets the first 25% of the table, Reader 1 gets the next 25%). Furthermore, it stores these splits internally using a HashSet, which completely scrambles the split assignment order before they are processed by the readers.

Because of this, if a user uses ORDER BY or RANGE CLUSTERED BY in their MaxCompute table to prioritize data, running SeaTunnel with parallelism > 1 destroys this ordering entirely.

This PR introduces a Round-Robin Split Assignment logic:

Replaces the massive contiguous block chunking with striped/round-robin chunking (e.g. Reader 0 gets chunk 1, Reader 1 gets chunk 2). This allows all parallel readers to pull from the "top" of the table simultaneously, preserving a high-level priority order across multiple threads.
Replaces HashSet with LinkedHashSet in MaxcomputeSourceSplitEnumerator to guarantee that the chronological insertion order of splits is strictly preserved when assigning them to the readers.

Does this PR introduce any user-facing change?

No. This PR only modifies the internal split enumeration and distribution logic for the MaxCompute Source. It introduces no new configuration options and does not break any existing contracts. It purely optimizes the reading order to be more evenly distributed across parallel readers.

How was this patch tested?

Ran mvn clean install -pl seatunnel-connectors-v2/connector-maxcompute -am -DskipTests to verify compilation and dependency integrity.
Manually tested the execution graph locally to verify that splits are correctly striped and assigned in round-robin order without hash scrambling.

Check list

@DanielLeens DanielLeens left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this. I reviewed the current diff from the MaxCompute source enumerator down to the actual split assignment path instead of only looking at the unit test update. The goal of the change makes sense: the old logic gave each reader a contiguous range, which can leave the tail split distribution uneven when split_row is much smaller than the per-reader range.

Runtime path I checked:

Job startup
  -> MaxcomputeSourceSplitEnumerator.run()
      -> discoverySplits()
          -> build MaxcomputeSourceSplit(start, count, tablePath, ownerReader)
      -> addSplitChangeToPendingAssignments()
      -> assignPendingSplits()
          -> enumeratorContext.assignSplit(reader, splits)

Blocking item:

  1. seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java:115-139
    The round-robin counter is reset for every sourceTableInfo because chunkIndex is declared inside the per-table loop. That means the first split of every table still goes to reader 0, the second split of every table still goes to reader 1, and so on. On the normal multi-table path this can still skew badly. A simple example is 2 readers + many small tables that each produce 1 split: every table will assign its only split to reader 0, so reader 1 stays idle. In other words, the PR only round-robins within one table, not across the full enumerator workload that is actually assigned in allSplit.

    I would treat this as a blocker because the advertised behavior is global round-robin assignment, but the current implementation still produces concentrated load on the multi-table path. The test at .../MaxcomputeSourceSplitTest.java:30-72 only covers a single table, so it misses the real gap.

Suggested fix:

  • keep the counter outside the table loop, or
  • derive the owner from the total split ordinal after aggregating all tables,
  • then add a regression test with multiple tables where each table produces 1 or 2 splits.

Merge conclusion: fix required before merge. Happy to re-review once you push an update.

@zhiliang-wu zhiliang-wu force-pushed the feature-maxcompute-split-order branch from 83ccf17 to cf0d14f Compare June 19, 2026 07:28
@zhiliang-wu

Copy link
Copy Markdown
Contributor Author

Thank you for your review. I've just implemented your exact suggested fix:

  1. Moved the int chunkIndex = 0; declaration to the top of the discoverySplits method, outside of the sourceTableInfos.values() loop so the round-robin distribution persists seamlessly across all tables.
  2. Wrote a new unit test testMultiTableRoundRobinDistribution that simulates exactly the scenario you described (multiple tables pushing 1 or 2 splits each) and validates that the splits are still evenly distributed among the readers across the entire workload footprint.

@DanielLeens

Copy link
Copy Markdown
Contributor

Thanks for the update, and the intended fix sounds aligned with the blocker I called out.

I checked the PR activity before replying here, and I still do not see a new commit on the current head yet. Since the code itself has not changed on GitHub, I am keeping this as a reply-only follow-up for now rather than starting a new full review round on the same version.

Once you push the updated commit with the moved chunkIndex scope and the multi-table regression test, please ping me again and I will re-review the latest head from scratch.

@zhiliang-wu

Copy link
Copy Markdown
Contributor Author

It is the same commit but I have updated it with the moved chunkIndex scope and the multi-table regression test.
Please check the PR activity for the updates

@DanielLeens DanielLeens left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the ping. I do see the updated head now, and I re-reviewed the latest commit from scratch against the real MaxCompute split enumeration path.

The earlier Daniel blocker is resolved on this revision: chunkIndex is now defined outside the per-table loop in MaxcomputeSourceSplitEnumerator.java:115, so the round-robin assignment no longer restarts from reader 0 for every table.

What this PR fixes

  • Pain point: the old split assignment could become uneven when split_row is small, especially on the multi-table path where many small tables could still keep sending their first split to the same reader.
  • Approach: assign each chunk by a global chunkIndex % numReaders, and keep that counter across all tables instead of resetting it inside each table loop.
  • Simple example: with 5 tables, 2 splits per table, and 3 readers, the current head distributes them as 4 / 3 / 3 instead of restarting each table from reader 0.

Full runtime path I reviewed

Job startup
  -> MaxcomputeSourceSplitEnumerator.run()
      -> discoverySplits() [MaxcomputeSourceSplitEnumerator.java:112-143]
          -> iterate all SourceTableInfo
          -> ownerReader = chunkIndex % numReaders
          -> remove already assigned splits
          -> addSplitChangeToPendingAssignments(allSplit)
      -> assignPendingSplits() [MaxcomputeSourceSplitEnumerator.java:153-170]
          -> enumeratorContext.assignSplit(reader, splits)

Findings

  • The normal path definitely hits this change; this is the shared split enumeration path, not a side helper.
  • The previous cross-table round-robin blocker is fixed on the current head because the counter now persists across all tables.
  • MaxcomputeSourceSplitTest.java:75-112 now adds the missing multi-table regression case that I asked for in the previous round.
  • I did not find a new correctness, compatibility, or test-stability blocker on the current head.

Compatibility / side effects

  • No API/config/protocol change here.
  • CPU and memory impact should be negligible.
  • The practical behavior improvement is better load distribution on the real multi-table path.

Test coverage / stability

  • The updated tests cover both the single-table and multi-table round-robin cases.
  • These are deterministic in-memory tests with no timing or environment sensitivity. I would rate them as stable.
  • The current Build is green on this head.

Merge conclusion

Conclusion: can merge

  1. Blockers
  • None on the current head.
  1. Non-blocking suggestions
  • None.

Overall, the current revision closes the exact blocker from the previous round, and it looks good to merge from my side.

@zhangshenghang

zhangshenghang commented Jun 21, 2026

Copy link
Copy Markdown
Member

Thanks @zhiliang-wu , The whole point of the change is preserving order, but MaxcomputeSourceSplitTest only asserts how many splits land on each reader — nothing checks that splits arrive in ascending rowStart per reader, which is the behavior the LinkedHashSet swap is there to guarantee. A future refactor could quietly reintroduce a HashSet and these tests would stay green.

The tests also re-implement the discoverySplits loop inline rather than driving the enumerator, so they validate a copy of the logic, not the production path. At minimum, could you add an assertion that the splits collected per reader are in ascending rowStart order? That's the regression guard that matches the PR's intent.

@DanielLeens

Copy link
Copy Markdown
Contributor

Thanks for calling this out, @zhangshenghang — I rechecked the current head and I agree your concern is a useful one.

My earlier approval was based on the production logic itself: the current chunkIndex scope does fix the cross-table round-robin bug I raised before. So from the runtime-behavior side, I do not see a new correctness regression on the same commit.

That said, your testing point is fair: the current test would be stronger if it asserted the per-reader rowStart ordering directly, and even better if the regression guard exercised the production enumerator path rather than a copied loop shape.

Since there is still no new commit on this PR, I’m treating this as a reply-only follow-up rather than opening a new full review round on the same revision. If @zhiliang-wu pushes a small test-only follow-up for the ordering assertion / production-path coverage, I’m happy to re-review the updated head.

@zhiliang-wu

Copy link
Copy Markdown
Contributor Author

Thanks for the great feedback! You made excellent points regarding the tests only verifying a copy of the logic
I've pushed a new commit to address these issues:

  1. Tests Now Drive Production Logic: To fix this without requiring mocked MaxCompute network sessions, I refactored MaxcomputeSourceSplitEnumerator. The core chunking logic has been extracted out of discoverySplits() into a package-private static method computeSplits(). All unit tests now invoke MaxcomputeSourceSplitEnumerator.computeSplits(...) directly, guaranteeing we are testing the actual production path.

  2. Added Ascending Order Regression Guard: I've updated MaxcomputeSourceSplitTest (specifically testComputeSplitsAscendingRowStartOrder) to strictly assert that the splits assigned to any given reader for any given table arrive in purely ascending rowStart order. If someone accidentally swaps the LinkedHashSet back out for a HashSet in the future, the test will immediately fail the build.

@DanielLeens DanielLeens left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. I re-reviewed the latest head from split discovery through assignment, and this looks good to merge from the current source state.

What this PR fixes

  • User pain: The old MaxCompute split discovery grouped chunks by table order, so the first readers could get a skewed run of chunks from large tables.
  • Fix approach: The PR extracts split computation into computeSplits() and uses a global chunkIndex to assign owners round-robin across all tables.
  • One-line summary: Split assignment is now interleaved across tables instead of being clustered per table.

Runtime path I traced

Split discovery
  -> `discoverySplits()` computes record counts
  -> `computeSplits()` walks all tables and increments one global `chunkIndex`
  -> `ownerReader = chunkIndex % numReaders`
  -> pending assignments are filled from that interleaved split set

Result
  -> ownership is distributed across tables on the normal enumerator path

Key findings

  • The normal enumerator path hits this change directly.
  • Ownership is no longer implicitly tied to per-table chunk grouping.
  • The updated test now checks the behavior that actually matters.

Blocking items
None at the source-code level on the current head.

Other review notes

  • No additional non-blocking source-level comments from this round.

Compatibility and side effects

  • Compatibility: Fully backward compatible. This only changes split distribution strategy.
  • Side effects: No meaningful resource-model downside from the new owner calculation.
  • Error handling / logging: No new error-handling concern from this diff.
  • Tests: The updated test now exercises ordering / distribution semantics instead of only counting splits.
  • Docs: No docs change is required.
  • CI: The current head build is green.

Merge verdict

  • I do not see a blocker on the current head. The round-robin split assignment now looks deterministic and correctly exercised by the updated test.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants