Skip to content

Commit

Permalink
feat(self-monitoring): add deployment uid to manager resource attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
basti1302 committed Sep 13, 2024
1 parent ceec292 commit 37c1bcb
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 86 deletions.
154 changes: 78 additions & 76 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth"

"github.com/go-logr/logr"
semconv "go.opentelemetry.io/collector/semconv/v1.27.0"
otelmetric "go.opentelemetry.io/otel/metric"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -86,8 +87,11 @@ var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")

metricNamePrefix = fmt.Sprintf("%s.", meterName)
startupTasksK8sClient client.Client
deploymentSelfReference *appsv1.Deployment
envVars environmentVariables

metricNamePrefix = fmt.Sprintf("%s.", meterName)
meter otelmetric.Meter
otelShutdownFunctions []func(ctx context.Context) error
)
Expand Down Expand Up @@ -167,9 +171,32 @@ func main() {
TLSOpts: tlsOpts,
})

meter, otelShutdownFunctions = common.InitOTelSdk(ctx, meterName)
var err error
if err = readEnvironmentVariables(); err != nil {
os.Exit(1)
}
if err = initStartupTasksK8sClient(&setupLog); err != nil {
os.Exit(1)
}
if err = findDeploymentSelfReference(
ctx,
startupTasksK8sClient,
envVars.operatorNamespace,
envVars.deploymentName,
&setupLog,
); err != nil {
setupLog.Error(err, "The Dash0 operator manager process to lookup its own deployment.")
os.Exit(1)
}

if err := startOperatorManager(
meter, otelShutdownFunctions =
common.InitOTelSdk(
ctx,
meterName,
map[string]string{semconv.AttributeK8SDeploymentUID: string(deploymentSelfReference.UID)},
)

if err = startOperatorManager(
ctx,
metricsAddr,
secureMetrics,
Expand Down Expand Up @@ -226,11 +253,6 @@ func startOperatorManager(
return fmt.Errorf("unable to create the clientset client")
}

envVars, err := readEnvironmentVariables()
if err != nil {
return err
}

setupLog.Info(
"configuration:",

Expand Down Expand Up @@ -263,7 +285,7 @@ func startOperatorManager(
developmentMode,
)

err = startDash0Controllers(ctx, mgr, clientset, envVars, developmentMode)
err = startDash0Controllers(ctx, mgr, clientset, developmentMode)
if err != nil {
return err
}
Expand All @@ -288,55 +310,55 @@ func startOperatorManager(
return nil
}

func readEnvironmentVariables() (*environmentVariables, error) {
func readEnvironmentVariables() error {
operatorNamespace, isSet := os.LookupEnv(operatorNamespaceEnvVarName)
if !isSet {
return nil, fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, operatorNamespaceEnvVarName)
return fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, operatorNamespaceEnvVarName)
}

deploymentName, isSet := os.LookupEnv(deploymentNameEnvVarName)
if !isSet {
return nil, fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, deploymentNameEnvVarName)
return fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, deploymentNameEnvVarName)
}

oTelCollectorNamePrefix, isSet := os.LookupEnv(oTelCollectorNamePrefixEnvVarName)
if !isSet {
return nil, fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, oTelCollectorNamePrefixEnvVarName)
return fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, oTelCollectorNamePrefixEnvVarName)
}

operatorImage, isSet := os.LookupEnv(operatorImageEnvVarName)
if !isSet {
return nil, fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, operatorImageEnvVarName)
return fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, operatorImageEnvVarName)
}

initContainerImage, isSet := os.LookupEnv(initContainerImageEnvVarName)
if !isSet {
return nil, fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, initContainerImageEnvVarName)
return fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, initContainerImageEnvVarName)
}
initContainerImagePullPolicy :=
readOptionalPullPolicyFromEnvironmentVariable(initContainerImagePullPolicyEnvVarName)

collectorImage, isSet := os.LookupEnv(collectorImageEnvVarName)
if !isSet {
return nil, fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, collectorImageEnvVarName)
return fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, collectorImageEnvVarName)
}
collectorImagePullPolicy := readOptionalPullPolicyFromEnvironmentVariable(collectorImageImagePullPolicyEnvVarName)

configurationReloaderImage, isSet := os.LookupEnv(configurationReloaderImageEnvVarName)
if !isSet {
return nil, fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, configurationReloaderImageEnvVarName)
return fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, configurationReloaderImageEnvVarName)
}
configurationReloaderImagePullPolicy :=
readOptionalPullPolicyFromEnvironmentVariable(configurationReloaderImagePullPolicyEnvVarName)

filelogOffsetSynchImage, isSet := os.LookupEnv(filelogOffsetSynchImageEnvVarName)
if !isSet {
return nil, fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, filelogOffsetSynchImageEnvVarName)
return fmt.Errorf(mandatoryEnvVarMissingMessageTemplate, filelogOffsetSynchImageEnvVarName)
}
filelogOffsetSynchImagePullPolicy :=
readOptionalPullPolicyFromEnvironmentVariable(filelogOffsetSynchImagePullPolicyEnvVarName)

return &environmentVariables{
envVars = environmentVariables{
operatorNamespace: operatorNamespace,
deploymentName: deploymentName,
oTelCollectorNamePrefix: oTelCollectorNamePrefix,
Expand All @@ -349,7 +371,9 @@ func readEnvironmentVariables() (*environmentVariables, error) {
configurationReloaderImagePullPolicy: configurationReloaderImagePullPolicy,
filelogOffsetSynchImage: filelogOffsetSynchImage,
filelogOffsetSynchImagePullPolicy: filelogOffsetSynchImagePullPolicy,
}, nil
}

return nil
}

func readOptionalPullPolicyFromEnvironmentVariable(envVarName string) corev1.PullPolicy {
Expand All @@ -372,7 +396,6 @@ func startDash0Controllers(
ctx context.Context,
mgr manager.Manager,
clientset *kubernetes.Clientset,
envVars *environmentVariables,
developmentMode bool,
) error {
oTelCollectorBaseUrl :=
Expand All @@ -392,21 +415,13 @@ func startDash0Controllers(
FilelogOffsetSynchImagePullPolicy: envVars.filelogOffsetSynchImagePullPolicy,
}

var deploymentSelfReference *appsv1.Deployment
var err error

if deploymentSelfReference, err = executeStartupTasks(
executeStartupTasks(
ctx,
clientset,
mgr.GetEventRecorderFor("dash0-startup-tasks"),
images,
oTelCollectorBaseUrl,
envVars.operatorNamespace,
envVars.deploymentName,
&setupLog,
); err != nil {
return err
}
)

logCurrentSelfMonitoringSettings(deploymentSelfReference)

Expand Down Expand Up @@ -437,7 +452,7 @@ func startDash0Controllers(
OperatorNamespace: envVars.operatorNamespace,
OTelCollectorNamePrefix: envVars.oTelCollectorNamePrefix,
}
if err = backendConnectionReconciler.SetupWithManager(mgr); err != nil {
if err := backendConnectionReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to set up the backend connection reconciler: %w", err)
}

Expand All @@ -450,7 +465,7 @@ func startDash0Controllers(
Images: images,
DevelopmentMode: developmentMode,
}
if err = operatorConfigurationReconciler.SetupWithManager(mgr); err != nil {
if err := operatorConfigurationReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to set up the operator configuration reconciler: %w", err)
}
operatorConfigurationReconciler.InitializeSelfMonitoringMetrics(
Expand All @@ -467,7 +482,7 @@ func startDash0Controllers(
Images: images,
OperatorNamespace: envVars.operatorNamespace,
}
if err = monitoringReconciler.SetupWithManager(mgr); err != nil {
if err := monitoringReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to set up the monitoring reconciler: %w", err)
}
monitoringReconciler.InitializeSelfMonitoringMetrics(
Expand All @@ -477,7 +492,7 @@ func startDash0Controllers(
)

if os.Getenv("ENABLE_WEBHOOK") != "false" {
if err = (&webhook.Handler{
if err := (&webhook.Handler{
Client: k8sClient,
Recorder: mgr.GetEventRecorderFor("dash0-webhook"),
Images: images,
Expand All @@ -492,46 +507,16 @@ func startDash0Controllers(
return nil
}

func executeStartupTasks(
ctx context.Context,
clientset *kubernetes.Clientset,
eventRecorder record.EventRecorder,
images util.Images,
oTelCollectorBaseUrl string,
operatorNamespace string,
deploymentName string,
logger *logr.Logger,
) (*appsv1.Deployment, error) {
func initStartupTasksK8sClient(logger *logr.Logger) error {
cfg := ctrl.GetConfigOrDie()
startupTasksK8sClient, err := client.New(cfg, client.Options{
var err error
if startupTasksK8sClient, err = client.New(cfg, client.Options{
Scheme: scheme,
})
if err != nil {
}); err != nil {
logger.Error(err, "failed to create Kubernetes API client for startup tasks")
return nil, err
}

instrumentAtStartup(
ctx,
startupTasksK8sClient,
clientset,
eventRecorder,
images,
oTelCollectorBaseUrl,
)

deploymentSelfReference, err := findDeploymentSelfReference(
ctx,
startupTasksK8sClient,
operatorNamespace,
deploymentName,
logger,
)
if err != nil {
return nil, err
return err
}

return deploymentSelfReference, nil
return nil
}

func findDeploymentSelfReference(
Expand All @@ -540,23 +525,40 @@ func findDeploymentSelfReference(
operatorNamespace string,
deploymentName string,
logger *logr.Logger,
) (*appsv1.Deployment, error) {
deploymentSelfReference := &appsv1.Deployment{}
) error {
deploymentSelfReference = &appsv1.Deployment{}
fullyQualifiedName := fmt.Sprintf("%s/%s", operatorNamespace, deploymentName)
if err := k8sClient.Get(ctx, client.ObjectKey{
Namespace: operatorNamespace,
Name: deploymentName,
}, deploymentSelfReference); err != nil {
logger.Error(err, "failed to get self reference for controller deployment")
return nil, err
return err
}
if deploymentSelfReference.UID == "" {
msg := fmt.Sprintf("self reference for controller deployment %s has no UID", fullyQualifiedName)
err := fmt.Errorf(msg)
logger.Error(err, msg)
return nil, err
return err
}
return deploymentSelfReference, nil
return nil
}

// executeStartupTasks runs the work that is carried out once while the operator manager starts up. At present
// this consists solely of delegating to instrumentAtStartup.
//
// The Kubernetes API client is not a parameter; the package-level startupTasksK8sClient (the variable that
// initStartupTasksK8sClient assigns) is handed to instrumentAtStartup together with the arguments received here.
func executeStartupTasks(
	ctx context.Context,
	clientset *kubernetes.Clientset,
	eventRecorder record.EventRecorder,
	images util.Images,
	oTelCollectorBaseUrl string,
) {
	instrumentAtStartup(ctx, startupTasksK8sClient, clientset, eventRecorder, images, oTelCollectorBaseUrl)
}

func instrumentAtStartup(
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/onsi/ginkgo/v2 v2.20.2
github.com/onsi/gomega v1.34.2
go.opentelemetry.io/collector/pdata v1.15.0
go.opentelemetry.io/collector/semconv v0.109.0
go.opentelemetry.io/otel/metric v1.30.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.31.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opentelemetry.io/collector/pdata v1.15.0 h1:q/T1sFpRKJnjDrUsHdJ6mq4uSqViR/f92yvGwDby/gY=
go.opentelemetry.io/collector/pdata v1.15.0/go.mod h1:2wcsTIiLAJSbqBq/XUUYbi+cP+N87d0jEJzmb9nT19U=
go.opentelemetry.io/collector/semconv v0.109.0 h1:6CStOFOVhdrzlHg51kXpcPHRKPh5RtV7z/wz+c1TG1g=
go.opentelemetry.io/collector/semconv v0.109.0/go.mod h1:zCJ5njhWpejR+A40kiEoeFm1xq1uzyZwMnRNX6/D82A=
go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts=
go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.30.0 h1:WypxHH02KX2poqqbaadmkMYalGyy/vil4HE4PM4nRJc=
Expand Down
2 changes: 1 addition & 1 deletion images/configreloader/src/configreloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func main() {
done := make(chan bool, 1)
signal.Notify(shutdown, syscall.SIGTERM)

meter, selfMonitoringShutdownFunctions := common.InitOTelSdk(ctx, meterName)
meter, selfMonitoringShutdownFunctions := common.InitOTelSdk(ctx, meterName, nil)
initializeSelfMonitoringMetrics(meter)

go func() {
Expand Down
2 changes: 1 addition & 1 deletion images/filelogoffsetsynch/src/filelogoffsetsynch.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func main() {
log.Fatalf("Cannot create the Kube API client: %v\n", err)
}

meter, selfMonitoringShutdownFunctions := common.InitOTelSdk(ctx, meterName)
meter, selfMonitoringShutdownFunctions := common.InitOTelSdk(ctx, meterName, nil)
initializeSelfMonitoringMetrics(meter)

// creates the clientset
Expand Down
13 changes: 9 additions & 4 deletions images/pkg/common/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
otelmetric "go.opentelemetry.io/otel/metric"
Expand All @@ -26,6 +27,7 @@ var (
func InitOTelSdk(
ctx context.Context,
meterName string,
extraResourceAttributes map[string]string,
) (otelmetric.Meter, []func(ctx context.Context) error) {
podUid, isSet := os.LookupEnv("K8S_POD_UID")
if !isSet {
Expand Down Expand Up @@ -65,11 +67,14 @@ func InitOTelSdk(
log.Fatalf("Unexpected OTLP protocol set as value of the 'OTEL_EXPORTER_OTLP_PROTOCOL' environment variable: %v", protocol)
}

attributes := make([]attribute.KeyValue, 0, len(extraResourceAttributes)+2)
attributes = append(attributes, semconv.K8SPodUID(podUid))
attributes = append(attributes, semconv.K8SNodeName(nodeName))
for key, value := range extraResourceAttributes {
attributes = append(attributes, attribute.String(key, value))
}
resourceAttributes, err := resource.New(ctx,
resource.WithAttributes(
semconv.K8SPodUID(podUid),
semconv.K8SNodeName(nodeName),
),
resource.WithAttributes(attributes...),
)
if err != nil {
log.Fatalf("Cannot initialize the OpenTelemetry resource: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ processors:

resourcedetection:
detectors:
# Note: Adding the "env" detector here would tag all metrics for all monitored resources with the key-value pairs
# from the OTEL_RESOURCE_ATTRIBUTES that we attach to the collector deployment via self-monitoring.
- system
- eks
- ecs
Expand Down
Loading

0 comments on commit 37c1bcb

Please sign in to comment.