[Improve][Connector-V2] Migrate RocketMQ Source validation to declarative OptionRule#11158
Open
nzw921rx wants to merge 1 commit into
Open
[Improve][Connector-V2] Migrate RocketMQ Source validation to declarative OptionRule#11158nzw921rx wants to merge 1 commit into
nzw921rx wants to merge 1 commit into
Conversation
…tive OptionRule Migrate imperative if/throw checks in RocketMqSourceConfig to declarative OptionRule constraints in RocketMqSourceFactory: Single-table mode: - start.mode.timestamp >= 0 (greaterOrEqual when start.mode=CONSUME_FROM_TIMESTAMP) - start.mode.offsets non-empty map (mapNotEmpty when start.mode=CONSUME_FROM_SPECIFIC_OFFSETS) Multi-table mode (via ConditionExtension): - Each tables_configs entry must have non-empty 'topics' - start.mode.timestamp required and >= 0 when CONSUME_FROM_TIMESTAMP - start.mode.offsets non-empty when CONSUME_FROM_SPECIFIC_OFFSETS Runtime-only checks (> currentTimestamp) are kept in place with comments. Related apache#11007
DanielLeens
approved these changes
Jun 22, 2026
DanielLeens
left a comment
Contributor
There was a problem hiding this comment.
Thanks for the migration work here. I re-checked the current head from the declarative option rule down to the runtime table-config parser, and the logic looks aligned to me.
What this PR fixes
- User pain: RocketMQ source validation should live in declarative option rules instead of being scattered imperatively, especially for timestamp / specific-offset start modes.
- Fix approach: The PR moves the main start-mode and ACL checks into factory-side conditional rules plus a nested
TableConfigsValidator. - One-line summary: On the current head, the declarative validation path is aligned with the runtime parser.
Runtime path I traced
Job submission
-> `RocketMqSourceFactory.optionRule()`
-> top-level conditional rules for timestamp / offsets
-> nested `TableConfigsValidator` for per-table configs
Runtime startup
-> `RocketMqSourceConfig.parseTableConfig()` reads the same fields directly
Result
-> submission-time validation now matches the runtime parser contract
Key findings
- The normal submission path hits this change directly.
- Unlike the Kafka PR, the nested validator here does require missing timestamp / offsets fields when the mode demands them.
- The declarative rules now match what runtime parsing expects.
Blocking items
None at the source-code level on the current head.
Other review notes
- No additional non-blocking source-level comments from this round.
Compatibility and side effects
- Compatibility: Fully backward compatible. This only makes invalid configs fail earlier.
- Side effects: No meaningful performance or resource-model downside; the main effect is earlier rejection of bad configs.
- Error handling / logging: Aggregated submission-time validation is better than failing later with a runtime
IllegalArgumentException. - Tests: The added tests cover the nested required-field cases and do not show obvious flaky-test patterns.
- Docs: No docs update is required.
- CI: There is still a red
rocketmq-connector-itcheck on the branch, but the failed log points to an engine-sideCheckpointCoordinator.isNoErrorCompleted(...)NPE rather than this factory/config diff. I would sync the latestdevand rerun the checks.
Merge verdict
- From the current head I do not see a source-level blocker. The declarative validation path looks aligned with the runtime parsing path.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Purpose of this pull request
Related #11007
Depends on #11121
greaterOrEqual(START_MODE_TIMESTAMP, 0L)constraint whenstart.mode = CONSUME_FROM_TIMESTAMPmapNotEmpty(START_MODE_OFFSETS)constraint whenstart.mode = CONSUME_FROM_SPECIFIC_OFFSETSTableConfigsValidator(ConditionExtension) to validate nestedtables_configs/table_listentries: topics non-empty, per-entry start.mode conditional dependencies, timestamp >= 0, offsets non-emptyRocketMqSourceConfig, retain runtime-only> currentTimestampchecks with commentsRocketMqFactoryTestcovering single-table and multi-table positive/negative validation pathsDoes this PR introduce any user-facing change?
Yes — previously invalid configurations (negative timestamps, empty offsets, missing topics in tables_configs entries) were rejected at task execution time with generic
IllegalArgumentException. Now they are rejected earlier at job submission time with structuredOptionValidationExceptionincluding option name, constraint type, and violation details. Multiple errors are reported together thanks to the aggregation support from #11121.How was this patch tested?
Unit tests in
RocketMqFactoryTest:testValidTimestampConfig— valid CONSUME_FROM_TIMESTAMP mode passes validationtestNegativeTimestampRejected— negativestart.mode.timestampis rejectedtestValidSpecificOffsetsConfig— valid CONSUME_FROM_SPECIFIC_OFFSETS mode passes validationtestEmptyOffsetsMapRejected— empty offsets map is rejectedtestMultiTableValidConfig— valid multi-table config passes validationtestMultiTableMissingTopicsRejected— empty topics in tables_configs entry is rejectedtestMultiTableTimestampModeWithoutTimestampRejected— TIMESTAMP mode without timestamp in entry is rejectedtestMultiTableNegativeTimestampRejected— negative timestamp in tables_configs entry is rejectedtestMultiTableEmptyOffsetsRejected— empty offsets in tables_configs entry is rejectedExisting tests continue to pass.