Skip to content

Commit 00f7487

Browse files
authored
Merge pull request #61 from rainforestapp/RF-30293-revert-to-4.0.0.alpha13
[RF-30293] Revert to 4.0.0.alpha13
2 parents dd72041 + 08fa342 commit 00f7487

File tree

5 files changed

+39
-85
lines changed

5 files changed

+39
-85
lines changed

lib/queue_classic_plus.rb

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,14 @@ module QueueClassicPlus
1515

1616
def self.migrate(c = QC::default_conn_adapter.connection)
1717
conn = QC::ConnAdapter.new(connection: c)
18-
conn.execute("ALTER TABLE queue_classic_jobs ADD COLUMN IF NOT EXISTS last_error TEXT")
19-
conn.execute("ALTER TABLE queue_classic_jobs ADD COLUMN IF NOT EXISTS remaining_retries INTEGER")
20-
conn.execute("ALTER TABLE queue_classic_jobs ADD COLUMN IF NOT EXISTS lock BOOLEAN NOT NULL DEFAULT FALSE")
21-
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS index_queue_classic_jobs_enqueue_lock on queue_classic_jobs(q_name, method, args) WHERE lock IS TRUE")
18+
conn.execute("ALTER TABLE queue_classic_jobs ADD COLUMN last_error TEXT")
19+
conn.execute("ALTER TABLE queue_classic_jobs ADD COLUMN remaining_retries INTEGER")
2220
end
2321

2422
def self.demigrate(c = QC::default_conn_adapter.connection)
2523
conn = QC::ConnAdapter.new(connection: c)
26-
conn.execute("ALTER TABLE queue_classic_jobs DROP COLUMN IF EXISTS last_error")
27-
conn.execute("ALTER TABLE queue_classic_jobs DROP COLUMN IF EXISTS remaining_retries")
28-
conn.execute("DROP INDEX IF EXISTS index_queue_classic_jobs_enqueue_lock")
29-
conn.execute("ALTER TABLE queue_classic_jobs DROP COLUMN IF EXISTS lock")
24+
conn.execute("ALTER TABLE queue_classic_jobs DROP COLUMN last_error")
25+
conn.execute("ALTER TABLE queue_classic_jobs DROP COLUMN remaining_retries")
3026
end
3127

3228
def self.exception_handler

lib/queue_classic_plus/base.rb

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,32 @@ def self.logger
5252
QueueClassicPlus.logger
5353
end
5454

55+
def self.can_enqueue?(method, *args)
56+
if locked?
57+
max_lock_time = ENV.fetch("QUEUE_CLASSIC_MAX_LOCK_TIME", 10 * 60).to_i
58+
59+
q = "SELECT COUNT(1) AS count
60+
FROM
61+
(
62+
SELECT 1
63+
FROM queue_classic_jobs
64+
WHERE q_name = $1 AND method = $2 AND args::text = $3::text
65+
AND (locked_at IS NULL OR locked_at > current_timestamp - interval '#{max_lock_time} seconds')
66+
LIMIT 1
67+
)
68+
AS x"
69+
70+
result = QC.default_conn_adapter.execute(q, @queue, method, JSON.dump(serialized(args)))
71+
result['count'].to_i == 0
72+
else
73+
true
74+
end
75+
end
76+
5577
def self.enqueue(method, *args)
56-
queue.enqueue(method, *serialized(args), lock: locked?)
78+
if can_enqueue?(method, *args)
79+
queue.enqueue(method, *serialized(args))
80+
end
5781
end
5882

5983
def self.enqueue_perform(*args)

lib/queue_classic_plus/queue_classic/queue.rb

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -50,25 +50,5 @@ def lock
5050
end
5151
end
5252

53-
def enqueue(method, *args, lock: false)
54-
QC.log_yield(:measure => 'queue.enqueue') do
55-
insert_sql = <<-EOF
56-
INSERT INTO #{QC.table_name} (q_name, method, args, lock)
57-
VALUES ($1, $2, $3, $4)
58-
ON CONFLICT (q_name, method, args) WHERE lock IS TRUE DO NOTHING
59-
RETURNING id
60-
EOF
61-
begin
62-
retries ||= 0
63-
conn_adapter.execute(insert_sql, name, method, JSON.dump(args), lock)
64-
rescue PG::Error => error
65-
if (retries += 1) < 2
66-
retry
67-
else
68-
raise
69-
end
70-
end
71-
end
72-
end
7353
end
7454
end

lib/queue_classic_plus/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module QueueClassicPlus
2-
VERSION = '4.0.0.alpha17'.freeze
2+
VERSION = '4.0.0.alpha18'.freeze
33
end

spec/base_spec.rb

Lines changed: 9 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,6 @@
33

44
describe QueueClassicPlus::Base do
55
context "A child of QueueClassicPlus::Base" do
6-
subject do
7-
Class.new(QueueClassicPlus::Base) do
8-
@queue = :test
9-
end
10-
end
11-
12-
it "allows multiple enqueues" do
13-
threads = []
14-
10.times do
15-
threads << Thread.new do
16-
subject.do
17-
end
18-
end
19-
threads.each(&:join)
20-
21-
expect(subject).to have_queue_size_of(10)
22-
end
23-
246
context "that is locked" do
257
subject do
268
Class.new(QueueClassicPlus::Base) do
@@ -30,28 +12,9 @@
3012
end
3113

3214
it "does not allow multiple enqueues" do
33-
threads = []
34-
10.times do
35-
threads << Thread.new do
36-
subject.do
37-
expect(subject).to have_queue_size_of(1)
38-
end
39-
end
40-
threads.each(&:join)
41-
end
42-
43-
it "allows enqueueing same job with different arguments" do
44-
threads = []
45-
(1..3).each do |arg|
46-
10.times do
47-
threads << Thread.new do
48-
subject.do(arg)
49-
end
50-
end
51-
end
52-
threads.each(&:join)
53-
54-
expect(subject).to have_queue_size_of(3)
15+
subject.do
16+
subject.do
17+
expect(subject).to have_queue_size_of(1)
5518
end
5619

5720
it "checks for an existing job using the same serializing as job enqueuing" do
@@ -65,22 +28,13 @@
6528
subject.do(date)
6629
expect(subject).to have_queue_size_of(1)
6730
end
68-
end
69-
70-
context "when in a transaction" do
71-
subject do
72-
Class.new(QueueClassicPlus::Base) do
73-
@queue = :test
74-
lock!
75-
end
76-
end
7731

78-
it "does not create another transaction when enqueueing" do
79-
conn = QC.default_conn_adapter.connection
80-
expect(conn).to receive(:transaction).exactly(1).times.and_call_original
81-
conn.transaction do
82-
subject.do
83-
end
32+
it "does allow multiple enqueues if something got locked for too long" do
33+
subject.do
34+
one_day_ago = Time.now - 60*60*24
35+
execute "UPDATE queue_classic_jobs SET locked_at = '#{one_day_ago}' WHERE q_name = 'test'"
36+
subject.do
37+
expect(subject).to have_queue_size_of(2)
8438
end
8539
end
8640

0 commit comments

Comments
 (0)