Skip to content

Commit 6ea07e1

Browse files
committed
Add expired job reaper and related tests for job expiration handling
1 parent 85d601b commit 6ea07e1

File tree

8 files changed

+425
-100
lines changed

8 files changed

+425
-100
lines changed

spec/expired_job_reaper_spec.cr

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
require "./spec_helper"
2+
3+
module JoobQ
  # Specs for ExpiredJobReaper. Every example works against the RedisStore
  # returned by JoobQ.store and calls JoobQ.reset beforehand.
  describe ExpiredJobReaper do
    before_each do
      JoobQ.reset
    end

    describe "#reap_once" do
      it "reaps expired jobs from main queue" do
        store = JoobQ.store.as(RedisStore)

        job = ExampleJob.new(1)
        job.expires = (Time.local - 1.hour).to_unix_ms
        store.enqueue(job)

        store.queue_size("example").should eq(1)

        reaper = ExpiredJobReaper.new(store: store)
        reaper.reap_once

        store.queue_size("example").should eq(0)
        store.redis.zcard("joobq:dead_letter").should be >= 1
      end

      it "does not reap non-expired jobs" do
        store = JoobQ.store.as(RedisStore)

        job = ExampleJob.new(1)
        job.expires = (Time.local + 1.hour).to_unix_ms
        store.enqueue(job)

        reaper = ExpiredJobReaper.new(store: store)
        reaper.reap_once

        store.queue_size("example").should eq(1)
      end

      it "reaps only expired jobs from a mixed queue" do
        store = JoobQ.store.as(RedisStore)

        expired_job = ExampleJob.new(1)
        expired_job.expires = (Time.local - 1.hour).to_unix_ms
        store.enqueue(expired_job)

        valid_job = ExampleJob.new(2)
        valid_job.expires = (Time.local + 1.hour).to_unix_ms
        store.enqueue(valid_job)

        reaper = ExpiredJobReaper.new(store: store)
        reaper.reap_once

        store.queue_size("example").should eq(1)
      end

      it "reaps expired jobs from processing queue" do
        store = JoobQ.store.as(RedisStore)

        job = ExampleJob.new(1)
        job.expires = (Time.local - 1.hour).to_unix_ms

        # Place the payload directly in the processing list, bypassing enqueue.
        processing_key = "joobq:processing:example"
        store.redis.rpush(processing_key, job.to_json)

        store.redis.llen(processing_key).should eq(1)

        reaper = ExpiredJobReaper.new(store: store)
        reaper.reap_once

        store.redis.llen(processing_key).should eq(0)
        store.redis.zcard("joobq:dead_letter").should be >= 1
      end

      it "handles pagination with many expired jobs" do
        store = JoobQ.store.as(RedisStore)

        # Enqueue more than PAGE_SIZE (100) expired jobs
        150.times do |i|
          job = ExampleJob.new(i)
          job.expires = (Time.local - 1.hour).to_unix_ms
          store.enqueue(job)
        end

        store.queue_size("example").should eq(150)

        reaper = ExpiredJobReaper.new(store: store)
        reaper.reap_once

        store.queue_size("example").should eq(0)
      end

      it "increments stats counters" do
        store = JoobQ.store.as(RedisStore)

        job = ExampleJob.new(1)
        job.expires = (Time.local - 1.hour).to_unix_ms
        store.enqueue(job)

        reaper = ExpiredJobReaper.new(store: store)
        reaper.reap_once

        expired_count = store.redis.hget("joobq:stats:expired", "example")
        expired_count.should_not be_nil
        expired_count.as(String).to_i.should be >= 1
      end

      it "continues scanning other queues after error in one" do
        store = JoobQ.store.as(RedisStore)

        # Force a failure for the "example" queue: the reaper runs LRANGE on
        # the queue name, which Redis rejects (WRONGTYPE) when the key holds a
        # plain string instead of a list.
        store.redis.set("example", "not-a-list")

        begin
          # Enqueue an expired job in the "single" queue
          job = Job1.new
          job.expires = (Time.local - 1.hour).to_unix_ms
          store.enqueue(job)

          store.queue_size("single").should eq(1)

          reaper = ExpiredJobReaper.new(store: store)
          reaper.reap_once

          # The reaper should still process other queues even if one has issues
          store.queue_size("single").should eq(0)
        ensure
          # Do not leave the wrongly typed key behind for later examples.
          store.redis.del("example")
        end
      end
    end

    describe "#start and #stop" do
      it "starts and stops the reaper fiber" do
        store = JoobQ.store.as(RedisStore)
        reaper = ExpiredJobReaper.new(store: store, interval: 1.second)

        reaper.start
        sleep 0.1.seconds

        reaper.stop
        # Should not raise
      end

      it "is idempotent on double start" do
        store = JoobQ.store.as(RedisStore)
        reaper = ExpiredJobReaper.new(store: store, interval: 1.second)

        reaper.start
        reaper.start # should be no-op

        reaper.stop
      end

      it "is idempotent on double stop" do
        store = JoobQ.store.as(RedisStore)
        reaper = ExpiredJobReaper.new(store: store, interval: 1.second)

        reaper.start
        reaper.stop
        reaper.stop # should be no-op
      end
    end

    describe "scan_limit" do
      it "respects the scan limit" do
        store = JoobQ.store.as(RedisStore)

        # Enqueue 200 expired jobs
        200.times do |i|
          job = ExampleJob.new(i)
          job.expires = (Time.local - 1.hour).to_unix_ms
          store.enqueue(job)
        end

        # Set scan limit to 100 — should only scan first 100 entries
        reaper = ExpiredJobReaper.new(store: store, scan_limit: 100)
        reaper.reap_once

        # Some jobs should remain since scan limit was hit
        store.queue_size("example").should eq(100)
      end
    end
  end
end

spec/queue_spec.cr

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ module JoobQ
5858
sleep 0.1.seconds
5959
end
6060

61-
queue.size.should eq(0) # All jobs should be processed
61+
queue.size.should eq(0) # All jobs should be processed
6262
queue.stop!
6363
end
6464

src/joobq.cr

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ module JoobQ
7676
end
7777

7878
# Initialize configuration with hybrid loading method (requires block)
79-
def self.initialize_config_with(loading_method : Symbol, *args, **kwargs, &block) : Nil
79+
def self.initialize_config_with(loading_method : Symbol, *args, **kwargs, &) : Nil
8080
return if @@config_initialized
8181

8282
case loading_method
@@ -180,6 +180,9 @@ module JoobQ
180180
# Start delayed job scheduler (processes retrying jobs from DELAYED_SET)
181181
config.delayed_job_scheduler.start
182182

183+
# Start expired job reaper (scans queues for expired waiting/processing jobs)
184+
config.expired_job_reaper.start
185+
183186
queues.each do |key, queue|
184187
Log.info { "JoobQ starting #{key} queue..." }
185188
queue.start

src/joobq/configure.cr

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,19 @@ module JoobQ
157157
# Delayed job scheduler (processes retrying jobs)
158158
property delayed_job_scheduler : DelayedJobScheduler = DelayedJobScheduler.new
159159

160+
# Settings handed to the expired job reaper when it is built:
# `reaper_interval` becomes its `interval`, `reaper_scan_limit` its `scan_limit`.
property reaper_interval : Time::Span = 30.seconds
property reaper_scan_limit : Int32 = 1000

# Expired job reaper (scans queues for expired jobs).
#
# The block form of `getter` builds the instance on first access and reuses
# it afterwards, so `store`, `reaper_interval` and `reaper_scan_limit` are
# read once, at that first access.
getter expired_job_reaper : ExpiredJobReaper do
  ExpiredJobReaper.new(store: store, interval: reaper_interval, scan_limit: reaper_scan_limit)
end
172+
160173
# DSL: Add custom middlewares
161174
def use(& : ->)
162175
yield middlewares

src/joobq/expired_job_reaper.cr

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
module JoobQ
  # Periodically scans the waiting list and the processing list of every
  # configured queue and moves jobs whose `expires` timestamp (Unix
  # milliseconds) lies in the past into the dead-letter sorted set.
  #
  # Only a RedisStore is supported; with any other store a reap cycle is a
  # no-op.
  class ExpiredJobReaper
    Log = ::Log.for("EXPIRED_JOB_REAPER")

    # Maximum number of list entries fetched per LRANGE call.
    PAGE_SIZE = 100
    # Sorted set receiving reaped payloads, scored with the reap timestamp.
    DEAD_LETTER = "joobq:dead_letter"
    # Prefix of the per-queue processing lists ("joobq:processing:<queue>").
    PROCESSING = "joobq:processing"

    @running = Atomic(Bool).new(false)
    @reaper_fiber : Fiber?

    # store      - backing store; reaping only happens when it is a RedisStore
    # interval   - pause between two reap cycles of the background fiber
    # scan_limit - maximum number of entries inspected per list and per cycle
    def initialize(
      @store : Store = RedisStore.instance,
      @interval : Time::Span = 30.seconds,
      @scan_limit : Int32 = 1000,
    )
    end

    # Spawns the background fiber that runs a reap cycle every `interval`.
    # Calling it while the reaper is already running is a no-op.
    def start
      # swap returns the previous value, so the check and the update are one
      # atomic step: only the caller that flips false -> true spawns a fiber.
      return if @running.swap(true)

      @reaper_fiber = spawn do
        Log.info &.emit("Expired job reaper started",
          interval_seconds: @interval.total_seconds,
          scan_limit: @scan_limit
        )

        while @running.get
          begin
            reap_expired_jobs
            sleep @interval
          rescue ex
            Log.error &.emit("Expired job reaper error", error: ex.message)
            # Back off briefly before the next attempt.
            sleep 5.seconds
          end
        end
      end
    end

    # Asks the background fiber to finish. Calling it while the reaper is not
    # running is a no-op.
    #
    # NOTE(review): the loop re-checks the flag only after its current sleep,
    # so the fiber can stay alive for up to `interval` after this returns; a
    # `start` issued within that window runs a second loop next to it.
    def stop
      # Atomic test-and-clear: only the caller that flips true -> false goes on.
      return unless @running.swap(false)

      sleep 0.1.seconds
      Log.info &.emit("Expired job reaper stopped")
    end

    # Public method for testability — runs a single reap cycle
    def reap_once
      reap_expired_jobs
    end

    # Runs one reap cycle over every queue in JoobQ.config.queues. An error
    # raised while handling one queue is logged and does not stop the others.
    private def reap_expired_jobs
      redis_store = @store.as?(RedisStore)
      return unless redis_store

      JoobQ.config.queues.each do |queue_name, _|
        begin
          reaped_waiting = scan_and_reap_queue(redis_store, queue_name)
          reaped_processing = scan_and_reap_queue(
            redis_store,
            "#{PROCESSING}:#{queue_name}",
            stats_queue: queue_name
          )

          if reaped_waiting + reaped_processing > 0
            Log.info &.emit("Reaped expired jobs",
              queue: queue_name,
              from_waiting: reaped_waiting,
              from_processing: reaped_processing
            )
          end
        rescue ex
          Log.error &.emit("Error reaping expired jobs for queue",
            queue: queue_name,
            error: ex.message
          )
        end
      end
    end

    # Pages through the Redis list `list_key`, inspecting at most `scan_limit`
    # entries, and dead-letters every entry whose `expires` is in the past.
    #
    # stats_queue - hash field used for the stats counters; defaults to
    #               `list_key`
    #
    # Returns the number of list entries that were removed.
    private def scan_and_reap_queue(
      redis_store : RedisStore,
      list_key : String,
      stats_queue : String? = nil,
    ) : Int32
      stats_key = stats_queue || list_key
      # Epoch milliseconds do not depend on the time zone.
      current_time_ms = Time.utc.to_unix_ms
      total_reaped = 0
      total_scanned = 0
      offset = 0

      while total_scanned < @scan_limit
        batch_size = Math.min(PAGE_SIZE, @scan_limit - total_scanned)
        jobs = redis_store.redis.lrange(list_key, offset, offset + batch_size - 1).map(&.as(String))
        break if jobs.empty?

        total_scanned += jobs.size
        expired_jobs = jobs.select { |job_json| expired?(job_json, current_time_ms, list_key) }

        unless expired_jobs.empty?
          # LREM with count 0 drops every occurrence of a payload, so each
          # distinct payload only needs to be submitted once.
          total_reaped += move_to_dead_letter(
            redis_store, list_key, stats_key, expired_jobs.uniq, current_time_ms
          )
        end

        # Removed items shift list down; advance past non-expired items only
        offset += jobs.size - expired_jobs.size

        break if jobs.size < batch_size
      end

      total_reaped
    end

    # Returns true when the payload carries an `expires` value that is
    # earlier than `now_ms`. Payloads without `expires` never expire; payloads
    # that cannot be parsed are logged and treated as not expired.
    private def expired?(job_json : String, now_ms : Int64, list_key : String) : Bool
      expires = JSON.parse(job_json)["expires"]?.try(&.as_i64)
      return false unless expires

      now_ms > expires
    rescue ex
      Log.warn &.emit("Failed to parse job during reaping",
        list_key: list_key,
        error: ex.message
      )
      false
    end

    # Removes `payloads` from `list_key` and records those that were really
    # removed in the dead-letter set and in the stats counters.
    #
    # Removal and recording are two separate pipelines on purpose: a payload
    # whose LREM reports 0 is no longer in the list (for example a worker took
    # or finished it after the page was read), so it must be neither
    # dead-lettered nor counted.
    #
    # NOTE(review): the two pipelines are not atomic; if the process dies
    # between them, removed payloads are not recorded.
    #
    # Returns the number of list entries removed.
    private def move_to_dead_letter(
      redis_store : RedisStore,
      list_key : String,
      stats_key : String,
      payloads : Array(String),
      now_ms : Int64,
    ) : Int32
      removal_results = redis_store.redis.pipelined do |pipe|
        payloads.each { |payload| pipe.lrem(list_key, 0, payload) }
      end

      confirmed = [] of String
      removed_entries = 0
      payloads.each_with_index do |payload, index|
        result = removal_results[index]?
        removed = result.is_a?(Int) ? result.to_i : 0
        next if removed <= 0

        confirmed << payload
        removed_entries += removed
      end
      return 0 if confirmed.empty?

      redis_store.redis.pipelined do |pipe|
        confirmed.each { |payload| pipe.zadd(DEAD_LETTER, now_ms, payload) }
        pipe.hincrby("joobq:stats:expired", stats_key, removed_entries)
        pipe.hincrby("joobq:stats:dead_letter", stats_key, removed_entries)
      end

      removed_entries
    end
  end
end

0 commit comments

Comments
 (0)