Skip to content

Commit

Permalink
feat(scheduler): add a channel to handle misfired jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Apr 12, 2024
1 parent e55b059 commit 7e767c8
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ type StdSchedulerOptions struct {
// using a custom implementation of the JobQueue, where operations
// may timeout or fail.
RetryInterval time.Duration

// MisfiredChan allows for creating event listeners to handle jobs that
// have failed to be executed on time.
// A misfire occurs if a job misses its firing time because the scheduler
// is down or there are no available workers in the pool to execute the job.
MisfiredChan chan ScheduledJob
}

// Verify StdScheduler satisfies the Scheduler interface.
Expand Down Expand Up @@ -471,6 +477,7 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) {
case sched.opts.WorkerLimit > 0:
select {
case sched.dispatch <- scheduled:
case sched.opts.MisfiredChan <- scheduled:
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -516,7 +523,11 @@ func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, e
now := NowNano()
if job.NextRunTime() < now-sched.opts.OutdatedThreshold.Nanoseconds() {
duration := time.Duration(now - job.NextRunTime())
logger.Debugf("Job %s skipped as outdated %s.", job.JobDetail().jobKey, duration)
logger.Debugf("Job %s is outdated %s.", job.JobDetail().jobKey, duration)
select {
case sched.opts.MisfiredChan <- job:
default:
}
return false, func() (int64, error) { return job.Trigger().NextFireTime(now) }
} else if job.NextRunTime() > now {
logger.Debugf("Job %s is not due to run yet.", job.JobDetail().jobKey)
Expand Down

0 comments on commit 7e767c8

Please sign in to comment.