Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize the worker pool implementation #1

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.DS_Store
/.bundle/
/.yardoc
/Gemfile.lock
Expand Down
File renamed without changes.
12 changes: 8 additions & 4 deletions lib/delayed_job_worker_pool/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ module DelayedJobWorkerPool
module Worker
extend self

def run(options = {})
dj_worker = Delayed::Worker.new(options)
dj_worker.name = options[:name] if options.include?(:name)
dj_worker.start
def run(options = {}, worker_number = 0)
if options.key?(:worker_factory)
options[:worker_factory].call(options, worker_number)
else
dj_worker = Delayed::Worker.new(options)
dj_worker.name = options[:name] if options.include?(:name)
dj_worker.start
end
end
end
end
2 changes: 1 addition & 1 deletion lib/delayed_job_worker_pool/worker_group_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module DelayedJobWorkerPool
class WorkerGroupOptions
DJ_SETTINGS = [:queues, :min_priority, :max_priority, :sleep_delay, :read_ahead].freeze
DJ_SETTINGS = [:queues, :min_priority, :max_priority, :sleep_delay, :read_ahead, :worker_factory].freeze
GROUP_SETTINGS = [:workers].freeze

attr_accessor *DJ_SETTINGS, *GROUP_SETTINGS
Expand Down
11 changes: 6 additions & 5 deletions lib/delayed_job_worker_pool/worker_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def handle_dead_worker(worker_pid, status)
invoke_callback(:after_worker_shutdown, worker_info(worker_pid, group))

registry.remove_worker(worker_pid)
# FIXME: we lose worker_number here
fork_worker(group) unless shutting_down
end

Expand All @@ -130,20 +131,20 @@ def fork_workers

registry.add_group(name, group.dj_worker_options)

workers.times { fork_worker(name) }
workers.times { |worker_number| fork_worker(name, worker_number) }
end
end

def fork_worker(group)
worker_pid = Kernel.fork { run_worker(group) }
def fork_worker(group, worker_number = 0)
worker_pid = Kernel.fork { run_worker(group, worker_number) }
log("Started worker in group #{group}: #{worker_pid}")

registry.add_worker(group, worker_pid)

invoke_callback(:after_worker_boot, worker_info(worker_pid, group))
end

def run_worker(group)
def run_worker(group, worker_number = 0)
master_alive_write_pipe.close

uninstall_signal_handlers
Expand All @@ -158,7 +159,7 @@ def run_worker(group)

invoke_callback(:on_worker_boot, worker_info(Process.pid, group))

DelayedJobWorkerPool::Worker.run(worker_options(Process.pid, group))
DelayedJobWorkerPool::Worker.run(worker_options(Process.pid, group), worker_number)
rescue StandardError => e
log("Worker failed with error: #{e.message}\n#{e.backtrace.join("\n")}")
exit(1)
Expand Down
1 change: 0 additions & 1 deletion spec/delayed_job_worker_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
require 'open3'
require 'active_record'
require 'active_support/all'
require 'delayed_job_active_record'
require_relative 'dummy/app/jobs/touch_file_job'

describe DelayedJobWorkerPool do
Expand Down
12 changes: 4 additions & 8 deletions delayed_job_worker_pool.gemspec → worker_pool.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,25 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
require 'delayed_job_worker_pool/version'

Gem::Specification.new do |spec|
spec.name = 'delayed_job_worker_pool'
spec.name = 'worker_pool'
spec.version = DelayedJobWorkerPool::VERSION
spec.authors = ['Joel Turkel']
spec.email = ['[email protected]']

spec.summary = 'Worker process pooling for Delayed Job'
spec.homepage = 'https://github.com/salsify/delayed_job_worker_pool'
spec.summary = 'Worker process pooling'
spec.homepage = 'https://github.com/starburstlabs/worker_pool'
spec.license = 'MIT'

spec.files = `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) }
spec.executables = ['delayed_job_worker_pool']
spec.executables = ['worker_pool']
spec.require_paths = ['lib']

spec.required_ruby_version = '>= 2.5'

spec.add_dependency 'delayed_job', ['>= 3.0', '< 4.2']

spec.add_development_dependency 'bundler', '~> 2.1'
spec.add_development_dependency 'delayed_job_active_record'
spec.add_development_dependency 'rails', '>= 4.2', '< 6'
spec.add_development_dependency 'rake', '~> 10.0'
spec.add_development_dependency 'rspec', '>= 3.3'
spec.add_development_dependency 'salsify_rubocop', '0.85.0'
spec.add_development_dependency 'sprockets', '< 4'
spec.add_development_dependency 'sqlite3', '>= 1.3'
end