From a68b809459aae81ef015ce4697992f9bfb3e73f6 Mon Sep 17 00:00:00 2001 From: reugn Date: Sat, 13 Apr 2024 12:30:56 +0300 Subject: [PATCH] feat(scheduler): add a channel to handle misfired jobs --- quartz/scheduler.go | 14 +++++++++++++- quartz/scheduler_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/quartz/scheduler.go b/quartz/scheduler.go index 9fc7fa9..5c77b52 100644 --- a/quartz/scheduler.go +++ b/quartz/scheduler.go @@ -112,6 +112,14 @@ type StdSchedulerOptions struct { // using a custom implementation of the JobQueue, where operations // may timeout or fail. RetryInterval time.Duration + + // MisfiredChan allows the creation of event listeners to handle jobs that + // have failed to be executed on time and have been skipped by the scheduler. + // + // Misfires can occur due to insufficient resources or scheduler downtime. + // Adjust OutdatedThreshold to establish an acceptable delay time and + // ensure regular job execution. + MisfiredChan chan ScheduledJob } // Verify StdScheduler satisfies the Scheduler interface. @@ -516,7 +524,11 @@ func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, e now := NowNano() if job.NextRunTime() < now-sched.opts.OutdatedThreshold.Nanoseconds() { duration := time.Duration(now - job.NextRunTime()) - logger.Debugf("Job %s skipped as outdated %s.", job.JobDetail().jobKey, duration) + logger.Debugf("Job %s is outdated %s.", job.JobDetail().jobKey, duration) + select { + case sched.opts.MisfiredChan <- job: + default: + } return false, func() (int64, error) { return job.Trigger().NextFireTime(now) } } else if job.NextRunTime() > now { logger.Debugf("Job %s is not due to run yet.", job.JobDetail().jobKey) diff --git a/quartz/scheduler_test.go b/quartz/scheduler_test.go index 62dbf03..efa4df6 100644 --- a/quartz/scheduler_test.go +++ b/quartz/scheduler_test.go @@ -382,6 +382,32 @@ func TestScheduler_JobWithRetriesCtxDone(t *testing.T) { sched.Stop() } +func TestScheduler_MisfiredJob(t *testing.T) { + funcJob := job.NewFunctionJob(func(_ context.Context) (string, error) { + time.Sleep(20 * time.Millisecond) + return "ok", nil + }) + + misfiredChan := make(chan quartz.ScheduledJob, 1) + sched := quartz.NewStdSchedulerWithOptions(quartz.StdSchedulerOptions{ + BlockingExecution: true, + OutdatedThreshold: time.Millisecond, + RetryInterval: time.Millisecond, + MisfiredChan: misfiredChan, + }, nil) + + jobDetail := quartz.NewJobDetail(funcJob, quartz.NewJobKey("funcJob")) + err := sched.ScheduleJob(jobDetail, quartz.NewSimpleTrigger(2*time.Millisecond)) + assert.IsNil(t, err) + + sched.Start(context.Background()) + + job := <-misfiredChan + assert.Equal(t, job.JobDetail().JobKey().Name(), "funcJob") + + sched.Stop() +} + func TestScheduler_PauseResume(t *testing.T) { var n int32 funcJob := job.NewFunctionJob(func(_ context.Context) (string, error) {