diff --git a/README.md b/README.md index 0c86771..2085db1 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ type Scheduler interface { // IsStarted determines whether the scheduler has been started. IsStarted() bool // ScheduleJob schedules a job using a specified trigger. - ScheduleJob(job Job, trigger Trigger) error + ScheduleJob(ctx context.Context, job Job, trigger Trigger) error // GetJobKeys returns the keys of all of the scheduled jobs. GetJobKeys() []int // GetScheduledJob returns the scheduled job with the specified key. diff --git a/examples/main.go b/examples/main.go index ee9c15e..1199dee 100644 --- a/examples/main.go +++ b/examples/main.go @@ -35,11 +35,11 @@ func sampleScheduler(ctx context.Context, wg *sync.WaitGroup) { cronJob := PrintJob{"Cron job"} sched.Start(ctx) - sched.ScheduleJob(&PrintJob{"Ad hoc Job"}, quartz.NewRunOnceTrigger(time.Second*5)) - sched.ScheduleJob(&PrintJob{"First job"}, quartz.NewSimpleTrigger(time.Second*12)) - sched.ScheduleJob(&PrintJob{"Second job"}, quartz.NewSimpleTrigger(time.Second*6)) - sched.ScheduleJob(&PrintJob{"Third job"}, quartz.NewSimpleTrigger(time.Second*3)) - sched.ScheduleJob(&cronJob, cronTrigger) + sched.ScheduleJob(ctx, &PrintJob{"Ad hoc Job"}, quartz.NewRunOnceTrigger(time.Second*5)) + sched.ScheduleJob(ctx, &PrintJob{"First job"}, quartz.NewSimpleTrigger(time.Second*12)) + sched.ScheduleJob(ctx, &PrintJob{"Second job"}, quartz.NewSimpleTrigger(time.Second*6)) + sched.ScheduleJob(ctx, &PrintJob{"Third job"}, quartz.NewSimpleTrigger(time.Second*3)) + sched.ScheduleJob(ctx, &cronJob, cronTrigger) time.Sleep(time.Second * 10) @@ -78,9 +78,9 @@ func sampleJobs(ctx context.Context, wg *sync.WaitGroup) { } functionJob := quartz.NewFunctionJobWithDesc("42", func(_ context.Context) (int, error) { return 42, nil }) - sched.ScheduleJob(shellJob, cronTrigger) - sched.ScheduleJob(curlJob, quartz.NewSimpleTrigger(time.Second*7)) - sched.ScheduleJob(functionJob, quartz.NewSimpleTrigger(time.Second*3)) + sched.ScheduleJob(ctx, shellJob, cronTrigger) + sched.ScheduleJob(ctx, curlJob, quartz.NewSimpleTrigger(time.Second*7)) + sched.ScheduleJob(ctx, functionJob, quartz.NewSimpleTrigger(time.Second*3)) time.Sleep(time.Second * 10) diff --git a/quartz/function_job_test.go b/quartz/function_job_test.go index 86e397c..0a561d4 100644 --- a/quartz/function_job_test.go +++ b/quartz/function_job_test.go @@ -26,8 +26,8 @@ func TestFunctionJob(t *testing.T) { sched := quartz.NewStdScheduler() sched.Start(ctx) - sched.ScheduleJob(funcJob1, quartz.NewRunOnceTrigger(time.Millisecond*300)) - sched.ScheduleJob(funcJob2, quartz.NewRunOnceTrigger(time.Millisecond*800)) + sched.ScheduleJob(ctx, funcJob1, quartz.NewRunOnceTrigger(time.Millisecond*300)) + sched.ScheduleJob(ctx, funcJob2, quartz.NewRunOnceTrigger(time.Millisecond*800)) time.Sleep(time.Second) sched.Clear() sched.Stop() diff --git a/quartz/scheduler.go b/quartz/scheduler.go index b005e96..715cbbc 100644 --- a/quartz/scheduler.go +++ b/quartz/scheduler.go @@ -29,7 +29,7 @@ type Scheduler interface { IsStarted() bool // ScheduleJob schedules a job using a specified trigger. - ScheduleJob(job Job, trigger Trigger) error + ScheduleJob(ctx context.Context, job Job, trigger Trigger) error // GetJobKeys returns the keys of all of the scheduled jobs. GetJobKeys() []int @@ -56,9 +56,9 @@ type Scheduler interface { // StdScheduler implements the quartz.Scheduler interface. type StdScheduler struct { mtx sync.Mutex + wg *sync.WaitGroup queue *priorityQueue interrupt chan struct{} - signal chan struct{} cancel context.CancelFunc feeder chan *item dispatch chan *item @@ -94,17 +94,16 @@ func NewStdScheduler() Scheduler { func NewStdSchedulerWithOptions(opts StdSchedulerOptions) *StdScheduler { return &StdScheduler{ queue: &priorityQueue{}, + wg: &sync.WaitGroup{}, interrupt: make(chan struct{}, 1), - cancel: func() {}, feeder: make(chan *item), - signal: make(chan struct{}), dispatch: make(chan *item), opts: opts, } } // ScheduleJob schedules a Job using a specified Trigger. -func (sched *StdScheduler) ScheduleJob(job Job, trigger Trigger) error { +func (sched *StdScheduler) ScheduleJob(ctx context.Context, job Job, trigger Trigger) error { nextRunTime, err := trigger.NextFireTime(NowNano()) if err != nil { return err @@ -118,8 +117,8 @@ func (sched *StdScheduler) ScheduleJob(job Job, trigger Trigger) error { index: 0, }: return nil - case <-sched.signal: - return context.Canceled + case <-ctx.Done(): + return ctx.Err() } } @@ -135,23 +134,26 @@ func (sched *StdScheduler) Start(ctx context.Context) { ctx, sched.cancel = context.WithCancel(ctx) go func() { <-ctx.Done(); sched.Stop() }() // start the feed reader + sched.wg.Add(1) go sched.startFeedReader(ctx) // start scheduler execution loop + sched.wg.Add(1) go sched.startExecutionLoop(ctx) // starts worker pool when WorkerLimit is > 0 sched.startWorkers(ctx) sched.started = true - sched.signal = make(chan struct{}) } // Wait blocks until the scheduler shuts down. func (sched *StdScheduler) Wait(ctx context.Context) { + sig := make(chan struct{}) + go func() { defer close(sig); sched.wg.Wait() }() select { case <-ctx.Done(): - case <-sched.signal: + case <-sig: } } @@ -227,11 +229,10 @@ func (sched *StdScheduler) Stop() { log.Printf("Closing the StdScheduler.") sched.cancel() sched.started = false - close(sched.signal) } func (sched *StdScheduler) startExecutionLoop(ctx context.Context) { - + defer sched.wg.Done() for { if sched.queueLen() == 0 { select { @@ -261,7 +262,9 @@ func (sched *StdScheduler) startExecutionLoop(ctx context.Context) { func (sched *StdScheduler) startWorkers(ctx context.Context) { if sched.opts.WorkerLimit > 0 { for i := 0; i < sched.opts.WorkerLimit; i++ { + sched.wg.Add(1) go func() { + defer sched.wg.Done() for { select { case <-ctx.Done(): @@ -320,7 +323,11 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) { return } default: - go it.Job.Execute(ctx) + sched.wg.Add(1) + go func() { + defer sched.wg.Done() + it.Job.Execute(ctx) + }() } } @@ -338,6 +345,7 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) { } func (sched *StdScheduler) startFeedReader(ctx context.Context) { + defer sched.wg.Done() for { select { case item := <-sched.feeder: diff --git a/quartz/scheduler_test.go b/quartz/scheduler_test.go index 8e28437..a90bc1f 100644 --- a/quartz/scheduler_test.go +++ b/quartz/scheduler_test.go @@ -35,10 +35,10 @@ func TestScheduler(t *testing.T) { jobKeys[3] = errCurlJob.Key() sched.Start(ctx) - sched.ScheduleJob(shellJob, quartz.NewSimpleTrigger(time.Millisecond*800)) - sched.ScheduleJob(curlJob, quartz.NewRunOnceTrigger(time.Millisecond)) - sched.ScheduleJob(errShellJob, quartz.NewRunOnceTrigger(time.Millisecond)) - sched.ScheduleJob(errCurlJob, quartz.NewSimpleTrigger(time.Millisecond*800)) + sched.ScheduleJob(ctx, shellJob, quartz.NewSimpleTrigger(time.Millisecond*800)) + sched.ScheduleJob(ctx, curlJob, quartz.NewRunOnceTrigger(time.Millisecond)) + sched.ScheduleJob(ctx, errShellJob, quartz.NewRunOnceTrigger(time.Millisecond)) + sched.ScheduleJob(ctx, errCurlJob, quartz.NewSimpleTrigger(time.Millisecond*800)) time.Sleep(time.Second) scheduledJobKeys := sched.GetJobKeys() @@ -89,17 +89,19 @@ func TestSchedulerBlockingSemantics(t *testing.T) { sched.Start(ctx) var n int64 - sched.ScheduleJob(quartz.NewFunctionJob(func(ctx context.Context) (bool, error) { - atomic.AddInt64(&n, 1) - timer := time.NewTimer(time.Hour) - defer timer.Stop() - select { - case <-timer.C: - return false, nil - case <-ctx.Done(): - return true, nil - } - }), quartz.NewSimpleTrigger(time.Millisecond)) + sched.ScheduleJob(ctx, + quartz.NewFunctionJob(func(ctx context.Context) (bool, error) { + atomic.AddInt64(&n, 1) + timer := time.NewTimer(time.Hour) + defer timer.Stop() + select { + case <-timer.C: + return false, nil + case <-ctx.Done(): + return true, nil + } + }), + quartz.NewSimpleTrigger(time.Millisecond)) ticker := time.NewTicker(4 * time.Millisecond) <-ticker.C @@ -197,7 +199,7 @@ func TestSchedulerCancel(t *testing.T) { } for i := 0; i < 100; i++ { - if err := sched.ScheduleJob( + if err := sched.ScheduleJob(ctx, quartz.NewFunctionJob(hourJob), quartz.NewSimpleTrigger(100*time.Millisecond), ); err != nil { @@ -229,7 +231,7 @@ func TestSchedulerCancel(t *testing.T) { sched.Wait(waitCtx) if err := waitCtx.Err(); err != nil { - t.Fatal("waiting timed out before resources were released") + t.Fatal("waiting timed out before resources were released", err) } endingRoutines := runtime.NumGoroutine()