Skip to content

Commit eea2d9a

Browse files
Dhruv Paranjapegilibelz2cstyles
authored
AsyncProducer: Limit the number of retries & AsyncProducer: make sure threads are running before shutdown (#10)
* AsyncProducer: limit number of retries
* AsyncProducer: make sure ensure_threads_running is called before shutdown and deliver_messages
* create a unique mutex before creating the Timer and Worker threads.
* call ensure_threads_running before `shutdown` and `deliver_messages`
* async_producer: change max_retries default value to nil

---------

Co-authored-by: gilibelz <[email protected]>
Co-authored-by: Collin Styles <[email protected]>
1 parent 033eee5 commit eea2d9a

File tree

3 files changed

+81 additions, -24 deletions

lib/kafka/async_producer.rb

Lines changed: 41 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -71,7 +71,7 @@ class AsyncProducer
7171
# @param delivery_interval [Integer] if greater than zero, the number of
7272
# seconds between automatic message deliveries.
7373
#
74-
def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, instrumenter:, logger:)
74+
def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: nil, retry_backoff: 0, instrumenter:, logger:)
7575
raise ArgumentError unless max_queue_size > 0
7676
raise ArgumentError unless delivery_threshold >= 0
7777
raise ArgumentError unless delivery_interval >= 0
@@ -85,12 +85,16 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli
8585
queue: @queue,
8686
producer: sync_producer,
8787
delivery_threshold: delivery_threshold,
88+
max_retries: max_retries,
89+
retry_backoff: retry_backoff,
8890
instrumenter: instrumenter,
8991
logger: logger,
9092
)
9193

9294
# The timer will no-op if the delivery interval is zero.
9395
@timer = Timer.new(queue: @queue, interval: delivery_interval)
96+
97+
@thread_mutex = Mutex.new
9498
end
9599

96100
# Produces a message to the specified topic.
@@ -125,6 +129,8 @@ def produce(value, topic:, **options)
125129
# @see Kafka::Producer#deliver_messages
126130
# @return [nil]
127131
def deliver_messages
132+
ensure_threads_running!
133+
128134
@queue << [:deliver_messages, nil]
129135

130136
nil
@@ -136,6 +142,8 @@ def deliver_messages
136142
# @see Kafka::Producer#shutdown
137143
# @return [nil]
138144
def shutdown
145+
ensure_threads_running!
146+
139147
@timer_thread && @timer_thread.exit
140148
@queue << [:shutdown, nil]
141149
@worker_thread && @worker_thread.join
@@ -146,11 +154,20 @@ def shutdown
146154
private
147155

148156
def ensure_threads_running!
149-
@worker_thread = nil unless @worker_thread && @worker_thread.alive?
150-
@worker_thread ||= Thread.new { @worker.run }
157+
return if worker_thread_alive? && timer_thread_alive?
158+
159+
@thread_mutex.synchronize do
160+
@worker_thread = Thread.new { @worker.run } unless worker_thread_alive?
161+
@timer_thread = Thread.new { @timer.run } unless timer_thread_alive?
162+
end
163+
end
164+
165+
def worker_thread_alive?
166+
!!@worker_thread && @worker_thread.alive?
167+
end
151168

152-
@timer_thread = nil unless @timer_thread && @timer_thread.alive?
153-
@timer_thread ||= Thread.new { @timer.run }
169+
def timer_thread_alive?
170+
!!@timer_thread && @timer_thread.alive?
154171
end
155172

156173
def buffer_overflow(topic, message)
@@ -179,10 +196,12 @@ def run
179196
end
180197

181198
class Worker
182-
def initialize(queue:, producer:, delivery_threshold:, instrumenter:, logger:)
199+
def initialize(queue:, producer:, delivery_threshold:, max_retries: nil, retry_backoff: 0, instrumenter:, logger:)
183200
@queue = queue
184201
@producer = producer
185202
@delivery_threshold = delivery_threshold
203+
@max_retries = max_retries
204+
@retry_backoff = retry_backoff
186205
@instrumenter = instrumenter
187206
@logger = logger
188207
end
@@ -233,10 +252,22 @@ def run
233252
private
234253

235254
def produce(value, **kwargs)
236-
@producer.produce(value, **kwargs)
237-
rescue BufferOverflow
238-
deliver_messages
239-
retry
255+
retries = 0
256+
begin
257+
@producer.produce(value, **kwargs)
258+
rescue BufferOverflow => e
259+
deliver_messages
260+
if @max_retries.nil?
261+
retry
262+
elsif retries < @max_retries
263+
retries += 1
264+
sleep(@retry_backoff**retries)
265+
retry
266+
else
267+
@logger.error("Failed to asynchronously produce messages due to BufferOverflow")
268+
@instrumenter.instrument("error.async_producer", { error: e })
269+
end
270+
end
240271
end
241272

242273
def deliver_messages

lib/kafka/client.rb

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -252,14 +252,16 @@ def producer(compression_codec: nil, compression_threshold: 1, ack_timeout: 5, r
252252
#
253253
# @see AsyncProducer
254254
# @return [AsyncProducer]
255-
def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options)
255+
def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, max_retries: nil, retry_backoff: 0, **options)
256256
sync_producer = producer(**options)
257257

258258
AsyncProducer.new(
259259
sync_producer: sync_producer,
260260
delivery_interval: delivery_interval,
261261
delivery_threshold: delivery_threshold,
262262
max_queue_size: max_queue_size,
263+
max_retries: max_retries,
264+
retry_backoff: retry_backoff,
263265
instrumenter: @instrumenter,
264266
logger: @logger,
265267
)

spec/async_producer_spec.rb

Lines changed: 37 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -18,20 +18,22 @@ def instrument(name, payload = {})
1818
end
1919

2020
describe Kafka::AsyncProducer do
21-
describe "#shutdown" do
22-
let(:sync_producer) { double(:sync_producer, produce: nil, shutdown: nil, deliver_messages: nil) }
23-
let(:log) { StringIO.new }
24-
let(:logger) { Logger.new(log) }
25-
let(:instrumenter) { FakeInstrumenter.new }
26-
27-
let(:async_producer) {
28-
Kafka::AsyncProducer.new(
29-
sync_producer: sync_producer,
30-
instrumenter: instrumenter,
31-
logger: logger,
32-
)
33-
}
21+
let(:sync_producer) { double(:sync_producer, produce: nil, shutdown: nil, deliver_messages: nil) }
22+
let(:log) { StringIO.new }
23+
let(:logger) { Logger.new(log) }
24+
let(:instrumenter) { FakeInstrumenter.new }
25+
26+
let(:async_producer) {
27+
Kafka::AsyncProducer.new(
28+
sync_producer: sync_producer,
29+
instrumenter: instrumenter,
30+
max_retries: 2,
31+
retry_backoff: 0.2,
32+
logger: logger,
33+
)
34+
}
3435

36+
describe "#shutdown" do
3537
it "delivers buffered messages" do
3638
async_producer.produce("hello", topic: "greetings")
3739
async_producer.shutdown
@@ -52,4 +54,26 @@ def instrument(name, payload = {})
5254
expect(metric.payload[:message_count]).to eq 42
5355
end
5456
end
57+
58+
describe "#produce" do
59+
it "delivers buffered messages" do
60+
async_producer.produce("hello", topic: "greetings")
61+
sleep 0.2 # wait for worker to call produce
62+
63+
expect(sync_producer).to have_received(:produce)
64+
end
65+
66+
it "retries until configured max_retries" do
67+
allow(sync_producer).to receive(:produce) {raise Kafka::BufferOverflow}
68+
69+
async_producer.produce("hello", topic: "greetings")
70+
sleep 0.3 # wait for all retries to be done
71+
72+
expect(log.string).to include "Failed to asynchronously produce messages due to BufferOverflow"
73+
74+
metric = instrumenter.metrics_for("error.async_producer").first
75+
expect(metric.payload[:error]).to be_a(Kafka::BufferOverflow)
76+
expect(sync_producer).to have_received(:produce).exactly(3).times
77+
end
78+
end
5579
end

0 commit comments

Comments
 (0)