From dddb53605db9381c2e42a03dbb9d4a3ba31d998e Mon Sep 17 00:00:00 2001 From: reugn Date: Thu, 26 Sep 2024 17:51:26 +0300 Subject: [PATCH] refactor: use atomic types introduced in go1.19 --- job/function_job_test.go | 8 +++--- job/isolated_job.go | 11 ++++---- job/job_test.go | 10 +++---- quartz/scheduler_test.go | 60 +++++++++++++++++++++------------------- 4 files changed, 45 insertions(+), 44 deletions(-) diff --git a/job/function_job_test.go b/job/function_job_test.go index b54e187..2174e0b 100644 --- a/job/function_job_test.go +++ b/job/function_job_test.go @@ -15,14 +15,14 @@ func TestFunctionJob(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var n int32 = 2 + var n atomic.Int32 funcJob1 := job.NewFunctionJob(func(_ context.Context) (string, error) { - atomic.AddInt32(&n, 2) + n.Add(2) return "fired1", nil }) funcJob2 := job.NewFunctionJob(func(_ context.Context) (*int, error) { - atomic.AddInt32(&n, 2) + n.Add(2) result := 42 return &result, nil }) @@ -48,7 +48,7 @@ func TestFunctionJob(t *testing.T) { assert.NotEqual(t, funcJob2.Result(), nil) assert.Equal(t, *funcJob2.Result(), 42) - assert.Equal(t, int(atomic.LoadInt32(&n)), 6) + assert.Equal(t, n.Load(), 4) } func TestNewFunctionJob_WithDesc(t *testing.T) { diff --git a/job/isolated_job.go b/job/isolated_job.go index 578559c..f02b5c9 100644 --- a/job/isolated_job.go +++ b/job/isolated_job.go @@ -10,15 +10,15 @@ import ( type isolatedJob struct { quartz.Job - // TODO: switch this to an atomic.Bool when upgrading to/past go1.19 - isRunning *atomic.Value + isRunning atomic.Bool } var _ quartz.Job = (*isolatedJob)(nil) -// Execute is called by a Scheduler when the Trigger associated with this job fires. +// Execute is called by a Scheduler when the Trigger associated +// with this job fires. func (j *isolatedJob) Execute(ctx context.Context) error { - if wasRunning := j.isRunning.Swap(true); wasRunning != nil && wasRunning.(bool) { + if wasRunning := j.isRunning.Swap(true); wasRunning { return errors.New("job is running") } defer j.isRunning.Store(false) @@ -30,7 +30,6 @@ func (j *isolatedJob) Execute(ctx context.Context) error { // instance of the job's Execute method can be called at a time. func NewIsolatedJob(underlying quartz.Job) quartz.Job { return &isolatedJob{ - Job: underlying, - isRunning: &atomic.Value{}, + Job: underlying, } } diff --git a/job/job_test.go b/job/job_test.go index 315f1ed..1570900 100644 --- a/job/job_test.go +++ b/job/job_test.go @@ -20,9 +20,9 @@ import ( func TestMultipleExecution(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - var n int64 + var n atomic.Int64 job1 := job.NewIsolatedJob(job.NewFunctionJob(func(ctx context.Context) (bool, error) { - atomic.AddInt64(&n, 1) + n.Add(1) timer := time.NewTimer(time.Minute) defer timer.Stop() select { @@ -72,7 +72,7 @@ loop: for i := 0; i < 1000; i++ { select { case <-ticker.C: - if atomic.LoadInt64(&n) != 1 { + if n.Load() != 1 { t.Error("only one job should run") } case <-ctx.Done(): @@ -83,7 +83,7 @@ loop: // stop all the adding threads without canceling the context close(sig) - if atomic.LoadInt64(&n) != 1 { + if n.Load() != 1 { t.Error("only one job should run") } } @@ -222,7 +222,7 @@ func TestShellJob_Execute(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sh := job.NewShellJob(tt.args.Cmd) - _ = sh.Execute(context.TODO()) + _ = sh.Execute(context.Background()) assert.Equal(t, tt.args.ExitCode, sh.ExitCode()) assert.Equal(t, tt.args.Stderr, sh.Stderr()) diff --git a/quartz/scheduler_test.go b/quartz/scheduler_test.go index 0f712a7..aff9e7b 100644 --- a/quartz/scheduler_test.go +++ b/quartz/scheduler_test.go @@ -115,10 +115,10 @@ func TestScheduler_BlockingSemantics(t *testing.T) { defer cancel() sched.Start(ctx) - var n int64 + var n atomic.Int64 timerJob := quartz.NewJobDetail( job.NewFunctionJob(func(ctx context.Context) (bool, error) { - atomic.AddInt64(&n, 1) + n.Add(1) timer := time.NewTimer(time.Hour) defer timer.Stop() select { @@ -139,7 +139,7 @@ func TestScheduler_BlockingSemantics(t *testing.T) { } ticker := time.NewTicker(100 * time.Millisecond) <-ticker.C - if atomic.LoadInt64(&n) == 0 { + if n.Load() == 0 { t.Error("job should have run at least once") } @@ -152,7 +152,7 @@ func TestScheduler_BlockingSemantics(t *testing.T) { case <-ctx.Done(): break BLOCKING case <-ticker.C: - num := atomic.LoadInt64(&n) + num := n.Load() if num != 1 { t.Error("job should have only run once", num) } @@ -166,7 +166,7 @@ func TestScheduler_BlockingSemantics(t *testing.T) { case <-ctx.Done(): break NONBLOCKING case <-ticker.C: - num := atomic.LoadInt64(&n) + num := n.Load() if num <= lastN { t.Errorf("on iter %d n did not increase %d", iters, num, @@ -182,7 +182,7 @@ func TestScheduler_BlockingSemantics(t *testing.T) { case <-ctx.Done(): break WORKERS case <-ticker.C: - num := atomic.LoadInt64(&n) + num := n.Load() if num > int64(opts.WorkerLimit) { t.Errorf("on iter %d n %d was more than limit %d", iters, num, opts.WorkerLimit, @@ -292,16 +292,16 @@ func TestScheduler_Cancel(t *testing.T) { } func TestScheduler_JobWithRetries(t *testing.T) { - var n int32 + var n atomic.Int32 funcRetryJob := job.NewFunctionJob(func(_ context.Context) (string, error) { - atomic.AddInt32(&n, 1) - if n < 3 { + if n.Add(1) < 3 { return "", errors.New("less than 3") } return "ok", nil }) ctx := context.Background() sched := quartz.NewStdScheduler() + opts := quartz.NewDefaultJobDetailOptions() opts.MaxRetries = 3 opts.RetryInterval = 50 * time.Millisecond @@ -312,44 +312,46 @@ func TestScheduler_JobWithRetries(t *testing.T) { ) err := sched.ScheduleJob(jobDetail, quartz.NewRunOnceTrigger(time.Millisecond)) assert.IsNil(t, err) + err = sched.ScheduleJob(jobDetail, quartz.NewRunOnceTrigger(time.Millisecond)) assert.ErrorIs(t, err, quartz.ErrIllegalState) assert.ErrorIs(t, err, quartz.ErrJobAlreadyExists) + jobDetail.Options().Replace = true err = sched.ScheduleJob(jobDetail, quartz.NewRunOnceTrigger(time.Millisecond)) assert.IsNil(t, err) assert.Equal(t, funcRetryJob.JobStatus(), job.StatusNA) - assert.Equal(t, int(atomic.LoadInt32(&n)), 0) + assert.Equal(t, n.Load(), 0) sched.Start(ctx) time.Sleep(25 * time.Millisecond) assert.Equal(t, funcRetryJob.JobStatus(), job.StatusFailure) - assert.Equal(t, int(atomic.LoadInt32(&n)), 1) + assert.Equal(t, n.Load(), 1) time.Sleep(50 * time.Millisecond) assert.Equal(t, funcRetryJob.JobStatus(), job.StatusFailure) - assert.Equal(t, int(atomic.LoadInt32(&n)), 2) + assert.Equal(t, n.Load(), 2) time.Sleep(100 * time.Millisecond) assert.Equal(t, funcRetryJob.JobStatus(), job.StatusOK) - assert.Equal(t, int(atomic.LoadInt32(&n)), 3) + assert.Equal(t, n.Load(), 3) sched.Stop() } func TestScheduler_JobWithRetriesCtxDone(t *testing.T) { - var n int32 + var n atomic.Int32 funcRetryJob := job.NewFunctionJob(func(_ context.Context) (string, error) { - atomic.AddInt32(&n, 1) - if n < 3 { + if n.Add(1) < 3 { return "", errors.New("less than 3") } return "ok", nil }) ctx, cancel := context.WithCancel(context.Background()) sched := quartz.NewStdScheduler() + opts := quartz.NewDefaultJobDetailOptions() opts.MaxRetries = 3 opts.RetryInterval = 50 * time.Millisecond @@ -362,23 +364,23 @@ func TestScheduler_JobWithRetriesCtxDone(t *testing.T) { assert.IsNil(t, err) assert.Equal(t, funcRetryJob.JobStatus(), job.StatusNA) - assert.Equal(t, int(atomic.LoadInt32(&n)), 0) + assert.Equal(t, n.Load(), 0) sched.Start(ctx) time.Sleep(25 * time.Millisecond) assert.Equal(t, funcRetryJob.JobStatus(), job.StatusFailure) - assert.Equal(t, int(atomic.LoadInt32(&n)), 1) + assert.Equal(t, n.Load(), 1) time.Sleep(50 * time.Millisecond) assert.Equal(t, funcRetryJob.JobStatus(), job.StatusFailure) - assert.Equal(t, int(atomic.LoadInt32(&n)), 2) + assert.Equal(t, n.Load(), 2) cancel() // cancel the context after first retry time.Sleep(100 * time.Millisecond) assert.Equal(t, funcRetryJob.JobStatus(), job.StatusFailure) - assert.Equal(t, int(atomic.LoadInt32(&n)), 2) + assert.Equal(t, n.Load(), 2) sched.Stop() } @@ -413,9 +415,9 @@ func TestScheduler_JobPanic(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 35*time.Millisecond) defer cancel() - var n int32 + var n atomic.Int32 addJob := job.NewFunctionJob(func(_ context.Context) (int32, error) { - return atomic.AddInt32(&n, 1), nil + return n.Add(1), nil }) panicJob := job.NewFunctionJob(func(_ context.Context) (int32, error) { panic("error") @@ -432,13 +434,13 @@ func TestScheduler_JobPanic(t *testing.T) { assert.IsNil(t, err) sched.Wait(ctx) - assert.Equal(t, int(atomic.LoadInt32(&n)), 3) + assert.Equal(t, n.Load(), 3) } func TestScheduler_PauseResume(t *testing.T) { - var n int32 + var n atomic.Int32 funcJob := job.NewFunctionJob(func(_ context.Context) (string, error) { - atomic.AddInt32(&n, 1) + n.Add(1) return "ok", nil }) sched := quartz.NewStdScheduler() @@ -446,23 +448,23 @@ func TestScheduler_PauseResume(t *testing.T) { err := sched.ScheduleJob(jobDetail, quartz.NewSimpleTrigger(10*time.Millisecond)) assert.IsNil(t, err) - assert.Equal(t, int(atomic.LoadInt32(&n)), 0) + assert.Equal(t, n.Load(), 0) sched.Start(context.Background()) time.Sleep(55 * time.Millisecond) - assert.Equal(t, int(atomic.LoadInt32(&n)), 5) + assert.Equal(t, n.Load(), 5) err = sched.PauseJob(jobDetail.JobKey()) assert.IsNil(t, err) time.Sleep(55 * time.Millisecond) - assert.Equal(t, int(atomic.LoadInt32(&n)), 5) + assert.Equal(t, n.Load(), 5) err = sched.ResumeJob(jobDetail.JobKey()) assert.IsNil(t, err) time.Sleep(55 * time.Millisecond) - assert.Equal(t, int(atomic.LoadInt32(&n)), 10) + assert.Equal(t, n.Load(), 10) sched.Stop() }