From d3a4466b0dd3e3336365b335452ef08f5dcd0f9f Mon Sep 17 00:00:00 2001 From: Nick Pezza Date: Sat, 5 Oct 2024 21:00:52 -0400 Subject: [PATCH] Report errors to the error reporter --- app/models/solid_queue/claimed_execution.rb | 9 +++++---- app/models/solid_queue/process/executor.rb | 4 ++-- app/models/solid_queue/process/prunable.rb | 2 +- lib/solid_queue/supervisor.rb | 2 +- lib/solid_queue/supervisor/maintenance.rb | 3 ++- .../solid_queue/claimed_execution_test.rb | 15 +++++++++------ test/unit/worker_test.rb | 17 +++++++++++++++++ 7 files changed, 37 insertions(+), 15 deletions(-) diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index d4abf45a..ba469a3e 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -36,10 +36,10 @@ def release_all end end - def fail_all_with(error) + def fail_all_with(error, reraise:) SolidQueue.instrument(:fail_many_claimed) do |payload| includes(:job).tap do |executions| - executions.each { |execution| execution.failed_with(error) } + executions.each { |execution| execution.failed_with(error, reraise: reraise) } payload[:process_ids] = executions.map(&:process_id).uniq payload[:job_ids] = executions.map(&:job_id).uniq @@ -63,7 +63,7 @@ def perform if result.success? finished else - failed_with(result.error) + failed_with(result.error, reraise: true) end ensure job.unblock_next_blocked_job @@ -82,11 +82,12 @@ def discard raise UndiscardableError, "Can't discard a job in progress" end - def failed_with(error) + def failed_with(error, reraise:) transaction do job.failed_with(error) destroy! end + raise error if reraise end private diff --git a/app/models/solid_queue/process/executor.rb b/app/models/solid_queue/process/executor.rb index 8dcd12aa..23c41911 100644 --- a/app/models/solid_queue/process/executor.rb +++ b/app/models/solid_queue/process/executor.rb @@ -11,9 +11,9 @@ module Executor after_destroy :release_all_claimed_executions end - def fail_all_claimed_executions_with(error) + def fail_all_claimed_executions_with(error, reraise:) if claims_executions? - claimed_executions.fail_all_with(error) + claimed_executions.fail_all_with(error, reraise: reraise) end end diff --git a/app/models/solid_queue/process/prunable.rb b/app/models/solid_queue/process/prunable.rb index 85341d1d..f116138e 100644 --- a/app/models/solid_queue/process/prunable.rb +++ b/app/models/solid_queue/process/prunable.rb @@ -23,7 +23,7 @@ def prune(excluding: nil) def prune error = Processes::ProcessPrunedError.new(last_heartbeat_at) - fail_all_claimed_executions_with(error) + fail_all_claimed_executions_with(error, reraise: false) deregister(pruned: true) end diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 9ef736e4..511886d4 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -173,7 +173,7 @@ def replace_fork(pid, status) def handle_claimed_jobs_by(terminated_fork, status) if registered_process = process.supervisees.find_by(name: terminated_fork.name) error = Processes::ProcessExitError.new(status) - registered_process.fail_all_claimed_executions_with(error) + registered_process.fail_all_claimed_executions_with(error, reraise: false) end end diff --git a/lib/solid_queue/supervisor/maintenance.rb b/lib/solid_queue/supervisor/maintenance.rb index 1b6b5204..3ce8fa53 100644 --- a/lib/solid_queue/supervisor/maintenance.rb +++ b/lib/solid_queue/supervisor/maintenance.rb @@ -29,7 +29,8 @@ def prune_dead_processes def fail_orphaned_executions wrap_in_app_executor do - ClaimedExecution.orphaned.fail_all_with(Processes::ProcessMissingError.new) + ClaimedExecution.orphaned. + fail_all_with(Processes::ProcessMissingError.new, reraise: false) end end end diff --git a/test/models/solid_queue/claimed_execution_test.rb b/test/models/solid_queue/claimed_execution_test.rb index 226dad77..6ccbb8e8 100644 --- a/test/models/solid_queue/claimed_execution_test.rb +++ b/test/models/solid_queue/claimed_execution_test.rb @@ -22,7 +22,9 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase job = claimed_execution.job assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do - claimed_execution.perform + assert_raises RuntimeError do + claimed_execution.perform + end end assert_not job.reload.finished? @@ -37,10 +39,12 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase test "job failures are reported via Rails error subscriber" do subscriber = ErrorBuffer.new - with_error_subscriber(subscriber) do - claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B") + assert_raises RuntimeError do + with_error_subscriber(subscriber) do + claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B") - claimed_execution.perform + claimed_execution.perform + end end assert_equal 1, subscriber.errors.count @@ -61,9 +65,8 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase test "fail with error" do claimed_execution = prepare_and_claim_job AddToBufferJob.perform_later(42) job = claimed_execution.job - assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do - claimed_execution.failed_with(RuntimeError.new) + claimed_execution.failed_with(RuntimeError.new, reraise: false) end assert job.reload.failed? diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 3523e4a1..09999808 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -67,6 +67,23 @@ class WorkerTest < ActiveSupport::TestCase SolidQueue.on_thread_error = original_on_thread_error end + test "errors on claimed executions are reported via Rails error subscriber" do + subscriber = ErrorBuffer.new + Rails.error.subscribe(subscriber) + + RaisingJob.perform_later(RuntimeError, "B") + + @worker.start + + wait_for_jobs_to_finish_for(1.second) + @worker.wake_up + + assert_equal 1, subscriber.errors.count + assert_equal "This is a RuntimeError exception", subscriber.messages.first + ensure + Rails.error.unsubscribe(subscriber) if Rails.error.respond_to?(:unsubscribe) + end + test "claim and process more enqueued jobs than the pool size allows to process at once" do 5.times do |i| StoreResultJob.perform_later(:paused, pause: 0.1.second)