Skip to content

Commit

Permalink
Report errors to the error reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
npezza93 committed Oct 6, 2024
1 parent a57660c commit 4c9a81b
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 14 deletions.
9 changes: 5 additions & 4 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ def release_all
end
end

def fail_all_with(error)
def fail_all_with(error, reraise:)
SolidQueue.instrument(:fail_many_claimed) do |payload|
includes(:job).tap do |executions|
executions.each { |execution| execution.failed_with(error) }
executions.each { |execution| execution.failed_with(error, reraise: reraise) }

payload[:process_ids] = executions.map(&:process_id).uniq
payload[:job_ids] = executions.map(&:job_id).uniq
Expand All @@ -63,7 +63,7 @@ def perform
if result.success?
finished
else
failed_with(result.error)
failed_with(result.error, reraise: true)
end
ensure
job.unblock_next_blocked_job
Expand All @@ -82,11 +82,12 @@ def discard
raise UndiscardableError, "Can't discard a job in progress"
end

def failed_with(error)
def failed_with(error, reraise:)
transaction do
job.failed_with(error)
destroy!
end
raise error if reraise
end

private
Expand Down
4 changes: 2 additions & 2 deletions app/models/solid_queue/process/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ module Executor
after_destroy :release_all_claimed_executions
end

def fail_all_claimed_executions_with(error)
def fail_all_claimed_executions_with(error, reraise:)
if claims_executions?
claimed_executions.fail_all_with(error)
claimed_executions.fail_all_with(error, reraise: reraise)
end
end

Expand Down
2 changes: 1 addition & 1 deletion app/models/solid_queue/process/prunable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def prune(excluding: nil)

def prune
error = Processes::ProcessPrunedError.new(last_heartbeat_at)
fail_all_claimed_executions_with(error)
fail_all_claimed_executions_with(error, reraise: false)

deregister(pruned: true)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def replace_fork(pid, status)
def handle_claimed_jobs_by(terminated_fork, status)
if registered_process = process.supervisees.find_by(name: terminated_fork.name)
error = Processes::ProcessExitError.new(status)
registered_process.fail_all_claimed_executions_with(error)
registered_process.fail_all_claimed_executions_with(error, reraise: false)
end
end

Expand Down
3 changes: 2 additions & 1 deletion lib/solid_queue/supervisor/maintenance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def prune_dead_processes

def fail_orphaned_executions
wrap_in_app_executor do
ClaimedExecution.orphaned.fail_all_with(Processes::ProcessMissingError.new)
ClaimedExecution.orphaned.
fail_all_with(Processes::ProcessMissingError.new, reraise: false)
end
end
end
Expand Down
14 changes: 9 additions & 5 deletions test/models/solid_queue/claimed_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
job = claimed_execution.job

assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do
claimed_execution.perform
assert_raises RuntimeError do
claimed_execution.perform
end
end

assert_not job.reload.finished?
Expand All @@ -37,10 +39,12 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
test "job failures are reported via Rails error subscriber" do
subscriber = ErrorBuffer.new

with_error_subscriber(subscriber) do
claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B")
assert_raises RuntimeError do
with_error_subscriber(subscriber) do
claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B")

claimed_execution.perform
claimed_execution.perform
end
end

assert_equal 1, subscriber.errors.count
Expand All @@ -63,7 +67,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
job = claimed_execution.job

assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do
claimed_execution.failed_with(RuntimeError.new)
claimed_execution.failed_with(RuntimeError.new, reraise: false)
end

assert job.reload.failed?
Expand Down
17 changes: 17 additions & 0 deletions test/unit/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,23 @@ class WorkerTest < ActiveSupport::TestCase
SolidQueue.on_thread_error = original_on_thread_error
end

test "errors on claimed executions are reported via Rails error subscriber" do
subscriber = ErrorBuffer.new
Rails.error.subscribe(subscriber)

RaisingJob.perform_later(RuntimeError, "B")

@worker.start

wait_for_jobs_to_finish_for(1.second)
@worker.wake_up

assert_equal 1, subscriber.errors.count
assert_equal "This is a RuntimeError exception", subscriber.messages.first
ensure
Rails.error.unsubscribe(subscriber) if Rails.error.respond_to?(:unsubscribe)
end

test "claim and process more enqueued jobs than the pool size allows to process at once" do
5.times do |i|
StoreResultJob.perform_later(:paused, pause: 0.1.second)
Expand Down

0 comments on commit 4c9a81b

Please sign in to comment.