Commit f25a22a

resolve comments
1 parent 6d112e7 commit f25a22a

2 files changed: 44 additions & 33 deletions

seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java

Lines changed: 0 additions & 1 deletion
@@ -130,7 +130,6 @@ private void handleSchemaChangeEvent(
                 "FlinkSinkWriter applying SchemaChangeEvent for table: {}",
                 schemaChangeEvent.tableIdentifier());
 
-        sinkWriter.prepareCommit();
         if (!(sinkWriter instanceof SupportSchemaEvolutionSinkWriter)) {
             log.warn(
                     "Sink writer {} does not support schema evolution, ignoring SchemaChangeEvent for table: {}",

seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java

Lines changed: 44 additions & 32 deletions
@@ -25,6 +25,8 @@
 import org.apache.seatunnel.api.table.schema.SchemaChangeType;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.schema.event.TableEvent;
+import org.apache.seatunnel.api.table.schema.exception.SchemaEvolutionErrorCode;
+import org.apache.seatunnel.api.table.schema.exception.SchemaEvolutionException;
 import org.apache.seatunnel.api.table.schema.exception.SchemaValidationException;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.flink.schema.coordinator.LocalSchemaCoordinator;
@@ -53,11 +55,12 @@
  * Operator placed after the source to handle schema evolution.
  *
  * <p>schema change events are NOT processed synchronously in {@link #processElement}. Instead, they
- * are buffered and deferred until <b>two</b> checkpoint cycles have completed. This two-round wait
- * ensures that when the sink executes ALTER TABLE, all XA transactions from prior checkpoint cycles
- * have been fully committed by the {@code FlinkGlobalCommitter} (which runs asynchronously after
- * {@code notifyCheckpointComplete}), so their metadata locks are released and the ALTER TABLE can
- * acquire an exclusive MDL lock without deadlock.
+ * are buffered and deferred until an additional checkpoint cycle has completed after the first
+ * checkpoint that observed the pending DDL. This wait ensures that when the sink executes ALTER
+ * TABLE, all XA transactions from prior checkpoint cycles have been fully committed by the {@code
+ * FlinkGlobalCommitter} (which runs asynchronously after {@code notifyCheckpointComplete}), so
+ * their metadata locks are released and the ALTER TABLE can acquire an exclusive MDL lock without
+ * deadlock.
  *
  * <p>Per checkpoint cycle, at most ONE schema change is applied. If multiple DDLs arrive between
  * two checkpoints, they are processed across successive checkpoint cycles.
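
The buffering described in this Javadoc can be pictured roughly as follows. This is a minimal sketch, not the SchemaOperator implementation: SchemaChangeEvent, SeaTunnelRow, and the idea of a pendingQueue come from the diff, while the entry-point names (onSchemaChange, onDataRecord) and the emit() stand-in are hypothetical.

import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

// Sketch only: once a DDL has been observed, rows are parked behind it in arrival
// order instead of being forwarded, so the sink never sees post-DDL data before
// the schema change itself has been applied.
class DeferredDdlBufferSketch {

    private final Queue<Object> pendingQueue = new ArrayDeque<>();
    private SchemaChangeEvent pendingDdl; // DDL waiting for a safe checkpoint round

    void onSchemaChange(SchemaChangeEvent event) {
        // Do NOT forward the DDL downstream yet; remember it and start buffering.
        pendingDdl = event;
        pendingQueue.add(event);
    }

    void onDataRecord(SeaTunnelRow row) {
        if (pendingDdl != null) {
            pendingQueue.add(row); // buffer data behind the pending DDL, preserving order
        } else {
            emit(row);             // no pending DDL: pass rows straight through
        }
    }

    private void emit(SeaTunnelRow row) {
        // stand-in for forwarding the row to the downstream operator
    }
}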
@@ -167,26 +170,41 @@ private void handleSchemaChangeDetected(SchemaChangeEvent event, long timestamp)
 
     private void enqueueDataRecord(SeaTunnelRow row, long timestamp) {
         if (pendingQueue.size() >= MAX_BUFFERED_RECORDS) {
-            log.warn(
-                    "Pending queue exceeded max size {}, dropping oldest record",
-                    MAX_BUFFERED_RECORDS);
-            pendingQueue.poll();
+            TableIdentifier tableIdentifier = getPendingSchemaTableIdentifier();
+            throw new SchemaEvolutionException(
+                    SchemaEvolutionErrorCode.SCHEMA_EVENT_PROCESSING_FAILED,
+                    String.format(
+                            "Pending schema buffer overflow (max=%d). "
+                                    + "Failing fast to avoid dropping schema change control events.",
+                            MAX_BUFFERED_RECORDS),
+                    tableIdentifier,
+                    jobId);
         }
         pendingQueue.add(BufferedRecord.data(row, timestamp));
     }
 
+    private TableIdentifier getPendingSchemaTableIdentifier() {
+        for (BufferedRecord record : pendingQueue) {
+            if (record.isSchemaChange && record.schemaEvent != null) {
+                return record.schemaEvent.tableIdentifier();
+            }
+        }
+        return null;
+    }
+
     /**
-     * Called by Flink after a checkpoint succeeds. Uses a two-round wait to ensure safety:
+     * Called by Flink after a checkpoint succeeds. Uses an extra completed checkpoint round to
+     * ensure safety:
      *
      * <ul>
      *   <li><b>first time seeing the DDL: record {@link #firstSeenCheckpointId} but do NOT
      *       broadcast the DDL yet. At this point the {@code FlinkGlobalCommitter} may still be
      *       running {@code XA COMMIT} for this checkpoint's prepared transactions, holding MDL
      *       locks on the sink table.
      *   <li><b>{@code checkpointId >= firstSeenCheckpointId + CHECKPOINT_WAIT_ROUNDS} : the XA
-     *       COMMIT from the earlier checkpoint cycle is guaranteed to have finished (another full
-     *       checkpoint cycle has completed, which implies the committer ran). The sink's ALTER
-     *       TABLE will not encounter MDL lock, it is now safe to broadcast the DDL.
+     *       COMMIT from the earlier checkpoint cycle is guaranteed to have finished (at least one
+     *       additional checkpoint cycle has completed, which implies the committer ran). The sink's
+     *       ALTER TABLE will not encounter MDL lock, it is now safe to broadcast the DDL.
      * </ul>
      */
     @Override
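
The checkpoint gating spelled out in the Javadoc above might look roughly like the sketch below. firstSeenCheckpointId and CHECKPOINT_WAIT_ROUNDS are the names used in the diff; the field types, the value of the constant, markDdlPending(), and broadcastPendingDdl() are assumptions for illustration only.

// Sketch only: broadcast the pending DDL no earlier than CHECKPOINT_WAIT_ROUNDS
// completed checkpoints after the one that first observed it.
class CheckpointGateSketch {

    private static final long CHECKPOINT_WAIT_ROUNDS = 1; // assumed value
    private boolean ddlPending = false;                    // set when a DDL is buffered
    private long firstSeenCheckpointId = -1;               // -1 = not yet seen by a checkpoint

    void markDdlPending() {
        ddlPending = true; // called from the element path when a schema change event is buffered
    }

    void notifyCheckpointComplete(long checkpointId) {
        if (!ddlPending) {
            return;
        }
        if (firstSeenCheckpointId < 0) {
            // First completed checkpoint that sees the pending DDL: record it, do not
            // broadcast. The FlinkGlobalCommitter may still be running XA COMMIT for
            // this checkpoint's prepared transactions and holding MDL locks.
            firstSeenCheckpointId = checkpointId;
            return;
        }
        if (checkpointId >= firstSeenCheckpointId + CHECKPOINT_WAIT_ROUNDS) {
            // At least one additional checkpoint cycle has completed, so the earlier
            // XA COMMIT has finished and its locks are released; ALTER TABLE is safe now.
            broadcastPendingDdl();
            ddlPending = false;
            firstSeenCheckpointId = -1;
        }
    }

    private void broadcastPendingDdl() {
        // stand-in for sendSchemaChangeEventToDownstream(...) plus the coordinator handshake
    }
}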
@@ -261,27 +279,21 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
 
         sendSchemaChangeEventToDownstream(event);
 
-        try {
-            boolean success =
-                    coordinator.requestSchemaChange(tableId, eventTime, SCHEMA_CHANGE_TIMEOUT_MS);
-            if (success) {
-                log.info(
-                        "Schema change for table {} (epoch {}) confirmed by all sink subtasks.",
-                        tableId,
-                        eventTime);
-            } else {
-                log.error(
-                        "Schema change for table {} (epoch {}) failed or timed out.",
-                        tableId,
-                        eventTime);
-            }
-        } catch (Exception e) {
-            log.error(
-                    "Error during schema change coordination for table {} (epoch {})",
+        boolean success =
+                coordinator.requestSchemaChange(tableId, eventTime, SCHEMA_CHANGE_TIMEOUT_MS);
+        if (!success) {
+            throw new SchemaEvolutionException(
+                    SchemaEvolutionErrorCode.SCHEMA_EVENT_PROCESSING_FAILED,
+                    String.format(
+                            "Schema change for table %s (epoch %d) failed during sink coordination.",
+                            tableId, eventTime),
                     tableId,
-                    eventTime,
-                    e);
+                    jobId);
         }
+        log.info(
+                "Schema change for table {} (epoch {}) confirmed by all sink subtasks.",
+                tableId,
+                eventTime);
 
         CatalogTable newSchema = event.getChangeAfter();
         if (newSchema != null) {