Skip to content

Commit

Permalink
Merge pull request #213 from datarootsio/adding-in-progress-status
Browse files Browse the repository at this point in the history
Adding in progress status
  • Loading branch information
bart6114 authored Oct 4, 2024
2 parents 4e4ef3f + 06a6c06 commit 6e95c59
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 62 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- uses: actions/setup-go@v5
with:
go-version: "^1.20"
- uses: golangci/golangci-lint-action@v5.1.0
- uses: golangci/golangci-lint-action@v6.1.1

tests:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -46,7 +46,7 @@ jobs:
- name: Bump version and push tag
if: github.ref == 'refs/heads/main'
id: tag_action
uses: anothrNick/github-tag-action@1.67.0
uses: anothrNick/github-tag-action@1.71.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
WITH_V: true
Expand Down Expand Up @@ -91,20 +91,20 @@ jobs:
- run: cp ${{ matrix.goos }}/${{ matrix.goarch }}/cheek ${{ matrix.goos }}/${{ matrix.goarch }}/cheek-${{ needs.version_tag.outputs.new_tag }}
## upload binary to google storage
- id: auth
uses: google-github-actions/[email protected].2
uses: google-github-actions/[email protected].6
with:
credentials_json: ${{ secrets.gcp_credentials_cheek }}
- id: upload-files
uses: google-github-actions/upload-cloud-storage@v2.1.0
uses: google-github-actions/upload-cloud-storage@v2.2.0
with:
path: ${{ matrix.goos }}/${{ matrix.goarch }}/cheek-${{ steps.vars.outputs.sha_short }}
destination: cheek-scheduler/${{ matrix.goos }}/${{ matrix.goarch }}/
- uses: google-github-actions/upload-cloud-storage@v1.0.3
- uses: google-github-actions/upload-cloud-storage@v2.2.0
if: github.ref == 'refs/heads/main'
with:
path: ${{ matrix.goos }}/${{ matrix.goarch }}/cheek-${{ needs.version_tag.outputs.new_tag }}
destination: cheek-scheduler/${{ matrix.goos }}/${{ matrix.goarch }}/
- uses: google-github-actions/upload-cloud-storage@v1.0.3
- uses: google-github-actions/upload-cloud-storage@v2.2.0
if: github.ref == 'refs/heads/main'
with:
path: ${{ matrix.goos }}/${{ matrix.goarch }}/cheek
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.22.4
require github.com/stretchr/testify v1.9.0

require (
github.com/adhocore/gronx v1.8.1
github.com/adhocore/gronx v1.19.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/rs/zerolog v1.33.0
gopkg.in/yaml.v3 v3.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/adhocore/gronx v1.8.1 h1:F2mLTG5sB11z7vplwD4iydz3YCEjstSfYmCrdSm3t6A=
github.com/adhocore/gronx v1.8.1/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg=
github.com/adhocore/gronx v1.19.1 h1:S4c3uVp5jPjnk00De0lslyTenGJ4nA3Ydbkj1SbdPVc=
github.com/adhocore/gronx v1.19.1/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
5 changes: 3 additions & 2 deletions pkg/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ func OpenDB(dbPath string) (*sqlx.DB, error) {

func InitDB(db *sqlx.DB) error {
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS log (
id INTEGER PRIMARY KEY,
id INTEGER PRIMARY KEY AUTOINCREMENT,
job TEXT,
triggered_at DATETIME DEFAULT CURRENT_TIMESTAMP,
triggered_by TEXT,
duration INTEGER,
status INTEGER,
message TEXT
message TEXT,
UNIQUE(job, triggered_at, triggered_by)
)`)
if err != nil {
return fmt.Errorf("create log table: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ func getScheduleStatus(s *Schedule) httprouter.Handle {
for _, j := range s.Jobs {
j.loadRunsFromDb(1, false)
lastRunStatus := j.Runs[0].Status
ssr.Status[j.Name] = lastRunStatus
if lastRunStatus == 1 {
ssr.Status[j.Name] = *lastRunStatus
if *lastRunStatus == 1 {
ssr.FailedRunCount++
}
}
Expand Down
132 changes: 100 additions & 32 deletions pkg/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import (
"gopkg.in/yaml.v3"
)

// Global status constants
const (
StatusOK int = 0
StatusError int = -1
)

// OnEvent contains specs on what needs to happen after a job event.
type OnEvent struct {
TriggerJob []string `yaml:"trigger_job,omitempty" json:"trigger_job,omitempty"`
Expand Down Expand Up @@ -46,8 +52,8 @@ type JobSpec struct {

// JobRun holds information about a job execution.
type JobRun struct {
LogEntryId int `json:"id,omitempty" db:"id"`
Status int `json:"status" db:"status,omitempty"`
LogEntryId int `json:"id,omitempty" db:"id"`
Status *int `json:"status,omitempty" db:"status,omitempty"`
logBuf bytes.Buffer
Log string `json:"log" db:"message"`
Name string `json:"name" db:"job"`
Expand All @@ -62,12 +68,39 @@ func (jr *JobRun) flushLogBuffer() {
jr.Log = jr.logBuf.String()
}

func (j *JobSpec) setup(trigger string) JobRun {
// Initialize the JobRun before executing the command
jr := JobRun{
Name: j.Name,
TriggeredAt: j.now(),
TriggeredBy: trigger,
Status: nil,
jobRef: j,
}

// Log the job run immediately to the database to mark the job as started
jr.logToDb()

return jr
}

func (jr *JobRun) logToDb() {
if jr.jobRef.cfg.DB == nil {
jr.jobRef.log.Warn().Str("job", jr.Name).Msg("No db connection, not saving job log to db.")
return
}
_, err := jr.jobRef.cfg.DB.Exec("INSERT INTO log (job, triggered_at, triggered_by, duration, status, message) VALUES (?, ?, ?, ?, ?, ?)", jr.Name, jr.TriggeredAt, jr.TriggeredBy, jr.Duration, jr.Status, jr.Log)

// Perform an UPSERT (insert or update)
_, err := jr.jobRef.cfg.DB.Exec(`
INSERT INTO log (job,triggered_at ,triggered_by, duration, status, message)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(job, triggered_at, triggered_by) DO UPDATE SET
duration = excluded.duration,
status = excluded.status,
message = excluded.message;
`,
jr.Name, jr.TriggeredAt, jr.TriggeredBy, jr.Duration, jr.Status, jr.Log)

if err != nil {
if jr.jobRef.globalSchedule != nil {
jr.jobRef.globalSchedule.log.Warn().Str("job", jr.Name).Err(err).Msg("Couldn't save job log to db.")
Expand All @@ -91,26 +124,35 @@ func (j *JobSpec) execCommandWithRetry(trigger string) JobRun {
var jr JobRun
const timeOut = 5 * time.Second

for tries < j.Retries+1 {
// Initialize the JobRun with the first trigger
jr = j.setup(trigger)

for tries < j.Retries+1 {
switch {
case tries == 0:
jr = j.execCommand(trigger)
// First attempt with the original trigger
jr = j.execCommand(jr, trigger)
default:
jr = j.execCommand(fmt.Sprintf("%s[retry=%v]", trigger, tries))
// On retries, update the trigger with retry count and rerun
jr = j.execCommand(jr, fmt.Sprintf("%s[retry=%d]", trigger, tries))
}

// finalise logging etc
// Finalize logging, etc.
j.finalize(&jr)

if jr.Status == 0 {
if *jr.Status == StatusOK {
// Exit if the job succeeded (Status 0)
break
}
j.log.Debug().Str("job", j.Name).Int("exitcode", jr.Status).Msgf("job exited unsuccessfully, launching retry after %v timeout.", timeOut)

// Log the unsuccessful attempt and retry
j.log.Debug().Str("job", j.Name).Int("exitcode", *jr.Status).Msgf("job exited unsuccessfully, launching retry after %v timeout.", timeOut)

// Increment the attempt counter
tries++
time.Sleep(timeOut)

}

return jr
}

Expand All @@ -122,11 +164,8 @@ func (j JobSpec) now() time.Time {
return time.Now()
}

func (j *JobSpec) execCommand(trigger string) JobRun {
func (j *JobSpec) execCommand(jr JobRun, trigger string) JobRun {
j.log.Info().Str("job", j.Name).Str("trigger", trigger).Msgf("Job triggered")
// init status to non-zero until execution says otherwise
jr := JobRun{Name: j.Name, TriggeredAt: j.now(), TriggeredBy: trigger, Status: -1, jobRef: j}

suppressLogs := j.cfg.SuppressLogs

var cmd *exec.Cmd
Expand All @@ -138,14 +177,17 @@ func (j *JobSpec) execCommand(trigger string) JobRun {
if !suppressLogs {
fmt.Println(err.Error())
}
errStatus := StatusError
jr.Status = &errStatus // Set failure status when no command is specified

return jr
case 1:
cmd = exec.Command(j.Command[0])
default:
cmd = exec.Command(j.Command[0], j.Command[1:]...)
}

// add env vars
// Add env vars
cmd.Env = os.Environ()
for k, v := range j.Env {
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v))
Expand All @@ -161,37 +203,58 @@ func (j *JobSpec) execCommand(trigger string) JobRun {
w = io.MultiWriter(os.Stdout, &jr.logBuf)
}

// merge stdout and stderr to same writer
// Merge stdout and stderr to same writer
cmd.Stdout = w
cmd.Stderr = w

// Start command execution
err := cmd.Start()
if err != nil {
// Existing logging logic
if !suppressLogs {
fmt.Println(err.Error())
}
j.log.Warn().Str("job", j.Name).Str("trigger", trigger).Int("exitcode", jr.Status).Err(err).Msg("job unable to start")
// also send this to terminal output
_, err = w.Write([]byte(fmt.Sprintf("job unable to start: %v", err.Error())))
if err != nil {
j.log.Debug().Str("job", j.Name).Err(err).Msg("can't write to log buffer")
}

// Log the initial error and set the exit code
exitCode := StatusError
j.log.Warn().Str("job", j.Name).Str("trigger", trigger).Int("exitcode", exitCode).Err(err).Msg("job unable to start")

// Also send this to terminal output
logMessage := fmt.Sprintf("job unable to start: %v", err.Error())
_, writeErr := w.Write([]byte(logMessage)) // Ensure we log this message
if writeErr != nil {
j.log.Debug().Str("job", j.Name).Err(writeErr).Msg("can't write to log buffer")
}
jr.Log = logMessage // Capture log message to jr.Log
jr.Status = &exitCode // Set the exit code in the job result
return jr
}

// Wait for the command to finish and check for errors
if err := cmd.Wait(); err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
jr.Status = exitError.ExitCode()
j.log.Warn().Str("job", j.Name).Msgf("Exit code %v", exitError.ExitCode())
}
// Get the exact exit code from ExitError
exitCode := exitError.ExitCode()
jr.Status = &exitCode // Set the exit code in the job result
j.log.Warn().Str("job", j.Name).Msgf("Exit code: %d", exitCode)
jr.Log += fmt.Sprintf("Exit code: %d\n", exitCode)

return jr
} else {
// Handle unexpected errors
exitCode := StatusError
j.log.Error().Str("job", j.Name).Err(err).Msg("unexpected error during command execution")
jr.Status = &exitCode
return jr
}
} else {
// No error, command exited successfully
StatusCode := StatusOK
jr.Status = &StatusCode // Command succeeded, set exit code 0
}

jr.Duration = time.Duration(time.Since(jr.TriggeredAt).Milliseconds())
jr.Status = 0
j.log.Debug().Str("job", j.Name).Int("exitcode", jr.Status).Msgf("job exited status: %v", jr.Status)

j.log.Debug().Str("job", j.Name).Int("exitcode", *jr.Status).Msgf("job exited with status: %d", *jr.Status)

return jr
}
Expand Down Expand Up @@ -272,7 +335,7 @@ func (j *JobSpec) OnEvent(jr *JobRun) {
var webhooksToCall []string
var slackWebhooksToCall []string

switch jr.Status == 0 {
switch *jr.Status == StatusOK {
case true: // after success
jobsToTrigger = j.OnSuccess.TriggerJob
webhooksToCall = j.OnSuccess.NotifyWebhook
Expand Down Expand Up @@ -352,12 +415,17 @@ func (j JobSpec) ToYAML(includeRuns bool) (string, error) {
func RunJob(log zerolog.Logger, cfg Config, scheduleFn string, jobName string) (JobRun, error) {
s, err := loadSchedule(log, cfg, scheduleFn)
if err != nil {
fmt.Printf("error loading schedule: %s\n", err)
os.Exit(1)
log.Error().Err(err).Msgf("error loading schedule: %s", scheduleFn)
return JobRun{}, fmt.Errorf("failed to load schedule: %w", err)
}

for _, job := range s.Jobs {
if job.Name == jobName {
jr := job.execCommand("manual")
// Use the setup function to create a JobRun instance
jr := job.setup("manual")

// Execute the command with the initialized JobRun and the trigger string
jr = job.execCommand(jr, "manual")
job.finalize(&jr)
return jr, nil
}
Expand Down
Loading

0 comments on commit 6e95c59

Please sign in to comment.