From 8f0e756fdbdf32c12073e1b0dce0167e7e59329a Mon Sep 17 00:00:00 2001
From: Bastian Krol
Date: Sun, 18 Aug 2024 11:29:07 +0200
Subject: [PATCH] refactor(controller, instrumenter): extract instrumentation
 logic

Extract the controller's instrumentation logic into a separate module,
called instrumentation/instrumenter. This allows running the
InstrumentAtStartup task with a separate client and moving it into
executeStartupTasks, getting rid of the arbitrary (and potentially
problematic) 10 second wait that we previously needed until the manager
had finished initializing its Kubernetes API client.
---
 cmd/main.go                                   | 115 +-
 .../otelcol_resources_suite_test.go           |   3 +
 internal/dash0/controller/dash0_controller.go | 978 +-----------------
 .../controller/dash0_controller_suite_test.go |   4 +-
 .../dash0/controller/dash0_controller_test.go | 167 +--
 .../instrumentable_workloads.go               |   2 +-
 .../instrumentation_suite_test.go             |  91 ++
 .../dash0/instrumentation/instrumenter.go     | 954 +++++++++++++++++
 .../instrumentation/instrumenter_test.go      | 198 ++++
 internal/dash0/removal/removal_suite_test.go  |  24 +-
 .../controller_util.go => util/controller.go} |  25 +-
 internal/dash0/util/types.go                  |   7 +
 internal/dash0/util/util_suite_test.go        |   5 +-
 .../webhook/attach_dangling_events_test.go    |  15 +-
 internal/dash0/webhook/dash0_webhook_test.go  |   8 +-
 internal/dash0/webhook/webhook_suite_test.go  |   2 +-
 .../dash0/workloads/workload_modifier_test.go |  10 +-
 .../dash0/workloads/workloads_suite_test.go   |   5 +-
 test/e2e/run_command.go                       |   3 +
 test/util/constants.go                        |   9 +-
 test/util/resources.go                        |   6 +-
 test/util/verification.go                     |   2 +-
 22 files changed, 1462 insertions(+), 1171 deletions(-)
 rename internal/dash0/{controller => instrumentation}/instrumentable_workloads.go (99%)
 create mode 100644 internal/dash0/instrumentation/instrumentation_suite_test.go
 create mode 100644 internal/dash0/instrumentation/instrumenter.go
 create mode 100644 internal/dash0/instrumentation/instrumenter_test.go
 rename internal/dash0/{controller/controller_util.go => util/controller.go} (95%)

diff --git a/cmd/main.go b/cmd/main.go
index dd24498d..41b212c9 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -10,7 +10,6 @@ import (
 	"fmt"
 	"os"
 	"strings"
-	"time"
 
 	// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
 	// to ensure that exec-entrypoint and run can make use of them.
@@ -23,6 +22,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -35,6 +35,7 @@ import ( "github.com/dash0hq/dash0-operator/internal/backendconnection" "github.com/dash0hq/dash0-operator/internal/backendconnection/otelcolresources" dash0controller "github.com/dash0hq/dash0-operator/internal/dash0/controller" + "github.com/dash0hq/dash0-operator/internal/dash0/instrumentation" dash0removal "github.com/dash0hq/dash0-operator/internal/dash0/removal" dash0util "github.com/dash0hq/dash0-operator/internal/dash0/util" dash0webhook "github.com/dash0hq/dash0-operator/internal/dash0/webhook" @@ -245,17 +246,7 @@ func startOperatorManager( envVars.oTelCollectorNamePrefix, ) - var deploymentSelfReference *appsv1.Deployment - if deploymentSelfReference, err = executeStartupTasks( - ctx, - envVars.operatorNamespace, - envVars.deploymentName, - &setupLog, - ); err != nil { - return err - } - - err = startDash0Controller(mgr, clientset, envVars, deploymentSelfReference) + err = startDash0Controller(ctx, mgr, clientset, envVars) if err != nil { return err } @@ -278,10 +269,10 @@ func startOperatorManager( } func startDash0Controller( + ctx context.Context, mgr manager.Manager, clientset *kubernetes.Clientset, envVars *environmentVariables, - deploymentSelfReference *appsv1.Deployment, ) error { oTelCollectorBaseUrl := fmt.Sprintf( @@ -299,26 +290,48 @@ func startDash0Controller( FilelogOffsetSynchImage: envVars.filelogOffsetSynchImage, FilelogOffsetSynchImagePullPolicy: envVars.filelogOffsetSynchImagePullPolicy, } + + var deploymentSelfReference *appsv1.Deployment + var err error + + if deploymentSelfReference, err = executeStartupTasks( + ctx, + clientset, + mgr.GetEventRecorderFor("dash0-startup-tasks"), + images, + oTelCollectorBaseUrl, + envVars.operatorNamespace, + envVars.deploymentName, + &setupLog, + ); err != nil { + return err + } + + k8sClient := mgr.GetClient() + instrumenter := &instrumentation.Instrumenter{ + Client: k8sClient, + Clientset: clientset, + Recorder: mgr.GetEventRecorderFor("dash0-controller"), + Images: images, + OTelCollectorBaseUrl: oTelCollectorBaseUrl, + } oTelColResourceManager := &otelcolresources.OTelColResourceManager{ - Client: mgr.GetClient(), + Client: k8sClient, Scheme: mgr.GetScheme(), DeploymentSelfReference: deploymentSelfReference, OTelCollectorNamePrefix: envVars.oTelCollectorNamePrefix, } backendConnectionManager := &backendconnection.BackendConnectionManager{ - Client: mgr.GetClient(), + Client: k8sClient, Clientset: clientset, OTelColResourceManager: oTelColResourceManager, } dash0Reconciler := &dash0controller.Dash0Reconciler{ - Client: mgr.GetClient(), + Client: k8sClient, Clientset: clientset, - Scheme: mgr.GetScheme(), - Recorder: mgr.GetEventRecorderFor("dash0-controller"), + Instrumenter: instrumenter, BackendConnectionManager: backendConnectionManager, Images: images, - OTelCollectorNamePrefix: envVars.oTelCollectorNamePrefix, - OTelCollectorBaseUrl: oTelCollectorBaseUrl, OperatorNamespace: envVars.operatorNamespace, } @@ -329,7 +342,7 @@ func startDash0Controller( if os.Getenv("ENABLE_WEBHOOK") != "false" { if err := (&dash0webhook.Handler{ - Client: mgr.GetClient(), + Client: k8sClient, Recorder: mgr.GetEventRecorderFor("dash0-webhook"), Images: images, OTelCollectorBaseUrl: oTelCollectorBaseUrl, 
@@ -341,31 +354,49 @@ func startDash0Controller( setupLog.Info("Dash0 webhooks have been disabled via configuration.") } - go func() { - time.Sleep(10 * time.Second) - - // trigger an unconditional apply/update of instrumentation for all workloads, see godoc comment on - // Dash0Reconciler#InstrumentAtStartup - dash0Reconciler.InstrumentAtStartup() - }() - return nil } func executeStartupTasks( ctx context.Context, + clientset *kubernetes.Clientset, + eventRecorder record.EventRecorder, + images dash0util.Images, + oTelCollectorBaseUrl string, operatorNamespace string, deploymentName string, logger *logr.Logger, ) (*appsv1.Deployment, error) { cfg := ctrl.GetConfigOrDie() - client, err := client.New(cfg, client.Options{}) + startupTasksK8sClient, err := client.New(cfg, client.Options{ + Scheme: scheme, + }) if err != nil { logger.Error(err, "failed to create Kubernetes API client for startup tasks") return nil, err } - return findDeploymentSelfReference(ctx, client, operatorNamespace, deploymentName, logger) + instrumentAtStartup( + ctx, + startupTasksK8sClient, + clientset, + eventRecorder, + images, + oTelCollectorBaseUrl, + ) + + deploymentSelfReference, err := findDeploymentSelfReference( + ctx, + startupTasksK8sClient, + operatorNamespace, + deploymentName, + logger, + ) + if err != nil { + return nil, err + } + + return deploymentSelfReference, nil } func findDeploymentSelfReference( @@ -393,6 +424,28 @@ func findDeploymentSelfReference( return deploymentSelfReference, nil } +func instrumentAtStartup( + ctx context.Context, + startupTasksK8sClient client.Client, + clientset *kubernetes.Clientset, + eventRecorder record.EventRecorder, + images dash0util.Images, + oTelCollectorBaseUrl string, +) { + startupInstrumenter := &instrumentation.Instrumenter{ + Client: startupTasksK8sClient, + Clientset: clientset, + Recorder: eventRecorder, + Images: images, + OTelCollectorBaseUrl: oTelCollectorBaseUrl, + } + + // Trigger an unconditional apply/update of instrumentation for all workloads in Dash0-enabled namespaces, according + // to the respective settings of the Dash0 monitoring resource in the namespace. See godoc comment on + // Instrumenter#InstrumentAtStartup. + startupInstrumenter.InstrumentAtStartup(ctx, startupTasksK8sClient, &setupLog) +} + func readEnvironmentVariables() (*environmentVariables, error) { operatorNamespace, isSet := os.LookupEnv(operatorNamespaceEnvVarName) if !isSet { diff --git a/internal/backendconnection/otelcolresources/otelcol_resources_suite_test.go b/internal/backendconnection/otelcolresources/otelcol_resources_suite_test.go index f82d66f2..ea5b5e8f 100644 --- a/internal/backendconnection/otelcolresources/otelcol_resources_suite_test.go +++ b/internal/backendconnection/otelcolresources/otelcol_resources_suite_test.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. 
+// SPDX-License-Identifier: Apache-2.0 + package otelcolresources import ( diff --git a/internal/dash0/controller/dash0_controller.go b/internal/dash0/controller/dash0_controller.go index aac0f3bb..5943553e 100644 --- a/internal/dash0/controller/dash0_controller.go +++ b/internal/dash0/controller/dash0_controller.go @@ -7,18 +7,12 @@ import ( "context" "errors" "fmt" - "slices" "time" "github.com/go-logr/logr" - appsv1 "k8s.io/api/apps/v1" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -26,8 +20,8 @@ import ( dash0v1alpha1 "github.com/dash0hq/dash0-operator/api/dash0monitoring/v1alpha1" "github.com/dash0hq/dash0-operator/internal/backendconnection" + "github.com/dash0hq/dash0-operator/internal/dash0/instrumentation" "github.com/dash0hq/dash0-operator/internal/dash0/util" - "github.com/dash0hq/dash0-operator/internal/dash0/workloads" ) type DanglingEventsTimeouts struct { @@ -38,32 +32,18 @@ type DanglingEventsTimeouts struct { type Dash0Reconciler struct { client.Client Clientset *kubernetes.Clientset - Scheme *runtime.Scheme - Recorder record.EventRecorder + Instrumenter *instrumentation.Instrumenter BackendConnectionManager *backendconnection.BackendConnectionManager Images util.Images - OTelCollectorNamePrefix string - OTelCollectorBaseUrl string OperatorNamespace string DanglingEventsTimeouts *DanglingEventsTimeouts } -type modificationMode string - const ( - workkloadTypeLabel = "workload type" - workloadNamespaceLabel = "workload namespace" - workloadNameLabel = "workload name" - updateStatusFailedMessage = "Failed to update Dash0 monitoring status conditions, requeuing reconcile request." - - modificationModeInstrumentation modificationMode = "instrumentation" - modificationModeUninstrumentation modificationMode = "uninstrumentation" ) var ( - timeoutForListingPods int64 = 2 - defaultDanglingEventsTimeouts = &DanglingEventsTimeouts{ InitialTimeout: 30 * time.Second, Backoff: wait.Backoff{ @@ -75,31 +55,6 @@ var ( } ) -type ImmutableWorkloadError struct { - workloadType string - workloadName string - modificationMode modificationMode -} - -func (e ImmutableWorkloadError) Error() string { - var modificationParticle string - switch e.modificationMode { - case modificationModeInstrumentation: - modificationParticle = "instrument" - case modificationModeUninstrumentation: - modificationParticle = "remove the instrumentation from" - default: - modificationParticle = "modify" - } - - return fmt.Sprintf( - "Dash0 cannot %s the existing %s %s, since this type of workload is immutable.", - modificationParticle, - e.workloadType, - e.workloadName, - ) -} - func (r *Dash0Reconciler) SetupWithManager(mgr ctrl.Manager) error { if r.DanglingEventsTimeouts == nil { r.DanglingEventsTimeouts = defaultDanglingEventsTimeouts @@ -109,68 +64,6 @@ func (r *Dash0Reconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -// InstrumentAtStartup is run once, when the controller process starts. Its main purpose is to upgrade workloads that -// have already been instrumented, in namespaces where the Dash0 monitoring resource already exists. 
For those workloads, -// it is not guaranteed that a reconcile request will be triggered when the operator controller image is updated and -// restarted - reconcile requests are only triggered when the Dash0 monitoring resource is installed/changed/deleted. -// Since it runs the full instrumentation process, it might also as a byproduct instrument workloads that are not -// instrumented yet. It will only cover namespaces where a Dash0 monitoring resource exists, because it works by listing -// all Dash0 monitoring resources and then instrumenting workloads in the corresponding namespaces. -func (r *Dash0Reconciler) InstrumentAtStartup() { - ctx := context.Background() - logger := log.FromContext(ctx) - logger.Info("Applying/updating instrumentation at controller startup.") - dash0MonitoringResourcesInNamespace := &dash0v1alpha1.Dash0MonitoringList{} - if err := r.Client.List( - ctx, - dash0MonitoringResourcesInNamespace, - &client.ListOptions{}, - ); err != nil { - logger.Error(err, "Failed to list all Dash0 monitoring resources at controller startup.") - return - } - - logger.Info(fmt.Sprintf("Found %d Dash0 monitoring resources.", len(dash0MonitoringResourcesInNamespace.Items))) - for _, dash0MonitoringResource := range dash0MonitoringResourcesInNamespace.Items { - logger.Info(fmt.Sprintf("Processing workloads in Dash0-enabled namespace %s", dash0MonitoringResource.Namespace)) - - if dash0MonitoringResource.IsMarkedForDeletion() { - continue - } - pseudoReconcileRequest := ctrl.Request{ - NamespacedName: client.ObjectKey{ - Namespace: dash0MonitoringResource.Namespace, - Name: dash0MonitoringResource.Name, - }, - } - _, stop, err := verifyUniqueDash0MonitoringResourceExists( - ctx, - r.Client, - r.Status(), - updateStatusFailedMessage, - pseudoReconcileRequest, - logger, - ) - if err != nil || stop { - // if an error occurred, it has already been logged in verifyUniqueDash0MonitoringResourceExists - continue - } - - err = r.checkSettingsAndInstrumentExistingWorkloads(ctx, &dash0MonitoringResource, &logger) - if err != nil { - logger.Error( - err, - "Failed to apply/update instrumentation instrumentation at startup in one namespace.", - "namespace", - dash0MonitoringResource.Namespace, - "name", - dash0MonitoringResource.Name, - ) - continue - } - } -} - // The following markers are used to generate the rules permissions (RBAC) on config/rbac using controller-gen // when the command is executed. // To know more about markers see: https://book.kubebuilder.io/reference/markers.html @@ -198,7 +91,7 @@ func (r *Dash0Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl logger := log.FromContext(ctx) logger.Info("processing reconcile request for Dash0 monitoring resource") - namespaceStillExists, err := checkIfNamespaceExists(ctx, r.Clientset, req.Namespace, &logger) + namespaceStillExists, err := util.CheckIfNamespaceExists(ctx, r.Clientset, req.Namespace, &logger) if err != nil { // The error has already been logged in checkIfNamespaceExists. 
return ctrl.Result{}, err @@ -208,13 +101,12 @@ func (r *Dash0Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return ctrl.Result{}, nil } - dash0MonitoringResource, stopReconcile, err := verifyUniqueDash0MonitoringResourceExists( + dash0MonitoringResource, stopReconcile, err := util.VerifyUniqueDash0MonitoringResourceExists( ctx, r.Client, - r.Status(), updateStatusFailedMessage, req, - logger, + &logger, ) if err != nil { return ctrl.Result{}, err @@ -222,7 +114,7 @@ func (r *Dash0Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return ctrl.Result{}, nil } - isFirstReconcile, err := initStatusConditions( + isFirstReconcile, err := util.InitStatusConditions( ctx, r.Status(), dash0MonitoringResource, @@ -233,7 +125,7 @@ func (r *Dash0Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return ctrl.Result{}, err } - isMarkedForDeletion, runCleanupActions, err := checkImminentDeletionAndHandleFinalizers( + isMarkedForDeletion, runCleanupActions, err := util.CheckImminentDeletionAndHandleFinalizers( ctx, r.Client, dash0MonitoringResource, @@ -249,8 +141,8 @@ func (r *Dash0Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl // error has already been logged in runCleanupActions return ctrl.Result{}, err } - // The Dash0 monitoring resource is slated for deletion, all cleanup actions (like reverting instrumented resources) - // have been processed, no further reconciliation is necessary. + // The Dash0 monitoring resource is slated for deletion, all cleanup actions (like reverting instrumented + // resources) have been processed, no further reconciliation is necessary. return ctrl.Result{}, nil } else if isMarkedForDeletion { // The Dash0 monitoring resource is slated for deletion, the finalizer has already been removed (which means all @@ -269,7 +161,7 @@ func (r *Dash0Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return ctrl.Result{}, err } - var requiredAction modificationMode + var requiredAction util.ModificationMode dash0MonitoringResource, requiredAction, err = r.manageInstrumentWorkloadsChanges(ctx, dash0MonitoringResource, isFirstReconcile, &logger) if err != nil { @@ -277,14 +169,14 @@ func (r *Dash0Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return ctrl.Result{}, err } - if isFirstReconcile || requiredAction == modificationModeInstrumentation { - if err = r.checkSettingsAndInstrumentExistingWorkloads(ctx, dash0MonitoringResource, &logger); err != nil { + if isFirstReconcile || requiredAction == util.ModificationModeInstrumentation { + if err = r.Instrumenter.CheckSettingsAndInstrumentExistingWorkloads(ctx, dash0MonitoringResource, &logger); err != nil { // The error has already been logged in checkSettingsAndInstrumentExistingWorkloads logger.Info("Requeuing reconcile request.") return ctrl.Result{}, err } - } else if requiredAction == modificationModeUninstrumentation { - if err = r.uninstrumentWorkloadsIfAvailable(ctx, dash0MonitoringResource, &logger); err != nil { + } else if requiredAction == util.ModificationModeUninstrumentation { + if err = r.Instrumenter.UninstrumentWorkloadsIfAvailable(ctx, dash0MonitoringResource, &logger); err != nil { logger.Error(err, "Failed to uninstrument workloads, requeuing reconcile request.") return ctrl.Result{}, err } @@ -301,455 +193,54 @@ func (r *Dash0Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return ctrl.Result{}, nil } -func (r *Dash0Reconciler) manageInstrumentWorkloadsChanges(ctx context.Context, 
dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, isFirstReconcile bool, logger *logr.Logger) (*dash0v1alpha1.Dash0Monitoring, modificationMode, error) { +func (r *Dash0Reconciler) manageInstrumentWorkloadsChanges( + ctx context.Context, + dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, + isFirstReconcile bool, + logger *logr.Logger, +) (*dash0v1alpha1.Dash0Monitoring, util.ModificationMode, error) { previous := dash0MonitoringResource.Status.PreviousInstrumentWorkloads current := dash0MonitoringResource.ReadInstrumentWorkloadsSetting() - var requiredAction modificationMode + var requiredAction util.ModificationMode if !isFirstReconcile { if previous != dash0v1alpha1.All && previous != "" && current == dash0v1alpha1.All { logger.Info(fmt.Sprintf( "The instrumentWorkloads setting has changed from \"%s\" to \"%s\" (or it is absent, in which case it"+ "defaults to \"all\"). Workloads in this namespace will now be instrumented so they send "+ "telemetry to Dash0.", previous, current)) - requiredAction = modificationModeInstrumentation + requiredAction = util.ModificationModeInstrumentation } else if previous != dash0v1alpha1.None && current == dash0v1alpha1.None { logger.Info(fmt.Sprintf( "The instrumentWorkloads setting has changed from \"%s\" to \"%s\". Instrumented workloads in this "+ "namespace will now be uninstrumented, they will no longer send telemetry to Dash0.", previous, current)) - requiredAction = modificationModeUninstrumentation + requiredAction = util.ModificationModeUninstrumentation } } if previous != current { dash0MonitoringResource.Status.PreviousInstrumentWorkloads = current if err := r.Status().Update(ctx, dash0MonitoringResource); err != nil { - logger.Error(err, "Failed to update the previous instrumentWorkloads status on the Dash0 monitoring resource, requeuing reconcile request.") + logger.Error(err, "Failed to update the previous instrumentWorkloads status on the Dash0 monitoring "+ + "resource, requeuing reconcile request.") return dash0MonitoringResource, "", err } } return dash0MonitoringResource, requiredAction, nil } -func (r *Dash0Reconciler) checkSettingsAndInstrumentExistingWorkloads( - ctx context.Context, - dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, - logger *logr.Logger, -) error { - instrumentWorkloads := dash0MonitoringResource.ReadInstrumentWorkloadsSetting() - if instrumentWorkloads == dash0v1alpha1.None { - logger.Info( - "Instrumentation is not enabled, neither new nor existing workloads will be modified to send telemetry " + - "to Dash0.", - ) - return nil - } - if instrumentWorkloads == dash0v1alpha1.CreatedAndUpdated { - logger.Info( - "Instrumenting existing workloads is not enabled, only new or updated workloads will be modified (at " + - "deploy time) to send telemetry to Dash0.", - ) - return nil - } - - logger.Info("Now instrumenting existing workloads in namespace so they send telemetry to Dash0.") - if err := r.instrumentAllWorkloads(ctx, dash0MonitoringResource, logger); err != nil { - logger.Error(err, "Instrumenting existing workloads failed.") - return err - } - - return nil -} - -func (r *Dash0Reconciler) instrumentAllWorkloads( - ctx context.Context, - dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, - logger *logr.Logger, -) error { - namespace := dash0MonitoringResource.Namespace - - errCronJobs := r.findAndInstrumentCronJobs(ctx, namespace, logger) - errDaemonSets := r.findAndInstrumentyDaemonSets(ctx, namespace, logger) - errDeployments := r.findAndInstrumentDeployments(ctx, namespace, logger) - 
errJobs := r.findAndAddLabelsToImmutableJobsOnInstrumentation(ctx, namespace, logger) - errReplicaSets := r.findAndInstrumentReplicaSets(ctx, namespace, logger) - errStatefulSets := r.findAndInstrumentStatefulSets(ctx, namespace, logger) - combinedErrors := errors.Join( - errCronJobs, - errDaemonSets, - errDeployments, - errJobs, - errReplicaSets, - errStatefulSets, - ) - if combinedErrors != nil { - return combinedErrors - } - return nil -} - -func (r *Dash0Reconciler) findAndInstrumentCronJobs( - ctx context.Context, - namespace string, - logger *logr.Logger, -) error { - matchingWorkloadsInNamespace, err := - r.Clientset.BatchV1().CronJobs(namespace).List(ctx, util.EmptyListOptions) - if err != nil { - return fmt.Errorf("error when querying cron jobs: %w", err) - } - for _, resource := range matchingWorkloadsInNamespace.Items { - r.instrumentCronJob(ctx, resource, logger) - } - return nil -} - -func (r *Dash0Reconciler) instrumentCronJob( - ctx context.Context, - cronJob batchv1.CronJob, - reconcileLogger *logr.Logger, -) { - r.instrumentWorkload(ctx, &cronJobWorkload{ - cronJob: &cronJob, - }, reconcileLogger) -} - -func (r *Dash0Reconciler) findAndInstrumentyDaemonSets( - ctx context.Context, - namespace string, - logger *logr.Logger, -) error { - matchingWorkloadsInNamespace, err := - r.Clientset.AppsV1().DaemonSets(namespace).List(ctx, util.EmptyListOptions) - if err != nil { - return fmt.Errorf("error when querying daemon sets: %w", err) - } - for _, resource := range matchingWorkloadsInNamespace.Items { - r.instrumentDaemonSet(ctx, resource, logger) - } - return nil -} - -func (r *Dash0Reconciler) instrumentDaemonSet( - ctx context.Context, - daemonSet appsv1.DaemonSet, - reconcileLogger *logr.Logger, -) { - r.instrumentWorkload(ctx, &daemonSetWorkload{ - daemonSet: &daemonSet, - }, reconcileLogger) -} - -func (r *Dash0Reconciler) findAndInstrumentDeployments( - ctx context.Context, - namespace string, - logger *logr.Logger, -) error { - matchingWorkloadsInNamespace, err := - r.Clientset.AppsV1().Deployments(namespace).List(ctx, util.EmptyListOptions) - if err != nil { - return fmt.Errorf("error when querying deployments: %w", err) - } - for _, resource := range matchingWorkloadsInNamespace.Items { - r.instrumentDeployment(ctx, resource, logger) - } - return nil -} - -func (r *Dash0Reconciler) instrumentDeployment( - ctx context.Context, - deployment appsv1.Deployment, - reconcileLogger *logr.Logger, -) { - r.instrumentWorkload(ctx, &deploymentWorkload{ - deployment: &deployment, - }, reconcileLogger) -} - -func (r *Dash0Reconciler) findAndAddLabelsToImmutableJobsOnInstrumentation( - ctx context.Context, - namespace string, - logger *logr.Logger, -) error { - matchingWorkloadsInNamespace, err := - r.Clientset.BatchV1().Jobs(namespace).List(ctx, util.EmptyListOptions) - if err != nil { - return fmt.Errorf("error when querying jobs: %w", err) - } - - for _, job := range matchingWorkloadsInNamespace.Items { - r.handleJobJobOnInstrumentation(ctx, job, logger) - } - return nil -} - -func (r *Dash0Reconciler) handleJobJobOnInstrumentation( - ctx context.Context, - job batchv1.Job, - reconcileLogger *logr.Logger, -) { - logger := reconcileLogger.WithValues( - workkloadTypeLabel, - "Job", - workloadNamespaceLabel, - job.GetNamespace(), - workloadNameLabel, - job.GetName(), - ) - if job.DeletionTimestamp != nil { - // do not modify resources that are being deleted - logger.Info("not instrumenting this workload since it is about to be deleted (a deletion timestamp is set)") - return - } - - 
objectMeta := &job.ObjectMeta - var requiredAction modificationMode - modifyLabels := true - createImmutableWorkloadsError := true - if util.HasOptedOutOfInstrumenation(objectMeta) && util.InstrumenationAttemptHasFailed(objectMeta) { - // There has been an unsuccessful attempt to instrument this job before, but now the user has added the opt-out - // label, so we can remove the labels left over from that earlier attempt. - // "requiredAction = Instrumentation" in the context of immutable jobs means "remove Dash0 labels from the job", - // no other modification will take place. - requiredAction = modificationModeUninstrumentation - createImmutableWorkloadsError = false - } else if util.HasOptedOutOfInstrumenation(objectMeta) && util.HasBeenInstrumentedSuccessfully(objectMeta) { - // This job has been instrumented successfully, presumably by the webhook. Since then, the opt-out label has - // been added. The correct action would be to uninstrument it, but since it is immutable, we cannot do that. - // We will not actually modify this job at all, but create a log message and a corresponding event. - modifyLabels = false - requiredAction = modificationModeUninstrumentation - } else if util.HasOptedOutOfInstrumenation(objectMeta) { - // has opt-out label and there has been no previous instrumentation attempt - logger.Info("not instrumenting this workload due to dash0.com/enable=false") - return - } else if util.HasBeenInstrumentedSuccessfully(objectMeta) || util.InstrumenationAttemptHasFailed(objectMeta) { - // We already have instrumented this job (via the webhook) or have failed to instrument it, in either case, - // there is nothing to do here. - return - } else { - // We have not attempted to instrument this job yet, that is, we are seeing this job for the first time now. - // - // "requiredAction = Instrumentation" in the context of immutable jobs means "add labels to the job", no other - // modification will (or can) take place. - requiredAction = modificationModeInstrumentation - } - - retryErr := util.Retry("handling immutable job", func() error { - if !modifyLabels { - return nil - } - - if err := r.Client.Get(ctx, client.ObjectKey{ - Namespace: job.GetNamespace(), - Name: job.GetName(), - }, &job); err != nil { - return fmt.Errorf("error when fetching job %s/%s: %w", job.GetNamespace(), job.GetName(), err) - } - - hasBeenModified := false - switch requiredAction { - case modificationModeInstrumentation: - hasBeenModified = newWorkloadModifier(r.Images, r.OTelCollectorBaseUrl, &logger).AddLabelsToImmutableJob(&job) - case modificationModeUninstrumentation: - hasBeenModified = newWorkloadModifier(r.Images, r.OTelCollectorBaseUrl, &logger).RemoveLabelsFromImmutableJob(&job) - } - - if hasBeenModified { - return r.Client.Update(ctx, &job) - } else { - return nil - } - }, &logger) - - postProcess := r.postProcessInstrumentation - if requiredAction == modificationModeUninstrumentation { - postProcess = r.postProcessUninstrumentation - } - if retryErr != nil { - postProcess(&job, false, retryErr, &logger) - } else if createImmutableWorkloadsError { - // One way or another we are in a situation were we would have wanted to instrument/uninstrument the job, but - // could not. Passing an ImmutableWorkloadError to postProcess will make sure we write a corresponding log - // message and create a corresponding event. 
- postProcess(&job, false, ImmutableWorkloadError{ - workloadType: "job", - workloadName: fmt.Sprintf("%s/%s", job.GetNamespace(), job.GetName()), - modificationMode: requiredAction, - }, &logger) - } else { - postProcess(&job, false, nil, &logger) - } -} - -func (r *Dash0Reconciler) findAndInstrumentReplicaSets( - ctx context.Context, - namespace string, - logger *logr.Logger, -) error { - matchingWorkloadsInNamespace, err := - r.Clientset.AppsV1().ReplicaSets(namespace).List(ctx, util.EmptyListOptions) - if err != nil { - return fmt.Errorf("error when querying replica sets: %w", err) - } - for _, resource := range matchingWorkloadsInNamespace.Items { - r.instrumentReplicaSet(ctx, resource, logger) - } - return nil -} - -func (r *Dash0Reconciler) instrumentReplicaSet( - ctx context.Context, - replicaSet appsv1.ReplicaSet, - reconcileLogger *logr.Logger, -) { - hasBeenUpdated := r.instrumentWorkload(ctx, &replicaSetWorkload{ - replicaSet: &replicaSet, - }, reconcileLogger) - - if hasBeenUpdated { - r.restartPodsOfReplicaSet(ctx, replicaSet, reconcileLogger) - } -} - -func (r *Dash0Reconciler) findAndInstrumentStatefulSets( - ctx context.Context, - namespace string, - logger *logr.Logger, -) error { - matchingWorkloadsInNamespace, err := r.Clientset.AppsV1().StatefulSets(namespace).List(ctx, util.EmptyListOptions) - if err != nil { - return fmt.Errorf("error when querying stateful sets: %w", err) - } - for _, resource := range matchingWorkloadsInNamespace.Items { - r.instrumentStatefulSet(ctx, resource, logger) - } - return nil -} - -func (r *Dash0Reconciler) instrumentStatefulSet( - ctx context.Context, - statefulSet appsv1.StatefulSet, - reconcileLogger *logr.Logger, -) { - r.instrumentWorkload(ctx, &statefulSetWorkload{ - statefulSet: &statefulSet, - }, reconcileLogger) -} - -func (r *Dash0Reconciler) instrumentWorkload( - ctx context.Context, - workload instrumentableWorkload, - reconcileLogger *logr.Logger, -) bool { - objectMeta := workload.getObjectMeta() - kind := workload.getKind() - logger := reconcileLogger.WithValues( - workkloadTypeLabel, - kind, - workloadNamespaceLabel, - objectMeta.GetNamespace(), - workloadNameLabel, - objectMeta.GetName(), - ) - if objectMeta.DeletionTimestamp != nil { - // do not modify resources that are being deleted - logger.Info("not instrumenting this workload since it is about to be deleted (a deletion timestamp is set)") - return false - } - - var requiredAction modificationMode - if util.WasInstrumentedButHasOptedOutNow(objectMeta) { - requiredAction = modificationModeUninstrumentation - } else if util.HasBeenInstrumentedSuccessfullyByThisVersion(objectMeta, r.Images) { - // No change necessary, this workload has already been instrumented and an opt-out label (which would need to - // trigger uninstrumentation) has not been added since it has been instrumented. 
- logger.Info("not updating the existing instrumentation for this workload, it has already been successfully " + - "instrumented by the same operator version") - return false - } else if util.HasOptedOutOfInstrumenationAndIsUninstrumented(workload.getObjectMeta()) { - logger.Info("not instrumenting this workload due to dash0.com/enable=false") - return false - } else { - requiredAction = modificationModeInstrumentation - } - - hasBeenModified := false - retryErr := util.Retry(fmt.Sprintf("instrumenting %s", kind), func() error { - if err := r.Client.Get(ctx, client.ObjectKey{ - Namespace: objectMeta.GetNamespace(), - Name: objectMeta.GetName(), - }, workload.asClientObject()); err != nil { - return fmt.Errorf( - "error when fetching %s %s/%s: %w", - kind, - objectMeta.GetNamespace(), - objectMeta.GetName(), - err, - ) - } - - switch requiredAction { - case modificationModeInstrumentation: - hasBeenModified = workload.instrument(r.Images, r.OTelCollectorBaseUrl, &logger) - case modificationModeUninstrumentation: - hasBeenModified = workload.revert(r.Images, r.OTelCollectorBaseUrl, &logger) - } - - if hasBeenModified { - return r.Client.Update(ctx, workload.asClientObject()) - } else { - return nil - } - }, &logger) - - switch requiredAction { - case modificationModeInstrumentation: - return r.postProcessInstrumentation(workload.asRuntimeObject(), hasBeenModified, retryErr, &logger) - case modificationModeUninstrumentation: - return r.postProcessUninstrumentation(workload.asRuntimeObject(), hasBeenModified, retryErr, &logger) - } - return false -} - -func (r *Dash0Reconciler) postProcessInstrumentation( - resource runtime.Object, - hasBeenModified bool, - retryErr error, - logger *logr.Logger, -) bool { - if retryErr != nil { - e := &ImmutableWorkloadError{} - if errors.As(retryErr, e) { - logger.Info(e.Error()) - } else { - logger.Error(retryErr, "Dash0 instrumentation by controller has not been successful.") - } - util.QueueFailedInstrumentationEvent(r.Recorder, resource, "controller", retryErr) - return false - } else if !hasBeenModified { - // TODO This also happens for replica sets owned by a deployment and the log message as well as the message on - // the event are unspecific, would be better if we could differentiate between the two cases. - // (Also for revert maybe.) 
- logger.Info("Dash0 instrumentation was already present on this workload, or the workload is part of a higher " + - "order workload that will be instrumented, no modification by the controller is necessary.") - util.QueueNoInstrumentationNecessaryEvent(r.Recorder, resource, "controller") - return false - } else { - logger.Info("The controller has added Dash0 instrumentation to the workload.") - util.QueueSuccessfulInstrumentationEvent(r.Recorder, resource, "controller") - return true - } -} - func (r *Dash0Reconciler) runCleanupActions( ctx context.Context, dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, logger *logr.Logger, ) error { - if err := r.uninstrumentWorkloadsIfAvailable(ctx, dash0MonitoringResource, logger); err != nil { + if err := r.Instrumenter.UninstrumentWorkloadsIfAvailable( + ctx, + dash0MonitoringResource, + logger, + ); err != nil { logger.Error(err, "Failed to uninstrument workloads, requeuing reconcile request.") return err } @@ -760,7 +251,8 @@ func (r *Dash0Reconciler) runCleanupActions( r.OperatorNamespace, dash0MonitoringResource, ); err != nil { - logger.Error(err, "Failed to check if the OpenTelemetry collector instance needs to be removed or failed removing it.") + logger.Error(err, "Failed to check if the OpenTelemetry collector instance needs to be removed or failed "+ + "removing it.") return err } @@ -775,415 +267,13 @@ func (r *Dash0Reconciler) runCleanupActions( controllerutil.RemoveFinalizer(dash0MonitoringResource, dash0v1alpha1.FinalizerId) if err := r.Update(ctx, dash0MonitoringResource); err != nil { - logger.Error(err, "Failed to remove the finalizer from the Dash0 monitoring resource, requeuing reconcile request.") + logger.Error(err, "Failed to remove the finalizer from the Dash0 monitoring resource, requeuing reconcile "+ + "request.") return err } return nil } -func (r *Dash0Reconciler) uninstrumentWorkloadsIfAvailable( - ctx context.Context, - dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, - logger *logr.Logger, -) error { - if dash0MonitoringResource.IsAvailable() { - logger.Info("Reverting Dash0's modifications to workloads that have been instrumented to make them send telemetry to Dash0.") - if err := r.uninstrumentWorkloads(ctx, dash0MonitoringResource, logger); err != nil { - logger.Error(err, "Uninstrumenting existing workloads failed.") - return err - } - } else { - logger.Info("Removing the Dash0 monitoring resource and running finalizers, but Dash0 is not marked as available." 
+ - " Dash0 Instrumentation will not be removed from workloads..") - } - return nil -} - -func (r *Dash0Reconciler) uninstrumentWorkloads( - ctx context.Context, - dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, - logger *logr.Logger, -) error { - namespace := dash0MonitoringResource.Namespace - - errCronJobs := r.findAndUninstrumentCronJobs(ctx, namespace, logger) - errDaemonSets := r.findAndUninstrumentDaemonSets(ctx, namespace, logger) - errDeployments := r.findAndUninstrumentDeployments(ctx, namespace, logger) - errJobs := r.findAndHandleJobOnUninstrumentation(ctx, namespace, logger) - errReplicaSets := r.findAndUninstrumentReplicaSets(ctx, namespace, logger) - errStatefulSets := r.findAndUninstrumentStatefulSets(ctx, namespace, logger) - combinedErrors := errors.Join( - errCronJobs, - errDaemonSets, - errDeployments, - errJobs, - errReplicaSets, - errStatefulSets, - ) - if combinedErrors != nil { - return combinedErrors - } - return nil -} - -func (r *Dash0Reconciler) findAndUninstrumentCronJobs( - ctx context.Context, - namespace string, - logger *logr.Logger, -) error { - matchingWorkloadsInNamespace, err := - r.Clientset.BatchV1().CronJobs(namespace).List(ctx, util.WorkloadsWithDash0InstrumentedLabelFilter) - if err != nil { - return fmt.Errorf("error when querying instrumented cron jobs: %w", err) - } - for _, resource := range matchingWorkloadsInNamespace.Items { - r.uninstrumentCronJob(ctx, resource, logger) - } - return nil -} - -func (r *Dash0Reconciler) uninstrumentCronJob( - ctx context.Context, - cronJob batchv1.CronJob, - reconcileLogger *logr.Logger, -) { - r.revertWorkloadInstrumentation(ctx, &cronJobWorkload{ - cronJob: &cronJob, - }, reconcileLogger) -} - -func (r *Dash0Reconciler) findAndUninstrumentDaemonSets(ctx context.Context, namespace string, logger *logr.Logger) error { - matchingWorkloadsInNamespace, err := - r.Clientset.AppsV1().DaemonSets(namespace).List(ctx, util.WorkloadsWithDash0InstrumentedLabelFilter) - if err != nil { - return fmt.Errorf("error when querying instrumented daemon sets: %w", err) - } - for _, resource := range matchingWorkloadsInNamespace.Items { - r.uninstrumentDaemonSet(ctx, resource, logger) - } - return nil -} - -func (r *Dash0Reconciler) uninstrumentDaemonSet( - ctx context.Context, - daemonSet appsv1.DaemonSet, - reconcileLogger *logr.Logger, -) { - r.revertWorkloadInstrumentation(ctx, &daemonSetWorkload{ - daemonSet: &daemonSet, - }, reconcileLogger) -} - -func (r *Dash0Reconciler) findAndUninstrumentDeployments( - ctx context.Context, - namespace string, - logger *logr.Logger, -) error { - matchingWorkloadsInNamespace, err := - r.Clientset.AppsV1().Deployments(namespace).List(ctx, util.WorkloadsWithDash0InstrumentedLabelFilter) - if err != nil { - return fmt.Errorf("error when querying instrumented deployments: %w", err) - } - for _, resource := range matchingWorkloadsInNamespace.Items { - r.uninstrumentDeployment(ctx, resource, logger) - } - return nil -} - -func (r *Dash0Reconciler) uninstrumentDeployment( - ctx context.Context, - deployment appsv1.Deployment, - reconcileLogger *logr.Logger, -) { - r.revertWorkloadInstrumentation(ctx, &deploymentWorkload{ - deployment: &deployment, - }, reconcileLogger) -} - -func (r *Dash0Reconciler) findAndHandleJobOnUninstrumentation( - ctx context.Context, - namespace string, - logger *logr.Logger, -) error { - matchingWorkloadsInNamespace, err := r.Clientset.BatchV1().Jobs(namespace).List(ctx, util.WorkloadsWithDash0InstrumentedLabelFilter) - if err != nil { - return 
fmt.Errorf("error when querying instrumented jobs: %w", err) - } - - for _, job := range matchingWorkloadsInNamespace.Items { - r.handleJobOnUninstrumentation(ctx, job, logger) - } - return nil -} - -func (r *Dash0Reconciler) handleJobOnUninstrumentation(ctx context.Context, job batchv1.Job, reconcileLogger *logr.Logger) { - logger := reconcileLogger.WithValues( - workkloadTypeLabel, - "Job", - workloadNamespaceLabel, - job.GetNamespace(), - workloadNameLabel, - job.GetName(), - ) - if job.DeletionTimestamp != nil { - // do not modify resources that are being deleted - logger.Info("not uninstrumenting this workload since it is about to be deleted (a deletion timestamp is set)") - return - } - - // Note: In contrast to the instrumentation logic, there is no need to check for dash.com/enable=false here: - // If it is set, the workload would not have been instrumented in the first place, hence the label selector filter - // looking for dash0.com/instrumented=true would not have matched. Or if the workload is actually instrumented, - // although it has dash0.com/enabled=false it must have been set after the instrumentation, in which case - // uninstrumenting it is the correct thing to do. - - createImmutableWorkloadsError := false - retryErr := util.Retry("removing labels from immutable job", func() error { - if err := r.Client.Get(ctx, client.ObjectKey{ - Namespace: job.GetNamespace(), - Name: job.GetName(), - }, &job); err != nil { - return fmt.Errorf("error when fetching job %s/%s: %w", job.GetNamespace(), job.GetName(), err) - } - if util.HasBeenInstrumentedSuccessfully(&job.ObjectMeta) { - // This job has been instrumented, presumably by the webhook. We cannot undo the instrumentation here, since - // jobs are immutable. - - // Deliberately not calling newWorkloadModifier(r.Images, &logger).RemoveLabelsFromImmutableJob(&job) here - // since we cannot remove the instrumentation, so we also have to leave the labels in place. - createImmutableWorkloadsError = true - return nil - } else if util.InstrumenationAttemptHasFailed(&job.ObjectMeta) { - // There was an attempt to instrument this job (probably by the controller), which has not been successful. - // We only need remove the labels from that instrumentation attempt to clean up. - newWorkloadModifier(r.Images, r.OTelCollectorBaseUrl, &logger).RemoveLabelsFromImmutableJob(&job) - - // Apparently for jobs we do not need to set the "dash0.com/webhook-ignore-once" label, since changing their - // labels does not trigger a new admission request. - return r.Client.Update(ctx, &job) - } else { - // No dash0.com/instrumented label is present, do nothing. - return nil - } - }, &logger) - - if retryErr != nil { - // For the case that the job was instrumented, and we could not uninstrument it, we create a - // ImmutableWorkloadError inside the retry loop. This error is then handled in the postProcessUninstrumentation. - // The same is true for any other error types (for example errors in r.ClientUpdate). 
- r.postProcessUninstrumentation(&job, false, retryErr, &logger) - } else if createImmutableWorkloadsError { - r.postProcessUninstrumentation(&job, false, ImmutableWorkloadError{ - workloadType: "job", - workloadName: fmt.Sprintf("%s/%s", job.GetNamespace(), job.GetName()), - modificationMode: modificationModeUninstrumentation, - }, &logger) - } else { - r.postProcessUninstrumentation(&job, false, nil, &logger) - } -} - -func (r *Dash0Reconciler) findAndUninstrumentReplicaSets( - ctx context.Context, - namespace string, - logger *logr.Logger, -) error { - matchingWorkloadsInNamespace, err := - r.Clientset.AppsV1().ReplicaSets(namespace).List(ctx, util.WorkloadsWithDash0InstrumentedLabelFilter) - if err != nil { - return fmt.Errorf("error when querying instrumented replica sets: %w", err) - } - for _, resource := range matchingWorkloadsInNamespace.Items { - r.uninstrumentReplicaSet(ctx, resource, logger) - } - return nil -} - -func (r *Dash0Reconciler) uninstrumentReplicaSet(ctx context.Context, replicaSet appsv1.ReplicaSet, reconcileLogger *logr.Logger) { - hasBeenUpdated := r.revertWorkloadInstrumentation(ctx, &replicaSetWorkload{ - replicaSet: &replicaSet, - }, reconcileLogger) - - if hasBeenUpdated { - r.restartPodsOfReplicaSet(ctx, replicaSet, reconcileLogger) - } -} - -func (r *Dash0Reconciler) findAndUninstrumentStatefulSets( - ctx context.Context, - namespace string, - logger *logr.Logger, -) error { - matchingWorkloadsInNamespace, err := - r.Clientset.AppsV1().StatefulSets(namespace).List(ctx, util.WorkloadsWithDash0InstrumentedLabelFilter) - if err != nil { - return fmt.Errorf("error when querying instrumented stateful sets: %w", err) - } - for _, resource := range matchingWorkloadsInNamespace.Items { - r.uninstrumentStatefulSet(ctx, resource, logger) - } - return nil -} - -func (r *Dash0Reconciler) uninstrumentStatefulSet( - ctx context.Context, - statefulSet appsv1.StatefulSet, - reconcileLogger *logr.Logger, -) { - r.revertWorkloadInstrumentation(ctx, &statefulSetWorkload{ - statefulSet: &statefulSet, - }, reconcileLogger) -} - -func (r *Dash0Reconciler) revertWorkloadInstrumentation( - ctx context.Context, - workload instrumentableWorkload, - reconcileLogger *logr.Logger, -) bool { - objectMeta := workload.getObjectMeta() - kind := workload.getKind() - logger := reconcileLogger.WithValues( - workkloadTypeLabel, - kind, - workloadNamespaceLabel, - objectMeta.GetNamespace(), - workloadNameLabel, - objectMeta.GetName(), - ) - if objectMeta.DeletionTimestamp != nil { - // do not modify resources that are being deleted - logger.Info("not uninstrumenting this workload since it is about to be deleted (a deletion timestamp is set)") - return false - } - - // Note: In contrast to the instrumentation logic, there is no need to check for dash.com/enable=false here: - // If it is set, the workload would not have been instrumented in the first place, hence the label selector filter - // looking for dash0.com/instrumented=true would not have matched. Or if the workload is actually instrumented, - // although it has dash0.com/enabled=false it must have been set after the instrumentation, in which case - // uninstrumenting it is the correct thing to do. 
- - hasBeenModified := false - retryErr := util.Retry(fmt.Sprintf("uninstrumenting %s", kind), func() error { - if err := r.Client.Get(ctx, client.ObjectKey{ - Namespace: objectMeta.GetNamespace(), - Name: objectMeta.GetName(), - }, workload.asClientObject()); err != nil { - return fmt.Errorf( - "error when fetching %s %s/%s: %w", - kind, - objectMeta.GetNamespace(), - objectMeta.GetName(), - err, - ) - } - hasBeenModified = workload.revert(r.Images, r.OTelCollectorBaseUrl, &logger) - if hasBeenModified { - // Changing the workload spec sometimes triggers a new admission request, which would re-instrument the - // workload via the webhook immediately. To prevent this, we add a label that the webhook can check to - // prevent instrumentation. - util.AddWebhookIgnoreOnceLabel(objectMeta) - return r.Client.Update(ctx, workload.asClientObject()) - } else { - return nil - } - }, &logger) - - return r.postProcessUninstrumentation(workload.asRuntimeObject(), hasBeenModified, retryErr, &logger) -} - -func (r *Dash0Reconciler) postProcessUninstrumentation( - resource runtime.Object, - hasBeenModified bool, - retryErr error, - logger *logr.Logger, -) bool { - if retryErr != nil { - e := &ImmutableWorkloadError{} - if errors.As(retryErr, e) { - logger.Info(e.Error()) - } else { - logger.Error(retryErr, "Dash0's removal of instrumentation by controller has not been successful.") - } - util.QueueFailedUninstrumentationEvent(r.Recorder, resource, "controller", retryErr) - return false - } else if !hasBeenModified { - logger.Info("Dash0 instrumentations was not present on this workload, no modification by the controller has " + - "been necessary.") - util.QueueNoUninstrumentationNecessaryEvent(r.Recorder, resource, "controller") - return false - } else { - logger.Info("The controller has removed the Dash0 instrumentation from the workload.") - util.QueueSuccessfulUninstrumentationEvent(r.Recorder, resource, "controller") - return true - } -} - -func newWorkloadModifier(images util.Images, oTelCollectorBaseUrl string, logger *logr.Logger) *workloads.ResourceModifier { - return workloads.NewResourceModifier( - util.InstrumentationMetadata{ - Images: images, - InstrumentedBy: "controller", - OTelCollectorBaseUrl: oTelCollectorBaseUrl, - }, - logger, - ) -} - -func (r *Dash0Reconciler) restartPodsOfReplicaSet( - ctx context.Context, - replicaSet appsv1.ReplicaSet, - logger *logr.Logger, -) { - // Note: ReplicaSet pods are not restarted automatically by Kubernetes when their spec is changed (for other - // resource types like deployments or daemonsets this is managed by Kubernetes automatically). Therefore, we - // find all pods owned by the replica set and explicitly delete them to trigger a restart. - allPodsInNamespace, err := - r.Clientset. - CoreV1(). - Pods(replicaSet.Namespace). 
- List(ctx, metav1.ListOptions{ - TimeoutSeconds: &timeoutForListingPods, - }) - if err != nil { - logger.Error( - err, - fmt.Sprintf( - "Failed to list all pods in the namespaces for the purpose of restarting the pods owned by the "+ - "replica set %s/%s (%s), pods will not be restarted automatically.", - replicaSet.Namespace, - replicaSet.Name, - replicaSet.UID, - )) - return - } - - podsOfReplicaSet := slices.DeleteFunc(allPodsInNamespace.Items, func(pod corev1.Pod) bool { - ownerReferences := pod.GetOwnerReferences() - for _, ownerReference := range ownerReferences { - if ownerReference.Kind == "ReplicaSet" && - ownerReference.Name == replicaSet.Name && - ownerReference.UID == replicaSet.UID { - return false - } - } - return true - }) - - for _, pod := range podsOfReplicaSet { - err := r.Client.Delete(ctx, &pod) - if err != nil { - logger.Info( - fmt.Sprintf( - "Failed to restart pod owned by the replica "+ - "set %s/%s (%s), this pod will not be restarted automatically.", - replicaSet.Namespace, - replicaSet.Name, - replicaSet.UID, - )) - } - } -} - func (r *Dash0Reconciler) scheduleAttachDanglingEvents( ctx context.Context, dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, diff --git a/internal/dash0/controller/dash0_controller_suite_test.go b/internal/dash0/controller/dash0_controller_suite_test.go index 99373e0f..6148f3de 100644 --- a/internal/dash0/controller/dash0_controller_suite_test.go +++ b/internal/dash0/controller/dash0_controller_suite_test.go @@ -19,11 +19,11 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" + dash0v1alpha1 "github.com/dash0hq/dash0-operator/api/dash0monitoring/v1alpha1" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/onsi/gomega/format" - - dash0v1alpha1 "github.com/dash0hq/dash0-operator/api/dash0monitoring/v1alpha1" ) var ( diff --git a/internal/dash0/controller/dash0_controller_test.go b/internal/dash0/controller/dash0_controller_test.go index 8dc69498..9de77bcf 100644 --- a/internal/dash0/controller/dash0_controller_test.go +++ b/internal/dash0/controller/dash0_controller_test.go @@ -3,14 +3,18 @@ package controller +// Maintenance note: the canonical order/grouping for imports is: +// - standard library imports +// - external third-party library imports, except for test libraries +// - internal imports, except for internal test packages +// - external third party test libraries +// - internal test packages + import ( "context" "fmt" "time" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/api/meta" @@ -23,12 +27,12 @@ import ( dash0v1alpha1 "github.com/dash0hq/dash0-operator/api/dash0monitoring/v1alpha1" "github.com/dash0hq/dash0-operator/internal/backendconnection" "github.com/dash0hq/dash0-operator/internal/backendconnection/otelcolresources" - . "github.com/dash0hq/dash0-operator/test/util" -) + "github.com/dash0hq/dash0-operator/internal/dash0/instrumentation" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" -const ( - olderOperatorControllerImageLabel = "some-registry_com_1234_dash0hq_operator-controller_0.9.8" - olderInitContainerImageLabel = "some-registry_com_1234_dash0hq_instrumentation_2.3.4" + . 
"github.com/dash0hq/dash0-operator/test/util" ) var ( @@ -56,6 +60,13 @@ var _ = Describe("The Dash0 controller", Ordered, func() { BeforeEach(func() { createdObjects = make([]client.Object, 0) + instrumenter := &instrumentation.Instrumenter{ + Client: k8sClient, + Clientset: clientset, + Recorder: recorder, + Images: TestImages, + OTelCollectorBaseUrl: OTelCollectorBaseUrlTest, + } oTelColResourceManager := &otelcolresources.OTelColResourceManager{ Client: k8sClient, Scheme: k8sClient.Scheme(), @@ -70,10 +81,8 @@ var _ = Describe("The Dash0 controller", Ordered, func() { reconciler = &Dash0Reconciler{ Client: k8sClient, Clientset: clientset, - Recorder: recorder, - Scheme: k8sClient.Scheme(), + Instrumenter: instrumenter, Images: TestImages, - OTelCollectorBaseUrl: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", OperatorNamespace: Dash0OperatorNamespace, BackendConnectionManager: backendConnectionManager, DanglingEventsTimeouts: &DanglingEventsTimeouts{ @@ -501,142 +510,6 @@ var _ = Describe("The Dash0 controller", Ordered, func() { }), ) - DescribeTable("should instrument existing workloads at startup", func(config WorkloadTestConfig) { - name := UniqueName(config.WorkloadNamePrefix) - workload := config.CreateFn(ctx, k8sClient, namespace, name) - createdObjects = append(createdObjects, workload.Get()) - - reconciler.InstrumentAtStartup() - - VerifySuccessfulInstrumentationEvent(ctx, clientset, namespace, name, "controller") - config.VerifyFn(config.GetFn(ctx, k8sClient, namespace, name)) - }, Entry("should instrument a cron job at startup", WorkloadTestConfig{ - WorkloadNamePrefix: CronJobNamePrefix, - CreateFn: WrapCronJobFnAsTestableWorkload(CreateBasicCronJob), - GetFn: WrapCronJobFnAsTestableWorkload(GetCronJob), - VerifyFn: func(workload TestableWorkload) { - VerifyModifiedCronJob(workload.Get().(*batchv1.CronJob), BasicInstrumentedPodSpecExpectations()) - }, - }), Entry("should instrument a daemon set at startup", WorkloadTestConfig{ - WorkloadNamePrefix: DaemonSetNamePrefix, - CreateFn: WrapDaemonSetFnAsTestableWorkload(CreateBasicDaemonSet), - GetFn: WrapDaemonSetFnAsTestableWorkload(GetDaemonSet), - VerifyFn: func(workload TestableWorkload) { - VerifyModifiedDaemonSet(workload.Get().(*appsv1.DaemonSet), BasicInstrumentedPodSpecExpectations()) - }, - }), Entry("should instrument a deployment at startup", WorkloadTestConfig{ - WorkloadNamePrefix: DeploymentNamePrefix, - CreateFn: WrapDeploymentFnAsTestableWorkload(CreateBasicDeployment), - GetFn: WrapDeploymentFnAsTestableWorkload(GetDeployment), - VerifyFn: func(workload TestableWorkload) { - VerifyModifiedDeployment(workload.Get().(*appsv1.Deployment), BasicInstrumentedPodSpecExpectations()) - }, - }), Entry("should instrument a replica set at startup", WorkloadTestConfig{ - WorkloadNamePrefix: ReplicaSetNamePrefix, - CreateFn: WrapReplicaSetFnAsTestableWorkload(CreateBasicReplicaSet), - GetFn: WrapReplicaSetFnAsTestableWorkload(GetReplicaSet), - VerifyFn: func(workload TestableWorkload) { - VerifyModifiedReplicaSet(workload.Get().(*appsv1.ReplicaSet), BasicInstrumentedPodSpecExpectations()) - }, - }), Entry("should instrument a stateful set at startup", WorkloadTestConfig{ - WorkloadNamePrefix: StatefulSetNamePrefix, - CreateFn: WrapStatefulSetFnAsTestableWorkload(CreateBasicStatefulSet), - GetFn: WrapStatefulSetFnAsTestableWorkload(GetStatefulSet), - VerifyFn: func(workload TestableWorkload) { - VerifyModifiedStatefulSet(workload.Get().(*appsv1.StatefulSet), 
BasicInstrumentedPodSpecExpectations()) - }, - }), - ) - - Describe("should not instrument existing jobs at startup", func() { - It("should record a failure event when attempting to instrument an existing job at startup and add labels", func() { - name := UniqueName(JobNamePrefix) - job := CreateBasicJob(ctx, k8sClient, namespace, name) - createdObjects = append(createdObjects, job) - - reconciler.InstrumentAtStartup() - - VerifyFailedInstrumentationEvent( - ctx, - clientset, - namespace, - name, - fmt.Sprintf("Dash0 instrumentation of this workload by the controller has not been successful. "+ - "Error message: Dash0 cannot instrument the existing job test-namespace/%s, since this type "+ - "of workload is immutable.", name), - ) - VerifyImmutableJobCouldNotBeModified(GetJob(ctx, k8sClient, namespace, name)) - }) - }) - - DescribeTable("when updating instrumented workloads at startup", func(config WorkloadTestConfig) { - name := UniqueName(config.WorkloadNamePrefix) - workload := config.CreateFn(ctx, k8sClient, TestNamespaceName, name) - createdObjects = append(createdObjects, workload.Get()) - workload.GetObjectMeta().Labels["dash0.com/operator-image"] = olderOperatorControllerImageLabel - workload.GetObjectMeta().Labels["dash0.com/init-container-image"] = olderInitContainerImageLabel - UpdateWorkload(ctx, k8sClient, workload.Get()) - reconciler.InstrumentAtStartup() - config.VerifyFn(config.GetFn(ctx, k8sClient, TestNamespaceName, name)) - VerifySuccessfulInstrumentationEvent(ctx, clientset, namespace, name, "controller") - }, Entry("should override outdated instrumentation settings for a cron job at startup", WorkloadTestConfig{ - WorkloadNamePrefix: CronJobNamePrefix, - CreateFn: WrapCronJobFnAsTestableWorkload(CreateInstrumentedCronJob), - GetFn: WrapCronJobFnAsTestableWorkload(GetCronJob), - VerifyFn: func(workload TestableWorkload) { - VerifyModifiedCronJob(workload.Get().(*batchv1.CronJob), BasicInstrumentedPodSpecExpectations()) - }, - }), Entry("should override outdated instrumentation settings for a daemon set at startup", WorkloadTestConfig{ - WorkloadNamePrefix: DaemonSetNamePrefix, - CreateFn: WrapDaemonSetFnAsTestableWorkload(CreateInstrumentedDaemonSet), - GetFn: WrapDaemonSetFnAsTestableWorkload(GetDaemonSet), - VerifyFn: func(workload TestableWorkload) { - VerifyModifiedDaemonSet(workload.Get().(*appsv1.DaemonSet), BasicInstrumentedPodSpecExpectations()) - }, - }), Entry("should override outdated instrumentation settings for a deployment at startup", WorkloadTestConfig{ - WorkloadNamePrefix: DeploymentNamePrefix, - CreateFn: WrapDeploymentFnAsTestableWorkload(CreateInstrumentedDeployment), - GetFn: WrapDeploymentFnAsTestableWorkload(GetDeployment), - VerifyFn: func(workload TestableWorkload) { - VerifyModifiedDeployment(workload.Get().(*appsv1.Deployment), BasicInstrumentedPodSpecExpectations()) - }, - }), Entry("should override outdated instrumentation settings for a replica set at startup", WorkloadTestConfig{ - WorkloadNamePrefix: ReplicaSetNamePrefix, - CreateFn: WrapReplicaSetFnAsTestableWorkload(CreateInstrumentedReplicaSet), - GetFn: WrapReplicaSetFnAsTestableWorkload(GetReplicaSet), - VerifyFn: func(workload TestableWorkload) { - VerifyModifiedReplicaSet(workload.Get().(*appsv1.ReplicaSet), BasicInstrumentedPodSpecExpectations()) - }, - }), Entry("should override outdated instrumentation settings for a stateful set at startup", WorkloadTestConfig{ - WorkloadNamePrefix: StatefulSetNamePrefix, - CreateFn: 
WrapStatefulSetFnAsTestableWorkload(CreateInstrumentedStatefulSet), - GetFn: WrapStatefulSetFnAsTestableWorkload(GetStatefulSet), - VerifyFn: func(workload TestableWorkload) { - VerifyModifiedStatefulSet(workload.Get().(*appsv1.StatefulSet), BasicInstrumentedPodSpecExpectations()) - }, - }), - ) - - Describe("when attempting to update instrumented jobs at startup", func() { - It("should not override outdated instrumentation settings for a job at startup", func() { - name := UniqueName(JobNamePrefix) - workload := CreateInstrumentedJob(ctx, k8sClient, TestNamespaceName, name) - createdObjects = append(createdObjects, workload) - workload.ObjectMeta.Labels["dash0.com/operator-image"] = "some-registry.com_1234_dash0hq_operator-controller_0.9.8" - workload.ObjectMeta.Labels["dash0.com/init-container-image"] = "some-registry.com_1234_dash0hq_instrumentation_2.3.4" - UpdateWorkload(ctx, k8sClient, workload) - reconciler.InstrumentAtStartup() - - // we do not attempt to update the instrumentation for jobs, since they are immutable - workload = GetJob(ctx, k8sClient, TestNamespaceName, name) - jobLabels := workload.ObjectMeta.Labels - Expect(jobLabels["dash0.com/instrumented"]).To(Equal("true")) - Expect(jobLabels["dash0.com/operator-image"]).To(Equal("some-registry.com_1234_dash0hq_operator-controller_0.9.8")) - Expect(jobLabels["dash0.com/init-container-image"]).To(Equal("some-registry.com_1234_dash0hq_instrumentation_2.3.4")) - VerifyNoEvents(ctx, clientset, namespace) - }) - }) - DescribeTable("when deleting the Dash0 monitoring resource and reverting the instrumentation on cleanup", func(config WorkloadTestConfig) { // We trigger one reconcile request before creating any workload and before deleting the Dash0 monitoring // resource, just to get the `isFirstReconcile` logic out of the way and to add the finalizer. diff --git a/internal/dash0/controller/instrumentable_workloads.go b/internal/dash0/instrumentation/instrumentable_workloads.go similarity index 99% rename from internal/dash0/controller/instrumentable_workloads.go rename to internal/dash0/instrumentation/instrumentable_workloads.go index ba8bb086..b8ac01e2 100644 --- a/internal/dash0/controller/instrumentable_workloads.go +++ b/internal/dash0/instrumentation/instrumentable_workloads.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. // SPDX-License-Identifier: Apache-2.0 -package controller +package instrumentation import ( "github.com/go-logr/logr" diff --git a/internal/dash0/instrumentation/instrumentation_suite_test.go b/internal/dash0/instrumentation/instrumentation_suite_test.go new file mode 100644 index 00000000..033df949 --- /dev/null +++ b/internal/dash0/instrumentation/instrumentation_suite_test.go @@ -0,0 +1,91 @@ +// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. +// SPDX-License-Identifier: Apache-2.0 + +package instrumentation + +import ( + "fmt" + "path/filepath" + "runtime" + "testing" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + dash0v1alpha1 "github.com/dash0hq/dash0-operator/api/dash0monitoring/v1alpha1" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/onsi/gomega/format" +) + +var ( + cfg *rest.Config + k8sClient client.Client + clientset *kubernetes.Clientset + recorder record.EventRecorder + testEnv *envtest.Environment +) + +func TestInstrumentation(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Instrumentation Suite") +} + +var _ = BeforeSuite(func() { + format.MaxLength = 0 + + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + By("bootstrapping test environment") + testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, + ErrorIfCRDPathMissing: true, + + // The BinaryAssetsDirectory is only required if you want to run the tests directly + // without call the makefile target test. If not informed it will look for the + // default path defined in controller-runtime which is /usr/local/kubebuilder/. + // Note that you must have the required binaries setup under the bin directory to perform + // the tests directly. When we run make test it will be setup and used automatically. + BinaryAssetsDirectory: filepath.Join("..", "..", "..", "bin", "k8s", + fmt.Sprintf("1.28.3-%s-%s", runtime.GOOS, runtime.GOARCH)), + } + + var err error + // cfg is defined in this file globally. + cfg, err = testEnv.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(cfg).NotTo(BeNil()) + + Expect(dash0v1alpha1.AddToScheme(scheme.Scheme)).To(Succeed()) + + //+kubebuilder:scaffold:scheme + + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient).NotTo(BeNil()) + + clientset, err = kubernetes.NewForConfig(cfg) + Expect(err).NotTo(HaveOccurred()) + Expect(clientset).NotTo(BeNil()) + + mgr, err := ctrl.NewManager(cfg, ctrl.Options{ + Scheme: scheme.Scheme, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(mgr).NotTo(BeNil()) + recorder = mgr.GetEventRecorderFor("dash0-controller") +}) + +var _ = AfterSuite(func() { + By("tearing down the test environment") + err := testEnv.Stop() + Expect(err).NotTo(HaveOccurred()) +}) diff --git a/internal/dash0/instrumentation/instrumenter.go b/internal/dash0/instrumentation/instrumenter.go new file mode 100644 index 00000000..eb49b488 --- /dev/null +++ b/internal/dash0/instrumentation/instrumenter.go @@ -0,0 +1,954 @@ +// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +package instrumentation + +import ( + "context" + "errors" + "fmt" + "slices" + + "github.com/go-logr/logr" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + dash0v1alpha1 "github.com/dash0hq/dash0-operator/api/dash0monitoring/v1alpha1" + "github.com/dash0hq/dash0-operator/internal/dash0/util" + "github.com/dash0hq/dash0-operator/internal/dash0/workloads" +) + +type Instrumenter struct { + client.Client + Clientset *kubernetes.Clientset + Recorder record.EventRecorder + Images util.Images + OTelCollectorBaseUrl string +} + +type ImmutableWorkloadError struct { + workloadType string + workloadName string + modificationMode util.ModificationMode +} + +const ( + workkloadTypeLabel = "workload type" + workloadNamespaceLabel = "workload namespace" + workloadNameLabel = "workload name" + + updateStatusFailedMessage = "Failed to update Dash0 monitoring status conditions, requeuing reconcile request." +) + +var ( + timeoutForListingPods int64 = 2 +) + +func (e ImmutableWorkloadError) Error() string { + var modificationParticle string + switch e.modificationMode { + case util.ModificationModeInstrumentation: + modificationParticle = "instrument" + case util.ModificationModeUninstrumentation: + modificationParticle = "remove the instrumentation from" + default: + modificationParticle = "modify" + } + + return fmt.Sprintf( + "Dash0 cannot %s the existing %s %s, since this type of workload is immutable.", + modificationParticle, + e.workloadType, + e.workloadName, + ) +} + +// InstrumentAtStartup is run once, when the controller process starts. Its main purpose is to upgrade workloads that +// have already been instrumented, in namespaces where the Dash0 monitoring resource already exists. For those workloads, +// it is not guaranteed that a reconcile request will be triggered when the operator controller image is updated and +// restarted - reconcile requests are only triggered when the Dash0 monitoring resource is installed/changed/deleted. +// Since it runs the full instrumentation process, it might also as a byproduct instrument workloads that are not +// instrumented yet. It will only cover namespaces where a Dash0 monitoring resource exists, because it works by listing +// all Dash0 monitoring resources and then instrumenting workloads in the corresponding namespaces. 
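+// Any error encountered while processing an individual namespace is only logged; the remaining namespaces are still
+// processed and the method itself does not return an error.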
+func (i *Instrumenter) InstrumentAtStartup(
+	ctx context.Context,
+	k8sClient client.Client,
+	logger *logr.Logger,
+) {
+	logger.Info("Applying/updating instrumentation at controller startup.")
+	dash0MonitoringResourcesInNamespace := &dash0v1alpha1.Dash0MonitoringList{}
+	if err := k8sClient.List(
+		ctx,
+		dash0MonitoringResourcesInNamespace,
+		&client.ListOptions{},
+	); err != nil {
+		logger.Error(err, "Failed to list all Dash0 monitoring resources at controller startup.")
+		return
+	}
+
+	logger.Info(fmt.Sprintf("Found %d Dash0 monitoring resources.", len(dash0MonitoringResourcesInNamespace.Items)))
+	for _, dash0MonitoringResource := range dash0MonitoringResourcesInNamespace.Items {
+		logger.Info(fmt.Sprintf("Processing workloads in Dash0-enabled namespace %s", dash0MonitoringResource.Namespace))
+
+		if dash0MonitoringResource.IsMarkedForDeletion() {
+			continue
+		}
+		pseudoReconcileRequest := ctrl.Request{
+			NamespacedName: client.ObjectKey{
+				Namespace: dash0MonitoringResource.Namespace,
+				Name:      dash0MonitoringResource.Name,
+			},
+		}
+		_, stop, err := util.VerifyUniqueDash0MonitoringResourceExists(
+			ctx,
+			k8sClient,
+			updateStatusFailedMessage,
+			pseudoReconcileRequest,
+			logger,
+		)
+		if err != nil || stop {
+			// if an error occurred, it has already been logged in VerifyUniqueDash0MonitoringResourceExists
+			continue
+		}
+
+		err = i.CheckSettingsAndInstrumentExistingWorkloads(ctx, &dash0MonitoringResource, logger)
+		if err != nil {
+			logger.Error(
+				err,
+				"Failed to apply/update instrumentation at startup in one namespace.",
+				"namespace",
+				dash0MonitoringResource.Namespace,
+				"name",
+				dash0MonitoringResource.Name,
+			)
+			continue
+		}
+	}
+}
+
+// CheckSettingsAndInstrumentExistingWorkloads is the main instrumentation function that is called in the controller's
+// reconcile loop. It checks the settings of the Dash0 monitoring resource and instruments existing workloads
+// accordingly.
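+// When the resource's instrumentWorkloads setting resolves to dash0v1alpha1.None or dash0v1alpha1.CreatedAndUpdated,
+// it only logs that decision and returns without modifying any existing workloads.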
+func (i *Instrumenter) CheckSettingsAndInstrumentExistingWorkloads( + ctx context.Context, + dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, + logger *logr.Logger, +) error { + instrumentWorkloads := dash0MonitoringResource.ReadInstrumentWorkloadsSetting() + if instrumentWorkloads == dash0v1alpha1.None { + logger.Info( + "Instrumentation is not enabled, neither new nor existing workloads will be modified to send telemetry " + + "to Dash0.", + ) + return nil + } + if instrumentWorkloads == dash0v1alpha1.CreatedAndUpdated { + logger.Info( + "Instrumenting existing workloads is not enabled, only new or updated workloads will be modified (at " + + "deploy time) to send telemetry to Dash0.", + ) + return nil + } + + logger.Info("Now instrumenting existing workloads in namespace so they send telemetry to Dash0.") + if err := i.instrumentAllWorkloads(ctx, dash0MonitoringResource, logger); err != nil { + logger.Error(err, "Instrumenting existing workloads failed.") + return err + } + + return nil +} + +func (i *Instrumenter) instrumentAllWorkloads( + ctx context.Context, + dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, + logger *logr.Logger, +) error { + namespace := dash0MonitoringResource.Namespace + + errCronJobs := i.findAndInstrumentCronJobs(ctx, namespace, logger) + errDaemonSets := i.findAndInstrumentyDaemonSets(ctx, namespace, logger) + errDeployments := i.findAndInstrumentDeployments(ctx, namespace, logger) + errJobs := i.findAndAddLabelsToImmutableJobsOnInstrumentation(ctx, namespace, logger) + errReplicaSets := i.findAndInstrumentReplicaSets(ctx, namespace, logger) + errStatefulSets := i.findAndInstrumentStatefulSets(ctx, namespace, logger) + combinedErrors := errors.Join( + errCronJobs, + errDaemonSets, + errDeployments, + errJobs, + errReplicaSets, + errStatefulSets, + ) + if combinedErrors != nil { + return combinedErrors + } + return nil +} + +func (i *Instrumenter) findAndInstrumentCronJobs( + ctx context.Context, + namespace string, + logger *logr.Logger, +) error { + matchingWorkloadsInNamespace, err := + i.Clientset.BatchV1().CronJobs(namespace).List(ctx, util.EmptyListOptions) + if err != nil { + return fmt.Errorf("error when querying cron jobs: %w", err) + } + for _, resource := range matchingWorkloadsInNamespace.Items { + i.instrumentCronJob(ctx, resource, logger) + } + return nil +} + +func (i *Instrumenter) instrumentCronJob( + ctx context.Context, + cronJob batchv1.CronJob, + reconcileLogger *logr.Logger, +) { + i.instrumentWorkload(ctx, &cronJobWorkload{ + cronJob: &cronJob, + }, reconcileLogger) +} + +func (i *Instrumenter) findAndInstrumentyDaemonSets( + ctx context.Context, + namespace string, + logger *logr.Logger, +) error { + matchingWorkloadsInNamespace, err := + i.Clientset.AppsV1().DaemonSets(namespace).List(ctx, util.EmptyListOptions) + if err != nil { + return fmt.Errorf("error when querying daemon sets: %w", err) + } + for _, resource := range matchingWorkloadsInNamespace.Items { + i.instrumentDaemonSet(ctx, resource, logger) + } + return nil +} + +func (i *Instrumenter) instrumentDaemonSet( + ctx context.Context, + daemonSet appsv1.DaemonSet, + reconcileLogger *logr.Logger, +) { + i.instrumentWorkload(ctx, &daemonSetWorkload{ + daemonSet: &daemonSet, + }, reconcileLogger) +} + +func (i *Instrumenter) findAndInstrumentDeployments( + ctx context.Context, + namespace string, + logger *logr.Logger, +) error { + matchingWorkloadsInNamespace, err := + i.Clientset.AppsV1().Deployments(namespace).List(ctx, util.EmptyListOptions) + if err != nil { + 
return fmt.Errorf("error when querying deployments: %w", err)
+	}
+	for _, resource := range matchingWorkloadsInNamespace.Items {
+		i.instrumentDeployment(ctx, resource, logger)
+	}
+	return nil
+}
+
+func (i *Instrumenter) instrumentDeployment(
+	ctx context.Context,
+	deployment appsv1.Deployment,
+	reconcileLogger *logr.Logger,
+) {
+	i.instrumentWorkload(ctx, &deploymentWorkload{
+		deployment: &deployment,
+	}, reconcileLogger)
+}
+
+func (i *Instrumenter) findAndAddLabelsToImmutableJobsOnInstrumentation(
+	ctx context.Context,
+	namespace string,
+	logger *logr.Logger,
+) error {
+	matchingWorkloadsInNamespace, err :=
+		i.Clientset.BatchV1().Jobs(namespace).List(ctx, util.EmptyListOptions)
+	if err != nil {
+		return fmt.Errorf("error when querying jobs: %w", err)
+	}
+
+	for _, job := range matchingWorkloadsInNamespace.Items {
+		i.handleJobJobOnInstrumentation(ctx, job, logger)
+	}
+	return nil
+}
+
+func (i *Instrumenter) handleJobJobOnInstrumentation(
+	ctx context.Context,
+	job batchv1.Job,
+	reconcileLogger *logr.Logger,
+) {
+	logger := reconcileLogger.WithValues(
+		workkloadTypeLabel,
+		"Job",
+		workloadNamespaceLabel,
+		job.GetNamespace(),
+		workloadNameLabel,
+		job.GetName(),
+	)
+	if job.DeletionTimestamp != nil {
+		// do not modify resources that are being deleted
+		logger.Info("not instrumenting this workload since it is about to be deleted (a deletion timestamp is set)")
+		return
+	}
+
+	objectMeta := &job.ObjectMeta
+	var requiredAction util.ModificationMode
+	modifyLabels := true
+	createImmutableWorkloadsError := true
+	if util.HasOptedOutOfInstrumenation(objectMeta) && util.InstrumenationAttemptHasFailed(objectMeta) {
+		// There has been an unsuccessful attempt to instrument this job before, but now the user has added the opt-out
+		// label, so we can remove the labels left over from that earlier attempt.
+		// "requiredAction = Uninstrumentation" in the context of immutable jobs means "remove Dash0 labels from the job",
+		// no other modification will take place.
+		requiredAction = util.ModificationModeUninstrumentation
+		createImmutableWorkloadsError = false
+	} else if util.HasOptedOutOfInstrumenation(objectMeta) && util.HasBeenInstrumentedSuccessfully(objectMeta) {
+		// This job has been instrumented successfully, presumably by the webhook. Since then, the opt-out label has
+		// been added. The correct action would be to uninstrument it, but since it is immutable, we cannot do that.
+		// We will not actually modify this job at all, but create a log message and a corresponding event.
+		modifyLabels = false
+		requiredAction = util.ModificationModeUninstrumentation
+	} else if util.HasOptedOutOfInstrumenation(objectMeta) {
+		// has opt-out label and there has been no previous instrumentation attempt
+		logger.Info("not instrumenting this workload due to dash0.com/enable=false")
+		return
+	} else if util.HasBeenInstrumentedSuccessfully(objectMeta) || util.InstrumenationAttemptHasFailed(objectMeta) {
+		// We already have instrumented this job (via the webhook) or have failed to instrument it, in either case,
+		// there is nothing to do here.
+		return
+	} else {
+		// We have not attempted to instrument this job yet, that is, we are seeing this job for the first time now.
+		//
+		// "requiredAction = Instrumentation" in the context of immutable jobs means "add labels to the job", no other
+		// modification will (or can) take place.
+ requiredAction = util.ModificationModeInstrumentation + } + + retryErr := util.Retry("handling immutable job", func() error { + if !modifyLabels { + return nil + } + + if err := i.Client.Get(ctx, client.ObjectKey{ + Namespace: job.GetNamespace(), + Name: job.GetName(), + }, &job); err != nil { + return fmt.Errorf("error when fetching job %s/%s: %w", job.GetNamespace(), job.GetName(), err) + } + + hasBeenModified := false + switch requiredAction { + case util.ModificationModeInstrumentation: + hasBeenModified = newWorkloadModifier(i.Images, i.OTelCollectorBaseUrl, &logger).AddLabelsToImmutableJob(&job) + case util.ModificationModeUninstrumentation: + hasBeenModified = newWorkloadModifier(i.Images, i.OTelCollectorBaseUrl, &logger).RemoveLabelsFromImmutableJob(&job) + } + + if hasBeenModified { + return i.Client.Update(ctx, &job) + } else { + return nil + } + }, &logger) + + postProcess := i.postProcessInstrumentation + if requiredAction == util.ModificationModeUninstrumentation { + postProcess = i.postProcessUninstrumentation + } + if retryErr != nil { + postProcess(&job, false, retryErr, &logger) + } else if createImmutableWorkloadsError { + // One way or another we are in a situation were we would have wanted to instrument/uninstrument the job, but + // could not. Passing an ImmutableWorkloadError to postProcess will make sure we write a corresponding log + // message and create a corresponding event. + postProcess(&job, false, ImmutableWorkloadError{ + workloadType: "job", + workloadName: fmt.Sprintf("%s/%s", job.GetNamespace(), job.GetName()), + modificationMode: requiredAction, + }, &logger) + } else { + postProcess(&job, false, nil, &logger) + } +} + +func (i *Instrumenter) findAndInstrumentReplicaSets( + ctx context.Context, + namespace string, + logger *logr.Logger, +) error { + matchingWorkloadsInNamespace, err := + i.Clientset.AppsV1().ReplicaSets(namespace).List(ctx, util.EmptyListOptions) + if err != nil { + return fmt.Errorf("error when querying replica sets: %w", err) + } + for _, resource := range matchingWorkloadsInNamespace.Items { + i.instrumentReplicaSet(ctx, resource, logger) + } + return nil +} + +func (i *Instrumenter) instrumentReplicaSet( + ctx context.Context, + replicaSet appsv1.ReplicaSet, + reconcileLogger *logr.Logger, +) { + hasBeenUpdated := i.instrumentWorkload(ctx, &replicaSetWorkload{ + replicaSet: &replicaSet, + }, reconcileLogger) + + if hasBeenUpdated { + i.restartPodsOfReplicaSet(ctx, replicaSet, reconcileLogger) + } +} + +func (i *Instrumenter) findAndInstrumentStatefulSets( + ctx context.Context, + namespace string, + logger *logr.Logger, +) error { + matchingWorkloadsInNamespace, err := i.Clientset.AppsV1().StatefulSets(namespace).List(ctx, util.EmptyListOptions) + if err != nil { + return fmt.Errorf("error when querying stateful sets: %w", err) + } + for _, resource := range matchingWorkloadsInNamespace.Items { + i.instrumentStatefulSet(ctx, resource, logger) + } + return nil +} + +func (i *Instrumenter) instrumentStatefulSet( + ctx context.Context, + statefulSet appsv1.StatefulSet, + reconcileLogger *logr.Logger, +) { + i.instrumentWorkload(ctx, &statefulSetWorkload{ + statefulSet: &statefulSet, + }, reconcileLogger) +} + +func (i *Instrumenter) instrumentWorkload( + ctx context.Context, + workload instrumentableWorkload, + reconcileLogger *logr.Logger, +) bool { + objectMeta := workload.getObjectMeta() + kind := workload.getKind() + logger := reconcileLogger.WithValues( + workkloadTypeLabel, + kind, + workloadNamespaceLabel, + 
objectMeta.GetNamespace(), + workloadNameLabel, + objectMeta.GetName(), + ) + if objectMeta.DeletionTimestamp != nil { + // do not modify resources that are being deleted + logger.Info("not instrumenting this workload since it is about to be deleted (a deletion timestamp is set)") + return false + } + + var requiredAction util.ModificationMode + if util.WasInstrumentedButHasOptedOutNow(objectMeta) { + requiredAction = util.ModificationModeUninstrumentation + } else if util.HasBeenInstrumentedSuccessfullyByThisVersion(objectMeta, i.Images) { + // No change necessary, this workload has already been instrumented and an opt-out label (which would need to + // trigger uninstrumentation) has not been added since it has been instrumented. + logger.Info("not updating the existing instrumentation for this workload, it has already been successfully " + + "instrumented by the same operator version") + return false + } else if util.HasOptedOutOfInstrumenationAndIsUninstrumented(workload.getObjectMeta()) { + logger.Info("not instrumenting this workload due to dash0.com/enable=false") + return false + } else { + requiredAction = util.ModificationModeInstrumentation + } + + hasBeenModified := false + retryErr := util.Retry(fmt.Sprintf("instrumenting %s", kind), func() error { + if err := i.Client.Get(ctx, client.ObjectKey{ + Namespace: objectMeta.GetNamespace(), + Name: objectMeta.GetName(), + }, workload.asClientObject()); err != nil { + return fmt.Errorf( + "error when fetching %s %s/%s: %w", + kind, + objectMeta.GetNamespace(), + objectMeta.GetName(), + err, + ) + } + + switch requiredAction { + case util.ModificationModeInstrumentation: + hasBeenModified = workload.instrument(i.Images, i.OTelCollectorBaseUrl, &logger) + case util.ModificationModeUninstrumentation: + hasBeenModified = workload.revert(i.Images, i.OTelCollectorBaseUrl, &logger) + } + + if hasBeenModified { + return i.Client.Update(ctx, workload.asClientObject()) + } else { + return nil + } + }, &logger) + + switch requiredAction { + case util.ModificationModeInstrumentation: + return i.postProcessInstrumentation(workload.asRuntimeObject(), hasBeenModified, retryErr, &logger) + case util.ModificationModeUninstrumentation: + return i.postProcessUninstrumentation(workload.asRuntimeObject(), hasBeenModified, retryErr, &logger) + } + return false +} + +func (i *Instrumenter) postProcessInstrumentation( + resource runtime.Object, + hasBeenModified bool, + retryErr error, + logger *logr.Logger, +) bool { + if retryErr != nil { + e := &ImmutableWorkloadError{} + if errors.As(retryErr, e) { + logger.Info(e.Error()) + } else { + logger.Error(retryErr, "Dash0 instrumentation by controller has not been successful.") + } + util.QueueFailedInstrumentationEvent(i.Recorder, resource, "controller", retryErr) + return false + } else if !hasBeenModified { + // TODO This also happens for replica sets owned by a deployment and the log message as well as the message on + // the event are unspecific, would be better if we could differentiate between the two cases. + // (Also for revert maybe.) 
+ logger.Info("Dash0 instrumentation was already present on this workload, or the workload is part of a higher " + + "order workload that will be instrumented, no modification by the controller is necessary.") + util.QueueNoInstrumentationNecessaryEvent(i.Recorder, resource, "controller") + return false + } else { + logger.Info("The controller has added Dash0 instrumentation to the workload.") + util.QueueSuccessfulInstrumentationEvent(i.Recorder, resource, "controller") + return true + } +} + +// UninstrumentWorkloadsIfAvailable is the main uninstrumentation function that is called in the controller's reconcile +// loop. It checks whether the Dash0 monitoring resource is marked as available; if it is, it uninstruments existing +// workloads. +func (i *Instrumenter) UninstrumentWorkloadsIfAvailable( + ctx context.Context, + dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, + logger *logr.Logger, +) error { + if dash0MonitoringResource.IsAvailable() { + logger.Info("Reverting Dash0's modifications to workloads that have been instrumented to make them send telemetry to Dash0.") + if err := i.uninstrumentWorkloads(ctx, dash0MonitoringResource, logger); err != nil { + logger.Error(err, "Uninstrumenting existing workloads failed.") + return err + } + } else { + logger.Info("Removing the Dash0 monitoring resource and running finalizers, but Dash0 is not marked as available." + + " Dash0 Instrumentation will not be removed from workloads..") + } + return nil +} + +func (i *Instrumenter) uninstrumentWorkloads( + ctx context.Context, + dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, + logger *logr.Logger, +) error { + namespace := dash0MonitoringResource.Namespace + + errCronJobs := i.findAndUninstrumentCronJobs(ctx, namespace, logger) + errDaemonSets := i.findAndUninstrumentDaemonSets(ctx, namespace, logger) + errDeployments := i.findAndUninstrumentDeployments(ctx, namespace, logger) + errJobs := i.findAndHandleJobOnUninstrumentation(ctx, namespace, logger) + errReplicaSets := i.findAndUninstrumentReplicaSets(ctx, namespace, logger) + errStatefulSets := i.findAndUninstrumentStatefulSets(ctx, namespace, logger) + combinedErrors := errors.Join( + errCronJobs, + errDaemonSets, + errDeployments, + errJobs, + errReplicaSets, + errStatefulSets, + ) + if combinedErrors != nil { + return combinedErrors + } + return nil +} + +func (i *Instrumenter) findAndUninstrumentCronJobs( + ctx context.Context, + namespace string, + logger *logr.Logger, +) error { + matchingWorkloadsInNamespace, err := + i.Clientset.BatchV1().CronJobs(namespace).List(ctx, util.WorkloadsWithDash0InstrumentedLabelFilter) + if err != nil { + return fmt.Errorf("error when querying instrumented cron jobs: %w", err) + } + for _, resource := range matchingWorkloadsInNamespace.Items { + i.uninstrumentCronJob(ctx, resource, logger) + } + return nil +} + +func (i *Instrumenter) uninstrumentCronJob( + ctx context.Context, + cronJob batchv1.CronJob, + reconcileLogger *logr.Logger, +) { + i.revertWorkloadInstrumentation(ctx, &cronJobWorkload{ + cronJob: &cronJob, + }, reconcileLogger) +} + +func (i *Instrumenter) findAndUninstrumentDaemonSets(ctx context.Context, namespace string, logger *logr.Logger) error { + matchingWorkloadsInNamespace, err := + i.Clientset.AppsV1().DaemonSets(namespace).List(ctx, util.WorkloadsWithDash0InstrumentedLabelFilter) + if err != nil { + return fmt.Errorf("error when querying instrumented daemon sets: %w", err) + } + for _, resource := range matchingWorkloadsInNamespace.Items { + 
i.uninstrumentDaemonSet(ctx, resource, logger) + } + return nil +} + +func (i *Instrumenter) uninstrumentDaemonSet( + ctx context.Context, + daemonSet appsv1.DaemonSet, + reconcileLogger *logr.Logger, +) { + i.revertWorkloadInstrumentation(ctx, &daemonSetWorkload{ + daemonSet: &daemonSet, + }, reconcileLogger) +} + +func (i *Instrumenter) findAndUninstrumentDeployments( + ctx context.Context, + namespace string, + logger *logr.Logger, +) error { + matchingWorkloadsInNamespace, err := + i.Clientset.AppsV1().Deployments(namespace).List(ctx, util.WorkloadsWithDash0InstrumentedLabelFilter) + if err != nil { + return fmt.Errorf("error when querying instrumented deployments: %w", err) + } + for _, resource := range matchingWorkloadsInNamespace.Items { + i.uninstrumentDeployment(ctx, resource, logger) + } + return nil +} + +func (i *Instrumenter) uninstrumentDeployment( + ctx context.Context, + deployment appsv1.Deployment, + reconcileLogger *logr.Logger, +) { + i.revertWorkloadInstrumentation(ctx, &deploymentWorkload{ + deployment: &deployment, + }, reconcileLogger) +} + +func (i *Instrumenter) findAndHandleJobOnUninstrumentation( + ctx context.Context, + namespace string, + logger *logr.Logger, +) error { + matchingWorkloadsInNamespace, err := i.Clientset.BatchV1().Jobs(namespace).List(ctx, util.WorkloadsWithDash0InstrumentedLabelFilter) + if err != nil { + return fmt.Errorf("error when querying instrumented jobs: %w", err) + } + + for _, job := range matchingWorkloadsInNamespace.Items { + i.handleJobOnUninstrumentation(ctx, job, logger) + } + return nil +} + +func (i *Instrumenter) handleJobOnUninstrumentation(ctx context.Context, job batchv1.Job, reconcileLogger *logr.Logger) { + logger := reconcileLogger.WithValues( + workkloadTypeLabel, + "Job", + workloadNamespaceLabel, + job.GetNamespace(), + workloadNameLabel, + job.GetName(), + ) + if job.DeletionTimestamp != nil { + // do not modify resources that are being deleted + logger.Info("not uninstrumenting this workload since it is about to be deleted (a deletion timestamp is set)") + return + } + + // Note: In contrast to the instrumentation logic, there is no need to check for dash.com/enable=false here: + // If it is set, the workload would not have been instrumented in the first place, hence the label selector filter + // looking for dash0.com/instrumented=true would not have matched. Or if the workload is actually instrumented, + // although it has dash0.com/enabled=false it must have been set after the instrumentation, in which case + // uninstrumenting it is the correct thing to do. + + createImmutableWorkloadsError := false + retryErr := util.Retry("removing labels from immutable job", func() error { + if err := i.Client.Get(ctx, client.ObjectKey{ + Namespace: job.GetNamespace(), + Name: job.GetName(), + }, &job); err != nil { + return fmt.Errorf("error when fetching job %s/%s: %w", job.GetNamespace(), job.GetName(), err) + } + if util.HasBeenInstrumentedSuccessfully(&job.ObjectMeta) { + // This job has been instrumented, presumably by the webhook. We cannot undo the instrumentation here, since + // jobs are immutable. + + // Deliberately not calling newWorkloadModifier(i.Images, &logger).RemoveLabelsFromImmutableJob(&job) here + // since we cannot remove the instrumentation, so we also have to leave the labels in place. 
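+			// Remember this case, so that after the retry loop an ImmutableWorkloadError is passed to
+			// postProcessUninstrumentation, which logs it and queues a corresponding failure event for this job.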
+			createImmutableWorkloadsError = true
+			return nil
+		} else if util.InstrumenationAttemptHasFailed(&job.ObjectMeta) {
+			// There was an attempt to instrument this job (probably by the controller), which has not been successful.
+			// We only need to remove the labels from that instrumentation attempt to clean up.
+			newWorkloadModifier(i.Images, i.OTelCollectorBaseUrl, &logger).RemoveLabelsFromImmutableJob(&job)
+
+			// Apparently for jobs we do not need to set the "dash0.com/webhook-ignore-once" label, since changing their
+			// labels does not trigger a new admission request.
+			return i.Client.Update(ctx, &job)
+		} else {
+			// No dash0.com/instrumented label is present, do nothing.
+			return nil
+		}
+	}, &logger)
+
+	if retryErr != nil {
+		// For the case that the job was instrumented and we could not uninstrument it, we create an
+		// ImmutableWorkloadError inside the retry loop. This error is then handled in postProcessUninstrumentation.
+		// The same is true for any other error types (for example errors in i.Client.Update).
+		i.postProcessUninstrumentation(&job, false, retryErr, &logger)
+	} else if createImmutableWorkloadsError {
+		i.postProcessUninstrumentation(&job, false, ImmutableWorkloadError{
+			workloadType:     "job",
+			workloadName:     fmt.Sprintf("%s/%s", job.GetNamespace(), job.GetName()),
+			modificationMode: util.ModificationModeUninstrumentation,
+		}, &logger)
+	} else {
+		i.postProcessUninstrumentation(&job, false, nil, &logger)
+	}
+}
+
+func (i *Instrumenter) findAndUninstrumentReplicaSets(
+	ctx context.Context,
+	namespace string,
+	logger *logr.Logger,
+) error {
+	matchingWorkloadsInNamespace, err :=
+		i.Clientset.AppsV1().ReplicaSets(namespace).List(ctx, util.WorkloadsWithDash0InstrumentedLabelFilter)
+	if err != nil {
+		return fmt.Errorf("error when querying instrumented replica sets: %w", err)
+	}
+	for _, resource := range matchingWorkloadsInNamespace.Items {
+		i.uninstrumentReplicaSet(ctx, resource, logger)
+	}
+	return nil
+}
+
+func (i *Instrumenter) uninstrumentReplicaSet(ctx context.Context, replicaSet appsv1.ReplicaSet, reconcileLogger *logr.Logger) {
+	hasBeenUpdated := i.revertWorkloadInstrumentation(ctx, &replicaSetWorkload{
+		replicaSet: &replicaSet,
+	}, reconcileLogger)
+
+	if hasBeenUpdated {
+		i.restartPodsOfReplicaSet(ctx, replicaSet, reconcileLogger)
+	}
+}
+
+func (i *Instrumenter) findAndUninstrumentStatefulSets(
+	ctx context.Context,
+	namespace string,
+	logger *logr.Logger,
+) error {
+	matchingWorkloadsInNamespace, err :=
+		i.Clientset.AppsV1().StatefulSets(namespace).List(ctx, util.WorkloadsWithDash0InstrumentedLabelFilter)
+	if err != nil {
+		return fmt.Errorf("error when querying instrumented stateful sets: %w", err)
+	}
+	for _, resource := range matchingWorkloadsInNamespace.Items {
+		i.uninstrumentStatefulSet(ctx, resource, logger)
+	}
+	return nil
+}
+
+func (i *Instrumenter) uninstrumentStatefulSet(
+	ctx context.Context,
+	statefulSet appsv1.StatefulSet,
+	reconcileLogger *logr.Logger,
+) {
+	i.revertWorkloadInstrumentation(ctx, &statefulSetWorkload{
+		statefulSet: &statefulSet,
+	}, reconcileLogger)
+}
+
+func (i *Instrumenter) revertWorkloadInstrumentation(
+	ctx context.Context,
+	workload instrumentableWorkload,
+	reconcileLogger *logr.Logger,
+) bool {
+	objectMeta := workload.getObjectMeta()
+	kind := workload.getKind()
+	logger := reconcileLogger.WithValues(
+		workkloadTypeLabel,
+		kind,
+		workloadNamespaceLabel,
+		objectMeta.GetNamespace(),
+		workloadNameLabel,
+		objectMeta.GetName(),
+	)
+	if objectMeta.DeletionTimestamp != nil {
+		
// do not modify resources that are being deleted
+		logger.Info("not uninstrumenting this workload since it is about to be deleted (a deletion timestamp is set)")
+		return false
+	}
+
+	// Note: In contrast to the instrumentation logic, there is no need to check for dash0.com/enable=false here:
+	// If it is set, the workload would not have been instrumented in the first place, hence the label selector filter
+	// looking for dash0.com/instrumented=true would not have matched. Or if the workload is actually instrumented,
+	// although it has dash0.com/enable=false it must have been set after the instrumentation, in which case
+	// uninstrumenting it is the correct thing to do.
+
+	hasBeenModified := false
+	retryErr := util.Retry(fmt.Sprintf("uninstrumenting %s", kind), func() error {
+		if err := i.Client.Get(ctx, client.ObjectKey{
+			Namespace: objectMeta.GetNamespace(),
+			Name:      objectMeta.GetName(),
+		}, workload.asClientObject()); err != nil {
+			return fmt.Errorf(
+				"error when fetching %s %s/%s: %w",
+				kind,
+				objectMeta.GetNamespace(),
+				objectMeta.GetName(),
+				err,
+			)
+		}
+		hasBeenModified = workload.revert(i.Images, i.OTelCollectorBaseUrl, &logger)
+		if hasBeenModified {
+			// Changing the workload spec sometimes triggers a new admission request, which would re-instrument the
+			// workload via the webhook immediately. To prevent this, we add a label that the webhook can check to
+			// prevent instrumentation.
+			util.AddWebhookIgnoreOnceLabel(objectMeta)
+			return i.Client.Update(ctx, workload.asClientObject())
+		} else {
+			return nil
+		}
+	}, &logger)
+
+	return i.postProcessUninstrumentation(workload.asRuntimeObject(), hasBeenModified, retryErr, &logger)
+}
+
+func (i *Instrumenter) postProcessUninstrumentation(
+	resource runtime.Object,
+	hasBeenModified bool,
+	retryErr error,
+	logger *logr.Logger,
+) bool {
+	if retryErr != nil {
+		e := &ImmutableWorkloadError{}
+		if errors.As(retryErr, e) {
+			logger.Info(e.Error())
+		} else {
+			logger.Error(retryErr, "Dash0's removal of instrumentation by controller has not been successful.")
+		}
+		util.QueueFailedUninstrumentationEvent(i.Recorder, resource, "controller", retryErr)
+		return false
+	} else if !hasBeenModified {
+		logger.Info("Dash0 instrumentation was not present on this workload, no modification by the controller has " +
+			"been necessary.")
+		util.QueueNoUninstrumentationNecessaryEvent(i.Recorder, resource, "controller")
+		return false
+	} else {
+		logger.Info("The controller has removed the Dash0 instrumentation from the workload.")
+		util.QueueSuccessfulUninstrumentationEvent(i.Recorder, resource, "controller")
+		return true
+	}
+}
+
+func newWorkloadModifier(images util.Images, oTelCollectorBaseUrl string, logger *logr.Logger) *workloads.ResourceModifier {
+	return workloads.NewResourceModifier(
+		util.InstrumentationMetadata{
+			Images:               images,
+			InstrumentedBy:       "controller",
+			OTelCollectorBaseUrl: oTelCollectorBaseUrl,
+		},
+		logger,
+	)
+}
+
+func (i *Instrumenter) restartPodsOfReplicaSet(
+	ctx context.Context,
+	replicaSet appsv1.ReplicaSet,
+	logger *logr.Logger,
+) {
+	// Note: ReplicaSet pods are not restarted automatically by Kubernetes when their spec is changed (for other
+	// resource types like deployments or daemonsets this is managed by Kubernetes automatically). Therefore, we
+	// find all pods owned by the replica set and explicitly delete them to trigger a restart.
+	allPodsInNamespace, err :=
+		i.Clientset.
+			CoreV1().
+			Pods(replicaSet.Namespace).
+ List(ctx, metav1.ListOptions{ + TimeoutSeconds: &timeoutForListingPods, + }) + if err != nil { + logger.Error( + err, + fmt.Sprintf( + "Failed to list all pods in the namespaces for the purpose of restarting the pods owned by the "+ + "replica set %s/%s (%s), pods will not be restarted automatically.", + replicaSet.Namespace, + replicaSet.Name, + replicaSet.UID, + )) + return + } + + podsOfReplicaSet := slices.DeleteFunc(allPodsInNamespace.Items, func(pod corev1.Pod) bool { + ownerReferences := pod.GetOwnerReferences() + for _, ownerReference := range ownerReferences { + if ownerReference.Kind == "ReplicaSet" && + ownerReference.Name == replicaSet.Name && + ownerReference.UID == replicaSet.UID { + return false + } + } + return true + }) + + for _, pod := range podsOfReplicaSet { + err := i.Client.Delete(ctx, &pod) + if err != nil { + logger.Info( + fmt.Sprintf( + "Failed to restart pod owned by the replica "+ + "set %s/%s (%s), this pod will not be restarted automatically.", + replicaSet.Namespace, + replicaSet.Name, + replicaSet.UID, + )) + } + } +} diff --git a/internal/dash0/instrumentation/instrumenter_test.go b/internal/dash0/instrumentation/instrumenter_test.go new file mode 100644 index 00000000..e26d5d7e --- /dev/null +++ b/internal/dash0/instrumentation/instrumenter_test.go @@ -0,0 +1,198 @@ +// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. +// SPDX-License-Identifier: Apache-2.0 + +package instrumentation + +import ( + "context" + "fmt" + + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + . "github.com/dash0hq/dash0-operator/test/util" +) + +const ( + olderOperatorControllerImageLabel = "some-registry_com_1234_dash0hq_operator-controller_0.9.8" + olderInitContainerImageLabel = "some-registry_com_1234_dash0hq_instrumentation_2.3.4" +) + +var ( + namespace = TestNamespaceName +) + +var _ = Describe("The instrumenter", Ordered, func() { + ctx := context.Background() + logger := log.FromContext(ctx) + var createdObjects []client.Object + + var instrumenter *Instrumenter + + BeforeAll(func() { + EnsureTestNamespaceExists(ctx, k8sClient) + EnsureDash0OperatorNamespaceExists(ctx, k8sClient) + }) + + BeforeEach(func() { + EnsureDash0MonitoringResourceExists(ctx, k8sClient) + + createdObjects = make([]client.Object, 0) + + instrumenter = &Instrumenter{ + Client: k8sClient, + Clientset: clientset, + Recorder: recorder, + Images: TestImages, + OTelCollectorBaseUrl: OTelCollectorBaseUrlTest, + } + }) + + AfterEach(func() { + createdObjects = DeleteAllCreatedObjects(ctx, k8sClient, createdObjects) + DeleteAllEvents(ctx, clientset, namespace) + + RemoveDash0MonitoringResource(ctx, k8sClient) + }) + + DescribeTable("should instrument existing workloads at startup", func(config WorkloadTestConfig) { + name := UniqueName(config.WorkloadNamePrefix) + workload := config.CreateFn(ctx, k8sClient, namespace, name) + createdObjects = append(createdObjects, workload.Get()) + + instrumenter.InstrumentAtStartup(ctx, k8sClient, &logger) + + VerifySuccessfulInstrumentationEvent(ctx, clientset, namespace, name, "controller") + config.VerifyFn(config.GetFn(ctx, k8sClient, namespace, name)) + }, Entry("should instrument a cron job at startup", WorkloadTestConfig{ + WorkloadNamePrefix: CronJobNamePrefix, + CreateFn: WrapCronJobFnAsTestableWorkload(CreateBasicCronJob), + GetFn: WrapCronJobFnAsTestableWorkload(GetCronJob), + VerifyFn: 
func(workload TestableWorkload) { + VerifyModifiedCronJob(workload.Get().(*batchv1.CronJob), BasicInstrumentedPodSpecExpectations()) + }, + }), Entry("should instrument a daemon set at startup", WorkloadTestConfig{ + WorkloadNamePrefix: DaemonSetNamePrefix, + CreateFn: WrapDaemonSetFnAsTestableWorkload(CreateBasicDaemonSet), + GetFn: WrapDaemonSetFnAsTestableWorkload(GetDaemonSet), + VerifyFn: func(workload TestableWorkload) { + VerifyModifiedDaemonSet(workload.Get().(*appsv1.DaemonSet), BasicInstrumentedPodSpecExpectations()) + }, + }), Entry("should instrument a deployment at startup", WorkloadTestConfig{ + WorkloadNamePrefix: DeploymentNamePrefix, + CreateFn: WrapDeploymentFnAsTestableWorkload(CreateBasicDeployment), + GetFn: WrapDeploymentFnAsTestableWorkload(GetDeployment), + VerifyFn: func(workload TestableWorkload) { + VerifyModifiedDeployment(workload.Get().(*appsv1.Deployment), BasicInstrumentedPodSpecExpectations()) + }, + }), Entry("should instrument a replica set at startup", WorkloadTestConfig{ + WorkloadNamePrefix: ReplicaSetNamePrefix, + CreateFn: WrapReplicaSetFnAsTestableWorkload(CreateBasicReplicaSet), + GetFn: WrapReplicaSetFnAsTestableWorkload(GetReplicaSet), + VerifyFn: func(workload TestableWorkload) { + VerifyModifiedReplicaSet(workload.Get().(*appsv1.ReplicaSet), BasicInstrumentedPodSpecExpectations()) + }, + }), Entry("should instrument a stateful set at startup", WorkloadTestConfig{ + WorkloadNamePrefix: StatefulSetNamePrefix, + CreateFn: WrapStatefulSetFnAsTestableWorkload(CreateBasicStatefulSet), + GetFn: WrapStatefulSetFnAsTestableWorkload(GetStatefulSet), + VerifyFn: func(workload TestableWorkload) { + VerifyModifiedStatefulSet(workload.Get().(*appsv1.StatefulSet), BasicInstrumentedPodSpecExpectations()) + }, + }), + ) + + Describe("should not instrument existing jobs at startup", func() { + It("should record a failure event when attempting to instrument an existing job at startup and add labels", func() { + name := UniqueName(JobNamePrefix) + job := CreateBasicJob(ctx, k8sClient, namespace, name) + createdObjects = append(createdObjects, job) + + instrumenter.InstrumentAtStartup(ctx, k8sClient, &logger) + + VerifyFailedInstrumentationEvent( + ctx, + clientset, + namespace, + name, + fmt.Sprintf("Dash0 instrumentation of this workload by the controller has not been successful. 
"+ + "Error message: Dash0 cannot instrument the existing job test-namespace/%s, since this type "+ + "of workload is immutable.", name), + ) + VerifyImmutableJobCouldNotBeModified(GetJob(ctx, k8sClient, namespace, name)) + }) + }) + + DescribeTable("when updating instrumented workloads at startup", func(config WorkloadTestConfig) { + name := UniqueName(config.WorkloadNamePrefix) + workload := config.CreateFn(ctx, k8sClient, TestNamespaceName, name) + createdObjects = append(createdObjects, workload.Get()) + workload.GetObjectMeta().Labels["dash0.com/operator-image"] = olderOperatorControllerImageLabel + workload.GetObjectMeta().Labels["dash0.com/init-container-image"] = olderInitContainerImageLabel + UpdateWorkload(ctx, k8sClient, workload.Get()) + instrumenter.InstrumentAtStartup(ctx, k8sClient, &logger) + config.VerifyFn(config.GetFn(ctx, k8sClient, TestNamespaceName, name)) + VerifySuccessfulInstrumentationEvent(ctx, clientset, namespace, name, "controller") + }, Entry("should override outdated instrumentation settings for a cron job at startup", WorkloadTestConfig{ + WorkloadNamePrefix: CronJobNamePrefix, + CreateFn: WrapCronJobFnAsTestableWorkload(CreateInstrumentedCronJob), + GetFn: WrapCronJobFnAsTestableWorkload(GetCronJob), + VerifyFn: func(workload TestableWorkload) { + VerifyModifiedCronJob(workload.Get().(*batchv1.CronJob), BasicInstrumentedPodSpecExpectations()) + }, + }), Entry("should override outdated instrumentation settings for a daemon set at startup", WorkloadTestConfig{ + WorkloadNamePrefix: DaemonSetNamePrefix, + CreateFn: WrapDaemonSetFnAsTestableWorkload(CreateInstrumentedDaemonSet), + GetFn: WrapDaemonSetFnAsTestableWorkload(GetDaemonSet), + VerifyFn: func(workload TestableWorkload) { + VerifyModifiedDaemonSet(workload.Get().(*appsv1.DaemonSet), BasicInstrumentedPodSpecExpectations()) + }, + }), Entry("should override outdated instrumentation settings for a deployment at startup", WorkloadTestConfig{ + WorkloadNamePrefix: DeploymentNamePrefix, + CreateFn: WrapDeploymentFnAsTestableWorkload(CreateInstrumentedDeployment), + GetFn: WrapDeploymentFnAsTestableWorkload(GetDeployment), + VerifyFn: func(workload TestableWorkload) { + VerifyModifiedDeployment(workload.Get().(*appsv1.Deployment), BasicInstrumentedPodSpecExpectations()) + }, + }), Entry("should override outdated instrumentation settings for a replica set at startup", WorkloadTestConfig{ + WorkloadNamePrefix: ReplicaSetNamePrefix, + CreateFn: WrapReplicaSetFnAsTestableWorkload(CreateInstrumentedReplicaSet), + GetFn: WrapReplicaSetFnAsTestableWorkload(GetReplicaSet), + VerifyFn: func(workload TestableWorkload) { + VerifyModifiedReplicaSet(workload.Get().(*appsv1.ReplicaSet), BasicInstrumentedPodSpecExpectations()) + }, + }), Entry("should override outdated instrumentation settings for a stateful set at startup", WorkloadTestConfig{ + WorkloadNamePrefix: StatefulSetNamePrefix, + CreateFn: WrapStatefulSetFnAsTestableWorkload(CreateInstrumentedStatefulSet), + GetFn: WrapStatefulSetFnAsTestableWorkload(GetStatefulSet), + VerifyFn: func(workload TestableWorkload) { + VerifyModifiedStatefulSet(workload.Get().(*appsv1.StatefulSet), BasicInstrumentedPodSpecExpectations()) + }, + }), + ) + + Describe("when attempting to update instrumented jobs at startup", func() { + It("should not override outdated instrumentation settings for a job at startup", func() { + name := UniqueName(JobNamePrefix) + workload := CreateInstrumentedJob(ctx, k8sClient, TestNamespaceName, name) + createdObjects = append(createdObjects, workload) 
+ workload.ObjectMeta.Labels["dash0.com/operator-image"] = "some-registry.com_1234_dash0hq_operator-controller_0.9.8" + workload.ObjectMeta.Labels["dash0.com/init-container-image"] = "some-registry.com_1234_dash0hq_instrumentation_2.3.4" + UpdateWorkload(ctx, k8sClient, workload) + instrumenter.InstrumentAtStartup(ctx, k8sClient, &logger) + + // we do not attempt to update the instrumentation for jobs, since they are immutable + workload = GetJob(ctx, k8sClient, TestNamespaceName, name) + jobLabels := workload.ObjectMeta.Labels + Expect(jobLabels["dash0.com/instrumented"]).To(Equal("true")) + Expect(jobLabels["dash0.com/operator-image"]).To(Equal("some-registry.com_1234_dash0hq_operator-controller_0.9.8")) + Expect(jobLabels["dash0.com/init-container-image"]).To(Equal("some-registry.com_1234_dash0hq_instrumentation_2.3.4")) + VerifyNoEvents(ctx, clientset, namespace) + }) + }) +}) diff --git a/internal/dash0/removal/removal_suite_test.go b/internal/dash0/removal/removal_suite_test.go index cae20f53..57a00d56 100644 --- a/internal/dash0/removal/removal_suite_test.go +++ b/internal/dash0/removal/removal_suite_test.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. +// SPDX-License-Identifier: Apache-2.0 + package removal import ( @@ -17,14 +20,16 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "github.com/onsi/gomega/format" - dash0v1alpha1 "github.com/dash0hq/dash0-operator/api/dash0monitoring/v1alpha1" "github.com/dash0hq/dash0-operator/internal/backendconnection" "github.com/dash0hq/dash0-operator/internal/backendconnection/otelcolresources" "github.com/dash0hq/dash0-operator/internal/dash0/controller" + "github.com/dash0hq/dash0-operator/internal/dash0/instrumentation" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/format" + . "github.com/dash0hq/dash0-operator/test/util" ) @@ -91,6 +96,13 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) Expect(mgr).NotTo(BeNil()) + instrumenter := &instrumentation.Instrumenter{ + Client: k8sClient, + Clientset: clientset, + Recorder: mgr.GetEventRecorderFor("dash0-controller"), + Images: TestImages, + OTelCollectorBaseUrl: OTelCollectorBaseUrlTest, + } oTelColResourceManager := &otelcolresources.OTelColResourceManager{ Client: k8sClient, Scheme: k8sClient.Scheme(), @@ -105,10 +117,8 @@ var _ = BeforeSuite(func() { reconciler = &controller.Dash0Reconciler{ Client: k8sClient, Clientset: clientset, - Recorder: mgr.GetEventRecorderFor("dash0-controller"), - Scheme: k8sClient.Scheme(), Images: TestImages, - OTelCollectorBaseUrl: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + Instrumenter: instrumenter, OperatorNamespace: Dash0OperatorNamespace, BackendConnectionManager: backendConnectionManager, DanglingEventsTimeouts: &controller.DanglingEventsTimeouts{ diff --git a/internal/dash0/controller/controller_util.go b/internal/dash0/util/controller.go similarity index 95% rename from internal/dash0/controller/controller_util.go rename to internal/dash0/util/controller.go index 152e9290..62a7e630 100644 --- a/internal/dash0/controller/controller_util.go +++ b/internal/dash0/util/controller.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. 
// SPDX-License-Identifier: Apache-2.0 -package controller +package util import ( "context" @@ -20,9 +20,9 @@ import ( dash0v1alpha1 "github.com/dash0hq/dash0-operator/api/dash0monitoring/v1alpha1" ) -// checkIfNamespaceExists checks if the given namespace (which is supposed to be the namespace from a reconcile request) +// CheckIfNamespaceExists checks if the given namespace (which is supposed to be the namespace from a reconcile request) // exists in the cluster. If the namespace does not exist, it returns false, and this is supposed to stop the reconcile -func checkIfNamespaceExists( +func CheckIfNamespaceExists( ctx context.Context, clientset *kubernetes.Clientset, namespace string, @@ -40,7 +40,7 @@ func checkIfNamespaceExists( return true, nil } -// verifyUniqueDash0MonitoringResourceExists loads the resource that the current reconcile request applies to, if it +// VerifyUniqueDash0MonitoringResourceExists loads the resource that the current reconcile request applies to, if it // exists. It also checks whether there is only one such resource (or, if there are multiple, if the currently // reconciled one is the most recently created one). The bool returned has the meaning "stop the reconcile request", // that is, if the function returns true, it expects the caller to stop the reconcile request immediately and not @@ -56,19 +56,18 @@ func checkIfNamespaceExists( // stopReconcile and the caller is expected to stop the reconcile and not requeue it. // - If any error is encountered when searching for resources etc., that error will be returned, the caller is // expected to ignore the bool result and requeue the reconcile request. -func verifyUniqueDash0MonitoringResourceExists( +func VerifyUniqueDash0MonitoringResourceExists( ctx context.Context, k8sClient client.Client, - statusWriter client.SubResourceWriter, updateStatusFailedMessage string, req ctrl.Request, - logger logr.Logger, + logger *logr.Logger, ) (*dash0v1alpha1.Dash0Monitoring, bool, error) { dash0MonitoringResource, stopReconcile, err := verifyThatCustomResourceExists( ctx, k8sClient, req, - &logger, + logger, ) if err != nil || stopReconcile { return nil, stopReconcile, err @@ -77,11 +76,10 @@ func verifyUniqueDash0MonitoringResourceExists( verifyThatCustomResourceIsUniqe( ctx, k8sClient, - statusWriter, req, dash0MonitoringResource, updateStatusFailedMessage, - &logger, + logger, ) return dash0MonitoringResource, stopReconcile, err } @@ -125,7 +123,6 @@ func verifyThatCustomResourceExists( func verifyThatCustomResourceIsUniqe( ctx context.Context, k8sClient client.Client, - statusWriter client.SubResourceWriter, req ctrl.Request, dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, updateStatusFailedMessage string, @@ -175,7 +172,7 @@ func verifyThatCustomResourceIsUniqe( "NewerResourceIsPresent", "There is a more recently created Dash0 monitoring resource in this namespace, please remove all but one resource instance.", ) - if err := statusWriter.Update(ctx, dash0MonitoringResource); err != nil { + if err := k8sClient.Status().Update(ctx, dash0MonitoringResource); err != nil { logger.Error(err, updateStatusFailedMessage) return true, err } @@ -200,7 +197,7 @@ func (s SortByCreationTimestamp) Less(i, j int) bool { return tsi.Before(&tsj) } -func initStatusConditions( +func InitStatusConditions( ctx context.Context, statusWriter client.SubResourceWriter, dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, @@ -244,7 +241,7 @@ func updateResourceStatus( return nil } -func checkImminentDeletionAndHandleFinalizers( +func 
CheckImminentDeletionAndHandleFinalizers( ctx context.Context, k8sClient client.Client, dash0MonitoringResource *dash0v1alpha1.Dash0Monitoring, diff --git a/internal/dash0/util/types.go b/internal/dash0/util/types.go index addafdc1..d074fa90 100644 --- a/internal/dash0/util/types.go +++ b/internal/dash0/util/types.go @@ -42,3 +42,10 @@ type InstrumentationMetadata struct { OTelCollectorBaseUrl string InstrumentedBy string } + +type ModificationMode string + +const ( + ModificationModeInstrumentation ModificationMode = "instrumentation" + ModificationModeUninstrumentation ModificationMode = "uninstrumentation" +) diff --git a/internal/dash0/util/util_suite_test.go b/internal/dash0/util/util_suite_test.go index 6d6903e9..f0036272 100644 --- a/internal/dash0/util/util_suite_test.go +++ b/internal/dash0/util/util_suite_test.go @@ -1,4 +1,7 @@ -package util_test +// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. +// SPDX-License-Identifier: Apache-2.0 + +package util import ( "testing" diff --git a/internal/dash0/webhook/attach_dangling_events_test.go b/internal/dash0/webhook/attach_dangling_events_test.go index a2e8d13b..4614f16e 100644 --- a/internal/dash0/webhook/attach_dangling_events_test.go +++ b/internal/dash0/webhook/attach_dangling_events_test.go @@ -20,6 +20,7 @@ import ( "github.com/dash0hq/dash0-operator/internal/backendconnection" "github.com/dash0hq/dash0-operator/internal/backendconnection/otelcolresources" "github.com/dash0hq/dash0-operator/internal/dash0/controller" + "github.com/dash0hq/dash0-operator/internal/dash0/instrumentation" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -35,6 +36,14 @@ var _ = Describe("The Dash0 webhook and the Dash0 controller", Ordered, func() { BeforeAll(func() { EnsureDash0OperatorNamespaceExists(ctx, k8sClient) + recorder := manager.GetEventRecorderFor("dash0-controller") + instrumenter := &instrumentation.Instrumenter{ + Client: k8sClient, + Clientset: clientset, + Recorder: recorder, + Images: TestImages, + OTelCollectorBaseUrl: OTelCollectorBaseUrlTest, + } oTelColResourceManager := &otelcolresources.OTelColResourceManager{ Client: k8sClient, Scheme: k8sClient.Scheme(), @@ -46,15 +55,11 @@ var _ = Describe("The Dash0 webhook and the Dash0 controller", Ordered, func() { Clientset: clientset, OTelColResourceManager: oTelColResourceManager, } - recorder := manager.GetEventRecorderFor("dash0-controller") - reconciler = &controller.Dash0Reconciler{ Client: k8sClient, Clientset: clientset, - Recorder: recorder, - Scheme: k8sClient.Scheme(), + Instrumenter: instrumenter, Images: TestImages, - OTelCollectorBaseUrl: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", OperatorNamespace: Dash0OperatorNamespace, BackendConnectionManager: backendConnectionManager, DanglingEventsTimeouts: &controller.DanglingEventsTimeouts{ diff --git a/internal/dash0/webhook/dash0_webhook_test.go b/internal/dash0/webhook/dash0_webhook_test.go index 75471fa6..fae8ef09 100644 --- a/internal/dash0/webhook/dash0_webhook_test.go +++ b/internal/dash0/webhook/dash0_webhook_test.go @@ -199,7 +199,7 @@ var _ = Describe("The Dash0 webhook", func() { EnvVars: 3, NodeOptionsEnvVarIdx: 1, Dash0CollectorBaseUrlEnvVarIdx: 2, - Dash0CollectorBaseUrlEnvVarExpectedValue: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + Dash0CollectorBaseUrlEnvVarExpectedValue: OTelCollectorBaseUrlTest, }, { VolumeMounts: 3, @@ -207,7 +207,7 @@ var _ = Describe("The Dash0 webhook", func() { EnvVars: 4, 
NodeOptionsEnvVarIdx: 2, Dash0CollectorBaseUrlEnvVarIdx: 3, - Dash0CollectorBaseUrlEnvVarExpectedValue: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + Dash0CollectorBaseUrlEnvVarExpectedValue: OTelCollectorBaseUrlTest, }, }, }) @@ -236,7 +236,7 @@ var _ = Describe("The Dash0 webhook", func() { NodeOptionsEnvVarIdx: 1, NodeOptionsUsesValueFrom: true, Dash0CollectorBaseUrlEnvVarIdx: 2, - Dash0CollectorBaseUrlEnvVarExpectedValue: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + Dash0CollectorBaseUrlEnvVarExpectedValue: OTelCollectorBaseUrlTest, }, { VolumeMounts: 3, @@ -245,7 +245,7 @@ var _ = Describe("The Dash0 webhook", func() { NodeOptionsEnvVarIdx: 1, NodeOptionsValue: "--require /__dash0__/instrumentation/node.js/node_modules/@dash0hq/opentelemetry --require something-else --experimental-modules", Dash0CollectorBaseUrlEnvVarIdx: 0, - Dash0CollectorBaseUrlEnvVarExpectedValue: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + Dash0CollectorBaseUrlEnvVarExpectedValue: OTelCollectorBaseUrlTest, }, }, }) diff --git a/internal/dash0/webhook/webhook_suite_test.go b/internal/dash0/webhook/webhook_suite_test.go index 9f35233d..cb775186 100644 --- a/internal/dash0/webhook/webhook_suite_test.go +++ b/internal/dash0/webhook/webhook_suite_test.go @@ -121,7 +121,7 @@ var _ = BeforeSuite(func() { Client: k8sClient, Recorder: manager.GetEventRecorderFor("dash0-webhook"), Images: TestImages, - OTelCollectorBaseUrl: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + OTelCollectorBaseUrl: OTelCollectorBaseUrlTest, }).SetupWebhookWithManager(manager) Expect(err).NotTo(HaveOccurred()) diff --git a/internal/dash0/workloads/workload_modifier_test.go b/internal/dash0/workloads/workload_modifier_test.go index 404b9947..7475b1c1 100644 --- a/internal/dash0/workloads/workload_modifier_test.go +++ b/internal/dash0/workloads/workload_modifier_test.go @@ -24,7 +24,7 @@ import ( var ( instrumentationMetadata = util.InstrumentationMetadata{ Images: TestImages, - OTelCollectorBaseUrl: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + OTelCollectorBaseUrl: OTelCollectorBaseUrlTest, InstrumentedBy: "modify_test", } ) @@ -77,7 +77,7 @@ var _ = Describe("Dash0 Workload Modification", func() { EnvVars: 3, NodeOptionsEnvVarIdx: 1, Dash0CollectorBaseUrlEnvVarIdx: 2, - Dash0CollectorBaseUrlEnvVarExpectedValue: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + Dash0CollectorBaseUrlEnvVarExpectedValue: OTelCollectorBaseUrlTest, }, { VolumeMounts: 3, @@ -85,7 +85,7 @@ var _ = Describe("Dash0 Workload Modification", func() { EnvVars: 4, NodeOptionsEnvVarIdx: 2, Dash0CollectorBaseUrlEnvVarIdx: 3, - Dash0CollectorBaseUrlEnvVarExpectedValue: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + Dash0CollectorBaseUrlEnvVarExpectedValue: OTelCollectorBaseUrlTest, }, }, }) @@ -109,7 +109,7 @@ var _ = Describe("Dash0 Workload Modification", func() { NodeOptionsEnvVarIdx: 1, NodeOptionsUsesValueFrom: true, Dash0CollectorBaseUrlEnvVarIdx: 2, - Dash0CollectorBaseUrlEnvVarExpectedValue: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + Dash0CollectorBaseUrlEnvVarExpectedValue: OTelCollectorBaseUrlTest, }, { VolumeMounts: 3, @@ -118,7 +118,7 @@ var _ = Describe("Dash0 Workload Modification", func() { NodeOptionsEnvVarIdx: 1, NodeOptionsValue: "--require 
/__dash0__/instrumentation/node.js/node_modules/@dash0hq/opentelemetry --require something-else --experimental-modules", Dash0CollectorBaseUrlEnvVarIdx: 0, - Dash0CollectorBaseUrlEnvVarExpectedValue: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + Dash0CollectorBaseUrlEnvVarExpectedValue: OTelCollectorBaseUrlTest, }, }, }) diff --git a/internal/dash0/workloads/workloads_suite_test.go b/internal/dash0/workloads/workloads_suite_test.go index 2a36c850..ee21460d 100644 --- a/internal/dash0/workloads/workloads_suite_test.go +++ b/internal/dash0/workloads/workloads_suite_test.go @@ -1,4 +1,7 @@ -package workloads_test +// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. +// SPDX-License-Identifier: Apache-2.0 + +package workloads import ( "testing" diff --git a/test/e2e/run_command.go b/test/e2e/run_command.go index ebaeeb77..dc457939 100644 --- a/test/e2e/run_command.go +++ b/test/e2e/run_command.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc. +// SPDX-License-Identifier: Apache-2.0 + package e2e import ( diff --git a/test/util/constants.go b/test/util/constants.go index c9dccc8e..b04a204a 100644 --- a/test/util/constants.go +++ b/test/util/constants.go @@ -28,10 +28,11 @@ const ( ConfigurationReloaderImageTest = "some-registry.com:1234/dash0hq/configuration-reloader:10.11.12" FilelogOffsetSynchImageTest = "some-registry.com:1234/dash0hq/filelog-offset-synch:13.14.15" - IngressEndpointTest = "ingress.endpoint.dash0.com:4317" - AuthorizationTokenTest = "authorization-token" - SecretRefTest = "secret-ref" - SecretRefEmpty = "" + OTelCollectorBaseUrlTest = "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318" + IngressEndpointTest = "ingress.endpoint.dash0.com:4317" + AuthorizationTokenTest = "authorization-token" + SecretRefTest = "secret-ref" + SecretRefEmpty = "" ) var ( diff --git a/test/util/resources.go b/test/util/resources.go index a3d37a72..62642525 100644 --- a/test/util/resources.go +++ b/test/util/resources.go @@ -819,7 +819,7 @@ func InstrumentedDeploymentWithMoreBellsAndWhistles(namespace string, name strin }, { Name: "DASH0_OTEL_COLLECTOR_BASE_URL", - Value: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + Value: OTelCollectorBaseUrlTest, }, }, }, @@ -855,7 +855,7 @@ func InstrumentedDeploymentWithMoreBellsAndWhistles(namespace string, name strin }, { Name: "DASH0_OTEL_COLLECTOR_BASE_URL", - Value: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + Value: OTelCollectorBaseUrlTest, }, }, }, @@ -894,7 +894,7 @@ func simulateInstrumentedPodSpec(podSpec *corev1.PodSpec, meta *metav1.ObjectMet }, { Name: "DASH0_OTEL_COLLECTOR_BASE_URL", - Value: "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + Value: OTelCollectorBaseUrlTest, }, } diff --git a/test/util/verification.go b/test/util/verification.go index 5804daef..f57515e4 100644 --- a/test/util/verification.go +++ b/test/util/verification.go @@ -55,7 +55,7 @@ func BasicInstrumentedPodSpecExpectations() PodSpecExpectations { NodeOptionsEnvVarIdx: 0, Dash0CollectorBaseUrlEnvVarIdx: 1, Dash0CollectorBaseUrlEnvVarExpectedValue:// - "http://dash0-operator-opentelemetry-collector.dash0-system.svc.cluster.local:4318", + OTelCollectorBaseUrlTest, }}, } }
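
Note (not part of the patch): a minimal sketch of how a reconciler could consume the now-exported util.VerifyUniqueDash0MonitoringResourceExists, assuming the signature shown in the diff above. The reconciler type, the status-update failure message, and the package layout are illustrative assumptions, not the operator's actual code.

```go
package controller

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"

	"github.com/dash0hq/dash0-operator/internal/dash0/util"
)

// exampleReconciler is a hypothetical reconciler used only for this sketch.
type exampleReconciler struct {
	client client.Client
}

func (r *exampleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// The returned bool means "stop the reconcile request": if it is true, return without
	// requeueing; if an error is returned, ignore the bool and requeue by returning the error.
	monitoringResource, stopReconcile, err := util.VerifyUniqueDash0MonitoringResourceExists(
		ctx,
		r.client,
		"Failed to update the status of the Dash0 monitoring resource.", // assumed message
		req,
		&logger,
	)
	if err != nil {
		return ctrl.Result{}, err
	}
	if stopReconcile {
		return ctrl.Result{}, nil
	}

	// ... continue reconciling with the single, most recently created resource ...
	_ = monitoringResource
	return ctrl.Result{}, nil
}
```

Note that after the extraction the function no longer takes a separate status writer; it updates the resource status via k8sClient.Status().Update itself, as shown in the hunk above.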
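
Note (not part of the patch): a small sketch of how the new util.ModificationMode constants might be used to branch between instrumentation and uninstrumentation, for example when choosing an event message. The helper function and the message texts are assumptions for illustration only.

```go
package instrumentation

import "github.com/dash0hq/dash0-operator/internal/dash0/util"

// eventMessageFor is hypothetical; it only demonstrates switching on util.ModificationMode.
func eventMessageFor(mode util.ModificationMode) string {
	switch mode {
	case util.ModificationModeInstrumentation:
		return "The workload has been instrumented."
	case util.ModificationModeUninstrumentation:
		return "The Dash0 instrumentation has been removed from the workload."
	default:
		return "Unknown modification mode."
	}
}
```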