From 03e2736999a61ccb073721467051435f1d8aa45e Mon Sep 17 00:00:00 2001
From: Michal Szadkowski
Date: Fri, 19 Jul 2024 16:01:30 +0200
Subject: [PATCH] Add default replica specs to Kubeflow testing job wrappers

Introduce *ReplicaSpecsDefault() helpers that seed each replica type with a
single replica and a placeholder container named "c", have the existing
*ReplicaSpecs(...) builders mutate those defaults in place (restoring the
framework-specific container name), and update the controller and
integration tests to call the new helpers explicitly.

---
 .../xgboostjob/xgboostjob_controller_test.go | 10 ++-
 pkg/util/testingjobs/mxjob/wrappers.go       |  7 +-
 pkg/util/testingjobs/paddlejob/wrappers.go   |  5 +-
 .../pytorchjob/wrappers_pytorchjob.go        | 62 +++++++++++----
 pkg/util/testingjobs/tfjob/wrappers_tfjob.go | 79 +++++++++++++++----
 pkg/util/testingjobs/xgboostjob/wrappers.go  | 61 ++++++++++----
 .../jobs/mxjob/mxjob_controller_test.go      |  2 +-
 .../pytorchjob/pytorchjob_controller_test.go | 17 ++--
 .../jobs/tfjob/tfjob_controller_test.go      |  5 +-
 .../xgboostjob/xgboostjob_controller_test.go |  5 +-
 10 files changed, 183 insertions(+), 70 deletions(-)

diff --git a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller_test.go b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller_test.go
index eff2fc356f..1edddc1c4b 100644
--- a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller_test.go
+++ b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller_test.go
@@ -253,8 +253,8 @@ func TestReconciler(t *testing.T) {
             reconcilerOptions: []jobframework.Option{
                 jobframework.WithManageJobsWithoutQueueName(true),
             },
-            job:     testingxgboostjob.MakeXGBoostJob("xgboostjob", "ns").Parallelism(2).Obj(),
-            wantJob: testingxgboostjob.MakeXGBoostJob("xgboostjob", "ns").Parallelism(2).Obj(),
+            job:     testingxgboostjob.MakeXGBoostJob("xgboostjob", "ns").XGBReplicaSpecsDefault().Parallelism(2).Obj(),
+            wantJob: testingxgboostjob.MakeXGBoostJob("xgboostjob", "ns").XGBReplicaSpecsDefault().Parallelism(2).Obj(),
             wantWorkloads: []kueue.Workload{
                 *utiltesting.MakeWorkload("xgboostjob", "ns").
                     PodSets(
@@ -268,12 +268,13 @@ func TestReconciler(t *testing.T) {
             reconcilerOptions: []jobframework.Option{
                 jobframework.WithManageJobsWithoutQueueName(false),
             },
-            job:     testingxgboostjob.MakeXGBoostJob("xgboostjob", "ns").Parallelism(2).Obj(),
-            wantJob: testingxgboostjob.MakeXGBoostJob("xgboostjob", "ns").Parallelism(2).Obj(),
+            job:     testingxgboostjob.MakeXGBoostJob("xgboostjob", "ns").XGBReplicaSpecsDefault().Parallelism(2).Obj(),
+            wantJob: testingxgboostjob.MakeXGBoostJob("xgboostjob", "ns").XGBReplicaSpecsDefault().Parallelism(2).Obj(),
             wantWorkloads: []kueue.Workload{},
         },
         "when workload is evicted, suspended is reset, restore node affinity": {
             job: testingxgboostjob.MakeXGBoostJob("xgboostjob", "ns").
+                XGBReplicaSpecsDefault().
                 Image("").
                 Args(nil).
                 Queue("foo").
@@ -317,6 +318,7 @@
                 Obj(),
             },
             wantJob: testingxgboostjob.MakeXGBoostJob("xgboostjob", "ns").
+                XGBReplicaSpecsDefault().
                 Image("").
                 Args(nil).
                 Queue("foo").
diff --git a/pkg/util/testingjobs/mxjob/wrappers.go b/pkg/util/testingjobs/mxjob/wrappers.go
index e42d30da69..0d7fefe289 100644
--- a/pkg/util/testingjobs/mxjob/wrappers.go
+++ b/pkg/util/testingjobs/mxjob/wrappers.go
@@ -60,6 +60,7 @@ func (j *MXJobWrapper) MXReplicaSpecs(replicaSpecs ...MXReplicaSpecRequirement)
     for _, rs := range replicaSpecs {
         j.Spec.MXReplicaSpecs[rs.ReplicaType].Replicas = ptr.To[int32](rs.ReplicaCount)
         j.Spec.MXReplicaSpecs[rs.ReplicaType].Template.Spec.RestartPolicy = corev1.RestartPolicy(rs.RestartPolicy)
+        j.Spec.MXReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Name = "mxnet"
 
         if rs.Annotations != nil {
             j.Spec.MXReplicaSpecs[rs.ReplicaType].Template.ObjectMeta.Annotations = rs.Annotations
@@ -77,7 +78,7 @@ func (j *MXJobWrapper) MXReplicaSpecsDefault() *MXJobWrapper {
                 RestartPolicy: "Never",
                 Containers: []corev1.Container{
                     {
-                        Name:      "mxnet",
+                        Name:      "c",
                         Image:     "pause",
                         Command:   []string{},
                         Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
@@ -95,7 +96,7 @@ func (j *MXJobWrapper) MXReplicaSpecsDefault() *MXJobWrapper {
                 RestartPolicy: "Never",
                 Containers: []corev1.Container{
                     {
-                        Name:      "mxnet",
+                        Name:      "c",
                         Image:     "pause",
                         Command:   []string{},
                         Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
@@ -113,7 +114,7 @@ func (j *MXJobWrapper) MXReplicaSpecsDefault() *MXJobWrapper {
                 RestartPolicy: "Never",
                 Containers: []corev1.Container{
                     {
-                        Name:      "mxnet",
+                        Name:      "c",
                         Image:     "pause",
                         Command:   []string{},
                         Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
diff --git a/pkg/util/testingjobs/paddlejob/wrappers.go b/pkg/util/testingjobs/paddlejob/wrappers.go
index e9f2c49ff9..4a56e78eb6 100644
--- a/pkg/util/testingjobs/paddlejob/wrappers.go
+++ b/pkg/util/testingjobs/paddlejob/wrappers.go
@@ -59,6 +59,7 @@ func (j *PaddleJobWrapper) PaddleReplicaSpecs(replicaSpecs ...PaddleReplicaSpecR
     for _, rs := range replicaSpecs {
         j.Spec.PaddleReplicaSpecs[rs.ReplicaType].Replicas = ptr.To[int32](rs.ReplicaCount)
         j.Spec.PaddleReplicaSpecs[rs.ReplicaType].Template.Spec.RestartPolicy = corev1.RestartPolicy(rs.RestartPolicy)
+        j.Spec.PaddleReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Name = "paddle"
 
         if rs.Annotations != nil {
             j.Spec.PaddleReplicaSpecs[rs.ReplicaType].Template.ObjectMeta.Annotations = rs.Annotations
@@ -76,7 +77,7 @@ func (j *PaddleJobWrapper) PaddleReplicaSpecsDefault() *PaddleJobWrapper {
                 RestartPolicy: "Never",
                 Containers: []corev1.Container{
                     {
-                        Name:      "paddle",
+                        Name:      "c",
                         Image:     "pause",
                         Command:   []string{},
                         Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
@@ -94,7 +95,7 @@ func (j *PaddleJobWrapper) PaddleReplicaSpecsDefault() *PaddleJobWrapper {
                 RestartPolicy: "Never",
                 Containers: []corev1.Container{
                     {
-                        Name:      "paddle",
+                        Name:      "c",
                         Image:     "pause",
                         Command:   []string{},
                         Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
diff --git a/pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go b/pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go
index b8aacd3df0..b9fcda2348 100644
--- a/pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go
+++ b/pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go
@@ -42,6 +42,7 @@ func MakePyTorchJob(name, ns string) *PyTorchJobWrapper {
             RunPolicy: kftraining.RunPolicy{
                 Suspend: ptr.To(true),
             },
+            PyTorchReplicaSpecs: make(map[kftraining.ReplicaType]*kftraining.ReplicaSpec),
         },
     }}
 }
@@ -54,28 +55,55 @@ type PyTorchReplicaSpecRequirement struct {
 }
 
 func (j *PyTorchJobWrapper) PyTorchReplicaSpecs(replicaSpecs ...PyTorchReplicaSpecRequirement) *PyTorchJobWrapper {
-    j.Spec.PyTorchReplicaSpecs = make(map[kftraining.ReplicaType]*kftraining.ReplicaSpec)
-
+    j = j.PyTorchReplicaSpecsDefault()
     for _, rs := range replicaSpecs {
-        j.Spec.PyTorchReplicaSpecs[rs.ReplicaType] = &kftraining.ReplicaSpec{
-            Replicas: ptr.To[int32](rs.ReplicaCount),
-            Template: corev1.PodTemplateSpec{
-                ObjectMeta: metav1.ObjectMeta{
-                    Annotations: rs.Annotations,
+        j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Replicas = ptr.To[int32](rs.ReplicaCount)
+        j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Template.Spec.RestartPolicy = corev1.RestartPolicy(rs.RestartPolicy)
+        j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Name = "pytorch"
+
+        if rs.Annotations != nil {
+            j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Template.ObjectMeta.Annotations = rs.Annotations
+        }
+    }
+
+    return j
+}
+
+func (j *PyTorchJobWrapper) PyTorchReplicaSpecsDefault() *PyTorchJobWrapper {
+    j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster] = &kftraining.ReplicaSpec{
+        Replicas: ptr.To[int32](1),
+        Template: corev1.PodTemplateSpec{
+            Spec: corev1.PodSpec{
+                RestartPolicy: "Never",
+                Containers: []corev1.Container{
+                    {
+                        Name:      "c",
+                        Image:     "pause",
+                        Command:   []string{},
+                        Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
+                    },
                 },
-                Spec: corev1.PodSpec{
-                    RestartPolicy: corev1.RestartPolicy(rs.RestartPolicy),
-                    Containers: []corev1.Container{
-                        {
-                            Name:      "pytorch", // each pytorchjob container must have the name "pytorch"
-                            Command:   []string{},
-                            Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
-                        },
+                NodeSelector: map[string]string{},
+            },
+        },
+    }
+
+    j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker] = &kftraining.ReplicaSpec{
+        Replicas: ptr.To[int32](1),
+        Template: corev1.PodTemplateSpec{
+            Spec: corev1.PodSpec{
+                RestartPolicy: "Never",
+                Containers: []corev1.Container{
+                    {
+                        Name:      "c",
+                        Image:     "pause",
+                        Command:   []string{},
+                        Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
                     },
-                    NodeSelector: map[string]string{},
                 },
+                NodeSelector: map[string]string{},
             },
-        }
+        },
     }
 
     return j
diff --git a/pkg/util/testingjobs/tfjob/wrappers_tfjob.go b/pkg/util/testingjobs/tfjob/wrappers_tfjob.go
index abc2204a43..d4a0ea2fbd 100644
--- a/pkg/util/testingjobs/tfjob/wrappers_tfjob.go
+++ b/pkg/util/testingjobs/tfjob/wrappers_tfjob.go
@@ -56,28 +56,73 @@ type TFReplicaSpecRequirement struct {
 }
 
 func (j *TFJobWrapper) TFReplicaSpecs(replicaSpecs ...TFReplicaSpecRequirement) *TFJobWrapper {
-    j.Spec.TFReplicaSpecs = make(map[kftraining.ReplicaType]*kftraining.ReplicaSpec)
-
+    j = j.TFReplicaSpecsDefault()
     for _, rs := range replicaSpecs {
-        j.Spec.TFReplicaSpecs[rs.ReplicaType] = &kftraining.ReplicaSpec{
-            Replicas: ptr.To[int32](rs.ReplicaCount),
-            Template: corev1.PodTemplateSpec{
-                ObjectMeta: metav1.ObjectMeta{
-                    Annotations: rs.Annotations,
+        j.Spec.TFReplicaSpecs[rs.ReplicaType].Replicas = ptr.To[int32](rs.ReplicaCount)
+        j.Spec.TFReplicaSpecs[rs.ReplicaType].Template.Spec.RestartPolicy = corev1.RestartPolicy(rs.RestartPolicy)
+        j.Spec.TFReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Name = "tensorflow"
+
+        if rs.Annotations != nil {
+            j.Spec.TFReplicaSpecs[rs.ReplicaType].Template.ObjectMeta.Annotations = rs.Annotations
+        }
+    }
+
+    return j
+}
+
+func (j *TFJobWrapper) TFReplicaSpecsDefault() *TFJobWrapper {
+    j.Spec.TFReplicaSpecs[kftraining.TFJobReplicaTypeChief] = &kftraining.ReplicaSpec{
+        Replicas: ptr.To[int32](1),
+        Template: corev1.PodTemplateSpec{
+            Spec: corev1.PodSpec{
+                RestartPolicy: "Never",
+                Containers: []corev1.Container{
+                    {
+                        Name:      "c",
+                        Image:     "pause",
+                        Command:   []string{},
+                        Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
+                    },
                 },
-                Spec: corev1.PodSpec{
-                    RestartPolicy: corev1.RestartPolicy(rs.RestartPolicy),
-                    Containers: []corev1.Container{
-                        {
-                            Name:      "tensorflow", // each tfjob container must have the name "tensorflow"
-                            Command:   []string{},
-                            Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
-                        },
+                NodeSelector: map[string]string{},
+            },
+        },
+    }
+
+    j.Spec.TFReplicaSpecs[kftraining.TFJobReplicaTypePS] = &kftraining.ReplicaSpec{
+        Replicas: ptr.To[int32](1),
+        Template: corev1.PodTemplateSpec{
+            Spec: corev1.PodSpec{
+                RestartPolicy: "Never",
+                Containers: []corev1.Container{
+                    {
+                        Name:      "c",
+                        Image:     "pause",
+                        Command:   []string{},
+                        Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
                     },
-                    NodeSelector: map[string]string{},
                 },
+                NodeSelector: map[string]string{},
             },
-        }
+        },
+    }
+
+    j.Spec.TFReplicaSpecs[kftraining.TFJobReplicaTypeWorker] = &kftraining.ReplicaSpec{
+        Replicas: ptr.To[int32](1),
+        Template: corev1.PodTemplateSpec{
+            Spec: corev1.PodSpec{
+                RestartPolicy: "Never",
+                Containers: []corev1.Container{
+                    {
+                        Name:      "c",
+                        Image:     "pause",
+                        Command:   []string{},
+                        Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
+                    },
+                },
+                NodeSelector: map[string]string{},
+            },
+        },
     }
 
     return j
diff --git a/pkg/util/testingjobs/xgboostjob/wrappers.go b/pkg/util/testingjobs/xgboostjob/wrappers.go
index 063a96fe9d..c2a99c9449 100644
--- a/pkg/util/testingjobs/xgboostjob/wrappers.go
+++ b/pkg/util/testingjobs/xgboostjob/wrappers.go
@@ -55,28 +55,55 @@ type XGBReplicaSpecRequirement struct {
 }
 
 func (j *XGBoostJobWrapper) XGBReplicaSpecs(replicaSpecs ...XGBReplicaSpecRequirement) *XGBoostJobWrapper {
-    j.Spec.XGBReplicaSpecs = make(map[kftraining.ReplicaType]*kftraining.ReplicaSpec)
-
+    j = j.XGBReplicaSpecsDefault()
     for _, rs := range replicaSpecs {
-        j.Spec.XGBReplicaSpecs[rs.ReplicaType] = &kftraining.ReplicaSpec{
-            Replicas: ptr.To[int32](rs.ReplicaCount),
-            Template: corev1.PodTemplateSpec{
-                ObjectMeta: metav1.ObjectMeta{
-                    Annotations: rs.Annotations,
+        j.Spec.XGBReplicaSpecs[rs.ReplicaType].Replicas = ptr.To[int32](rs.ReplicaCount)
+        j.Spec.XGBReplicaSpecs[rs.ReplicaType].Template.Spec.RestartPolicy = corev1.RestartPolicy(rs.RestartPolicy)
+        j.Spec.XGBReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Name = "xgboost"
+
+        if rs.Annotations != nil {
+            j.Spec.XGBReplicaSpecs[rs.ReplicaType].Template.ObjectMeta.Annotations = rs.Annotations
+        }
+    }
+
+    return j
+}
+
+func (j *XGBoostJobWrapper) XGBReplicaSpecsDefault() *XGBoostJobWrapper {
+    j.Spec.XGBReplicaSpecs[kftraining.XGBoostJobReplicaTypeMaster] = &kftraining.ReplicaSpec{
+        Replicas: ptr.To[int32](1),
+        Template: corev1.PodTemplateSpec{
+            Spec: corev1.PodSpec{
+                RestartPolicy: "Never",
+                Containers: []corev1.Container{
+                    {
+                        Name:      "c",
+                        Image:     "pause",
+                        Command:   []string{},
+                        Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
+                    },
                 },
-                Spec: corev1.PodSpec{
-                    RestartPolicy: corev1.RestartPolicy(rs.RestartPolicy),
-                    Containers: []corev1.Container{
-                        {
-                            Name:      "xgboost", // each XgBoostJob container must have the name "xgboost"
-                            Command:   []string{},
-                            Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
-                        },
+                NodeSelector: map[string]string{},
+            },
+        },
+    }
+
+    j.Spec.XGBReplicaSpecs[kftraining.XGBoostJobReplicaTypeWorker] = &kftraining.ReplicaSpec{
+        Replicas: ptr.To[int32](1),
+        Template: corev1.PodTemplateSpec{
+            Spec: corev1.PodSpec{
+                RestartPolicy: "Never",
+                Containers: []corev1.Container{
+                    {
+                        Name:      "c",
+                        Image:     "pause",
+                        Command:   []string{},
+                        Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
                     },
-                    NodeSelector: map[string]string{},
                 },
+                NodeSelector: map[string]string{},
             },
-        }
+        },
     }
 
     return j
diff --git a/test/integration/controller/jobs/mxjob/mxjob_controller_test.go b/test/integration/controller/jobs/mxjob/mxjob_controller_test.go
index 77f68629c2..fb94fa9996 100644
--- a/test/integration/controller/jobs/mxjob/mxjob_controller_test.go
+++ b/test/integration/controller/jobs/mxjob/mxjob_controller_test.go
@@ -73,7 +73,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
     })
 
     ginkgo.It("Should reconcile MXJobs", func() {
-        kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadmxjob.JobControl)(testingmxjob.MakeMXJob(jobName, ns.Name).Obj())}
+        kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadmxjob.JobControl)(testingmxjob.MakeMXJob(jobName, ns.Name).MXReplicaSpecsDefault().Obj())}
         createdJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadmxjob.JobControl)(&kftraining.MXJob{})}
 
         kftesting.ShouldReconcileJob(ctx, k8sClient, kfJob, createdJob, []kftesting.PodSetsResource{
diff --git a/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go b/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go
index e725a5b481..fa02d31fac 100644
--- a/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go
+++ b/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go
@@ -80,7 +80,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
     })
 
     ginkgo.It("Should reconcile PyTorchJobs", func() {
-        kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)(testingpytorchjob.MakePyTorchJob(jobName, ns.Name).Obj())}
+        kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)(testingpytorchjob.MakePyTorchJob(jobName, ns.Name).PyTorchReplicaSpecsDefault().Obj())}
         createdJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)(&kftraining.PyTorchJob{})}
 
         kftesting.ShouldReconcileJob(ctx, k8sClient, kfJob, createdJob, []kftesting.PodSetsResource{
             {
@@ -125,7 +125,9 @@ var _ = ginkgo.Describe("Job controller for workloads when only jobs with queue
     ginkgo.It("Should reconcile jobs only when queue is set", func() {
         ginkgo.By("checking the workload is not created when queue name is not set")
-        job := testingpytorchjob.MakePyTorchJob(jobName, ns.Name).Obj()
+        job := testingpytorchjob.MakePyTorchJob(jobName, ns.Name).
+            PyTorchReplicaSpecsDefault().
+            Obj()
         gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
         lookupKey := types.NamespacedName{Name: jobName, Namespace: ns.Name}
         createdJob := &kftraining.PyTorchJob{}
@@ -182,6 +184,7 @@ var _ = ginkgo.Describe("Job controller for workloads when only jobs with queue
         createdJob := &kftraining.PyTorchJob{}
         createdWorkload := &kueue.Workload{}
         job := testingpytorchjob.MakePyTorchJob(jobName, ns.Name).
+            PyTorchReplicaSpecsDefault().
             PodAnnotation(kftraining.PyTorchJobReplicaTypeWorker, "old-ann-key", "old-ann-value").
             PodLabel(kftraining.PyTorchJobReplicaTypeWorker, "old-label-key", "old-label-value").
             Queue(localQueue.Name).
@@ -359,7 +362,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O
 
     ginkgo.DescribeTable("Single job at different stages of progress towards completion",
         func(podsReadyTestSpec kftesting.PodsReadyTestSpec) {
-            kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)(testingpytorchjob.MakePyTorchJob(jobName, ns.Name).Parallelism(2).Obj())}
+            kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)(testingpytorchjob.MakePyTorchJob(jobName, ns.Name).PyTorchReplicaSpecsDefault().Parallelism(2).Obj())}
             createdJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)(&kftraining.PyTorchJob{})}
 
             kftesting.JobControllerWhenWaitForPodsReadyEnabled(ctx, k8sClient, kfJob, createdJob, podsReadyTestSpec, []kftesting.PodSetsResource{
@@ -512,7 +515,9 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
             gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
 
             kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)(
-                testingpytorchjob.MakePyTorchJob(jobName, ns.Name).Queue(localQueue.Name).
+                testingpytorchjob.MakePyTorchJob(jobName, ns.Name).
+                    PyTorchReplicaSpecsDefault().
+                    Queue(localQueue.Name).
                     Request(kftraining.PyTorchJobReplicaTypeMaster, corev1.ResourceCPU, "3").
                     Request(kftraining.PyTorchJobReplicaTypeWorker, corev1.ResourceCPU, "4").
                     Obj(),
@@ -534,7 +539,9 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
     ginkgo.When("The workload's admission is removed", func() {
         ginkgo.It("Should restore the original node selectors", func() {
             localQueue := testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
-            job := testingpytorchjob.MakePyTorchJob(jobName, ns.Name).Queue(localQueue.Name).
+            job := testingpytorchjob.MakePyTorchJob(jobName, ns.Name).
+                PyTorchReplicaSpecsDefault().
+                Queue(localQueue.Name).
                 Request(kftraining.PyTorchJobReplicaTypeMaster, corev1.ResourceCPU, "3").
                 Request(kftraining.PyTorchJobReplicaTypeWorker, corev1.ResourceCPU, "4").
                 Obj()
diff --git a/test/integration/controller/jobs/tfjob/tfjob_controller_test.go b/test/integration/controller/jobs/tfjob/tfjob_controller_test.go
index 9e5ab1935c..855c941518 100644
--- a/test/integration/controller/jobs/tfjob/tfjob_controller_test.go
+++ b/test/integration/controller/jobs/tfjob/tfjob_controller_test.go
@@ -74,7 +74,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
     })
 
     ginkgo.It("Should reconcile TFJobs", func() {
-        kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadtfjob.JobControl)(testingtfjob.MakeTFJob(jobName, ns.Name).Obj())}
+        kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadtfjob.JobControl)(testingtfjob.MakeTFJob(jobName, ns.Name).TFReplicaSpecsDefault().Obj())}
         createdJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadtfjob.JobControl)(&kftraining.TFJob{})}
 
         kftesting.ShouldReconcileJob(ctx, k8sClient, kfJob, createdJob, []kftesting.PodSetsResource{
@@ -130,7 +130,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O
 
     ginkgo.DescribeTable("Single job at different stages of progress towards completion",
         func(podsReadyTestSpec kftesting.PodsReadyTestSpec) {
-            kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadtfjob.JobControl)(testingtfjob.MakeTFJob(jobName, ns.Name).Parallelism(2, 2).Obj())}
+            kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadtfjob.JobControl)(testingtfjob.MakeTFJob(jobName, ns.Name).TFReplicaSpecsDefault().Parallelism(2, 2).Obj())}
             createdJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadtfjob.JobControl)(&kftraining.TFJob{})}
 
             kftesting.JobControllerWhenWaitForPodsReadyEnabled(ctx, k8sClient, kfJob, createdJob, podsReadyTestSpec, []kftesting.PodSetsResource{
@@ -288,6 +288,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
 
             kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadtfjob.JobControl)(
                 testingtfjob.MakeTFJob(jobName, ns.Name).Queue(localQueue.Name).
+                    TFReplicaSpecsDefault().
                     Request(kftraining.TFJobReplicaTypeChief, corev1.ResourceCPU, "3").
                     Request(kftraining.TFJobReplicaTypePS, corev1.ResourceCPU, "4").
                     Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceCPU, "4").
diff --git a/test/integration/controller/jobs/xgboostjob/xgboostjob_controller_test.go b/test/integration/controller/jobs/xgboostjob/xgboostjob_controller_test.go
index da4273868f..7f6a986486 100644
--- a/test/integration/controller/jobs/xgboostjob/xgboostjob_controller_test.go
+++ b/test/integration/controller/jobs/xgboostjob/xgboostjob_controller_test.go
@@ -74,7 +74,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
     })
 
     ginkgo.It("Should reconcile XGBoostJobs", func() {
-        kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadxgboostjob.JobControl)(testingxgboostjob.MakeXGBoostJob(jobName, ns.Name).Obj())}
+        kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadxgboostjob.JobControl)(testingxgboostjob.MakeXGBoostJob(jobName, ns.Name).XGBReplicaSpecsDefault().Obj())}
         createdJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadxgboostjob.JobControl)(&kftraining.XGBoostJob{})}
 
         kftesting.ShouldReconcileJob(ctx, k8sClient, kfJob, createdJob, []kftesting.PodSetsResource{
             {
@@ -125,7 +125,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O
 
     ginkgo.DescribeTable("Single job at different stages of progress towards completion",
         func(podsReadyTestSpec kftesting.PodsReadyTestSpec) {
-            kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadxgboostjob.JobControl)(testingxgboostjob.MakeXGBoostJob(jobName, ns.Name).Parallelism(2).Obj())}
+            kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadxgboostjob.JobControl)(testingxgboostjob.MakeXGBoostJob(jobName, ns.Name).XGBReplicaSpecsDefault().Parallelism(2).Obj())}
             createdJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadxgboostjob.JobControl)(&kftraining.XGBoostJob{})}
 
             kftesting.JobControllerWhenWaitForPodsReadyEnabled(ctx, k8sClient, kfJob, createdJob, podsReadyTestSpec, []kftesting.PodSetsResource{
                 {
@@ -278,6 +278,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
 
             kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadxgboostjob.JobControl)(
                 testingxgboostjob.MakeXGBoostJob(jobName, ns.Name).Queue(localQueue.Name).
+                    XGBReplicaSpecsDefault().
                     Request(kftraining.XGBoostJobReplicaTypeMaster, corev1.ResourceCPU, "3").
                     Request(kftraining.XGBoostJobReplicaTypeWorker, corev1.ResourceCPU, "4").
                     Obj(),
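
Usage sketch (illustrative, not part of the patch): the new
*ReplicaSpecsDefault() helpers seed each replica type with one replica and a
placeholder container named "c"; tests then override fields through the
existing builder methods. The import path for the wrapper package below
assumes the sigs.k8s.io/kueue module layout shown in the diff, and the
function and queue name are hypothetical.

    import (
        kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"

        testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob"
    )

    // examplePyTorchJob builds a suspended PyTorchJob seeded with the default
    // Master/Worker specs, then customizes it with the existing builder calls,
    // the same way the updated integration tests do.
    func examplePyTorchJob(ns string) *kftraining.PyTorchJob {
        return testingpytorchjob.MakePyTorchJob("pytorchjob", ns).
            PyTorchReplicaSpecsDefault(). // 1 Master + 1 Worker, container "c"
            Parallelism(2).               // same override the waitForPodsReady test applies
            Queue("local-queue").         // hypothetical LocalQueue name
            Obj()
    }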