Skip to content

Commit

Permalink
Merge pull request #23 from basecamp/solid-queue-adapter
Browse files Browse the repository at this point in the history
Implement a solid queue adapter
  • Loading branch information
rosa authored Oct 26, 2023
2 parents 7829037 + 5eb1b0f commit fc04f21
Show file tree
Hide file tree
Showing 26 changed files with 383 additions and 27 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ruby.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ jobs:
rubocop:
name: Rubocop
runs-on: ubuntu-latest
env:
BUNDLE_GITHUB__COM: x-access-token:${{ secrets.GH_TOKEN }}
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand All @@ -37,6 +39,7 @@ jobs:
runs-on: ubuntu-latest
env:
CI: true
BUNDLE_GITHUB__COM: x-access-token:${{ secrets.GH_TOKEN }}
strategy:
matrix:
ruby-version: ['3.0']
Expand Down
1 change: 1 addition & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ inherit_gem: { rubocop-37signals: rubocop.yml }
AllCops:
TargetRubyVersion: 3.0
Exclude:
- "test/dummy/db/schema.rb"
- "infrastructure/ci/plugins/**/*"
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ gemspec
gem "sqlite3"

gem "sprockets-rails"
gem "solid_queue", bc: "solid_queue", require: false
gem "rubocop-37signals", bc: "house-style", require: false
gem "puma"
8 changes: 8 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ GIT
rubocop-performance
rubocop-rails

GIT
remote: https://github.com/basecamp/solid_queue
revision: d844a308e5c62b3d8f1942491c5433aca83dd2a9
specs:
solid_queue (0.1.0)
rails (>= 7.0.3.1)

PATH
remote: .
specs:
Expand Down Expand Up @@ -279,6 +286,7 @@ DEPENDENCIES
rubocop-performance
rubocop-rails
selenium-webdriver
solid_queue!
sprockets-rails
sqlite3

Expand Down
7 changes: 3 additions & 4 deletions lib/active_job/jobs_relation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def each(&block)
# This operation is only valid for sets of failed jobs. It will
# raise an error +ActiveJob::Errors::InvalidOperation+ otherwise.
def retry_all
ensure_failed_queue
ensure_failed_status
queue_adapter.retry_all_jobs(self)
nil
end
Expand All @@ -131,7 +131,7 @@ def retry_all
# This operation is only valid for sets of failed jobs. It will
# raise an error +ActiveJob::Errors::InvalidOperation+ otherwise.
def retry_job(job)
ensure_failed_queue
ensure_failed_status
queue_adapter.retry_job(job, self)
end

Expand Down Expand Up @@ -201,7 +201,6 @@ def set_defaults
self.offset_value = 0
self.limit_value = ALL_JOBS_LIMIT
self.status = :pending
self.queue_name = ActiveJob::Base.default_queue_name
end

def clone_with(**properties)
Expand Down Expand Up @@ -258,7 +257,7 @@ def satisfy_filter?(job)
job.class_name == job_class_name
end

def ensure_failed_queue
def ensure_failed_status
raise ActiveJob::Errors::InvalidOperation, "This operation can only be performed on failed jobs, but these jobs are #{status}" unless failed?
end

Expand Down
2 changes: 1 addition & 1 deletion lib/active_job/queue_adapters/resque_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def fetch_failed_resque_jobs

def fetch_queue_resque_jobs
unless jobs_relation.queue_name.present?
raise ActiveJob::Errors::QueryError, "This adapter only supports fetching failed jobs when no queue name is provided"
raise ActiveJob::Errors::QueryError, "This adapter requires a queue name unless fetching failed jobs"
end
Array.wrap(Resque.peek(jobs_relation.queue_name, jobs_relation.offset_value, jobs_relation.limit_value))
end
Expand Down
157 changes: 157 additions & 0 deletions lib/active_job/queue_adapters/solid_queue_ext.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
module ActiveJob::QueueAdapters::SolidQueueExt
def activating(&block)
block.call
end

def queue_names
SolidQueue::Queue.all.map(&:name)
end

# Returns an array with the list of queues. Each queue is represented as a hash
# with these attributes:
# {
# "name": "queue_name",
# "size": 1,
# active: true
# }
def queues
SolidQueue::Queue.all.collect do |queue|
{
name: queue.name,
size: queue.size,
active: !queue.paused?
}
end
end

def queue_size(queue_name)
find_queue_by_name(queue_name).size
end

def clear_queue(queue_name)
find_queue_by_name(queue_name).clear
end

def pause_queue(queue_name)
find_queue_by_name(queue_name).pause
end

def resume_queue(queue_name)
find_queue_by_name(queue_name).resume
end

def queue_paused?(queue_name)
find_queue_by_name(queue_name).paused?
end

def jobs_count(jobs_relation)
find_solid_queue_jobs_within(jobs_relation).count
end

def fetch_jobs(jobs_relation)
find_solid_queue_jobs_within(jobs_relation).map { |job| deserialize_and_proxy_job(job) }
end

def support_class_name_filtering?
true
end

def retry_all_jobs(jobs_relation)
find_solid_queue_jobs_within(jobs_relation).each(&:retry)
end

def retry_job(job, jobs_relation)
find_solid_queue_job!(job.job_id, jobs_relation).retry
end

def discard_all_jobs(jobs_relation)
find_solid_queue_jobs_within(jobs_relation).each(&:discard)
end

def discard_job(job, jobs_relation)
find_solid_queue_job!(job.job_id, jobs_relation).discard
end

def find_job(job_id, jobs_relation)
if job = find_solid_queue_job(job_id, jobs_relation)
deserialize_and_proxy_job job
end
end

private
def find_queue_by_name(queue_name)
SolidQueue::Queue.find_by_name(queue_name)
end

def find_solid_queue_job!(job_id, jobs_relation)
find_solid_queue_job(job_id, jobs_relation) or raise ActiveJob::Errors::JobNotFoundError.new(job_id)
end

def find_solid_queue_job(job_id, jobs_relation)
find_solid_queue_jobs_within(jobs_relation).find_by(active_job_id: job_id)
end

def find_solid_queue_jobs_within(jobs_relation)
JobFilter.new(jobs_relation).jobs
end

def deserialize_and_proxy_job(solid_queue_job)
ActiveJob::JobProxy.new(solid_queue_job.arguments).tap do |job|
job.last_execution_error = execution_error_from_job(solid_queue_job)
job.raw_data = solid_queue_job.as_json
job.failed_at = solid_queue_job.failed_execution&.created_at
end
end

def execution_error_from_job(solid_queue_job)
if solid_queue_job.failed?
ActiveJob::ExecutionError.new \
error_class: solid_queue_job.failed_execution.exception_class,
message: solid_queue_job.failed_execution.message,
backtrace: solid_queue_job.failed_execution.backtrace
end
end

class JobFilter
def initialize(jobs_relation)
@jobs_relation = jobs_relation
end

def jobs
filter_by_status
.then { |jobs| filter_by_queue(jobs) }
.then { |jobs| filter_by_class(jobs) }
.then { |jobs| limit(jobs) }
.then { |jobs| offset(jobs) }
end

private
attr_reader :jobs_relation

delegate :queue_name, :status, :limit_value, :offset_value, :job_class_name, to: :jobs_relation

def filter_by_status
case status
when :pending then SolidQueue::Job.joins(:ready_execution)
when :failed then SolidQueue::Job.joins(:failed_execution)
else SolidQueue::Job.all
end
end

def filter_by_queue(jobs)
queue_name.present? ? jobs.where(queue_name: queue_name) : jobs
end

def filter_by_class(jobs)
job_class_name.present? ? jobs.where(class_name: job_class_name) : jobs
end

def limit(jobs)
limit_value.present? ? jobs.limit(limit_value) : jobs
end

def offset(jobs)
offset_value.present? ? jobs.offset(offset_value) : jobs
end
end
end
2 changes: 1 addition & 1 deletion lib/mission_control/jobs/console/context.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module MissionControl::Jobs::Console::Context
mattr_accessor :jobs_server

def evaluate(line, line_no, exception: nil)
def evaluate(*)
if MissionControl::Jobs::Current.server
MissionControl::Jobs::Current.server.activating { super }
else
Expand Down
2 changes: 2 additions & 0 deletions lib/mission_control/jobs/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class Engine < ::Rails::Engine

config.before_initialize do
ActiveJob::QueueAdapters::ResqueAdapter.prepend ActiveJob::QueueAdapters::ResqueExt
ActiveJob::QueueAdapters::SolidQueueAdapter.prepend ActiveJob::QueueAdapters::SolidQueueExt

Resque.prepend Resque::ThreadSafeRedis
end

Expand Down
1 change: 1 addition & 0 deletions mission_control-jobs.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Gem::Specification.new do |spec|
spec.add_dependency 'stimulus-rails'

spec.add_development_dependency "resque"
spec.add_development_dependency "solid_queue"
spec.add_development_dependency "capybara"
spec.add_development_dependency "selenium-webdriver"
spec.add_development_dependency "resque-pause"
Expand Down
2 changes: 1 addition & 1 deletion test/active_job/queue_adapters/adapter_testing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module ActiveJob::QueueAdapters::AdapterTesting
include CountJobs, DiscardJobs, FindJobs, JobBatches, QueryJobs, Queues, RetryJobs

setup do
ApplicationJob.queue_adapter = queue_adapter
ActiveJob::Base.queue_adapter = queue_adapter
end
end

Expand Down
7 changes: 5 additions & 2 deletions test/active_job/queue_adapters/adapter_testing/count_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ module ActiveJob::QueueAdapters::AdapterTesting::CountJobs
5.times { DummyJob.perform_later }
10.times { DummyReloadedJob.perform_later }

assert_equal 5, ApplicationJob.jobs.where(job_class: "DummyJob").count
assert_equal 10, ApplicationJob.jobs.where(job_class: "DummyReloadedJob").count
queue = ApplicationJob.queues[:default]

assert_equal 5, queue.jobs.where(job_class: "DummyJob").count
assert_equal 10, queue.jobs.where(job_class: "DummyReloadedJob").count
end

test "count the pending jobs in a given queue" do
Expand All @@ -32,6 +34,7 @@ module ActiveJob::QueueAdapters::AdapterTesting::CountJobs
DummyJob.queue_as :other_queue
3.times { DummyJob.perform_later }

assert_equal 8, ApplicationJob.queues.sum(&:size)
assert_equal 5, ApplicationJob.jobs.where(queue: "default").count
assert_equal 3, ApplicationJob.jobs.where(queue: "other_queue").count
assert_equal 3, ApplicationJob.jobs.where(queue: :other_queue).count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module ActiveJob::QueueAdapters::AdapterTesting::DiscardJobs
test "discard all pending jobs" do
10.times { |index| DummyJob.perform_later(index) }

pending_jobs = ActiveJob.jobs.pending
pending_jobs = ApplicationJob.queues[:default].jobs.pending
assert_not_empty pending_jobs
pending_jobs.discard_all

Expand Down Expand Up @@ -56,7 +56,7 @@ module ActiveJob::QueueAdapters::AdapterTesting::DiscardJobs
test "discard all pending withing a given page" do
10.times { |index| DummyJob.perform_later(index) }

pending_jobs = ActiveJob.jobs.pending
pending_jobs = ApplicationJob.queues[:default].jobs.pending
page_of_jobs = pending_jobs.offset(2).limit(3)
page_of_jobs.discard_all

Expand Down Expand Up @@ -85,7 +85,7 @@ module ActiveJob::QueueAdapters::AdapterTesting::DiscardJobs
test "discard a single pending job" do
DummyJob.perform_later

pending_jobs = ActiveJob.jobs.pending
pending_jobs = ApplicationJob.queues[:default].jobs.pending
assert_not_empty pending_jobs

pending_job = pending_jobs.last
Expand Down
3 changes: 2 additions & 1 deletion test/active_job/queue_adapters/adapter_testing/query_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ module ActiveJob::QueueAdapters::AdapterTesting::QueryJobs
10.times { DummyReloadedJob.perform_later }
2.times { DummyJob.perform_later }
15.times { DummyReloadedJob.perform_later }
assert_equal [ "DummyJob", "DummyReloadedJob" ], ApplicationJob.jobs.job_classes

assert_equal [ "DummyJob", "DummyReloadedJob" ], ApplicationJob.queues[:default].jobs.job_classes
end
end
35 changes: 26 additions & 9 deletions test/active_job/queue_adapters/resque_adapter_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,43 @@ class ActiveJob::QueueAdapters::ResqueAdapterTest < ActiveSupport::TestCase
end.each(&:join)
end

test "use different queue adapters via active job" do
test "use different resque adapters via active job" do
redis_1 = create_resque_redis("redis_1")
adapter_1 = ActiveJob::QueueAdapters::ResqueAdapter.new(redis_1)
redis_2 = create_resque_redis("redis_2")
adapter_2 = ActiveJob::QueueAdapters::ResqueAdapter.new(redis_2)

adapter_1.activating do
5.times { DummyJob.perform_later }
with_active_job_adapter(adapter_1) do
adapter_1.activating do
5.times { DummyJob.perform_later }
end
end

adapter_2.activating do
10.times { DummyJob.perform_later }
with_active_job_adapter(adapter_2) do
adapter_2.activating do
10.times { DummyJob.perform_later }
end
end

adapter_1.activating do
assert_equal 5, ApplicationJob.jobs.pending.count
with_active_job_adapter(adapter_1) do
adapter_1.activating do
assert_equal 5, ApplicationJob.jobs.pending.count
end
end

adapter_2.activating do
assert_equal 10, ApplicationJob.jobs.pending.count
with_active_job_adapter(adapter_2) do
adapter_2.activating do
assert_equal 10, ApplicationJob.jobs.pending.count
end
end
end

private
def with_active_job_adapter(adapter, &block)
previous_adapter = ActiveJob::Base.current_queue_adapter
ActiveJob::Base.current_queue_adapter = adapter
yield
ensure
ActiveJob::Base.current_queue_adapter = previous_adapter
end
end
Loading

0 comments on commit fc04f21

Please sign in to comment.