diff --git a/.gitignore b/.gitignore index c19a8b9..74cc888 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.DS_Store /.bundle/ /.yardoc /Gemfile.lock diff --git a/bin/delayed_job_worker_pool b/bin/worker_pool similarity index 100% rename from bin/delayed_job_worker_pool rename to bin/worker_pool diff --git a/lib/delayed_job_worker_pool/worker.rb b/lib/delayed_job_worker_pool/worker.rb index 5ed4c59..8756055 100644 --- a/lib/delayed_job_worker_pool/worker.rb +++ b/lib/delayed_job_worker_pool/worker.rb @@ -4,10 +4,14 @@ module DelayedJobWorkerPool module Worker extend self - def run(options = {}) - dj_worker = Delayed::Worker.new(options) - dj_worker.name = options[:name] if options.include?(:name) - dj_worker.start + def run(options = {}, worker_number = 0) + if options.key?(:worker_factory) + options[:worker_factory].call(options, worker_number) + else + dj_worker = Delayed::Worker.new(options) + dj_worker.name = options[:name] if options.include?(:name) + dj_worker.start + end end end end diff --git a/lib/delayed_job_worker_pool/worker_group_options.rb b/lib/delayed_job_worker_pool/worker_group_options.rb index a6d35de..f7b875e 100644 --- a/lib/delayed_job_worker_pool/worker_group_options.rb +++ b/lib/delayed_job_worker_pool/worker_group_options.rb @@ -2,7 +2,7 @@ module DelayedJobWorkerPool class WorkerGroupOptions - DJ_SETTINGS = [:queues, :min_priority, :max_priority, :sleep_delay, :read_ahead].freeze + DJ_SETTINGS = [:queues, :min_priority, :max_priority, :sleep_delay, :read_ahead, :worker_factory].freeze GROUP_SETTINGS = [:workers].freeze attr_accessor *DJ_SETTINGS, *GROUP_SETTINGS diff --git a/lib/delayed_job_worker_pool/worker_pool.rb b/lib/delayed_job_worker_pool/worker_pool.rb index 8e12372..50935f5 100644 --- a/lib/delayed_job_worker_pool/worker_pool.rb +++ b/lib/delayed_job_worker_pool/worker_pool.rb @@ -109,6 +109,7 @@ def handle_dead_worker(worker_pid, status) invoke_callback(:after_worker_shutdown, worker_info(worker_pid, group)) registry.remove_worker(worker_pid) + # FIXME: we lose worker_number here fork_worker(group) unless shutting_down end @@ -130,12 +131,12 @@ def fork_workers registry.add_group(name, group.dj_worker_options) - workers.times { fork_worker(name) } + workers.times { |worker_number| fork_worker(name, worker_number) } end end - def fork_worker(group) - worker_pid = Kernel.fork { run_worker(group) } + def fork_worker(group, worker_number = 0) + worker_pid = Kernel.fork { run_worker(group, worker_number) } log("Started worker in group #{group}: #{worker_pid}") registry.add_worker(group, worker_pid) @@ -143,7 +144,7 @@ def fork_worker(group) invoke_callback(:after_worker_boot, worker_info(worker_pid, group)) end - def run_worker(group) + def run_worker(group, worker_number = 0) master_alive_write_pipe.close uninstall_signal_handlers @@ -158,7 +159,7 @@ def run_worker(group) invoke_callback(:on_worker_boot, worker_info(Process.pid, group)) - DelayedJobWorkerPool::Worker.run(worker_options(Process.pid, group)) + DelayedJobWorkerPool::Worker.run(worker_options(Process.pid, group), worker_number) rescue StandardError => e log("Worker failed with error: #{e.message}\n#{e.backtrace.join("\n")}") exit(1) diff --git a/spec/delayed_job_worker_pool_spec.rb b/spec/delayed_job_worker_pool_spec.rb index cf04437..8e544ad 100644 --- a/spec/delayed_job_worker_pool_spec.rb +++ b/spec/delayed_job_worker_pool_spec.rb @@ -4,7 +4,6 @@ require 'open3' require 'active_record' require 'active_support/all' -require 'delayed_job_active_record' require_relative 'dummy/app/jobs/touch_file_job' describe DelayedJobWorkerPool do diff --git a/delayed_job_worker_pool.gemspec b/worker_pool.gemspec similarity index 67% rename from delayed_job_worker_pool.gemspec rename to worker_pool.gemspec index 5b8f7e2..e17bfef 100644 --- a/delayed_job_worker_pool.gemspec +++ b/worker_pool.gemspec @@ -5,29 +5,25 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) require 'delayed_job_worker_pool/version' Gem::Specification.new do |spec| - spec.name = 'delayed_job_worker_pool' + spec.name = 'worker_pool' spec.version = DelayedJobWorkerPool::VERSION spec.authors = ['Joel Turkel'] spec.email = ['jturkel@salsify.com'] - spec.summary = 'Worker process pooling for Delayed Job' - spec.homepage = 'https://github.com/salsify/delayed_job_worker_pool' + spec.summary = 'Worker process pooling' + spec.homepage = 'https://github.com/starburstlabs/worker_pool' spec.license = 'MIT' spec.files = `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) } - spec.executables = ['delayed_job_worker_pool'] + spec.executables = ['worker_pool'] spec.require_paths = ['lib'] spec.required_ruby_version = '>= 2.5' - spec.add_dependency 'delayed_job', ['>= 3.0', '< 4.2'] - spec.add_development_dependency 'bundler', '~> 2.1' - spec.add_development_dependency 'delayed_job_active_record' spec.add_development_dependency 'rails', '>= 4.2', '< 6' spec.add_development_dependency 'rake', '~> 10.0' spec.add_development_dependency 'rspec', '>= 3.3' - spec.add_development_dependency 'salsify_rubocop', '0.85.0' spec.add_development_dependency 'sprockets', '< 4' spec.add_development_dependency 'sqlite3', '>= 1.3' end