[Improve][Connector-V2] Migrate RocketMQ Source validation to declarative OptionRule#11158

Open
nzw921rx wants to merge 1 commit into apache:dev from nzw921rx:improve/rocketmq-source-optionrule-migration

@nzw921rx

Collaborator

Purpose of this pull request

Related #11007
Depends on #11121

  1. Add greaterOrEqual(START_MODE_TIMESTAMP, 0L) constraint when start.mode = CONSUME_FROM_TIMESTAMP
  2. Add mapNotEmpty(START_MODE_OFFSETS) constraint when start.mode = CONSUME_FROM_SPECIFIC_OFFSETS
  3. Add TableConfigsValidator (ConditionExtension) to validate nested tables_configs / table_list entries: topics non-empty, per-entry start.mode conditional dependencies, timestamp >= 0, offsets non-empty
  4. Remove migrated imperative checks from RocketMqSourceConfig; retain the runtime-only checks (> currentTimestamp), annotated with comments
  5. Add RocketMqFactoryTest covering single-table and multi-table positive/negative validation paths
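
To illustrate the conditional constraints in items 1 and 2, here is a minimal, self-contained sketch. It does not use the SeaTunnel OptionRule API: `StartModeRulesSketch` and `validate` are hypothetical names, and the flat `Map<String, Object>` config is an assumption made only for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for conditional constraints; NOT the SeaTunnel OptionRule API.
final class StartModeRulesSketch {

    /** Returns every violation found in a flat (single-table) config map. */
    static List<String> validate(Map<String, Object> config) {
        List<String> violations = new ArrayList<>();
        Object mode = config.get("start.mode");

        // Constraint applies only when start.mode = CONSUME_FROM_TIMESTAMP.
        if ("CONSUME_FROM_TIMESTAMP".equals(mode)) {
            Object ts = config.get("start.mode.timestamp");
            if (!(ts instanceof Number)) {
                violations.add("start.mode.timestamp: required for CONSUME_FROM_TIMESTAMP");
            } else if (((Number) ts).longValue() < 0L) {
                violations.add("start.mode.timestamp: must be >= 0, got " + ts);
            }
        }

        // Constraint applies only when start.mode = CONSUME_FROM_SPECIFIC_OFFSETS.
        if ("CONSUME_FROM_SPECIFIC_OFFSETS".equals(mode)) {
            Object offsets = config.get("start.mode.offsets");
            if (!(offsets instanceof Map) || ((Map<?, ?>) offsets).isEmpty()) {
                violations.add("start.mode.offsets: must be a non-empty map");
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("start.mode", "CONSUME_FROM_TIMESTAMP");
        config.put("start.mode.timestamp", -1L);
        System.out.println(validate(config));
    }
}
```

Each rule is checked only when its start mode is selected, which is the behavior the conditional constraints above are meant to express declaratively.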

Does this PR introduce any user-facing change?

Yes. Previously, invalid configurations (negative timestamps, empty offsets, missing topics in tables_configs entries) were rejected at task execution time with a generic IllegalArgumentException. They are now rejected earlier, at job submission time, with a structured OptionValidationException that includes the option name, constraint type, and violation details. Multiple errors are reported together thanks to the aggregation support from #11121.
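
As a rough illustration of aggregated reporting, the sketch below collects violations and throws once at the end. It is not the implementation from #11121: `Violation`, `Collector`, and `AggregatedValidationException` are hypothetical names, and the three fields only mirror the "option name, constraint type, and violation details" described above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of aggregated validation errors; not the OptionValidationException API.
final class AggregationSketch {

    /** One structured violation: which option, which constraint, and what went wrong. */
    static final class Violation {
        final String option;
        final String constraint;
        final String detail;

        Violation(String option, String constraint, String detail) {
            this.option = option;
            this.constraint = constraint;
            this.detail = detail;
        }

        @Override
        public String toString() {
            return option + " [" + constraint + "]: " + detail;
        }
    }

    /** Carries all violations so the user sees every problem in one failure. */
    static final class AggregatedValidationException extends RuntimeException {
        final List<Violation> violations;

        AggregatedValidationException(List<Violation> violations) {
            super(violations.size() + " validation error(s): " + violations);
            this.violations = Collections.unmodifiableList(new ArrayList<>(violations));
        }
    }

    /** Records failed checks instead of throwing on the first one. */
    static final class Collector {
        private final List<Violation> found = new ArrayList<>();

        Collector require(boolean ok, String option, String constraint, String detail) {
            if (!ok) {
                found.add(new Violation(option, constraint, detail));
            }
            return this;
        }

        void throwIfAny() {
            if (!found.isEmpty()) {
                throw new AggregatedValidationException(found);
            }
        }
    }

    public static void main(String[] args) {
        try {
            new Collector()
                    .require(-1L >= 0L, "start.mode.timestamp", "greaterOrEqual", "got -1")
                    .require(false, "topics", "notEmpty", "value is empty")
                    .throwIfAny();
        } catch (AggregatedValidationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```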

How was this patch tested?

Unit tests in RocketMqFactoryTest:

  • testValidTimestampConfig — valid CONSUME_FROM_TIMESTAMP mode passes validation
  • testNegativeTimestampRejected — negative start.mode.timestamp is rejected
  • testValidSpecificOffsetsConfig — valid CONSUME_FROM_SPECIFIC_OFFSETS mode passes validation
  • testEmptyOffsetsMapRejected — empty offsets map is rejected
  • testMultiTableValidConfig — valid multi-table config passes validation
  • testMultiTableMissingTopicsRejected — empty topics in tables_configs entry is rejected
  • testMultiTableTimestampModeWithoutTimestampRejected — TIMESTAMP mode without timestamp in entry is rejected
  • testMultiTableNegativeTimestampRejected — negative timestamp in tables_configs entry is rejected
  • testMultiTableEmptyOffsetsRejected — empty offsets in tables_configs entry is rejected

Existing tests continue to pass.

./mvnw test -pl seatunnel-connectors-v2/connector-rocketmq -Dtest="RocketMqFactoryTest" -DfailIfNoTests=false

…tive OptionRule

Migrate imperative if/throw checks in RocketMqSourceConfig to declarative
OptionRule constraints in RocketMqSourceFactory:

Single-table mode:
- start.mode.timestamp >= 0 (greaterOrEqual when start.mode=CONSUME_FROM_TIMESTAMP)
- start.mode.offsets non-empty map (mapNotEmpty when start.mode=CONSUME_FROM_SPECIFIC_OFFSETS)

Multi-table mode (via ConditionExtension):
- Each tables_configs entry must have non-empty 'topics'
- start.mode.timestamp required and >= 0 when CONSUME_FROM_TIMESTAMP
- start.mode.offsets non-empty when CONSUME_FROM_SPECIFIC_OFFSETS
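
The per-entry rules above can be sketched roughly as follows. This is not the PR's TableConfigsValidator: `TableConfigsSketch` is a hypothetical name, each entry is modeled as a plain `Map<String, Object>`, and `topics` is assumed to be a string value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of nested per-entry validation; not the PR's TableConfigsValidator.
final class TableConfigsSketch {

    /** Validates every entry and prefixes each violation with the entry's index. */
    static List<String> validateEntries(List<Map<String, Object>> tablesConfigs) {
        List<String> violations = new ArrayList<>();
        for (int i = 0; i < tablesConfigs.size(); i++) {
            Map<String, Object> entry = tablesConfigs.get(i);
            String path = "tables_configs[" + i + "].";

            // Assumption: topics is a string and must not be blank.
            Object topics = entry.get("topics");
            if (!(topics instanceof String) || ((String) topics).trim().isEmpty()) {
                violations.add(path + "topics: must be non-empty");
            }

            Object mode = entry.get("start.mode");
            if ("CONSUME_FROM_TIMESTAMP".equals(mode)) {
                Object ts = entry.get("start.mode.timestamp");
                if (!(ts instanceof Number)) {
                    violations.add(path + "start.mode.timestamp: required");
                } else if (((Number) ts).longValue() < 0L) {
                    violations.add(path + "start.mode.timestamp: must be >= 0");
                }
            } else if ("CONSUME_FROM_SPECIFIC_OFFSETS".equals(mode)) {
                Object offsets = entry.get("start.mode.offsets");
                if (!(offsets instanceof Map) || ((Map<?, ?>) offsets).isEmpty()) {
                    violations.add(path + "start.mode.offsets: must be non-empty");
                }
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        Map<String, Object> entry = new HashMap<>();
        entry.put("topics", "");
        entry.put("start.mode", "CONSUME_FROM_TIMESTAMP");
        List<Map<String, Object>> entries = new ArrayList<>();
        entries.add(entry);
        System.out.println(validateEntries(entries));
    }
}
```

Continuing through all entries instead of stopping at the first failure is what allows errors from several entries to be reported together.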

Runtime-only checks (> currentTimestamp) are kept in place with comments.
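
A check against the current time cannot be expressed as a static constraint because its result depends on when it runs. The sketch below shows the general shape of such a check, assuming it rejects a start timestamp later than the current time and that timestamps are epoch milliseconds; `RuntimeOnlyCheckSketch` and `requireNotInFuture` are hypothetical names, not code from RocketMqSourceConfig.

```java
// Hypothetical sketch of a runtime-only check; not the code retained in RocketMqSourceConfig.
final class RuntimeOnlyCheckSketch {

    /**
     * Rejects a start timestamp that lies after "now".
     * The clock value is passed in so the check is easy to test.
     */
    static void requireNotInFuture(long startTimestampMillis, long nowMillis) {
        if (startTimestampMillis > nowMillis) {
            throw new IllegalArgumentException(
                    "start.mode.timestamp " + startTimestampMillis
                            + " is later than the current time " + nowMillis);
        }
    }

    public static void main(String[] args) {
        // Evaluated at execution time, when the real clock is known.
        requireNotInFuture(0L, System.currentTimeMillis());
        System.out.println("timestamp accepted");
    }
}
```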

Related apache#11007

@DanielLeens DanielLeens left a comment

Contributor

Thanks for the migration work here. I re-checked the current head from the declarative option rule down to the runtime table-config parser, and the logic looks aligned to me.

What this PR fixes

  • User pain: RocketMQ source validation should live in declarative option rules instead of being scattered imperatively, especially for timestamp / specific-offset start modes.
  • Fix approach: The PR moves the main start-mode and ACL checks into factory-side conditional rules plus a nested TableConfigsValidator.
  • One-line summary: On the current head, the declarative validation path is aligned with the runtime parser.

Runtime path I traced

Job submission
  -> `RocketMqSourceFactory.optionRule()`
      -> top-level conditional rules for timestamp / offsets
      -> nested `TableConfigsValidator` for per-table configs

Runtime startup
  -> `RocketMqSourceConfig.parseTableConfig()` reads the same fields directly

Result
  -> submission-time validation now matches the runtime parser contract

Key findings

  • The normal submission path hits this change directly.
  • Unlike the Kafka PR, the nested validator here does require missing timestamp / offsets fields when the mode demands them.
  • The declarative rules now match what runtime parsing expects.

Blocking items
None at the source-code level on the current head.

Other review notes

  • No additional non-blocking source-level comments from this round.

Compatibility and side effects

  • Compatibility: Fully backward compatible. This only makes invalid configs fail earlier.
  • Side effects: No meaningful performance or resource-model downside; the main effect is earlier rejection of bad configs.
  • Error handling / logging: Aggregated submission-time validation is better than failing later with a runtime IllegalArgumentException.
  • Tests: The added tests cover the nested required-field cases and do not show obvious flaky-test patterns.
  • Docs: No docs update is required.
  • CI: There is still a red rocketmq-connector-it check on the branch, but the failed log points to an engine-side CheckpointCoordinator.isNoErrorCompleted(...) NPE rather than this factory/config diff. I would sync the latest dev and rerun the checks.

Merge verdict

  • From the current head I do not see a source-level blocker. The declarative validation path looks aligned with the runtime parsing path.
