Skip to content

Commit

Permalink
Changes after code review
Browse files Browse the repository at this point in the history
  • Loading branch information
mszadkow committed Aug 19, 2024
1 parent 7140be8 commit 5f98f05
Show file tree
Hide file tree
Showing 13 changed files with 21 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func init() {
JobType: &kftraining.PaddleJob{},
AddToScheme: kftraining.AddToScheme,
IsManagingObjectsOwner: isPaddleJob,
MultiKueueAdapter: kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind),
MultiKueueAdapter: kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk),
}))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"sigs.k8s.io/kueue/pkg/util/api"
)

var _ jobframework.MultiKueueAdapter = kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind)
var _ jobframework.MultiKueueAdapter = kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk)

func copyJobStatus(dst, src *kftraining.PaddleJob) {
dst.Status = src.Status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestMultikueueAdapter(t *testing.T) {

ctx, _ := utiltesting.ContextWithLog(t)

adapter := kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PaddleJobKind)
adapter := kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk)

gotErr := tc.operation(ctx, adapter, managerClient, workerClient)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"sigs.k8s.io/kueue/pkg/util/api"
)

var _ jobframework.MultiKueueAdapter = kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind)
var _ jobframework.MultiKueueAdapter = kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk)

func copyJobStatus(dst, src *kftraining.PyTorchJob) {
dst.Status = src.Status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestMultikueueAdapter(t *testing.T) {

ctx, _ := utiltesting.ContextWithLog(t)

adapter := kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind)
adapter := kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk)

gotErr := tc.operation(ctx, adapter, managerClient, workerClient)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func init() {
JobType: &kftraining.PyTorchJob{},
AddToScheme: kftraining.AddToScheme,
IsManagingObjectsOwner: isPyTorchJob,
MultiKueueAdapter: kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.PyTorchJobKind),
MultiKueueAdapter: kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk),
}))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func init() {
JobType: &kftraining.TFJob{},
AddToScheme: kftraining.AddToScheme,
IsManagingObjectsOwner: isTFJob,
MultiKueueAdapter: kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind),
MultiKueueAdapter: kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk),
}))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"sigs.k8s.io/kueue/pkg/util/api"
)

var _ jobframework.MultiKueueAdapter = kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind)
var _ jobframework.MultiKueueAdapter = kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk)

func copyJobStatus(dst, src *kftraining.TFJob) {
dst.Status = src.Status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestMultikueueAdapter(t *testing.T) {

ctx, _ := utiltesting.ContextWithLog(t)

adapter := kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.TFJobKind)
adapter := kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk)

gotErr := tc.operation(ctx, adapter, managerClient, workerClient)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func init() {
JobType: &kftraining.XGBoostJob{},
AddToScheme: kftraining.AddToScheme,
IsManagingObjectsOwner: isXGBoostJob,
MultiKueueAdapter: kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind),
MultiKueueAdapter: kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk),
}))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"sigs.k8s.io/kueue/pkg/util/api"
)

var _ jobframework.MultiKueueAdapter = kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind)
var _ jobframework.MultiKueueAdapter = kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk)

func copyJobStatus(dst, src *kftraining.XGBoostJob) {
dst.Status = src.Status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestMultikueueAdapter(t *testing.T) {

ctx, _ := utiltesting.ContextWithLog(t)

adapter := kubeflowjob.NewAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk, kftraining.XGBoostJobKind)
adapter := kubeflowjob.NewMKAdapter(copyJobSpec, copyJobStatus, getEmptyList, gvk)

gotErr := tc.operation(ctx, adapter, managerClient, workerClient)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,30 @@ type adapter[PtrT objAsPtr[T], T any] struct {
copySpec func(dst, src PtrT)
copyStatus func(dst, src PtrT)
emptyList func() client.ObjectList
gwk schema.GroupVersionKind
typeName string
gvk schema.GroupVersionKind
}

type fullInterface interface {
jobframework.MultiKueueAdapter
jobframework.MultiKueueWatcher
}

func NewAdapter[PtrT objAsPtr[T], T any](
func NewMKAdapter[PtrT objAsPtr[T], T any](
copySpec func(dst, src PtrT),
copyStatus func(dst, src PtrT),
emptyList func() client.ObjectList,
gwk schema.GroupVersionKind,
typeName string,
gvk schema.GroupVersionKind,
) fullInterface {
return &adapter[PtrT, T]{
copySpec: copySpec,
copyStatus: copyStatus,
emptyList: emptyList,
gwk: gwk,
typeName: typeName,
gvk: gvk,
}
}

func (a adapter[PtrT, T]) GVK() schema.GroupVersionKind {
return a.gwk
return a.gvk
}

func (a adapter[PtrT, T]) KeepAdmissionCheckPending() bool {
Expand Down Expand Up @@ -124,10 +121,8 @@ func (a adapter[PtrT, T]) SyncJob(

func (a adapter[PtrT, T]) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error {
job := PtrT(new(T))
err := remoteClient.Get(ctx, key, job)
if err != nil {
return client.IgnoreNotFound(err)
}
job.SetName(key.Name)
job.SetNamespace(key.Namespace)
return client.IgnoreNotFound(remoteClient.Delete(ctx, job))
}

Expand All @@ -138,12 +133,12 @@ func (a adapter[PtrT, T]) GetEmptyList() client.ObjectList {
func (a adapter[PtrT, T]) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) {
job, isTheJob := o.(PtrT)
if !isTheJob {
return types.NamespacedName{}, fmt.Errorf("not a %s", a.typeName)
return types.NamespacedName{}, fmt.Errorf("not a %s", a.gvk.Kind)
}

prebuiltWl, hasPrebuiltWorkload := job.GetLabels()[constants.PrebuiltWorkloadLabel]
if !hasPrebuiltWorkload {
return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for %s: %s", a.typeName, klog.KObj(job))
return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for %s: %s", a.gvk.Kind, klog.KObj(job))
}

return types.NamespacedName{Name: prebuiltWl, Namespace: job.GetNamespace()}, nil
Expand Down

0 comments on commit 5f98f05

Please sign in to comment.