Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
Broker version: Master
Java version: Java 8
Issue Description
The FileSource connector (org.apache.pulsar.io.file.FileSource) initializes internal blocking queues (workQueue, inProcess, recentlyProcessed) using the default new LinkedBlockingQueue<>() constructor.
Internal queues should have a bounded capacity to provide backpressure. This prevents memory exhaustion when the producer (file listing) creates tasks faster than the consumer (file processing) can handle them.
The default constructor sets the capacity to Integer.MAX_VALUE. If the FileListingThread scans files significantly faster than the FileConsumerThread can process them (e.g., large files or slow I/O), the workQueue grows without bound and eventually causes a java.lang.OutOfMemoryError.
This is an instance of the "Unbounded Size Cache Queue" (UBSCQ) anti-pattern: nothing protects the connector against traffic spikes or slow consumption.
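The capacity claim is easy to check in isolation. A minimal standalone sketch (plain JDK, no Pulsar classes; the class and method names are made up for illustration) compares a default-constructed LinkedBlockingQueue with a bounded one, and shows that offer() on a full bounded queue rejects the element rather than growing the queue:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueCapacityCheck {

    // Capacity reported by a default-constructed LinkedBlockingQueue (Integer.MAX_VALUE).
    static int defaultCapacity() {
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        return unbounded.remainingCapacity();
    }

    // Fills a bounded queue to capacity, then reports whether one more offer() is accepted.
    static boolean offerAcceptedWhenFull(int capacity) {
        BlockingQueue<String> bounded = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            bounded.offer("item-" + i);
        }
        // offer() never blocks: on a full queue it returns false and leaves the queue unchanged.
        return bounded.offer("one-too-many");
    }

    public static void main(String[] args) {
        System.out.println("default capacity = " + defaultCapacity());
        System.out.println("offer accepted when full = " + offerAcceptedWhenFull(2));
    }
}
```

Running main prints a default capacity of 2147483647 (Integer.MAX_VALUE) and false for the extra offer.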
Error messages
java.lang.OutOfMemoryError: Java heap space
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at org.apache.pulsar.io.file.FileListingThread.run(FileListingThread.java:...)
Reproducing the issue
- Configure a FileSource connector to monitor a directory.
- Populate the source directory with a very large number of files (high production rate).
- Make file processing slower than directory listing (e.g., large files or slow I/O) so that pending work accumulates.
- Monitor JVM heap usage; the workQueue grows until an OutOfMemoryError occurs.
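Step 2 can be scripted. The sketch below is a hypothetical helper, not part of Pulsar; the file names, contents, and count are arbitrary. It writes a given number of small files into the directory the connector is configured to monitor:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class GenerateInputFiles {

    // Creates `count` small text files in `dir` (creating `dir` if needed) and returns `count`.
    // Names and payload are arbitrary; they only need to be distinct files for the listing thread.
    static int generate(Path dir, int count) throws IOException {
        Files.createDirectories(dir);
        byte[] payload = "sample line\n".getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < count; i++) {
            Files.write(dir.resolve("input-" + i + ".txt"), payload);
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: java GenerateInputFiles <directory> <fileCount>");
            System.exit(1);
        }
        Path dir = Paths.get(args[0]);
        int count = Integer.parseInt(args[1]);
        generate(dir, count);
        System.out.println("wrote " + count + " files to " + dir.toAbsolutePath());
    }
}
```

For example, java GenerateInputFiles /tmp/file-source-input 500000. To slow processing down as in step 3, the payload can be made larger.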
Relevant code (location: pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java):
// Lines 39-41: queues are unbounded
private final BlockingQueue<File> workQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<File> inProcess = new LinkedBlockingQueue<>();
private final BlockingQueue<File> recentlyProcessed = new LinkedBlockingQueue<>();
// Line 49: the thread count is fixed, but the pool's task queue is an unbounded LinkedBlockingQueue
executor = Executors.newFixedThreadPool(fileConfig.getNumWorkers() + 2);
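For the executor, the unbounded part is the task queue, not the thread count: Executors.newFixedThreadPool(n) builds a ThreadPoolExecutor with exactly n threads and a default-constructed LinkedBlockingQueue for pending tasks. The standalone check below includes one possible bounded alternative for comparison. The ArrayBlockingQueue, the capacity, and CallerRunsPolicy are illustrative choices, not something the FileSource currently uses:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorQueueCheck {

    // Remaining capacity of the task queue behind Executors.newFixedThreadPool (Integer.MAX_VALUE).
    static int fixedPoolQueueCapacity(int threads) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdownNow();
        }
    }

    // Fixed-size pool with a bounded task queue; when the queue is full, CallerRunsPolicy
    // runs the task on the submitting thread, which throttles the submitter.
    static ThreadPoolExecutor boundedFixedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        System.out.println("newFixedThreadPool queue capacity = " + fixedPoolQueueCapacity(4));
        ThreadPoolExecutor bounded = boundedFixedPool(4, 100);
        System.out.println("bounded pool queue capacity = " + bounded.getQueue().remainingCapacity());
        bounded.shutdown();
    }
}
```

CallerRunsPolicy is one way to turn a full task queue into backpressure on the submitter; the default AbortPolicy would instead throw RejectedExecutionException.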
Additional information
Suggestion for fix: Replace the unbounded initialization with a configurable or fixed capacity to enable blocking behavior (backpressure) when the queue is full.
// Example fix
private final BlockingQueue<File> workQueue = new LinkedBlockingQueue<>(10000);
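This makes the FileListingThread block when the consumer cannot keep up, preventing OOM, provided the thread enqueues with put() (which waits while the queue is full). With a bounded queue, plain offer() returns false immediately instead of blocking, so the file is not enqueued unless the caller checks the return value and retries.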
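To illustrate the intended backpressure, the standalone sketch below models the listing/processing pair as a fast producer and a slow consumer sharing a bounded LinkedBlockingQueue. It uses no Pulsar classes; the item count, capacity, and per-item delay are arbitrary assumptions. Because the producer uses put(), the backlog it observes can never exceed the capacity:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedBackpressureDemo {

    // Outcome of one run: items processed by the consumer, and the largest
    // queue size the producer saw right after each enqueue.
    static final class Result {
        final int processed;
        final int maxBacklog;

        Result(int processed, int maxBacklog) {
            this.processed = processed;
            this.maxBacklog = maxBacklog;
        }
    }

    static Result run(int items, int capacity, long consumerDelayMillis) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        AtomicInteger processed = new AtomicInteger();

        // Slow consumer: stands in for file processing.
        Thread consumer = new Thread(() -> {
            try {
                for (int n = 0; n < items; n++) {
                    queue.take();
                    Thread.sleep(consumerDelayMillis);
                    processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Fast producer: stands in for file listing. put() blocks while the queue is full.
        int maxBacklog = 0;
        for (int i = 0; i < items; i++) {
            queue.put(i);
            maxBacklog = Math.max(maxBacklog, queue.size());
        }

        consumer.join();
        return new Result(processed.get(), maxBacklog);
    }

    public static void main(String[] args) throws InterruptedException {
        Result r = run(200, 10, 1);
        System.out.println("processed=" + r.processed + ", maxBacklog=" + r.maxBacklog);
    }
}
```

With put() the listing side simply stalls until the consumer catches up. If a stalled listing thread is undesirable, offer(e, timeout, unit) waits only up to the timeout and returns false, letting the caller decide whether to retry.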
Are you willing to submit a PR?
- I'm willing to submit a PR!