Skip to content

[Feature][Connector-V2][CDC] Add configurable schema change behavior policies#11162

Open
CloverDew wants to merge 12 commits into
apache:devfrom
CloverDew:feature/add-configurable-schema-change-behavior-policies-for-cdc
Open

[Feature][Connector-V2][CDC] Add configurable schema change behavior policies#11162
CloverDew wants to merge 12 commits into
apache:devfrom
CloverDew:feature/add-configurable-schema-change-behavior-policies-for-cdc

Conversation

@CloverDew

Copy link
Copy Markdown
Contributor

Purpose of this pull request

Close: #11043

Since the current branch is based on #11025, please wait for that PR to be merged; I will perform a rebase after it is merged to maintain a clean commit history and avoid conflicts.
Please begin the review starting from commit f80539d.

This PR introduces a new schema-changes.behavior option with three explicit, well-defined policies:

  • STRICT: Fail the job immediately when any schema change event is observed, before any sink-side mutation is attempted. For pipelines that treat the incoming schema as a fixed contract.
  • EVOLVE: Forward schema change events through the normal schema coordination path only when they are supported end-to-end. Any unsupported event type or sink-side apply failure is fatal. This is the default when schema-changes.enabled = true.
  • IGNORE: Drop schema change events before downstream schema coordination. The job continues only for event types that are safe to ignore (e.g. comment-only changes). Events that alter the runtime row layout cannot be silently ignored and will fail fast.

Does this PR introduce any user-facing change?

Yes.

A new CDC source option schema-changes.behavior is introduced. It accepts STRICT, EVOLVE, or IGNORE.

Backward compatibility is preserved:

  • Jobs with schema-changes.enabled = false (the existing default) are unaffected.
  • Jobs with schema-changes.enabled = true that do not set schema-changes.behavior will default to EVOLVE, which preserves the closest semantics to the previous behavior.

Example configuration:

env {
  # You can set engine configuration here
  parallelism = 5
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second=7000000
  read_limit.rows_per_second=400
}

source {
  MySQL-CDC {
    server-id = 5652-5657
    username = "st_user_source"
    password = "mysqlpw"
    table-names = ["shop.products"]
    url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"

    schema-changes.enabled = true
    schema-changes.behavior = evolve
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "st_user_sink"
    password = "mysqlpw"
    generate_sink_sql = true
    database = shop
    table = mysql_cdc_e2e_sink_table_with_schema_change
    primary_keys = ["id"]
  }
}

How was this patch tested?

Unit tests added or updated: SchemaChangePolicyTest, SeaTunnelSourceCollectorSchemaChangePolicyTest, SinkFlowLifeCycleSchemaChangePolicyTest, SchemaOperatorTest, FlinkSinkWriterTest

E2E tests added:

  • MysqlCDCWithSchemaChangeIT.testMysqlCDCWithSchemaChangeIgnore, MysqlCDCWithSchemaChangeIT.testMysqlCDCWithSchemaChangeStrict.

  • MysqlCDCWithFlinkSchemaChangeIT.testMysqlCDCWithFlinkSchemaChangeIgnore, MysqlCDCWithFlinkSchemaChangeIT.testMysqlCDCWithFlinkSchemaChangeStrict

conf files added:

  • mysqlcdc_to_mysql_with_schema_change_ignore.conf
  • mysqlcdc_to_mysql_with_schema_change_strict.conf
  • mysqlcdc_to_mysql_with_flink_schema_change_ignore.conf
  • mysqlcdc_to_mysql_with_flink_schema_change_strict.conf

Check list

@DanielLeens DanielLeens left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pushing this forward. I re-reviewed the latest head from the source-side schema-change resolver through the engine / Flink coordination path and into sink-side application. I found one real blocker plus one test gap that should be fixed before merge.

What this PR fixes

  • User pain: Users need explicit schema-change behavior policies so CDC schema evolution can fail fast, ignore safe events, or evolve end to end in a controlled way.
  • Fix approach: The PR introduces SchemaChangeBehavior / SchemaChangePolicy, wires behavior through CDC source, engine, and Flink translation, and lets sinks validate schema-evolution support.
  • One-line summary: The design direction is good, but the current support check is too permissive for composite column-change events.

Runtime path I traced

CDC source parsing
  -> AbstractSchemaChangeResolver batches all column changes from one DDL into one `AlterTableColumnsEvent`

Coordination path
  -> Flink `SchemaOperator` validates the composite event
  -> Engine `SinkFlowLifeCycle` validates the same composite event again

Policy check
  -> `SchemaChangePolicy.isSupported()` returns true for `SCHEMA_CHANGE_UPDATE_COLUMNS` if the sink supports ANY one of ADD / DROP / UPDATE / RENAME / ALTER_COLUMN_COMMENT

Sink example
  -> Elasticsearch only advertises `ADD_COLUMN`
  -> its writer iterates sub-events and throws on non-ADD changes

Result
  -> mixed DDL can pass the policy gate and only fail later inside the sink writer

Key findings

  • This PR is on the normal schema-evolution path, not on a dormant edge path.
  • The real risk is the composite AlterTableColumnsEvent, not single add/drop/rename events.
  • The current policy check can green-light mixed DDL that the sink cannot actually apply.

Blocking items

  1. Composite AlterTableColumnsEvent is treated as supported when the sink supports only one sub-operation
    • Location: seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/SchemaChangePolicy.java:85-90; seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/AlterTableColumnsEvent.java:31-52; seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java:110-145; seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java:305-310; seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java:110-112; seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java:135-180
    • Why this is a problem: The source batches multiple column changes from one DDL into a single AlterTableColumnsEvent, but the policy currently treats that composite event as supported if the sink supports any one column-change capability.
    • Risk: Mixed DDL can pass the policy gate in EVOLVE mode and then fail later inside the sink writer.
    • Suggested fix: Validate composite events against the full set of sub-events, and only pass when the sink supports every sub-operation in the batch.
  2. No regression test covers mixed composite column-change events against a partially supported sink
    • Location: seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollectorSchemaChangePolicyTest.java:49-91; seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/schema/SchemaOperatorTest.java:183-237
    • Why this is a problem: Current tests cover strict/ignore and single-event behavior, but not the mixed composite-event case that exposes the policy gap.
    • Risk: Without a regression test, this capability mismatch can easily reappear later.
    • Suggested fix: Add at least one engine or Flink-level regression test where a partially supported sink receives a mixed AlterTableColumnsEvent and is rejected at the policy gate.

Other review notes

  • I like the overall architecture direction here. Once the composite-event capability check is tightened, this will be much easier to reason about end to end.

Compatibility and side effects

  • Compatibility: The new config is backward compatible, but the capability check is currently too permissive for mixed DDL.
  • Side effects: This is an architecture-level behavior risk rather than a simple performance issue: policy, coordination, and sink application can disagree on what “supported” means.
  • Error handling / logging: Right now the failure is deferred to sink-application time instead of being rejected clearly at the policy gate.
  • Tests: The new tests cover single-event behavior, but I did not find coverage for mixed composite column-change events against a partially supported sink.
  • Docs: English and Chinese schema-evolution docs were updated and generally align with the code direction.
  • CI: Some checks were still pending. The blocker above is source-level and reproducible from the current code path.

Merge verdict

  • I like the overall direction, but the current SCHEMA_CHANGE_UPDATE_COLUMNS support check is too permissive and can green-light mixed DDL that the sink cannot actually apply.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature][Connector-V2][CDC] Add configurable schema change behavior policies

2 participants