[Improve][Connector-V2] Migrate Kafka Source imperative validation to declarative OptionRule #11157

Open
nzw921rx wants to merge 5 commits into apache:dev from nzw921rx:improve/kafka-source-optionrule-migration

@nzw921rx (Collaborator) commented Jun 22, 2026

Purpose of this pull request

Related #11007

  1. Migrate 4 imperative if/throw checks in KafkaSourceConfig.createConsumerMetadata() to declarative Conditions in KafkaSourceFactory.optionRule()
  2. Add greaterOrEqual(START_MODE_TIMESTAMP, 0L) constraint when start_mode = TIMESTAMP
  3. Add greaterOrEqual(START_MODE_END_TIMESTAMP, 0L) optional value constraint
  4. Add mapNotEmpty(START_MODE_OFFSETS) constraint when start_mode = SPECIFIC_OFFSETS
  5. Replace KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS conditional-required with greaterThan(..., 0L) value constraint when ignore_no_leader_partition = true
  6. Retain runtime-only checks (> currentTimestamp) with explanatory comments
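
The move from imperative if/throw checks to declarative conditions listed above can be sketched roughly as follows. This is a minimal illustration in plain Java, not SeaTunnel's actual OptionRule or Conditions API: `ConfigRule`, `DeclarativeRulesSketch`, and the helper methods are hypothetical names, and the config is modelled as a flat `Map<String, Object>` for brevity. A missing value fails the check in this sketch, whereas real options may supply defaults.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for one declarative rule: a guard that decides whether
// the rule applies, a check on the config, and a message for the violation.
final class ConfigRule {
    final Predicate<Map<String, Object>> appliesWhen;
    final Predicate<Map<String, Object>> check;
    final String message;

    ConfigRule(
            Predicate<Map<String, Object>> appliesWhen,
            Predicate<Map<String, Object>> check,
            String message) {
        this.appliesWhen = appliesWhen;
        this.check = check;
        this.message = message;
    }
}

final class DeclarativeRulesSketch {
    // True when the value under key is a Long that is >= min (or > min if not inclusive).
    static boolean longAtLeast(Map<String, Object> c, String key, long min, boolean inclusive) {
        Object v = c.get(key);
        if (!(v instanceof Long)) {
            return false;
        }
        long value = (Long) v;
        return inclusive ? value >= min : value > min;
    }

    // True when the value under key is a map with at least one entry.
    static boolean nonEmptyMap(Map<String, Object> c, String key) {
        Object v = c.get(key);
        return v instanceof Map && !((Map<?, ?>) v).isEmpty();
    }

    // The rules are data: each one pairs a condition on start_mode (or another
    // option) with a value constraint, mirroring items 2, 4, and 5 above.
    static final List<ConfigRule> RULES = List.of(
            new ConfigRule(
                    c -> "TIMESTAMP".equals(c.get("start_mode")),
                    c -> longAtLeast(c, "start_mode.timestamp", 0L, true),
                    "start_mode.timestamp must be >= 0 when start_mode = TIMESTAMP"),
            new ConfigRule(
                    c -> "SPECIFIC_OFFSETS".equals(c.get("start_mode")),
                    c -> nonEmptyMap(c, "start_mode.offsets"),
                    "start_mode.offsets must be a non-empty map when start_mode = SPECIFIC_OFFSETS"),
            new ConfigRule(
                    c -> Boolean.TRUE.equals(c.get("ignore_no_leader_partition")),
                    c -> longAtLeast(c, "partition-discovery.interval-millis", 0L, false),
                    "partition-discovery.interval-millis must be > 0 when ignore_no_leader_partition = true"));

    // Evaluates every applicable rule and collects all violations instead of
    // stopping at the first failing check.
    static List<String> validate(Map<String, Object> config) {
        List<String> violations = new ArrayList<>();
        for (ConfigRule rule : RULES) {
            if (rule.appliesWhen.test(config) && !rule.check.test(config)) {
                violations.add(rule.message);
            }
        }
        return violations;
    }
}
```

Because the rules are plain data, they can be evaluated at job submission time without constructing a consumer, which is the property the migration relies on.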

Does this PR introduce any user-facing change?

Yes — previously invalid configurations (negative timestamps, empty offsets map) were rejected at task execution time with generic IllegalArgumentException. Now they are rejected earlier at job submission time with structured OptionValidationException including option name, constraint type, and violation details.
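
To illustrate what a structured rejection might carry compared with a generic IllegalArgumentException, here is a small sketch. `OptionViolation` and `TimestampCheck` are hypothetical names invented for this example; the real OptionValidationException may expose its fields differently.

```java
// Hypothetical structured error: keeps the option name, the constraint that was
// violated, and a detail string as separate fields rather than one free-form message.
final class OptionViolation extends RuntimeException {
    final String optionName;
    final String constraint;
    final String detail;

    OptionViolation(String optionName, String constraint, String detail) {
        super("Option '" + optionName + "' violates " + constraint + ": " + detail);
        this.optionName = optionName;
        this.constraint = constraint;
        this.detail = detail;
    }
}

final class TimestampCheck {
    // Rejects negative values and reports which option and constraint were involved.
    static void requireNonNegative(String optionName, long value) {
        if (value < 0L) {
            throw new OptionViolation(optionName, "greaterOrEqual(0)", "actual value " + value);
        }
    }
}
```

A caller can then branch on `optionName` or `constraint` without parsing the message text.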

How was this patch tested?

Unit tests in KafkaFactoryTest:

  • testValidTimestampConfig — valid TIMESTAMP mode passes validation
  • testNegativeTimestampRejected — negative start_mode.timestamp is rejected
  • testNegativeEndTimestampRejected — negative start_mode.end_timestamp is rejected
  • testValidSpecificOffsetsConfig — valid SPECIFIC_OFFSETS mode passes validation
  • testEmptyOffsetsMapRejected — empty offsets map is rejected
./mvnw test -pl seatunnel-connectors-v2/connector-kafka -Dtest="KafkaFactoryTest" -DfailIfNoTests=false

@DanielLeens (Contributor) left a comment

Thanks for working on this! I re-reviewed the current head from the full config-validation path down to the runtime parsing path, and I found one blocker that still needs to be addressed before merge.

What this PR fixes

  • User pain: Kafka source validation is being moved to declarative option rules, but nested multi-table configs can still carry incomplete start-mode settings into runtime.
  • Fix approach: The PR migrates the main validation path into KafkaSourceFactory.optionRule() and adds TableConfigsValidator for tables_configs.
  • One-line summary: The migration direction is correct, but the nested table-config validator still misses required timestamp / offsets checks.

Runtime path I traced

Job submission
  -> KafkaSourceFactory.optionRule()
      -> TableConfigsValidator.evaluate()
          -> TIMESTAMP only rejects negative values when `start_mode.timestamp` is present
          -> SPECIFIC_OFFSETS only rejects empty maps when `start_mode.offsets` is present

Runtime startup
  -> KafkaSourceConfig.parseSourceConfig()
      -> TIMESTAMP branch calls `readonlyConfig.get(START_MODE_TIMESTAMP)` directly
      -> SPECIFIC_OFFSETS branch calls `readonlyConfig.get(START_MODE_OFFSETS)` and iterates it

Result
  -> the nested config can pass submission-time validation and still fail on the main startup path

Key findings

  • This change is on the normal multi-table Kafka submission path.
  • The PR currently validates value shape, but not the presence of required nested start-mode fields.
  • Runtime parsing still assumes those fields exist, so the gap is observable on the main startup path.

Blocking items

  1. Nested tables_configs still misses required start-mode fields
    • Location: seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:138-167; seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java:187-230
    • Why this is a problem: This is on the main submission-to-runtime path. The nested validator only rejects negative or empty values when they are present, but runtime parsing still treats start_mode.timestamp and start_mode.offsets as mandatory.
    • Risk: A valid-looking multi-table job can pass submission-time validation and then fail on source startup.
    • Suggested fix: Make the nested validator require start_mode.timestamp for TIMESTAMP and require a non-empty start_mode.offsets map for SPECIFIC_OFFSETS, then add regression tests for both cases.
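
A rough sketch of the suggested fix, assuming each tables_configs entry can be viewed as a flat map of option keys to values. `NestedStartModeCheck` and its methods are hypothetical and do not reflect the actual TableConfigsValidator code; the point is only that presence is checked before value shape.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class NestedStartModeCheck {
    // Validates one nested table entry. Presence is checked first, so a missing
    // field is reported at submission time instead of failing later at startup.
    static List<String> validateEntry(Map<String, Object> table) {
        List<String> violations = new ArrayList<>();
        Object mode = table.get("start_mode");
        if ("TIMESTAMP".equals(mode)) {
            Object ts = table.get("start_mode.timestamp");
            if (!(ts instanceof Long)) {
                violations.add("start_mode.timestamp is required when start_mode = TIMESTAMP");
            } else if ((Long) ts < 0L) {
                violations.add("start_mode.timestamp must be >= 0");
            }
        } else if ("SPECIFIC_OFFSETS".equals(mode)) {
            Object offsets = table.get("start_mode.offsets");
            if (!(offsets instanceof Map)) {
                violations.add("start_mode.offsets is required when start_mode = SPECIFIC_OFFSETS");
            } else if (((Map<?, ?>) offsets).isEmpty()) {
                violations.add("start_mode.offsets must not be empty");
            }
        }
        return violations;
    }

    // Validates every entry and prefixes each violation with the entry index.
    static List<String> validateAll(List<Map<String, Object>> tablesConfigs) {
        List<String> violations = new ArrayList<>();
        for (int i = 0; i < tablesConfigs.size(); i++) {
            for (String v : validateEntry(tablesConfigs.get(i))) {
                violations.add("tables_configs[" + i + "]: " + v);
            }
        }
        return violations;
    }
}
```

The two missing-field branches correspond to the regression tests requested above: a TIMESTAMP entry without a timestamp and a SPECIFIC_OFFSETS entry without offsets.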

Other review notes

  • No additional non-blocking source-level comments from this round.

Compatibility and side effects

  • Compatibility: Backward compatible at the API/config-name level, but submission-time and runtime validation still disagree on required nested fields.
  • Side effects: The main impact is operational: the validation gap shifts a deterministic config error from submission time to runtime startup.
  • Error handling / logging: This should fail as an OptionValidationException during submission instead of surfacing later with a weaker runtime error context.
  • Tests: Current tests cover negative values and empty maps, but they do not cover the missing nested timestamp / offsets cases that trigger the real gap.
  • Docs: No user-facing documentation change is required here.
  • CI: Checks were still queued when I reviewed. The blocker above comes from the source path itself, not from CI noise.

Merge verdict

  • From the current head, I see a real source-level blocker. Please fix the nested tables_configs validation gap before this merges.

nzw921rx added 2 commits June 23, 2026 23:12
…e OptionRule

Migrate 4 imperative if/throw checks in KafkaSourceConfig.createConsumerMetadata()
to declarative Conditions in KafkaSourceFactory.optionRule():

- start_mode.timestamp >= 0 (greaterOrEqual when start_mode=TIMESTAMP)
- start_mode.end_timestamp >= 0 (greaterOrEqual, optional value constraint)
- start_mode.offsets non-empty map (mapNotEmpty when start_mode=SPECIFIC_OFFSETS)
- partition-discovery.interval-millis > 0 (greaterThan when ignore_no_leader_partition=true)

Runtime-only checks (> currentTimestamp) are kept in place with comments.

Related apache#11007
@nzw921rx nzw921rx force-pushed the improve/kafka-source-optionrule-migration branch from fdf97b3 to 8626980 Compare June 23, 2026 15:12

@DanielLeens (Contributor) left a comment

Thanks for the update. I re-reviewed the latest head from the full Kafka config-validation path down to the runtime source startup path, and I still see one blocker on the current version.

What this PR solves

  • User pain: Kafka source validation is being migrated to declarative option rules so invalid configs can fail during submission instead of later at runtime.
  • Fix approach: move the main validation into KafkaSourceFactory.optionRule() and add a nested validator for multi-table entries.
  • One-line summary: the migration direction is good, but the nested multi-table validation still does not fully match the runtime contract.

Runtime path I checked

Job submission
  -> KafkaSourceFactory.optionRule()
      -> TableConfigsValidator.evaluate()
          -> TIMESTAMP only rejects negative values when `start_mode.timestamp` is present
          -> SPECIFIC_OFFSETS only rejects empty maps when `start_mode.offsets` is present

Job startup
  -> KafkaSourceConfig.createConsumerMetadata()
      -> TIMESTAMP branch reads `start_mode.timestamp` directly
      -> SPECIFIC_OFFSETS branch reads `start_mode.offsets` directly
      -> both paths still treat those fields as required

Findings

Issue 1: nested multi-table validation still misses required start-mode fields

  • Location: seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java, seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
  • Why this is a problem:
    The nested validator only checks value shape when the field is present. But the runtime path still assumes start_mode.timestamp exists for TIMESTAMP and start_mode.offsets exists for SPECIFIC_OFFSETS.
  • Risk:
    A multi-table job can pass submission-time validation and still fail on the normal source startup path.
  • Better fix:
    Make the nested validator require start_mode.timestamp for TIMESTAMP and require a non-empty start_mode.offsets map for SPECIFIC_OFFSETS, then add regression tests for both missing-field cases.
  • Severity: High
  • Already raised by others: No

Compatibility / side effects

  • API/config names remain compatible.
  • The blocker is behavioral: submission-time validation and runtime parsing are still out of sync on the main multi-table path.

Tests / stability / CI

  • The new tests cover negative values and empty maps, but they still miss the real gap: missing nested timestamp / offsets fields.
  • The tests themselves look stable; the issue is coverage, not flakiness.
  • I am not relying on CI for this conclusion. The blocker comes directly from the source path above.

Merge conclusion

Conclusion: can merge after fixes

  1. Blocking items
    • Issue 1: align the nested tables_configs validation with the runtime contract by making the required timestamp / offsets fields actually required.
  2. Suggested but non-blocking follow-up
    • None beyond that blocker from the path I reviewed.

Happy to take another pass once the nested required-field validation is fully aligned with the runtime startup path.

@DanielLeens (Contributor) left a comment

Thanks for working through the follow-up comments here. I re-reviewed the latest head from the full Kafka config-validation path down to the runtime startup path. The earlier missing-condition gap is now addressed, and the ignore_no_leader_partition thread is resolved from my side. I did find one remaining blocker on the current revision though.

What this PR solves

  • User pain: Kafka source validation is being moved from scattered runtime checks into declarative OptionRule validation so invalid configs fail earlier at submission time.
  • Fix approach: the PR adds declarative validators for start_mode, ignore_no_leader_partition, and nested tables_configs.
  • One-line summary: the migration direction is good, but the latest validators now reject timestamp = 0, which was previously accepted by the actual runtime path.

Simple example

  • Before this PR, a config like start_mode = TIMESTAMP with start_mode.timestamp = 0 could still start, because the runtime only rejected future timestamps.
  • On the current head, the same config is rejected during submission by the new declarative validators.
  • So this is not just “stricter validation”; it is a real compatibility tightening on an existing user path.

Runtime path I checked

Job submission
  -> KafkaSourceFactory.optionRule() [KafkaSourceFactory.java:60-92]
      -> KafkaStartModeValidator.evaluate() [KafkaSourceFactory.java:125-146]
      -> KafkaTableConfigsValidator.evaluate() [KafkaSourceFactory.java:148-193]
      -> TIMESTAMP currently requires the timestamp to be > 0

Job startup
  -> KafkaSourceConfig.createConsumerMetadata() [KafkaSourceConfig.java:176-235]
      -> TIMESTAMP branch reads start_mode.timestamp directly [KafkaSourceConfig.java:187-210]
      -> runtime still only rejects timestamps greater than the current time

Result
  -> a previously accepted boundary value (0) is now rejected at submission time

I also want to explicitly note that @CosmosNi's earlier thread about the missing nested condition is addressed in the latest head. My remaining blocker is different: the new declarative validation is now stricter than the historical runtime contract.

Blocking items

  1. timestamp = 0 is now rejected even though the runtime path previously allowed it
    • Location:
      • seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:135-142
      • seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:167-179
      • seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java:187-210
      • seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/KafkaFactoryTest.java:143-174
    • Why this is a problem:
      The new declarative validators require > 0, while the runtime path only rejects future timestamps. That means the PR tightens an existing config contract on the normal submission path.
    • Risk:
      Existing jobs or user configs that rely on 0 as the epoch boundary can now fail before startup. This is a real backward-compatibility regression.
    • Best fix:
      • Option A (recommended): make the declarative validators accept >= 0 so they stay aligned with the existing runtime behavior, and add a regression test for the zero case.
      • Option B: if the project intentionally wants > 0, then the runtime path, docs, and incompatible-change docs all need to be updated together. I do not think that is the right fit for this PR.
    • Severity: High
    • Already raised by others: No. This is related to @CosmosNi's earlier validation-alignment point, but it is a different compatibility edge.
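
The compatibility tightening described above can be checked mechanically. The sketch below assumes the historical contract was "non-negative and not in the future" (negative values rejected by the original imperative checks, future values by the runtime check); `BoundaryCompatCheck` and its methods are hypothetical helpers, not code from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

final class BoundaryCompatCheck {
    // Assumed historical contract: negative and future timestamps were rejected.
    static boolean legacyAccepts(long ts, long now) {
        return ts >= 0L && ts <= now;
    }

    // Submission-time rule that requires a strictly positive timestamp.
    static boolean strictRule(long ts) {
        return ts > 0L;
    }

    // Submission-time rule aligned with the historical lower bound.
    static boolean alignedRule(long ts) {
        return ts >= 0L;
    }

    // Returns the samples that the legacy contract accepted but the candidate
    // rule rejects, i.e. the values for which the new rule is a regression.
    static List<Long> tightenedValues(LongPredicate legacy, LongPredicate candidate, long[] samples) {
        List<Long> tightened = new ArrayList<>();
        for (long s : samples) {
            if (legacy.test(s) && !candidate.test(s)) {
                tightened.add(s);
            }
        }
        return tightened;
    }
}
```

With samples around the boundary (for example -1, 0, 1, the current time, and one millisecond after it), the strict rule reports 0 as tightened, while the aligned rule reports nothing.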

Tests / stability

  • The new unit tests look structurally stable: they are pure in-memory validation tests with no sleeps, no ports, no containers, and no async timing assumptions.
  • The gap is coverage, not flakiness: KafkaFactoryTest does not currently assert the timestamp = 0 boundary.

CI

  • The latest Build is green on the current head.
  • My blocker here is source-level compatibility, not CI.

Merge conclusion

Conclusion: can merge after fixes

  1. Blocking items
    • Issue 1: align the declarative timestamp boundary with the existing runtime contract instead of rejecting 0.
  2. Suggested but non-blocking follow-up
    • No additional non-blocking items from this round.

Happy to take another pass once the timestamp boundary is brought back into alignment.

@DanielLeens (Contributor) left a comment

Thanks for the follow-up commit. I re-reviewed the latest head from the full Kafka source validation path down to the runtime startup path again.

First, I want to be explicit about what is already fixed on this revision:

  • the earlier missing ignore_no_leader_partition condition is resolved
  • the earlier start_mode.timestamp = 0 compatibility blocker is also resolved on the current head
  • +1 to @CosmosNi's earlier validation-alignment concern in the thread; that thread itself is no longer my blocker on this revision

I do still see one remaining compatibility edge before merge, though.

What this PR solves

  • User pain: Kafka source validation is being migrated from scattered runtime checks into declarative OptionRule validation so invalid configs fail earlier at submission time.
  • Fix approach: the PR moves validation into KafkaSourceFactory.optionRule() and adds dedicated validators for start_mode, tables_configs, and ignore_no_leader_partition.
  • One-line summary: the migration direction is good, but the latest head still tightens one existing timestamp boundary that the historical runtime path allowed.

Simple example

  • Before this PR, a config with start_mode = TIMESTAMP and start_mode.end_timestamp = 0 could still enter the runtime path, because the runtime only rejected timestamps greater than the current time.
  • On the current head, the top-level declarative validator rejects start_mode.end_timestamp = 0 during submission.
  • So this is still a real compatibility tightening on an existing user path.

Runtime chain I checked

Job submission
  -> KafkaSourceFactory.optionRule() [KafkaSourceFactory.java:55-92]
      -> optional(start_mode.end_timestamp, Conditions.greaterThan(..., 0L)) [KafkaSourceFactory.java:68-70]
      -> start_mode.timestamp validator now correctly accepts >= 0 [KafkaSourceFactory.java:135-142]
      -> nested tables_configs validator now correctly accepts >= 0 for timestamps [KafkaSourceFactory.java:167-179]

Job startup
  -> KafkaSourceConfig.createConsumerMetadata() [KafkaSourceConfig.java:180-230]
      -> TIMESTAMP branch reads start_mode.timestamp [KafkaSourceConfig.java:187-198]
      -> optional end timestamp is only rejected when it is greater than current time [KafkaSourceConfig.java:199-209]

Result
  -> start_mode.timestamp = 0 is aligned now
  -> top-level start_mode.end_timestamp = 0 is still rejected earlier than the historical runtime path

Findings

Issue 1: top-level start_mode.end_timestamp = 0 is still rejected even though the historical runtime path allowed it

  • Location:
    • seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:68-70
    • seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java:199-209
    • seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/KafkaFactoryTest.java:120-174
  • Why this is a problem:
    The runtime path only rejects end timestamps greater than the current time, so 0 has historically been accepted. The new top-level declarative validator still enforces > 0, so the migration is not yet fully aligned with the previous runtime contract.
  • Risk:
    Existing jobs or configs that rely on the epoch boundary for start_mode.end_timestamp can now fail at submission time on the normal path.
  • Best fix:
    • Option A (recommended): align the declarative validator with the runtime contract by accepting >= 0 here as well, then add an explicit regression test for the zero end-timestamp case.
    • Option B: if the project intentionally wants > 0, then the runtime behavior, tests, docs, and incompatible-change notes all need to be updated together. I do not think this PR should expand scope that way.
  • Severity: High
  • Already raised by others: This is related in spirit to @CosmosNi's earlier validation/runtime alignment point, but it is a different remaining compatibility edge on the latest head.

Issue 2: the validator/user-facing wording is now out of sync with the actual accepted boundary

  • Location:
    • seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:127-129
    • seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:152-154
  • Why this is a problem:
    The descriptions still say the timestamp must be “greater than 0”, while the implementation now accepts 0 for both top-level start_mode.timestamp and nested tables_configs timestamps.
  • Risk:
    This is not a runtime blocker, but it does make validation guidance and failure messaging harder to trust.
  • Best fix:
    Update the validator descriptions so they describe the real accepted boundary (>= 0).
  • Severity: Low
  • Already raised by others: No
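
One way to keep a description from drifting away from the accepted boundary is to derive both from the same definition. The sketch below is only an illustration of that idea; `LowerBound` is a hypothetical class and not how the validators in this PR are structured.

```java
// Hypothetical lower-bound constraint whose check and description are both
// derived from the same fields, so the wording cannot disagree with the logic.
final class LowerBound {
    final String option;
    final long bound;
    final boolean inclusive;

    LowerBound(String option, long bound, boolean inclusive) {
        this.option = option;
        this.bound = bound;
        this.inclusive = inclusive;
    }

    // Accepts values at or above the bound when inclusive, strictly above otherwise.
    boolean accepts(long value) {
        return inclusive ? value >= bound : value > bound;
    }

    // Human-readable description generated from the same inclusive flag.
    String describe() {
        String relation = inclusive ? "greater than or equal to " : "greater than ";
        return option + " must be " + relation + bound;
    }
}
```

For example, `new LowerBound("start_mode.timestamp", 0L, true)` accepts 0 and describes itself as "start_mode.timestamp must be greater than or equal to 0".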

Tests / stability

  • The new UTs look structurally stable: no sleeps, ports, containers, or async timing.
  • The gap is still coverage, not flakiness. There is no regression test yet for the top-level start_mode.end_timestamp = 0 case.

CI

  • The latest Build check on this head is currently cancelled, but my blocker here is source-level compatibility rather than CI state.

Merge conclusion

Conclusion: can merge after fixes

  1. Blocking items
    • Issue 1: align the top-level start_mode.end_timestamp validator with the historical runtime contract instead of still rejecting 0.
  2. Suggested but non-blocking follow-up
    • Issue 2: update the validator description strings so they match the real accepted boundary.

Happy to re-review once the last timestamp-boundary mismatch is brought fully into alignment.

@DanielLeens (Contributor) left a comment

Thanks for the follow-up commit. I re-reviewed the latest head from the Kafka source declarative validation path down to the runtime startup path again.

First, I want to be explicit about what is now closed on the current revision:

  • the earlier ignore_no_leader_partition validation gap is fixed
  • the earlier start_mode.timestamp = 0 compatibility mismatch is fixed
  • the remaining top-level start_mode.end_timestamp = 0 mismatch is also fixed now via greaterOrEqual(...)
  • +1 to @CosmosNi's earlier validation/runtime alignment concern; that thread is resolved from my side on the current head

What this PR solves

  • User pain: Kafka source validation is being migrated from scattered runtime checks into declarative OptionRule validation so invalid configs fail at submission time instead of later at startup.
  • Fix approach: the PR moves the key checks into KafkaSourceFactory.optionRule() and adds dedicated validators for start mode, nested multi-table configs, and ignore_no_leader_partition.
  • One-line summary: the migration now matches the historical runtime contract on the paths I traced, and I do not see a remaining source-level blocker on the current head.

Runtime chain I rechecked

Job submission
  -> KafkaSourceFactory.optionRule()
      -> top-level START_MODE_END_TIMESTAMP uses greaterOrEqual(..., 0)
      -> START_MODE validator accepts timestamp >= 0 and non-empty specific offsets
      -> TABLE_CONFIGS / TABLE_LIST validator checks nested TIMESTAMP and SPECIFIC_OFFSETS entries
      -> IGNORE_NO_LEADER_PARTITION validator enforces positive partition discovery interval

Job startup
  -> KafkaSourceConfig.createConsumerMetadata()
      -> TIMESTAMP branch still rejects only future timestamps
      -> SPECIFIC_OFFSETS branch reads the validated offsets map

Findings

I do not see a blocking correctness issue on the current revision.

One small non-blocking follow-up:

  • seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:127-129
    The validator description still says start_mode.timestamp must be "greater than 0", while the implementation now correctly accepts >= 0. I would update the wording so the guidance matches the real boundary.

Tests / stability

  • The added UTs are aligned with the new declarative-validation path.
  • Stability rating: Stable.
  • Reason: the tests are pure config-validation checks without sleeps, timing, ports, or container ordering dependencies.

Merge conclusion

Conclusion: can merge

  1. Blocking items
    • None from my source-level re-review on the current head.
  2. Suggested non-blocking follow-up
    • Update the validator description string so it matches the actual accepted timestamp boundary.

Overall, the current revision closes the validation/runtime alignment gaps I was tracking earlier. From my side this is ready.

@DanielLeens (Contributor) commented

Thanks for the update. I rechecked the current head against the latest dev, and this branch is still behind upstream.

  • base branch: dev
  • head SHA: 345b0d51cefbddc4c72ade2dbf676bb77f2365e1
  • compare status: diverged
  • ahead_by: 5
  • behind_by: 26
  • mergeable_state: unknown
  • CI / merge gate snapshot: pending

Because the PR is still behind the latest base, any CI result and merge gate signal here is mixed with upstream drift. The lowest-cost next step is to sync with the latest dev and rerun CI first. If anything still fails after the sync, I'm happy to take another look on top of the updated head.
