From 6ddeb2b90ebe116beaa800c57c344913e78aaf38 Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Fri, 6 Sep 2024 23:10:16 +0900 Subject: [PATCH] KEP-2170: Initial Implementations for v2 Manager (#2236) Signed-off-by: Yuki Iwai --- .github/workflows/unittests.yaml | 6 +- Makefile | 4 + cmd/training-operator.v2alpha1/main.go | 160 ++++++++++++++++++ pkg/controller.v2/setup.go | 29 ++++ pkg/controller.v2/trainjob_controller.go | 43 +++++ pkg/webhook.v2/webhook.go | 23 +++ test/integration/controller.v2/suite_test.go | 42 +++++ .../controller.v2/trainjob_controller_test.go | 74 ++++++++ test/integration/framework/framework.go | 94 ++++++++++ 9 files changed, 474 insertions(+), 1 deletion(-) create mode 100644 pkg/controller.v2/setup.go create mode 100644 pkg/webhook.v2/webhook.go create mode 100644 test/integration/controller.v2/suite_test.go create mode 100644 test/integration/controller.v2/trainjob_controller_test.go create mode 100644 test/integration/framework/framework.go diff --git a/.github/workflows/unittests.yaml b/.github/workflows/unittests.yaml index 7bf8adbfae..f5e5ea2b65 100644 --- a/.github/workflows/unittests.yaml +++ b/.github/workflows/unittests.yaml @@ -31,10 +31,14 @@ jobs: with: go-version-file: ${{ env.GOPATH }}/src/github.com/kubeflow/training-operator/go.mod - - name: Run Go test + - name: Run Go test for v1 run: | make test ENVTEST_K8S_VERSION=${{ matrix.kubernetes-version }} + - name: Run Go test for v2 + run: | + make test-integrationv2 ENVTEST_K8S_VERSION=${{ matrix.kubernetes-version }} + - name: Coveralls report uses: shogo82148/actions-goveralls@v1 with: diff --git a/Makefile b/Makefile index abbc5865d7..dce93e7ed2 100644 --- a/Makefile +++ b/Makefile @@ -75,6 +75,10 @@ testall: manifests generate fmt vet golangci-lint test ## Run tests. test: envtest KUBEBUILDER_ASSETS="$(shell setup-envtest use $(ENVTEST_K8S_VERSION) -p path)" go test ./... -coverprofile cover.out +.PHONY: test-integrationv2 +test-integrationv2: envtest + KUBEBUILDER_ASSETS="$(shell setup-envtest use $(ENVTEST_K8S_VERSION) -p path)" go test ./test/... -coverprofile cover.out + envtest: ifndef HAS_SETUP_ENVTEST go install sigs.k8s.io/controller-runtime/tools/setup-envtest@bf15e44028f908c790721fc8fe67c7bf2d06a611 # v0.17.2 diff --git a/cmd/training-operator.v2alpha1/main.go b/cmd/training-operator.v2alpha1/main.go index da96f6f2c2..08bb9d4791 100644 --- a/cmd/training-operator.v2alpha1/main.go +++ b/cmd/training-operator.v2alpha1/main.go @@ -16,6 +16,166 @@ limitations under the License. package main +import ( + "crypto/tls" + "errors" + "flag" + "net/http" + "os" + + zaplog "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/controller-runtime/pkg/webhook" + jobsetv1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2" + + kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1" + "github.com/kubeflow/training-operator/pkg/cert" + controllerv2 "github.com/kubeflow/training-operator/pkg/controller.v2" + webhookv2 "github.com/kubeflow/training-operator/pkg/webhook.v2" +) + +var ( + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") +) + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(kubeflowv2.AddToScheme(scheme)) + utilruntime.Must(jobsetv1alpha2.AddToScheme(scheme)) +} + func main() { + var metricsAddr string + var enableLeaderElection bool + var probeAddr string + var secureMetrics bool + var enableHTTP2 bool + var webhookServerPort int + var webhookServiceName string + var webhookSecretName string + var tlsOpts []func(*tls.Config) + + flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+ + "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") + flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") + flag.BoolVar(&enableLeaderElection, "leader-elect", false, + "Enable leader election for controller manager. "+ + "Enabling this will ensure there is only one active controller manager.") + flag.BoolVar(&secureMetrics, "metrics-secure", true, + "If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.") + flag.BoolVar(&enableHTTP2, "enable-http2", false, + "If set, HTTP/2 will be enabled for the metrics and webhook servers") + + // Cert generation flags + flag.IntVar(&webhookServerPort, "webhook-server-port", 9443, "Endpoint port for the webhook server.") + flag.StringVar(&webhookServiceName, "webhook-service-name", "training-operator-v2", "Name of the Service used as part of the DNSName") + flag.StringVar(&webhookSecretName, "webhook-secret-name", "training-operator-v2-webhook-cert", "Name of the Secret to store CA and server certs") + + opts := zap.Options{ + TimeEncoder: zapcore.RFC3339NanoTimeEncoder, + ZapOpts: []zaplog.Option{zaplog.AddCaller()}, + } + opts.BindFlags(flag.CommandLine) + flag.Parse() + + ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) + + if !enableHTTP2 { + // if the enable-http2 flag is false (the default), http/2 should be disabled + // due to its vulnerabilities. More specifically, disabling http/2 will + // prevent from being vulnerable to the HTTP/2 Stream Cancellation and + // Rapid Reset CVEs. For more information see: + // - https://github.com/advisories/GHSA-qppj-fm5r-hxr3 + // - https://github.com/advisories/GHSA-4374-p667-p6c8 + tlsOpts = append(tlsOpts, func(c *tls.Config) { + setupLog.Info("disabling http/2") + c.NextProtos = []string{"http/1.1"} + }) + } + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + Scheme: scheme, + Metrics: metricsserver.Options{ + BindAddress: metricsAddr, + SecureServing: secureMetrics, + TLSOpts: tlsOpts, + }, + WebhookServer: webhook.NewServer(webhook.Options{ + Port: webhookServerPort, + TLSOpts: tlsOpts, + }), + HealthProbeBindAddress: probeAddr, + }) + if err != nil { + setupLog.Error(err, "unable to start manager") + os.Exit(1) + } + + certsReady := make(chan struct{}) + if err = cert.ManageCerts(mgr, cert.Config{ + WebhookSecretName: webhookSecretName, + WebhookServiceName: webhookServiceName, + }, certsReady); err != nil { + setupLog.Error(err, "unable to set up cert rotation") + os.Exit(1) + } + + setupProbeEndpoints(mgr, certsReady) + // Set up controllers using goroutines to start the manager quickly. + go setupControllers(mgr, certsReady) + + setupLog.Info("Starting manager") + if err = mgr.Start(ctrl.SetupSignalHandler()); err != nil { + setupLog.Error(err, "Could not run manager") + os.Exit(1) + } +} + +func setupControllers(mgr ctrl.Manager, certsReady <-chan struct{}) { + setupLog.Info("Waiting for certificate generation to complete") + <-certsReady + setupLog.Info("Certs ready") + + if failedCtrlName, err := controllerv2.SetupControllers(mgr); err != nil { + setupLog.Error(err, "Could not create controller", "controller", failedCtrlName) + os.Exit(1) + } + if failedWebhook, err := webhookv2.Setup(mgr); err != nil { + setupLog.Error(err, "Could not create webhook", "webhook", failedWebhook) + os.Exit(1) + } +} + +func setupProbeEndpoints(mgr ctrl.Manager, certsReady <-chan struct{}) { + defer setupLog.Info("Probe endpoints are configured on healthz and readyz") + if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { + setupLog.Error(err, "unable to set up health check") + os.Exit(1) + } + // Wait for the webhook server to be listening before advertising the + // training-operator replica as ready. This allows users to wait with sending the first + // requests, requiring webhooks, until the training-operator deployment is available, so + // that the early requests are not rejected during the training-operator's startup. + // We wrap the call to GetWebhookServer in a closure to delay calling + // the function, otherwise a not fully-initialized webhook server (without + // ready certs) fails the start of the manager. + if err := mgr.AddReadyzCheck("readyz", func(req *http.Request) error { + select { + case <-certsReady: + return mgr.GetWebhookServer().StartedChecker()(req) + default: + return errors.New("certificates are not ready") + } + }); err != nil { + setupLog.Error(err, "unable to set up ready check") + os.Exit(1) + } } diff --git a/pkg/controller.v2/setup.go b/pkg/controller.v2/setup.go new file mode 100644 index 0000000000..79e89fa0c5 --- /dev/null +++ b/pkg/controller.v2/setup.go @@ -0,0 +1,29 @@ +/* +Copyright 2024 The Kubeflow Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllerv2 + +import ctrl "sigs.k8s.io/controller-runtime" + +func SetupControllers(mgr ctrl.Manager) (string, error) { + if err := NewTrainJobReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor("training-operator-trainjob-controller"), + ).SetupWithManager(mgr); err != nil { + return "TrainJob", err + } + return "", nil +} diff --git a/pkg/controller.v2/trainjob_controller.go b/pkg/controller.v2/trainjob_controller.go index d10f548225..e12cc3c2d7 100644 --- a/pkg/controller.v2/trainjob_controller.go +++ b/pkg/controller.v2/trainjob_controller.go @@ -15,3 +15,46 @@ limitations under the License. */ package controllerv2 + +import ( + "context" + + "github.com/go-logr/logr" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1" +) + +type TrainJobReconciler struct { + log logr.Logger + client client.Client + recorder record.EventRecorder +} + +func NewTrainJobReconciler(client client.Client, recorder record.EventRecorder) *TrainJobReconciler { + return &TrainJobReconciler{ + log: ctrl.Log.WithName("trainjob-controller"), + client: client, + recorder: recorder, + } +} + +func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + var trainJob kubeflowv2.TrainJob + if err := r.client.Get(ctx, req.NamespacedName, &trainJob); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + log := ctrl.LoggerFrom(ctx).WithValues("trainJob", klog.KObj(&trainJob)) + ctrl.LoggerInto(ctx, log) + log.V(2).Info("Reconciling TrainJob") + return ctrl.Result{}, nil +} + +func (r *TrainJobReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&kubeflowv2.TrainJob{}). + Complete(r) +} diff --git a/pkg/webhook.v2/webhook.go b/pkg/webhook.v2/webhook.go new file mode 100644 index 0000000000..3d4970ef45 --- /dev/null +++ b/pkg/webhook.v2/webhook.go @@ -0,0 +1,23 @@ +/* +Copyright 2024 The Kubeflow Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package webhookv2 + +import ctrl "sigs.k8s.io/controller-runtime" + +func Setup(ctrl.Manager) (string, error) { + return "", nil +} diff --git a/test/integration/controller.v2/suite_test.go b/test/integration/controller.v2/suite_test.go new file mode 100644 index 0000000000..a8d7abbf92 --- /dev/null +++ b/test/integration/controller.v2/suite_test.go @@ -0,0 +1,42 @@ +/* +Copyright 2024 The Kubeflow Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllerv2 + +import ( + "context" + "testing" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kubeflow/training-operator/test/integration/framework" +) + +var ( + cfg *rest.Config + k8sClient client.Client + ctx context.Context + fwk *framework.Framework +) + +func TestAPIs(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + + ginkgo.RunSpecs(t, "v2 Controllers Suite") +} diff --git a/test/integration/controller.v2/trainjob_controller_test.go b/test/integration/controller.v2/trainjob_controller_test.go new file mode 100644 index 0000000000..d13f1179f1 --- /dev/null +++ b/test/integration/controller.v2/trainjob_controller_test.go @@ -0,0 +1,74 @@ +/* +Copyright 2024 The Kubeflow Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllerv2 + +import ( + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1" + "github.com/kubeflow/training-operator/test/integration/framework" +) + +var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { + var ns *corev1.Namespace + + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{} + cfg = fwk.Init() + ctx, k8sClient = fwk.RunManager(cfg) + }) + ginkgo.AfterAll(func() { + fwk.Teardown() + }) + + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "trainjob-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + }) + + ginkgo.When("Reconciling TrainJob", func() { + ginkgo.AfterEach(func() { + gomega.Expect(k8sClient.DeleteAllOf(ctx, &kubeflowv2.TrainJob{}, client.InNamespace(ns.Name))).Should(gomega.Succeed()) + }) + + ginkgo.It("Should succeed to create TrainJob", func() { + trainJob := &kubeflowv2.TrainJob{ + TypeMeta: metav1.TypeMeta{ + APIVersion: kubeflowv2.SchemeGroupVersion.String(), + Kind: "TrainJob", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "alpha", + Namespace: ns.Name, + }, + } + gomega.Expect(k8sClient.Create(ctx, trainJob)).Should(gomega.Succeed()) + }) + }) +}) diff --git a/test/integration/framework/framework.go b/test/integration/framework/framework.go new file mode 100644 index 0000000000..97d15246dd --- /dev/null +++ b/test/integration/framework/framework.go @@ -0,0 +1,94 @@ +/* +Copyright 2024 The Kubeflow Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "context" + "path/filepath" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1" + controllerv2 "github.com/kubeflow/training-operator/pkg/controller.v2" +) + +type Framework struct { + testEnv *envtest.Environment + cancel context.CancelFunc +} + +func (f *Framework) Init() *rest.Config { + log.SetLogger(zap.New(zap.WriteTo(ginkgo.GinkgoWriter), zap.UseDevMode(true))) + ginkgo.By("bootstrapping test environment") + f.testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "manifests", "v2", "base", "crds")}, + ErrorIfCRDPathMissing: true, + } + cfg, err := f.testEnv.Start() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(cfg).NotTo(gomega.BeNil()) + return cfg +} + +func (f *Framework) RunManager(cfg *rest.Config) (context.Context, client.Client) { + gomega.ExpectWithOffset(1, kubeflowv2.AddToScheme(scheme.Scheme)).NotTo(gomega.HaveOccurred()) + + // +kubebuilder:scaffold:scheme + + k8sClient, err := client.New(cfg, client.Options{Scheme: scheme.Scheme}) + gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) + gomega.ExpectWithOffset(1, k8sClient).NotTo(gomega.BeNil()) + + ctx, cancel := context.WithCancel(context.Background()) + f.cancel = cancel + mgr, err := ctrl.NewManager(cfg, manager.Options{ + Scheme: scheme.Scheme, + Metrics: metricsserver.Options{ + BindAddress: "0", // disable metrics to avoid conflicts between packages. + }, + }) + gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred(), "failed to create manager") + + failedCtrlName, err := controllerv2.SetupControllers(mgr) + gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred(), "controller", failedCtrlName) + + go func() { + defer ginkgo.GinkgoRecover() + err = mgr.Start(ctx) + gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred(), "failed to run manager") + }() + + return ctx, k8sClient +} + +func (f *Framework) Teardown() { + ginkgo.By("tearing down the test environment") + if f.cancel != nil { + f.cancel() + } + gomega.ExpectWithOffset(1, f.testEnv.Stop()).NotTo(gomega.HaveOccurred()) +}