Skip to content

Commit

Permalink
refactor: use atomic types introduced in go1.19
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Sep 26, 2024
1 parent 8db8288 commit dddb536
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 44 deletions.
8 changes: 4 additions & 4 deletions job/function_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ func TestFunctionJob(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var n int32 = 2
var n atomic.Int32
funcJob1 := job.NewFunctionJob(func(_ context.Context) (string, error) {
atomic.AddInt32(&n, 2)
n.Add(2)
return "fired1", nil
})

funcJob2 := job.NewFunctionJob(func(_ context.Context) (*int, error) {
atomic.AddInt32(&n, 2)
n.Add(2)
result := 42
return &result, nil
})
Expand All @@ -48,7 +48,7 @@ func TestFunctionJob(t *testing.T) {
assert.NotEqual(t, funcJob2.Result(), nil)
assert.Equal(t, *funcJob2.Result(), 42)

assert.Equal(t, int(atomic.LoadInt32(&n)), 6)
assert.Equal(t, n.Load(), 4)
}

func TestNewFunctionJob_WithDesc(t *testing.T) {
Expand Down
11 changes: 5 additions & 6 deletions job/isolated_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (

type isolatedJob struct {
quartz.Job
// TODO: switch this to an atomic.Bool when upgrading to/past go1.19
isRunning *atomic.Value
isRunning atomic.Bool
}

var _ quartz.Job = (*isolatedJob)(nil)

// Execute is called by a Scheduler when the Trigger associated with this job fires.
// Execute is called by a Scheduler when the Trigger associated
// with this job fires.
func (j *isolatedJob) Execute(ctx context.Context) error {
if wasRunning := j.isRunning.Swap(true); wasRunning != nil && wasRunning.(bool) {
if wasRunning := j.isRunning.Swap(true); wasRunning {
return errors.New("job is running")
}
defer j.isRunning.Store(false)
Expand All @@ -30,7 +30,6 @@ func (j *isolatedJob) Execute(ctx context.Context) error {
// instance of the job's Execute method can be called at a time.
func NewIsolatedJob(underlying quartz.Job) quartz.Job {
return &isolatedJob{
Job: underlying,
isRunning: &atomic.Value{},
Job: underlying,
}
}
10 changes: 5 additions & 5 deletions job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
func TestMultipleExecution(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var n int64
var n atomic.Int64
job1 := job.NewIsolatedJob(job.NewFunctionJob(func(ctx context.Context) (bool, error) {
atomic.AddInt64(&n, 1)
n.Add(1)
timer := time.NewTimer(time.Minute)
defer timer.Stop()
select {
Expand Down Expand Up @@ -72,7 +72,7 @@ loop:
for i := 0; i < 1000; i++ {
select {
case <-ticker.C:
if atomic.LoadInt64(&n) != 1 {
if n.Load() != 1 {
t.Error("only one job should run")
}
case <-ctx.Done():
Expand All @@ -83,7 +83,7 @@ loop:

// stop all the adding threads without canceling the context
close(sig)
if atomic.LoadInt64(&n) != 1 {
if n.Load() != 1 {
t.Error("only one job should run")
}
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestShellJob_Execute(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sh := job.NewShellJob(tt.args.Cmd)
_ = sh.Execute(context.TODO())
_ = sh.Execute(context.Background())

assert.Equal(t, tt.args.ExitCode, sh.ExitCode())
assert.Equal(t, tt.args.Stderr, sh.Stderr())
Expand Down
60 changes: 31 additions & 29 deletions quartz/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ func TestScheduler_BlockingSemantics(t *testing.T) {
defer cancel()
sched.Start(ctx)

var n int64
var n atomic.Int64
timerJob := quartz.NewJobDetail(
job.NewFunctionJob(func(ctx context.Context) (bool, error) {
atomic.AddInt64(&n, 1)
n.Add(1)
timer := time.NewTimer(time.Hour)
defer timer.Stop()
select {
Expand All @@ -139,7 +139,7 @@ func TestScheduler_BlockingSemantics(t *testing.T) {
}
ticker := time.NewTicker(100 * time.Millisecond)
<-ticker.C
if atomic.LoadInt64(&n) == 0 {
if n.Load() == 0 {
t.Error("job should have run at least once")
}

Expand All @@ -152,7 +152,7 @@ func TestScheduler_BlockingSemantics(t *testing.T) {
case <-ctx.Done():
break BLOCKING
case <-ticker.C:
num := atomic.LoadInt64(&n)
num := n.Load()
if num != 1 {
t.Error("job should have only run once", num)
}
Expand All @@ -166,7 +166,7 @@ func TestScheduler_BlockingSemantics(t *testing.T) {
case <-ctx.Done():
break NONBLOCKING
case <-ticker.C:
num := atomic.LoadInt64(&n)
num := n.Load()
if num <= lastN {
t.Errorf("on iter %d n did not increase %d",
iters, num,
Expand All @@ -182,7 +182,7 @@ func TestScheduler_BlockingSemantics(t *testing.T) {
case <-ctx.Done():
break WORKERS
case <-ticker.C:
num := atomic.LoadInt64(&n)
num := n.Load()
if num > int64(opts.WorkerLimit) {
t.Errorf("on iter %d n %d was more than limit %d",
iters, num, opts.WorkerLimit,
Expand Down Expand Up @@ -292,16 +292,16 @@ func TestScheduler_Cancel(t *testing.T) {
}

func TestScheduler_JobWithRetries(t *testing.T) {
var n int32
var n atomic.Int32
funcRetryJob := job.NewFunctionJob(func(_ context.Context) (string, error) {
atomic.AddInt32(&n, 1)
if n < 3 {
if n.Add(1) < 3 {
return "", errors.New("less than 3")
}
return "ok", nil
})
ctx := context.Background()
sched := quartz.NewStdScheduler()

opts := quartz.NewDefaultJobDetailOptions()
opts.MaxRetries = 3
opts.RetryInterval = 50 * time.Millisecond
Expand All @@ -312,44 +312,46 @@ func TestScheduler_JobWithRetries(t *testing.T) {
)
err := sched.ScheduleJob(jobDetail, quartz.NewRunOnceTrigger(time.Millisecond))
assert.IsNil(t, err)

err = sched.ScheduleJob(jobDetail, quartz.NewRunOnceTrigger(time.Millisecond))
assert.ErrorIs(t, err, quartz.ErrIllegalState)
assert.ErrorIs(t, err, quartz.ErrJobAlreadyExists)

jobDetail.Options().Replace = true
err = sched.ScheduleJob(jobDetail, quartz.NewRunOnceTrigger(time.Millisecond))
assert.IsNil(t, err)

assert.Equal(t, funcRetryJob.JobStatus(), job.StatusNA)
assert.Equal(t, int(atomic.LoadInt32(&n)), 0)
assert.Equal(t, n.Load(), 0)

sched.Start(ctx)

time.Sleep(25 * time.Millisecond)
assert.Equal(t, funcRetryJob.JobStatus(), job.StatusFailure)
assert.Equal(t, int(atomic.LoadInt32(&n)), 1)
assert.Equal(t, n.Load(), 1)

time.Sleep(50 * time.Millisecond)
assert.Equal(t, funcRetryJob.JobStatus(), job.StatusFailure)
assert.Equal(t, int(atomic.LoadInt32(&n)), 2)
assert.Equal(t, n.Load(), 2)

time.Sleep(100 * time.Millisecond)
assert.Equal(t, funcRetryJob.JobStatus(), job.StatusOK)
assert.Equal(t, int(atomic.LoadInt32(&n)), 3)
assert.Equal(t, n.Load(), 3)

sched.Stop()
}

func TestScheduler_JobWithRetriesCtxDone(t *testing.T) {
var n int32
var n atomic.Int32
funcRetryJob := job.NewFunctionJob(func(_ context.Context) (string, error) {
atomic.AddInt32(&n, 1)
if n < 3 {
if n.Add(1) < 3 {
return "", errors.New("less than 3")
}
return "ok", nil
})
ctx, cancel := context.WithCancel(context.Background())
sched := quartz.NewStdScheduler()

opts := quartz.NewDefaultJobDetailOptions()
opts.MaxRetries = 3
opts.RetryInterval = 50 * time.Millisecond
Expand All @@ -362,23 +364,23 @@ func TestScheduler_JobWithRetriesCtxDone(t *testing.T) {
assert.IsNil(t, err)

assert.Equal(t, funcRetryJob.JobStatus(), job.StatusNA)
assert.Equal(t, int(atomic.LoadInt32(&n)), 0)
assert.Equal(t, n.Load(), 0)

sched.Start(ctx)

time.Sleep(25 * time.Millisecond)
assert.Equal(t, funcRetryJob.JobStatus(), job.StatusFailure)
assert.Equal(t, int(atomic.LoadInt32(&n)), 1)
assert.Equal(t, n.Load(), 1)

time.Sleep(50 * time.Millisecond)
assert.Equal(t, funcRetryJob.JobStatus(), job.StatusFailure)
assert.Equal(t, int(atomic.LoadInt32(&n)), 2)
assert.Equal(t, n.Load(), 2)

cancel() // cancel the context after first retry

time.Sleep(100 * time.Millisecond)
assert.Equal(t, funcRetryJob.JobStatus(), job.StatusFailure)
assert.Equal(t, int(atomic.LoadInt32(&n)), 2)
assert.Equal(t, n.Load(), 2)

sched.Stop()
}
Expand Down Expand Up @@ -413,9 +415,9 @@ func TestScheduler_JobPanic(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 35*time.Millisecond)
defer cancel()

var n int32
var n atomic.Int32
addJob := job.NewFunctionJob(func(_ context.Context) (int32, error) {
return atomic.AddInt32(&n, 1), nil
return n.Add(1), nil
})
panicJob := job.NewFunctionJob(func(_ context.Context) (int32, error) {
panic("error")
Expand All @@ -432,37 +434,37 @@ func TestScheduler_JobPanic(t *testing.T) {
assert.IsNil(t, err)

sched.Wait(ctx)
assert.Equal(t, int(atomic.LoadInt32(&n)), 3)
assert.Equal(t, n.Load(), 3)
}

func TestScheduler_PauseResume(t *testing.T) {
var n int32
var n atomic.Int32
funcJob := job.NewFunctionJob(func(_ context.Context) (string, error) {
atomic.AddInt32(&n, 1)
n.Add(1)
return "ok", nil
})
sched := quartz.NewStdScheduler()
jobDetail := quartz.NewJobDetail(funcJob, quartz.NewJobKey("funcJob"))
err := sched.ScheduleJob(jobDetail, quartz.NewSimpleTrigger(10*time.Millisecond))
assert.IsNil(t, err)

assert.Equal(t, int(atomic.LoadInt32(&n)), 0)
assert.Equal(t, n.Load(), 0)
sched.Start(context.Background())

time.Sleep(55 * time.Millisecond)
assert.Equal(t, int(atomic.LoadInt32(&n)), 5)
assert.Equal(t, n.Load(), 5)

err = sched.PauseJob(jobDetail.JobKey())
assert.IsNil(t, err)

time.Sleep(55 * time.Millisecond)
assert.Equal(t, int(atomic.LoadInt32(&n)), 5)
assert.Equal(t, n.Load(), 5)

err = sched.ResumeJob(jobDetail.JobKey())
assert.IsNil(t, err)

time.Sleep(55 * time.Millisecond)
assert.Equal(t, int(atomic.LoadInt32(&n)), 10)
assert.Equal(t, n.Load(), 10)

sched.Stop()
}
Expand Down

0 comments on commit dddb536

Please sign in to comment.