From ed4df4d884f8036064075fc659503e540e9e4072 Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Thu, 1 Aug 2024 14:08:32 +0200 Subject: [PATCH 1/3] Provide Kubeflow PaddleJob support for MultiKueue --- charts/kueue/templates/rbac/role.yaml | 1 + config/components/rbac/role.yaml | 1 + pkg/controller/jobframework/validation.go | 3 +- .../jobs/paddlejob/paddlejob_controller.go | 3 +- .../paddlejob/paddlejob_controller_test.go | 10 +- .../paddlejob/paddlejob_multikueue_adapter.go | 117 +++++++++++++ .../paddlejob_multikueue_adapter_test.go | 159 ++++++++++++++++++ pkg/util/testingjobs/paddlejob/wrappers.go | 113 +++++++++---- .../create-multikueue-kubeconfig.sh | 16 ++ test/e2e/multikueue/e2e_test.go | 86 ++++++++++ test/e2e/multikueue/suite_test.go | 2 + .../paddlejob/paddlejob_controller_test.go | 5 +- .../integration/multikueue/multikueue_test.go | 138 +++++++++++++++ test/integration/multikueue/suite_test.go | 13 ++ 14 files changed, 627 insertions(+), 40 deletions(-) create mode 100644 pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go create mode 100644 pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go diff --git a/charts/kueue/templates/rbac/role.yaml b/charts/kueue/templates/rbac/role.yaml index 6e8f026399..12040ddf5d 100644 --- a/charts/kueue/templates/rbac/role.yaml +++ b/charts/kueue/templates/rbac/role.yaml @@ -253,6 +253,7 @@ rules: - paddlejobs/status verbs: - get + - patch - update - apiGroups: - kubeflow.org diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml index ba8e121e84..2d38be7220 100644 --- a/config/components/rbac/role.yaml +++ b/config/components/rbac/role.yaml @@ -252,6 +252,7 @@ rules: - paddlejobs/status verbs: - get + - patch - update - apiGroups: - kubeflow.org diff --git a/pkg/controller/jobframework/validation.go b/pkg/controller/jobframework/validation.go index 4168abebd4..5dc7bc6cd5 100644 --- a/pkg/controller/jobframework/validation.go +++ 
b/pkg/controller/jobframework/validation.go @@ -39,7 +39,8 @@ var ( supportedPrebuiltWlJobGVKs = sets.New( batchv1.SchemeGroupVersion.WithKind("Job").String(), jobset.SchemeGroupVersion.WithKind("JobSet").String(), - kftraining.SchemeGroupVersion.WithKind(kftraining.TFJobKind).String()) + kftraining.SchemeGroupVersion.WithKind(kftraining.TFJobKind).String(), + kftraining.SchemeGroupVersion.WithKind(kftraining.PaddleJobKind).String()) ) // ValidateJobOnCreate encapsulates all GenericJob validations that must be performed on a Create operation diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go index c9fc931d67..e53957d1d4 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go @@ -45,13 +45,14 @@ func init() { JobType: &kftraining.PaddleJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isPaddleJob, + MultiKueueAdapter: &multikueueAdapter{}, })) } // +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch // +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch // +kubebuilder:rbac:groups=kubeflow.org,resources=paddlejobs,verbs=get;list;watch;update;patch -// +kubebuilder:rbac:groups=kubeflow.org,resources=paddlejobs/status,verbs=get;update +// +kubebuilder:rbac:groups=kubeflow.org,resources=paddlejobs/status,verbs=get;update;patch // +kubebuilder:rbac:groups=kubeflow.org,resources=paddlejobs/finalizers,verbs=get;update // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller_test.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller_test.go index 
4d99f6ed39..e2949c2f51 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller_test.go @@ -255,8 +255,8 @@ func TestReconciler(t *testing.T) { reconcilerOptions: []jobframework.Option{ jobframework.WithManageJobsWithoutQueueName(true), }, - job: testingpaddlejob.MakePaddleJob("paddlejob", "ns").Parallelism(2).Obj(), - wantJob: testingpaddlejob.MakePaddleJob("paddlejob", "ns").Parallelism(2).Obj(), + job: testingpaddlejob.MakePaddleJob("paddlejob", "ns").PaddleReplicaSpecsDefault().Parallelism(2).Obj(), + wantJob: testingpaddlejob.MakePaddleJob("paddlejob", "ns").PaddleReplicaSpecsDefault().Parallelism(2).Obj(), wantWorkloads: []kueue.Workload{ *utiltesting.MakeWorkload("paddlejob", "ns"). PodSets( @@ -270,12 +270,13 @@ func TestReconciler(t *testing.T) { reconcilerOptions: []jobframework.Option{ jobframework.WithManageJobsWithoutQueueName(false), }, - job: testingpaddlejob.MakePaddleJob("paddlejob", "ns").Parallelism(2).Obj(), - wantJob: testingpaddlejob.MakePaddleJob("paddlejob", "ns").Parallelism(2).Obj(), + job: testingpaddlejob.MakePaddleJob("paddlejob", "ns").PaddleReplicaSpecsDefault().Parallelism(2).Obj(), + wantJob: testingpaddlejob.MakePaddleJob("paddlejob", "ns").PaddleReplicaSpecsDefault().Parallelism(2).Obj(), wantWorkloads: []kueue.Workload{}, }, "when workload is evicted, suspended is reset, restore node affinity": { job: testingpaddlejob.MakePaddleJob("paddlejob", "ns"). + PaddleReplicaSpecsDefault(). Image(""). Args(nil). Queue("foo"). @@ -319,6 +320,7 @@ func TestReconciler(t *testing.T) { Obj(), }, wantJob: testingpaddlejob.MakePaddleJob("paddlejob", "ns"). + PaddleReplicaSpecsDefault(). Image(""). Args(nil). Queue("foo"). 
diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go new file mode 100644 index 0000000000..718e133a3f --- /dev/null +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go @@ -0,0 +1,117 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package paddlejob + +import ( + "context" + "errors" + "fmt" + + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/util/api" + clientutil "sigs.k8s.io/kueue/pkg/util/client" +) + +type multikueueAdapter struct{} + +var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) + +func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { + localJob := kftraining.PaddleJob{} + err := localClient.Get(ctx, key, &localJob) + if err != nil { + return err + } + + remoteJob := &kftraining.PaddleJob{} + err = remoteClient.Get(ctx, key, remoteJob) + if client.IgnoreNotFound(err) != nil 
{ + return err + } + + // if the remote exists, just copy the status + if err == nil { + return clientutil.PatchStatus(ctx, localClient, &localJob, func() (bool, error) { + localJob.Status = remoteJob.Status + return true, nil + }) + } + + remoteJob = &kftraining.PaddleJob{ + ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta), + Spec: *localJob.Spec.DeepCopy(), + } + + // add the prebuilt workload + if remoteJob.Labels == nil { + remoteJob.Labels = map[string]string{} + } + remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName + remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin + + return remoteClient.Create(ctx, remoteJob) +} + +func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { + job := kftraining.PaddleJob{} + err := remoteClient.Get(ctx, key, &job) + if err != nil { + return client.IgnoreNotFound(err) + } + return client.IgnoreNotFound(remoteClient.Delete(ctx, &job)) +} + +func (b *multikueueAdapter) KeepAdmissionCheckPending() bool { + return false +} + +func (b *multikueueAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) { + return true, "", nil +} + +func (b *multikueueAdapter) GVK() schema.GroupVersionKind { + return gvk +} + +var _ jobframework.MultiKueueWatcher = (*multikueueAdapter)(nil) + +func (*multikueueAdapter) GetEmptyList() client.ObjectList { + return &kftraining.PaddleJobList{} +} + +func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { + paddleJob, isPaddleJob := o.(*kftraining.PaddleJob) + if !isPaddleJob { + return types.NamespacedName{}, errors.New("not a PaddleJob") + } + + prebuiltWl, hasPrebuiltWorkload := paddleJob.Labels[constants.PrebuiltWorkloadLabel] + if !hasPrebuiltWorkload { + return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for PaddleJob: %s", klog.KObj(paddleJob)) + } + + return 
types.NamespacedName{Name: prebuiltWl, Namespace: paddleJob.Namespace}, nil +} diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go new file mode 100644 index 0000000000..2c71041ecf --- /dev/null +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go @@ -0,0 +1,159 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package paddlejob + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/util/slices" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + kfutiltesting "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob" +) + +const ( + TestNamespace = "ns" +) + +func TestMultikueueAdapter(t *testing.T) { + objCheckOpts := []cmp.Option{ + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), + cmpopts.EquateEmpty(), + } + + paddleJobBuilder := kfutiltesting.MakePaddleJob("paddlejob1", TestNamespace).Queue("queue").Suspend(false) + + cases := map[string]struct { + managersPaddleJobs []kftraining.PaddleJob + workerPaddleJobs []kftraining.PaddleJob + + operation func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error + + wantError error + wantManagersPaddleJobs []kftraining.PaddleJob + wantWorkerPaddleJobs []kftraining.PaddleJob + }{ + "sync creates missing remote PaddleJob": { + managersPaddleJobs: []kftraining.PaddleJob{ + *paddleJobBuilder.Clone().Obj(), + }, + operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "paddlejob1", Namespace: TestNamespace}, "wl1", "origin1") + }, + + wantManagersPaddleJobs: []kftraining.PaddleJob{ + *paddleJobBuilder.Clone().Obj(), + }, + wantWorkerPaddleJobs: []kftraining.PaddleJob{ + *paddleJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). 
+ Label(kueuealpha.MultiKueueOriginLabel, "origin1"). + Obj(), + }, + }, + "sync status from remote PaddleJob": { + managersPaddleJobs: []kftraining.PaddleJob{ + *paddleJobBuilder.Clone().Obj(), + }, + workerPaddleJobs: []kftraining.PaddleJob{ + *paddleJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(kueuealpha.MultiKueueOriginLabel, "origin1"). + StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: corev1.ConditionTrue}). + Obj(), + }, + operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "paddlejob1", Namespace: TestNamespace}, "wl1", "origin1") + }, + + wantManagersPaddleJobs: []kftraining.PaddleJob{ + *paddleJobBuilder.Clone(). + StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: corev1.ConditionTrue}). + Obj(), + }, + wantWorkerPaddleJobs: []kftraining.PaddleJob{ + *paddleJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(kueuealpha.MultiKueueOriginLabel, "origin1"). + StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: corev1.ConditionTrue}). + Obj(), + }, + }, + "remote PaddleJob is deleted": { + workerPaddleJobs: []kftraining.PaddleJob{ + *paddleJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(kueuealpha.MultiKueueOriginLabel, "origin1"). 
+ Obj(), + }, + operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + return adapter.DeleteRemoteObject(ctx, workerClient, types.NamespacedName{Name: "paddlejob1", Namespace: TestNamespace}) + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + managerBuilder := utiltesting.NewClientBuilder(kftraining.AddToScheme).WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}) + managerBuilder = managerBuilder.WithLists(&kftraining.PaddleJobList{Items: tc.managersPaddleJobs}) + managerBuilder = managerBuilder.WithStatusSubresource(slices.Map(tc.managersPaddleJobs, func(w *kftraining.PaddleJob) client.Object { return w })...) + managerClient := managerBuilder.Build() + + workerBuilder := utiltesting.NewClientBuilder(kftraining.AddToScheme).WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}) + workerBuilder = workerBuilder.WithLists(&kftraining.PaddleJobList{Items: tc.workerPaddleJobs}) + workerClient := workerBuilder.Build() + + ctx, _ := utiltesting.ContextWithLog(t) + + adapter := &multikueueAdapter{} + + gotErr := tc.operation(ctx, adapter, managerClient, workerClient) + + if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" { + t.Errorf("unexpected error (-want/+got):\n%s", diff) + } + + gotManagersPaddleJob := &kftraining.PaddleJobList{} + if err := managerClient.List(ctx, gotManagersPaddleJob); err != nil { + t.Errorf("unexpected list manager's PaddleJobs error %s", err) + } else { + if diff := cmp.Diff(tc.wantManagersPaddleJobs, gotManagersPaddleJob.Items, objCheckOpts...); diff != "" { + t.Errorf("unexpected manager's PaddleJobs (-want/+got):\n%s", diff) + } + } + + gotWorkerPaddleJobs := &kftraining.PaddleJobList{} + if err := workerClient.List(ctx, gotWorkerPaddleJobs); err != nil { + t.Errorf("unexpected list worker's PaddleJobs error %s", err) + } else { + if diff 
:= cmp.Diff(tc.wantWorkerPaddleJobs, gotWorkerPaddleJobs.Items, objCheckOpts...); diff != "" { + t.Errorf("unexpected worker's PaddleJobs (-want/+got):\n%s", diff) + } + } + }) + } +} diff --git a/pkg/util/testingjobs/paddlejob/wrappers.go b/pkg/util/testingjobs/paddlejob/wrappers.go index ee8152e030..7172b939ab 100644 --- a/pkg/util/testingjobs/paddlejob/wrappers.go +++ b/pkg/util/testingjobs/paddlejob/wrappers.go @@ -42,44 +42,87 @@ func MakePaddleJob(name, ns string) *PaddleJobWrapper { RunPolicy: kftraining.RunPolicy{ Suspend: ptr.To(true), }, - PaddleReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{ - kftraining.PaddleJobReplicaTypeMaster: { - Replicas: ptr.To[int32](1), - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - RestartPolicy: "Never", - Containers: []corev1.Container{ - { - Name: "c", - Image: "pause", - Command: []string{}, - Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}}, - }, - }, - NodeSelector: map[string]string{}, - }, + PaddleReplicaSpecs: make(map[kftraining.ReplicaType]*kftraining.ReplicaSpec), + }, + }} +} + +type PaddleReplicaSpecRequirement struct { + ReplicaType kftraining.ReplicaType + Name string + ReplicaCount int32 + Annotations map[string]string + RestartPolicy kftraining.RestartPolicy +} + +func (j *PaddleJobWrapper) PaddleReplicaSpecs(replicaSpecs ...PaddleReplicaSpecRequirement) *PaddleJobWrapper { + j = j.PaddleReplicaSpecsDefault() + for _, rs := range replicaSpecs { + j.Spec.PaddleReplicaSpecs[rs.ReplicaType].Replicas = ptr.To[int32](rs.ReplicaCount) + j.Spec.PaddleReplicaSpecs[rs.ReplicaType].Template.Name = rs.Name + j.Spec.PaddleReplicaSpecs[rs.ReplicaType].Template.Spec.RestartPolicy = corev1.RestartPolicy(rs.RestartPolicy) + j.Spec.PaddleReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Name = "paddle" + + if rs.Annotations != nil { + j.Spec.PaddleReplicaSpecs[rs.ReplicaType].Template.ObjectMeta.Annotations = rs.Annotations + } + } + + return j +} + +func (j 
*PaddleJobWrapper) PaddleReplicaSpecsDefault() *PaddleJobWrapper { + j.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeMaster] = &kftraining.ReplicaSpec{ + Replicas: ptr.To[int32](1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: "Never", + Containers: []corev1.Container{ + { + Name: "c", + Image: "pause", + Command: []string{}, + Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}}, }, }, - kftraining.PaddleJobReplicaTypeWorker: { - Replicas: ptr.To[int32](1), - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - RestartPolicy: "Never", - Containers: []corev1.Container{ - { - Name: "c", - Image: "pause", - Command: []string{}, - Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}}, - }, - }, - NodeSelector: map[string]string{}, - }, + NodeSelector: map[string]string{}, + }, + }, + } + + j.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker] = &kftraining.ReplicaSpec{ + Replicas: ptr.To[int32](1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: "Never", + Containers: []corev1.Container{ + { + Name: "c", + Image: "pause", + Command: []string{}, + Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}}, }, }, + NodeSelector: map[string]string{}, }, }, - }} + } + + return j +} + +// Clone returns deep copy of the PaddleJobWrapper. +func (j *PaddleJobWrapper) Clone() *PaddleJobWrapper { + return &PaddleJobWrapper{PaddleJob: *j.DeepCopy()} +} + +// Label sets the label key and value +func (j *PaddleJobWrapper) Label(key, value string) *PaddleJobWrapper { + if j.Labels == nil { + j.Labels = make(map[string]string) + } + j.Labels[key] = value + return j } // PriorityClass updates job priorityclass. @@ -166,3 +209,9 @@ func (j *PaddleJobWrapper) Active(rType kftraining.ReplicaType, c int32) *Paddle } return j } + +// StatusConditions updates status conditions of the PaddleJob. 
+func (j *PaddleJobWrapper) StatusConditions(conditions ...kftraining.JobCondition) *PaddleJobWrapper { + j.Status.Conditions = conditions + return j +} diff --git a/site/static/examples/multikueue/create-multikueue-kubeconfig.sh b/site/static/examples/multikueue/create-multikueue-kubeconfig.sh index edb87f1303..eb3224306f 100644 --- a/site/static/examples/multikueue/create-multikueue-kubeconfig.sh +++ b/site/static/examples/multikueue/create-multikueue-kubeconfig.sh @@ -101,6 +101,22 @@ rules: - tfjobs/status verbs: - get +- apiGroups: + - kubeflow.org + resources: + - paddlejobs + verbs: + - create + - delete + - get + - list + - watch +- apiGroups: + - kubeflow.org + resources: + - paddlejobs/status + verbs: + - get --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/test/e2e/multikueue/e2e_test.go b/test/e2e/multikueue/e2e_test.go index 2ca6545bb7..47ba05bf3a 100644 --- a/test/e2e/multikueue/e2e_test.go +++ b/test/e2e/multikueue/e2e_test.go @@ -40,10 +40,12 @@ import ( kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset" + workloadpaddlejob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/paddlejob" workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job" testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset" + testingpaddlejob "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob" testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob" "sigs.k8s.io/kueue/pkg/workload" "sigs.k8s.io/kueue/test/util" @@ -459,6 +461,90 @@ var _ = ginkgo.Describe("MultiKueue", func() { }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) }) + + ginkgo.It("Should run a kubeflow PaddleJob on worker if admitted", func() { + paddleJob := testingpaddlejob.MakePaddleJob("paddlejob1", managerNs.Name). 
+ Queue(managerLq.Name). + PaddleReplicaSpecs( + testingpaddlejob.PaddleReplicaSpecRequirement{ + ReplicaType: kftraining.PaddleJobReplicaTypeMaster, + ReplicaCount: 1, + RestartPolicy: "OnFailure", + }, + testingpaddlejob.PaddleReplicaSpecRequirement{ + ReplicaType: kftraining.PaddleJobReplicaTypeWorker, + ReplicaCount: 1, + RestartPolicy: "OnFailure", + }, + ). + Request(kftraining.PaddleJobReplicaTypeMaster, corev1.ResourceCPU, "0.6"). + Request(kftraining.PaddleJobReplicaTypeMaster, corev1.ResourceMemory, "200M"). + Request(kftraining.PaddleJobReplicaTypeWorker, corev1.ResourceCPU, "0.5"). + Request(kftraining.PaddleJobReplicaTypeWorker, corev1.ResourceMemory, "200M"). + Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0"). + Args([]string{"1ms"}). + Obj() + + ginkgo.By("Creating the PaddleJob", func() { + gomega.Expect(k8sManagerClient.Create(ctx, paddleJob)).Should(gomega.Succeed()) + }) + + wlLookupKey := types.NamespacedName{Name: workloadpaddlejob.GetWorkloadNameForPaddleJob(paddleJob.Name, paddleJob.UID), Namespace: managerNs.Name} + + // the execution should be given to the worker + ginkgo.By("Waiting to be admitted in worker1 and manager", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdWorkload := &kueue.Workload{} + g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionTrue, + Reason: "Admitted", + Message: "The workload is admitted", + }, util.IgnoreConditionTimestampsAndObservedGeneration)) + g.Expect(workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, multiKueueAc.Name)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{ + Name: multiKueueAc.Name, + State: kueue.CheckStateReady, + Message: `The workload got reservation on "worker1"`, + }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, 
"LastTransitionTime"))) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Waiting for the PaddleJob to finish", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdPaddleJob := &kftraining.PaddleJob{} + g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(paddleJob), createdPaddleJob)).To(gomega.Succeed()) + g.Expect(createdPaddleJob.Status.ReplicaStatuses[kftraining.PaddleJobReplicaTypeMaster]).To(gomega.BeComparableTo( + &kftraining.ReplicaStatus{ + Active: 0, + Succeeded: 1, + Selector: fmt.Sprintf("training.kubeflow.org/job-name=%s,training.kubeflow.org/operator-name=paddlejob-controller,training.kubeflow.org/replica-type=master", createdPaddleJob.Name), + }, + util.IgnoreConditionTimestampsAndObservedGeneration)) + + createdWorkload := &kueue.Workload{} + g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadFinishedReasonSucceeded, + Message: fmt.Sprintf("PaddleJob %s is successfully completed.", paddleJob.Name), + }, util.IgnoreConditionTimestampsAndObservedGeneration)) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Checking no objects are left in the worker clusters and the PaddleJob is completed", func() { + gomega.Eventually(func(g gomega.Gomega) { + workerWl := &kueue.Workload{} + g.Expect(k8sWorker1Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError()) + g.Expect(k8sWorker2Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError()) + workerPaddleJob := &kftraining.PaddleJob{} + g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(paddleJob), workerPaddleJob)).To(utiltesting.BeNotFoundError()) + g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(paddleJob), 
workerPaddleJob)).To(utiltesting.BeNotFoundError()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + }) + }) ginkgo.When("The connection to a worker cluster is unreliable", func() { ginkgo.It("Should update the cluster status to reflect the connection state", func() { diff --git a/test/e2e/multikueue/suite_test.go b/test/e2e/multikueue/suite_test.go index fb7fe26026..02fc6a43d8 100644 --- a/test/e2e/multikueue/suite_test.go +++ b/test/e2e/multikueue/suite_test.go @@ -89,6 +89,8 @@ func kubeconfigForMultiKueueSA(ctx context.Context, c client.Client, restConfig policyRule(kueue.SchemeGroupVersion.Group, "workloads/status", "get", "patch", "update"), policyRule(kftraining.SchemeGroupVersion.Group, "tfjobs", resourceVerbs...), policyRule(kftraining.SchemeGroupVersion.Group, "tfjobs/status", "get"), + policyRule(kftraining.SchemeGroupVersion.Group, "paddlejobs", resourceVerbs...), + policyRule(kftraining.SchemeGroupVersion.Group, "paddlejobs/status", "get"), }, } err := c.Create(ctx, cr) diff --git a/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go b/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go index 3f43f972c6..452f0c1a1e 100644 --- a/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go +++ b/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go @@ -75,7 +75,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu }) ginkgo.It("Should reconcile PaddleJobs", func() { - kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpaddlejob.JobControl)(testingpaddlejob.MakePaddleJob(jobName, ns.Name).Obj())} + kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpaddlejob.JobControl)(testingpaddlejob.MakePaddleJob(jobName, ns.Name).PaddleReplicaSpecsDefault().Obj())} createdJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpaddlejob.JobControl)(&kftraining.PaddleJob{})} kftesting.ShouldReconcileJob(ctx, k8sClient, kfJob, createdJob, 
[]kftesting.PodSetsResource{ @@ -127,7 +127,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O ginkgo.DescribeTable("Single job at different stages of progress towards completion", func(podsReadyTestSpec kftesting.PodsReadyTestSpec) { - kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpaddlejob.JobControl)(testingpaddlejob.MakePaddleJob(jobName, ns.Name).Parallelism(2).Obj())} + kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpaddlejob.JobControl)(testingpaddlejob.MakePaddleJob(jobName, ns.Name).PaddleReplicaSpecsDefault().Parallelism(2).Obj())} createdJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpaddlejob.JobControl)(&kftraining.PaddleJob{})} kftesting.JobControllerWhenWaitForPodsReadyEnabled(ctx, k8sClient, kfJob, createdJob, podsReadyTestSpec, []kftesting.PodSetsResource{ @@ -281,6 +281,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpaddlejob.JobControl)( testingpaddlejob.MakePaddleJob(jobName, ns.Name).Queue(localQueue.Name). + PaddleReplicaSpecsDefault(). Request(kftraining.PaddleJobReplicaTypeMaster, corev1.ResourceCPU, "3"). Request(kftraining.PaddleJobReplicaTypeWorker, corev1.ResourceCPU, "4"). 
Obj(), diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index 73466c9949..4935c1fd5e 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -41,11 +41,13 @@ import ( "sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue" workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset" + workloadpaddlejob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/paddlejob" workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob" "sigs.k8s.io/kueue/pkg/features" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job" testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset" + testingpaddlejob "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob" testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob" "sigs.k8s.io/kueue/pkg/workload" "sigs.k8s.io/kueue/test/util" @@ -925,6 +927,142 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, }) }) + ginkgo.It("Should run a PaddleJob on worker if admitted", func() { + paddleJob := testingpaddlejob.MakePaddleJob("paddlejob1", managerNs.Name). + Queue(managerLq.Name). + PaddleReplicaSpecs( + testingpaddlejob.PaddleReplicaSpecRequirement{ + ReplicaType: kftraining.PaddleJobReplicaTypeMaster, + ReplicaCount: 1, + Name: "paddlejob-master", + RestartPolicy: "OnFailure", + }, + testingpaddlejob.PaddleReplicaSpecRequirement{ + ReplicaType: kftraining.PaddleJobReplicaTypeWorker, + ReplicaCount: 3, + Name: "paddlejob-worker", + RestartPolicy: "OnFailure", + }, + ). 
+ Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, paddleJob)).Should(gomega.Succeed()) + + wlLookupKey := types.NamespacedName{Name: workloadpaddlejob.GetWorkloadNameForPaddleJob(paddleJob.Name, paddleJob.UID), Namespace: managerNs.Name} + admission := utiltesting.MakeAdmission(managerCq.Name).PodSets( + kueue.PodSetAssignment{ + Name: "master", + }, kueue.PodSetAssignment{ + Name: "worker", + }, + ).Obj() + + ginkgo.By("setting workload reservation in the management cluster", func() { + createdWorkload := &kueue.Workload{} + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("checking the workload creation in the worker clusters", func() { + managerWl := &kueue.Workload{} + createdWorkload := &kueue.Workload{} + gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed()) + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("setting workload reservation in worker2, the workload is admitted in manager and worker1 wl is removed", func() { + createdWorkload := &kueue.Workload{} + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, 
createdWorkload)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(worker2TestCluster.ctx, worker2TestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + acs := workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, multikueueAC.Name) + g.Expect(acs).NotTo(gomega.BeNil()) + g.Expect(acs.State).To(gomega.Equal(kueue.CheckStateReady)) + g.Expect(acs.Message).To(gomega.Equal(`The workload got reservation on "worker2"`)) + + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionTrue, + Reason: "Admitted", + Message: "The workload is admitted", + }, util.IgnoreConditionTimestampsAndObservedGeneration)) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("changing the status of the PaddleJob in the worker, updates the manager's PaddleJob status", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdPaddleJob := kftraining.PaddleJob{} + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(paddleJob), &createdPaddleJob)).To(gomega.Succeed()) + createdPaddleJob.Status.ReplicaStatuses = map[kftraining.ReplicaType]*kftraining.ReplicaStatus{ + kftraining.PaddleJobReplicaTypeMaster: { + Active: 1, + }, + kftraining.PaddleJobReplicaTypeWorker: { + Active: 3, + }, + } + g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, 
&createdPaddleJob)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + gomega.Eventually(func(g gomega.Gomega) { + createdPaddleJob := kftraining.PaddleJob{} + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(paddleJob), &createdPaddleJob)).To(gomega.Succeed()) + g.Expect(createdPaddleJob.Status.ReplicaStatuses).To(gomega.Equal( + map[kftraining.ReplicaType]*kftraining.ReplicaStatus{ + kftraining.PaddleJobReplicaTypeMaster: { + Active: 1, + }, + kftraining.PaddleJobReplicaTypeWorker: { + Active: 3, + }, + })) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("finishing the worker PaddleJob, the manager's wl is marked as finished and the worker2 wl removed", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdPaddleJob := kftraining.PaddleJob{} + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(paddleJob), &createdPaddleJob)).To(gomega.Succeed()) + createdPaddleJob.Status.Conditions = append(createdPaddleJob.Status.Conditions, kftraining.JobCondition{ + Type: kftraining.JobSucceeded, + Status: corev1.ConditionTrue, + Reason: "ByTest", + Message: "PaddleJob finished successfully", + }) + g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdPaddleJob)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + createdWorkload := &kueue.Workload{} + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: string(kftraining.JobSucceeded), + Message: "PaddleJob finished successfully", + }, util.IgnoreConditionTimestampsAndObservedGeneration)) + }, 
util.LongTimeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + createdWorkload := &kueue.Workload{} + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError()) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + }) + ginkgo.It("Should remove the worker's workload and job after reconnect when the managers job and workload are deleted", func() { job := testingjob.MakeJob("job", managerNs.Name). Queue(managerLq.Name). diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go index 0c03931b1d..742e1dbffb 100644 --- a/test/integration/multikueue/suite_test.go +++ b/test/integration/multikueue/suite_test.go @@ -41,6 +41,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/core/indexer" workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset" + workloadpaddlejob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/paddlejob" workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob" "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/util/kubeversion" @@ -148,6 +149,18 @@ func managerSetup(ctx context.Context, mgr manager.Manager) { err = workloadtfjob.SetupTFJobWebhook(mgr) gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + err = workloadpaddlejob.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + paddleJobReconciler := workloadpaddlejob.NewReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor(constants.JobControllerName)) + err = paddleJobReconciler.SetupWithManager(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + err = workloadpaddlejob.SetupPaddleJobWebhook(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) } func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration) { From 1ac729c76c5e2ba680c59c65d77d80d6e6906047 Mon Sep 17 
00:00:00 2001 From: Michal Szadkowski Date: Thu, 1 Aug 2024 16:45:54 +0200 Subject: [PATCH 2/3] update after code review --- test/e2e/multikueue/e2e_test.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/test/e2e/multikueue/e2e_test.go b/test/e2e/multikueue/e2e_test.go index 47ba05bf3a..ec0c97110c 100644 --- a/test/e2e/multikueue/e2e_test.go +++ b/test/e2e/multikueue/e2e_test.go @@ -463,6 +463,7 @@ var _ = ginkgo.Describe("MultiKueue", func() { }) ginkgo.It("Should run a kubeflow PaddleJob on worker if admitted", func() { + // Since it requires 1600M memory, this job can only be admitted in worker 2. paddleJob := testingpaddlejob.MakePaddleJob("paddlejob1", managerNs.Name). Queue(managerLq.Name). PaddleReplicaSpecs( @@ -477,10 +478,10 @@ var _ = ginkgo.Describe("MultiKueue", func() { RestartPolicy: "OnFailure", }, ). - Request(kftraining.PaddleJobReplicaTypeMaster, corev1.ResourceCPU, "0.6"). - Request(kftraining.PaddleJobReplicaTypeMaster, corev1.ResourceMemory, "200M"). - Request(kftraining.PaddleJobReplicaTypeWorker, corev1.ResourceCPU, "0.5"). - Request(kftraining.PaddleJobReplicaTypeWorker, corev1.ResourceMemory, "200M"). + Request(kftraining.PaddleJobReplicaTypeMaster, corev1.ResourceCPU, "0.2"). + Request(kftraining.PaddleJobReplicaTypeMaster, corev1.ResourceMemory, "800M"). + Request(kftraining.PaddleJobReplicaTypeWorker, corev1.ResourceCPU, "0.2"). + Request(kftraining.PaddleJobReplicaTypeWorker, corev1.ResourceMemory, "800M"). Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0"). Args([]string{"1ms"}). 
Obj() @@ -492,7 +493,7 @@ var _ = ginkgo.Describe("MultiKueue", func() { wlLookupKey := types.NamespacedName{Name: workloadpaddlejob.GetWorkloadNameForPaddleJob(paddleJob.Name, paddleJob.UID), Namespace: managerNs.Name} // the execution should be given to the worker - ginkgo.By("Waiting to be admitted in worker1 and manager", func() { + ginkgo.By("Waiting to be admitted in worker2 and manager", func() { gomega.Eventually(func(g gomega.Gomega) { createdWorkload := &kueue.Workload{} g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) @@ -505,7 +506,7 @@ var _ = ginkgo.Describe("MultiKueue", func() { g.Expect(workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, multiKueueAc.Name)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{ Name: multiKueueAc.Name, State: kueue.CheckStateReady, - Message: `The workload got reservation on "worker1"`, + Message: `The workload got reservation on "worker2"`, }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"))) }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) From e1a88e31c4dced074304615b0cf3769f839e5672 Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Fri, 2 Aug 2024 08:27:22 +0200 Subject: [PATCH 3/3] fix after review --- .../kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go | 4 ++-- test/e2e/multikueue/e2e_test.go | 1 - test/integration/multikueue/multikueue_test.go | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go index 718e133a3f..850ee3beb2 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go @@ -67,7 +67,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie // add the prebuilt workload if remoteJob.Labels == nil 
{ - remoteJob.Labels = map[string]string{} + remoteJob.Labels = make(map[string]string, 2) } remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin @@ -88,7 +88,7 @@ func (b *multikueueAdapter) KeepAdmissionCheckPending() bool { return false } -func (b *multikueueAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) { +func (b *multikueueAdapter) IsJobManagedByKueue(context.Context, client.Client, types.NamespacedName) (bool, string, error) { return true, "", nil } diff --git a/test/e2e/multikueue/e2e_test.go b/test/e2e/multikueue/e2e_test.go index ec0c97110c..ff0401a925 100644 --- a/test/e2e/multikueue/e2e_test.go +++ b/test/e2e/multikueue/e2e_test.go @@ -545,7 +545,6 @@ var _ = ginkgo.Describe("MultiKueue", func() { }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) }) - }) ginkgo.When("The connection to a worker cluster is unreliable", func() { ginkgo.It("Should update the cluster status to reflect the connection state", func() { diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index 4935c1fd5e..b38e180b93 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -927,7 +927,7 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, }) }) - ginkgo.FIt("Should run a PaddleJob on worker if admitted", func() { + ginkgo.It("Should run a PaddleJob on worker if admitted", func() { paddleJob := testingpaddlejob.MakePaddleJob("paddlejob1", managerNs.Name). Queue(managerLq.Name). PaddleReplicaSpecs(