[Fix][Connector-V2] Fix TiDB CDC silent row event loss when resolvedT…#11113
zhaoysg wants to merge 7 commits into
…s advances ahead of prewrite replay (apache#11013)

The TiDB CDC source could silently drop row change events while `resolvedTs` kept advancing, as reported in apache#11013. The root cause is in the CDC streaming reader: `captureStreamingEvents` advanced `resolvedTs` to `cdcClient.getMaxResolvedTs()` at the end of every batch, but the corresponding commit/prewrite pair was consumed via `commits.pollFirstEntry().getValue()` without verifying that the matching prewrite row had actually been buffered. When a commit row arrived before its prewrite (which happens during region split, rebalance, or under backpressure), the commit was discarded while `resolvedTs` was still pushed forward past the commit timestamp, causing the engine to skip subsequent real events at that timestamp without any error.

This commit fixes the issue and adds minimal diagnostic logging to help users diagnose similar scenarios in the future.

Changes
-------

`TiDBSourceReader.captureStreamingEvents` / `flushRows`
- `flushRows` now returns a `safeResolvedTs`. If a commit row has no matching prewrite row in `preWrites` yet, the function stops advancing `resolvedTs` to the commit timestamp and holds the commit in the queue, so the next batch can re-pair it with the late prewrite.
- `flushRows` now only `pollFirstEntry()`s a commit row after confirming its prewrite is present, fixing the previous poll-then-leak behaviour.
- The batch loop distinguishes between a "no row right now" return from `cdcClient.get()` and a "resolvedTs advanced but no row" case, and no longer exits early when only `resolvedTs` moved.

`TiDBSourceReader.snapshotEvents`
- The snapshot scan no longer relies on a single `KVClient.scan` call over the whole `keyRange`. It now uses a `RegionStoreClient` per region and walks forward using `region.getEndKey()`, which is correct across region boundaries and stops cleanly when the scan reaches the end of the requested key range.
- Adds `scannedCount` / `emittedCount` log on snapshot completion.

`TableKeyRangeUtils`
- `isRecordKey` now guards against short keys (`key.length > 10`) to avoid `ArrayIndexOutOfBoundsException` on truncated payloads.
- Switches from Guava `ImmutableList` to JDK collections to keep the module free of Guava dependencies on this hot path.

`TiDBSourceSplitEnumerator`
- Adds structured INFO logs for enumerator open, split enumeration, and per-reader split assignment to make split distribution observable in production.

`TiDBSourceReader`
- Adds `[TiDB-CDC-DIAG]` streaming stats logging (every 10s or when a single batch exceeds 1s) exposing `resolvedLagMs`, polled/emitted/committed counters, and pull/flush/emit latencies. This is the primary signal users can use to detect a prewrite/commit imbalance in the future.

Fixes apache#11013
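The hold-back behaviour described for `flushRows` can be illustrated with a minimal sketch. It is not the connector's code: the buffers are simplified to a hypothetical `commitTs -> startTs` map and a `startTs -> row` map, rows are plain strings, and the `commitTs - 1` return value is an assumption about how the safe timestamp is lowered. Only the names `commits`, `preWrites`, and `flushRows` come from the commit message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FlushRowsSketch {
    // Hypothetical simplified buffers: commitTs -> startTs, and startTs -> row payload.
    private final TreeMap<Long, Long> commits = new TreeMap<>();
    private final Map<Long, String> preWrites = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    public void onCommit(long commitTs, long startTs) {
        commits.put(commitTs, startTs);
    }

    public void onPrewrite(long startTs, String row) {
        preWrites.put(startTs, row);
    }

    public List<String> emitted() {
        return emitted;
    }

    /** Emits matched rows up to resolvedTs and returns the timestamp that is safe to advance to. */
    public long flushRows(long resolvedTs) {
        while (!commits.isEmpty() && commits.firstKey() <= resolvedTs) {
            long commitTs = commits.firstKey();
            long startTs = commits.get(commitTs);
            if (!preWrites.containsKey(startTs)) {
                // Prewrite not buffered yet: keep the commit queued and stop just before it.
                return commitTs - 1;
            }
            // Poll the commit only after its prewrite is confirmed to be present.
            commits.pollFirstEntry();
            emitted.add(preWrites.remove(startTs));
        }
        return resolvedTs;
    }
}
```

In this model, a commit at timestamp 100 that is seen before its prewrite makes `flushRows(120)` return 99 and emit nothing. Once the prewrite is buffered, the next `flushRows(120)` emits the row and returns 120.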
…flush diagnostics

This is a follow-up to apache#11013 (Fix TiDB CDC silent row event loss when resolvedTs advances ahead of prewrite replay).

The original fix patched `flushRows` so that a commit row without a matching prewrite would be held back instead of being discarded with `resolvedTs` advanced past it. In production we still saw the same class of "silent event loss" in narrower windows, and there was no first-class log to correlate with the JDBC sink, so the failure mode was still hard to attribute. This commit hardens the source-side loop and adds a paired diagnostic channel on the JDBC sink so the same symptom can be observed end-to-end.

Bug
---

After apache#11013, the streaming loop in `captureStreamingEvents` still treated a "row == null" return as an unconditional batch boundary because the poll-attempt cap equalled `batchSize`. Concretely:

1. `cdcClient.get()` returns `null` while `getMaxResolvedTs()` has already advanced. Under region split / rebalance / backpressure the loop hits this branch many times in a row.
2. The first such return set `resolvedTsAdvances++` and continued, but the cap of `config.getBatchSize()` poll attempts meant that the next batch could still be terminated by another `null` return (the test we used on `currentMaxResolvedTs != beforeGetResolvedTs` was true the *previous* iteration, not the current one).
3. `handleRow` returned `void`, so non-record keys (index keys) were silently counted in the same `polledRows` counter as real data rows, hiding how much of the budget was actually being spent on work that emits nothing.

The end result: `resolvedTs` could be pushed forward by `flushRows` without the corresponding commit/prewrite pair ever being drained, and the user had no log counter that showed the imbalance.

Fix
---

`TiDBSourceReader.captureStreamingEvents`
- Introduce `maxPollAttempts = max(batchSize, batchSize * 10)` so the loop keeps draining metadata-driven empty polls (resolvedTs-only advances and sub-50ms returns) instead of exiting early, while still bounding worst-case work per batch.
- Distinguish three cases for `cdcClient.get() == null`:
  (a) resolvedTs advanced since the last poll -> pure metadata, count and continue (this was already there, kept);
  (b) the single poll finished in < 50ms with no resolvedTs advance -> metadata-dominant batch, count and continue (new);
  (c) a "real" empty poll -> break.
- `handleRow` now returns `boolean`; non-record keys return `false` and are counted in a new `ignoredRows` metric, while real data rows still increment `polledRows`. The streaming stats log now exposes `ignoredRows` alongside `polledRows` / `emittedRows`.

`TiDBSourceReader` streaming stats log
- Adds `metadataEvents`, `emptyPolls`, `pollAttempts`, and `maxPollAttempts` to the existing `[TiDB-CDC-DIAG]` log, so an operator can see at a glance whether a batch was dominated by metadata traffic (a likely companion of the original symptom).

`JdbcOutputFormat`
- Adds a parallel `[JDBC-SINK-DIAG]` flush-success log, emitted every 10s or whenever a single `attemptFlush()` exceeds 1s, with `records`, `totalFlushedRecords`, `flushCostMs`, and the retry `attempt` index. This gives the JDBC side the same observability that `[TiDB-CDC-DIAG]` gives the source, so the two can be correlated when investigating an end-to-end "rows missing on the sink" report.

No public API, config option, or default value is changed.

Verification
------------

- `./mvnw spotless:apply` on both modules
- `./mvnw -q -DskipTests verify` on both modules (BUILD SUCCESS)
- `./mvnw test` on both modules
  * connector-jdbc: 712 tests, 0 failures, 0 errors, 10 skipped
  * connector-cdc-tidb: 1 test, 0 failures, 0 errors

Refs apache#11013

Co-authored-by: Cursor <cursoragent@cursor.com>
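The three-way handling of an empty poll can be sketched as a small, self-contained loop. This is only an illustration under assumptions: the `Poll` record of scripted results, the `drain` driver, and the returned counter array are hypothetical, and the 50 ms threshold and the `maxPollAttempts` formula are taken from the commit message as written.

```java
import java.util.List;

class PollLoopSketch {
    enum EmptyPoll { RESOLVED_TS_ADVANCE, FAST_METADATA, REAL_EMPTY }

    static final long FAST_POLL_MS = 50;

    /** One scripted poll result: a row (or null), the resolvedTs seen after the poll, and its cost. */
    static final class Poll {
        final String row;
        final long resolvedTs;
        final long costMs;

        Poll(String row, long resolvedTs, long costMs) {
            this.row = row;
            this.resolvedTs = resolvedTs;
            this.costMs = costMs;
        }
    }

    static int maxPollAttempts(int batchSize) {
        // Formula as stated in the commit message.
        return Math.max(batchSize, batchSize * 10);
    }

    static EmptyPoll classify(long resolvedBefore, long resolvedAfter, long pollCostMs) {
        if (resolvedAfter != resolvedBefore) {
            return EmptyPoll.RESOLVED_TS_ADVANCE; // case (a): pure metadata
        }
        if (pollCostMs < FAST_POLL_MS) {
            return EmptyPoll.FAST_METADATA; // case (b): quick return, keep draining
        }
        return EmptyPoll.REAL_EMPTY; // case (c): nothing to read right now
    }

    /** Returns {rows, metadataEvents, pollAttempts} for one batch over the scripted polls. */
    static int[] drain(List<Poll> polls, int batchSize, long startResolvedTs) {
        int maxAttempts = maxPollAttempts(batchSize);
        int rows = 0;
        int metadata = 0;
        int attempts = 0;
        long lastResolved = startResolvedTs;
        for (Poll p : polls) {
            if (attempts >= maxAttempts || rows >= batchSize) {
                break;
            }
            attempts++;
            if (p.row != null) {
                rows++;
            } else if (classify(lastResolved, p.resolvedTs, p.costMs) == EmptyPoll.REAL_EMPTY) {
                break;
            } else {
                metadata++;
            }
            lastResolved = p.resolvedTs;
        }
        return new int[] {rows, metadata, attempts};
    }
}
```

Comparing against the resolvedTs recorded at the previous poll (`lastResolved`) is what keeps case (a) tied to the current iteration rather than an earlier one.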
Empty commit to retrigger GitHub Actions on this PR after the "Workflow run detection failed" issue on the initial push. No code change. Co-authored-by: Cursor <cursoragent@cursor.com>
DanielLeens
left a comment
Thanks for working on this. I reviewed the latest head from the real TiDB CDC snapshot/streaming reader path, not just from the new log lines.
What this PR fixes
- User pain: when `resolvedTs` advances ahead of the matching prewrite replay, the old streaming path can process commits too aggressively and lose the chance to emit the corresponding row correctly. The snapshot path was also brittle around region-based scanning.
- Fix approach: the PR rewrites the TiDB reader's snapshot loop, hardens the streaming poll loop, and changes `flushRows(...)` so an unmatched commit is held back with a lowered `safeResolvedTs` instead of being consumed immediately.
- One-line summary: the direction is reasonable, but this is a data-correctness fix on the main CDC path, and I still need a direct regression proof for the key reordered-event scenario before I can accept it.
Actual runtime path
TiDBSourceReader.pollNext(...) [TiDBSourceReader.java:145-159]
  -> snapshotEvents(...) for INITIAL startup [162-219]
  -> captureStreamingEvents(...) [221-305]
      -> CDCClient.get()
      -> handleRow(...) [363-387]
          -> PREWRITE / COMMIT / COMMITTED stored into preWrites / commits
      -> flushRows(resolvedTs) [389-410]
          -> old path: poll commit immediately once commitTs <= resolvedTs
          -> new path: if matching prewrite is still missing, keep the commit,
             lower safeResolvedTs, and stop advancing past that point
Main findings
- The core idea in `flushRows(...)` is correct: do not permanently consume a commit before its matching prewrite is available.
- This PR is not a tiny one-line guard. It changes the snapshot loop, the streaming poll loop, the resolved-ts advancement behavior, and also adds unrelated JDBC sink diagnostics.
- For a CDC data-correctness fix, the current version still misses the most important proof: a deterministic regression test that reproduces "commit visible first, prewrite arrives later, row is still emitted correctly".
Blocking issue
1. The PR still lacks a focused regression test for the exact reordered-event path it claims to fix.
- Location: `TiDBSourceReader.java:221-410`
- Why this matters: this is the real hot path for the claimed fix. `captureStreamingEvents(...) -> handleRow(...) -> flushRows(...)` now has different semantics around commit retention and resolved-ts advancement, but the PR does not add a reader-level or behavior-level test that proves the new logic works when the matching prewrite arrives after the commit has already been seen.
- Risk: this is a CDC correctness path. Without a targeted regression, we are still trusting a fairly large rewrite on reasoning alone.
- Recommended fix:
  - Option A: add a focused reader/integration regression test that drives the sequence "commit first, resolvedTs advances, prewrite later" and asserts the row is emitted on the next round.
  - Option B: if the current harness cannot drive the whole reader easily, at least add a smaller behavior test around `handleRow(...)` / `flushRows(...)` state transitions.
- Severity: High
Non-blocking issue
2. The JDBC sink diagnostics broaden this PR beyond the minimal TiDB CDC fix.
- Location: `JdbcOutputFormat.java:45-192`
- Why this matters: the PR title and the main runtime fix are about TiDB CDC source correctness, but the diff also includes JDBC sink flush diagnostics. That extra scope makes the review and CI attribution noisier than necessary.
- Suggested follow-up: ideally split the JDBC diagnostics into a separate PR, or at least explain very clearly in the PR description why they must ship together.
- Severity: Medium
Tests and CI
- I do not see a new targeted TiDB CDC regression test in this PR.
- `Build` is currently red, and the failing jobs are `updated-modules-integration-test-part-1 (11, ubuntu-latest)` and `updated-modules-integration-test-part-2 (11, ubuntu-latest)`.
- Those are much closer to the touched modules than a random unrelated job, so without deeper log evidence I cannot safely dismiss them as unrelated noise.
Conclusion: fix required before merge
Blocking items (must fix)
- Issue 1 above: please add a focused regression proof for the reordered `resolvedTs` / prewrite / commit path before merge.
Suggested follow-up (non-blocking)
- Issue 2 above: consider splitting the JDBC sink diagnostics out of this PR.
Overall assessment
- The core fix direction makes sense.
- What is still missing is the proof layer. For a CDC row-loss bug, that proof should come from a targeted regression, not just from a broad code rewrite plus generic CI.
- Once the regression is in place and the red `Build` jobs are explained, I am happy to re-review the latest head.
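Thanks for the detailed review. I have added a focused regression test for the TiDB CDC out-of-order event path: This test covers the key state transitions mentioned in the review:
Local verification:
Both commands above passed. I also tried a full-repository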
DanielLeens
left a comment
Thanks for the follow-up and for adding the focused regression tests. I re-reviewed the latest head from scratch.
Sorry for missing this in the last review: the JDBC side here is not just extra diagnostics. Looking at the current head and the still-red changed-module CI, I should treat that part as a blocker on this version as well.
The new TiDBSourceReaderTest#flushRowsShouldHoldCommitUntilMatchingPrewriteArrives and TiDBSourceSplitEnumeratorTest do close the specific proof gap I called out earlier for the reordered commit/prewrite path and the same-reader split overwrite path. So the TiDB reader fix itself is in much better shape now.
What this PR solves
- Pain point: the TiDB CDC source could silently lose a row when `COMMIT` arrived first, `resolvedTs` moved past it, and the matching `PREWRITE` only showed up later. The enumerator also overwrote pending splits when multiple returned splits mapped to the same reader.
- Approach: keep unmatched commits in `flushRows(...)`, lower the safe `resolvedTs`, store pending splits as lists per reader, and add focused unit coverage.
- One-line summary: the TiDB-side fix direction is right, but the current head still has one checkpoint-compatibility break and one JDBC-scope / CI blocker before merge.
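The split-overwrite problem mentioned above comes down to the difference between `Map.put` and accumulating into a per-reader list. The sketch below is illustrative only: splits are plain strings and the modulo owner assignment is a hypothetical stand-in for however the enumerator actually maps splits to readers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PendingSplitSketch {
    /** Old shape: a second split for the same reader silently replaces the first. */
    public static Map<Integer, String> assignSingle(List<String> splits, int readers) {
        Map<Integer, String> pending = new HashMap<>();
        for (int i = 0; i < splits.size(); i++) {
            pending.put(i % readers, splits.get(i)); // hypothetical owner: index modulo reader count
        }
        return pending;
    }

    /** New shape: every split mapped to a reader is kept in that reader's list. */
    public static Map<Integer, List<String>> assignLists(List<String> splits, int readers) {
        Map<Integer, List<String>> pending = new HashMap<>();
        for (int i = 0; i < splits.size(); i++) {
            pending.computeIfAbsent(i % readers, k -> new ArrayList<>()).add(splits.get(i));
        }
        return pending;
    }
}
```

With three splits and two readers, the single-value map ends up holding only two splits, while the list-based map keeps all three.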
Full runtime path I rechecked
TiDB source main path
  -> TiDBSourceReader.pollNext(...) [TiDBSourceReader.java:145-159]
      -> snapshotEvents(...) [162-219]
      -> captureStreamingEvents(...) [221-305]
          -> CDCClient.get()
          -> handleRow(...) [363-387]
              -> COMMIT / PREWRITE / COMMITTED buffered into commits / preWrites
          -> flushRows(resolvedTs) [389-410]
              -> if the earliest commit has no matching prewrite yet
                   keep the commit in the map
                   lower safeResolvedTs to commitTs - 1
                   stop advancing past that point
              -> otherwise poll the commit and emit the matched prewrite row

checkpoint / restore path
  -> TiDBSourceSplitEnumerator.snapshotState(...) [225-228]
      -> TiDBSourceCheckpointState.pendingSplit
      -> SeaTunnelSource.getEnumeratorStateSerializer() [SeaTunnelSource.java:120-121]
      -> DefaultSerializer.serialize(...) [DefaultSerializer.java:28-41]
  -> restore
      -> DefaultSerializer.deserialize(...)
      -> TiDBSource.restoreEnumerator(...) [TiDBSource.java:136-146]

JDBC sink path included by this PR
  -> JdbcSinkWriter / JdbcExactlyOnceSinkWriter
      -> JdbcOutputFormat.writeRecord(...) [105-118]
          -> batchSize or batchIntervalMs can now trigger flush()
Main findings
- The TiDB reader now correctly covers the core reordered-event case I asked for earlier.
- The new enumerator state shape is on the checkpoint / restore path, not just an in-memory refactor.
- The JDBC change is not logging-only. It changes `JdbcOutputFormat.writeRecord(...)` behavior on the normal sink path.
- The current `Build` check is still red, and two of the failed jobs are JDBC E2E jobs in a module touched by this PR.
Findings
Issue 1: changing TiDBSourceCheckpointState.pendingSplit in place breaks restore compatibility for existing checkpoints / savepoints
- Location:
  - `seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceCheckpointState.java:35-38`
  - `seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java:120-121`
  - `seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java:28-41`
  - `seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java:103-106`
- Why this is a problem:
  - `pendingSplit` changed from `Map<Integer, TiDBSourceSplit>` to `Map<Integer, List<TiDBSourceSplit>>`.
  - This object is serialized as the enumerator checkpoint state through the default serializer path. There is no custom versioned serializer and no backward-compatibility branch on restore.
  - So this is not just a runtime structure cleanup. It changes the persisted checkpoint schema for a running unbounded CDC source.
- Risk:
  - Existing TiDB CDC jobs restored from an older checkpoint/savepoint can fail to deserialize or otherwise fail to recover cleanly.
  - That is a checkpoint-main-path compatibility break.
- Best fix:
  - Option A: add a versioned custom serializer for the enumerator state and explicitly migrate the old `Map<Integer, TiDBSourceSplit>` shape into the new list-based shape.
  - Option B: keep backward-compatible restore handling for the old state layout before normalizing it.
  - If you intentionally keep this as an incompatible upgrade, it also needs to be documented in both incompatible-changes docs with migration guidance.
- Severity: High
- Raised by others already: No
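To make Option A concrete, here is a rough sketch of a version-prefixed state format that migrates a single-split layout into the list-based one on read. Everything in it is hypothetical: the version constants, the wire layout, and the use of strings in place of `TiDBSourceSplit`. In particular it does not read bytes produced by the existing default serializer; a real migration would also have to recognise that unversioned legacy layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class VersionedStateSerializerSketch {
    static final int V1_SINGLE = 1; // hypothetical: one split per reader
    static final int V2_LIST = 2;   // hypothetical: list of splits per reader

    /** Legacy writer, kept only to produce old-format bytes for the example. */
    public static byte[] serializeV1(Map<Integer, String> pending) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(V1_SINGLE);
            out.writeInt(pending.size());
            for (Map.Entry<Integer, String> e : pending.entrySet()) {
                out.writeInt(e.getKey());
                out.writeUTF(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static byte[] serializeV2(Map<Integer, List<String>> pending) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(V2_LIST);
            out.writeInt(pending.size());
            for (Map.Entry<Integer, List<String>> e : pending.entrySet()) {
                out.writeInt(e.getKey());
                out.writeInt(e.getValue().size());
                for (String split : e.getValue()) {
                    out.writeUTF(split);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads either version and always returns the list-based shape. */
    public static Map<Integer, List<String>> deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != V1_SINGLE && version != V2_LIST) {
                throw new IOException("Unsupported state version: " + version);
            }
            Map<Integer, List<String>> pending = new TreeMap<>();
            int readers = in.readInt();
            for (int i = 0; i < readers; i++) {
                int readerId = in.readInt();
                // A V1 entry carries exactly one split; wrap it into a list.
                int count = version == V1_SINGLE ? 1 : in.readInt();
                List<String> splits = new ArrayList<>(count);
                for (int j = 0; j < count; j++) {
                    splits.add(in.readUTF());
                }
                pending.put(readerId, splits);
            }
            return pending;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The design point is that the reader branches on an explicit version tag, so a later shape change does not depend on how a generic object serializer happens to treat the old class.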
Issue 2: the JDBC side is a real sink-behavior change, not just diagnostics, and the changed-module CI is still red
- Location:
  - `seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java:105-113`
  - `seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java:236-239`
  - current failed jobs on this head:
    - `updated-modules-integration-test-part-1` -> `connector-jdbc-e2e-ddl` -> `SqlServerSchemaChangeIT ... ConditionTimeout`
    - `updated-modules-integration-test-part-2` -> `connector-jdbc-e2e-part-1` -> `JdbcDb2UpsertIT.testDb2UpsertE2e`
- Why this matters:
  - `writeRecord(...)` now flushes not only on `batchSize`, but also on `batchIntervalMs`.
  - That changes the normal JDBC sink execution path and retry / transaction timing. It is broader than “paired diagnostics for troubleshooting”.
  - On the current head, the visible `Build` check is still red, and two failed jobs are JDBC E2E jobs in a module touched by this change. From the current evidence I cannot safely dismiss them as unrelated noise.
- Risk:
  - This can change flush frequency and transaction boundaries across JDBC sinks, including exactly-once and schema-change paths.
  - Merging it together with the TiDB fix keeps the PR scope noisy and leaves the changed-module CI unresolved.
- Best fix:
  - Option A: split the JDBC change into a separate PR and keep this PR focused on the TiDB CDC fix.
  - Option B: if the JDBC change must stay here, add focused JDBC coverage for the new `batchIntervalMs` path and get the JDBC E2E failures green (or provide a concrete, reproducible proof that they are unrelated).
- Severity: High
- Raised by others already: Partially. I mentioned the JDBC scope concern in the previous round, but I should have called out the runtime behavior change and the still-red changed-module CI more explicitly.
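Why an interval-based trigger changes flush timing can be seen in a toy model. This is not `JdbcOutputFormat`: the class, its fields, and the injected clock value are all hypothetical, and the only thing borrowed from the review is the idea that a write may flush on either `batchSize` or `batchIntervalMs`.

```java
class BatchFlushSketch {
    private final int batchSize;
    private final long batchIntervalMs; // 0 disables the interval trigger in this sketch
    private int buffered;
    private long lastFlushMs;
    private int flushCount;

    public BatchFlushSketch(int batchSize, long batchIntervalMs, long nowMs) {
        this.batchSize = batchSize;
        this.batchIntervalMs = batchIntervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Buffers one record; nowMs is passed in so the example stays deterministic. */
    public void writeRecord(long nowMs) {
        buffered++;
        boolean sizeReached = batchSize > 0 && buffered >= batchSize;
        boolean intervalElapsed = batchIntervalMs > 0 && nowMs - lastFlushMs >= batchIntervalMs;
        if (sizeReached || intervalElapsed) {
            flush(nowMs);
        }
    }

    private void flush(long nowMs) {
        // A real sink would execute the batch here; the sketch only counts flushes.
        buffered = 0;
        lastFlushMs = nowMs;
        flushCount++;
    }

    public int flushCount() {
        return flushCount;
    }

    public int buffered() {
        return buffered;
    }
}
```

For the same five writes, a size-only configuration flushes once and leaves two records buffered, while adding a one-second interval produces a second, smaller flush. That is the kind of shift in batch and transaction boundaries the review is concerned about.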
Tests and CI
- The new TiDB unit tests are useful and stable:
  - `TiDBSourceReaderTest.java:42-68` is deterministic and directly covers the reordered `COMMIT -> flush -> PREWRITE -> flush` path.
  - `TiDBSourceSplitEnumeratorTest.java:45-68` is also deterministic and covers the same-reader multi-split reassignment case.
- Test stability rating for the newly added UTs: Stable
  - No `sleep`, no time-based assertions, no port allocation, no shared static state, no order-sensitive collection assertions.
- Coverage gaps still left:
  - no restore-compatibility test for the new enumerator checkpoint shape
  - no JDBC test for the new `batchIntervalMs` flush path
- Current visible `Build` failures on head `53d729c94bbff089113ebfa379be4d8957966aa1`:
  - `connector-jdbc-e2e-ddl`: `SqlServerSchemaChangeIT ... ConditionTimeout`
  - `connector-jdbc-e2e-part-1`: `JdbcDb2UpsertIT.testDb2UpsertE2e`
  - `connector-cdc-mysql-e2e`: `MysqlCDCWithSchemaChangeIT`
- MySQL CDC looks unrelated to the files changed here, but the two JDBC E2E failures land on a touched module, so I would not wave those away on this version.
Merge conclusion
Conclusion: can merge after fixes
- Blockers
- Issue 1: please make the enumerator checkpoint state backward-compatible before merge.
- Issue 2: please either split the JDBC change out, or prove it with tests and green JDBC E2E on this head.
- Non-blocking suggestions
- No extra non-blocking items from this round. The two issues above are already enough to block merge.
Overall, the TiDB reader fix itself is much closer now, and the new regression proof helped a lot. The remaining blockers are around upgrade safety and PR scope control, not around the original reordered-event diagnosis anymore. Happy to re-review once you push an update.
Thanks for the careful re-review. I pushed an update in

Changes in this update:

One clarification on the JDBC point: after checking the PR diff against the base, the

Local verification:

./mvnw -DspotlessFiles=seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceCheckpointState.java,seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumeratorTest.java,seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java spotless:check
./mvnw -pl seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb -DskipIT -Dskip.spotless=true -Dcheckstyle.skip=true -Dtest=TiDBSourceReaderTest,TiDBSourceSplitEnumeratorTest test

The focused TiDB tests pass locally: 4 tests, 0 failures.
DanielLeens
left a comment
Thanks for the update. I re-reviewed the latest head from the TiDB checkpoint-state restore path and the new regression coverage, and the exact PR scope now looks good to me.
What this PR fixes
- User pain: After the TiDB CDC pending-split structure changed from a single split to a list, old checkpoint state could no longer restore cleanly.
- Fix approach: The PR normalizes legacy `Map<Integer, TiDBSourceSplit>` state into the new `Map<Integer, List<TiDBSourceSplit>>` shape in the constructor, setter, and `readObject()` path.
- One-line summary: Legacy checkpoint state now restores cleanly into the new pending-split structure.
Runtime path I traced
Restore path
-> Java deserialization of `TiDBSourceCheckpointState`
-> `readObject()` normalizes the pending-split map
-> legacy single-split entries are wrapped into singleton lists
Result
-> old checkpoint state stays readable after the internal shape change
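The restore path above can be approximated with plain Java serialization. The sketch below is an assumption-laden stand-in for `TiDBSourceCheckpointState`: splits are strings, the `legacy(...)` factory exists only to mimic state written before the shape change, and `roundTrip(...)` is a helper for the example. Because generic type arguments are erased, a map holding single splits deserializes without error, which is why the normalization has to happen explicitly in `readObject()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CheckpointStateSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Values are either a single split id (legacy) or a List of split ids (current).
    private Map<Integer, Object> pendingSplit;

    public CheckpointStateSketch(Map<Integer, ?> pendingSplit) {
        this.pendingSplit = normalize(pendingSplit);
    }

    /** Example-only hook: stores the map as-is to mimic state written before the change. */
    public static CheckpointStateSketch legacy(Map<Integer, String> raw) {
        CheckpointStateSketch state = new CheckpointStateSketch(new HashMap<Integer, Object>());
        state.pendingSplit = new HashMap<>(raw);
        return state;
    }

    @SuppressWarnings("unchecked")
    public Map<Integer, List<String>> getPendingSplit() {
        Map<Integer, List<String>> view = new HashMap<>();
        for (Map.Entry<Integer, Object> e : pendingSplit.entrySet()) {
            view.put(e.getKey(), (List<String>) e.getValue());
        }
        return view;
    }

    private static Map<Integer, Object> normalize(Map<Integer, ?> raw) {
        Map<Integer, Object> out = new HashMap<>();
        for (Map.Entry<Integer, ?> e : raw.entrySet()) {
            Object value = e.getValue();
            if (value instanceof List) {
                out.put(e.getKey(), new ArrayList<Object>((List<?>) value));
            } else if (value instanceof String) {
                // Legacy single-split entry: wrap it into a one-element list.
                out.put(e.getKey(), new ArrayList<Object>(Collections.singletonList(value)));
            } else {
                // Unknown shapes fail fast instead of being silently dropped.
                throw new IllegalStateException("Unsupported pending split state: " + value);
            }
        }
        return out;
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        pendingSplit = normalize(pendingSplit);
    }

    /** Serializes and deserializes the state in memory, as a restore would. */
    public static CheckpointStateSketch roundTrip(CheckpointStateSketch state) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(state);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CheckpointStateSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Round-tripping a state built with `legacy(...)` yields singleton lists from `getPendingSplit()`, while a state that already holds lists passes through unchanged.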
Key findings
- This change is on the restore path, which is exactly where the compatibility bug lives.
- Constructor, setter, and `readObject()` now all normalize state the same way.
- The exact PR scope is clean again on the current head.
Blocking items
None at the source-code level on the current head.
Other review notes
- No additional non-blocking source-level comments from this round.
Compatibility and side effects
- Compatibility: Fully backward compatible — that is the main point of this fix.
- Side effects: The normalization cost is tiny and only happens on restore / state-setting paths.
- Error handling / logging: Unsupported state shapes still fail fast instead of being silently swallowed.
- Tests: The added regression coverage matches the legacy-single-split restore scenario that matters here.
- Docs: No docs update is needed.
- CI: The red checks I traced are in SQLServer/MySQL CDC schema-evolution jobs, not in the exact TiDB file scope of this PR. I would sync the latest `dev` and rerun CI.
Merge verdict
- From the current head I do not see a source-level blocker. The legacy checkpoint-state compatibility issue looks fixed on the exact PR files.