Skip to content

Commit

Permalink
Merge pull request #1421 from diggerhq/feat/concurrency-limit-for-bat…
Browse files Browse the repository at this point in the history
…ch-jobs

added a limit on concurrency for jobs triggered
  • Loading branch information
ZIJ authored May 8, 2024
2 parents 4d6264a + 3819f23 commit 1d39127
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 34 deletions.
13 changes: 13 additions & 0 deletions backend/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"strings"
"time"

"github.com/spf13/viper"
Expand All @@ -9,12 +10,24 @@ import (
// Config is an alias for viper.Viper, the configuration store used by the backend.
type Config = viper.Viper

var DiggerConfig *Config

// New returns a new viper-backed config populated with the backend defaults.
//
// The env prefix ("DIGGER") and the "-" -> "_" key replacer are configured
// here, but environment lookup is not switched on by New itself: the caller
// must call AutomaticEnv (or bind individual keys) on the returned value.
func New() *Config {
	v := viper.New()
	v.SetEnvPrefix("DIGGER")
	v.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
	v.SetDefault("port", 3000)
	v.SetDefault("usersvc_on", true)
	v.SetDefault("build_date", "null")
	v.SetDefault("deployed_at", time.Now().UTC().Format(time.RFC3339))
	// 0 disables the per-batch concurrency limit. Stored as an int because the
	// value is read with GetInt.
	v.SetDefault("max_concurrency_per_batch", 0)
	// The previous no-argument v.BindEnv() call was removed: without a key it
	// binds nothing and only returns a "missing key to bind to" error, which
	// was being discarded.
	return v
}

// init populates the package-level DiggerConfig with the defaults from New
// and turns on automatic environment variable lookup for it.
func init() {
	DiggerConfig = New()
	DiggerConfig.AutomaticEnv()
}
18 changes: 3 additions & 15 deletions backend/controllers/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/diggerhq/digger/backend/services"
"log"
"math/rand"
"net/http"
Expand Down Expand Up @@ -716,7 +717,7 @@ func handleIssueCommentEvent(gh utils.GithubClientProvider, payload *github.Issu
}

func TriggerDiggerJobs(client *github.Client, repoOwner string, repoName string, batchId *uuid.UUID, prNumber int, prService *dg_github.GithubService) error {
batch, err := models.DB.GetDiggerBatch(batchId)
_, err := models.DB.GetDiggerBatch(batchId)
if err != nil {
log.Printf("failed to get digger batch, %v\n", err)
return fmt.Errorf("failed to get digger batch, %v\n", err)
Expand All @@ -738,23 +739,10 @@ func TriggerDiggerJobs(client *github.Client, repoOwner string, repoName string,
log.Printf("jobString: %v \n", jobString)

// TODO: make workflow file name configurable
err = utils.TriggerGithubWorkflow(client, repoOwner, repoName, job, jobString, *batch.CommentId)
err = services.ScheduleJob(client, repoOwner, repoName, batchId, &job)
if err != nil {
log.Printf("failed to trigger github workflow, %v\n", err)
return fmt.Errorf("failed to trigger github workflow, %v\n", err)
} else {

_, workflowRunUrl, err := utils.GetWorkflowIdAndUrlFromDiggerJobId(client, repoOwner, repoName, job.DiggerJobID)
if err != nil {
log.Printf("failed to find workflow url: %v\n", err)
}
job.Status = orchestrator_scheduler.DiggerJobTriggered
job.WorkflowRunUrl = &workflowRunUrl
err = models.DB.UpdateDiggerJob(&job)
if err != nil {
log.Printf("failed to Update digger job state: %v\n", err)
return fmt.Errorf("failed to Update digger job state: %v\n", err)
}
}
}
return nil
Expand Down
5 changes: 2 additions & 3 deletions backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"github.com/diggerhq/digger/backend/config"
"html/template"
"io/fs"
"log"
Expand All @@ -11,7 +12,6 @@ import (

"embed"

"github.com/alextanhongpin/go-gin-starter/config"
"github.com/diggerhq/digger/backend/controllers"
"github.com/diggerhq/digger/backend/middleware"
"github.com/diggerhq/digger/backend/models"
Expand All @@ -31,8 +31,7 @@ var templates embed.FS
func main() {

initLogging()
cfg := config.New()
cfg.AutomaticEnv()
cfg := config.DiggerConfig
web := controllers.WebController{Config: cfg}

if err := sentry.Init(sentry.ClientOptions{
Expand Down
30 changes: 30 additions & 0 deletions backend/models/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,36 @@ func (db *Database) GetDiggerJobsForBatch(batchId uuid.UUID) ([]DiggerJob, error
return jobs, nil
}

// GetDiggerJobsForBatchWithStatus returns the jobs of the given batch whose
// status is one of the supplied values, with their Batch and DiggerJobSummary
// associations preloaded. A "record not found" result is reported as an empty
// slice rather than an error.
func (db *Database) GetDiggerJobsForBatchWithStatus(batchId uuid.UUID, status []scheduler.DiggerJobStatus) ([]DiggerJob, error) {
	found := make([]DiggerJob, 0)

	query := db.GormDB.
		Where("digger_jobs.batch_id = ?", batchId).
		Where("status IN ?", status).
		Preload("Batch").
		Preload("DiggerJobSummary")

	err := query.Find(&found).Error
	if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, err
	}
	return found, nil
}

// GetDiggerJobsWithStatus returns every job, across all batches, that is
// currently in the given status, with the Batch association preloaded. A
// "record not found" result is reported as an empty slice rather than an error.
func (db *Database) GetDiggerJobsWithStatus(status scheduler.DiggerJobStatus) ([]DiggerJob, error) {
	found := make([]DiggerJob, 0)

	query := db.GormDB.
		Where("status = ?", status).
		Preload("Batch")

	err := query.Find(&found).Error
	if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, err
	}
	return found, nil
}

func (db *Database) GetPendingParentDiggerJobs(batchId *uuid.UUID) ([]DiggerJob, error) {
jobs := make([]DiggerJob, 0)

Expand Down
49 changes: 39 additions & 10 deletions backend/services/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package services

import (
"fmt"
"github.com/diggerhq/digger/backend/config"
"github.com/diggerhq/digger/backend/models"
"github.com/diggerhq/digger/backend/utils"
orchestrator_scheduler "github.com/diggerhq/digger/libs/orchestrator/scheduler"
Expand Down Expand Up @@ -42,43 +44,70 @@ func DiggerJobCompleted(client *github.Client, batchId *uuid.UUID, parentJob *mo
if err != nil {
return err
}
TriggerJob(client, repoOwner, repoName, batchId, job)
ScheduleJob(client, repoOwner, repoName, batchId, job)
}

}
return nil
}

func TriggerJob(client *github.Client, repoOwner string, repoName string, batchId *uuid.UUID, job *models.DiggerJob) {
// ScheduleJob either triggers the job immediately or parks it in the
// DiggerJobQueuedForRun state, depending on the "max_concurrency_per_batch"
// setting.
//
// A limit of zero (or a negative value) means no limit: the job is triggered
// right away. Otherwise the jobs of the batch that are currently Triggered or
// Started are counted; if that count has reached the limit the job is marked
// as queued and nil is returned, else the job is triggered.
//
// Any error from the database or from TriggerJob is returned to the caller.
func ScheduleJob(client *github.Client, repoOwner string, repoName string, batchId *uuid.UUID, job *models.DiggerJob) error {
	maxConcurrencyForBatch := config.DiggerConfig.GetInt("max_concurrency_per_batch")
	if maxConcurrencyForBatch <= 0 {
		// concurrency limits not set (a negative limit would otherwise queue
		// every job forever, so it is treated as "unlimited" too)
		return TriggerJob(client, repoOwner, repoName, batchId, job)
	}

	// concurrency limits set
	if batchId == nil {
		return fmt.Errorf("cannot schedule job: batch id is nil")
	}
	log.Printf("Scheduling job with concurrency limit: %v per batch", maxConcurrencyForBatch)
	jobs, err := models.DB.GetDiggerJobsForBatchWithStatus(*batchId, []orchestrator_scheduler.DiggerJobStatus{
		orchestrator_scheduler.DiggerJobTriggered,
		orchestrator_scheduler.DiggerJobStarted,
	})
	if err != nil {
		log.Printf("GetDiggerJobsForBatchWithStatus err: %v\n", err)
		return err
	}
	log.Printf("Length of jobs: %v", len(jobs))

	if len(jobs) >= maxConcurrencyForBatch {
		log.Printf("max concurrency for jobs reached: %v, queuing until more jobs succeed", len(jobs))
		job.Status = orchestrator_scheduler.DiggerJobQueuedForRun
		// If the queued status is not persisted, nothing will find this job
		// later by that status, so the failure must be reported.
		if err := models.DB.UpdateDiggerJob(job); err != nil {
			log.Printf("failed to Update digger job state: %v\n", err)
			return err
		}
		return nil
	}

	return TriggerJob(client, repoOwner, repoName, batchId, job)
}

// TriggerJob dispatches the GitHub workflow for the given job and, on
// success, records the workflow run URL and the DiggerJobTriggered status on
// the job.
//
// It returns an error (leaving the job's status unchanged) when the batch
// cannot be loaded, the job has no serialized spec, the batch has no comment
// id, the workflow dispatch fails, the workflow run URL cannot be resolved, or
// the job update cannot be persisted.
func TriggerJob(client *github.Client, repoOwner string, repoName string, batchId *uuid.UUID, job *models.DiggerJob) error {
	log.Printf("TriggerJob jobId: %v", job.DiggerJobID)

	batch, err := models.DB.GetDiggerBatch(batchId)
	if err != nil {
		log.Printf("TriggerJob err: %v\n", err)
		return err
	}

	if job.SerializedJobSpec == nil {
		log.Printf("GitHub job can't be nil")
		return fmt.Errorf("JobSpec is nil, skipping")
	}
	// CommentId is dereferenced below; fail cleanly instead of panicking.
	if batch.CommentId == nil {
		log.Printf("TriggerJob err: batch has no comment id")
		return fmt.Errorf("batch has no comment id, cannot trigger job %v", job.DiggerJobID)
	}
	jobString := string(job.SerializedJobSpec)
	log.Printf("jobString: %v \n", jobString)

	err = utils.TriggerGithubWorkflow(client, repoOwner, repoName, *job, jobString, *batch.CommentId)
	if err != nil {
		// Do not fall through: a job whose workflow was never dispatched must
		// not be marked as triggered.
		log.Printf("TriggerJob err: %v\n", err)
		return err
	}

	_, workflowRunUrl, err := utils.GetWorkflowIdAndUrlFromDiggerJobId(client, repoOwner, repoName, job.DiggerJobID)
	if err != nil {
		log.Printf("failed to find workflow url: %v\n", err)
		return err
	}

	job.Status = orchestrator_scheduler.DiggerJobTriggered
	job.WorkflowRunUrl = &workflowRunUrl
	err = models.DB.UpdateDiggerJob(job)
	if err != nil {
		log.Printf("failed to Update digger job state: %v\n", err)
		return err
	}

	return nil
}
24 changes: 23 additions & 1 deletion backend/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package main

import (
"github.com/diggerhq/digger/backend/models"
"github.com/diggerhq/digger/backend/services"
"github.com/diggerhq/digger/backend/utils"
"github.com/diggerhq/digger/libs/orchestrator/scheduler"
"github.com/robfig/cron"
"log"
"os"
Expand All @@ -21,7 +23,7 @@ func main() {
c := cron.New()

// RunQueues state machine
c.AddFunc("* * * * *", func() {
c.AddFunc("0 * * * * *", func() {
runQueues, err := models.DB.GetFirstRunQueueForEveryProject()
if err != nil {
log.Printf("Error fetching Latest queueItem runs: %v", err)
Expand All @@ -43,6 +45,26 @@ func main() {
}
})

// Trigger queued jobs for a batch.
//
// Every job in the DiggerJobQueuedForRun state is handed back to
// services.ScheduleJob, which either triggers it or leaves it queued.
// NOTE(review): assuming robfig/cron's seconds-first six-field spec, this
// fires at second 30 of every minute — confirm against the cron version in use.
c.AddFunc("30 * * * * *", func() {
	jobs, err := models.DB.GetDiggerJobsWithStatus(scheduler.DiggerJobQueuedForRun)
	if err != nil {
		log.Printf("Failed to get Jobs %v", err)
		return
	}
	// Index into the slice so each job pointer refers to its own element
	// rather than to a shared loop variable.
	for i := range jobs {
		job := &jobs[i]
		batch := job.Batch
		repoFullName := batch.RepoFullName
		repoName := batch.RepoName
		repoOwner := batch.RepoOwner
		githubInstallationid := batch.GithubInstallationId
		service, _, err := utils.GetGithubService(&utils.DiggerGithubRealClientProvider{}, githubInstallationid, repoFullName, repoOwner, repoName)
		if err != nil {
			// Without a service there is no client to schedule with; skip this
			// job instead of dereferencing a nil service.
			log.Printf("Failed to get github service: %v", err)
			continue
		}
		if err := services.ScheduleJob(service.Client, repoOwner, repoName, &batch.ID, job); err != nil {
			log.Printf("Failed to schedule job %v: %v", job.DiggerJobID, err)
		}
	}
})

// Start the Cron job scheduler
c.Start()

Expand Down
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,8 @@ github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/schollz/closestmatch v2.1.0+incompatible h1:Uel2GXEpJqOWBrlyI+oY9LTiyyjYS17cCYRqP13/SHk=
github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/segmentio/conf v1.2.0 h1:5OT9+6OyVHLsFLsiJa/2KlqiA1m7mpdUBlkB/qYTMts=
github.com/segmentio/conf v1.2.0/go.mod h1:Y3B9O/PqqWqjyxyWWseyj/quPEtMu1zDp/kVbSWWaB0=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/slack-go/slack v0.10.3 h1:kKYwlKY73AfSrtAk9UHWCXXfitudkDztNI9GYBviLxw=
Expand Down
15 changes: 10 additions & 5 deletions libs/orchestrator/scheduler/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ const (
type DiggerJobStatus int8

// Status values for a digger job. The numeric values are explicit rather than
// iota-based.
const (
DiggerJobCreated DiggerJobStatus = 1
DiggerJobTriggered DiggerJobStatus = 2
DiggerJobFailed DiggerJobStatus = 3
DiggerJobStarted DiggerJobStatus = 4
DiggerJobSucceeded DiggerJobStatus = 5
DiggerJobCreated DiggerJobStatus = 1
DiggerJobTriggered DiggerJobStatus = 2
DiggerJobFailed DiggerJobStatus = 3
DiggerJobStarted DiggerJobStatus = 4
DiggerJobSucceeded DiggerJobStatus = 5
// DiggerJobQueuedForRun marks a job that was held back because its batch
// had reached the configured concurrency limit.
DiggerJobQueuedForRun DiggerJobStatus = 6
)

func (d *DiggerJobStatus) ToString() string {
Expand All @@ -46,6 +47,8 @@ func (d *DiggerJobStatus) ToString() string {
return "running"
case DiggerJobCreated:
return "created"
case DiggerJobQueuedForRun:
return "created"
default:
return "unknown status"
}
Expand All @@ -63,6 +66,8 @@ func (d *DiggerJobStatus) ToEmoji() string {
return ":arrows_counterclockwise:"
case DiggerJobCreated:
return ":clock11:"
case DiggerJobQueuedForRun:
return ":clock11:"
default:
return ":question:"
}
Expand Down

0 comments on commit 1d39127

Please sign in to comment.