Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion lib/resque/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,61 @@ def all_known_queues
config.keys | workers.keys
end

# Get the number of workers declared in the config file for queues
#
# Supports both upstream syntax (integer value) and extended syntax
# (array with workers key):
#
# # upstream syntax
# foo: 4
#
# # extended syntax
# foo:
# - workers: 4
# - fork_per_job: false
#
# @param queues [String] a given key/entry from the config file
# @return [Integer] the declared count of workers, or 0 if not found
def config_get_worker_count(queues)
return 0 unless config.has_key?(queues)

v = config[queues]

if v.is_a?(Integer)
return v
elsif v.is_a?(Array)
workers_entry = v.find { |entry| entry.is_a?(Hash) && entry.has_key?('workers') }
workers_count = workers_entry ? workers_entry['workers'] : 0
return workers_count.to_i
end

0
end

# Check if certain queues should have their workers fork per job
#
# Default behavior is to fork. Set fork_per_job: false in config to disable:
#
# foo:
# - workers: 4
# - fork_per_job: false
#
# @param queues [Array<String>|String] the input queue/queues
# @return [Boolean] true if should fork (default), false if fork_per_job: false
def fork_enabled_for_queues?(queues)
queues_key = queues.is_a?(Array) ? queues.join(',') : queues
return true unless config.has_key?(queues_key)

v = config[queues_key]

if v.is_a?(Array)
fork_entry = v.find { |entry| entry.is_a?(Hash) && entry.has_key?('fork_per_job') }
return (fork_entry ? !!fork_entry['fork_per_job'] : true)
end

true
end

# }}}
# methods that operate on a single grouping of queues {{{
# perhaps this means a class is waiting to be extracted
Expand All @@ -416,7 +471,7 @@ def quit_excess_workers_for(queues)
end

def worker_delta_for(queues)
config.fetch(queues, 0) - workers.fetch(queues, []).size
config_get_worker_count(queues) - workers.fetch(queues, []).size
end

def pids_for(queues)
Expand Down Expand Up @@ -450,6 +505,7 @@ def create_worker(queues)
queues = queues.to_s.split(',')
worker = ::Resque::Worker.new(*queues)
worker.pool_master_pid = Process.pid
worker.fork_per_job = fork_enabled_for_queues?(queues)
worker.term_timeout = (ENV['RESQUE_TERM_TIMEOUT'] || 4.0).to_f
worker.term_child = ENV['TERM_CHILD']
if worker.respond_to?(:run_at_exit_hooks=)
Expand Down
2 changes: 1 addition & 1 deletion lib/resque/pool/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Resque
class Pool
VERSION = "0.8.0"
VERSION = "0.8.0.skroutz.1"
end
end
22 changes: 22 additions & 0 deletions spec/resque-pool-mixed.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
foo: 1

lala:
- workers: 7
- fork_per_job: false

production:
"foo,bar":
- workers: 10

development:
"foo,bar": 4
"baz":
- workers: 23
"koko":
- workers: 1
- fork_per_job: false

test:
"bar":
- workers: 5
"foo,bar": 3
25 changes: 25 additions & 0 deletions spec/resque-pool-skroutz.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
foo:
- workers: 1

lala:
- workers: 7
- fork_per_job: false

production:
"foo,bar":
- workers: 10

development:
"foo,bar":
- workers: 4
"baz":
- workers: 23
"koko":
- workers: 1
- fork_per_job: false

test:
"bar":
- workers: 5
"foo,bar":
- workers: 3
156 changes: 156 additions & 0 deletions spec/resque_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,159 @@ module Rails; end
end
end
end

describe Resque::Pool, "when config file has skroutz syntax only" do
subject { Resque::Pool.new('spec/resque-pool-skroutz.yml') }

context "when RACK_ENV is set" do
before { ENV['RACK_ENV'] = 'development' }

it "fetches all the known queues (global ones plus the ones for the specific environment)" do
subject.all_known_queues.should == ["foo", "lala", "foo,bar", "baz", "koko"]
end

it "fetches the count of workers correctly" do
subject.config_get_worker_count("foo").should == 1
subject.config_get_worker_count("lala").should == 7
subject.config_get_worker_count("foo,bar").should == 4
subject.config_get_worker_count("baz").should == 23
subject.config_get_worker_count("koko").should == 1
end

it "recognizes if fork is enabled or not" do
subject.fork_enabled_for_queues?(["foo"]).should == true
subject.fork_enabled_for_queues?(["lala"]).should == false
subject.fork_enabled_for_queues?(["foo", "bar"]).should == true
subject.fork_enabled_for_queues?(["baz"]).should == true
subject.fork_enabled_for_queues?(["koko"]).should == false
end

it "creates forking workers if fork is enabled (forking is true by default)" do
worker_foo = subject.create_worker("foo")
worker_foo_bar = subject.create_worker("foo,bar")
worker_baz = subject.create_worker("baz")

worker_foo.fork_per_job?.should == true
worker_foo_bar.fork_per_job?.should == true
worker_baz.fork_per_job?.should == true
end

it "creates non forking workers if fork is not enabled" do
worker_lala = subject.create_worker("lala")
worker_koko = subject.create_worker("koko")

worker_lala.fork_per_job?.should == false
worker_koko.fork_per_job?.should == false
end
end

context "when RACK_ENV is not set" do
it "fetches all the known queues (only the global ones)" do
subject.all_known_queues.should == ["foo", "lala"]
end

it "loads keys which do not have an environment and fetches the count of workers correctly" do
subject.config_get_worker_count("foo").should == 1
subject.config_get_worker_count("lala").should == 7
subject.config_get_worker_count("foo,bar").should == 0
subject.config_get_worker_count("bar").should == 0
end

it "loads keys which do not have an enviroment and recognizes if fork is enabled or not" do
subject.fork_enabled_for_queues?(["foo"]).should == true
subject.fork_enabled_for_queues?(["lala"]).should == false
subject.fork_enabled_for_queues?(["foo", "bar"]).should == true
subject.fork_enabled_for_queues?(["bar"]).should == true
end

it "creates forking workers if fork is enabled" do
worker_foo = subject.create_worker("foo")

worker_foo.fork_per_job?.should == true
end

it "creates non forking workers if fork is not enabled" do
worker_foo = subject.create_worker("lala")

worker_foo.fork_per_job?.should == false
end
end
end

describe Resque::Pool, "when config file has mixed syntax" do
subject { Resque::Pool.new('spec/resque-pool-mixed.yml') }

context "when RACK_ENV is set" do
before { ENV['RACK_ENV'] = 'development' }

it "fetches all the known queues (global ones plus the ones for the specific environment)" do
subject.all_known_queues.should == ["foo", "lala", "foo,bar", "baz", "koko"]
end

it "fetches the count of workers correctly" do
subject.config_get_worker_count("foo").should == 1
subject.config_get_worker_count("lala").should == 7
subject.config_get_worker_count("foo,bar").should == 4
subject.config_get_worker_count("baz").should == 23
subject.config_get_worker_count("koko").should == 1
end

it "recognizes if fork is enabled or not" do
subject.fork_enabled_for_queues?(["foo"]).should == true
subject.fork_enabled_for_queues?(["lala"]).should == false
subject.fork_enabled_for_queues?(["foo", "bar"]).should == true
subject.fork_enabled_for_queues?(["baz"]).should == true
subject.fork_enabled_for_queues?(["koko"]).should == false
end

it "creates forking workers if fork is enabled (forking is true by default)" do
worker_foo = subject.create_worker("foo")
worker_foo_bar = subject.create_worker("foo,bar")
worker_baz = subject.create_worker("baz")

worker_foo.fork_per_job?.should == true
worker_foo_bar.fork_per_job?.should == true
worker_baz.fork_per_job?.should == true
end

it "creates non forking workers if fork is not enabled" do
worker_lala = subject.create_worker("lala")
worker_koko = subject.create_worker("koko")

worker_lala.fork_per_job?.should == false
worker_koko.fork_per_job?.should == false
end
end

context "when RACK_ENV is not set" do
it "fetches all the known queues (only the global ones)" do
subject.all_known_queues.should == ["foo", "lala"]
end

it "loads keys which do not have an environment and fetches the count of workers correctly" do
subject.config_get_worker_count("foo").should == 1
subject.config_get_worker_count("lala").should == 7
subject.config_get_worker_count("foo,bar").should == 0
subject.config_get_worker_count("bar").should == 0
end

it "loads keys which do not have an enviroment and recognizes if fork is enabled or not" do
subject.fork_enabled_for_queues?(["foo"]).should == true
subject.fork_enabled_for_queues?(["lala"]).should == false
subject.fork_enabled_for_queues?(["foo", "bar"]).should == true
subject.fork_enabled_for_queues?(["bar"]).should == true
end

it "creates forking workers if fork is enabled" do
worker_foo = subject.create_worker("foo")

worker_foo.fork_per_job?.should == true
end

it "creates non forking workers if fork is not enabled" do
worker_foo = subject.create_worker("lala")

worker_foo.fork_per_job?.should == false
end
end
end