[Feature] Support managed by external controller #2203

Merged
26 commits
a0def9f
Introduce ManagedBy field to RunPolicy that is used by each Kubeflow …
mszadkow Aug 8, 2024
c11c351
Update Kubeflow Job manifests
mszadkow Aug 8, 2024
3843250
Update Kubeflow Jobs Reconcile to use ManagedBy field to decide if sk…
mszadkow Aug 8, 2024
d8cc545
job controller test
mszadkow Aug 9, 2024
2440253
spec validation webhook
mszadkow Aug 9, 2024
cd342b3
add managedBy maxLength const
mszadkow Aug 9, 2024
f245891
generate new manifest
mszadkow Aug 9, 2024
4f7694a
revert webhook formatting
mszadkow Aug 13, 2024
cc28b5e
Move allowed controllers constants in one place
mszadkow Aug 13, 2024
d94a23c
Add validation for allowed managedBy values
mszadkow Aug 13, 2024
6f471d6
Update after controllers constants move
mszadkow Aug 13, 2024
4c5565d
Update jobs controller tests
mszadkow Aug 13, 2024
d17a9b9
Update validateManagedBy webhook
mszadkow Aug 14, 2024
5544a5b
Remove validation for the length of ManagedBy field
mszadkow Aug 28, 2024
e086a9e
Update after code review
mszadkow Sep 4, 2024
52d4e46
Update ManagedBy comment
mszadkow Sep 10, 2024
3870734
E2E tests for managedBy
mszadkow Sep 11, 2024
9b35574
Update generated files and manifests
mszadkow Sep 11, 2024
ada29bc
Rework after code review
mszadkow Sep 11, 2024
de5d02b
Revert kustomization change
mszadkow Sep 12, 2024
7d3bb89
Update job_test and logging
mszadkow Sep 12, 2024
353d505
Provide immutability check for ManagedBy
mszadkow Sep 12, 2024
80576f6
Avoid making copy of runPolicy
mszadkow Sep 12, 2024
18a5313
Split RunPolicy validators to Update and Create
mszadkow Sep 13, 2024
2f920a8
Fix the naming and call validate always
mszadkow Sep 13, 2024
8e8a637
Update tests
mszadkow Sep 13, 2024
8 changes: 8 additions & 0 deletions docs/api/kubeflow.org_v1_generated.asciidoc
@@ -677,6 +677,14 @@ Suspending a Job will reset the StartTime field of the Job.


Defaults to false.
| *`managedBy`* __string__ | ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.io/multikueue' to the Kueue.
The field is immutable.
|===


4 changes: 4 additions & 0 deletions hack/python-sdk/swagger.json
@@ -575,6 +575,10 @@
"description": "CleanPodPolicy defines the policy to kill pods after the job completes. Default to None.",
"type": "string"
},
"managedBy": {
"description": "ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/training-operator' or 'kueue.x-k8s.io/multikueue'. The training-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/training-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.",
"type": "string"
},
"schedulingPolicy": {
"description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling",
"$ref": "#/definitions/kubeflow.org.v1.SchedulingPolicy"
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_jaxjobs.yaml
@@ -7306,6 +7306,16 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to None.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_mpijobs.yaml
@@ -7311,6 +7311,16 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to None.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_paddlejobs.yaml
@@ -7793,6 +7793,16 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to None.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_pytorchjobs.yaml
@@ -7830,6 +7830,16 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to None.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_tfjobs.yaml
@@ -71,6 +71,16 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to None.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_xgboostjobs.yaml
@@ -67,6 +67,16 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to None.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
16 changes: 16 additions & 0 deletions pkg/apis/kubeflow.org/v1/common_types.go
@@ -35,6 +35,12 @@ const (

// JobRoleLabel represents the label key for the job role, e.g. master.
JobRoleLabel = "training.kubeflow.org/job-role"

// KubeflowJobsController represents the value of the default jobs controller
KubeflowJobsController = "kubeflow.org/training-operator"

// MultiKueueController represents the MultiKueue controller
MultiKueueController = "kueue.x-k8s.io/multikueue"
)

// JobStatus represents the current observed state of the training Job.
@@ -221,6 +227,16 @@ type RunPolicy struct {
// +kubebuilder:default:=false
// +optional
Suspend *bool `json:"suspend,omitempty"`

// ManagedBy is used to indicate the controller or entity that manages a job.
// The value must be either an empty, 'kubeflow.org/training-operator' or
// 'kueue.x-k8s.io/multikueue'.
// The training-operator reconciles a job which doesn't have this
// field at all or the field value is the reserved string
// 'kubeflow.org/training-operator', but delegates reconciling the job
// with 'kueue.x-k8s.io/multikueue' to the Kueue.
// The field is immutable.
ManagedBy *string `json:"managedBy,omitempty"`
}

// SchedulingPolicy encapsulates various scheduling policies of the distributed training
7 changes: 7 additions & 0 deletions pkg/apis/kubeflow.org/v1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/common/util/webhooks.go
@@ -0,0 +1,32 @@
package util

import (
	v1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"

	apivalidation "k8s.io/apimachinery/pkg/api/validation"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/validation/field"
)

var supportedJobControllers = sets.New(
	v1.MultiKueueController,
	v1.KubeflowJobsController)

func ValidateRunPolicy(runPolicy *v1.RunPolicy) field.ErrorList {
	errs := field.ErrorList{}
	if runPolicy.ManagedBy != nil {
		manager := *runPolicy.ManagedBy
		if !supportedJobControllers.Has(manager) {
			fieldPath := field.NewPath("spec", "runPolicy", "managedBy")
			errs = append(errs, field.NotSupported(fieldPath, manager, supportedJobControllers.UnsortedList()))
		}
	}
	return errs
}

func ValidateRunPolicyUpdate(oldRunPolicy, newRunPolicy *v1.RunPolicy) field.ErrorList {
	oldManager := oldRunPolicy.ManagedBy
	newManager := newRunPolicy.ManagedBy
	fieldPath := field.NewPath("spec", "runPolicy", "managedBy")
	return apivalidation.ValidateImmutableField(newManager, oldManager, fieldPath)
}
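
Note on the two validators above: the following is a minimal, stdlib-only sketch of the same two rules (allow-list on create, immutability on update), written so it can be run without the Kubernetes modules. The names `validateCreate`, `validateUpdate`, and `strPtr` are hypothetical and not part of this PR. It also assumes that the immutability check treats a change from unset to set as a modification, which is how a deep-equality comparison of two `*string` values would behave.

```go
package main

import (
	"fmt"
	"sort"
)

// Controller names copied from common_types.go in this PR.
const (
	kubeflowJobsController = "kubeflow.org/training-operator"
	multiKueueController   = "kueue.x-k8s.io/multikueue"
)

// supportedControllers stands in for supportedJobControllers, using a plain map.
var supportedControllers = map[string]struct{}{
	kubeflowJobsController: {},
	multiKueueController:   {},
}

// validateCreate is a hypothetical stand-in for ValidateRunPolicy:
// an unset field is accepted, any set value must be in the allow-list.
func validateCreate(managedBy *string) error {
	if managedBy == nil {
		return nil
	}
	if _, ok := supportedControllers[*managedBy]; ok {
		return nil
	}
	names := make([]string, 0, len(supportedControllers))
	for name := range supportedControllers {
		names = append(names, name)
	}
	sort.Strings(names) // stable order for the error message
	return fmt.Errorf("spec.runPolicy.managedBy: unsupported value %q, supported values: %v", *managedBy, names)
}

// validateUpdate is a hypothetical stand-in for ValidateRunPolicyUpdate:
// the field may not change, including from unset to set or back (assumption).
func validateUpdate(oldManagedBy, newManagedBy *string) error {
	bothUnset := oldManagedBy == nil && newManagedBy == nil
	bothSetAndEqual := oldManagedBy != nil && newManagedBy != nil && *oldManagedBy == *newManagedBy
	if bothUnset || bothSetAndEqual {
		return nil
	}
	return fmt.Errorf("spec.runPolicy.managedBy: field is immutable")
}

// strPtr is a small helper for building *string values in the examples.
func strPtr(s string) *string { return &s }

func main() {
	fmt.Println("create, unset:     ", validateCreate(nil))
	fmt.Println("create, multikueue:", validateCreate(strPtr(multiKueueController)))
	fmt.Println("create, empty:     ", validateCreate(strPtr("")))
	fmt.Println("update, unset->set:", validateUpdate(nil, strPtr(multiKueueController)))
}
```

In this sketch an explicitly empty string is rejected on create, because the allow-list contains only the two controller names; only an unset field skips the check.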
7 changes: 7 additions & 0 deletions pkg/controller.v1/common/job.go
@@ -455,3 +455,10 @@ func (jc *JobController) CleanupJob(runPolicy *apiv1.RunPolicy, jobStatus apiv1.
func (jc *JobController) calcPGMinResources(minMember int32, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) *corev1.ResourceList {
	return CalcPGMinResources(minMember, replicas, jc.PriorityClassLister.Get)
}

func (jc *JobController) ManagedByExternalController(controllerName *string) *string {
	if controllerName != nil && *controllerName != apiv1.KubeflowJobsController {
		return controllerName
	}
	return nil
}
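
Note on `ManagedByExternalController`: the helper returns the manager name only when the job belongs to someone else, so callers can use a non-nil result as the signal to skip reconciliation. The sketch below reproduces that decision as a free function with no Kubernetes dependencies. `shouldReconcile` and `strPtr` are hypothetical names added for illustration. It also shows that an explicitly empty string counts as an external manager for this helper, which matches the "managedBy is empty" test case in this PR.

```go
package main

import "fmt"

// Constant value copied from common_types.go in this PR.
const kubeflowJobsController = "kubeflow.org/training-operator"

// managedByExternalController mirrors the method added to JobController,
// rewritten as a free function so the sketch is self-contained.
func managedByExternalController(controllerName *string) *string {
	if controllerName != nil && *controllerName != kubeflowJobsController {
		return controllerName
	}
	return nil
}

// shouldReconcile is a hypothetical wrapper: the training-operator acts on a
// job only when no external manager is reported.
func shouldReconcile(managedBy *string) bool {
	return managedByExternalController(managedBy) == nil
}

// strPtr is a small helper for building *string values in the examples.
func strPtr(s string) *string { return &s }

func main() {
	cases := []struct {
		name      string
		managedBy *string
	}{
		{"unset", nil},
		{"training-operator", strPtr(kubeflowJobsController)},
		{"multikueue", strPtr("kueue.x-k8s.io/multikueue")},
		{"empty string", strPtr("")},
	}
	for _, c := range cases {
		fmt.Printf("%-18s reconcile=%t\n", c.name, shouldReconcile(c.managedBy))
	}
}
```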
42 changes: 42 additions & 0 deletions pkg/controller.v1/common/job_test.go
@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
)

func TestDeletePodsAndServices(T *testing.T) {
@@ -219,6 +220,47 @@ func TestPastActiveDeadline(T *testing.T) {
}
}

func TestManagedByExternalController(T *testing.T) {
	cases := map[string]struct {
		managedBy          *string
		wantControllerName *string
	}{
		"managedBy is nil": {
			managedBy:          nil,
			wantControllerName: nil,
		},
		"managedBy is empty": {
			managedBy:          ptr.To[string](""),
			wantControllerName: ptr.To[string](""),
		},
		"managedBy is training-operator controller": {
			managedBy:          ptr.To[string](apiv1.KubeflowJobsController),
			wantControllerName: nil,
		},
		"managedBy is not the training-operator controller": {
			managedBy:          ptr.To[string]("kueue.x-k8s.io/multikueue"),
			wantControllerName: ptr.To[string]("kueue.x-k8s.io/multikueue"),
		},
		"managedBy is other value": {
			managedBy:          ptr.To[string]("other-job-controller"),
			wantControllerName: ptr.To[string]("other-job-controller"),
		},
	}
	for name, tc := range cases {
		T.Run(name, func(t *testing.T) {
			jobController := JobController{}
			runPolicy := &apiv1.RunPolicy{
				ManagedBy: tc.managedBy,
			}

			gotControllerName := jobController.ManagedByExternalController(runPolicy.ManagedBy)
			if diff := cmp.Diff(tc.wantControllerName, gotControllerName); diff != "" {
				t.Errorf("Unexpected manager controller (-want +got):\n%s", diff)
			}
		})
	}
}

func newPod(name string, phase corev1.PodPhase) *corev1.Pod {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
5 changes: 5 additions & 0 deletions pkg/controller.v1/mpi/mpijob_controller.go
@@ -135,6 +135,11 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	if manager := jc.ManagedByExternalController(mpijob.Spec.RunPolicy.ManagedBy); manager != nil {
		logger.Info("Skipping MPIJob managed by a custom controller", "managed-by", manager)
		return ctrl.Result{}, nil
	}

	if err = kubeflowv1.ValidateV1MpiJobSpec(&mpijob.Spec); err != nil {
		logger.Error(err, "MPIJob failed validation")
		jc.Recorder.Eventf(mpijob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedValidationReason),
61 changes: 61 additions & 0 deletions pkg/controller.v1/mpi/mpijob_controller_test.go
@@ -1066,6 +1066,67 @@ var _ = Describe("MPIJob controller", func() {
By("Checking if the startTime is updated")
Expect(created.Status.StartTime).ShouldNot(Equal(startTimeBeforeSuspended))
})

It("Should not reconcile a job while managed by external controller", func() {
	By("Creating a MPIJob managed by external controller")
	job.Spec.RunPolicy = kubeflowv1.RunPolicy{
		ManagedBy: ptr.To(kubeflowv1.MultiKueueController),
	}
	job.Spec.RunPolicy.Suspend = ptr.To(true)
	Expect(testK8sClient.Create(ctx, job)).Should(Succeed())

	created := &kubeflowv1.MPIJob{}
	By("Checking created MPIJob")
	Eventually(func() bool {
		err := testK8sClient.Get(ctx, jobKey, created)
		return err == nil
	}, testutil.Timeout, testutil.Interval).Should(BeTrue())

	By("Checking created MPIJob has a nil startTime")
	Consistently(func() *metav1.Time {
		Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
		return created.Status.StartTime
	}, testutil.ConsistentDuration, testutil.Interval).Should(BeNil())

	By("Checking if the pods and services aren't created")
	Consistently(func() bool {
		launcherPod := &corev1.Pod{}
		workerPod := &corev1.Pod{}
		launcherSvc := &corev1.Service{}
		workerSvc := &corev1.Service{}
		errMasterPod := testK8sClient.Get(ctx, launcherKey, launcherPod)
		errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod)
		errMasterSvc := testK8sClient.Get(ctx, launcherKey, launcherSvc)
		errWorkerSvc := testK8sClient.Get(ctx, worker0Key, workerSvc)
		return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) &&
			errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc)
	}, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue(), "pods and services should be created by external controller (here not existent)")

	By("Checking if the MPIJob status was not updated")
	Eventually(func() []kubeflowv1.JobCondition {
		Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
		return created.Status.Conditions
	}, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil)))

	By("Unsuspending the MPIJob")
	Eventually(func() error {
		Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
		created.Spec.RunPolicy.Suspend = ptr.To(false)
		return testK8sClient.Update(ctx, created)
	}, testutil.Timeout, testutil.Interval).Should(Succeed())

	By("Checking created MPIJob still has a nil startTime")
	Consistently(func() *metav1.Time {
		Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
		return created.Status.StartTime
	}, testutil.ConsistentDuration, testutil.Interval).Should(BeNil())

	By("Checking if the MPIJob status was not updated, even after unsuspending")
	Eventually(func() []kubeflowv1.JobCondition {
		Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
		return created.Status.Conditions
	}, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil)))
})
})
})

5 changes: 5 additions & 0 deletions pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go
@@ -131,6 +131,11 @@ func (r *PaddleJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	if manager := r.ManagedByExternalController(paddlejob.Spec.RunPolicy.ManagedBy); manager != nil {
		logger.Info("Skipping PaddleJob managed by a custom controller", "managed-by", manager)
		return ctrl.Result{}, nil
	}

	// Check if reconciliation is needed
	jobKey, err := common.KeyFunc(paddlejob)
	if err != nil {