Skip to content

Commit

Permalink
scheduler: wait for all threads (#43)
Browse files Browse the repository at this point in the history
* scheduler: wait for all threads

* fix example compile

* cleanup constructor

* fix docs
  • Loading branch information
tychoish authored Jan 10, 2023
1 parent a3e457b commit 5dd722b
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 40 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Scheduler interface {
// IsStarted determines whether the scheduler has been started.
IsStarted() bool
// ScheduleJob schedules a job using a specified trigger.
ScheduleJob(job Job, trigger Trigger) error
ScheduleJob(ctx context.Context, job Job, trigger Trigger) error
// GetJobKeys returns the keys of all of the scheduled jobs.
GetJobKeys() []int
// GetScheduledJob returns the scheduled job with the specified key.
Expand Down
16 changes: 8 additions & 8 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ func sampleScheduler(ctx context.Context, wg *sync.WaitGroup) {
cronJob := PrintJob{"Cron job"}
sched.Start(ctx)

sched.ScheduleJob(&PrintJob{"Ad hoc Job"}, quartz.NewRunOnceTrigger(time.Second*5))
sched.ScheduleJob(&PrintJob{"First job"}, quartz.NewSimpleTrigger(time.Second*12))
sched.ScheduleJob(&PrintJob{"Second job"}, quartz.NewSimpleTrigger(time.Second*6))
sched.ScheduleJob(&PrintJob{"Third job"}, quartz.NewSimpleTrigger(time.Second*3))
sched.ScheduleJob(&cronJob, cronTrigger)
sched.ScheduleJob(ctx, &PrintJob{"Ad hoc Job"}, quartz.NewRunOnceTrigger(time.Second*5))
sched.ScheduleJob(ctx, &PrintJob{"First job"}, quartz.NewSimpleTrigger(time.Second*12))
sched.ScheduleJob(ctx, &PrintJob{"Second job"}, quartz.NewSimpleTrigger(time.Second*6))
sched.ScheduleJob(ctx, &PrintJob{"Third job"}, quartz.NewSimpleTrigger(time.Second*3))
sched.ScheduleJob(ctx, &cronJob, cronTrigger)

time.Sleep(time.Second * 10)

Expand Down Expand Up @@ -78,9 +78,9 @@ func sampleJobs(ctx context.Context, wg *sync.WaitGroup) {
}
functionJob := quartz.NewFunctionJobWithDesc("42", func(_ context.Context) (int, error) { return 42, nil })

sched.ScheduleJob(shellJob, cronTrigger)
sched.ScheduleJob(curlJob, quartz.NewSimpleTrigger(time.Second*7))
sched.ScheduleJob(functionJob, quartz.NewSimpleTrigger(time.Second*3))
sched.ScheduleJob(ctx, shellJob, cronTrigger)
sched.ScheduleJob(ctx, curlJob, quartz.NewSimpleTrigger(time.Second*7))
sched.ScheduleJob(ctx, functionJob, quartz.NewSimpleTrigger(time.Second*3))

time.Sleep(time.Second * 10)

Expand Down
4 changes: 2 additions & 2 deletions quartz/function_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func TestFunctionJob(t *testing.T) {

sched := quartz.NewStdScheduler()
sched.Start(ctx)
sched.ScheduleJob(funcJob1, quartz.NewRunOnceTrigger(time.Millisecond*300))
sched.ScheduleJob(funcJob2, quartz.NewRunOnceTrigger(time.Millisecond*800))
sched.ScheduleJob(ctx, funcJob1, quartz.NewRunOnceTrigger(time.Millisecond*300))
sched.ScheduleJob(ctx, funcJob2, quartz.NewRunOnceTrigger(time.Millisecond*800))
time.Sleep(time.Second)
sched.Clear()
sched.Stop()
Expand Down
32 changes: 20 additions & 12 deletions quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Scheduler interface {
IsStarted() bool

// ScheduleJob schedules a job using a specified trigger.
ScheduleJob(job Job, trigger Trigger) error
ScheduleJob(ctx context.Context, job Job, trigger Trigger) error

// GetJobKeys returns the keys of all of the scheduled jobs.
GetJobKeys() []int
Expand All @@ -56,9 +56,9 @@ type Scheduler interface {
// StdScheduler implements the quartz.Scheduler interface.
type StdScheduler struct {
mtx sync.Mutex
wg *sync.WaitGroup
queue *priorityQueue
interrupt chan struct{}
signal chan struct{}
cancel context.CancelFunc
feeder chan *item
dispatch chan *item
Expand Down Expand Up @@ -94,17 +94,16 @@ func NewStdScheduler() Scheduler {
func NewStdSchedulerWithOptions(opts StdSchedulerOptions) *StdScheduler {
return &StdScheduler{
queue: &priorityQueue{},
wg: &sync.WaitGroup{},
interrupt: make(chan struct{}, 1),
cancel: func() {},
feeder: make(chan *item),
signal: make(chan struct{}),
dispatch: make(chan *item),
opts: opts,
}
}

// ScheduleJob schedules a Job using a specified Trigger.
func (sched *StdScheduler) ScheduleJob(job Job, trigger Trigger) error {
func (sched *StdScheduler) ScheduleJob(ctx context.Context, job Job, trigger Trigger) error {
nextRunTime, err := trigger.NextFireTime(NowNano())
if err != nil {
return err
Expand All @@ -118,8 +117,8 @@ func (sched *StdScheduler) ScheduleJob(job Job, trigger Trigger) error {
index: 0,
}:
return nil
case <-sched.signal:
return context.Canceled
case <-ctx.Done():
return ctx.Err()
}
}

Expand All @@ -135,23 +134,26 @@ func (sched *StdScheduler) Start(ctx context.Context) {
ctx, sched.cancel = context.WithCancel(ctx)
go func() { <-ctx.Done(); sched.Stop() }()
// start the feed reader
sched.wg.Add(1)
go sched.startFeedReader(ctx)

// start scheduler execution loop
sched.wg.Add(1)
go sched.startExecutionLoop(ctx)

// starts worker pool when WorkerLimit is > 0
sched.startWorkers(ctx)

sched.started = true
sched.signal = make(chan struct{})
}

// Wait blocks until the scheduler shuts down.
func (sched *StdScheduler) Wait(ctx context.Context) {
sig := make(chan struct{})
go func() { defer close(sig); sched.wg.Wait() }()
select {
case <-ctx.Done():
case <-sched.signal:
case <-sig:
}
}

Expand Down Expand Up @@ -227,11 +229,10 @@ func (sched *StdScheduler) Stop() {
log.Printf("Closing the StdScheduler.")
sched.cancel()
sched.started = false
close(sched.signal)
}

func (sched *StdScheduler) startExecutionLoop(ctx context.Context) {

defer sched.wg.Done()
for {
if sched.queueLen() == 0 {
select {
Expand Down Expand Up @@ -261,7 +262,9 @@ func (sched *StdScheduler) startExecutionLoop(ctx context.Context) {
func (sched *StdScheduler) startWorkers(ctx context.Context) {
if sched.opts.WorkerLimit > 0 {
for i := 0; i < sched.opts.WorkerLimit; i++ {
sched.wg.Add(1)
go func() {
defer sched.wg.Done()
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -320,7 +323,11 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) {
return
}
default:
go it.Job.Execute(ctx)
sched.wg.Add(1)
go func() {
defer sched.wg.Done()
it.Job.Execute(ctx)
}()
}
}

Expand All @@ -338,6 +345,7 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) {
}

func (sched *StdScheduler) startFeedReader(ctx context.Context) {
defer sched.wg.Done()
for {
select {
case item := <-sched.feeder:
Expand Down
36 changes: 19 additions & 17 deletions quartz/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func TestScheduler(t *testing.T) {
jobKeys[3] = errCurlJob.Key()

sched.Start(ctx)
sched.ScheduleJob(shellJob, quartz.NewSimpleTrigger(time.Millisecond*800))
sched.ScheduleJob(curlJob, quartz.NewRunOnceTrigger(time.Millisecond))
sched.ScheduleJob(errShellJob, quartz.NewRunOnceTrigger(time.Millisecond))
sched.ScheduleJob(errCurlJob, quartz.NewSimpleTrigger(time.Millisecond*800))
sched.ScheduleJob(ctx, shellJob, quartz.NewSimpleTrigger(time.Millisecond*800))
sched.ScheduleJob(ctx, curlJob, quartz.NewRunOnceTrigger(time.Millisecond))
sched.ScheduleJob(ctx, errShellJob, quartz.NewRunOnceTrigger(time.Millisecond))
sched.ScheduleJob(ctx, errCurlJob, quartz.NewSimpleTrigger(time.Millisecond*800))

time.Sleep(time.Second)
scheduledJobKeys := sched.GetJobKeys()
Expand Down Expand Up @@ -89,17 +89,19 @@ func TestSchedulerBlockingSemantics(t *testing.T) {
sched.Start(ctx)

var n int64
sched.ScheduleJob(quartz.NewFunctionJob(func(ctx context.Context) (bool, error) {
atomic.AddInt64(&n, 1)
timer := time.NewTimer(time.Hour)
defer timer.Stop()
select {
case <-timer.C:
return false, nil
case <-ctx.Done():
return true, nil
}
}), quartz.NewSimpleTrigger(time.Millisecond))
sched.ScheduleJob(ctx,
quartz.NewFunctionJob(func(ctx context.Context) (bool, error) {
atomic.AddInt64(&n, 1)
timer := time.NewTimer(time.Hour)
defer timer.Stop()
select {
case <-timer.C:
return false, nil
case <-ctx.Done():
return true, nil
}
}),
quartz.NewSimpleTrigger(time.Millisecond))

ticker := time.NewTicker(4 * time.Millisecond)
<-ticker.C
Expand Down Expand Up @@ -197,7 +199,7 @@ func TestSchedulerCancel(t *testing.T) {
}

for i := 0; i < 100; i++ {
if err := sched.ScheduleJob(
if err := sched.ScheduleJob(ctx,
quartz.NewFunctionJob(hourJob),
quartz.NewSimpleTrigger(100*time.Millisecond),
); err != nil {
Expand Down Expand Up @@ -229,7 +231,7 @@ func TestSchedulerCancel(t *testing.T) {

sched.Wait(waitCtx)
if err := waitCtx.Err(); err != nil {
t.Fatal("waiting timed out before resources were released")
t.Fatal("waiting timed out before resources were released", err)
}

endingRoutines := runtime.NumGoroutine()
Expand Down

0 comments on commit 5dd722b

Please sign in to comment.