[Improve][Connector-V2] Migrate Kafka Source imperative validation to declarative OptionRule#11157
[Improve][Connector-V2] Migrate Kafka Source imperative validation to declarative OptionRule#11157nzw921rx wants to merge 5 commits into
Conversation
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for working on this! I re-reviewed the current head from the full config-validation path down to the runtime parsing path, and I found one blocker that still needs to be addressed before merge.
What this PR fixes
- User pain: Kafka source validation is being moved to declarative option rules, but nested multi-table configs can still carry incomplete start-mode settings into runtime.
- Fix approach: The PR migrates the main validation path into
KafkaSourceFactory.optionRule()and addsTableConfigsValidatorfortables_configs. - One-line summary: The migration direction is correct, but the nested table-config validator still misses required
timestamp/offsetschecks.
Runtime path I traced
Job submission
-> KafkaSourceFactory.optionRule()
-> TableConfigsValidator.evaluate()
-> TIMESTAMP only rejects negative values when `start_mode.timestamp` is present
-> SPECIFIC_OFFSETS only rejects empty maps when `start_mode.offsets` is present
Runtime startup
-> KafkaSourceConfig.parseSourceConfig()
-> TIMESTAMP branch calls `readonlyConfig.get(START_MODE_TIMESTAMP)` directly
-> SPECIFIC_OFFSETS branch calls `readonlyConfig.get(START_MODE_OFFSETS)` and iterates it
Result
-> the nested config can pass submission-time validation and still fail on the main startup path
Key findings
- This change is on the normal multi-table Kafka submission path.
- The PR currently validates value shape, but not the presence of required nested start-mode fields.
- Runtime parsing still assumes those fields exist, so the gap is observable on the main startup path.
Blocking items
- Nested
tables_configsstill misses required start-mode fields- Location:
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:138-167; seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java:187-230 - Why this is a problem: This is on the main submission-to-runtime path. The nested validator only rejects negative or empty values when they are present, but runtime parsing still treats
start_mode.timestampandstart_mode.offsetsas mandatory. - Risk: A valid-looking multi-table job can pass submission-time validation and then fail on source startup.
- Suggested fix: Make the nested validator require
start_mode.timestampforTIMESTAMPand require a non-emptystart_mode.offsetsmap forSPECIFIC_OFFSETS, then add regression tests for both cases.
- Location:
Other review notes
- No additional non-blocking source-level comments from this round.
Compatibility and side effects
- Compatibility: Backward compatible at the API/config-name level, but submission-time and runtime validation still disagree on required nested fields.
- Side effects: The main impact is operational: the validation gap shifts a deterministic config error from submission time to runtime startup.
- Error handling / logging: This should fail as an
OptionValidationExceptionduring submission instead of surfacing later with a weaker runtime error context. - Tests: Current tests cover negative values and empty maps, but they do not cover the missing nested timestamp / offsets cases that trigger the real gap.
- Docs: No user-facing documentation change is required here.
- CI: Checks were still queued when I reviewed. The blocker above comes from the source path itself, not from CI noise.
Merge verdict
- From the current head, I see a real source-level blocker. Please fix the nested
tables_configsvalidation gap before this merges.
…e OptionRule Migrate 4 imperative if/throw checks in KafkaSourceConfig.createConsumerMetadata() to declarative Conditions in KafkaSourceFactory.optionRule(): - start_mode.timestamp >= 0 (greaterOrEqual when start_mode=TIMESTAMP) - start_mode.end_timestamp >= 0 (greaterOrEqual, optional value constraint) - start_mode.offsets non-empty map (mapNotEmpty when start_mode=SPECIFIC_OFFSETS) - partition-discovery.interval-millis > 0 (greaterThan when ignore_no_leader_partition=true) Runtime-only checks (> currentTimestamp) are kept in place with comments. Related apache#11007
…config validation
fdf97b3 to
8626980
Compare
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for the update. I re-reviewed the latest head from the full Kafka config-validation path down to the runtime source startup path, and I still see one blocker on the current version.
What this PR solves
- User pain: Kafka source validation is being migrated to declarative option rules so invalid configs can fail during submission instead of later at runtime.
- Fix approach: move the main validation into
KafkaSourceFactory.optionRule()and add a nested validator for multi-table entries. - One-line summary: the migration direction is good, but the nested multi-table validation still does not fully match the runtime contract.
Runtime path I checked
Job submission
-> KafkaSourceFactory.optionRule()
-> TableConfigsValidator.evaluate()
-> TIMESTAMP only rejects negative values when `start_mode.timestamp` is present
-> SPECIFIC_OFFSETS only rejects empty maps when `start_mode.offsets` is present
Job startup
-> KafkaSourceConfig.createConsumerMetadata()
-> TIMESTAMP branch reads `start_mode.timestamp` directly
-> SPECIFIC_OFFSETS branch reads `start_mode.offsets` directly
-> both paths still treat those fields as required
Findings
Issue 1: nested multi-table validation still misses required start-mode fields
- Location:
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java,seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java - Why this is a problem:
The nested validator only checks value shape when the field is present. But the runtime path still assumesstart_mode.timestampexists forTIMESTAMPandstart_mode.offsetsexists forSPECIFIC_OFFSETS. - Risk:
A multi-table job can pass submission-time validation and still fail on the normal source startup path. - Better fix:
Make the nested validator requirestart_mode.timestampforTIMESTAMPand require a non-emptystart_mode.offsetsmap forSPECIFIC_OFFSETS, then add regression tests for both missing-field cases. - Severity: High
- Already raised by others: No
Compatibility / side effects
- API/config names remain compatible.
- The blocker is behavioral: submission-time validation and runtime parsing are still out of sync on the main multi-table path.
Tests / stability / CI
- The new tests cover negative values and empty maps, but they still miss the real gap: missing nested timestamp / offsets fields.
- The tests themselves look stable; the issue is coverage, not flakiness.
- I am not relying on CI for this conclusion. The blocker comes directly from the source path above.
Merge conclusion
Conclusion: can merge after fixes
- Blocking items
- Issue 1: align the nested
tables_configsvalidation with the runtime contract by making the required timestamp / offsets fields actually required.
- Suggested but non-blocking follow-up
- None beyond that blocker from the path I reviewed.
Happy to take another pass once the nested required-field validation is fully aligned with the runtime startup path.
…alidation in Kafka MultiTable
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for working through the follow-up comments here. I re-reviewed the latest head from the full Kafka config-validation path down to the runtime startup path. The earlier missing-condition gap is now addressed, and the ignore_no_leader_partition thread is resolved from my side. I did find one remaining blocker on the current revision though.
What this PR solves
- User pain: Kafka source validation is being moved from scattered runtime checks into declarative
OptionRulevalidation so invalid configs fail earlier at submission time. - Fix approach: the PR adds declarative validators for
start_mode,ignore_no_leader_partition, and nestedtables_configs. - One-line summary: the migration direction is good, but the latest validators now reject
timestamp = 0, which was previously accepted by the actual runtime path.
Simple example
- Before this PR, a config like
start_mode = TIMESTAMPwithstart_mode.timestamp = 0could still start, because the runtime only rejected future timestamps. - On the current head, the same config is rejected during submission by the new declarative validators.
- So this is not just “stricter validation”; it is a real compatibility tightening on an existing user path.
Runtime path I checked
Job submission
-> KafkaSourceFactory.optionRule() [KafkaSourceFactory.java:60-92]
-> KafkaStartModeValidator.evaluate() [KafkaSourceFactory.java:125-146]
-> KafkaTableConfigsValidator.evaluate() [KafkaSourceFactory.java:148-193]
-> TIMESTAMP currently requires the timestamp to be > 0
Job startup
-> KafkaSourceConfig.createConsumerMetadata() [KafkaSourceConfig.java:176-235]
-> TIMESTAMP branch reads start_mode.timestamp directly [KafkaSourceConfig.java:187-210]
-> runtime still only rejects timestamps greater than the current time
Result
-> a previously accepted boundary value (0) is now rejected at submission time
I also want to explicitly note that @CosmosNi's earlier thread about the missing nested condition is addressed in the latest head. My remaining blocker is different: the new declarative validation is now stricter than the historical runtime contract.
Blocking items
timestamp = 0is now rejected even though the runtime path previously allowed it- Location:
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:135-142seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:167-179seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java:187-210seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/KafkaFactoryTest.java:143-174
- Why this is a problem:
The new declarative validators require> 0, while the runtime path only rejects future timestamps. That means the PR tightens an existing config contract on the normal submission path. - Risk:
Existing jobs or user configs that rely on0as the epoch boundary can now fail before startup. This is a real backward-compatibility regression. - Best fix:
- Option A (recommended): make the declarative validators accept
>= 0so they stay aligned with the existing runtime behavior, and add a regression test for the zero case. - Option B: if the project intentionally wants
> 0, then the runtime path, docs, and incompatible-change docs all need to be updated together. I do not think that is the right fit for this PR.
- Option A (recommended): make the declarative validators accept
- Severity: High
- Already raised by others: No. This is related to @CosmosNi's earlier validation-alignment point, but it is a different compatibility edge.
- Location:
Tests / stability
- The new unit tests look structurally stable: they are pure in-memory validation tests with no sleeps, no ports, no containers, and no async timing assumptions.
- The gap is coverage, not flakiness:
KafkaFactoryTestdoes not currently assert thetimestamp = 0boundary.
CI
- The latest Build is green on the current head.
- My blocker here is source-level compatibility, not CI.
Merge conclusion
Conclusion: can merge after fixes
- Blocking items
- Issue 1: align the declarative timestamp boundary with the existing runtime contract instead of rejecting
0.
- Suggested but non-blocking follow-up
- No additional non-blocking items from this round.
Happy to take another pass once the timestamp boundary is brought back into alignment.
…ation judgment logic
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for the follow-up commit. I re-reviewed the latest head from the full Kafka source validation path down to the runtime startup path again.
First, I want to be explicit about what is already fixed on this revision:
- the earlier missing
ignore_no_leader_partitioncondition is resolved - the earlier
start_mode.timestamp = 0compatibility blocker is also resolved on the current head - +1 to @CosmosNi's earlier validation-alignment concern in the thread; that thread itself is no longer my blocker on this revision
I do still see one remaining compatibility edge before merge, though.
What this PR solves
- User pain: Kafka source validation is being migrated from scattered runtime checks into declarative
OptionRulevalidation so invalid configs fail earlier at submission time. - Fix approach: the PR moves validation into
KafkaSourceFactory.optionRule()and adds dedicated validators forstart_mode,tables_configs, andignore_no_leader_partition. - One-line summary: the migration direction is good, but the latest head still tightens one existing timestamp boundary that the historical runtime path allowed.
Simple example
- Before this PR, a config with
start_mode = TIMESTAMPandstart_mode.end_timestamp = 0could still enter the runtime path, because the runtime only rejected timestamps greater than the current time. - On the current head, the top-level declarative validator rejects
start_mode.end_timestamp = 0during submission. - So this is still a real compatibility tightening on an existing user path.
Runtime chain I checked
Job submission
-> KafkaSourceFactory.optionRule() [KafkaSourceFactory.java:55-92]
-> optional(start_mode.end_timestamp, Conditions.greaterThan(..., 0L)) [KafkaSourceFactory.java:68-70]
-> start_mode.timestamp validator now correctly accepts >= 0 [KafkaSourceFactory.java:135-142]
-> nested tables_configs validator now correctly accepts >= 0 for timestamps [KafkaSourceFactory.java:167-179]
Job startup
-> KafkaSourceConfig.createConsumerMetadata() [KafkaSourceConfig.java:180-230]
-> TIMESTAMP branch reads start_mode.timestamp [KafkaSourceConfig.java:187-198]
-> optional end timestamp is only rejected when it is greater than current time [KafkaSourceConfig.java:199-209]
Result
-> start_mode.timestamp = 0 is aligned now
-> top-level start_mode.end_timestamp = 0 is still rejected earlier than the historical runtime path
Findings
Issue 1: top-level start_mode.end_timestamp = 0 is still rejected even though the historical runtime path allowed it
- Location:
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:68-70seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java:199-209seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/KafkaFactoryTest.java:120-174
- Why this is a problem:
The runtime path now only rejects end timestamps greater than the current time, which means0remained historically acceptable. But the new top-level declarative validator still enforces> 0, so the migration is not fully aligned with the previous runtime contract yet. - Risk:
Existing jobs or configs that rely on the epoch boundary forstart_mode.end_timestampcan now fail at submission time on the normal path. - Best fix:
- Option A (recommended): align the declarative validator with the runtime contract by accepting
>= 0here as well, then add an explicit regression test for the zero end-timestamp case. - Option B: if the project intentionally wants
> 0, then the runtime behavior, tests, docs, and incompatible-change notes all need to be updated together. I do not think this PR should expand scope that way.
- Option A (recommended): align the declarative validator with the runtime contract by accepting
- Severity: High
- Already raised by others: This is related in spirit to @CosmosNi's earlier validation/runtime alignment point, but it is a different remaining compatibility edge on the latest head.
Issue 2: the validator/user-facing wording is now out of sync with the actual accepted boundary
- Location:
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:127-129seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:152-154
- Why this is a problem:
The descriptions still say the timestamp must be “greater than 0”, while the implementation now accepts0for both top-levelstart_mode.timestampand nestedtables_configstimestamps. - Risk:
This is not a runtime blocker, but it does make validation guidance and failure messaging harder to trust. - Best fix:
Update the validator descriptions so they describe the real accepted boundary (>= 0). - Severity: Low
- Already raised by others: No
Tests / stability
- The new UTs look structurally stable: no sleeps, ports, containers, or async timing.
- The gap is still coverage, not flakiness. There is no regression test yet for the top-level
start_mode.end_timestamp = 0case.
CI
- The latest
Buildcheck on this head is currently cancelled, but my blocker here is source-level compatibility rather than CI state.
Merge conclusion
Conclusion: can merge after fixes
- Blocking items
- Issue 1: align the top-level
start_mode.end_timestampvalidator with the historical runtime contract instead of still rejecting0.
- Suggested but non-blocking follow-up
- Issue 2: update the validator description strings so they match the real accepted boundary.
Happy to re-review once the last timestamp-boundary mismatch is brought fully into alignment.
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for the follow-up commit. I re-reviewed the latest head from the Kafka source declarative validation path down to the runtime startup path again.
First, I want to be explicit about what is now closed on the current revision:
- the earlier
ignore_no_leader_partitionvalidation gap is fixed - the earlier
start_mode.timestamp = 0compatibility mismatch is fixed - the remaining top-level
start_mode.end_timestamp = 0mismatch is also fixed now viagreaterOrEqual(...) - +1 to @CosmosNi's earlier validation/runtime alignment concern; that thread is resolved from my side on the current head
What this PR solves
- User pain: Kafka source validation is being migrated from scattered runtime checks into declarative
OptionRulevalidation so invalid configs fail at submission time instead of later at startup. - Fix approach: the PR moves the key checks into
KafkaSourceFactory.optionRule()and adds dedicated validators for start mode, nested multi-table configs, andignore_no_leader_partition. - One-line summary: the migration now matches the historical runtime contract on the paths I traced, and I do not see a remaining source-level blocker on the current head.
Runtime chain I rechecked
Job submission
-> KafkaSourceFactory.optionRule()
-> top-level START_MODE_END_TIMESTAMP uses greaterOrEqual(..., 0)
-> START_MODE validator accepts timestamp >= 0 and non-empty specific offsets
-> TABLE_CONFIGS / TABLE_LIST validator checks nested TIMESTAMP and SPECIFIC_OFFSETS entries
-> IGNORE_NO_LEADER_PARTITION validator enforces positive partition discovery interval
Job startup
-> KafkaSourceConfig.createConsumerMetadata()
-> TIMESTAMP branch still rejects only future timestamps
-> SPECIFIC_OFFSETS branch reads the validated offsets map
Findings
I do not see a blocking correctness issue on the current revision.
One small non-blocking follow-up:
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:127-129
The validator description still saysstart_mode.timestampmust be "greater than 0", while the implementation now correctly accepts>= 0. I would update the wording so the guidance matches the real boundary.
Tests / stability
- The added UTs are aligned with the new declarative-validation path.
- Stability rating: Stable.
- Reason: the tests are pure config-validation checks without sleeps, timing, ports, or container ordering dependencies.
Merge conclusion
Conclusion: can merge
- Blocking items
- None from my source-level re-review on the current head.
- Suggested non-blocking follow-up
- Update the validator description string so it matches the actual accepted timestamp boundary.
Overall, the current revision closes the validation/runtime alignment gaps I was tracking earlier. From my side this is ready.
|
Thanks for the update. I rechecked the current head against the latest
Because the PR is still behind the latest base, any CI result and merge gate signal here is mixed with upstream drift. The lowest-cost next step is to sync with the latest |
Purpose of this pull request
Related #11007
if/throwchecks inKafkaSourceConfig.createConsumerMetadata()to declarativeConditionsinKafkaSourceFactory.optionRule()greaterOrEqual(START_MODE_TIMESTAMP, 0L)constraint whenstart_mode = TIMESTAMPgreaterOrEqual(START_MODE_END_TIMESTAMP, 0L)optional value constraintmapNotEmpty(START_MODE_OFFSETS)constraint whenstart_mode = SPECIFIC_OFFSETSKEY_PARTITION_DISCOVERY_INTERVAL_MILLISconditional-required withgreaterThan(..., 0L)value constraint whenignore_no_leader_partition = true> currentTimestamp) with explanatory commentsDoes this PR introduce any user-facing change?
Yes — previously invalid configurations (negative timestamps, empty offsets map) were rejected at task execution time with generic
IllegalArgumentException. Now they are rejected earlier at job submission time with structuredOptionValidationExceptionincluding option name, constraint type, and violation details.How was this patch tested?
Unit tests in
KafkaFactoryTest:testValidTimestampConfig— valid TIMESTAMP mode passes validationtestNegativeTimestampRejected— negativestart_mode.timestampis rejectedtestNegativeEndTimestampRejected— negativestart_mode.end_timestampis rejectedtestValidSpecificOffsetsConfig— valid SPECIFIC_OFFSETS mode passes validationtestEmptyOffsetsMapRejected— empty offsets map is rejected