diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index e03e514144f..ab81d6071a4 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -453,7 +453,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { query.QueryServiceBridge{AsyncQueryService: m.queryController}, ) - executor, executorMetrics := executor.NewExecutor( + executorInstance, executorMetrics := executor.NewExecutor( m.log.With(zap.String("service", "task-executor")), query.QueryServiceBridge{AsyncQueryService: m.queryController}, ts.UserService, @@ -461,11 +461,12 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { combinedTaskService, executor.WithFlagger(m.flagger), ) - err = executor.LoadExistingScheduleRuns(ctx) + err = executorInstance.LoadExistingScheduleRuns(ctx) if err != nil { m.log.Fatal("could not load existing scheduled runs", zap.Error(err)) } - m.executor = executor + executorInstance.SetLimitFunc(executor.ConcurrencyLimit(executorInstance, fluxlang.DefaultService)) + m.executor = executorInstance m.reg.MustRegister(executorMetrics.PrometheusCollectors()...) schLogger := m.log.With(zap.String("service", "task-scheduler")) @@ -476,7 +477,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { err error ) sch, sm, err = scheduler.NewScheduler( - executor, + executorInstance, taskbackend.NewSchedulableTaskService(m.kvService), scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledAt time.Time, err error) { schLogger.Info( @@ -505,7 +506,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { taskCoord := coordinator.NewCoordinator( coordLogger, sch, - executor) + executorInstance) taskSvc = middleware.New(combinedTaskService, taskCoord) if err := taskbackend.TaskNotifyCoordinatorOfExisting( @@ -514,7 +515,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { combinedTaskService, taskCoord, func(ctx context.Context, taskID platform2.ID, runID platform2.ID) error { - _, err := executor.ResumeCurrentRun(ctx, taskID, runID) + _, err := executorInstance.ResumeCurrentRun(ctx, taskID, runID) return err }, coordLogger); err != nil {