Skip to content

Commit

Permalink
feat(metrics): collect node & pod metrics via kubeletstats receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
basti1302 committed Sep 4, 2024
1 parent 4da4cb5 commit 0b542b1
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ rules:
resources:
- pods
- namespaces
- nodes/stats
verbs:
- get
- watch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ cluster roles should match snapshot:
resources:
- pods
- namespaces
- nodes/stats
verbs:
- get
- watch
Expand Down
1 change: 1 addition & 0 deletions images/collector/src/builder/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ exporters:
receivers:
- gomod: "go.opentelemetry.io/collector/receiver/otlpreceiver v0.106.1"
- gomod: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver v0.106.1"
- gomod: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver v0.106.1"

processors:
- gomod: "go.opentelemetry.io/collector/processor/batchprocessor v0.106.1"
Expand Down
10 changes: 10 additions & 0 deletions internal/backendconnection/otelcolresources/config.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ receivers:
endpoint: ${env:MY_POD_IP}:4317
http:
endpoint: ${env:MY_POD_IP}:4318

kubeletstats:
auth_type: serviceAccount
collection_interval: 20s
endpoint: ${env:K8S_NODE_NAME}:10250
{{- if .DevelopmentMode }}
insecure_skip_verify: true
{{- end }}

# TODO Turn on conditionally for monitored namespaces
filelog/monitored_pods:
include:
Expand Down Expand Up @@ -186,6 +195,7 @@ service:
metrics/downstream:
receivers:
- otlp
- kubeletstats
processors:
- k8sattributes
- memory_limiter
Expand Down
9 changes: 7 additions & 2 deletions internal/backendconnection/otelcolresources/desired_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,13 @@ func assembleClusterRole(config *oTelColConfig) *rbacv1.ClusterRole {
Rules: []rbacv1.PolicyRule{
{
APIGroups: []string{""},
Resources: []string{"pods", "namespaces"},
Verbs: []string{"get", "watch", "list"},
Resources: []string{
"pods",
"namespaces",
// required for Kubelet Metrics/Kubeletstats receiver
"nodes/stats",
},
Verbs: []string{"get", "watch", "list"},
},
{
APIGroups: []string{"apps"},
Expand Down
16 changes: 14 additions & 2 deletions internal/backendconnection/otelcolresources/otelcol_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,21 @@ func (m *OTelColResourceManager) DeleteResources(
}
var allErrors []error
for _, object := range allObjects {
err := m.Client.Delete(ctx, object)
err = m.Client.Delete(ctx, object)
if err != nil {
allErrors = append(allErrors, err)
if apierrors.IsNotFound(err) {
logger.Info(
"wanted to delete a resource, but it did not exist",
"name",
object.GetName(),
"namespace",
object.GetNamespace(),
"kind",
object.GetObjectKind().GroupVersionKind(),
)
} else {
allErrors = append(allErrors, err)
}
} else {
logger.Info(
"deleted resource",
Expand Down
11 changes: 9 additions & 2 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ var _ = Describe("Dash0 Kubernetes Operator", Ordered, func() {
})
})

Describe("self-monitoring", func() {
Describe("metrics & self-monitoring", func() {
BeforeAll(func() {
By("deploy the Dash0 operator")
deployOperator(operatorNamespace, operatorHelmChart, operatorHelmChartUrl, images, true)
Expand All @@ -617,14 +617,21 @@ var _ = Describe("Dash0 Kubernetes Operator", Ordered, func() {
operatorHelmChart,
)

time.Sleep(15 * time.Second)
time.Sleep(10 * time.Second)
})

AfterEach(func() {
undeployDash0MonitoringResource(applicationUnderTestNamespace)
undeployDash0OperatorConfigurationResource()
})

It("should produce metrics", func() {
By("waiting for metrics")
Eventually(func(g Gomega) {
verifyKubeletStatsMetrics(g)
}, 50*time.Second, time.Second).Should(Succeed())
})

It("should produce self-monitoring telemetry", func() {
By("updating the Dash0 monitoring resource endpoint setting")
newEndpoint := "ingress.us-east-2.aws.dash0-dev.com:4317"
Expand Down
42 changes: 42 additions & 0 deletions test/e2e/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,48 @@ var (
metricsUnmarshaller = &pmetric.JSONUnmarshaler{}
)

// verifyKubeletStatsMetrics asserts, through g, that findMatchingMetrics reports at least one
// metric whose name starts with "k8s.node." or "k8s.pod." on a resource that passes the resource
// filter defined below.
func verifyKubeletStatsMetrics(g Gomega) {
	// A resource is accepted when it is not tagged with the "dash0.operator" service namespace and
	// carries both a k8s.node.name and a k8s.pod.uid attribute.
	acceptResource := func(resourceMetrics pmetric.ResourceMetrics) bool {
		attrs := resourceMetrics.Resource().Attributes()

		if ns, ok := attrs.Get("service.namespace"); ok && ns.Str() == "dash0.operator" {
			return false
		}
		// NOTE(review): this compares the *value* of service.name with the literal string
		// "service.version", which looks unintended (possibly a check on a specific service name or
		// on the presence of service.version was meant) -- confirm with the author. Behaviour is
		// kept as-is.
		if svc, ok := attrs.Get("service.name"); ok && svc.Str() == "service.version" {
			return false
		}
		if _, ok := attrs.Get("k8s.node.name"); !ok {
			return false
		}

		_, hasPodUID := attrs.Get("k8s.pod.uid")
		return hasPodUID
	}

	// Only node- and pod-level metric names are of interest here.
	acceptMetric := func(metric pmetric.Metric) bool {
		name := metric.Name()
		return strings.HasPrefix(name, "k8s.node.") || strings.HasPrefix(name, "k8s.pod.")
	}

	g.Expect(findMatchingMetrics(g, acceptResource, acceptMetric)).
		To(BeTrue(), "expected to find at least one kubeletstat metric")
}

// findMatchingMetrics forwards g and both match predicates unchanged to fileHasMatchingMetrics
// and returns that function's result.
//
// resourceMatchFn is the predicate applied to resource metrics and metricMatchFn the predicate
// applied to individual metrics; how they are combined is decided by fileHasMatchingMetrics.
func findMatchingMetrics(
	g Gomega,
	resourceMatchFn func(resourceMetrics pmetric.ResourceMetrics) bool,
	metricMatchFn func(metric pmetric.Metric) bool,
) bool {
	found := fileHasMatchingMetrics(g, resourceMatchFn, metricMatchFn)
	return found
}

//nolint:all
func fileHasMatchingMetrics(
g Gomega,
Expand Down
11 changes: 2 additions & 9 deletions test/util/operator_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,13 @@ func VerifyOperatorConfigurationResourceByNameDoesNotExist(
}

// RemoveOperatorConfigurationResource removes the operator configuration resource with the name
// OperatorConfigurationResourceName by delegating to RemoveOperatorConfigurationResourceByName.
func RemoveOperatorConfigurationResource(ctx context.Context, k8sClient client.Client) {
RemoveOperatorConfigurationResourceByName(ctx, k8sClient, OperatorConfigurationResourceName, true)
RemoveOperatorConfigurationResourceByName(ctx, k8sClient, OperatorConfigurationResourceName)
}

func RemoveOperatorConfigurationResourceByName(
ctx context.Context,
k8sClient client.Client,
name string,
failOnErr bool,
) {
By("Removing the Dash0 operator configuration resource instance")
if resource := LoadOperatorConfigurationResourceByNameIfItExists(
Expand All @@ -250,12 +249,6 @@ func RemoveOperatorConfigurationResourceByName(
Default,
name,
); resource != nil {
err := k8sClient.Delete(ctx, resource)
if failOnErr {
// If the test already triggered the deletion of the operator resource, but it was blocked by the
// finalizer; removing the finalizer may immediately delete the operator resource. In these cases it is
// okay to ignore the error from k8sClient.Delete(ctx, dash0OperatorResource).
Expect(err).NotTo(HaveOccurred())
}
Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
}
}

0 comments on commit 0b542b1

Please sign in to comment.