Hudi 1.2 flink append-only sink failed to flush data #18424
Bug Description
We found that the Hudi 1.2 Flink append-only sink fails to flush data to GCS when the checkpoint barrier arrives. See the Logs and Stack Trace section below for the stack trace.
After some investigation, it looks like there is a bug in the way we close the disruptor thread. In the Hudi 1.2 Flink sink, a disruptor queue and thread were added to decouple the Parquet write from the main Flink thread. The flow:
When the checkpoint barrier arrives, the snapshotState function (code) is called, which in turn calls:
- flushDisruptor()
- super.snapshotState()
In step 1, flushDisruptor() calls disruptorQueue.close(), which drains the ring buffer AND kills the disruptor thread. For the Parquet in-memory buffer (the second buffer), the disruptor thread is the writer thread and the GCS upload thread is the reader thread. The reader thread checks every second whether the writer thread is still alive (code):
```java
while (in < 0) {
    if (writeSide != null && !writeSide.isAlive()) {
        throw new IOException("Pipe broken");
    }
    try { wait(1000); } catch (InterruptedException e) { ... }
}
```
So when the disruptor thread is terminated, the GCS upload thread detects that the write side is no longer alive and errors out as well.
Hence in step 2, when super.snapshotState() tries to flush the Parquet in-memory buffer to GCS, the job errors out because the upload thread is already dead.
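The reader loop quoted above behaves like `java.io.PipedInputStream` (the JDK version additionally allows a couple of one-second retries before giving up). Assuming the GCS upload pipe works the same way, the failure mode can be reproduced with a minimal, self-contained sketch using JDK piped streams; `PipeBrokenDemo` and `run` are hypothetical names, not Hudi code:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeBrokenDemo {

    // Returns the IOException message the reader sees when the writer
    // thread dies without closing its end of the pipe.
    static String run() throws Exception {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);
        String[] error = new String[1];

        Thread writer = new Thread(() -> {
            try {
                out.write(1);       // this thread becomes the pipe's "write side"
                Thread.sleep(500);  // stay alive until the reader is blocked in read()
            } catch (Exception ignored) {
            }
            // exits WITHOUT out.close() -- analogous to the killed disruptor thread
        });
        Thread reader = new Thread(() -> {
            try {
                in.read();          // consumes the one buffered byte
                in.read();          // blocks; the writer dies while we wait
            } catch (IOException e) {
                error[0] = e.getMessage();
            }
        });
        writer.start();
        reader.start();
        writer.join();
        reader.join();
        return error[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

On the JDKs I am aware of, the second read fails with "Pipe broken" after a few seconds of retrying, which matches the error in the stack trace below.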
To fix this, we could change flushDisruptor() so that it does not call disruptorQueue.close() directly, but instead spin-waits until the disruptor buffer is fully consumed. In other words, keep the "spin-wait" part but drop the "terminate thread" part.
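The proposed fix can be sketched with stdlib primitives, using a `BlockingQueue` plus two counters as stand-ins for the disruptor's ring buffer and sequences (all names here, such as `DrainWithoutKill` and `drainAndCheckConsumerAlive`, are hypothetical, not Hudi code): the checkpoint path waits until the consumer has caught up, but leaves the consumer thread alive, so a pipe whose write side is that thread keeps passing the isAlive() check.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class DrainWithoutKill {
    // Hypothetical stand-ins for the disruptor's published cursor and the
    // consumer's gating sequence.
    static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1024);
    static final AtomicLong published = new AtomicLong();
    static final AtomicLong consumed = new AtomicLong();

    // Drain the queue the way flushDisruptor() could: spin-wait until the
    // consumer catches up, but do NOT terminate the consumer thread.
    static boolean drainAndCheckConsumerAlive(Thread consumer) throws InterruptedException {
        while (consumed.get() < published.get()) {
            Thread.sleep(1);  // brief back-off while the consumer advances
        }
        return consumer.isAlive();  // stays true: nothing killed the thread
    }

    public static void main(String[] args) throws Exception {
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    queue.take();             // stand-in for the disruptor event handler
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) { /* shutdown signal */ }
        });
        consumer.setDaemon(true);             // let the JVM exit at the end of main
        consumer.start();

        for (int i = 0; i < 10_000; i++) {    // produce 10k events
            published.incrementAndGet();
            queue.put(i);
        }

        System.out.println(drainAndCheckConsumerAlive(consumer));  // true
        System.out.println(consumed.get());                        // 10000
    }
}
```

With the real LMAX Disruptor, the equivalent check would presumably compare the ring buffer's cursor against the consumers' gating sequences instead of these counters.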
Does this make sense?
Environment
**Hudi version:** 1.2
**Query engine:** Flink
**Relevant configs:**
Logs and Stack Trace
```
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2 for operator hoodie_append_write: default_database.kafka_hp_storeindex_query_nodedup (188/512)#0. Failure reason: Checkpoint was declined.
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:336)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:718)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:351)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java:1318)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1306)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1263)
	... 16 common frames omitted
Caused by: org.apache.hudi.exception.HoodieException: Error collect the write status for task [187]
	at org.apache.hudi.sink.bulk.BulkInsertWriterHelper.getWriteStatuses(BulkInsertWriterHelper.java:204)
	at org.apache.hudi.sink.append.AppendWriteFunction.flushData(AppendWriteFunction.java:145)
	at org.apache.hudi.sink.append.AppendWriteFunction.snapshotState(AppendWriteFunction.java:97)
	at org.apache.hudi.sink.append.AppendWriteFunctionWithDisruptorBufferSort.snapshotState(AppendWriteFunctionWithDisruptorBufferSort.java:147)
	at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:181)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:88)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
	... 27 common frames omitted
Caused by: java.io.IOException: Upload failed for 'gs://uber-staging-kbb9a/zyf5m/team/flink_ingestion_test/kafka_hp_storeindex_query_nodedup/2026/03/27/aef1bbf1-a85b-4395-a350-366deaeac34c-0_187-512-0_20260327175848521.parquet'. details=java.io.IOException: Pipe broken
```