From 3b81bb67820a229dc9899c416a5f1ca50aae8f8f Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Tue, 30 Jul 2024 12:39:53 +0200 Subject: [PATCH] rework after code review --- Makefile-test.mk | 3 +- charts/kueue/templates/rbac/role.yaml | 1 + config/components/rbac/role.yaml | 1 + hack/e2e-common.sh | 1 - hack/multikueue-e2e-test.sh | 6 +- pkg/controller/jobframework/validation.go | 2 +- .../jobs/job/job_multikueue_adapter.go | 2 +- .../jobs/jobset/jobset_multikueue_adapter.go | 2 +- .../kubeflow/jobs/tfjob/tfjob_controller.go | 2 +- .../jobs/tfjob/tfjob_multikueue_adapter.go | 9 ++- site/content/en/docs/concepts/multikueue.md | 6 ++ .../en/docs/tasks/manage/setup_multikueue.md | 10 +++ .../create-multikueue-kubeconfig.sh | 16 +++++ test/e2e/multikueue/e2e_test.go | 65 +++++-------------- test/e2e/multikueue/suite_test.go | 2 +- .../integration/multikueue/multikueue_test.go | 1 - 16 files changed, 67 insertions(+), 62 deletions(-) diff --git a/Makefile-test.mk b/Makefile-test.mk index b0915648b5..e400027606 100644 --- a/Makefile-test.mk +++ b/Makefile-test.mk @@ -58,6 +58,7 @@ IMAGE_TAG ?= $(IMAGE_REPO):$(GIT_TAG) # JobSet Version JOBSET_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" sigs.k8s.io/jobset) +KUBEFLOW_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/kubeflow/training-operator) ##@ Tests @@ -96,7 +97,7 @@ run-test-e2e-%: FORCE run-test-multikueue-e2e-%: K8S_VERSION = $(@:run-test-multikueue-e2e-%=%) run-test-multikueue-e2e-%: FORCE @echo Running multikueue e2e for k8s ${K8S_VERSION} - E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" JOBSET_VERSION=$(JOBSET_VERSION) ./hack/multikueue-e2e-test.sh + E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) ./hack/multikueue-e2e-test.sh SCALABILITY_RUNNER := $(PROJECT_DIR)/bin/performance-scheduler-runner .PHONY: performance-scheduler-runner diff --git a/charts/kueue/templates/rbac/role.yaml b/charts/kueue/templates/rbac/role.yaml index e1af9b089b..d0c10d5e1b 100644 --- a/charts/kueue/templates/rbac/role.yaml +++ b/charts/kueue/templates/rbac/role.yaml @@ -302,6 +302,7 @@ rules: verbs: - get - update + - patch - apiGroups: - kubeflow.org resources: diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml index 48da53c4c0..5e3306bbbd 100644 --- a/config/components/rbac/role.yaml +++ b/config/components/rbac/role.yaml @@ -301,6 +301,7 @@ rules: verbs: - get - update + - patch - apiGroups: - kubeflow.org resources: diff --git a/hack/e2e-common.sh b/hack/e2e-common.sh index 6bd6a40c5f..ebbf127c05 100644 --- a/hack/e2e-common.sh +++ b/hack/e2e-common.sh @@ -23,7 +23,6 @@ export JOBSET_MANIFEST=https://github.com/kubernetes-sigs/jobset/releases/downlo export JOBSET_IMAGE=registry.k8s.io/jobset/jobset:${JOBSET_VERSION} export JOBSET_CRDS=${ROOT_DIR}/dep-crds/jobset-operator/ -export KUBEFLOW_VERSION=v1.7.0 export KUBEFLOW_MANIFEST=https://github.com/kubeflow/training-operator/manifests/overlays/standalone?ref=${KUBEFLOW_VERSION} #no matching semver tag unfortunately export KUBEFLOW_IMAGE=kubeflow/training-operator:v1-855e096 diff --git a/hack/multikueue-e2e-test.sh b/hack/multikueue-e2e-test.sh index 
6b238c412b..16da205d93 100755 --- a/hack/multikueue-e2e-test.sh +++ b/hack/multikueue-e2e-test.sh @@ -79,14 +79,14 @@ function kind_load { install_jobset $WORKER1_KIND_CLUSTER_NAME install_jobset $WORKER2_KIND_CLUSTER_NAME - #KUBEFLOW SETUP - #MANAGER + # KUBEFLOW SETUP + # MANAGER # Only install the CRDs and not the controller to be able to # have Kubeflow Jobs admitted without execution in the manager cluster. kubectl config use-context kind-${MANAGER_KIND_CLUSTER_NAME} kubectl apply -k ${KUBEFLOW_CRDS} - #WORKERS + # WORKERS docker pull kubeflow/training-operator:v1-855e096 install_kubeflow $WORKER1_KIND_CLUSTER_NAME install_kubeflow $WORKER2_KIND_CLUSTER_NAME diff --git a/pkg/controller/jobframework/validation.go b/pkg/controller/jobframework/validation.go index fad264f217..4168abebd4 100644 --- a/pkg/controller/jobframework/validation.go +++ b/pkg/controller/jobframework/validation.go @@ -39,7 +39,7 @@ var ( supportedPrebuiltWlJobGVKs = sets.New( batchv1.SchemeGroupVersion.WithKind("Job").String(), jobset.SchemeGroupVersion.WithKind("JobSet").String(), - kftraining.SchemeGroupVersion.WithKind("TFJob").String()) + kftraining.SchemeGroupVersion.WithKind(kftraining.TFJobKind).String()) ) // ValidateJobOnCreate encapsulates all GenericJob validations that must be performed on a Create operation diff --git a/pkg/controller/jobs/job/job_multikueue_adapter.go b/pkg/controller/jobs/job/job_multikueue_adapter.go index 3d00b5a380..9985b9578a 100644 --- a/pkg/controller/jobs/job/job_multikueue_adapter.go +++ b/pkg/controller/jobs/job/job_multikueue_adapter.go @@ -94,7 +94,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie // add the prebuilt workload if remoteJob.Labels == nil { - remoteJob.Labels = map[string]string{} + remoteJob.Labels = make(map[string]string, 2) } remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin diff --git a/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go b/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go index 895b6ca039..411f36ded6 100644 --- a/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go +++ b/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go @@ -68,7 +68,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie // add the prebuilt workload if remoteJob.Labels == nil { - remoteJob.Labels = map[string]string{} + remoteJob.Labels = make(map[string]string, 2) } remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin diff --git a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go index f605eb82d3..e3932ab393 100644 --- a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go @@ -52,7 +52,7 @@ func init() { // +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch // +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch // +kubebuilder:rbac:groups=kubeflow.org,resources=tfjobs,verbs=get;list;watch;update;patch -// +kubebuilder:rbac:groups=kubeflow.org,resources=tfjobs/status,verbs=get;update +// +kubebuilder:rbac:groups=kubeflow.org,resources=tfjobs/status,verbs=get;update;patch // +kubebuilder:rbac:groups=kubeflow.org,resources=tfjobs/finalizers,verbs=get;update // 
+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch diff --git a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go index 4f36dd1350..4c81578ef8 100644 --- a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go +++ b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_multikueue_adapter.go @@ -32,6 +32,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/util/api" + clientutil "sigs.k8s.io/kueue/pkg/util/client" ) type multikueueAdapter struct{} @@ -53,8 +54,10 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie // if the remote exists, just copy the status if err == nil { - localJob.Status = remoteJob.Status - return localClient.Status().Update(ctx, &localJob) + return clientutil.PatchStatus(ctx, localClient, &localJob, func() (bool, error) { + localJob.Status = remoteJob.Status + return true, nil + }) } remoteJob = &kftraining.TFJob{ @@ -64,7 +67,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie // add the prebuilt workload if remoteJob.Labels == nil { - remoteJob.Labels = map[string]string{} + remoteJob.Labels = make(map[string]string, 2) } remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin diff --git a/site/content/en/docs/concepts/multikueue.md b/site/content/en/docs/concepts/multikueue.md index 7d811c1740..3cf59b1e86 100644 --- a/site/content/en/docs/concepts/multikueue.md +++ b/site/content/en/docs/concepts/multikueue.md @@ -68,6 +68,12 @@ The `managedBy` field is available as an Alpha feature starting Kubernetes 1.30.0 We recommend using JobSet v0.5.1 or newer. +### Kubeflow + +The supported version of the Kubeflow Training Operator is v1.7.0. +The Management cluster should install only the CRDs and not the Training Operator itself. +The Worker cluster, on the other hand, can install the Training Operator directly. + ## Submitting Jobs In a [configured MultiKueue environment](/docs/tasks/manage/setup_multikueue), you can submit any MultiKueue supported job to the Manager cluster, targeting a ClusterQueue configured for MultiKueue. Kueue delegates the job to the configured worker clusters without any additional configuration changes. diff --git a/site/content/en/docs/tasks/manage/setup_multikueue.md b/site/content/en/docs/tasks/manage/setup_multikueue.md index 76064c6b9e..865907a8af 100644 --- a/site/content/en/docs/tasks/manage/setup_multikueue.md +++ b/site/content/en/docs/tasks/manage/setup_multikueue.md @@ -74,6 +74,16 @@ kubectl apply --server-side -f https://raw.githubusercontent.com/kubernetes-sigs ``` {{% /alert %}} +### Kubeflow Installation + +{{% alert title="Warning" color="warning" %}} +Make sure to install only the Kubeflow TFJobs CRD (version v1.7.0) on the management cluster. + +```bash + kubectl apply --server-side -f https://raw.githubusercontent.com/kubeflow/training-operator/v1.7.0/manifests/base/crds/kubeflow.org_tfjobs.yaml +``` +{{% /alert %}} + ### Enable the MultiKueue feature Enable the `MultiKueue` feature.
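The documentation changes above cover only the manager-side CRD install. A minimal worker-side sketch, assuming the same v1.7.0 standalone kustomize overlay that hack/e2e-common.sh already references (the kind-worker1 context name is a placeholder):

```bash
# Install the full Kubeflow Training Operator on a worker cluster.
# "kind-worker1" is a placeholder; substitute your worker cluster's kubectl context.
kubectl config use-context kind-worker1
kubectl apply --server-side -k "https://github.com/kubeflow/training-operator/manifests/overlays/standalone?ref=v1.7.0"
```

The manager keeps only the CRDs so Kubeflow Jobs can be admitted there without being executed, while the workers run the controller that actually executes them.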
diff --git a/site/static/examples/multikueue/create-multikueue-kubeconfig.sh b/site/static/examples/multikueue/create-multikueue-kubeconfig.sh index f73aa45736..edb87f1303 100644 --- a/site/static/examples/multikueue/create-multikueue-kubeconfig.sh +++ b/site/static/examples/multikueue/create-multikueue-kubeconfig.sh @@ -85,6 +85,22 @@ rules: - get - patch - update +- apiGroups: + - kubeflow.org + resources: + - tfjobs + verbs: + - create + - delete + - get + - list + - watch +- apiGroups: + - kubeflow.org + resources: + - tfjobs/status + verbs: + - get --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/test/e2e/multikueue/e2e_test.go b/test/e2e/multikueue/e2e_test.go index 88c4aa8107..a65721943c 100644 --- a/test/e2e/multikueue/e2e_test.go +++ b/test/e2e/multikueue/e2e_test.go @@ -372,27 +372,18 @@ var _ = ginkgo.Describe("MultiKueue", func() { Queue(managerLq.Name). TFReplicaSpecs( testingtfjob.TFReplicaSpecRequirement{ - ReplicaType: kftraining.TFJobReplicaTypeChief, - ReplicaCount: 1, - Annotations: map[string]string{ - "sidecar.istio.io/inject": "false", - }, + ReplicaType: kftraining.TFJobReplicaTypeChief, + ReplicaCount: 1, RestartPolicy: "OnFailure", }, testingtfjob.TFReplicaSpecRequirement{ - ReplicaType: kftraining.TFJobReplicaTypePS, - ReplicaCount: 1, - Annotations: map[string]string{ - "sidecar.istio.io/inject": "false", - }, + ReplicaType: kftraining.TFJobReplicaTypePS, + ReplicaCount: 1, RestartPolicy: "Never", }, testingtfjob.TFReplicaSpecRequirement{ - ReplicaType: kftraining.TFJobReplicaTypeWorker, - ReplicaCount: 2, - Annotations: map[string]string{ - "sidecar.istio.io/inject": "false", - }, + ReplicaType: kftraining.TFJobReplicaTypeWorker, + ReplicaCount: 1, RestartPolicy: "OnFailure", }, ). @@ -402,29 +393,29 @@ var _ = ginkgo.Describe("MultiKueue", func() { Request(kftraining.TFJobReplicaTypePS, corev1.ResourceMemory, "200M"). Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceCPU, "0.5"). Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceMemory, "100M"). - Image(kftraining.TFJobReplicaTypeChief, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}). - Image(kftraining.TFJobReplicaTypePS, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}). - Image(kftraining.TFJobReplicaTypeWorker, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}). + Image(kftraining.TFJobReplicaTypeChief, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}). + Image(kftraining.TFJobReplicaTypePS, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}). + Image(kftraining.TFJobReplicaTypeWorker, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}). 
Obj() ginkgo.By("Creating the TfJob", func() { gomega.Expect(k8sManagerClient.Create(ctx, tfJob)).Should(gomega.Succeed()) }) - createdLeaderWorkload := &kueue.Workload{} wlLookupKey := types.NamespacedName{Name: workloadtfjob.GetWorkloadNameForTFJob(tfJob.Name, tfJob.UID), Namespace: managerNs.Name} // the execution should be given to the worker ginkgo.By("Waiting to be admitted in worker1 and manager", func() { gomega.Eventually(func(g gomega.Gomega) { - g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed()) - g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ + createdWorkload := &kueue.Workload{} + g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ Type: kueue.WorkloadAdmitted, Status: metav1.ConditionTrue, Reason: "Admitted", Message: "The workload is admitted", }, util.IgnoreConditionTimestampsAndObservedGeneration)) - g.Expect(workload.FindAdmissionCheck(createdLeaderWorkload.Status.AdmissionChecks, multiKueueAc.Name)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{ + g.Expect(workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, multiKueueAc.Name)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{ Name: multiKueueAc.Name, State: kueue.CheckStateReady, Message: `The workload got reservation on "worker1"`, @@ -432,38 +423,16 @@ var _ = ginkgo.Describe("MultiKueue", func() { }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) - ginkgo.By("Waiting for the TfJob to get status updates", func() { - gomega.Eventually(func(g gomega.Gomega) { - createdTfJob := &kftraining.TFJob{} - g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(tfJob), createdTfJob)).To(gomega.Succeed()) - g.Expect(createdTfJob.Status.ReplicaStatuses).To(gomega.BeComparableTo( - map[kftraining.ReplicaType]*kftraining.ReplicaStatus{ - kftraining.TFJobReplicaTypeChief: { - Active: 1, - Succeeded: 0, - }, - kftraining.TFJobReplicaTypePS: { - Active: 1, - Succeeded: 0, - }, - kftraining.TFJobReplicaTypeWorker: { - Active: 2, - Succeeded: 0, - }, - }, - util.IgnoreConditionTimestampsAndObservedGeneration)) - }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) - }) - ginkgo.By("Waiting for the TfJob to finish", func() { gomega.Eventually(func(g gomega.Gomega) { - g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed()) + createdWorkload := &kueue.Workload{} + g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) - g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{ + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{ Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: kueue.WorkloadFinishedReasonSucceeded, - Message: fmt.Sprintf("TFJob %s/%s successfully completed.", createdLeaderWorkload.Namespace, tfJob.Name), + Message: fmt.Sprintf("TFJob %s/%s successfully completed.", createdWorkload.Namespace, tfJob.Name), }, util.IgnoreConditionTimestampsAndObservedGeneration)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) diff --git a/test/e2e/multikueue/suite_test.go 
b/test/e2e/multikueue/suite_test.go index 0975545d49..fb7fe26026 100644 --- a/test/e2e/multikueue/suite_test.go +++ b/test/e2e/multikueue/suite_test.go @@ -213,7 +213,7 @@ var _ = ginkgo.BeforeSuite(func() { gomega.Expect(err).NotTo(gomega.HaveOccurred()) gomega.Expect(makeMultiKueueSecret(ctx, k8sManagerClient, "kueue-system", "multikueue1", worker1Kconfig)).To(gomega.Succeed()) - worker2Kconfig, err := kubeconfigForMultiKueueSA(ctx, k8sWorker2Client, worker2Cfg, "kueue-system", "mksa", worker1ClusterName) + worker2Kconfig, err := kubeconfigForMultiKueueSA(ctx, k8sWorker2Client, worker2Cfg, "kueue-system", "mksa", worker2ClusterName) gomega.Expect(err).NotTo(gomega.HaveOccurred()) gomega.Expect(makeMultiKueueSecret(ctx, k8sManagerClient, "kueue-system", "multikueue2", worker2Kconfig)).To(gomega.Succeed()) diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index 41e5a9cf78..73466c9949 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -814,7 +814,6 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, createdWorkload := &kueue.Workload{} gomega.Eventually(func(g gomega.Gomega) { g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) - fmt.Println(createdWorkload.Spec.PodSets) g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) }, util.Timeout, util.Interval).Should(gomega.Succeed()) })
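With KUBEFLOW_VERSION now derived from go.mod, the whole change can be exercised end to end through the existing multikueue e2e target in Makefile-test.mk. A usage sketch, assuming the repository's top-level Makefile includes Makefile-test.mk and using 1.30.0 as an example kind node version:

```bash
# Builds kind clusters (manager plus two workers) for the given Kubernetes version
# and runs the MultiKueue e2e suite; KUBEFLOW_VERSION is passed automatically.
make run-test-multikueue-e2e-1.30.0
```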