Skip to content

Commit

Permalink
rework after code review
Browse files Browse the repository at this point in the history
  • Loading branch information
mszadkow committed Jul 30, 2024
1 parent e73894f commit 3b81bb6
Show file tree
Hide file tree
Showing 16 changed files with 67 additions and 62 deletions.
3 changes: 2 additions & 1 deletion Makefile-test.mk
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ IMAGE_TAG ?= $(IMAGE_REPO):$(GIT_TAG)

# JobSet Version
JOBSET_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" sigs.k8s.io/jobset)
KUBEFLOW_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/kubeflow/training-operator)

##@ Tests

Expand Down Expand Up @@ -96,7 +97,7 @@ run-test-e2e-%: FORCE
run-test-multikueue-e2e-%: K8S_VERSION = $(@:run-test-multikueue-e2e-%=%)
run-test-multikueue-e2e-%: FORCE
@echo Running multikueue e2e for k8s ${K8S_VERSION}
E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" JOBSET_VERSION=$(JOBSET_VERSION) ./hack/multikueue-e2e-test.sh
E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) ./hack/multikueue-e2e-test.sh

SCALABILITY_RUNNER := $(PROJECT_DIR)/bin/performance-scheduler-runner
.PHONY: performance-scheduler-runner
Expand Down
1 change: 1 addition & 0 deletions charts/kueue/templates/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ rules:
verbs:
- get
- update
- patch
- apiGroups:
- kubeflow.org
resources:
Expand Down
1 change: 1 addition & 0 deletions config/components/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ rules:
verbs:
- get
- update
- patch
- apiGroups:
- kubeflow.org
resources:
Expand Down
1 change: 0 additions & 1 deletion hack/e2e-common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ export JOBSET_MANIFEST=https://github.com/kubernetes-sigs/jobset/releases/downlo
export JOBSET_IMAGE=registry.k8s.io/jobset/jobset:${JOBSET_VERSION}
export JOBSET_CRDS=${ROOT_DIR}/dep-crds/jobset-operator/

export KUBEFLOW_VERSION=v1.7.0
export KUBEFLOW_MANIFEST=https://github.com/kubeflow/training-operator/manifests/overlays/standalone?ref=${KUBEFLOW_VERSION}
#no matching semver tag unfortunately
export KUBEFLOW_IMAGE=kubeflow/training-operator:v1-855e096
Expand Down
6 changes: 3 additions & 3 deletions hack/multikueue-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ function kind_load {
install_jobset $WORKER1_KIND_CLUSTER_NAME
install_jobset $WORKER2_KIND_CLUSTER_NAME

#KUBEFLOW SETUP
#MANAGER
# KUBEFLOW SETUP
# MANAGER
# Only install the CRDs and not the controller to be able to
# have Kubeflow Jobs admitted without execution in the manager cluster.
kubectl config use-context kind-${MANAGER_KIND_CLUSTER_NAME}
kubectl apply -k ${KUBEFLOW_CRDS}

#WORKERS
# WORKERS
docker pull kubeflow/training-operator:v1-855e096
install_kubeflow $WORKER1_KIND_CLUSTER_NAME
install_kubeflow $WORKER2_KIND_CLUSTER_NAME
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobframework/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
supportedPrebuiltWlJobGVKs = sets.New(
batchv1.SchemeGroupVersion.WithKind("Job").String(),
jobset.SchemeGroupVersion.WithKind("JobSet").String(),
kftraining.SchemeGroupVersion.WithKind("TFJob").String())
kftraining.SchemeGroupVersion.WithKind(kftraining.TFJobKind).String())
)

// ValidateJobOnCreate encapsulates all GenericJob validations that must be performed on a Create operation
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobs/job/job_multikueue_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie

// add the prebuilt workload
if remoteJob.Labels == nil {
remoteJob.Labels = map[string]string{}
remoteJob.Labels = make(map[string]string, 2)
}
remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobs/jobset/jobset_multikueue_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie

// add the prebuilt workload
if remoteJob.Labels == nil {
remoteJob.Labels = map[string]string{}
remoteJob.Labels = make(map[string]string, 2)
}
remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func init() {
// +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
// +kubebuilder:rbac:groups=kubeflow.org,resources=tfjobs,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=kubeflow.org,resources=tfjobs/status,verbs=get;update
// +kubebuilder:rbac:groups=kubeflow.org,resources=tfjobs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=kubeflow.org,resources=tfjobs/finalizers,verbs=get;update
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/util/api"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
)

type multikueueAdapter struct{}
Expand All @@ -53,8 +54,10 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie

// if the remote exists, just copy the status
if err == nil {
localJob.Status = remoteJob.Status
return localClient.Status().Update(ctx, &localJob)
return clientutil.PatchStatus(ctx, localClient, &localJob, func() (bool, error) {
localJob.Status = remoteJob.Status
return true, nil
})
}

remoteJob = &kftraining.TFJob{
Expand All @@ -64,7 +67,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie

// add the prebuilt workload
if remoteJob.Labels == nil {
remoteJob.Labels = map[string]string{}
remoteJob.Labels = make(map[string]string, 2)
}
remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin
Expand Down
6 changes: 6 additions & 0 deletions site/content/en/docs/concepts/multikueue.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ The `managedBy` field is available as an Alpha feature staring Kubernetes 1.30.0

We recommend using JobSet v0.5.1 or newer.

### Kubeflow

The supported version of the Kubeflow Training Operator is v1.7.0.
The Management cluster should only install the CRDs and not the package itself.
On the other hand, the Worker cluster can install the package directly.

## Submitting Jobs
In a [configured MultiKueue environment](/docs/tasks/manage/setup_multikueue), you can submit any MultiKueue supported job to the Manager cluster, targeting a ClusterQueue configured for Multikueue.
Kueue delegates the job to the configured worker clusters without any additional configuration changes.
Expand Down
10 changes: 10 additions & 0 deletions site/content/en/docs/tasks/manage/setup_multikueue.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ kubectl apply --server-side -f https://raw.githubusercontent.com/kubernetes-sigs
```
{{% /alert %}}

### Kubeflow Installation

{{% alert title="Warning" color="warning" %}}
Make sure to only install the Kubeflow TFJobs CRD of version v1.7.0 on the management cluster.

```bash
kubectl apply --server-side -f https://github.com/kubeflow/training-operator/blob/v1.7.0/manifests/base/crds/kubeflow.org_tfjobs.yaml
```
{{% /alert %}}

### Enable the MultiKueue feature

Enable the `MultiKueue` feature.
Expand Down
16 changes: 16 additions & 0 deletions site/static/examples/multikueue/create-multikueue-kubeconfig.sh
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,22 @@ rules:
- get
- patch
- update
- apiGroups:
- kubeflow.org
resources:
- tfjobs
verbs:
- create
- delete
- get
- list
- watch
- apiGroups:
- kubeflow.org
resources:
- tfjobs/status
verbs:
- get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down
65 changes: 17 additions & 48 deletions test/e2e/multikueue/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,27 +372,18 @@ var _ = ginkgo.Describe("MultiKueue", func() {
Queue(managerLq.Name).
TFReplicaSpecs(
testingtfjob.TFReplicaSpecRequirement{
ReplicaType: kftraining.TFJobReplicaTypeChief,
ReplicaCount: 1,
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
ReplicaType: kftraining.TFJobReplicaTypeChief,
ReplicaCount: 1,
RestartPolicy: "OnFailure",
},
testingtfjob.TFReplicaSpecRequirement{
ReplicaType: kftraining.TFJobReplicaTypePS,
ReplicaCount: 1,
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
ReplicaType: kftraining.TFJobReplicaTypePS,
ReplicaCount: 1,
RestartPolicy: "Never",
},
testingtfjob.TFReplicaSpecRequirement{
ReplicaType: kftraining.TFJobReplicaTypeWorker,
ReplicaCount: 2,
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
ReplicaType: kftraining.TFJobReplicaTypeWorker,
ReplicaCount: 1,
RestartPolicy: "OnFailure",
},
).
Expand All @@ -402,68 +393,46 @@ var _ = ginkgo.Describe("MultiKueue", func() {
Request(kftraining.TFJobReplicaTypePS, corev1.ResourceMemory, "200M").
Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceCPU, "0.5").
Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceMemory, "100M").
Image(kftraining.TFJobReplicaTypeChief, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}).
Image(kftraining.TFJobReplicaTypePS, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}).
Image(kftraining.TFJobReplicaTypeWorker, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}).
Image(kftraining.TFJobReplicaTypeChief, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
Image(kftraining.TFJobReplicaTypePS, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
Image(kftraining.TFJobReplicaTypeWorker, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
Obj()

ginkgo.By("Creating the TfJob", func() {
gomega.Expect(k8sManagerClient.Create(ctx, tfJob)).Should(gomega.Succeed())
})

createdLeaderWorkload := &kueue.Workload{}
wlLookupKey := types.NamespacedName{Name: workloadtfjob.GetWorkloadNameForTFJob(tfJob.Name, tfJob.UID), Namespace: managerNs.Name}

// the execution should be given to the worker
ginkgo.By("Waiting to be admitted in worker1 and manager", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed())
g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{
createdWorkload := &kueue.Workload{}
g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
Reason: "Admitted",
Message: "The workload is admitted",
}, util.IgnoreConditionTimestampsAndObservedGeneration))
g.Expect(workload.FindAdmissionCheck(createdLeaderWorkload.Status.AdmissionChecks, multiKueueAc.Name)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{
g.Expect(workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, multiKueueAc.Name)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{
Name: multiKueueAc.Name,
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime")))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for the TfJob to get status updates", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdTfJob := &kftraining.TFJob{}
g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(tfJob), createdTfJob)).To(gomega.Succeed())
g.Expect(createdTfJob.Status.ReplicaStatuses).To(gomega.BeComparableTo(
map[kftraining.ReplicaType]*kftraining.ReplicaStatus{
kftraining.TFJobReplicaTypeChief: {
Active: 1,
Succeeded: 0,
},
kftraining.TFJobReplicaTypePS: {
Active: 1,
Succeeded: 0,
},
kftraining.TFJobReplicaTypeWorker: {
Active: 2,
Succeeded: 0,
},
},
util.IgnoreConditionTimestampsAndObservedGeneration))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for the TfJob to finish", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed())
createdWorkload := &kueue.Workload{}
g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())

g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{
g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadFinishedReasonSucceeded,
Message: fmt.Sprintf("TFJob %s/%s successfully completed.", createdLeaderWorkload.Namespace, tfJob.Name),
Message: fmt.Sprintf("TFJob %s/%s successfully completed.", createdWorkload.Namespace, tfJob.Name),
}, util.IgnoreConditionTimestampsAndObservedGeneration))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/multikueue/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ var _ = ginkgo.BeforeSuite(func() {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(makeMultiKueueSecret(ctx, k8sManagerClient, "kueue-system", "multikueue1", worker1Kconfig)).To(gomega.Succeed())

worker2Kconfig, err := kubeconfigForMultiKueueSA(ctx, k8sWorker2Client, worker2Cfg, "kueue-system", "mksa", worker1ClusterName)
worker2Kconfig, err := kubeconfigForMultiKueueSA(ctx, k8sWorker2Client, worker2Cfg, "kueue-system", "mksa", worker2ClusterName)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(makeMultiKueueSecret(ctx, k8sManagerClient, "kueue-system", "multikueue2", worker2Kconfig)).To(gomega.Succeed())

Expand Down
1 change: 0 additions & 1 deletion test/integration/multikueue/multikueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,6 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,
createdWorkload := &kueue.Workload{}
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
fmt.Println(createdWorkload.Spec.PodSets)
g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
Expand Down

0 comments on commit 3b81bb6

Please sign in to comment.