Skip to content

Commit

Permalink
Collectors self-telemetry pipelines. (#1431)
Browse files Browse the repository at this point in the history
Send collector metrics from node and cluster collectors.
The goal is to have metrics about the data sent and throughput from
sources and to destinations.

The main parts of this PR are:
* Collector configuration for collecting self-telemetry. This includes
definning the self-telemetry pipeline in the node and cluster
collectors.
* `odigostrafficmetrics` which is a processor that additional collects
metrics to those that are already provided by the collector. This
processor can be configured to attach different attributes to the
metrics. In addition, it is possible to configure the processor to
perform its measurements on a fraction of the spans/metrics/logs.
* `collectormetrics` package in the UI server which acts as an OTLP
receiver, and exposes endpoints for the UI.
  • Loading branch information
RonFed authored Aug 12, 2024
1 parent 1829f00 commit 7c8d508
Show file tree
Hide file tree
Showing 49 changed files with 2,616 additions and 196 deletions.
6 changes: 5 additions & 1 deletion api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22.0

require (
github.com/odigos-io/odigos/common v0.0.0
github.com/odigos-io/odigos/k8sutils v0.0.0
github.com/stretchr/testify v1.8.4
k8s.io/api v0.30.1
k8s.io/apimachinery v0.30.3
Expand Down Expand Up @@ -66,4 +67,7 @@ require (
sigs.k8s.io/yaml v1.3.0
)

replace github.com/odigos-io/odigos/common => ../common
replace (
github.com/odigos-io/odigos/common => ../common
github.com/odigos-io/odigos/k8sutils => ../k8sutils
)
7 changes: 4 additions & 3 deletions api/odigos/v1alpha1/collectorsgroup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ package v1alpha1

import (
"github.com/odigos-io/odigos/common"
k8sconsts "github.com/odigos-io/odigos/k8sutils/pkg/consts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +kubebuilder:validation:Enum=CLUSTER_GATEWAY;NODE_COLLECTOR
type CollectorsGroupRole string
type CollectorsGroupRole k8sconsts.CollectorRole

const (
CollectorsGroupRoleClusterGateway CollectorsGroupRole = "CLUSTER_GATEWAY"
CollectorsGroupRoleNodeCollector CollectorsGroupRole = "NODE_COLLECTOR"
CollectorsGroupRoleClusterGateway CollectorsGroupRole = CollectorsGroupRole(k8sconsts.CollectorsRoleClusterGateway)
CollectorsGroupRoleNodeCollector CollectorsGroupRole = CollectorsGroupRole(k8sconsts.CollectorsRoleNodeCollector)
)

// CollectorsGroupSpec defines the desired state of Collector
Expand Down
82 changes: 75 additions & 7 deletions autoscaler/controllers/datacollection/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (

"github.com/odigos-io/odigos/autoscaler/controllers/datacollection/custom"
"github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/common/consts"

"github.com/ghodss/yaml"
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
commonconf "github.com/odigos-io/odigos/autoscaler/controllers/common"
"github.com/odigos-io/odigos/common/config"
"github.com/odigos-io/odigos/k8sutils/pkg/consts"
constsK8s "github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/env"
v1 "k8s.io/api/core/v1"
Expand All @@ -24,6 +24,8 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

func SyncConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, allProcessors *odigosv1.ProcessorList,
Expand All @@ -42,7 +44,7 @@ func SyncConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.D
logger.Error(err, "failed to get desired config map")
return "", err
}
desiredData := desired.Data[consts.OdigosNodeCollectorConfigMapKey]
desiredData := desired.Data[constsK8s.OdigosNodeCollectorConfigMapKey]

existing := &v1.ConfigMap{}
if err := c.Get(ctx, client.ObjectKey{Namespace: datacollection.Namespace, Name: datacollection.Name}, existing); err != nil {
Expand Down Expand Up @@ -108,7 +110,7 @@ func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odig
Namespace: datacollection.Namespace,
},
Data: map[string]string{
consts.OdigosNodeCollectorConfigMapKey: cmData,
constsK8s.OdigosNodeCollectorConfigMapKey: cmData,
},
}

Expand Down Expand Up @@ -144,6 +146,23 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
}},
}
processorsCfg["resourcedetection"] = config.GenericMap{"detectors": []string{"ec2", "gcp", "azure"}}
processorsCfg["odigostrafficmetrics"] = config.GenericMap{
// adding the following resource attributes to the metrics allows to aggregate the metrics by source.
"res_attributes_keys": []string{
string(semconv.ServiceNameKey),
string(semconv.K8SNamespaceNameKey),
string(semconv.K8SDeploymentNameKey),
string(semconv.K8SStatefulSetNameKey),
string(semconv.K8SDaemonSetNameKey),
},
}
processorsCfg["resource/pod-name"] = config.GenericMap{
"attributes": []config.GenericMap{{
"key": "k8s.pod.name",
"value": "${POD_NAME}",
"action": "upsert",
}},
}

exporters := config.GenericMap{
"otlp/gateway": config.GenericMap{
Expand All @@ -152,6 +171,15 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
"insecure": true,
},
},
"otlp/odigos-own-telemetry-ui": config.GenericMap{
"endpoint": fmt.Sprintf("ui.%s:%d", env.GetCurrentNamespace(), consts.OTLPPort),
"tls": config.GenericMap{
"insecure": true,
},
"retry_on_failure": config.GenericMap{
"enabled": false,
},
},
}
tracesPipelineExporter := []string{"otlp/gateway"}

Expand All @@ -175,6 +203,28 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
},
},
},
"prometheus/self-metrics": config.GenericMap{
"config": config.GenericMap{
"scrape_configs": []config.GenericMap{
{
"job_name": "otelcol",
"scrape_interval": "10s",
"static_configs": []config.GenericMap{
{
"targets": []string{"127.0.0.1:8888"},
},
},
"metric_relabel_configs": []config.GenericMap{
{
"source_labels": []string{"__name__"},
"regex": "(.*odigos.*|^otelcol_processor_accepted.*)",
"action": "keep",
},
},
},
},
},
},
},
Exporters: exporters,
Processors: processorsCfg,
Expand All @@ -184,8 +234,26 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
},
},
Service: config.Service{
Pipelines: map[string]config.Pipeline{},
Pipelines: map[string]config.Pipeline{
"metrics/otelcol": {
Receivers: []string{"prometheus/self-metrics"},
Processors: []string{"resource/pod-name"},
Exporters: []string{"otlp/odigos-own-telemetry-ui"},
},
},
Extensions: []string{"health_check"},
Telemetry: config.Telemetry{
Metrics: config.GenericMap{
"address": "0.0.0.0:8888",
},
Resource: map[string]*string{
// The collector add "otelcol" as a service name, so we need to remove it
// to avoid duplication, since we are interested in the instrumented services.
string(semconv.ServiceNameKey): nil,
// The collector adds its own version as a service version, which is not needed currently.
string(semconv.ServiceVersionKey): nil,
},
},
},
}

Expand Down Expand Up @@ -245,15 +313,15 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o

cfg.Service.Pipelines["logs"] = config.Pipeline{
Receivers: []string{"filelog"},
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection"}, logsProcessors...),
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection", "odigostrafficmetrics"}, logsProcessors...),
Exporters: []string{"otlp/gateway"},
}
}

if collectTraces {
cfg.Service.Pipelines["traces"] = config.Pipeline{
Receivers: []string{"otlp"},
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection"}, tracesProcessors...),
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection", "odigostrafficmetrics"}, tracesProcessors...),
Exporters: tracesPipelineExporter,
}
}
Expand All @@ -268,7 +336,7 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o

cfg.Service.Pipelines["metrics"] = config.Pipeline{
Receivers: []string{"otlp", "kubeletstats"},
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection"}, metricsProcessors...),
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection", "odigostrafficmetrics"}, metricsProcessors...),
Exporters: []string{"otlp/gateway"},
}
}
Expand Down
11 changes: 9 additions & 2 deletions autoscaler/controllers/datacollection/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
)

const (
collectorLabel = "odigos.io/data-collection"
containerName = "data-collection"
containerImage = "keyval/odigos-collector"
containerCommand = "/odigosotelcol"
Expand All @@ -34,7 +33,7 @@ const (

var (
commonLabels = map[string]string{
collectorLabel: "true",
consts.OdigosCollectorRoleLabel: string(consts.CollectorsRoleNodeCollector),
}
)

Expand Down Expand Up @@ -276,6 +275,14 @@ func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData st
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
},
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Expand Down
89 changes: 89 additions & 0 deletions autoscaler/controllers/gateway/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package gateway

import (
"context"
"errors"
"fmt"
"reflect"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/autoscaler/controllers/common"
odigoscommon "github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/common/config"
odigosconsts "github.com/odigos-io/odigos/common/consts"
odgiosK8s "github.com/odigos-io/odigos/k8sutils/pkg/conditions"
"github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/env"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -23,6 +27,90 @@ const (
destinationConfiguredType = "DestinationConfigured"
)

var (
errNoPipelineConfigured = errors.New("no pipeline was configured, cannot add self telemetry pipeline")
errNoReceiversConfigured = errors.New("no receivers were configured, cannot add self telemetry pipeline")
errNoExportersConfigured = errors.New("no exporters were configured, cannot add self telemetry pipeline")
)

func addSelfTelemetryPipeline(c *config.Config) error {
if c.Service.Pipelines == nil {
return errNoPipelineConfigured
}
if c.Receivers == nil {
return errNoReceiversConfigured
}
if c.Exporters == nil {
return errNoExportersConfigured
}
c.Receivers["prometheus/self-metrics"] = config.GenericMap{
"config": config.GenericMap{
"scrape_configs": []config.GenericMap{
{
"job_name": "otelcol",
"scrape_interval": "10s",
"static_configs": []config.GenericMap{
{
"targets": []string{"127.0.0.1:8888"},
},
},
"metric_relabel_configs": []config.GenericMap{
{
"source_labels": []string{"__name__"},
"regex": "(.*odigos.*|^otelcol_processor_accepted.*|^otelcol_exporter_sent.*)",
"action": "keep",
},
},
},
},
},
}
if c.Processors == nil {
c.Processors = make(config.GenericMap)
}
c.Processors["resource/pod-name"] = config.GenericMap{
"attributes": []config.GenericMap{
{
"key": "k8s.pod.name",
"value": "${POD_NAME}",
"action": "upsert",
},
},
}
// odigostrafficmetrics processor should be the last processor in the pipeline
// as it helps to calculate the size of the data being exported.
// In case of performance impact caused by this processor, we should modify this config to reduce the sampling ratio.
c.Processors["odigostrafficmetrics"] = struct{}{}
c.Exporters["otlp/odigos-own-telemetry-ui"] = config.GenericMap{
"endpoint": fmt.Sprintf("ui.%s:%d", env.GetCurrentNamespace(), odigosconsts.OTLPPort),
"tls": config.GenericMap{
"insecure": true,
},
"retry_on_failure": config.GenericMap{
"enabled": false,
},
}
c.Service.Pipelines["metrics/otelcol"] = config.Pipeline{
Receivers: []string{"prometheus/self-metrics"},
Processors: []string{"resource/pod-name"},
Exporters: []string{"otlp/odigos-own-telemetry-ui"},
}

c.Service.Telemetry.Metrics = config.GenericMap{
"address": "0.0.0.0:8888",
}

for pipelineName, pipeline := range c.Service.Pipelines {
if pipelineName == "metrics/otelcol" {
continue
}
pipeline.Processors = append(pipeline.Processors, "odigostrafficmetrics")
c.Service.Pipelines[pipelineName] = pipeline
}

return nil
}

func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.ProcessorList, gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme, memConfig *memoryConfigurations) (string, []odigoscommon.ObservabilitySignal, error) {
logger := log.FromContext(ctx)

Expand All @@ -38,6 +126,7 @@ func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.Proc
common.ToExporterConfigurerArray(dests),
common.ToProcessorConfigurerArray(processors),
memoryLimiterConfiguration,
addSelfTelemetryPipeline,
)
if err != nil {
logger.Error(err, "Failed to calculate config")
Expand Down
Loading

0 comments on commit 7c8d508

Please sign in to comment.