Skip to content

Commit 28a08db

Browse files
committed
Refactor whitespace in redis_store.cr for improved code clarity
- Removed unnecessary blank lines in the redis_store.cr file to enhance readability and maintain consistency in code formatting.
- This minor adjustment contributes to a cleaner codebase, making it easier to navigate and understand the job processing logic within the JoobQ system.
1 parent b798ab7 commit 28a08db

2 files changed

Lines changed (commit total, 2 files): 412 additions & 0 deletions

File tree

Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
require "./spec_helper"

# Spec suite verifying that a job lives in exactly ONE Redis location
# (main queue, processing list, delayed set, or dead-letter set) at any
# point in its lifecycle, and that atomic move operations clean up every
# other location. NOTE(review): reconstructed from a garbled diff scrape;
# every statement and string literal is preserved verbatim.
module JoobQ
  # Helper method to verify job is in exactly one location.
  # Asserts the exact size of each of the four storage locations and that
  # the expected counts sum to at most one job overall.
  def self.verify_single_location(store : RedisStore, queue_name : String,
                                  main : Int32 = 0, processing : Int32 = 0,
                                  delayed : Int32 = 0, dead : Int32 = 0)
    store.queue_size(queue_name).should eq main
    store.redis.llen("joobq:processing:#{queue_name}").should eq processing
    store.set_size(RedisStore::DELAYED_SET).should eq delayed
    store.redis.zcard("joobq:dead_letter").should eq dead

    # Verify total is exactly 1 (or 0 if all are 0)
    total = main + processing + delayed + dead
    (total == 0 || total == 1).should be_true
  end

  describe "Unique Queue Membership" do
    before_each do
      JoobQ.reset
    end

    describe "Job exists in only one location at a time" do
      it "job is only in main queue when first enqueued" do
        store = JoobQ.store.as(RedisStore)
        queue_name = "example"
        job = ExampleJob.new(1)

        # Enqueue job
        store.enqueue(job)

        # Verify it's ONLY in main queue
        store.queue_size(queue_name).should eq 1
        store.redis.llen("joobq:processing:#{queue_name}").should eq 0
        store.set_size(RedisStore::DELAYED_SET).should eq 0
        store.redis.zcard("joobq:dead_letter").should eq 0
      end

      it "job is only in processing queue when claimed by worker" do
        store = JoobQ.store.as(RedisStore)
        queue_name = "example"
        job = ExampleJob.new(1)

        # Enqueue and claim job
        store.enqueue(job)
        claimed_job = store.claim_job(queue_name, "worker-1", ExampleJob)

        # Verify it's ONLY in processing queue
        store.queue_size(queue_name).should eq 0
        store.redis.llen("joobq:processing:#{queue_name}").should eq 1
        store.set_size(RedisStore::DELAYED_SET).should eq 0
        store.redis.zcard("joobq:dead_letter").should eq 0
      end

      it "job is only in delayed queue after retry scheduling" do
        store = JoobQ.store.as(RedisStore)
        queue_name = "example"
        job = ExampleJob.new(1)
        job.retrying!

        # Add to processing first
        processing_key = "joobq:processing:#{queue_name}"
        store.redis.lpush(processing_key, job.to_json)

        # Move to retry (atomic)
        delay_ms = 1000_i64
        success = store.move_to_retry_atomic(job, queue_name, delay_ms)

        success.should be_true

        # Verify it's ONLY in delayed queue
        store.queue_size(queue_name).should eq 0
        store.redis.llen(processing_key).should eq 0
        store.set_size(RedisStore::DELAYED_SET).should eq 1
        store.redis.zcard("joobq:dead_letter").should eq 0
      end

      it "job is only in dead letter queue after moving from processing" do
        store = JoobQ.store.as(RedisStore)
        queue_name = "example"
        job = ExampleJob.new(1)
        job.dead!

        # Add to processing first
        processing_key = "joobq:processing:#{queue_name}"
        store.redis.lpush(processing_key, job.to_json)

        # Move to dead letter (atomic)
        store.move_to_dead_letter_atomic(job, queue_name)

        # Verify it's ONLY in dead letter queue
        store.queue_size(queue_name).should eq 0
        store.redis.llen(processing_key).should eq 0
        store.set_size(RedisStore::DELAYED_SET).should eq 0
        store.redis.zcard("joobq:dead_letter").should eq 1
      end

      it "job moves back to main queue from delayed queue" do
        store = JoobQ.store.as(RedisStore)
        queue_name = "example"
        job = ExampleJob.new(1)
        job.retrying!

        # Add to delayed queue with past timestamp
        past_time = Time.local.to_unix_ms - 5000
        store.redis.zadd(RedisStore::DELAYED_SET, past_time, job.to_json)

        # Process due jobs
        store.process_due_delayed_jobs(queue_name)

        # Verify it's ONLY in main queue
        store.queue_size(queue_name).should eq 1
        store.redis.llen("joobq:processing:#{queue_name}").should eq 0
        store.set_size(RedisStore::DELAYED_SET).should eq 0
        store.redis.zcard("joobq:dead_letter").should eq 0
      end
    end

    describe "Cleanup ensures single location (edge cases)" do
      it "removes job from all queues when moving to dead letter" do
        store = JoobQ.store.as(RedisStore)
        queue_name = "example"
        job = ExampleJob.new(1)
        job.dead!
        job_json = job.to_json

        # Artificially add job to MULTIPLE locations (shouldn't happen, but test cleanup)
        store.redis.lpush(queue_name, job_json)
        store.redis.lpush("joobq:processing:#{queue_name}", job_json)
        store.redis.zadd(RedisStore::DELAYED_SET, Time.local.to_unix_ms, job_json)

        # Verify job is in multiple places
        store.queue_size(queue_name).should eq 1
        store.redis.llen("joobq:processing:#{queue_name}").should eq 1
        store.set_size(RedisStore::DELAYED_SET).should eq 1

        # Move to dead letter (should clean up all locations)
        store.move_to_dead_letter_atomic(job, queue_name)

        # Verify it's ONLY in dead letter queue
        store.queue_size(queue_name).should eq 0
        store.redis.llen("joobq:processing:#{queue_name}").should eq 0
        store.set_size(RedisStore::DELAYED_SET).should eq 0
        store.redis.zcard("joobq:dead_letter").should eq 1

        # Verify the job in dead letter is correct
        dead_jobs = store.redis.zrange("joobq:dead_letter", 0, -1)
        dead_jobs.size.should eq 1
      end

      it "removes duplicate entries from processing queue" do
        store = JoobQ.store.as(RedisStore)
        queue_name = "example"
        job = ExampleJob.new(1)
        job_json = job.to_json
        processing_key = "joobq:processing:#{queue_name}"

        # Add job multiple times to processing (shouldn't happen, but test cleanup)
        3.times { store.redis.lpush(processing_key, job_json) }
        store.redis.llen(processing_key).should eq 3

        # Move to retry (should remove ALL occurrences)
        job.retrying!
        success = store.move_to_retry_atomic(job, queue_name, 1000_i64)

        success.should be_true

        # Verify ALL occurrences are removed
        store.redis.llen(processing_key).should eq 0
        store.set_size(RedisStore::DELAYED_SET).should eq 1
      end

      it "ensures no duplicates when scheduling delayed retry" do
        store = JoobQ.store.as(RedisStore)
        queue_name = "example"
        job = ExampleJob.new(1)
        job.retrying!
        job_json = job.to_json
        processing_key = "joobq:processing:#{queue_name}"

        # Add job to processing
        store.redis.lpush(processing_key, job_json)

        # Schedule delayed retry
        result = store.schedule_delayed_retry(job, queue_name, 1000_i64)
        result.should be_true

        # Verify no duplicates in any queue
        store.queue_size(queue_name).should eq 0
        store.redis.llen(processing_key).should eq 0
        store.set_size(RedisStore::DELAYED_SET).should eq 1
        store.redis.zcard("joobq:dead_letter").should eq 0

        # Verify only ONE entry in delayed queue
        delayed_jobs = store.redis.zrange(RedisStore::DELAYED_SET, 0, -1)
        delayed_jobs.size.should eq 1
      end
    end

    describe "Multiple jobs maintain unique locations" do
      it "multiple jobs in different queues don't interfere" do
        store = JoobQ.store.as(RedisStore)

        # Create 3 jobs in different states
        job1 = ExampleJob.new(1)
        job2 = ExampleJob.new(2)
        job3 = ExampleJob.new(3)

        # Job 1: In main queue
        store.enqueue(job1)

        # Job 2: In delayed queue
        job2.retrying!
        store.redis.zadd(RedisStore::DELAYED_SET, Time.local.to_unix_ms + 5000, job2.to_json)

        # Job 3: In dead letter
        job3.dead!
        store.redis.zadd("joobq:dead_letter", Time.local.to_unix_ms, job3.to_json)

        # Verify each job is in exactly one location
        # Job 1
        main_queue_jobs = store.redis.lrange("example", 0, -1)
        main_queue_jobs.any? { |j| j.as(String).includes?(job1.jid.to_s) }.should be_true

        # Job 2
        delayed_jobs = store.redis.zrange(RedisStore::DELAYED_SET, 0, -1)
        delayed_jobs.any? { |j| j.as(String).includes?(job2.jid.to_s) }.should be_true

        # Job 3
        dead_jobs = store.redis.zrange("joobq:dead_letter", 0, -1)
        dead_jobs.any? { |j| j.as(String).includes?(job3.jid.to_s) }.should be_true

        # Verify total unique locations
        store.queue_size("example").should eq 1
        store.set_size(RedisStore::DELAYED_SET).should eq 1
        store.redis.zcard("joobq:dead_letter").should eq 1
      end

      it "processing multiple jobs maintains uniqueness" do
        store = JoobQ.store.as(RedisStore)
        queue_name = "example"

        # Enqueue 5 jobs
        jobs = (1..5).map { |i| ExampleJob.new(i) }
        jobs.each { |job| store.enqueue(job) }

        store.queue_size(queue_name).should eq 5

        # Claim 3 jobs (move to processing)
        3.times { store.claim_job(queue_name, "worker-1", ExampleJob) }

        # Verify distribution
        store.queue_size(queue_name).should eq 2
        store.redis.llen("joobq:processing:#{queue_name}").should eq 3

        # Total should still be 5 unique jobs
        (store.queue_size(queue_name) + store.redis.llen("joobq:processing:#{queue_name}")).should eq 5
      end
    end

    describe "Retry lock prevents duplicate scheduling" do
      it "prevents duplicate retry scheduling with lock" do
        store = JoobQ.store.as(RedisStore)
        queue_name = "example"
        queue = JoobQ["example"]
        job = ExampleJob.new(1)
        job.retries = 2
        job.max_retries = 3

        # Add to processing
        processing_key = "joobq:processing:#{queue_name}"
        store.redis.lpush(processing_key, job.to_json)

        # First retry attempt (should succeed)
        retry_attempt = job.max_retries - job.retries
        success1 = ExponentialBackoff.retry_idempotent_atomic(job, queue, retry_attempt)
        success1.should be_true

        # Add job back to processing (simulate duplicate processing)
        store.redis.lpush(processing_key, job.to_json)

        # Second retry attempt (should fail due to lock)
        success2 = ExponentialBackoff.retry_idempotent_atomic(job, queue, retry_attempt)
        success2.should be_false

        # Verify job is in delayed queue only once
        store.set_size(RedisStore::DELAYED_SET).should eq 1

        # Verify lock exists
        retry_lock_key = "joobq:retry_lock:#{job.jid}"
        store.redis.exists(retry_lock_key).should eq 1
      end

      it "cleans up retry lock after successful completion" do
        store = JoobQ.store.as(RedisStore)
        queue = JoobQ["example"]
        job = ExampleJob.new(1)

        # Set a retry lock
        retry_lock_key = "joobq:retry_lock:#{job.jid}"
        store.redis.set(retry_lock_key, "retrying", ex: 30)
        store.redis.exists(retry_lock_key).should eq 1

        # Cleanup lock
        ExponentialBackoff.cleanup_retry_lock_atomic(job, queue)

        # Verify lock is removed
        store.redis.exists(retry_lock_key).should eq 0
      end
    end

    describe "Complete flow maintains uniqueness" do
      it "job moves through complete lifecycle uniquely: Enqueued -> Processing -> Retrying -> Delayed -> Enqueued -> Processing -> Dead" do
        store = JoobQ.store.as(RedisStore)
        queue_name = "example"
        queue = JoobQ["example"]
        job = ExampleJob.new(1)
        job.retries = 1
        job.max_retries = 1

        # Step 1: Enqueued
        store.enqueue(job)
        JoobQ.verify_single_location(store, queue_name, main: 1)

        # Step 2: Processing (claim job)
        claimed_job_json = store.claim_job(queue_name, "worker-1", ExampleJob)
        claimed_job_json.should_not be_nil
        JoobQ.verify_single_location(store, queue_name, processing: 1)

        # Step 3: Retrying (job fails, moves to delayed)
        claimed_job = ExampleJob.from_json(claimed_job_json.not_nil!)
        claimed_job.retrying!
        retry_attempt = claimed_job.max_retries - claimed_job.retries
        success = ExponentialBackoff.retry_idempotent_atomic(claimed_job, queue, retry_attempt)
        success.should be_true
        JoobQ.verify_single_location(store, queue_name, delayed: 1)

        # Step 4: Back to Enqueued (process due jobs)
        # Make job due by updating its score
        delayed_jobs = store.redis.zrange(RedisStore::DELAYED_SET, 0, -1)
        job_json = delayed_jobs[0].as(String)
        store.redis.zadd(RedisStore::DELAYED_SET, Time.local.to_unix_ms - 1000, job_json)
        store.process_due_delayed_jobs(queue_name)
        JoobQ.verify_single_location(store, queue_name, main: 1)

        # Step 5: Processing again
        claimed_job_json2 = store.claim_job(queue_name, "worker-1", ExampleJob)
        claimed_job_json2.should_not be_nil
        JoobQ.verify_single_location(store, queue_name, processing: 1)

        # Step 6: Dead (no retries left)
        claimed_job2 = ExampleJob.from_json(claimed_job_json2.not_nil!)
        claimed_job2.retries = 0
        ExponentialBackoff.move_to_dead_letter_atomic(claimed_job2, queue)
        JoobQ.verify_single_location(store, queue_name, dead: 1)
      end
    end
  end
end
360+

0 commit comments

Comments
 (0)