From e63a99038491783be62f34dff5d6920b2725a517 Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Wed, 7 Aug 2024 17:24:03 +0200 Subject: [PATCH 1/4] attempt to commonize Kubeflow jobs Multikueue support methods --- pkg/controller/jobframework/interface.go | 5 ++ pkg/controller/jobs/kubeflow/common/common.go | 89 +++++++++++++++++++ .../paddlejob/paddlejob_multikueue_adapter.go | 68 +++----------- .../pytorchjob/pytorch_multikueue_adapter.go | 68 +++----------- .../jobs/tfjob/tfjob_multikueue_adapter.go | 68 +++----------- .../xgboostjob_multikueue_adapter.go | 68 +++----------- .../integration/multikueue/multikueue_test.go | 2 +- 7 files changed, 151 insertions(+), 217 deletions(-) create mode 100644 pkg/controller/jobs/kubeflow/common/common.go diff --git a/pkg/controller/jobframework/interface.go b/pkg/controller/jobframework/interface.go index 32193d1714..8445f439b8 100644 --- a/pkg/controller/jobframework/interface.go +++ b/pkg/controller/jobframework/interface.go @@ -172,6 +172,11 @@ type MultiKueueAdapter interface { GVK() schema.GroupVersionKind } +type UpdateRemoteJob interface { + UpdateRemoteJobStatus(localJob, remoteJob interface{}) + UpdateRemoteJobSpec(localJob, remoteJob interface{}) +} + // MultiKueueWatcher optional interface that can be implemented by a MultiKueueAdapter // to receive job related watch events from the worker cluster. // If not implemented, MultiKueue will only receive events related to the job's workload. diff --git a/pkg/controller/jobs/kubeflow/common/common.go b/pkg/controller/jobs/kubeflow/common/common.go new file mode 100644 index 0000000000..07472a34bd --- /dev/null +++ b/pkg/controller/jobs/kubeflow/common/common.go @@ -0,0 +1,89 @@ +package common + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + + "sigs.k8s.io/controller-runtime/pkg/client" + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + clientutil "sigs.k8s.io/kueue/pkg/util/client" +) + +type objAsPtr[T any] interface { + metav1.Object + client.Object + *T +} + +func SyncJob[PtrT objAsPtr[T], T any]( + ctx context.Context, + localClient client.Client, + remoteClient client.Client, + key types.NamespacedName, + workloadName, origin string, + b jobframework.UpdateRemoteJob) error { + + localJob := PtrT(new(T)) + err := localClient.Get(ctx, key, localJob) + if err != nil { + return err + } + + remoteJob := PtrT(new(T)) + err = remoteClient.Get(ctx, key, remoteJob) + if client.IgnoreNotFound(err) != nil { + return err + } + + if err == nil { + return clientutil.PatchStatus(ctx, localClient, localJob, func() (bool, error) { + // if the remote exists, just copy the status + b.UpdateRemoteJobStatus(localJob, remoteJob) + return true, nil + }) + } + + remoteJob = PtrT(new(T)) + b.UpdateRemoteJobSpec(localJob, remoteJob) + + // add the prebuilt workload + labels := remoteJob.GetLabels() + if remoteJob.GetLabels() == nil { + labels = make(map[string]string, 2) + } + labels[constants.PrebuiltWorkloadLabel] = workloadName + labels[kueuealpha.MultiKueueOriginLabel] = origin + remoteJob.SetLabels(labels) + + return remoteClient.Create(ctx, remoteJob) +} + +func DeleteRemoteObject[PtrT objAsPtr[T], T any](ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { + job := PtrT(new(T)) + err := remoteClient.Get(ctx, key, job) + if err != nil { + return client.IgnoreNotFound(err) + } + return client.IgnoreNotFound(remoteClient.Delete(ctx, job)) +} + +func WorkloadKeyFor[PtrT objAsPtr[T], T any](o runtime.Object, JobName string) (types.NamespacedName, error) { + job, isTheJob := o.(PtrT) + if !isTheJob { + return types.NamespacedName{}, fmt.Errorf("not a %s", JobName) + } + + prebuiltWl, hasPrebuiltWorkload := job.GetLabels()[constants.PrebuiltWorkloadLabel] + if !hasPrebuiltWorkload { + return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for %s: %s", JobName, klog.KObj(job)) + } + + return types.NamespacedName{Name: prebuiltWl, Namespace: job.GetNamespace()}, nil +} diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go index 850ee3beb2..8abf0247af 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go @@ -18,70 +18,40 @@ package paddlejob import ( "context" - "errors" - "fmt" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" - kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" - "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" + kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/util/api" - clientutil "sigs.k8s.io/kueue/pkg/util/client" ) type multikueueAdapter struct{} var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) +var _ jobframework.UpdateRemoteJob = (*multikueueAdapter)(nil) -func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { - localJob := kftraining.PaddleJob{} - err := localClient.Get(ctx, key, &localJob) - if err != nil { - return err - } - - remoteJob := &kftraining.PaddleJob{} - err = remoteClient.Get(ctx, key, remoteJob) - if client.IgnoreNotFound(err) != nil { - return err - } - - // if the remote exists, just copy the status - if err == nil { - return clientutil.PatchStatus(ctx, localClient, &localJob, func() (bool, error) { - localJob.Status = remoteJob.Status - return true, nil - }) - } - - remoteJob = &kftraining.PaddleJob{ - ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta), - Spec: *localJob.Spec.DeepCopy(), - } +func (b *multikueueAdapter) UpdateRemoteJobStatus(localJob, remoteJob interface{}) { + localJob.(*kftraining.PaddleJob).Status = remoteJob.(*kftraining.PaddleJob).Status +} - // add the prebuilt workload - if remoteJob.Labels == nil { - remoteJob.Labels = make(map[string]string, 2) +func (b *multikueueAdapter) UpdateRemoteJobSpec(localJob, remoteJob interface{}) { + *remoteJob.(*kftraining.PaddleJob) = kftraining.PaddleJob{ + ObjectMeta: api.CloneObjectMetaForCreation(&localJob.(*kftraining.PaddleJob).ObjectMeta), + Spec: *localJob.(*kftraining.PaddleJob).Spec.DeepCopy(), } - remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName - remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin +} - return remoteClient.Create(ctx, remoteJob) +func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { + return kfcommon.SyncJob[*kftraining.PaddleJob](ctx, localClient, remoteClient, key, workloadName, origin, b) } func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { - job := kftraining.PaddleJob{} - err := remoteClient.Get(ctx, key, &job) - if err != nil { - return client.IgnoreNotFound(err) - } - return client.IgnoreNotFound(remoteClient.Delete(ctx, &job)) + return kfcommon.DeleteRemoteObject[*kftraining.PaddleJob](ctx, remoteClient, key) } func (b *multikueueAdapter) KeepAdmissionCheckPending() bool { @@ -103,15 +73,5 @@ func (*multikueueAdapter) GetEmptyList() client.ObjectList { } func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { - paddleJob, isPaddleJob := o.(*kftraining.PaddleJob) - if !isPaddleJob { - return types.NamespacedName{}, errors.New("not a PaddleJob") - } - - prebuiltWl, hasPrebuiltWorkload := paddleJob.Labels[constants.PrebuiltWorkloadLabel] - if !hasPrebuiltWorkload { - return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for PaddleJob: %s", klog.KObj(paddleJob)) - } - - return types.NamespacedName{Name: prebuiltWl, Namespace: paddleJob.Namespace}, nil + return kfcommon.WorkloadKeyFor[*kftraining.PaddleJob](o, kftraining.PaddleJobKind) } diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go index 2976d79add..b4c58d24bc 100644 --- a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go @@ -18,70 +18,40 @@ package pytorchjob import ( "context" - "errors" - "fmt" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" - kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" - "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" + kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/util/api" - clientutil "sigs.k8s.io/kueue/pkg/util/client" ) type multikueueAdapter struct{} var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) +var _ jobframework.UpdateRemoteJob = (*multikueueAdapter)(nil) -func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { - localJob := kftraining.PyTorchJob{} - err := localClient.Get(ctx, key, &localJob) - if err != nil { - return err - } - - remoteJob := &kftraining.PyTorchJob{} - err = remoteClient.Get(ctx, key, remoteJob) - if client.IgnoreNotFound(err) != nil { - return err - } - - // if the remote exists, just copy the status - if err == nil { - return clientutil.PatchStatus(ctx, localClient, &localJob, func() (bool, error) { - localJob.Status = remoteJob.Status - return true, nil - }) - } - - remoteJob = &kftraining.PyTorchJob{ - ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta), - Spec: *localJob.Spec.DeepCopy(), - } +func (b *multikueueAdapter) UpdateRemoteJobStatus(localJob, remoteJob interface{}) { + localJob.(*kftraining.PyTorchJob).Status = remoteJob.(*kftraining.PyTorchJob).Status +} - // add the prebuilt workload - if remoteJob.Labels == nil { - remoteJob.Labels = make(map[string]string, 2) +func (b *multikueueAdapter) UpdateRemoteJobSpec(localJob, remoteJob interface{}) { + *remoteJob.(*kftraining.PyTorchJob) = kftraining.PyTorchJob{ + ObjectMeta: api.CloneObjectMetaForCreation(&localJob.(*kftraining.PyTorchJob).ObjectMeta), + Spec: *localJob.(*kftraining.PyTorchJob).Spec.DeepCopy(), } - remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName - remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin +} - return remoteClient.Create(ctx, remoteJob) +func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { + return kfcommon.SyncJob[*kftraining.PyTorchJob](ctx, localClient, remoteClient, key, workloadName, origin, b) } func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { - job := kftraining.PyTorchJob{} - err := remoteClient.Get(ctx, key, &job) - if err != nil { - return client.IgnoreNotFound(err) - } - return client.IgnoreNotFound(remoteClient.Delete(ctx, &job)) + return kfcommon.DeleteRemoteObject[*kftraining.PyTorchJob](ctx, remoteClient, key) } func (b *multikueueAdapter) KeepAdmissionCheckPending() bool { @@ -103,15 +73,5 @@ func (*multikueueAdapter) GetEmptyList() client.ObjectList { } func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { - pyTorchJob, isPyTorchJob := o.(*kftraining.PyTorchJob) - if !isPyTorchJob { - return types.NamespacedName{}, errors.New("not a PyTorchJob") - } - - prebuiltWl, hasPrebuiltWorkload := pyTorchJob.Labels[constants.PrebuiltWorkloadLabel] - if !hasPrebuiltWorkload { - return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for PyTorchJob: %s", klog.KObj(pyTorchJob)) - } - - return types.NamespacedName{Name: prebuiltWl, Namespace: pyTorchJob.Namespace}, nil + return kfcommon.WorkloadKeyFor[*kftraining.PyTorchJob](o, kftraining.PyTorchJobKind) } diff --git a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go index 4c81578ef8..a33cbcb87a 100644 --- a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go @@ -18,70 +18,40 @@ package tfjob import ( "context" - "errors" - "fmt" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" - kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" - "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" + kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/util/api" - clientutil "sigs.k8s.io/kueue/pkg/util/client" ) type multikueueAdapter struct{} var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) +var _ jobframework.UpdateRemoteJob = (*multikueueAdapter)(nil) -func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { - localJob := kftraining.TFJob{} - err := localClient.Get(ctx, key, &localJob) - if err != nil { - return err - } - - remoteJob := &kftraining.TFJob{} - err = remoteClient.Get(ctx, key, remoteJob) - if client.IgnoreNotFound(err) != nil { - return err - } - - // if the remote exists, just copy the status - if err == nil { - return clientutil.PatchStatus(ctx, localClient, &localJob, func() (bool, error) { - localJob.Status = remoteJob.Status - return true, nil - }) - } - - remoteJob = &kftraining.TFJob{ - ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta), - Spec: *localJob.Spec.DeepCopy(), - } +func (b *multikueueAdapter) UpdateRemoteJobStatus(localJob, remoteJob interface{}) { + localJob.(*kftraining.TFJob).Status = remoteJob.(*kftraining.TFJob).Status +} - // add the prebuilt workload - if remoteJob.Labels == nil { - remoteJob.Labels = make(map[string]string, 2) +func (b *multikueueAdapter) UpdateRemoteJobSpec(localJob, remoteJob interface{}) { + *remoteJob.(*kftraining.TFJob) = kftraining.TFJob{ + ObjectMeta: api.CloneObjectMetaForCreation(&localJob.(*kftraining.TFJob).ObjectMeta), + Spec: *localJob.(*kftraining.TFJob).Spec.DeepCopy(), } - remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName - remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin +} - return remoteClient.Create(ctx, remoteJob) +func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { + return kfcommon.SyncJob[*kftraining.TFJob](ctx, localClient, remoteClient, key, workloadName, origin, b) } func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { - job := kftraining.TFJob{} - err := remoteClient.Get(ctx, key, &job) - if err != nil { - return client.IgnoreNotFound(err) - } - return client.IgnoreNotFound(remoteClient.Delete(ctx, &job)) + return kfcommon.DeleteRemoteObject[*kftraining.TFJob](ctx, remoteClient, key) } func (b *multikueueAdapter) KeepAdmissionCheckPending() bool { @@ -103,15 +73,5 @@ func (*multikueueAdapter) GetEmptyList() client.ObjectList { } func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { - tfJob, isTfJob := o.(*kftraining.TFJob) - if !isTfJob { - return types.NamespacedName{}, errors.New("not a TFJob") - } - - prebuiltWl, hasPrebuiltWorkload := tfJob.Labels[constants.PrebuiltWorkloadLabel] - if !hasPrebuiltWorkload { - return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for TFJob: %s", klog.KObj(tfJob)) - } - - return types.NamespacedName{Name: prebuiltWl, Namespace: tfJob.Namespace}, nil + return kfcommon.WorkloadKeyFor[*kftraining.TFJob](o, kftraining.TFJobKind) } diff --git a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go index b5b841f0bc..14aaff4d18 100644 --- a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go @@ -18,70 +18,40 @@ package xgboostjob import ( "context" - "errors" - "fmt" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" - kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" - "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" + kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/util/api" - clientutil "sigs.k8s.io/kueue/pkg/util/client" ) type multikueueAdapter struct{} var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) +var _ jobframework.UpdateRemoteJob = (*multikueueAdapter)(nil) -func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { - localJob := kftraining.XGBoostJob{} - err := localClient.Get(ctx, key, &localJob) - if err != nil { - return err - } - - remoteJob := &kftraining.XGBoostJob{} - err = remoteClient.Get(ctx, key, remoteJob) - if client.IgnoreNotFound(err) != nil { - return err - } - - // if the remote exists, just copy the status - if err == nil { - return clientutil.PatchStatus(ctx, localClient, &localJob, func() (bool, error) { - localJob.Status = remoteJob.Status - return true, nil - }) - } - - remoteJob = &kftraining.XGBoostJob{ - ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta), - Spec: *localJob.Spec.DeepCopy(), - } +func (b *multikueueAdapter) UpdateRemoteJobStatus(localJob, remoteJob interface{}) { + localJob.(*kftraining.XGBoostJob).Status = remoteJob.(*kftraining.XGBoostJob).Status +} - // add the prebuilt workload - if remoteJob.Labels == nil { - remoteJob.Labels = make(map[string]string, 2) +func (b *multikueueAdapter) UpdateRemoteJobSpec(localJob, remoteJob interface{}) { + *remoteJob.(*kftraining.XGBoostJob) = kftraining.XGBoostJob{ + ObjectMeta: api.CloneObjectMetaForCreation(&localJob.(*kftraining.XGBoostJob).ObjectMeta), + Spec: *localJob.(*kftraining.XGBoostJob).Spec.DeepCopy(), } - remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName - remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin +} - return remoteClient.Create(ctx, remoteJob) +func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { + return kfcommon.SyncJob[*kftraining.XGBoostJob](ctx, localClient, remoteClient, key, workloadName, origin, b) } func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { - job := kftraining.XGBoostJob{} - err := remoteClient.Get(ctx, key, &job) - if err != nil { - return client.IgnoreNotFound(err) - } - return client.IgnoreNotFound(remoteClient.Delete(ctx, &job)) + return kfcommon.DeleteRemoteObject[*kftraining.XGBoostJob](ctx, remoteClient, key) } func (b *multikueueAdapter) KeepAdmissionCheckPending() bool { @@ -103,15 +73,5 @@ func (*multikueueAdapter) GetEmptyList() client.ObjectList { } func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { - xgBoostJob, isXgBoostJob := o.(*kftraining.XGBoostJob) - if !isXgBoostJob { - return types.NamespacedName{}, errors.New("not a XgBoostJob") - } - - prebuiltWl, hasPrebuiltWorkload := xgBoostJob.Labels[constants.PrebuiltWorkloadLabel] - if !hasPrebuiltWorkload { - return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for XgBoostJob: %s", klog.KObj(xgBoostJob)) - } - - return types.NamespacedName{Name: prebuiltWl, Namespace: xgBoostJob.Namespace}, nil + return kfcommon.WorkloadKeyFor[*kftraining.XGBoostJob](o, kftraining.XGBoostJobKind) } diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index 856cbb453e..d339ead72e 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -897,7 +897,7 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, Succeeded: 1, }, })) - }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) ginkgo.By("finishing the worker TFJob, the manager's wl is marked as finished and the worker2 wl removed", func() { From f065e3fa3351a425d8019eff9fad89f6d9cbd297 Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Tue, 13 Aug 2024 15:57:40 +0200 Subject: [PATCH 2/4] make generic adapter to be specialized by job type --- pkg/controller/jobframework/interface.go | 5 -- pkg/controller/jobs/kubeflow/common/common.go | 65 ++++++++++++++++--- .../jobs/paddlejob/paddlejob_controller.go | 3 +- .../paddlejob/paddlejob_multikueue_adapter.go | 50 +++----------- .../paddlejob_multikueue_adapter_test.go | 13 ++-- .../pytorchjob/pytorch_multikueue_adapter.go | 50 +++----------- .../pytorch_multikueue_adapter_test.go | 12 ++-- .../jobs/pytorchjob/pytorchjob_controller.go | 3 +- .../kubeflow/jobs/tfjob/tfjob_controller.go | 3 +- .../jobs/tfjob/tfjob_multikueue_adapter.go | 50 +++----------- .../tfjob/tfjob_multikueue_adapter_test.go | 12 ++-- .../jobs/xgboostjob/xgboostjob_controller.go | 3 +- .../xgboostjob_multikueue_adapter.go | 50 +++----------- .../xgboostjob_multikueue_adapter_test.go | 12 ++-- 14 files changed, 124 insertions(+), 207 deletions(-) diff --git a/pkg/controller/jobframework/interface.go b/pkg/controller/jobframework/interface.go index 8445f439b8..32193d1714 100644 --- a/pkg/controller/jobframework/interface.go +++ b/pkg/controller/jobframework/interface.go @@ -172,11 +172,6 @@ type MultiKueueAdapter interface { GVK() schema.GroupVersionKind } -type UpdateRemoteJob interface { - UpdateRemoteJobStatus(localJob, remoteJob interface{}) - UpdateRemoteJobSpec(localJob, remoteJob interface{}) -} - // MultiKueueWatcher optional interface that can be implemented by a MultiKueueAdapter // to receive job related watch events from the worker cluster. // If not implemented, MultiKueue will only receive events related to the job's workload. diff --git a/pkg/controller/jobs/kubeflow/common/common.go b/pkg/controller/jobs/kubeflow/common/common.go index 07472a34bd..6071971125 100644 --- a/pkg/controller/jobs/kubeflow/common/common.go +++ b/pkg/controller/jobs/kubeflow/common/common.go @@ -6,10 +6,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" @@ -22,14 +24,53 @@ type objAsPtr[T any] interface { *T } -func SyncJob[PtrT objAsPtr[T], T any]( +type adapter[PtrT objAsPtr[T], T any] struct { + copySpec func(dst, src PtrT) + copyStatus func(dst, src PtrT) + emptyList func() client.ObjectList + gwk schema.GroupVersionKind + typeName string +} + +type fullInterface interface { + jobframework.MultiKueueAdapter + jobframework.MultiKueueWatcher +} + +func NewAdapter[PtrT objAsPtr[T], T any]( + copySpec func(dst, src PtrT), + copyStatus func(dst, src PtrT), + emptyList func() client.ObjectList, + gwk schema.GroupVersionKind, + typeName string, +) fullInterface { + return &adapter[PtrT, T]{ + copySpec: copySpec, + copyStatus: copyStatus, + emptyList: emptyList, + gwk: gwk, + typeName: typeName, + } +} + +func (a adapter[PtrT, T]) GVK() schema.GroupVersionKind { + return a.gwk +} + +func (a adapter[PtrT, T]) KeepAdmissionCheckPending() bool { + return false +} + +func (a adapter[PtrT, T]) IsJobManagedByKueue(_ context.Context, _ client.Client, _ types.NamespacedName) (bool, string, error) { + return true, "", nil +} + +func (a adapter[PtrT, T]) SyncJob( ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, - workloadName, origin string, - b jobframework.UpdateRemoteJob) error { - + workloadName, origin string) error { localJob := PtrT(new(T)) err := localClient.Get(ctx, key, localJob) if err != nil { @@ -45,13 +86,13 @@ func SyncJob[PtrT objAsPtr[T], T any]( if err == nil { return clientutil.PatchStatus(ctx, localClient, localJob, func() (bool, error) { // if the remote exists, just copy the status - b.UpdateRemoteJobStatus(localJob, remoteJob) + a.copyStatus(localJob, remoteJob) return true, nil }) } remoteJob = PtrT(new(T)) - b.UpdateRemoteJobSpec(localJob, remoteJob) + a.copySpec(remoteJob, localJob) // add the prebuilt workload labels := remoteJob.GetLabels() @@ -65,7 +106,7 @@ func SyncJob[PtrT objAsPtr[T], T any]( return remoteClient.Create(ctx, remoteJob) } -func DeleteRemoteObject[PtrT objAsPtr[T], T any](ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { +func (a adapter[PtrT, T]) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { job := PtrT(new(T)) err := remoteClient.Get(ctx, key, job) if err != nil { @@ -74,15 +115,19 @@ func DeleteRemoteObject[PtrT objAsPtr[T], T any](ctx context.Context, remoteClie return client.IgnoreNotFound(remoteClient.Delete(ctx, job)) } -func WorkloadKeyFor[PtrT objAsPtr[T], T any](o runtime.Object, JobName string) (types.NamespacedName, error) { +func (a adapter[PtrT, T]) GetEmptyList() client.ObjectList { + return a.emptyList() +} + +func (a adapter[PtrT, T]) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { job, isTheJob := o.(PtrT) if !isTheJob { - return types.NamespacedName{}, fmt.Errorf("not a %s", JobName) + return types.NamespacedName{}, fmt.Errorf("not a %s", a.typeName) } prebuiltWl, hasPrebuiltWorkload := job.GetLabels()[constants.PrebuiltWorkloadLabel] if !hasPrebuiltWorkload { - return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for %s: %s", JobName, klog.KObj(job)) + return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for %s: %s", a.typeName, klog.KObj(job)) } return types.NamespacedName{Name: prebuiltWl, Namespace: job.GetNamespace()}, nil diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go index e53957d1d4..4181b6f82f 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go @@ -29,6 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" + kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" ) @@ -45,7 +46,7 @@ func init() { JobType: &kftraining.PaddleJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isPaddleJob, - MultiKueueAdapter: &multikueueAdapter{}, + MultiKueueAdapter: kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind), })) } diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go index 8abf0247af..3e0c834e23 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go @@ -17,12 +17,7 @@ limitations under the License. package paddlejob import ( - "context" - kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" @@ -30,48 +25,19 @@ import ( "sigs.k8s.io/kueue/pkg/util/api" ) -type multikueueAdapter struct{} - -var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) -var _ jobframework.UpdateRemoteJob = (*multikueueAdapter)(nil) +var _ jobframework.MultiKueueAdapter = kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind) -func (b *multikueueAdapter) UpdateRemoteJobStatus(localJob, remoteJob interface{}) { - localJob.(*kftraining.PaddleJob).Status = remoteJob.(*kftraining.PaddleJob).Status +func copyJobStatus(dst, src *kftraining.PaddleJob) { + dst.Status = src.Status } -func (b *multikueueAdapter) UpdateRemoteJobSpec(localJob, remoteJob interface{}) { - *remoteJob.(*kftraining.PaddleJob) = kftraining.PaddleJob{ - ObjectMeta: api.CloneObjectMetaForCreation(&localJob.(*kftraining.PaddleJob).ObjectMeta), - Spec: *localJob.(*kftraining.PaddleJob).Spec.DeepCopy(), +func copyJobSpec(dst, src *kftraining.PaddleJob) { + *dst = kftraining.PaddleJob{ + ObjectMeta: api.CloneObjectMetaForCreation(&src.ObjectMeta), + Spec: *src.Spec.DeepCopy(), } } -func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { - return kfcommon.SyncJob[*kftraining.PaddleJob](ctx, localClient, remoteClient, key, workloadName, origin, b) -} - -func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { - return kfcommon.DeleteRemoteObject[*kftraining.PaddleJob](ctx, remoteClient, key) -} - -func (b *multikueueAdapter) KeepAdmissionCheckPending() bool { - return false -} - -func (b *multikueueAdapter) IsJobManagedByKueue(context.Context, client.Client, types.NamespacedName) (bool, string, error) { - return true, "", nil -} - -func (b *multikueueAdapter) GVK() schema.GroupVersionKind { - return gvk -} - -var _ jobframework.MultiKueueWatcher = (*multikueueAdapter)(nil) - -func (*multikueueAdapter) GetEmptyList() client.ObjectList { +func getEmptyList() client.ObjectList { return &kftraining.PaddleJobList{} } - -func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { - return kfcommon.WorkloadKeyFor[*kftraining.PaddleJob](o, kftraining.PaddleJobKind) -} diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go index 2c71041ecf..35855c042e 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go @@ -31,6 +31,8 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/util/slices" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" kfutiltesting "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob" @@ -52,7 +54,7 @@ func TestMultikueueAdapter(t *testing.T) { managersPaddleJobs []kftraining.PaddleJob workerPaddleJobs []kftraining.PaddleJob - operation func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error + operation func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error wantError error wantManagersPaddleJobs []kftraining.PaddleJob @@ -62,7 +64,8 @@ func TestMultikueueAdapter(t *testing.T) { managersPaddleJobs: []kftraining.PaddleJob{ *paddleJobBuilder.Clone().Obj(), }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + + operation: func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error { return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "paddlejob1", Namespace: TestNamespace}, "wl1", "origin1") }, @@ -87,7 +90,7 @@ func TestMultikueueAdapter(t *testing.T) { StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: corev1.ConditionTrue}). Obj(), }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + operation: func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error { return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "paddlejob1", Namespace: TestNamespace}, "wl1", "origin1") }, @@ -111,7 +114,7 @@ func TestMultikueueAdapter(t *testing.T) { Label(kueuealpha.MultiKueueOriginLabel, "origin1"). Obj(), }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + operation: func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error { return adapter.DeleteRemoteObject(ctx, workerClient, types.NamespacedName{Name: "paddlejob1", Namespace: TestNamespace}) }, }, @@ -129,7 +132,7 @@ func TestMultikueueAdapter(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - adapter := &multikueueAdapter{} + adapter := kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind) gotErr := tc.operation(ctx, adapter, managerClient, workerClient) diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go index b4c58d24bc..90cf1f9e5d 100644 --- a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go @@ -17,12 +17,7 @@ limitations under the License. package pytorchjob import ( - "context" - kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" @@ -30,48 +25,19 @@ import ( "sigs.k8s.io/kueue/pkg/util/api" ) -type multikueueAdapter struct{} - -var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) -var _ jobframework.UpdateRemoteJob = (*multikueueAdapter)(nil) +var _ jobframework.MultiKueueAdapter = kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind) -func (b *multikueueAdapter) UpdateRemoteJobStatus(localJob, remoteJob interface{}) { - localJob.(*kftraining.PyTorchJob).Status = remoteJob.(*kftraining.PyTorchJob).Status +func copyJobStatus(dst, src *kftraining.PyTorchJob) { + dst.Status = src.Status } -func (b *multikueueAdapter) UpdateRemoteJobSpec(localJob, remoteJob interface{}) { - *remoteJob.(*kftraining.PyTorchJob) = kftraining.PyTorchJob{ - ObjectMeta: api.CloneObjectMetaForCreation(&localJob.(*kftraining.PyTorchJob).ObjectMeta), - Spec: *localJob.(*kftraining.PyTorchJob).Spec.DeepCopy(), +func copyJobSpec(dst, src *kftraining.PyTorchJob) { + *dst = kftraining.PyTorchJob{ + ObjectMeta: api.CloneObjectMetaForCreation(&src.ObjectMeta), + Spec: *src.Spec.DeepCopy(), } } -func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { - return kfcommon.SyncJob[*kftraining.PyTorchJob](ctx, localClient, remoteClient, key, workloadName, origin, b) -} - -func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { - return kfcommon.DeleteRemoteObject[*kftraining.PyTorchJob](ctx, remoteClient, key) -} - -func (b *multikueueAdapter) KeepAdmissionCheckPending() bool { - return false -} - -func (b *multikueueAdapter) IsJobManagedByKueue(context.Context, client.Client, types.NamespacedName) (bool, string, error) { - return true, "", nil -} - -func (b *multikueueAdapter) GVK() schema.GroupVersionKind { - return gvk -} - -var _ jobframework.MultiKueueWatcher = (*multikueueAdapter)(nil) - -func (*multikueueAdapter) GetEmptyList() client.ObjectList { +func getEmptyList() client.ObjectList { return &kftraining.PyTorchJobList{} } - -func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { - return kfcommon.WorkloadKeyFor[*kftraining.PyTorchJob](o, kftraining.PyTorchJobKind) -} diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go index 8ae5db1a41..1c6a8a1711 100644 --- a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go @@ -32,6 +32,8 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/util/slices" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" kfutiltesting "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob" @@ -53,7 +55,7 @@ func TestMultikueueAdapter(t *testing.T) { managersPyTorchJobs []kftraining.PyTorchJob workerPyTorchJobs []kftraining.PyTorchJob - operation func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error + operation func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error wantError error wantManagersPyTorchJobs []kftraining.PyTorchJob @@ -63,7 +65,7 @@ func TestMultikueueAdapter(t *testing.T) { managersPyTorchJobs: []kftraining.PyTorchJob{ *pyTorchJobBuilder.Clone().Obj(), }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + operation: func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error { return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "pytorchjob1", Namespace: TestNamespace}, "wl1", "origin1") }, @@ -88,7 +90,7 @@ func TestMultikueueAdapter(t *testing.T) { StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: corev1.ConditionTrue}). Obj(), }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + operation: func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error { return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "pytorchjob1", Namespace: TestNamespace}, "wl1", "origin1") }, @@ -112,7 +114,7 @@ func TestMultikueueAdapter(t *testing.T) { Label(kueuealpha.MultiKueueOriginLabel, "origin1"). Obj(), }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + operation: func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error { return adapter.DeleteRemoteObject(ctx, workerClient, types.NamespacedName{Name: "pytorchjob1", Namespace: TestNamespace}) }, }, @@ -130,7 +132,7 @@ func TestMultikueueAdapter(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - adapter := &multikueueAdapter{} + adapter := kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind) gotErr := tc.operation(ctx, adapter, managerClient, workerClient) diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go index 1bfc88cb95..b66f50a147 100644 --- a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go @@ -29,6 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" + kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" ) @@ -45,7 +46,7 @@ func init() { JobType: &kftraining.PyTorchJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isPyTorchJob, - MultiKueueAdapter: &multikueueAdapter{}, + MultiKueueAdapter: kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind), })) } diff --git a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go index e3932ab393..e14099ac1f 100644 --- a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go @@ -29,6 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" + kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" ) @@ -45,7 +46,7 @@ func init() { JobType: &kftraining.TFJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isTFJob, - MultiKueueAdapter: &multikueueAdapter{}, + MultiKueueAdapter: kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind), })) } diff --git a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go index a33cbcb87a..a2d619c9b6 100644 --- a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go @@ -17,12 +17,7 @@ limitations under the License. package tfjob import ( - "context" - kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" @@ -30,48 +25,19 @@ import ( "sigs.k8s.io/kueue/pkg/util/api" ) -type multikueueAdapter struct{} - -var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) -var _ jobframework.UpdateRemoteJob = (*multikueueAdapter)(nil) +var _ jobframework.MultiKueueAdapter = kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind) -func (b *multikueueAdapter) UpdateRemoteJobStatus(localJob, remoteJob interface{}) { - localJob.(*kftraining.TFJob).Status = remoteJob.(*kftraining.TFJob).Status +func copyJobStatus(dst, src *kftraining.TFJob) { + dst.Status = src.Status } -func (b *multikueueAdapter) UpdateRemoteJobSpec(localJob, remoteJob interface{}) { - *remoteJob.(*kftraining.TFJob) = kftraining.TFJob{ - ObjectMeta: api.CloneObjectMetaForCreation(&localJob.(*kftraining.TFJob).ObjectMeta), - Spec: *localJob.(*kftraining.TFJob).Spec.DeepCopy(), +func copyJobSpec(dst, src *kftraining.TFJob) { + *dst = kftraining.TFJob{ + ObjectMeta: api.CloneObjectMetaForCreation(&src.ObjectMeta), + Spec: *src.Spec.DeepCopy(), } } -func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { - return kfcommon.SyncJob[*kftraining.TFJob](ctx, localClient, remoteClient, key, workloadName, origin, b) -} - -func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { - return kfcommon.DeleteRemoteObject[*kftraining.TFJob](ctx, remoteClient, key) -} - -func (b *multikueueAdapter) KeepAdmissionCheckPending() bool { - return false -} - -func (b *multikueueAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) { - return true, "", nil -} - -func (b *multikueueAdapter) GVK() schema.GroupVersionKind { - return gvk -} - -var _ jobframework.MultiKueueWatcher = (*multikueueAdapter)(nil) - -func (*multikueueAdapter) GetEmptyList() client.ObjectList { +func getEmptyList() client.ObjectList { return &kftraining.TFJobList{} } - -func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { - return kfcommon.WorkloadKeyFor[*kftraining.TFJob](o, kftraining.TFJobKind) -} diff --git a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter_test.go index 69fd4d5915..e9e2bd5a05 100644 --- a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter_test.go @@ -31,6 +31,8 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/util/slices" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" kfutiltesting "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob" @@ -52,7 +54,7 @@ func TestMultikueueAdapter(t *testing.T) { managersTFJobs []kftraining.TFJob workerTFJobs []kftraining.TFJob - operation func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error + operation func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error wantError error wantManagersTFJobs []kftraining.TFJob @@ -62,7 +64,7 @@ func TestMultikueueAdapter(t *testing.T) { managersTFJobs: []kftraining.TFJob{ *tfJobBuilder.Clone().Obj(), }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + operation: func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error { return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "tfjob1", Namespace: TestNamespace}, "wl1", "origin1") }, @@ -87,7 +89,7 @@ func TestMultikueueAdapter(t *testing.T) { StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: corev1.ConditionTrue}). Obj(), }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + operation: func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error { return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "tfjob1", Namespace: TestNamespace}, "wl1", "origin1") }, @@ -111,7 +113,7 @@ func TestMultikueueAdapter(t *testing.T) { Label(kueuealpha.MultiKueueOriginLabel, "origin1"). Obj(), }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + operation: func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error { return adapter.DeleteRemoteObject(ctx, workerClient, types.NamespacedName{Name: "tfjob1", Namespace: TestNamespace}) }, }, @@ -129,7 +131,7 @@ func TestMultikueueAdapter(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - adapter := &multikueueAdapter{} + adapter := kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind) gotErr := tc.operation(ctx, adapter, managerClient, workerClient) diff --git a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go index 72f624c876..c37d4ba62a 100644 --- a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go @@ -29,6 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" + kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" ) @@ -45,7 +46,7 @@ func init() { JobType: &kftraining.XGBoostJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isXGBoostJob, - MultiKueueAdapter: &multikueueAdapter{}, + MultiKueueAdapter: kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind), })) } diff --git a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go index 14aaff4d18..bdf299eefa 100644 --- a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go @@ -17,12 +17,7 @@ limitations under the License. package xgboostjob import ( - "context" - kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" @@ -30,48 +25,19 @@ import ( "sigs.k8s.io/kueue/pkg/util/api" ) -type multikueueAdapter struct{} - -var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) -var _ jobframework.UpdateRemoteJob = (*multikueueAdapter)(nil) +var _ jobframework.MultiKueueAdapter = kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind) -func (b *multikueueAdapter) UpdateRemoteJobStatus(localJob, remoteJob interface{}) { - localJob.(*kftraining.XGBoostJob).Status = remoteJob.(*kftraining.XGBoostJob).Status +func copyJobStatus(dst, src *kftraining.XGBoostJob) { + dst.Status = src.Status } -func (b *multikueueAdapter) UpdateRemoteJobSpec(localJob, remoteJob interface{}) { - *remoteJob.(*kftraining.XGBoostJob) = kftraining.XGBoostJob{ - ObjectMeta: api.CloneObjectMetaForCreation(&localJob.(*kftraining.XGBoostJob).ObjectMeta), - Spec: *localJob.(*kftraining.XGBoostJob).Spec.DeepCopy(), +func copyJobSpec(dst, src *kftraining.XGBoostJob) { + *dst = kftraining.XGBoostJob{ + ObjectMeta: api.CloneObjectMetaForCreation(&src.ObjectMeta), + Spec: *src.Spec.DeepCopy(), } } -func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { - return kfcommon.SyncJob[*kftraining.XGBoostJob](ctx, localClient, remoteClient, key, workloadName, origin, b) -} - -func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { - return kfcommon.DeleteRemoteObject[*kftraining.XGBoostJob](ctx, remoteClient, key) -} - -func (b *multikueueAdapter) KeepAdmissionCheckPending() bool { - return false -} - -func (b *multikueueAdapter) IsJobManagedByKueue(context.Context, client.Client, types.NamespacedName) (bool, string, error) { - return true, "", nil -} - -func (b *multikueueAdapter) GVK() schema.GroupVersionKind { - return gvk -} - -var _ jobframework.MultiKueueWatcher = (*multikueueAdapter)(nil) - -func (*multikueueAdapter) GetEmptyList() client.ObjectList { +func getEmptyList() client.ObjectList { return &kftraining.XGBoostJobList{} } - -func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { - return kfcommon.WorkloadKeyFor[*kftraining.XGBoostJob](o, kftraining.XGBoostJobKind) -} diff --git a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter_test.go index 85dbbcb6c9..3a36002809 100644 --- a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter_test.go @@ -31,6 +31,8 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/util/slices" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" kfutiltesting "sigs.k8s.io/kueue/pkg/util/testingjobs/xgboostjob" @@ -52,7 +54,7 @@ func TestMultikueueAdapter(t *testing.T) { managersXGBoostJobs []kftraining.XGBoostJob workerXGBoostJobs []kftraining.XGBoostJob - operation func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error + operation func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error wantError error wantManagersXGBoostJobs []kftraining.XGBoostJob @@ -62,7 +64,7 @@ func TestMultikueueAdapter(t *testing.T) { managersXGBoostJobs: []kftraining.XGBoostJob{ *xgboostJobBuilder.Clone().Obj(), }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + operation: func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error { return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "xgboostjob1", Namespace: TestNamespace}, "wl1", "origin1") }, @@ -87,7 +89,7 @@ func TestMultikueueAdapter(t *testing.T) { StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: corev1.ConditionTrue}). Obj(), }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + operation: func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error { return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "xgboostjob1", Namespace: TestNamespace}, "wl1", "origin1") }, @@ -111,7 +113,7 @@ func TestMultikueueAdapter(t *testing.T) { Label(kueuealpha.MultiKueueOriginLabel, "origin1"). Obj(), }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + operation: func(ctx context.Context, adapter jobframework.MultiKueueAdapter, managerClient, workerClient client.Client) error { return adapter.DeleteRemoteObject(ctx, workerClient, types.NamespacedName{Name: "xgboostjob1", Namespace: TestNamespace}) }, }, @@ -129,7 +131,7 @@ func TestMultikueueAdapter(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - adapter := &multikueueAdapter{} + adapter := kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind) gotErr := tc.operation(ctx, adapter, managerClient, workerClient) From 7140be89c2e50c02f4b2b1101b1fcc3daed06fef Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Wed, 14 Aug 2024 14:36:49 +0200 Subject: [PATCH 3/4] Move common Multikueue adapter to Kubeflowjob package --- .../jobs/paddlejob/paddlejob_controller.go | 3 +-- .../paddlejob/paddlejob_multikueue_adapter.go | 4 ++-- .../paddlejob_multikueue_adapter_test.go | 4 ++-- .../pytorchjob/pytorch_multikueue_adapter.go | 4 ++-- .../pytorch_multikueue_adapter_test.go | 4 ++-- .../jobs/pytorchjob/pytorchjob_controller.go | 3 +-- .../kubeflow/jobs/tfjob/tfjob_controller.go | 3 +-- .../jobs/tfjob/tfjob_multikueue_adapter.go | 4 ++-- .../tfjob/tfjob_multikueue_adapter_test.go | 4 ++-- .../jobs/xgboostjob/xgboostjob_controller.go | 3 +-- .../xgboostjob_multikueue_adapter.go | 4 ++-- .../xgboostjob_multikueue_adapter_test.go | 4 ++-- .../kubeflowjob_multikueue_adapter.go} | 18 +++++++++++++++++- test/integration/multikueue/multikueue_test.go | 2 +- 14 files changed, 38 insertions(+), 26 deletions(-) rename pkg/controller/jobs/kubeflow/{common/common.go => kubeflowjob/kubeflowjob_multikueue_adapter.go} (85%) diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go index 4181b6f82f..ec1c557466 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go @@ -29,7 +29,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" - kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" ) @@ -46,7 +45,7 @@ func init() { JobType: &kftraining.PaddleJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isPaddleJob, - MultiKueueAdapter: kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind), + MultiKueueAdapter: kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind), })) } diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go index 3e0c834e23..d4719e014c 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go @@ -21,11 +21,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" - kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" + "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" "sigs.k8s.io/kueue/pkg/util/api" ) -var _ jobframework.MultiKueueAdapter = kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind) +var _ jobframework.MultiKueueAdapter = kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind) func copyJobStatus(dst, src *kftraining.PaddleJob) { dst.Status = src.Status diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go index 35855c042e..968186b46b 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go @@ -32,7 +32,7 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" - kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" + "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" "sigs.k8s.io/kueue/pkg/util/slices" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" kfutiltesting "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob" @@ -132,7 +132,7 @@ func TestMultikueueAdapter(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - adapter := kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind) + adapter := kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind) gotErr := tc.operation(ctx, adapter, managerClient, workerClient) diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go index 90cf1f9e5d..8fe0225d19 100644 --- a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go @@ -21,11 +21,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" - kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" + "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" "sigs.k8s.io/kueue/pkg/util/api" ) -var _ jobframework.MultiKueueAdapter = kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind) +var _ jobframework.MultiKueueAdapter = kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind) func copyJobStatus(dst, src *kftraining.PyTorchJob) { dst.Status = src.Status diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go index 1c6a8a1711..4bc935eb46 100644 --- a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go @@ -33,7 +33,7 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" - kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" + "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" "sigs.k8s.io/kueue/pkg/util/slices" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" kfutiltesting "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob" @@ -132,7 +132,7 @@ func TestMultikueueAdapter(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - adapter := kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind) + adapter := kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind) gotErr := tc.operation(ctx, adapter, managerClient, workerClient) diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go index b66f50a147..8daf2abc6b 100644 --- a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go @@ -29,7 +29,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" - kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" ) @@ -46,7 +45,7 @@ func init() { JobType: &kftraining.PyTorchJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isPyTorchJob, - MultiKueueAdapter: kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind), + MultiKueueAdapter: kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind), })) } diff --git a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go index e14099ac1f..4774d3f7fc 100644 --- a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go @@ -29,7 +29,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" - kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" ) @@ -46,7 +45,7 @@ func init() { JobType: &kftraining.TFJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isTFJob, - MultiKueueAdapter: kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind), + MultiKueueAdapter: kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind), })) } diff --git a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go index a2d619c9b6..2ea08965fd 100644 --- a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go @@ -21,11 +21,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" - kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" + "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" "sigs.k8s.io/kueue/pkg/util/api" ) -var _ jobframework.MultiKueueAdapter = kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind) +var _ jobframework.MultiKueueAdapter = kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind) func copyJobStatus(dst, src *kftraining.TFJob) { dst.Status = src.Status diff --git a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter_test.go index e9e2bd5a05..b8918d3b28 100644 --- a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter_test.go @@ -32,7 +32,7 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" - kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" + "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" "sigs.k8s.io/kueue/pkg/util/slices" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" kfutiltesting "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob" @@ -131,7 +131,7 @@ func TestMultikueueAdapter(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - adapter := kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind) + adapter := kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind) gotErr := tc.operation(ctx, adapter, managerClient, workerClient) diff --git a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go index c37d4ba62a..aa8498ca98 100644 --- a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go @@ -29,7 +29,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" - kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" ) @@ -46,7 +45,7 @@ func init() { JobType: &kftraining.XGBoostJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isXGBoostJob, - MultiKueueAdapter: kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind), + MultiKueueAdapter: kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind), })) } diff --git a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go index bdf299eefa..fdb67bbea1 100644 --- a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go @@ -21,11 +21,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kueue/pkg/controller/jobframework" - kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" + "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" "sigs.k8s.io/kueue/pkg/util/api" ) -var _ jobframework.MultiKueueAdapter = kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind) +var _ jobframework.MultiKueueAdapter = kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind) func copyJobStatus(dst, src *kftraining.XGBoostJob) { dst.Status = src.Status diff --git a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter_test.go index 3a36002809..0615992ac8 100644 --- a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter_test.go @@ -32,7 +32,7 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" - kfcommon "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/common" + "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" "sigs.k8s.io/kueue/pkg/util/slices" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" kfutiltesting "sigs.k8s.io/kueue/pkg/util/testingjobs/xgboostjob" @@ -131,7 +131,7 @@ func TestMultikueueAdapter(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - adapter := kfcommon.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind) + adapter := kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind) gotErr := tc.operation(ctx, adapter, managerClient, workerClient) diff --git a/pkg/controller/jobs/kubeflow/common/common.go b/pkg/controller/jobs/kubeflow/kubeflowjob/kubeflowjob_multikueue_adapter.go similarity index 85% rename from pkg/controller/jobs/kubeflow/common/common.go rename to pkg/controller/jobs/kubeflow/kubeflowjob/kubeflowjob_multikueue_adapter.go index 6071971125..0d15c91b17 100644 --- a/pkg/controller/jobs/kubeflow/common/common.go +++ b/pkg/controller/jobs/kubeflow/kubeflowjob/kubeflowjob_multikueue_adapter.go @@ -1,4 +1,20 @@ -package common +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubeflowjob import ( "context" diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index d339ead72e..856cbb453e 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -897,7 +897,7 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, Succeeded: 1, }, })) - }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) ginkgo.By("finishing the worker TFJob, the manager's wl is marked as finished and the worker2 wl removed", func() { From 5ed0f3bd0dc5cf750745c05b6bcb5438814ae40d Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Mon, 19 Aug 2024 17:50:47 +0200 Subject: [PATCH 4/4] Changes after code review --- .../jobs/paddlejob/paddlejob_controller.go | 2 +- .../paddlejob/paddlejob_multikueue_adapter.go | 2 +- .../paddlejob_multikueue_adapter_test.go | 2 +- .../pytorchjob/pytorch_multikueue_adapter.go | 2 +- .../pytorch_multikueue_adapter_test.go | 2 +- .../jobs/pytorchjob/pytorchjob_controller.go | 2 +- .../kubeflow/jobs/tfjob/tfjob_controller.go | 2 +- .../jobs/tfjob/tfjob_multikueue_adapter.go | 2 +- .../tfjob/tfjob_multikueue_adapter_test.go | 2 +- .../jobs/xgboostjob/xgboostjob_controller.go | 2 +- .../xgboostjob_multikueue_adapter.go | 2 +- .../xgboostjob_multikueue_adapter_test.go | 2 +- .../kubeflowjob_multikueue_adapter.go | 25 ++++++++----------- 13 files changed, 22 insertions(+), 27 deletions(-) diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go index ec1c557466..eb091ba7a6 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go @@ -45,7 +45,7 @@ func init() { JobType: &kftraining.PaddleJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isPaddleJob, - MultiKueueAdapter: kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind), + MultiKueueAdapter: kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk), })) } diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go index d4719e014c..6617b478e3 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter.go @@ -25,7 +25,7 @@ import ( "sigs.k8s.io/kueue/pkg/util/api" ) -var _ jobframework.MultiKueueAdapter = kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind) +var _ jobframework.MultiKueueAdapter = kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk) func copyJobStatus(dst, src *kftraining.PaddleJob) { dst.Status = src.Status diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go index 968186b46b..d896d989b4 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_multikueue_adapter_test.go @@ -132,7 +132,7 @@ func TestMultikueueAdapter(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - adapter := kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind) + adapter := kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk) gotErr := tc.operation(ctx, adapter, managerClient, workerClient) diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go index 8fe0225d19..2e11d957e7 100644 --- a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go @@ -25,7 +25,7 @@ import ( "sigs.k8s.io/kueue/pkg/util/api" ) -var _ jobframework.MultiKueueAdapter = kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind) +var _ jobframework.MultiKueueAdapter = kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk) func copyJobStatus(dst, src *kftraining.PyTorchJob) { dst.Status = src.Status diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go index 4bc935eb46..baed632397 100644 --- a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go @@ -132,7 +132,7 @@ func TestMultikueueAdapter(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - adapter := kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind) + adapter := kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk) gotErr := tc.operation(ctx, adapter, managerClient, workerClient) diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go index 8daf2abc6b..b63f17cbc0 100644 --- a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go @@ -45,7 +45,7 @@ func init() { JobType: &kftraining.PyTorchJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isPyTorchJob, - MultiKueueAdapter: kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind), + MultiKueueAdapter: kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk), })) } diff --git a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go index 4774d3f7fc..02f9896a19 100644 --- a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go @@ -45,7 +45,7 @@ func init() { JobType: &kftraining.TFJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isTFJob, - MultiKueueAdapter: kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind), + MultiKueueAdapter: kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk), })) } diff --git a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go index 2ea08965fd..8a053c67a1 100644 --- a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go @@ -25,7 +25,7 @@ import ( "sigs.k8s.io/kueue/pkg/util/api" ) -var _ jobframework.MultiKueueAdapter = kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind) +var _ jobframework.MultiKueueAdapter = kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk) func copyJobStatus(dst, src *kftraining.TFJob) { dst.Status = src.Status diff --git a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter_test.go index b8918d3b28..d50de8e761 100644 --- a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter_test.go @@ -131,7 +131,7 @@ func TestMultikueueAdapter(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - adapter := kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind) + adapter := kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk) gotErr := tc.operation(ctx, adapter, managerClient, workerClient) diff --git a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go index aa8498ca98..8ddd0dae14 100644 --- a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go @@ -45,7 +45,7 @@ func init() { JobType: &kftraining.XGBoostJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isXGBoostJob, - MultiKueueAdapter: kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind), + MultiKueueAdapter: kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk), })) } diff --git a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go index fdb67bbea1..cf1ca5d680 100644 --- a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter.go @@ -25,7 +25,7 @@ import ( "sigs.k8s.io/kueue/pkg/util/api" ) -var _ jobframework.MultiKueueAdapter = kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind) +var _ jobframework.MultiKueueAdapter = kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk) func copyJobStatus(dst, src *kftraining.XGBoostJob) { dst.Status = src.Status diff --git a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter_test.go index 0615992ac8..a0084482a3 100644 --- a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_multikueue_adapter_test.go @@ -131,7 +131,7 @@ func TestMultikueueAdapter(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - adapter := kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind) + adapter := kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk) gotErr := tc.operation(ctx, adapter, managerClient, workerClient) diff --git a/pkg/controller/jobs/kubeflow/kubeflowjob/kubeflowjob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/kubeflowjob/kubeflowjob_multikueue_adapter.go index 0d15c91b17..f422524848 100644 --- a/pkg/controller/jobs/kubeflow/kubeflowjob/kubeflowjob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/kubeflowjob/kubeflowjob_multikueue_adapter.go @@ -44,8 +44,7 @@ type adapter[PtrT objAsPtr[T], T any] struct { copySpec func(dst, src PtrT) copyStatus func(dst, src PtrT) emptyList func() client.ObjectList - gwk schema.GroupVersionKind - typeName string + gvk schema.GroupVersionKind } type fullInterface interface { @@ -53,31 +52,29 @@ type fullInterface interface { jobframework.MultiKueueWatcher } -func NewAdapter[PtrT objAsPtr[T], T any]( +func NewMKAdapter[PtrT objAsPtr[T], T any]( copySpec func(dst, src PtrT), copyStatus func(dst, src PtrT), emptyList func() client.ObjectList, - gwk schema.GroupVersionKind, - typeName string, + gvk schema.GroupVersionKind, ) fullInterface { return &adapter[PtrT, T]{ copySpec: copySpec, copyStatus: copyStatus, emptyList: emptyList, - gwk: gwk, - typeName: typeName, + gvk: gvk, } } func (a adapter[PtrT, T]) GVK() schema.GroupVersionKind { - return a.gwk + return a.gvk } func (a adapter[PtrT, T]) KeepAdmissionCheckPending() bool { return false } -func (a adapter[PtrT, T]) IsJobManagedByKueue(_ context.Context, _ client.Client, _ types.NamespacedName) (bool, string, error) { +func (a adapter[PtrT, T]) IsJobManagedByKueue(context.Context, client.Client, types.NamespacedName) (bool, string, error) { return true, "", nil } @@ -124,10 +121,8 @@ func (a adapter[PtrT, T]) SyncJob( func (a adapter[PtrT, T]) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { job := PtrT(new(T)) - err := remoteClient.Get(ctx, key, job) - if err != nil { - return client.IgnoreNotFound(err) - } + job.SetName(key.Name) + job.SetNamespace(key.Namespace) return client.IgnoreNotFound(remoteClient.Delete(ctx, job)) } @@ -138,12 +133,12 @@ func (a adapter[PtrT, T]) GetEmptyList() client.ObjectList { func (a adapter[PtrT, T]) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { job, isTheJob := o.(PtrT) if !isTheJob { - return types.NamespacedName{}, fmt.Errorf("not a %s", a.typeName) + return types.NamespacedName{}, fmt.Errorf("not a %s", a.gvk.Kind) } prebuiltWl, hasPrebuiltWorkload := job.GetLabels()[constants.PrebuiltWorkloadLabel] if !hasPrebuiltWorkload { - return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for %s: %s", a.typeName, klog.KObj(job)) + return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for %s: %s", a.gvk.Kind, klog.KObj(job)) } return types.NamespacedName{Name: prebuiltWl, Namespace: job.GetNamespace()}, nil