[Fix] [Connector-V2] Fix Flink schema evolution hang caused by XA transaction MDL deadlock with MySQL CDC #10648
Conversation
DanielLeens
left a comment
What Problem Does This PR Solve?
- User pain point: when MySQL CDC schema evolution is enabled and the target is a MySQL JDBC exactly-once sink, a DDL event may reach the sink while previous XA transactions are still being committed. MySQL metadata locks can then make ALTER TABLE wait on XA commit, and the job appears to hang.
- Fix approach: this PR changes SchemaOperator from applying schema changes synchronously in processElement() to buffering the DDL and the records that follow it until checkpoint completion. It also moves the schema-change ACK from BroadcastSchemaSinkOperator to FlinkSinkWriter, so the source side is acknowledged only after the sink writer has actually run applySchemaChange(). In the flink-common writer, it also removes an extra sinkWriter.prepareCommit() before applying the schema change.
- One sentence: the PR tries to move Flink CDC schema evolution DDL execution behind the checkpoint/XA commit boundary, so MySQL ALTER TABLE is less likely to deadlock with prepared XA transactions holding metadata locks.
Simple example: a job syncs shop.products to a MySQL sink and the source captures ALTER TABLE products ADD COLUMN add_column1. Before this PR, the sink could try to execute ALTER TABLE while the previous XA transaction was still committing. With this PR, SeaTunnel first holds back the rows after the DDL, waits for checkpoint completion, applies the DDL in the sink, and only then releases rows with the new schema.
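To make the failure mode concrete, here is a minimal, hypothetical reproduction of the MDL conflict outside SeaTunnel. The URL, credentials, and table are placeholders; it assumes a scratch MySQL instance and mysql-connector-j on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class XaMdlConflictRepro {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://localhost:3306/shop";
        try (Connection writer = DriverManager.getConnection(url, "root", "root");
                Connection ddl = DriverManager.getConnection(url, "root", "root")) {
            try (Statement s = writer.createStatement()) {
                // A prepared-but-uncommitted XA branch keeps its metadata lock on products.
                s.execute("XA START 'tx1'");
                s.execute("INSERT INTO products (name) VALUES ('widget')");
                s.execute("XA END 'tx1'");
                s.execute("XA PREPARE 'tx1'");
            }
            try (Statement s = ddl.createStatement()) {
                // Blocks on the metadata lock held by 'tx1' until it is committed or
                // rolled back (or lock_wait_timeout expires); this is the observed hang.
                s.execute("ALTER TABLE products ADD COLUMN add_column1 INT");
            }
        }
    }
}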
1. Code Change Review
1.1 Core Logic Analysis
Precise Description of the Changes
Local review context:
- Local review branch: seatunnel-review-10648
- PR head: 256603996702aaa914c13804726ff8f85ba05e53
- Target branch: upstream/dev
- Comparison base: 219c2acec365ae6fade236b513b34c8783042409
- Commit range: upstream/dev..HEAD includes 32474d776 fix and 256603996 fix
- Local diff: 6 files, +534 / -639
- CI: the GitHub Build, Notify test workflow, and labeler checks are all passing. The PR is mergeable=true, mergeStateStatus=BLOCKED.
- Local command result: git diff --check upstream/dev...HEAD produced no output, so I did not find whitespace errors. Per the review requirement, I did not run the PR locally or run Maven/E2E jobs.
The main changes are in:
- seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java:116-299
  - processElement() now buffers detected DDL events;
  - notifyCheckpointComplete() broadcasts schema events after checkpoint completion and waits for coordinator ACKs;
  - snapshot/restore state now includes the pending queue and first-seen checkpoint id.
- seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/BroadcastSchemaSinkOperator.java:142-209
  - ACK is no longer sent immediately after forwarding;
  - the schema event is forwarded to the sink writer, which sends ACK only after applySchemaChange() completes.
- seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java:125-167
  - the extra sinkWriter.prepareCommit() before schema change application was removed in the flink-common writer;
  - the writer sends success/failure ACK through LocalSchemaCoordinator.notifySchemaChangeApplied().
- seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/coordinator/LocalSchemaCoordinator.java:122-342
  - active sink subtasks are tracked dynamically instead of relying on fixed registered parallelism.
- seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithFlinkSchemaChangeIT.java:151-168
  - a MySQL CDC schema evolution exactly-once E2E path was added.
- seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change_exactly_once.conf:21-54
  - the new E2E config enables is_exactly_once=true and xa_data_source_class_name.
This code is on the normal runtime path:
- Source side: SourceExecuteProcessor inserts SchemaOperator when the job is streaming and schema-changes.enabled=true, see seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java:102-122.
- Sink side: the Flink starter inserts BroadcastSchemaSinkOperator when the sink supports schema evolution, see seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java:52-70 and seatunnel-core/seatunnel-flink-starter/seatunnel-flink-20-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java:50-67.
- Sink writer: the sink adapter creates FlinkSinkWriter, see the common path at seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java:77-92 and the Flink 1.20 path at seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java:76-83.
Before / After Code Snippets
Before, SchemaOperator broadcast the DDL and waited synchronously in the element-processing path:
if ("__SCHEMA_CHANGE_EVENT__".equals(element.getTableId())
&& element.getOptions() != null) {
Object object = element.getOptions().get("schema_change_event");
if (object instanceof SchemaChangeEvent) {
handleSchemaChangeEvent((SchemaChangeEvent) object);
return;
}
}
sendSchemaChangeEventToDownstream(schemaChangeEvent);
boolean success = coordinator.requestSchemaChange(tableId, eventTime, timeoutMs);
...
releaseBufferedData(key, tableId);

After, DDL events and subsequent records are buffered and the DDL is applied from notifyCheckpointComplete():
if ("__SCHEMA_CHANGE_EVENT__".equals(element.getTableId())
&& element.getOptions() != null) {
Object object = element.getOptions().get("schema_change_event");
if (object instanceof SchemaChangeEvent) {
handleSchemaChangeDetected((SchemaChangeEvent) object, streamRecord.getTimestamp());
return;
}
}
if (schemaChangePending) {
enqueueDataRecord(element, streamRecord.getTimestamp());
return;
}

Then, in notifyCheckpointComplete():

if (firstSeenCheckpointId < 0) {
firstSeenCheckpointId = checkpointId;
return;
}
if (checkpointId < firstSeenCheckpointId + CHECKPOINT_WAIT_ROUNDS) {
return;
}
sendSchemaChangeEventToDownstream(event);
boolean success = coordinator.requestSchemaChange(tableId, eventTime, SCHEMA_CHANGE_TIMEOUT_MS);
...
drainDataUntilNextSchemaChange();

Before, BroadcastSchemaSinkOperator ACKed immediately after forwarding:
emitApplySchemaEventToSink(event, epoch);
lastProcessedEpoch.put(tableId, epoch);
coordinator.notifySchemaChangeApplied(tableId, epoch, subtaskId, true);

After, the ACK is sent by the sink writer after the actual schema change:
emitApplySchemaEventToSink(event, epoch);
lastProcessedEpoch.put(tableId, epoch);
// ACK will be sent by FlinkSinkWriter after ALTER TABLE completes.

And in FlinkSinkWriter:

try {
((SupportSchemaEvolutionSinkWriter) sinkWriter).applySchemaChange(schemaChangeEvent);
success = true;
} catch (Exception e) {
log.error("Failed to apply schema change for table: {}", schemaChangeEvent.tableIdentifier(), e);
} finally {
sendSchemaChangeAck(schemaChangeEvent, epoch, subtaskId, success);
}

Before, the flink-common writer performed an extra prepare before schema change application:
sinkWriter.prepareCommit();
if (!(sinkWriter instanceof SupportSchemaEvolutionSinkWriter)) {
...
}
((SupportSchemaEvolutionSinkWriter) sinkWriter).applySchemaChange(schemaChangeEvent);

After, the flink-common path removes that extra prepare:
if (!(sinkWriter instanceof SupportSchemaEvolutionSinkWriter)) {
...
}
((SupportSchemaEvolutionSinkWriter) sinkWriter).applySchemaChange(schemaChangeEvent);

However, the Flink 1.20 writer still keeps the old behavior:
// seatunnel-translation-flink-20/.../FlinkSinkWriter.java:127-149
private void handleSchemaChangeEvent(
SchemaChangeEvent schemaChangeEvent, Map<String, Object> options) throws IOException {
...
sinkWriter.prepareCommit();
if (!(sinkWriter instanceof SupportSchemaEvolutionSinkWriter)) {
...
}
...
((SupportSchemaEvolutionSinkWriter) sinkWriter).applySchemaChange(schemaChangeEvent);
}

Key Findings
- The normal path does hit this PR: MySQL CDC emits __SCHEMA_CHANGE_EVENT__; SchemaOperator buffers the event; BroadcastSchemaSinkOperator forwards it to the sink; FlinkSinkWriter applies the schema change and ACKs the coordinator.
- The direction is right: delaying DDL until checkpoint completion and ACKing only after the real sink-side DDL directly addresses the XA/MDL failure mode.
- There are still blocking issues: the Flink 1.20 writer did not receive the sink-side fix; schema coordination failure still releases buffered rows; and pending-queue overflow can drop the DDL control event itself.
- The new E2E test covers only the successful, single-parallelism, throttled path. It does not cover Flink 1.20, failed ACKs/timeouts, queue overflow, or recovery with a pending DDL.
Deep Correctness Analysis
On the success path, the design is sound: after a DDL is detected, rows after the DDL are buffered; checkpoint completion is used as the boundary before the DDL is broadcast; every sink subtask must apply the DDL and ACK; then the source side releases buffered rows with the new schema. That should reduce the MySQL metadata-lock conflict with previous XA commits.
The current implementation still depends on conditions that are not fully enforced:
- every Flink runtime writer must follow the same schema-event transaction semantics;
- if the sink-side DDL fails, the ACK fails, or the coordinator times out, the source must not mark the DDL as processed or release rows after it;
- control events must not be evicted by ordinary data records;
- the checkpoint-wait semantics should be clarified and tested, especially because the code comment says "two checkpoint cycles" while CHECKPOINT_WAIT_ROUNDS = 1 (illustrated just below).
So this is a good direction, but not yet a safe merge as-is.
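A standalone illustration of the quoted wait condition (plain Java, not SeaTunnel code): with CHECKPOINT_WAIT_ROUNDS = 1, a DDL first seen at checkpoint N is released at checkpoint N + 1, i.e. after one completed cycle rather than two:

public class CheckpointWaitDemo {
    static final int CHECKPOINT_WAIT_ROUNDS = 1;
    static long firstSeenCheckpointId = -1;

    static boolean readyToApply(long checkpointId) {
        if (firstSeenCheckpointId < 0) {
            firstSeenCheckpointId = checkpointId;
            return false;
        }
        // Mirrors: if (checkpointId < firstSeenCheckpointId + CHECKPOINT_WAIT_ROUNDS) return;
        return checkpointId >= firstSeenCheckpointId + CHECKPOINT_WAIT_ROUNDS;
    }

    public static void main(String[] args) {
        System.out.println(readyToApply(5)); // false: records firstSeenCheckpointId = 5
        System.out.println(readyToApply(6)); // true: 6 >= 5 + 1, one cycle after detection
    }
}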
Complete Runtime Flow
CDC source captures a DDL
-> FlinkRowCollector.collect(SchemaChangeEvent) [FlinkRowCollector.java:75-82]
-> creates tableId="__SCHEMA_CHANGE_EVENT__"
-> options["schema_change_event"] = SchemaChangeEvent
Flink source translation
-> SourceExecuteProcessor.createSeaTunnelSource [SourceExecuteProcessor.java:102-122]
-> job.mode=STREAMING and schema-changes.enabled=true
-> source implements SupportSchemaEvolution
-> inserts SchemaOperator
SchemaOperator
-> processElement() [SchemaOperator.java:116-140]
-> normal row: forwarded when no DDL is pending, buffered when a DDL is pending
-> DDL row: handleSchemaChangeDetected() adds it to pendingQueue [SchemaOperator.java:143-165]
-> notifyCheckpointComplete(checkpointId) [SchemaOperator.java:193-299]
-> first time DDL reaches queue head: record firstSeenCheckpointId and return [L212-L223]
-> later checkpoint satisfies the wait condition: poll the DDL [L225-L241]
-> sendSchemaChangeEventToDownstream(event) [L262, L451-L459]
-> coordinator.requestSchemaChange(..., 300s) [L264-L266]
-> on success, update localSchemaState/lastProcessedEventTime and release buffered data [L286-L292]
Flink sink translation
-> SinkExecuteProcessor.createVersionSpecificDataStreamSink()
-> flink-common starter inserts BroadcastSchemaSinkOperator [seatunnel-flink-starter-common/.../SinkExecuteProcessor.java:52-70]
-> Flink 1.20 starter also inserts BroadcastSchemaSinkOperator [seatunnel-flink-20-starter/.../SinkExecuteProcessor.java:50-67]
BroadcastSchemaSinkOperator
-> processElement() receives options["schema_change_broadcast"] [BroadcastSchemaSinkOperator.java:129-136]
-> handleBroadcastedSchemaChange() [L142-L199]
-> emitApplySchemaEventToSink(event, epoch) [L201-L209]
-> does not ACK here; waits for the sink writer to ACK after ALTER TABLE
FlinkSinkWriter
-> common path createWriter() [FlinkSink.java:77-92]
-> FlinkSinkWriter.handleSchemaChangeEvent() [FlinkSinkWriter.java:125-167]
-> applySchemaChange()
-> sendSchemaChangeAck(success/failure)
-> Flink 1.20 path createWriter() [flink-20/FlinkSink.java:76-83]
-> flink-20/FlinkSinkWriter.handleSchemaChangeEvent() [flink-20/FlinkSinkWriter.java:127-149]
-> still calls sinkWriter.prepareCommit() first
-> then applySchemaChange()
JDBC exactly-once sink
-> AbstractJdbcSinkWriter.applySchemaChange() [AbstractJdbcSinkWriter.java:61-65]
-> reOpenOutputFormat(event) [L67-L87]
-> prepareCommit() [L68]
-> dialect.applySchemaChange(connection, sinkTablePath, event) [L71-L73]
-> JdbcExactlyOnceSinkWriter.prepareCommit(long checkpointId) [JdbcExactlyOnceSinkWriter.java:171-198]
-> prepareCurrentTx()
-> beginTx(checkpointId)
1.2 Compatibility Impact
Judgement: partially incompatible.
- API: no public Java API/SPI signature changes.
- Config options: no user-facing option was added or removed.
- Defaults: no default value change.
- Protocol: no external protocol change; internally, schema-change ACK timing changes from "after broadcast forwarding" to "after sink writer apply".
- Serialization format: the Flink operator state layout changes through the new bufferedRecords and firstSeenCheckpointId state. Compatibility for old running schema-evolution jobs restored from savepoints is unclear.
- Historical behavior: DDL used to be broadcast immediately. It is now delayed until checkpoint completion, and only one DDL is processed per checkpoint cycle.
Recommendation: either keep a backward-compatible restore from the old bufferedDataRows/pending state, or document that old streaming schema-evolution jobs should be restarted cleanly instead of restored from the old operator state. A hedged restore sketch follows.
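This sketch is written as a self-contained CheckpointedFunction rather than the PR's operator; the state names (bufferedRecords for the new layout, bufferedDataRows for the legacy one) and the byte[] element type are assumptions, not the PR's actual descriptors:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class CompatibleRestoreSketch implements CheckpointedFunction {
    private transient ListState<byte[]> bufferedRecords;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // New-layout state, registered unconditionally.
        bufferedRecords = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("bufferedRecords", byte[].class));
        // Legacy-layout state from pre-PR jobs; drain it into the new buffer on restore.
        ListState<byte[]> legacy = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("bufferedDataRows", byte[].class));
        for (byte[] row : legacy.get()) {
            bufferedRecords.add(row);
        }
        legacy.clear();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Snapshotting of the new layout would go here.
    }
}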
1.3 Performance / Side Effect Analysis
- CPU: low impact on low-frequency DDL paths.
- Memory: pendingQueue can hold up to 100000 rows/events. The current overflow policy is unsafe because it can drop the DDL itself; see issue 3.
- GC: while a DDL is pending, rows are retained in a LinkedList and snapshotted to operator state; high throughput or long checkpoints can increase heap and checkpoint-state pressure.
- Network: only small control rows are broadcast.
- Lock contention: delaying DDL helps reduce MySQL MDL contention with XA commits. The Flink 1.20 writer still keeps the old prepare behavior; see issue 1.
- Concurrency safety: concurrent maps/sets are used, but failure and timeout semantics still release data unsafely; see issue 2.
- Retry/idempotency: failed schema changes are not retried safely because the source side still marks them as processed.
- Resource release: unregistering closed sink subtasks helps avoid indefinite waits, but it makes correct failure handling even more important.
1.4 Error Handling and Logging
Issue 1: The Flink 1.20 sink writer still performs an untracked prepareCommit(), so the runtime fix is incomplete
- Location: seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java:127
- Description: this is the actual sink writer used by the Flink 1.20 runtime. The Flink 1.20 starter inserts BroadcastSchemaSinkOperator, then FlinkSink.createWriter() creates this writer. The PR removes the extra sinkWriter.prepareCommit() only from the flink-common writer, but the Flink 1.20 writer still calls sinkWriter.prepareCommit() at the beginning of handleSchemaChangeEvent(), and the returned committable is discarded. For JDBC exactly-once, prepareCommit() is an XA transaction boundary and should not be called outside the Flink checkpoint commit protocol without handling the committable. Even if the new source-side wait usually makes the current transaction empty, this leaves Flink 1.20 with different transaction semantics from the common path. If any row reaches the current writer transaction, the prepared XID is not passed to the Flink committer, which can leave a prepared XA transaction unmanaged or reopen the MDL conflict window.
- Risk: Flink 1.20 users can still hit the old sink-side prepare behavior in the normal MySQL CDC + JDBC exactly-once schema-evolution path. The worst case is a prepared XA transaction that is not managed by Flink's committer, causing unreleased locks, a stuck DDL, or broken exactly-once semantics.
- Best improvement:
  - Option A: apply the same sink-writer fix to seatunnel-translation-flink-20 and remove the extra sinkWriter.prepareCommit() from handleSchemaChangeEvent() (sketched below).
  - Option B: if Flink 1.20 really needs an explicit flush/prepare here, the returned committable must be integrated into the committer path or the job must fail fast; it should not be silently discarded.
- Severity: High
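A minimal sketch of option A, mirroring the flink-common snippet quoted earlier; the method shape and field names are taken from those snippets, not from the full flink-20 class:

private void handleSchemaChangeEvent(
        SchemaChangeEvent schemaChangeEvent, Map<String, Object> options) throws IOException {
    if (!(sinkWriter instanceof SupportSchemaEvolutionSinkWriter)) {
        // Unsupported writers are skipped, as in the common path.
        return;
    }
    // No leading sinkWriter.prepareCommit(): the checkpoint boundary that released this
    // DDL from SchemaOperator already fenced in-flight data, so no untracked XA branch
    // (and no discarded committable) is created here.
    ((SupportSchemaEvolutionSinkWriter) sinkWriter).applySchemaChange(schemaChangeEvent);
}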
Issue 2: Schema coordination failure or timeout still updates local schema state and releases buffered data
- Location: seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java:264
- Description: notifyCheckpointComplete() sends the schema event and calls coordinator.requestSchemaChange(). LocalSchemaCoordinator.requestSchemaChange() throws on failed ACKs or timeout, as shown in LocalSchemaCoordinator.java:244-261. However, SchemaOperator catches the exception, only logs it, and then still runs localSchemaState.put(), assigns lastProcessedEventTime, and calls drainDataUntilNextSchemaChange(). This means that when the sink-side ALTER TABLE fails, a subtask ACKs false, the ACK is lost, or the request times out after 300 seconds, the source side still marks the DDL as processed and releases rows written with the new schema.
- Risk: rows with the new schema can be written to a sink table that was not successfully altered. This can cause write failures, column mismatch, data loss, or recovery that skips the DDL because lastProcessedEventTime was advanced. This is a blocking correctness issue in the failure/recovery path.
- Best improvement: only update localSchemaState, advance lastProcessedEventTime, and release buffered rows after requestSchemaChange() succeeds. On false/timeout/exception, fail the job and keep the pending state so Flink recovery can retry the DDL from a checkpointed state (sketched below).
- Severity: High
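A sketch of the suggested failure handling, reusing the identifiers from the snippets above (treat the exact names and argument lists as assumptions): local state is committed and buffered rows are released only after a successful ACK round; everything else fails the job so Flink recovery replays the pending DDL:

sendSchemaChangeEventToDownstream(event);
boolean success;
try {
    success = coordinator.requestSchemaChange(tableId, eventTime, SCHEMA_CHANGE_TIMEOUT_MS);
} catch (Exception e) {
    // Do not swallow: surface to Flink so the pending state is restored and retried.
    throw new RuntimeException("Schema change coordination failed for " + tableId, e);
}
if (!success) {
    throw new IllegalStateException(
            "Schema change was not ACKed by all sink subtasks: " + tableId);
}
// Only now is it safe to mark the DDL processed and release the buffered rows.
localSchemaState.put(tableId, event);
lastProcessedEventTime = eventTime;
drainDataUntilNextSchemaChange();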
Issue 3: When the pending queue is full, the oldest record can be the DDL control event itself
- Location: seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java:168
- Description: handleSchemaChangeDetected() appends the DDL as BufferedRecord.schemaChange(event), and all subsequent rows are appended to the same pendingQueue. When the queue reaches MAX_BUFFERED_RECORDS, enqueueDataRecord() calls pendingQueue.poll() and drops the oldest record. In the typical pending-DDL case, the oldest record is the DDL itself. Once the DDL is dropped before checkpoint completion, notifyCheckpointComplete() can see only data records, drain them, set schemaChangePending=false, and never apply the schema change.
- Risk: the DDL is silently skipped, while rows after the DDL are still released. That can make the sink schema diverge from the source schema and cause write failures or data loss. The new E2E config uses read_limit.rows_per_second=400 and checkpoint.interval=5000, so it avoids this boundary; production workloads with higher throughput or delayed checkpoints can hit it.
- Best improvement: do not evict schema-change control events. Prefer failing fast on buffer overflow so Flink can recover, or separate control events from the data buffer (sketched below). If a bounded buffer is required, make it state-backed/backpressured and configurable, but never drop the DDL silently.
- Severity: High
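A sketch of an overflow policy that can never evict the DDL: fail fast instead of polling the head. The queue, constant, and factory names follow the description above; BufferedRecord.data(...) is a hypothetical counterpart to BufferedRecord.schemaChange(...):

private void enqueueDataRecord(SeaTunnelRow element, long timestamp) {
    if (pendingQueue.size() >= MAX_BUFFERED_RECORDS) {
        // Never pendingQueue.poll() here: with a DDL pending, the head of the queue
        // is the control event itself. Failing lets Flink restore from the
        // checkpointed pending state instead of silently skipping the DDL.
        throw new IllegalStateException(
                "Schema-change buffer overflow (" + MAX_BUFFERED_RECORDS
                        + " records); failing fast instead of dropping the pending DDL");
    }
    pendingQueue.add(BufferedRecord.data(element, timestamp));
}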
2. Code Quality Assessment
2.1 Code Style
The code mostly follows the existing Flink translation style, and the new E2E config includes the ASF license header. The main concern is not formatting; it is the correctness of failure and overflow paths. Also, the comment says “two checkpoint cycles” while CHECKPOINT_WAIT_ROUNDS = 1, so the intended wait semantics should be clarified when the blocking issues are fixed.
2.2 Test Coverage
The PR adds an E2E case for the successful MySQL CDC -> MySQL JDBC exactly-once schema-evolution path.
Missing coverage:
- the Flink 1.20 seatunnel-translation-flink-20 writer path;
- sink applySchemaChange() failure, ACK false, and coordinator timeout;
- pending-queue overflow where the DDL must not be dropped;
- recovery from operator state while a DDL is pending;
- multi-subtask / high-throughput / long-checkpoint scenarios.
Please add focused tests around SchemaOperator and the Flink 1.20 writer behavior. The current E2E is helpful, but it does not cover the most important failure modes introduced by this change. A standalone sketch of one such test follows.
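This self-contained JUnit 5 sketch exercises the fail-fast overflow policy from issue 3 against a toy buffer; it does not test SchemaOperator itself, whose harness this review does not reconstruct:

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.ArrayDeque;
import java.util.Deque;
import org.junit.jupiter.api.Test;

class PendingQueueOverflowTest {
    static final int MAX = 3;
    final Deque<String> pendingQueue = new ArrayDeque<>();

    void enqueue(String record) {
        if (pendingQueue.size() >= MAX) {
            throw new IllegalStateException("buffer overflow; pending DDL must not be dropped");
        }
        pendingQueue.add(record);
    }

    @Test
    void overflowNeverEvictsTheDdlAtTheHead() {
        enqueue("DDL");   // control event enters first, as in handleSchemaChangeDetected()
        enqueue("row-1");
        enqueue("row-2");
        assertThrows(IllegalStateException.class, () -> enqueue("row-3"));
        assertEquals("DDL", pendingQueue.peek()); // the control event survives overflow
    }
}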
2.3 Documentation
This changes user-visible behavior: Flink streaming schema evolution is now delayed by checkpoint completion, and at most one DDL is processed per checkpoint cycle. The PR does not update docs.
- docs/en: should document the checkpoint-delayed DDL behavior and exactly-once sink limitations.
- docs/zh: should be updated consistently.
- Current English/Chinese consistency: no inconsistency was introduced, but the new behavior is not documented.
3. Architectural Reasonableness
3.1 Elegance of the Solution
The fix points precisely at the root cause, but it is not yet a complete long-term solution. Delaying the DDL and moving the ACK to after the real sink-side schema application matches the failure mode; the remaining failure and overflow paths need to be closed before merge.
3.2 Maintainability
The main flow is readable, but SchemaOperator now owns DDL queuing, checkpoint wait, state restore, data release, and coordinator wait. The state transition should be made stricter: schema state and buffered data should be committed only after a successful sink-side schema application.
3.3 Extensibility
The approach can extend to other schema-evolution sinks if the control event is durable, all runtime adapters follow the same semantics, and failures go through Flink recovery. The current single global queue is a useful starting point, but it should not silently drop control events.
3.4 Historical Compatibility
Runtime API/config compatibility is mostly preserved, but compatibility with old Flink operator state is unclear because the state layout changed from the previous buffering model to bufferedRecords and firstSeenCheckpointId. Please document or implement the upgrade behavior.
4. Issue Summary
| No. | Issue | Location | Severity |
|---|---|---|---|
| Issue 1 | Flink 1.20 sink writer still performs an untracked prepareCommit(), so the runtime fix is incomplete | seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java:127 | High |
| Issue 2 | Schema coordination failure or timeout still updates local schema state and releases buffered data | seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java:264 | High |
| Issue 3 | Pending queue overflow can drop the DDL control event itself | seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java:168 | High |
5. Merge Decision
Conclusion: can merge after fixes
- Blocking items that must be fixed
  - Issue 1: the Flink 1.20 writer still keeps the old untracked XA prepare behavior, so the sink-side fix does not cover all normal runtimes.
  - Issue 2: after schema-change failure or timeout, the source side still releases rows after the DDL, which breaks schema/data consistency and recovery semantics.
  - Issue 3: queue overflow can silently drop the DDL control event and continue releasing rows with the new schema.
- Suggested non-blocking improvements
  - Clarify the CHECKPOINT_WAIT_ROUNDS = 1 semantics versus the "two checkpoint cycles" comment.
  - Add English and Chinese docs for the checkpoint-delayed schema evolution behavior.
  - Explain whether old streaming schema-evolution savepoints/operator state are supported during upgrade.
Overall, this PR is pointed at the right root cause, and the ACK-after-actual-DDL direction is a very good improvement. But I do not think it is safe to merge yet because the failure path can turn a failed DDL into released data, the queue overflow policy can lose the DDL itself, and the Flink 1.20 runtime has not been kept consistent with the common writer change.
A better implementation would treat the schema change as a transactional state transition: keep the DDL and following rows pending, update lastProcessedEventTime and release data only after all active sink subtasks ACK success, and fail/recover on any failure or timeout. Also, keep schema control events separate from the data buffer, or fail fast on overflow instead of dropping the oldest record.
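A compact, illustrative sketch of that state transition (the enum and transitions are mine, not the PR's):

enum DdlPhase {
    PENDING,    // DDL detected; rows after it are buffered and checkpointed with it
    BROADCAST,  // checkpoint boundary passed; event sent to all active sink subtasks
    ACKED,      // every subtask applied the DDL and ACKed success; rows may be released
    FAILED      // any false ACK, timeout, or exception; fail the job and restore PENDING
}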
Okay, I've fixed the issues you just mentioned. Please take a look again.
Purpose of this pull request
This PR fixes a deadlock issue where Flink jobs using MySQL CDC source with JDBC Exactly-Once sink would hang indefinitely during schema evolution. The root cause was a self-deadlock in JdbcExactlyOnceSinkWriter where prepared XA transactions held MySQL Metadata Locks that blocked subsequent DDL operations on the same connection.
For details, please see: #10647
Key Changes:
- JdbcExactlyOnceSinkWriter deadlock fix: modified reOpenOutputFormat() to properly handle XA transaction cleanup before DDL execution.
- SchemaOperator: implemented a checkpoint-aware DDL deferring mechanism to prevent XA/DDL timing conflicts.
- Test stability improvement: adjusted test timeout tolerance for timestamp default values.
Does this PR introduce any user-facing change?
Yes. This PR improves the reliability of schema evolution in production environments:
- Previous behavior: Flink jobs would hang indefinitely when DDL operations occurred during checkpoints with exactly-once semantics enabled.
- New behavior: DDL operations are processed safely with proper XA transaction coordination, ensuring both data consistency and schema evolution reliability.
- Impact: users can now safely enable both exactly-once semantics and schema evolution without experiencing job hangs.
How was this patch tested?
A new test method has been added:
MysqlCDCWithFlinkSchemaChangeIT.testMysqlCdcWithSchemaEvolutionCaseExactlyOnce. It now runs smoothly and no longer encounters job hangs.
Check list
- New License Guide
- Update incompatible-changes.md to describe the incompatibility caused by this PR.