Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collectors self-telemetry pipelines. #1431

Merged
merged 21 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22.0

require (
github.com/odigos-io/odigos/common v0.0.0
github.com/odigos-io/odigos/k8sutils v0.0.0
github.com/stretchr/testify v1.8.4
k8s.io/api v0.30.1
k8s.io/apimachinery v0.30.3
Expand Down Expand Up @@ -66,4 +67,7 @@ require (
sigs.k8s.io/yaml v1.3.0
)

replace github.com/odigos-io/odigos/common => ../common
replace (
github.com/odigos-io/odigos/common => ../common
github.com/odigos-io/odigos/k8sutils => ../k8sutils
)
7 changes: 4 additions & 3 deletions api/odigos/v1alpha1/collectorsgroup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ package v1alpha1

import (
"github.com/odigos-io/odigos/common"
k8sconsts "github.com/odigos-io/odigos/k8sutils/pkg/consts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +kubebuilder:validation:Enum=CLUSTER_GATEWAY;NODE_COLLECTOR
type CollectorsGroupRole string
type CollectorsGroupRole k8sconsts.CollectorRole

const (
CollectorsGroupRoleClusterGateway CollectorsGroupRole = "CLUSTER_GATEWAY"
CollectorsGroupRoleNodeCollector CollectorsGroupRole = "NODE_COLLECTOR"
CollectorsGroupRoleClusterGateway CollectorsGroupRole = CollectorsGroupRole(k8sconsts.CollectorsRoleClusterGateway)
CollectorsGroupRoleNodeCollector CollectorsGroupRole = CollectorsGroupRole(k8sconsts.CollectorsRoleNodeCollector)
)

// CollectorsGroupSpec defines the desired state of Collector
Expand Down
82 changes: 75 additions & 7 deletions autoscaler/controllers/datacollection/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (

"github.com/odigos-io/odigos/autoscaler/controllers/datacollection/custom"
"github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/common/consts"

"github.com/ghodss/yaml"
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
commonconf "github.com/odigos-io/odigos/autoscaler/controllers/common"
"github.com/odigos-io/odigos/common/config"
"github.com/odigos-io/odigos/k8sutils/pkg/consts"
constsK8s "github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/env"
v1 "k8s.io/api/core/v1"
Expand All @@ -24,6 +24,8 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

func SyncConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, allProcessors *odigosv1.ProcessorList,
Expand All @@ -42,7 +44,7 @@ func SyncConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.D
logger.Error(err, "failed to get desired config map")
return "", err
}
desiredData := desired.Data[consts.OdigosNodeCollectorConfigMapKey]
desiredData := desired.Data[constsK8s.OdigosNodeCollectorConfigMapKey]

existing := &v1.ConfigMap{}
if err := c.Get(ctx, client.ObjectKey{Namespace: datacollection.Namespace, Name: datacollection.Name}, existing); err != nil {
Expand Down Expand Up @@ -108,7 +110,7 @@ func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odig
Namespace: datacollection.Namespace,
},
Data: map[string]string{
consts.OdigosNodeCollectorConfigMapKey: cmData,
constsK8s.OdigosNodeCollectorConfigMapKey: cmData,
},
}

Expand Down Expand Up @@ -144,6 +146,23 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
}},
}
processorsCfg["resourcedetection"] = config.GenericMap{"detectors": []string{"ec2", "gcp", "azure"}}
processorsCfg["odigostrafficmetrics"] = config.GenericMap{
// adding the following resource attributes to the metrics allows to aggregate the metrics by source.
"res_attributes_keys": []string{
string(semconv.ServiceNameKey),
string(semconv.K8SNamespaceNameKey),
string(semconv.K8SDeploymentNameKey),
string(semconv.K8SStatefulSetNameKey),
string(semconv.K8SDaemonSetNameKey),
},
}
processorsCfg["resource/pod-name"] = config.GenericMap{
"attributes": []config.GenericMap{{
"key": "k8s.pod.name",
"value": "${POD_NAME}",
"action": "upsert",
}},
}

exporters := config.GenericMap{
"otlp/gateway": config.GenericMap{
Expand All @@ -152,6 +171,15 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
"insecure": true,
},
},
"otlp/odigos-own-telemetry-ui": config.GenericMap{
"endpoint": fmt.Sprintf("ui.%s:%d", env.GetCurrentNamespace(), consts.OTLPPort),
"tls": config.GenericMap{
"insecure": true,
},
"retry_on_failure": config.GenericMap{
"enabled": false,
},
},
}
tracesPipelineExporter := []string{"otlp/gateway"}

Expand All @@ -175,6 +203,28 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
},
},
},
"prometheus/self-metrics": config.GenericMap{
"config": config.GenericMap{
"scrape_configs": []config.GenericMap{
{
"job_name": "otelcol",
"scrape_interval": "10s",
"static_configs": []config.GenericMap{
{
"targets": []string{"127.0.0.1:8888"},
},
},
"metric_relabel_configs": []config.GenericMap{
{
"source_labels": []string{"__name__"},
"regex": "(.*odigos.*|^otelcol_processor_accepted.*)",
"action": "keep",
},
},
},
},
},
},
},
Exporters: exporters,
Processors: processorsCfg,
Expand All @@ -184,8 +234,26 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
},
},
Service: config.Service{
Pipelines: map[string]config.Pipeline{},
Pipelines: map[string]config.Pipeline{
"metrics/otelcol": {
Receivers: []string{"prometheus/self-metrics"},
Processors: []string{"resource/pod-name"},
Exporters: []string{"otlp/odigos-own-telemetry-ui"},
},
},
Extensions: []string{"health_check"},
Telemetry: config.Telemetry{
Metrics: config.GenericMap{
"address": "0.0.0.0:8888",
},
Resource: map[string]*string{
// The collector add "otelcol" as a service name, so we need to remove it
// to avoid duplication, since we are interested in the instrumented services.
string(semconv.ServiceNameKey): nil,
// The collector adds its own version as a service version, which is not needed currently.
string(semconv.ServiceVersionKey): nil,
},
},
},
}

Expand Down Expand Up @@ -245,15 +313,15 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o

cfg.Service.Pipelines["logs"] = config.Pipeline{
Receivers: []string{"filelog"},
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection"}, logsProcessors...),
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection", "odigostrafficmetrics"}, logsProcessors...),
Exporters: []string{"otlp/gateway"},
}
}

if collectTraces {
cfg.Service.Pipelines["traces"] = config.Pipeline{
Receivers: []string{"otlp"},
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection"}, tracesProcessors...),
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection", "odigostrafficmetrics"}, tracesProcessors...),
Exporters: tracesPipelineExporter,
}
}
Expand All @@ -268,7 +336,7 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o

cfg.Service.Pipelines["metrics"] = config.Pipeline{
Receivers: []string{"otlp", "kubeletstats"},
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection"}, metricsProcessors...),
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection", "odigostrafficmetrics"}, metricsProcessors...),
Exporters: []string{"otlp/gateway"},
}
}
Expand Down
11 changes: 9 additions & 2 deletions autoscaler/controllers/datacollection/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
)

const (
collectorLabel = "odigos.io/data-collection"
containerName = "data-collection"
containerImage = "keyval/odigos-collector"
containerCommand = "/odigosotelcol"
Expand All @@ -34,7 +33,7 @@ const (

var (
commonLabels = map[string]string{
collectorLabel: "true",
consts.OdigosCollectorRoleLabel: string(consts.CollectorsRoleNodeCollector),
}
)

Expand Down Expand Up @@ -276,6 +275,14 @@ func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData st
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
},
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Expand Down
89 changes: 89 additions & 0 deletions autoscaler/controllers/gateway/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package gateway

import (
"context"
"errors"
"fmt"
"reflect"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/autoscaler/controllers/common"
odigoscommon "github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/common/config"
odigosconsts "github.com/odigos-io/odigos/common/consts"
odgiosK8s "github.com/odigos-io/odigos/k8sutils/pkg/conditions"
"github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/env"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -23,6 +27,90 @@ const (
destinationConfiguredType = "DestinationConfigured"
)

var (
errNoPipelineConfigured = errors.New("no pipeline was configured, cannot add self telemetry pipeline")
errNoReceiversConfigured = errors.New("no receivers were configured, cannot add self telemetry pipeline")
errNoExportersConfigured = errors.New("no exporters were configured, cannot add self telemetry pipeline")
)

func addSelfTelemetryPipeline(c *config.Config) error {
if c.Service.Pipelines == nil {
return errNoPipelineConfigured
}
if c.Receivers == nil {
return errNoReceiversConfigured
}
if c.Exporters == nil {
return errNoExportersConfigured
}
c.Receivers["prometheus/self-metrics"] = config.GenericMap{
"config": config.GenericMap{
"scrape_configs": []config.GenericMap{
{
"job_name": "otelcol",
"scrape_interval": "10s",
"static_configs": []config.GenericMap{
{
"targets": []string{"127.0.0.1:8888"},
},
},
"metric_relabel_configs": []config.GenericMap{
{
"source_labels": []string{"__name__"},
"regex": "(.*odigos.*|^otelcol_processor_accepted.*|^otelcol_exporter_sent.*)",
"action": "keep",
},
},
},
},
},
}
if c.Processors == nil {
c.Processors = make(config.GenericMap)
}
c.Processors["resource/pod-name"] = config.GenericMap{
"attributes": []config.GenericMap{
{
"key": "k8s.pod.name",
"value": "${POD_NAME}",
"action": "upsert",
},
},
}
// odigostrafficmetrics processor should be the last processor in the pipeline
// as it helps to calculate the size of the data being exported.
// In case of performance impact caused by this processor, we should modify this config to reduce the sampling ratio.
c.Processors["odigostrafficmetrics"] = struct{}{}
c.Exporters["otlp/odigos-own-telemetry-ui"] = config.GenericMap{
"endpoint": fmt.Sprintf("ui.%s:%d", env.GetCurrentNamespace(), odigosconsts.OTLPPort),
"tls": config.GenericMap{
"insecure": true,
},
"retry_on_failure": config.GenericMap{
"enabled": false,
},
}
c.Service.Pipelines["metrics/otelcol"] = config.Pipeline{
Receivers: []string{"prometheus/self-metrics"},
Processors: []string{"resource/pod-name"},
Exporters: []string{"otlp/odigos-own-telemetry-ui"},
}

c.Service.Telemetry.Metrics = config.GenericMap{
"address": "0.0.0.0:8888",
}

for pipelineName, pipeline := range c.Service.Pipelines {
if pipelineName == "metrics/otelcol" {
continue
}
pipeline.Processors = append(pipeline.Processors, "odigostrafficmetrics")
c.Service.Pipelines[pipelineName] = pipeline
}

return nil
}

func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.ProcessorList, gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme, memConfig *memoryConfigurations) (string, []odigoscommon.ObservabilitySignal, error) {
logger := log.FromContext(ctx)

Expand All @@ -38,6 +126,7 @@ func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.Proc
common.ToExporterConfigurerArray(dests),
common.ToProcessorConfigurerArray(processors),
memoryLimiterConfiguration,
addSelfTelemetryPipeline,
)
if err != nil {
logger.Error(err, "Failed to calculate config")
Expand Down
Loading
Loading