diff --git a/examples/main.go b/examples/main.go index cfe93f7..fb31f89 100644 --- a/examples/main.go +++ b/examples/main.go @@ -94,7 +94,9 @@ func sampleJobs(ctx context.Context, wg *sync.WaitGroup) { return } curlJob := job.NewCurlJob(request) - functionJob := job.NewFunctionJobWithDesc("42", func(_ context.Context) (int, error) { return 42, nil }) + functionJob := job.NewFunctionJobWithDesc( + func(_ context.Context) (int, error) { return 42, nil }, + "42") shellJobDetail := quartz.NewJobDetail(shellJob, quartz.NewJobKey("shellJob")) curlJobDetail := quartz.NewJobDetail(curlJob, quartz.NewJobKey("curlJob")) @@ -114,7 +116,7 @@ func sampleJobs(ctx context.Context, wg *sync.WaitGroup) { } else { fmt.Println(string(response)) } - fmt.Printf("Function job result: %v\n", *functionJob.Result()) + fmt.Printf("Function job result: %v\n", functionJob.Result()) time.Sleep(time.Second * 2) sched.Stop() diff --git a/job/curl_job.go b/job/curl_job.go index 628a73d..441dbd5 100644 --- a/job/curl_job.go +++ b/job/curl_job.go @@ -12,55 +12,58 @@ import ( "github.com/reugn/go-quartz/quartz" ) -// CurlJob represents a cURL command Job, implements the quartz.Job interface. -// cURL is a command-line tool for getting or sending data including files -// using URL syntax. +// CurlJob represents a job that can be used to schedule HTTP requests. +// It implements the [quartz.Job] interface. type CurlJob struct { - sync.Mutex - httpClient HTTPHandler - request *http.Request - response *http.Response - jobStatus Status + mtx sync.Mutex + httpClient HTTPHandler + request *http.Request + response *http.Response + jobStatus Status + + once sync.Once description string callback func(context.Context, *CurlJob) } var _ quartz.Job = (*CurlJob)(nil) -// HTTPHandler sends an HTTP request and returns an HTTP response, -// following policy (such as redirects, cookies, auth) as configured -// on the implementing HTTP client. +// HTTPHandler sends an HTTP request and returns an HTTP response, following +// policy (such as redirects, cookies, auth) as configured on the implementing +// HTTP client. type HTTPHandler interface { Do(req *http.Request) (*http.Response, error) } -// CurlJobOptions represents optional parameters for constructing a CurlJob. +// CurlJobOptions represents optional parameters for constructing a [CurlJob]. type CurlJobOptions struct { HTTPClient HTTPHandler Callback func(context.Context, *CurlJob) } -// NewCurlJob returns a new CurlJob using the default HTTP client. +// NewCurlJob returns a new [CurlJob] using the default HTTP client. func NewCurlJob(request *http.Request) *CurlJob { return NewCurlJobWithOptions(request, CurlJobOptions{HTTPClient: http.DefaultClient}) } -// NewCurlJobWithOptions returns a new CurlJob configured with CurlJobOptions. +// NewCurlJobWithOptions returns a new [CurlJob] configured with [CurlJobOptions]. func NewCurlJobWithOptions(request *http.Request, opts CurlJobOptions) *CurlJob { if opts.HTTPClient == nil { opts.HTTPClient = http.DefaultClient } return &CurlJob{ - httpClient: opts.HTTPClient, - request: request, - jobStatus: StatusNA, - description: formatRequest(request), - callback: opts.Callback, + httpClient: opts.HTTPClient, + request: request, + jobStatus: StatusNA, + callback: opts.Callback, } } // Description returns the description of the CurlJob. func (cu *CurlJob) Description() string { + cu.once.Do(func() { + cu.description = formatRequest(cu.request) + }) return fmt.Sprintf("CurlJob%s%s", quartz.Sep, cu.description) } @@ -68,8 +71,8 @@ func (cu *CurlJob) Description() string { // representation. // If body is true, DumpResponse also returns the body. func (cu *CurlJob) DumpResponse(body bool) ([]byte, error) { - cu.Lock() - defer cu.Unlock() + cu.mtx.Lock() + defer cu.mtx.Unlock() if cu.response != nil { return httputil.DumpResponse(cu.response, body) } @@ -78,42 +81,43 @@ func (cu *CurlJob) DumpResponse(body bool) ([]byte, error) { // JobStatus returns the status of the CurlJob. func (cu *CurlJob) JobStatus() Status { - cu.Lock() - defer cu.Unlock() + cu.mtx.Lock() + defer cu.mtx.Unlock() return cu.jobStatus } func formatRequest(r *http.Request) string { - var request []string - url := fmt.Sprintf("%v %v %v", r.Method, r.URL, r.Proto) - request = append(request, url) + var sb strings.Builder + _, _ = fmt.Fprintf(&sb, "%v %v %v", r.Method, r.URL, r.Proto) for name, headers := range r.Header { for _, h := range headers { - request = append(request, fmt.Sprintf("%v: %v", name, h)) + _, _ = fmt.Fprintf(&sb, "\n%v: %v", name, h) } } if r.ContentLength > 0 { - request = append(request, fmt.Sprintf("Content Length: %d", r.ContentLength)) + _, _ = fmt.Fprintf(&sb, "\nContent Length: %d", r.ContentLength) } - return strings.Join(request, "\n") + return sb.String() } // Execute is called by a Scheduler when the Trigger associated with this job fires. func (cu *CurlJob) Execute(ctx context.Context) error { - cu.Lock() + cu.mtx.Lock() cu.request = cu.request.WithContext(ctx) var err error cu.response, err = cu.httpClient.Do(cu.request) - if err == nil && cu.response.StatusCode >= 200 && cu.response.StatusCode < 400 { + // update job status based on HTTP response code + if cu.response != nil && cu.response.StatusCode >= http.StatusOK && + cu.response.StatusCode < http.StatusBadRequest { cu.jobStatus = StatusOK } else { cu.jobStatus = StatusFailure } - cu.Unlock() + cu.mtx.Unlock() if cu.callback != nil { cu.callback(ctx, cu) } - return nil + return err } diff --git a/job/function_job.go b/job/function_job.go index a4c3f70..fb53e17 100644 --- a/job/function_job.go +++ b/job/function_job.go @@ -8,81 +8,81 @@ import ( "github.com/reugn/go-quartz/quartz" ) -// Function represents an argument-less function which returns -// a generic type R and a possible error. +// Function represents a function which takes a [context.Context] as its +// only argument and returns a generic type R and a possible error. type Function[R any] func(context.Context) (R, error) -// FunctionJob represents a Job that invokes the passed Function, -// implements the quartz.Job interface. +// FunctionJob represents a Job that invokes the passed [Function], +// implements the [quartz.Job] interface. type FunctionJob[R any] struct { - sync.RWMutex - function *Function[R] - desc string - result *R - err error - jobStatus Status + mtx sync.RWMutex + function Function[R] + description string + result R + err error + jobStatus Status } var _ quartz.Job = (*FunctionJob[any])(nil) -// NewFunctionJob returns a new FunctionJob without an explicit description. +// NewFunctionJob returns a new [FunctionJob] with a generated description. func NewFunctionJob[R any](function Function[R]) *FunctionJob[R] { - return &FunctionJob[R]{ - function: &function, - desc: fmt.Sprintf("FunctionJob%s%p", quartz.Sep, &function), - jobStatus: StatusNA, - } + return NewFunctionJobWithDesc( + function, + fmt.Sprintf("FunctionJob%s%p", quartz.Sep, &function), + ) } -// NewFunctionJobWithDesc returns a new FunctionJob with an explicit description. -func NewFunctionJobWithDesc[R any](desc string, function Function[R]) *FunctionJob[R] { +// NewFunctionJobWithDesc returns a new [FunctionJob] with the specified +// description. +func NewFunctionJobWithDesc[R any](function Function[R], + description string) *FunctionJob[R] { return &FunctionJob[R]{ - function: &function, - desc: desc, - jobStatus: StatusNA, + function: function, + description: description, + jobStatus: StatusNA, } } // Description returns the description of the FunctionJob. func (f *FunctionJob[R]) Description() string { - return f.desc + return f.description } // Execute is called by a Scheduler when the Trigger associated with this job fires. -// It invokes the held function, setting the results in Result and Error members. +// It invokes the held function, setting the results in result and error members. func (f *FunctionJob[R]) Execute(ctx context.Context) error { - result, err := (*f.function)(ctx) - f.Lock() + result, err := f.function(ctx) + f.mtx.Lock() if err != nil { + var zero R f.jobStatus = StatusFailure - f.result = nil - f.err = err + f.result, f.err = zero, err } else { f.jobStatus = StatusOK - f.result = &result - f.err = nil + f.result, f.err = result, nil } - f.Unlock() + f.mtx.Unlock() return err } // Result returns the result of the FunctionJob. -func (f *FunctionJob[R]) Result() *R { - f.RLock() - defer f.RUnlock() +func (f *FunctionJob[R]) Result() R { + f.mtx.RLock() + defer f.mtx.RUnlock() return f.result } // Error returns the error of the FunctionJob. func (f *FunctionJob[R]) Error() error { - f.RLock() - defer f.RUnlock() + f.mtx.RLock() + defer f.mtx.RUnlock() return f.err } // JobStatus returns the status of the FunctionJob. func (f *FunctionJob[R]) JobStatus() Status { - f.RLock() - defer f.RUnlock() + f.mtx.RLock() + defer f.mtx.RUnlock() return f.jobStatus } diff --git a/job/function_job_test.go b/job/function_job_test.go index be4166e..b54e187 100644 --- a/job/function_job_test.go +++ b/job/function_job_test.go @@ -21,24 +21,28 @@ func TestFunctionJob(t *testing.T) { return "fired1", nil }) - funcJob2 := job.NewFunctionJob(func(_ context.Context) (int, error) { + funcJob2 := job.NewFunctionJob(func(_ context.Context) (*int, error) { atomic.AddInt32(&n, 2) - return 42, nil + result := 42 + return &result, nil }) sched := quartz.NewStdScheduler() sched.Start(ctx) - assert.IsNil(t, sched.ScheduleJob(quartz.NewJobDetail(funcJob1, quartz.NewJobKey("funcJob1")), + + assert.IsNil(t, sched.ScheduleJob(quartz.NewJobDetail(funcJob1, + quartz.NewJobKey("funcJob1")), quartz.NewRunOnceTrigger(time.Millisecond*300))) - assert.IsNil(t, sched.ScheduleJob(quartz.NewJobDetail(funcJob2, quartz.NewJobKey("funcJob2")), + assert.IsNil(t, sched.ScheduleJob(quartz.NewJobDetail(funcJob2, + quartz.NewJobKey("funcJob2")), quartz.NewRunOnceTrigger(time.Millisecond*800))) + time.Sleep(time.Second) assert.IsNil(t, sched.Clear()) sched.Stop() assert.Equal(t, funcJob1.JobStatus(), job.StatusOK) - assert.NotEqual(t, funcJob1.Result(), nil) - assert.Equal(t, *funcJob1.Result(), "fired1") + assert.Equal(t, funcJob1.Result(), "fired1") assert.Equal(t, funcJob2.JobStatus(), job.StatusOK) assert.NotEqual(t, funcJob2.Result(), nil) @@ -47,22 +51,22 @@ func TestFunctionJob(t *testing.T) { assert.Equal(t, int(atomic.LoadInt32(&n)), 6) } -func TestNewFunctionJobWithDesc(t *testing.T) { +func TestNewFunctionJob_WithDesc(t *testing.T) { jobDesc := "test job" - funcJob1 := job.NewFunctionJobWithDesc(jobDesc, func(_ context.Context) (string, error) { + funcJob1 := job.NewFunctionJobWithDesc(func(_ context.Context) (string, error) { return "fired1", nil - }) + }, jobDesc) - funcJob2 := job.NewFunctionJobWithDesc(jobDesc, func(_ context.Context) (string, error) { + funcJob2 := job.NewFunctionJobWithDesc(func(_ context.Context) (string, error) { return "fired2", nil - }) + }, jobDesc) assert.Equal(t, funcJob1.Description(), jobDesc) assert.Equal(t, funcJob2.Description(), jobDesc) } -func TestFunctionJobRespectsContext(t *testing.T) { +func TestFunctionJob_RespectsContext(t *testing.T) { var n int funcJob2 := job.NewFunctionJob(func(ctx context.Context) (bool, error) { timer := time.NewTimer(time.Hour) @@ -79,6 +83,7 @@ func TestFunctionJobRespectsContext(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + sig := make(chan struct{}) go func() { defer close(sig); _ = funcJob2.Execute(ctx) }() @@ -92,5 +97,5 @@ func TestFunctionJobRespectsContext(t *testing.T) { t.Fatal("job side effect should have reflected cancelation:", n) } assert.ErrorIs(t, funcJob2.Error(), context.Canceled) - assert.IsNil(t, funcJob2.Result()) + assert.Equal(t, funcJob2.Result(), false) } diff --git a/job/job_status.go b/job/job_status.go index 8c0cb77..48766a8 100644 --- a/job/job_status.go +++ b/job/job_status.go @@ -7,9 +7,9 @@ const ( // StatusNA is the initial Job status. StatusNA Status = iota - // StatusOK indicates the Job completed successfully. + // StatusOK indicates that the Job completed successfully. StatusOK - // StatusFailure indicates the Job failed. + // StatusFailure indicates that the Job failed. StatusFailure ) diff --git a/job/job_test.go b/job/job_test.go index f718be8..315f1ed 100644 --- a/job/job_test.go +++ b/job/job_test.go @@ -21,7 +21,7 @@ func TestMultipleExecution(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() var n int64 - job := job.NewIsolatedJob(job.NewFunctionJob(func(ctx context.Context) (bool, error) { + job1 := job.NewIsolatedJob(job.NewFunctionJob(func(ctx context.Context) (bool, error) { atomic.AddInt64(&n, 1) timer := time.NewTimer(time.Minute) defer timer.Stop() @@ -55,7 +55,7 @@ func TestMultipleExecution(t *testing.T) { case <-timer.C: // sleep for a jittered amount of // time, less than 11ms - _ = job.Execute(ctx) + _ = job1.Execute(ctx) case <-ctx.Done(): return case <-sig: @@ -66,7 +66,7 @@ func TestMultipleExecution(t *testing.T) { }() } - // check very often that we've only run one job + // confirm regularly that only a single job execution has occurred ticker := time.NewTicker(2 * time.Millisecond) loop: for i := 0; i < 1000; i++ { @@ -81,8 +81,7 @@ loop: } } - // stop all of the adding threads without canceling - // the context + // stop all the adding threads without canceling the context close(sig) if atomic.LoadInt64(&n) != 1 { t.Error("only one job should run") diff --git a/job/shell_job.go b/job/shell_job.go index 7fb1218..80ca5e6 100644 --- a/job/shell_job.go +++ b/job/shell_job.go @@ -11,10 +11,12 @@ import ( "github.com/reugn/go-quartz/quartz" ) -// ShellJob represents a shell command Job, implements the quartz.Job interface. -// Be aware of runtime.GOOS when sending shell commands for execution. +// ShellJob represents a shell command Job, implements the [quartz.Job] interface. +// The command will be executed using bash if available; otherwise, sh will be used. +// Consider the interpreter type and target environment when formulating commands +// for execution. type ShellJob struct { - sync.Mutex + mtx sync.Mutex cmd string exitCode int stdout string @@ -25,7 +27,7 @@ type ShellJob struct { var _ quartz.Job = (*ShellJob)(nil) -// NewShellJob returns a new ShellJob for the given command. +// NewShellJob returns a new [ShellJob] for the given command. func NewShellJob(cmd string) *ShellJob { return &ShellJob{ cmd: cmd, @@ -33,7 +35,7 @@ func NewShellJob(cmd string) *ShellJob { } } -// NewShellJobWithCallback returns a new ShellJob with the given callback function. +// NewShellJobWithCallback returns a new [ShellJob] with the given callback function. func NewShellJobWithCallback(cmd string, f func(context.Context, *ShellJob)) *ShellJob { return &ShellJob{ cmd: cmd, @@ -55,7 +57,7 @@ var ( func getShell() string { shellOnce.Do(func() { _, err := exec.LookPath("/bin/bash") - // if not found bash binary, use `sh`. + // if bash binary is not found, use `sh`. if err != nil { shellPath = "sh" } @@ -72,11 +74,10 @@ func (sh *ShellJob) Execute(ctx context.Context) error { cmd.Stdout = io.Writer(&stdout) cmd.Stderr = io.Writer(&stderr) - err := cmd.Run() + err := cmd.Run() // run the command - sh.Lock() - sh.stdout = stdout.String() - sh.stderr = stderr.String() + sh.mtx.Lock() + sh.stdout, sh.stderr = stdout.String(), stderr.String() sh.exitCode = cmd.ProcessState.ExitCode() if err != nil { @@ -84,38 +85,38 @@ func (sh *ShellJob) Execute(ctx context.Context) error { } else { sh.jobStatus = StatusOK } - sh.Unlock() + sh.mtx.Unlock() if sh.callback != nil { sh.callback(ctx, sh) } - return nil + return err } // ExitCode returns the exit code of the ShellJob. func (sh *ShellJob) ExitCode() int { - sh.Lock() - defer sh.Unlock() + sh.mtx.Lock() + defer sh.mtx.Unlock() return sh.exitCode } // Stdout returns the captured stdout output of the ShellJob. func (sh *ShellJob) Stdout() string { - sh.Lock() - defer sh.Unlock() + sh.mtx.Lock() + defer sh.mtx.Unlock() return sh.stdout } // Stderr returns the captured stderr output of the ShellJob. func (sh *ShellJob) Stderr() string { - sh.Lock() - defer sh.Unlock() + sh.mtx.Lock() + defer sh.mtx.Unlock() return sh.stderr } // JobStatus returns the status of the ShellJob. func (sh *ShellJob) JobStatus() Status { - sh.Lock() - defer sh.Unlock() + sh.mtx.Lock() + defer sh.mtx.Unlock() return sh.jobStatus }