Skip to content

Commit

Permalink
job: add isolated Job wrapper (#42)
Browse files Browse the repository at this point in the history
* job: prevent more than once instance of a job from running at once

* add docstring

* fixes/updates

* bump go version to match tests

* version go

* revert version change

* clean up job name

* downgrade type

* fix test

* Update quartz/job.go

* Update quartz/job_test.go

* Apply suggestions from code review

Co-authored-by: Eugene R. <[email protected]>

Co-authored-by: Eugene R <[email protected]>
  • Loading branch information
tychoish and reugn authored Jan 9, 2023
1 parent f28a9d7 commit a3e457b
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 0 deletions.
26 changes: 26 additions & 0 deletions quartz/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net/http"
"os/exec"
"sync/atomic"
)

// Job represents an interface to be implemented by structs which represent a 'job'
Expand Down Expand Up @@ -151,3 +152,28 @@ func (cu *CurlJob) Execute(ctx context.Context) {
cu.StatusCode = resp.StatusCode
cu.Response = string(body)
}

type isolatedJob struct {
Job
// TODO: switch this to an atomic.Bool when upgrading to/past go1.19
isRunning *atomic.Value
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (j *isolatedJob) Execute(ctx context.Context) {
if wasRunning := j.isRunning.Swap(true); wasRunning != nil && wasRunning.(bool) {
return
}
defer j.isRunning.Store(false)

j.Job.Execute(ctx)
}

// NewIsolatedJob wraps a job object and ensures that only one
// instance of the job's Execute method can be called at a time.
func NewIsolatedJob(underlying Job) Job {
return &isolatedJob{
Job: underlying,
isRunning: &atomic.Value{},
}
}
84 changes: 84 additions & 0 deletions quartz/job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package quartz_test

import (
"context"
"errors"
"math/rand"
"runtime"
"sync/atomic"
"testing"
"time"

"github.com/reugn/go-quartz/quartz"
)

func TestMultipleExecution(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var n int64
job := quartz.NewIsolatedJob(quartz.NewFunctionJob(func(ctx context.Context) (bool, error) {
atomic.AddInt64(&n, 1)
timer := time.NewTimer(time.Minute)
defer timer.Stop()
select {
case <-ctx.Done():
if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) {
t.Error("should not have timed out")
}
case <-timer.C:
t.Error("should not have reached timeout")
}

return false, ctx.Err()
}))

// start a bunch of threads that run jobs
sig := make(chan struct{})
for i := 0; i < runtime.NumCPU(); i++ {
go func() {
timer := time.NewTimer(0)
defer timer.Stop()
count := 0
defer func() {
if count == 0 {
t.Error("should run at least once")
}
}()
for {
count++
select {
case <-timer.C:
// sleep for a jittered amount of
// time, less than 11ms
job.Execute(ctx)
case <-ctx.Done():
return
case <-sig:
return
}
timer.Reset(1 + time.Duration(rand.Int63n(10))*time.Millisecond)
}
}()
}

// check very often that we've only run one job
ticker := time.NewTicker(2 * time.Millisecond)
for i := 0; i < 1000; i++ {
select {
case <-ticker.C:
if atomic.LoadInt64(&n) != 1 {
t.Error("only one job should run")
}
case <-ctx.Done():
t.Error("should not have reached timeout")
break
}
}

// stop all of the adding threads without canceling
// the context
close(sig)
if atomic.LoadInt64(&n) != 1 {
t.Error("only one job should run")
}
}

0 comments on commit a3e457b

Please sign in to comment.