KEP-2170: Implement runtime framework #2248
Force-pushed from 92b1dd1 to 4195338.
Force-pushed from c7d0e0f to d220851.
Signed-off-by: Yuki Iwai <[email protected]>
Force-pushed from d220851 to caa8564.
sigs.k8s.io/controller-runtime v0.17.3
sigs.k8s.io/jobset v0.5.2
sigs.k8s.io/kueue v0.6.3
Why do we need the Kueue dependency?
This dependency comes from training-operator/pkg/runtime.v2/runtime.go, line 125 in 22da8af:
PodRequests: kueuelr.TotalRequests(&spec.podSpec),
This allows us to set the appropriate resource requests for the PodGroup. If we removed this dependency, we would have to copy Kueue's "TotalRequests" function here, and I believe that copying and pasting is not ideal.
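
To illustrate what a "total requests" helper of this kind computes, here is a minimal sketch in plain Go. It is not Kueue's implementation: `Resources`, `PodSpec`, and `totalRequests` are hypothetical, simplified stand-ins for the Kubernetes types, and the rule shown (sum of containers, raised to the largest init container, plus overhead) is the usual Kubernetes convention for a Pod's effective request, assumed here for illustration.

```go
package main

import "fmt"

// Resources maps a resource name (e.g. "cpu") to an integer quantity.
// Hypothetical simplification of corev1.ResourceList.
type Resources map[string]int64

// PodSpec is a hypothetical, trimmed-down stand-in for corev1.PodSpec.
type PodSpec struct {
	InitContainers []Resources
	Containers     []Resources
	Overhead       Resources
}

// totalRequests returns, per resource,
// max(sum of containers, largest init container) + overhead.
func totalRequests(ps PodSpec) Resources {
	total := Resources{}
	// Regular containers run concurrently, so their requests add up.
	for _, c := range ps.Containers {
		for name, q := range c {
			total[name] += q
		}
	}
	// Init containers run one at a time, so only the largest one matters.
	for _, ic := range ps.InitContainers {
		for name, q := range ic {
			if q > total[name] {
				total[name] = q
			}
		}
	}
	// Pod overhead is added on top.
	for name, q := range ps.Overhead {
		total[name] += q
	}
	return total
}

func main() {
	ps := PodSpec{
		InitContainers: []Resources{{"cpu": 3000}},
		Containers:     []Resources{{"cpu": 1000, "memory": 512}, {"cpu": 1000}},
		Overhead:       Resources{"cpu": 100},
	}
	fmt.Println(totalRequests(ps)) // map[cpu:3100 memory:512]
}
```

A PodGroup-level value would presumably scale this per-Pod total by the number of Pods; that step is left out of the sketch.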
Signed-off-by: Yuki Iwai <[email protected]>
Thank you for this effort @tenzen-y!
I left my initial comments, will try to review again soon.
/assign @kubeflow/wg-training-leads @kannon92 @shravan-achar @akshaychitneni @kuizhiqing @Syulin7
var trainingRuntimeFactory *TrainingRuntime

func NewTrainingRuntime(ctx context.Context, c client.Client, indexer client.FieldIndexer) (runtime.Runtime, error) {
	if err := indexer.IndexField(ctx, &kubeflowv2.TrainJob{}, idxer.TrainJobTrainingRuntimeRefKey, idxer.IndexTrainJobTrainingRuntimes); err != nil {
Why do we need indexers for runtimes?
For other runtimes that users can register, we will still use the same API fields, won't we?
runtimeRef:
  name: RuntimeName
  kind: RuntimeKind
  apiGroup: RuntimeAPIGroup
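
As a rough illustration of why an index over these fields can be useful, the sketch below maps each runtime name to the TrainJobs that reference it, so a change to one runtime can be traced back to the affected jobs. The types, the `example.org` group, and the function names are hypothetical stand-ins, not the PR's code or the controller-runtime API.

```go
package main

import "fmt"

// RuntimeRef mirrors the three runtimeRef fields quoted above.
type RuntimeRef struct {
	Name     string
	Kind     string
	APIGroup string
}

// TrainJob is a hypothetical, trimmed-down stand-in for the real API type.
type TrainJob struct {
	Name       string
	RuntimeRef RuntimeRef
}

// indexByRuntime returns the index values for one TrainJob: the referenced
// runtime name when the reference matches the given group and kind, else nil.
func indexByRuntime(job TrainJob, apiGroup, kind string) []string {
	ref := job.RuntimeRef
	if ref.APIGroup != apiGroup || ref.Kind != kind {
		return nil
	}
	return []string{ref.Name}
}

// buildIndex maps each runtime name to the TrainJobs that reference it.
func buildIndex(jobs []TrainJob, apiGroup, kind string) map[string][]string {
	idx := map[string][]string{}
	for _, job := range jobs {
		for _, key := range indexByRuntime(job, apiGroup, kind) {
			idx[key] = append(idx[key], job.Name)
		}
	}
	return idx
}

func main() {
	jobs := []TrainJob{
		{Name: "job-a", RuntimeRef: RuntimeRef{Name: "torch", Kind: "TrainingRuntime", APIGroup: "example.org"}},
		{Name: "job-b", RuntimeRef: RuntimeRef{Name: "torch", Kind: "TrainingRuntime", APIGroup: "example.org"}},
		{Name: "job-c", RuntimeRef: RuntimeRef{Name: "custom", Kind: "OtherRuntime", APIGroup: "example.org"}},
	}
	idx := buildIndex(jobs, "example.org", "TrainingRuntime")
	fmt.Println(idx["torch"]) // [job-a job-b]
}
```

Because the index key depends only on name, kind, and apiGroup, a user-registered runtime kind could reuse the same function with different arguments.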
	return nil
}

func (f *Framework) RunCustomValidationPlugins(oldObj, newObj client.Object) (admission.Warnings, error) {
How is this used, and why do we need it if we have a validation webhook?
	if err := r.client.Get(ctx, client.ObjectKey{Name: trainJob.Spec.TrainingRuntimeRef.Name}, &clTrainingRuntime); err != nil {
		return nil, fmt.Errorf("%w: %w", errorNotFoundSpecifiedClusterTrainingRuntime, err)
	}
Should this be part of TrainJob validation?
var ClusterTrainingRuntimeGroupKind = schema.GroupKind{
	Group: kubeflowv2.GroupVersion.Group,
	Kind:  "ClusterTrainingRuntime",
Can we move ClusterTrainingRuntime and TrainingRuntime to the API types as constants?
	if rJob.Replicas == 0 {
		jobSetTemplateSpec.Spec.ReplicatedJobs[idx].Replicas = 1
	}
Does JobSet have a default value for the number of Job replicas?
Do we need this if replicas can't be 0?
@andreyvelich Thank you for the first comment. /hold
@@ -80,6 +80,10 @@ test: envtest
test-integrationv2: envtest
	KUBEBUILDER_ASSETS="$(shell setup-envtest use $(ENVTEST_K8S_VERSION) -p path)" go test ./test/... -coverprofile cover.out

.PHONY: testv2
testv2:
	go test ./pkg/controller.v2/... ./pkg/runtime.v2/... ./pkg/webhook.v2/... ./pkg/util.v2/... -coverprofile cover.out
Suggested change:
-	go test ./pkg/controller.v2/... ./pkg/runtime.v2/... ./pkg/webhook.v2/... ./pkg/util.v2/... -coverprofile cover.out
+	go test ./pkg/... -coverprofile cover.out
This would at least help future contributors: if a new folder is added, they won't have to wonder why there is no coverage for their package.
This is not possible yet because of the v1 test code.
After we remove the v1 code, we can migrate to this approach.
Additionally, we proposed some automation here: #2248 (comment)
However, that approach was rejected.
sigs.k8s.io/controller-runtime v0.17.3
sigs.k8s.io/jobset v0.5.2
sigs.k8s.io/kueue v0.6.3
This is a pretty old version of Kueue, no?
Suggested change:
-	sigs.k8s.io/kueue v0.6.3
+	sigs.k8s.io/kueue v0.8.1
If we really need an older major release of Kueue, that is concerning.
We cannot use the latest version because of the existing v1 code.
How do we update v1 to use a later version of Kueue? I am not even sure whether 0.6 will be supported much longer.
Oh, is it because we are still on 0.29 and bringing in Kueue would force an update of the k8s libs?
I currently don't see any reference to Kueue in go.mod.
> Oh is it because we are still on 0.29 and bringing in Kueue will cause an update of the k8s libs?
> Currently I didn't see any reference of Kueue in the go.mods.
Yes, that's right. Currently, we use the v1.29 K8s client, but once we introduce the latest Kueue, the K8s client must be upgraded to the latest version.
@tenzen-y @kannon92 If we only use one helper function from Kueue, don't you think it is better to just copy it here?
Otherwise, can we find something that we can use from scheduler-plugins?
For example, this one: https://github.com/kubernetes-sigs/scheduler-plugins/blob/bb56af11184a0f6ed33e2fc8b189a5b1ccfc60e4/pkg/noderesources/resource_allocation.go#L104
@tenzen-y I added a few more comments.
type TrainingRuntime struct {
	framework *fwkcore.Framework
	client    client.Client
	scheme    *apiruntime.Scheme
Why do we need the scheme on TrainingRuntime?
	runtime "github.com/kubeflow/training-operator/pkg/runtime.v2"
	fwkcore "github.com/kubeflow/training-operator/pkg/runtime.v2/framework/core"
	fwkplugins "github.com/kubeflow/training-operator/pkg/runtime.v2/framework/plugins"
	idxer "github.com/kubeflow/training-operator/pkg/runtime.v2/indexer"
Suggested change:
-	idxer "github.com/kubeflow/training-operator/pkg/runtime.v2/indexer"
+	indexer "github.com/kubeflow/training-operator/pkg/runtime.v2/indexer"
		schema.GroupKind{Group: jobsetv1alpha2.GroupVersion.Group, Kind: "JobSet"},
		jobsetv1alpha2.SchemeGroupVersion.Version,
	); err != nil {
		return nil
What are we checking here?
	}
	return []runtime.ReconcilerBuilder{
		func(b *builder.Builder, c client.Client) *builder.Builder {
			return b.Owns(&jobsetv1alpha2.JobSet{})
How are we going to ensure that we watch only the JobSets created by a TrainJob (e.g. those with the appropriate owner reference, as we do here: https://github.com/kubeflow/katib/blob/master/pkg/controller.v1beta1/experiment/experiment_controller.go#L124C26-L124C48)?
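
The owner-reference check the question alludes to can be sketched as follows. This is an illustration only, with hypothetical types and a placeholder `example.org` group; it does not reproduce the Katib code or controller-runtime's internal filtering.

```go
package main

import "fmt"

// OwnerReference is a hypothetical, simplified stand-in for metav1.OwnerReference.
type OwnerReference struct {
	APIGroup   string
	Kind       string
	Name       string
	Controller bool
}

// JobSet is a hypothetical stand-in carrying only what the check needs.
type JobSet struct {
	Name   string
	Owners []OwnerReference
}

// controllingTrainJob returns the name of the TrainJob that controls js.
// The boolean is false when no controlling TrainJob owner reference exists,
// in which case events for js would be ignored.
func controllingTrainJob(js JobSet, apiGroup string) (string, bool) {
	for _, ref := range js.Owners {
		if ref.Controller && ref.Kind == "TrainJob" && ref.APIGroup == apiGroup {
			return ref.Name, true
		}
	}
	return "", false
}

func main() {
	owned := JobSet{Name: "js-1", Owners: []OwnerReference{
		{APIGroup: "example.org", Kind: "TrainJob", Name: "train-1", Controller: true},
	}}
	standalone := JobSet{Name: "js-2"}
	for _, js := range []JobSet{owned, standalone} {
		owner, ok := controllingTrainJob(js, "example.org")
		fmt.Printf("%s owner=%q watched=%v\n", js.Name, owner, ok)
	}
}
```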
		Spec: *jobSetTemplateSpec.Spec.DeepCopy(),
	}, opts...)

	if err := r.framework.RunEnforceMLPolicyPlugins(info); err != nil {
How can we make sure that only one MLPolicy is used?
E.g. the user can set either the Torch or the MPI policy, but not both.
	if info == nil || info.Obj == nil || trainJob == nil {
		return nil, fmt.Errorf("runtime info or object is missing")
	}
	raw, ok := info.Obj.(*jobsetv1alpha2.JobSet)
	if !ok {
		return nil, nil
	}
Do we need this check if it will be part of validation?
		Spec: raw.Spec,
	})
	jobSet := jobSetBuilder.
		ContainerImage(trainJob.Spec.Trainer.Image).
We need to make sure we insert the image only for the replicated Job that runs the training script (e.g. it should have the Node name).
func needsCreateOrUpdate(old, new *jobsetv1alpha2.JobSet, suspended bool) bool {
	return old == nil ||
		suspended && (!equality.Semantic.DeepEqual(old.Spec, new.Spec) || !maps.Equal(old.Labels, new.Labels) || !maps.Equal(old.Annotations, new.Annotations))
If the TrainJob is suspended, do we need to create or update the JobSet?
func needsCreateOrUpdate(old, new *jobsetv1alpha2.JobSet, suspended bool) bool {
	return old == nil ||
		suspended && (!equality.Semantic.DeepEqual(old.Spec, new.Spec) || !maps.Equal(old.Labels, new.Labels) || !maps.Equal(old.Annotations, new.Annotations))
Why do we check the labels and annotations?
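
To make the precedence in that expression explicit (`&&` binds tighter than `||`), here is a self-contained sketch of the same decision. The `JobSet` type is a hypothetical stand-in, and `reflect.DeepEqual` is swapped in for `equality.Semantic.DeepEqual` to stay within the standard library.

```go
package main

import (
	"fmt"
	"maps"
	"reflect"
)

// JobSet is a hypothetical, minimal stand-in for jobsetv1alpha2.JobSet.
type JobSet struct {
	Spec        struct{ Replicas int }
	Labels      map[string]string
	Annotations map[string]string
}

// needsCreateOrUpdate reports whether the desired object must be written:
// always when nothing exists yet; otherwise only while suspended and only
// if the spec, labels, or annotations differ from the existing object.
func needsCreateOrUpdate(old, desired *JobSet, suspended bool) bool {
	if old == nil {
		return true // nothing exists yet: create
	}
	if !suspended {
		return false // a running object is left untouched
	}
	return !reflect.DeepEqual(old.Spec, desired.Spec) ||
		!maps.Equal(old.Labels, desired.Labels) ||
		!maps.Equal(old.Annotations, desired.Annotations)
}

func main() {
	old := &JobSet{Labels: map[string]string{"team": "a"}}
	desired := &JobSet{Labels: map[string]string{"team": "b"}}
	fmt.Println(needsCreateOrUpdate(nil, desired, false)) // true
	fmt.Println(needsCreateOrUpdate(old, desired, false)) // false
	fmt.Println(needsCreateOrUpdate(old, desired, true))  // true
}
```

Written this way, the two review questions map onto the two early returns: creation happens regardless of suspension, and metadata differences trigger an update only while the job is suspended.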
	Group: kubeflowv2.GroupVersion.Group,
	Kind:  "TrainingRuntime",
Why did we skip the version of the GVK?
What this PR does / why we need it:
Brief Design: https://docs.google.com/presentation/d/1HyEsBa7hxWpIoBXaX6uECiB48FWB85SG1kx15mO8hug/edit#slide=id.g30596bfee76_0_202
I implemented the runtime framework interfaces.
The responsibilities are the following:
/runtime.v2/core: This contains the actual Kubeflow Job Pipelines, such as TrainingRuntime (an internal concept, not the CRD).
These pipelines build objects or create reconciler builders. We will add more pipelines in the future, such as SingleHostTrainingRuntime.
/runtime.v2/framework: This contains the Kubeflow Job Pipeline Framework, which exposes a set of extension points; we will add more extension points in the future.
/runtime.v2/framework/plugins: This contains the Kubeflow Job Pipeline Framework plugins, which implement the Framework extension points. Each plugin runs at the corresponding extension point of the Kubeflow Job Pipeline Framework.
Additionally, I did not implement all plugins, so I will open an issue and delegate the remaining plugin implementations to contributors who are interested in this project.
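
The split between framework, extension points, and plugins described above can be sketched in a few dozen lines. This is an assumption-laden illustration, not the PR's code: `Info`, the plugin interfaces, and `defaultNodes` are hypothetical, and only the method names `RunEnforceMLPolicyPlugins` and `RunCustomValidationPlugins` are borrowed from the diff, with simplified signatures.

```go
package main

import (
	"errors"
	"fmt"
)

// Info is a hypothetical stand-in for the runtime info passed between plugins.
type Info struct {
	Image    string
	NumNodes int
}

// Plugin is the base interface every plugin satisfies.
type Plugin interface{ Name() string }

// EnforceMLPolicyPlugin and CustomValidationPlugin model two extension points.
type EnforceMLPolicyPlugin interface {
	Plugin
	EnforceMLPolicy(info *Info) error
}

type CustomValidationPlugin interface {
	Plugin
	Validate(info *Info) error
}

// Framework holds the plugins registered for each extension point.
type Framework struct {
	enforceMLPolicy  []EnforceMLPolicyPlugin
	customValidation []CustomValidationPlugin
}

// New sorts plugins into extension points by the interfaces they implement.
func New(plugins ...Plugin) *Framework {
	f := &Framework{}
	for _, p := range plugins {
		if ep, ok := p.(EnforceMLPolicyPlugin); ok {
			f.enforceMLPolicy = append(f.enforceMLPolicy, ep)
		}
		if vp, ok := p.(CustomValidationPlugin); ok {
			f.customValidation = append(f.customValidation, vp)
		}
	}
	return f
}

// RunEnforceMLPolicyPlugins runs each plugin in order, stopping at the first error.
func (f *Framework) RunEnforceMLPolicyPlugins(info *Info) error {
	for _, p := range f.enforceMLPolicy {
		if err := p.EnforceMLPolicy(info); err != nil {
			return fmt.Errorf("%s: %w", p.Name(), err)
		}
	}
	return nil
}

// RunCustomValidationPlugins runs every validation plugin and joins their errors.
func (f *Framework) RunCustomValidationPlugins(info *Info) error {
	var errs []error
	for _, p := range f.customValidation {
		if err := p.Validate(info); err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", p.Name(), err))
		}
	}
	return errors.Join(errs...)
}

// defaultNodes is a hypothetical plugin implementing both extension points.
type defaultNodes struct{}

func (defaultNodes) Name() string { return "DefaultNodes" }

func (defaultNodes) EnforceMLPolicy(info *Info) error {
	if info.NumNodes == 0 {
		info.NumNodes = 1
	}
	return nil
}

func (defaultNodes) Validate(info *Info) error {
	if info.Image == "" {
		return errors.New("image must not be empty")
	}
	return nil
}

func main() {
	f := New(defaultNodes{})
	info := &Info{}
	err := f.RunEnforceMLPolicyPlugins(info)
	fmt.Println(err, info.NumNodes)
	fmt.Println(f.RunCustomValidationPlugins(info))
}
```

Registering plugins by interface keeps the framework unaware of concrete plugins, which fits the plan to let contributors add the remaining ones later.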
Which issue(s) this PR fixes (optional, in Fixes #<issue number>, #<issue number>, ... format, will close the issue(s) when PR gets merged): Fixes #
Checklist: