
Hudi 1.2 flink append-only sink failed to flush data #18424

@skywalker0618

Bug Description

We found that the Hudi 1.2 Flink append-only sink fails to flush data to GCS when the checkpoint barrier arrives. Please see the Logs and Stack Trace section below for the stack trace.

I did some investigation, and it looks like there is a bug in the way we close the disruptor thread. In the Hudi 1.2 Flink sink, we added a disruptor queue and thread to decouple the Parquet write from the main Flink thread. The flow:

[flow diagram image]

When the checkpoint barrier arrives, the snapshotState function is called, which in turn calls:

  1. flushDisruptor()
  2. super.snapshotState()

In step 1, flushDisruptor() calls disruptorQueue.close(), which drains the ring buffer AND kills the disruptor thread. For the Parquet in-memory buffer (the second buffer), the disruptor thread is the writer thread and the GCS upload thread is the reader thread. The reader thread checks every second whether the writer thread is still alive:

while (in < 0) {
    if (writeSide != null && !writeSide.isAlive()) {
        throw new IOException("Pipe broken");
    }
    try {
        wait(1000);
    } catch (InterruptedException e) { ... }
}

So when the disruptor thread is terminated, the GCS upload thread detects that the write side is no longer alive and errors out as well.
Hence in step 2, when super.snapshotState() tries to flush the Parquet in-memory buffer to GCS, the job fails because the upload thread is already dead.
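The aliveness check quoted above is the one in java.io.PipedInputStream.read(). The failure mode can be reproduced stand-alone with a piped-stream pair (the pair here is illustrative, not the actual GCS connector internals):

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeBrokenDemo {

    // Returns the exception message the reader sees once the writer thread dies.
    static String readAfterWriterDies() throws Exception {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);

        // "Disruptor" side: writes one byte, then terminates WITHOUT closing the stream.
        Thread writer = new Thread(() -> {
            try {
                out.write(42);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        writer.start();
        writer.join(); // the write-side thread is now dead

        in.read(); // the buffered byte is still readable
        try {
            in.read(); // buffer empty + write side dead -> IOException
            return "no exception";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readAfterWriterDies()); // prints "Pipe broken"
    }
}
```

This mirrors the sink's situation exactly: the buffered bytes written before close() are still consumable, but the very next read after the buffer drains blows up because the writer thread is gone.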

To fix this, we could change flushDisruptor() to NOT call disruptorQueue.close() directly, but instead implement a function that spin-waits until the disruptor buffer is fully consumed. In other words, keep the "spin-wait" part but drop the "terminate thread" part.
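A minimal sketch of that idea, using AtomicLong counters as stand-ins for the disruptor's publisher cursor and consumer sequence (an assumption for illustration; the real fix would read those from the disruptor itself):

```java
import java.util.concurrent.atomic.AtomicLong;

public class DrainWait {
    // Stand-in for the ring buffer's publish cursor.
    final AtomicLong published = new AtomicLong();
    // Stand-in for the consumer's sequence.
    final AtomicLong consumed = new AtomicLong();

    // Wait until the consumer has processed everything published so far,
    // WITHOUT signalling the consumer thread to shut down.
    void awaitDrained() throws InterruptedException {
        long target = published.get(); // snapshot; no new events arrive during snapshotState
        while (consumed.get() < target) {
            Thread.sleep(1); // back off briefly; the disruptor thread stays alive
        }
    }
}
```

After awaitDrained() returns, super.snapshotState() can flush the Parquet buffer while the disruptor thread (the pipe's write side) is still running; close() would only be called in the operator's own close path.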

Does this make sense?

Environment

**Hudi version:** 1.2
**Query engine:** Flink
**Relevant configs:**

Logs and Stack Trace

Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2 for operator hoodie_append_write: default_database.kafka_hp_storeindex_query_nodedup (188/512)#0. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:336)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:718)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:351)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java:1318)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1306)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1263)
    ... 16 common frames omitted
Caused by: org.apache.hudi.exception.HoodieException: Error collect the write status for task [187]
    at org.apache.hudi.sink.bulk.BulkInsertWriterHelper.getWriteStatuses(BulkInsertWriterHelper.java:204)
    at org.apache.hudi.sink.append.AppendWriteFunction.flushData(AppendWriteFunction.java:145)
    at org.apache.hudi.sink.append.AppendWriteFunction.snapshotState(AppendWriteFunction.java:97)
    at org.apache.hudi.sink.append.AppendWriteFunctionWithDisruptorBufferSort.snapshotState(AppendWriteFunctionWithDisruptorBufferSort.java:147)
    at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:181)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:88)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
    ... 27 common frames omitted
Caused by: java.io.IOException: Upload failed for 'gs://uber-staging-kbb9a/zyf5m/team/flink_ingestion_test/kafka_hp_storeindex_query_nodedup/2026/03/27/aef1bbf1-a85b-4395-a350-366deaeac34c-0_187-512-0_20260327175848521.parquet'. details=java.io.IOException: Pipe broken
