
[Fix] [Connector-V2] Fix Flink schema evolution hang caused by XA transaction MDL deadlock with MySQL CDC #10648

Open
CloverDew wants to merge 3 commits into apache:dev from CloverDew:fix/mysqlcdc-exactly-once-hang-on-flink

Conversation

@CloverDew
Contributor

Purpose of this pull request

This PR fixes a deadlock issue where Flink jobs using MySQL CDC source with JDBC Exactly-Once sink would hang indefinitely during schema evolution. The root cause was a self-deadlock in JdbcExactlyOnceSinkWriter where prepared XA transactions held MySQL Metadata Locks that blocked subsequent DDL operations on the same connection.
For details, please see: #10647
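The lock interaction behind the hang can be modeled without a real MySQL server. The sketch below is an analogy only, not the connector's code: a prepared-but-uncommitted XA transaction is represented by a held `Semaphore` permit (standing in for MySQL's metadata lock), and the DDL must acquire that permit before it can run. Real MDL behavior is more involved, but the ordering problem is the same.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Toy model of the self-deadlock: the prepared XA transaction holds the
// table's metadata lock, so an ALTER TABLE issued before the XA commit
// cannot proceed. Only after the commit releases the lock can the DDL run.
public class MdlDeadlockSketch {
    public static void main(String[] args) throws Exception {
        Semaphore metadataLock = new Semaphore(1);
        metadataLock.acquireUninterruptibly(); // XA PREPARE done, commit deferred: MDL still held

        // The DDL cannot proceed until the prepared transaction commits. If
        // that commit is itself blocked behind the DDL path, the job hangs.
        boolean acquired = metadataLock.tryAcquire(200, TimeUnit.MILLISECONDS);
        System.out.println("ALTER TABLE acquired metadata lock: " + acquired); // false

        metadataLock.release(); // XA COMMIT releases the MDL
        System.out.println("after commit: " + metadataLock.tryAcquire()); // true
    }
}
```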

Key Changes:

  • JdbcExactlyOnceSinkWriter deadlock fix: Modified reOpenOutputFormat() to properly handle XA transaction cleanup before DDL execution

  • SchemaOperator: Implemented checkpoint-aware DDL deferring mechanism to prevent XA/DDL timing conflicts

  • Test stability improvement: Adjusted test timeout tolerance for timestamp default values

Does this PR introduce any user-facing change?

Yes. This PR improves the reliability of schema evolution in production environments:

  • Previous behavior: Flink jobs would hang indefinitely when DDL operations occurred during checkpoints with exactly-once semantics enabled.

  • New behavior: DDL operations are processed safely with proper XA transaction coordination, ensuring both data consistency and schema evolution reliability.

  • Impact: Users can now safely enable both exactly-once semantics and schema evolution without experiencing job hangs.

How was this patch tested?

A new test method has been added: MysqlCDCWithFlinkSchemaChangeIT.testMysqlCdcWithSchemaEvolutionCaseExactlyOnce.
With the fix in place, it runs to completion and no longer hangs.

Check list


@DanielLeens DanielLeens left a comment


What Problem Does This PR Solve?

  • User pain point: when MySQL CDC schema evolution is enabled and the target is a MySQL JDBC exactly-once sink, a DDL event may reach the sink while previous XA transactions are still being committed. MySQL metadata locks can then make the ALTER TABLE wait on the XA commit, so the job appears to hang.
  • Fix approach: this PR changes SchemaOperator from applying schema changes synchronously in processElement() to buffering the DDL and following records until checkpoint completion. It also moves the schema-change ACK from BroadcastSchemaSinkOperator to FlinkSinkWriter, so the source side is acknowledged only after the sink writer has actually run applySchemaChange(). In the flink-common writer, it also removes an extra sinkWriter.prepareCommit() before applying the schema change.
  • One sentence: the PR tries to move Flink CDC schema evolution DDL execution behind the checkpoint/XA commit boundary, so MySQL ALTER TABLE is less likely to deadlock with prepared XA transactions holding metadata locks.

Simple example: a job syncs shop.products to a MySQL sink and the source captures ALTER TABLE products ADD COLUMN add_column1. Before this PR, the sink could try to execute ALTER TABLE while the previous XA transaction was still committing. With this PR, SeaTunnel first holds back the rows after the DDL, waits for checkpoint completion, applies the DDL in the sink, and only then releases rows with the new schema.
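The hold-and-release behavior in the example above can be sketched independently of Flink. This is a simplified illustration of the idea, not the PR's actual SchemaOperator; the class, the `"DDL:"` marker, and the method names are invented for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Minimal sketch of buffer-until-checkpoint: rows that arrive after a DDL
// marker are held back, and released only once the checkpoint boundary has
// passed and the DDL has been applied downstream.
public class DeferredDdlSketch {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();
    private boolean ddlPending = false;

    // Called for every incoming element; "DDL:" marks a schema-change event.
    public void processElement(String element) {
        if (element.startsWith("DDL:")) {
            ddlPending = true;
            pending.add(element);
            return;
        }
        if (ddlPending) {
            pending.add(element); // hold rows that follow the DDL
        } else {
            emitted.add(element); // normal path: forward immediately
        }
    }

    // Called when a checkpoint completes: emit the DDL, then the held rows.
    public void notifyCheckpointComplete() {
        if (!ddlPending) return;
        while (!pending.isEmpty()) {
            emitted.add(pending.poll()); // DDL first, then rows with the new schema
        }
        ddlPending = false;
    }

    public List<String> emitted() { return emitted; }

    public static void main(String[] args) {
        DeferredDdlSketch op = new DeferredDdlSketch();
        op.processElement("row1");
        op.processElement("DDL:ADD COLUMN add_column1");
        op.processElement("row2-new-schema");
        System.out.println(op.emitted()); // [row1] — row2 waits behind the DDL
        op.notifyCheckpointComplete();
        System.out.println(op.emitted());
    }
}
```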

1. Code Change Review

1.1 Core Logic Analysis

Precise Description of the Changes

Local review context:

  • Local review branch: seatunnel-review-10648
  • PR head: 256603996702aaa914c13804726ff8f85ba05e53
  • Target branch: upstream/dev
  • Comparison base: 219c2acec365ae6fade236b513b34c8783042409
  • Commit range: upstream/dev..HEAD includes 32474d776 fix and 256603996 fix
  • Local diff: 6 files, +534 / -639
  • CI: GitHub Build, Notify test workflow, and labeler are all passing. The PR is mergeable=true, mergeStateStatus=BLOCKED.
  • Local command result: git diff --check upstream/dev...HEAD produced no output, so I did not find whitespace errors. Per the review requirement, I did not run the PR locally or run Maven/E2E jobs.

The main changes are in:

  • seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java:116-299
    • processElement() now buffers detected DDL events;
    • notifyCheckpointComplete() broadcasts schema events after checkpoint completion and waits for coordinator ACKs;
    • snapshot/restore state now includes the pending queue and first-seen checkpoint id.
  • seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/BroadcastSchemaSinkOperator.java:142-209
    • ACK is no longer sent immediately after forwarding;
    • the schema event is forwarded to the sink writer, which sends ACK only after applySchemaChange() completes.
  • seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java:125-167
    • the extra sinkWriter.prepareCommit() before schema change application was removed in the flink-common writer;
    • the writer sends success/failure ACK through LocalSchemaCoordinator.notifySchemaChangeApplied().
  • seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/coordinator/LocalSchemaCoordinator.java:122-342
    • active sink subtasks are tracked dynamically instead of relying on fixed registered parallelism.
  • seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithFlinkSchemaChangeIT.java:151-168
    • a MySQL CDC schema evolution exactly-once E2E path was added.
  • seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change_exactly_once.conf:21-54
    • the new E2E config enables is_exactly_once=true and xa_data_source_class_name.

This code is on the normal runtime path:

  • Source side: SourceExecuteProcessor inserts SchemaOperator when the job is streaming and schema-changes.enabled=true, see seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java:102-122.
  • Sink side: the Flink starter inserts BroadcastSchemaSinkOperator when the sink supports schema evolution, see seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java:52-70 and seatunnel-core/seatunnel-flink-starter/seatunnel-flink-20-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java:50-67.
  • Sink writer: the sink adapter creates FlinkSinkWriter, see the common path at seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java:77-92 and the Flink 1.20 path at seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java:76-83.

Before / After Code Snippets

Before, SchemaOperator broadcasted the DDL and synchronously waited in the element-processing path:

if ("__SCHEMA_CHANGE_EVENT__".equals(element.getTableId())
        && element.getOptions() != null) {
    Object object = element.getOptions().get("schema_change_event");
    if (object instanceof SchemaChangeEvent) {
        handleSchemaChangeEvent((SchemaChangeEvent) object);
        return;
    }
}

sendSchemaChangeEventToDownstream(schemaChangeEvent);
boolean success = coordinator.requestSchemaChange(tableId, eventTime, timeoutMs);
...
releaseBufferedData(key, tableId);

After, DDL events and subsequent records are buffered and the DDL is applied from notifyCheckpointComplete():

if ("__SCHEMA_CHANGE_EVENT__".equals(element.getTableId())
        && element.getOptions() != null) {
    Object object = element.getOptions().get("schema_change_event");
    if (object instanceof SchemaChangeEvent) {
        handleSchemaChangeDetected((SchemaChangeEvent) object, streamRecord.getTimestamp());
        return;
    }
}

if (schemaChangePending) {
    enqueueDataRecord(element, streamRecord.getTimestamp());
    return;
}
if (firstSeenCheckpointId < 0) {
    firstSeenCheckpointId = checkpointId;
    return;
}

if (checkpointId < firstSeenCheckpointId + CHECKPOINT_WAIT_ROUNDS) {
    return;
}

sendSchemaChangeEventToDownstream(event);
boolean success = coordinator.requestSchemaChange(tableId, eventTime, SCHEMA_CHANGE_TIMEOUT_MS);
...
drainDataUntilNextSchemaChange();

Before, BroadcastSchemaSinkOperator ACKed immediately after forwarding:

emitApplySchemaEventToSink(event, epoch);
lastProcessedEpoch.put(tableId, epoch);
coordinator.notifySchemaChangeApplied(tableId, epoch, subtaskId, true);

After, ACK is sent by the sink writer after the actual schema change:

emitApplySchemaEventToSink(event, epoch);
lastProcessedEpoch.put(tableId, epoch);
// ACK will be sent by FlinkSinkWriter after ALTER TABLE completes.
try {
    ((SupportSchemaEvolutionSinkWriter) sinkWriter).applySchemaChange(schemaChangeEvent);
    success = true;
} catch (Exception e) {
    log.error("Failed to apply schema change for table: {}", schemaChangeEvent.tableIdentifier(), e);
} finally {
    sendSchemaChangeAck(schemaChangeEvent, epoch, subtaskId, success);
}

Before, the flink-common writer performed an extra prepare before schema change application:

sinkWriter.prepareCommit();
if (!(sinkWriter instanceof SupportSchemaEvolutionSinkWriter)) {
    ...
}
((SupportSchemaEvolutionSinkWriter) sinkWriter).applySchemaChange(schemaChangeEvent);

After, the flink-common path removes that extra prepare:

if (!(sinkWriter instanceof SupportSchemaEvolutionSinkWriter)) {
    ...
}
((SupportSchemaEvolutionSinkWriter) sinkWriter).applySchemaChange(schemaChangeEvent);

However, the Flink 1.20 writer still keeps the old behavior:

// seatunnel-translation-flink-20/.../FlinkSinkWriter.java:127-149
private void handleSchemaChangeEvent(
        SchemaChangeEvent schemaChangeEvent, Map<String, Object> options) throws IOException {
    ...
    sinkWriter.prepareCommit();
    if (!(sinkWriter instanceof SupportSchemaEvolutionSinkWriter)) {
        ...
    }
    ...
    ((SupportSchemaEvolutionSinkWriter) sinkWriter).applySchemaChange(schemaChangeEvent);
}

Key Findings

  1. The normal path does hit this PR: MySQL CDC emits __SCHEMA_CHANGE_EVENT__; SchemaOperator buffers the event; BroadcastSchemaSinkOperator forwards it to the sink; FlinkSinkWriter applies the schema change and ACKs the coordinator.
  2. The direction is right: delaying DDL until checkpoint completion and ACKing only after the real sink-side DDL is much closer to the XA/MDL failure mode.
  3. There are still blocking issues: the Flink 1.20 writer did not receive the sink-side fix; schema coordination failure still releases buffered rows; and pending queue overflow can drop the DDL control event itself.
  4. The new E2E test covers only the successful, single-parallelism, throttled path. It does not cover Flink 1.20, failed ACKs/timeouts, queue overflow, or recovery with a pending DDL.

Deep Correctness Analysis

On the success path, the design is sound: after a DDL is detected, rows after the DDL are buffered; checkpoint completion is used as the boundary before the DDL is broadcast; every sink subtask must apply the DDL and ACK; then the source side releases buffered rows with the new schema. That should reduce the MySQL metadata-lock conflict with previous XA commits.

The current implementation still depends on conditions that are not fully enforced:

  • every Flink runtime writer must follow the same schema-event transaction semantics;
  • if sink-side DDL fails, ACK fails, or coordinator times out, the source must not mark the DDL as processed or release rows after it;
  • control events must not be evicted by ordinary data records;
  • the checkpoint-wait semantics should be clarified and tested, especially because the code says “two checkpoint cycles” while CHECKPOINT_WAIT_ROUNDS = 1.
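The wait-round question in the last bullet comes down to simple arithmetic, sketched below with invented names that mirror the PR's snippet. With CHECKPOINT_WAIT_ROUNDS = 1, the first notifyCheckpointComplete() only records firstSeenCheckpointId and returns, and the DDL fires on the very next completion, so the DDL waits for two checkpoint completions in total rather than two full cycles after being seen.

```java
// Standalone sketch of the checkpoint-wait arithmetic; not the PR's code.
public class WaitRoundsSketch {
    static final int CHECKPOINT_WAIT_ROUNDS = 1;

    // Returns the checkpoint id at which a DDL first observed during
    // notifyCheckpointComplete(seenAtCheckpointId) would be broadcast.
    public static long firesAt(long seenAtCheckpointId) {
        long firstSeen = seenAtCheckpointId; // recorded on the first notify, which returns early
        long cp = firstSeen;
        while (true) {
            cp++; // next checkpoint completion
            if (cp >= firstSeen + CHECKPOINT_WAIT_ROUNDS) {
                return cp; // wait condition satisfied: DDL is polled and broadcast
            }
        }
    }

    public static void main(String[] args) {
        // DDL first seen in notifyCheckpointComplete(5) fires at checkpoint 6:
        // one extra round, i.e. two completions total, not two extra cycles.
        System.out.println(firesAt(5));
    }
}
```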

So this is a good direction, but not yet a safe merge as-is.

Complete Runtime Flow

CDC source captures a DDL
  -> FlinkRowCollector.collect(SchemaChangeEvent) [FlinkRowCollector.java:75-82]
      -> creates tableId="__SCHEMA_CHANGE_EVENT__"
      -> options["schema_change_event"] = SchemaChangeEvent

Flink source translation
  -> SourceExecuteProcessor.createSeaTunnelSource [SourceExecuteProcessor.java:102-122]
      -> job.mode=STREAMING and schema-changes.enabled=true
      -> source implements SupportSchemaEvolution
      -> inserts SchemaOperator

SchemaOperator
  -> processElement() [SchemaOperator.java:116-140]
      -> normal row: forwarded when no DDL is pending, buffered when a DDL is pending
      -> DDL row: handleSchemaChangeDetected() adds it to pendingQueue [SchemaOperator.java:143-165]
  -> notifyCheckpointComplete(checkpointId) [SchemaOperator.java:193-299]
      -> first time DDL reaches queue head: record firstSeenCheckpointId and return [L212-L223]
      -> later checkpoint satisfies the wait condition: poll the DDL [L225-L241]
      -> sendSchemaChangeEventToDownstream(event) [L262, L451-L459]
      -> coordinator.requestSchemaChange(..., 300s) [L264-L266]
      -> on success, update localSchemaState/lastProcessedEventTime and release buffered data [L286-L292]

Flink sink translation
  -> SinkExecuteProcessor.createVersionSpecificDataStreamSink()
      -> flink-common starter inserts BroadcastSchemaSinkOperator [seatunnel-flink-starter-common/.../SinkExecuteProcessor.java:52-70]
      -> Flink 1.20 starter also inserts BroadcastSchemaSinkOperator [seatunnel-flink-20-starter/.../SinkExecuteProcessor.java:50-67]

BroadcastSchemaSinkOperator
  -> processElement() receives options["schema_change_broadcast"] [BroadcastSchemaSinkOperator.java:129-136]
  -> handleBroadcastedSchemaChange() [L142-L199]
      -> emitApplySchemaEventToSink(event, epoch) [L201-L209]
      -> does not ACK here; waits for the sink writer to ACK after ALTER TABLE

FlinkSinkWriter
  -> common path createWriter() [FlinkSink.java:77-92]
      -> FlinkSinkWriter.handleSchemaChangeEvent() [FlinkSinkWriter.java:125-167]
          -> applySchemaChange()
          -> sendSchemaChangeAck(success/failure)
  -> Flink 1.20 path createWriter() [flink-20/FlinkSink.java:76-83]
      -> flink-20/FlinkSinkWriter.handleSchemaChangeEvent() [flink-20/FlinkSinkWriter.java:127-149]
          -> still calls sinkWriter.prepareCommit() first
          -> then applySchemaChange()

JDBC exactly-once sink
  -> AbstractJdbcSinkWriter.applySchemaChange() [AbstractJdbcSinkWriter.java:61-65]
      -> reOpenOutputFormat(event) [L67-L87]
          -> prepareCommit() [L68]
          -> dialect.applySchemaChange(connection, sinkTablePath, event) [L71-L73]
  -> JdbcExactlyOnceSinkWriter.prepareCommit(long checkpointId) [JdbcExactlyOnceSinkWriter.java:171-198]
      -> prepareCurrentTx()
      -> beginTx(checkpointId)

1.2 Compatibility Impact

Judgement: partially incompatible.

  • API: no public Java API/SPI signature changes.
  • Config options: no user-facing option was added or removed.
  • Defaults: no default value change.
  • Protocol: no external protocol change; internally, schema-change ACK timing changes from “after broadcast forwarding” to “after sink writer apply”.
  • Serialization format: the Flink operator state layout changes through new bufferedRecords and firstSeenCheckpointId state. Compatibility with old running schema-evolution jobs restored from savepoints is unclear.
  • Historical behavior: DDL used to be broadcast immediately. It is now delayed until checkpoint completion, and only one DDL is processed per checkpoint cycle.

Recommendation: either keep backward-compatible restore from the old bufferedDataRows/pending state, or document that old streaming schema-evolution jobs should be restarted cleanly instead of restored from the old operator state.

1.3 Performance / Side Effect Analysis

  • CPU: low impact on low-frequency DDL paths.
  • Memory: pendingQueue can hold up to 100000 rows/events. The current overflow policy is unsafe because it can drop the DDL itself; see issue 3.
  • GC: while a DDL is pending, rows are retained in a LinkedList and snapshotted to operator state; high throughput or long checkpoints can increase heap and checkpoint state pressure.
  • Network: only small control rows are broadcast.
  • Lock contention: delaying DDL helps reduce MySQL MDL contention with XA commits. The Flink 1.20 writer still keeps the old prepare behavior; see issue 1.
  • Concurrency safety: concurrent maps/sets are used, but failure and timeout semantics still release data unsafely; see issue 2.
  • Retry/idempotency: failed schema changes are not retried safely because the source side still marks them as processed.
  • Resource release: unregistering closed sink subtasks helps avoid indefinite waits, but it makes correct failure handling even more important.

1.4 Error Handling and Logging

Issue 1: The Flink 1.20 sink writer still performs an untracked prepareCommit(), so the runtime fix is incomplete

  • Location: seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java:127
  • Description:
    This is the actual sink writer used by the Flink 1.20 runtime. The Flink 1.20 starter inserts BroadcastSchemaSinkOperator, then FlinkSink.createWriter() creates this writer. The PR removes the extra sinkWriter.prepareCommit() only from the flink-common writer, but the Flink 1.20 writer still calls sinkWriter.prepareCommit() at the beginning of handleSchemaChangeEvent(), and the returned committable is discarded.
    For JDBC exactly-once, prepareCommit() is an XA transaction boundary and should not be called outside the Flink checkpoint commit protocol without handling the committable. Even if the new source-side wait usually makes the current transaction empty, this leaves Flink 1.20 with different transaction semantics from the common path. If any row reaches the current writer transaction, the prepared XID is not passed to the Flink committer, which can leave a prepared XA transaction unmanaged or reopen the MDL conflict window.
  • Risk:
    Flink 1.20 users can still hit the old sink-side prepare behavior in the normal MySQL CDC + JDBC exactly-once schema-evolution path. The worst case is a prepared XA transaction that is not managed by Flink's committer, causing unreleased locks, a stuck DDL, or broken exactly-once semantics.
  • Best improvement:
    Option A: apply the same sink-writer fix to seatunnel-translation-flink-20 and remove the extra sinkWriter.prepareCommit() from handleSchemaChangeEvent().
    Option B: if Flink 1.20 really needs an explicit flush/prepare here, the returned committable must be integrated into the committer path or the job must fail fast; it should not be silently discarded.
  • Severity: High
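Option B can be made concrete with a small sketch. The interface and names below are invented for illustration and are not the SeaTunnel API; the point is only that committables returned by a pre-DDL prepare must either reach the committer or fail the job, never be discarded.

```java
import java.util.List;

// Hypothetical sketch of Option B: fail fast when a flush/prepare before a
// schema change produces committables that nothing will ever commit. An
// unmanaged prepared XA transaction would keep holding metadata locks.
public class PreparedCommitSketch {
    interface Writer {
        List<String> prepareCommit(); // returns committables (e.g. prepared XIDs)
    }

    public static void prepareBeforeDdl(Writer writer) {
        List<String> committables = writer.prepareCommit();
        if (!committables.isEmpty()) {
            throw new IllegalStateException(
                    "prepareCommit() before DDL produced committables "
                            + committables
                            + "; route them through the committer instead of discarding them");
        }
    }

    public static void main(String[] args) {
        prepareBeforeDdl(List::of); // empty transaction: safe to proceed with the DDL
        try {
            prepareBeforeDdl(() -> List.of("xid-42")); // a row reached the txn
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```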

Issue 2: Schema coordination failure or timeout still updates local schema state and releases buffered data

  • Location: seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java:264
  • Description:
    notifyCheckpointComplete() sends the schema event and calls coordinator.requestSchemaChange(). LocalSchemaCoordinator.requestSchemaChange() throws on failed ACKs or timeout, as shown in LocalSchemaCoordinator.java:244-261. However, SchemaOperator catches the exception, only logs it, and then still runs localSchemaState.put(), assigns lastProcessedEventTime, and calls drainDataUntilNextSchemaChange().
    This means that when sink-side ALTER TABLE fails, a subtask ACKs false, the ACK is lost, or the request times out after 300 seconds, the source side still marks the DDL as processed and releases rows written with the new schema.
  • Risk:
    Rows with the new schema can be written to a sink table that was not successfully altered. This can cause write failures, column mismatch, data loss, or recovery that skips the DDL because lastProcessedEventTime was advanced. This is a blocking correctness issue in the failure/recovery path.
  • Best improvement:
    Only update localSchemaState, advance lastProcessedEventTime, and release buffered rows after requestSchemaChange() succeeds. On false/timeout/exception, fail the job and keep the pending state so Flink recovery can retry the DDL from a checkpointed state.
  • Severity: High
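The state transition asked for here can be sketched as follows. Names are invented for illustration; the essential property is that schema state and buffered rows advance only on a successful ACK, and every other outcome propagates so Flink can fail over and retry from the checkpointed pending state.

```java
// Sketch of commit-after-ACK semantics for the schema coordinator path.
public class CommitAfterAckSketch {
    interface Coordinator {
        boolean requestSchemaChange(String tableId) throws Exception;
    }

    private boolean released = false;
    private long lastProcessedEventTime = -1;

    // Any coordinator exception propagates: the pending queue stays in
    // checkpointed state, and recovery can retry the DDL.
    public void applyPendingDdl(Coordinator coordinator, String tableId, long eventTime)
            throws Exception {
        if (!coordinator.requestSchemaChange(tableId)) {
            throw new IllegalStateException("schema change not acknowledged for " + tableId);
        }
        // Only after a successful ACK is it safe to advance state and release rows.
        lastProcessedEventTime = eventTime;
        released = true;
    }

    public boolean released() { return released; }
    public long lastProcessedEventTime() { return lastProcessedEventTime; }

    public static void main(String[] args) throws Exception {
        CommitAfterAckSketch op = new CommitAfterAckSketch();
        try {
            op.applyPendingDdl(tableId -> false, "shop.products", 100L);
        } catch (IllegalStateException e) {
            System.out.println("DDL not acked; rows stay buffered: " + !op.released());
        }
        op.applyPendingDdl(tableId -> true, "shop.products", 100L);
        System.out.println("after ACK; rows released: " + op.released());
    }
}
```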

Issue 3: When the pending queue is full, the oldest record can be the DDL control event itself

  • Location: seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java:168
  • Description:
    handleSchemaChangeDetected() appends the DDL as BufferedRecord.schemaChange(event), and all subsequent rows are appended to the same pendingQueue. When the queue reaches MAX_BUFFERED_RECORDS, enqueueDataRecord() calls pendingQueue.poll() and drops the oldest record. In the typical pending-DDL case, the oldest record is the DDL itself.
    Once the DDL is dropped before checkpoint completion, notifyCheckpointComplete() can see only data records, drain them, set schemaChangePending=false, and never apply the schema change.
  • Risk:
    The DDL is silently skipped, while rows after the DDL are still released. That can make the sink schema diverge from the source schema and cause write failures or data loss. The new E2E config uses read_limit.rows_per_second=400 and checkpoint.interval=5000, so it avoids this boundary; production workloads with higher throughput or delayed checkpoints can hit it.
  • Best improvement:
    Do not evict schema-change control events. Prefer failing fast on buffer overflow so Flink can recover, or separate control events from the data buffer. If a bounded buffer is required, make it state-backed/backpressured and configurable, but never drop the DDL silently.
  • Severity: High
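The suggested overflow policy can be sketched like this. Class and method names are invented for illustration: data rows are rejected (fail fast) when the buffer is full, while schema-change control events bypass the cap and are never evicted.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of a safe overflow policy for the pending buffer (Issue 3).
public class SafeOverflowSketch {
    private final int maxBuffered;
    private final Queue<String> pendingQueue = new ArrayDeque<>();

    public SafeOverflowSketch(int maxBuffered) {
        this.maxBuffered = maxBuffered;
    }

    public void enqueueControlEvent(String ddl) {
        pendingQueue.add(ddl); // control events bypass the cap; never dropped
    }

    public void enqueueDataRecord(String row) {
        if (pendingQueue.size() >= maxBuffered) {
            // Fail fast instead of pendingQueue.poll(): polling could evict
            // the DDL at the head of the queue and silently skip it.
            throw new IllegalStateException("pending buffer full; failing job for safe recovery");
        }
        pendingQueue.add(row);
    }

    public int size() { return pendingQueue.size(); }

    public static void main(String[] args) {
        SafeOverflowSketch buf = new SafeOverflowSketch(2);
        buf.enqueueControlEvent("DDL:ADD COLUMN c1");
        buf.enqueueDataRecord("row1");
        try {
            buf.enqueueDataRecord("row2"); // buffer full: rejected, DDL survives
        } catch (IllegalStateException e) {
            System.out.println("rejected data row: " + e.getMessage());
        }
        System.out.println("buffered: " + buf.size());
    }
}
```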

2. Code Quality Assessment

2.1 Code Style

The code mostly follows the existing Flink translation style, and the new E2E config includes the ASF license header. The main concern is not formatting; it is the correctness of failure and overflow paths. Also, the comment says “two checkpoint cycles” while CHECKPOINT_WAIT_ROUNDS = 1, so the intended wait semantics should be clarified when the blocking issues are fixed.

2.2 Test Coverage

The PR adds an E2E case for the successful MySQL CDC -> MySQL JDBC exactly-once schema-evolution path.

Missing coverage:

  • Flink 1.20 seatunnel-translation-flink-20 writer path.
  • Sink applySchemaChange() failure, ACK false, and coordinator timeout.
  • Pending queue overflow where the DDL must not be dropped.
  • Recovery from operator state while a DDL is pending.
  • Multi-subtask/high-throughput/long-checkpoint scenarios.

Please add focused tests around SchemaOperator and the Flink 1.20 writer behavior. The current E2E is helpful, but it does not cover the most important failure modes introduced by this change.

2.3 Documentation

This changes user-visible behavior: Flink streaming schema evolution is now delayed by checkpoint completion, and at most one DDL is processed per checkpoint cycle. The PR does not update docs.

  • docs/en: should document the checkpoint-delayed DDL behavior and exactly-once sink limitations.
  • docs/zh: should be updated consistently.
  • Current English/Chinese consistency: no inconsistency was introduced, but the new behavior is not documented.

3. Architectural Reasonableness

3.1 Elegance of the Solution

This is a precise fix in direction, but not yet a complete long-term solution. Delaying DDL and moving ACK after real sink-side schema application matches the root cause. The remaining failure and overflow paths need to be closed before merge.

3.2 Maintainability

The main flow is readable, but SchemaOperator now owns DDL queuing, checkpoint wait, state restore, data release, and coordinator wait. The state transition should be made stricter: schema state and buffered data should be committed only after a successful sink-side schema application.

3.3 Extensibility

The approach can extend to other schema-evolution sinks if the control event is durable, all runtime adapters follow the same semantics, and failures go through Flink recovery. The current single global queue is a useful starting point, but it should not silently drop control events.

3.4 Historical Compatibility

Runtime API/config compatibility is mostly preserved, but compatibility with old Flink operator state is unclear because the state layout changed from the previous buffering model to bufferedRecords and firstSeenCheckpointId. Please document or implement the upgrade behavior.

4. Issue Summary

No. Issue Location Severity
Issue 1 Flink 1.20 sink writer still performs an untracked prepareCommit(), so the runtime fix is incomplete seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java:127 High
Issue 2 Schema coordination failure or timeout still updates local schema state and releases buffered data seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java:264 High
Issue 3 Pending queue overflow can drop the DDL control event itself seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java:168 High

5. Merge Decision

Conclusion: can merge after fixes

  1. Blocking items that must be fixed

    • Issue 1: the Flink 1.20 writer still keeps the old untracked XA prepare behavior, so the sink-side fix does not cover all normal runtimes.
    • Issue 2: after schema-change failure or timeout, the source side still releases rows after the DDL, which breaks schema/data consistency and recovery semantics.
    • Issue 3: queue overflow can silently drop the DDL control event and continue releasing rows with the new schema.
  2. Suggested non-blocking improvements

    • Clarify the CHECKPOINT_WAIT_ROUNDS = 1 semantics versus the “two checkpoint cycles” comment.
    • Add English and Chinese docs for the checkpoint-delayed schema evolution behavior.
    • Explain whether old streaming schema-evolution savepoints/operator state are supported during upgrade.

Overall, this PR is pointed at the right root cause, and the ACK-after-actual-DDL direction is a very good improvement. But I do not think it is safe to merge yet because the failure path can turn a failed DDL into released data, the queue overflow policy can lose the DDL itself, and the Flink 1.20 runtime has not been kept consistent with the common writer change.

A better implementation would treat the schema change as a transactional state transition: keep the DDL and following rows pending, update lastProcessedEventTime and release data only after all active sink subtasks ACK success, and fail/recover on any failure or timeout. Also, keep schema control events separate from the data buffer or fail fast on overflow instead of dropping the oldest record.

@CloverDew force-pushed the fix/mysqlcdc-exactly-once-hang-on-flink branch from 2566039 to f25a22a on April 19, 2026 at 10:49.
@CloverDew
Contributor Author

Okay, I've fixed the issues you just mentioned. Please take a look again.

  • Removed the prepareCommit() call from the seatunnel-translation-flink-20 writer.

  • In the SchemaOperator, schema state updates and buffered data releases now occur only after requestSchemaChange(...) succeeds; an exception is thrown if coordination fails or times out.

  • Replaced the "Delete oldest record" behavior with fail-fast overflow handling to avoid silently discarding schema change control events.

  • Updated the SchemaOperator comments to describe the extra checkpoint-round wait semantics precisely.
