Skip to content

Commit

Permalink
feat(inputs.prometheus): Control which pod metadata is added as tags (i…
Browse files Browse the repository at this point in the history
  • Loading branch information
redbaron authored Apr 3, 2023
1 parent 789a498 commit 5fdeae1
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 17 deletions.
20 changes: 12 additions & 8 deletions models/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,7 @@ func (f *Filter) shouldNamePass(key string) bool {
// shouldFieldPass returns true if the metric should pass, false if it should drop
// based on the drop/pass filter parameters
func (f *Filter) shouldFieldPass(key string) bool {
if f.fieldPassFilter != nil && f.fieldDropFilter != nil {
return f.fieldPassFilter.Match(key) && !f.fieldDropFilter.Match(key)
} else if f.fieldPassFilter != nil {
return f.fieldPassFilter.Match(key)
} else if f.fieldDropFilter != nil {
return !f.fieldDropFilter.Match(key)
}
return true
return ShouldPassFilters(f.fieldPassFilter, f.fieldDropFilter, key)
}

// shouldTagsPass returns true if the metric should pass, false if it should drop
Expand Down Expand Up @@ -217,6 +210,17 @@ func (f *Filter) filterTags(metric telegraf.Metric) {
}
}

func ShouldPassFilters(include filter.Filter, exclude filter.Filter, key string) bool {
if include != nil && exclude != nil {
return include.Match(key) && !exclude.Match(key)
} else if include != nil {
return include.Match(key)
} else if exclude != nil {
return !exclude.Match(key)
}
return true
}

func ShouldTagsPass(passFilters []TagFilter, dropFilters []TagFilter, tags []*telegraf.Tag) bool {
pass := func(tpf []TagFilter) bool {
for _, pat := range tpf {
Expand Down
7 changes: 7 additions & 0 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"

## Filter which pod annotations and labels will be added to metric tags
#
# pod_annotation_include = ["annotation-key-1"]
# pod_annotation_exclude = ["exclude-me"]
# pod_label_include = ["label-key-1"]
# pod_label_exclude = ["exclude-me"]

# cache refresh interval to set the interval for re-sync of pods list.
# Default is 60 minutes.
# cache_refresh_interval = 60
Expand Down
22 changes: 14 additions & 8 deletions plugins/inputs/prometheus/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"
"net"
"net/http"
"net/url"
Expand All @@ -15,6 +13,9 @@ import (
"strconv"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -369,10 +370,13 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
}

p.Log.Debugf("will scrape metrics from %q", targetURL.String())
// add annotation as metrics tags
tags := pod.Annotations
if tags == nil {
tags = map[string]string{}
tags := map[string]string{}

// add annotation as metrics tags, subject to include/exclude filters
for k, v := range pod.Annotations {
if models.ShouldPassFilters(p.podAnnotationIncludeFilter, p.podAnnotationExcludeFilter, k) {
tags[k] = v
}
}

tags["pod_name"] = pod.Name
Expand All @@ -382,9 +386,11 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
}
tags[podNamespace] = pod.Namespace

// add labels as metrics tags
// add labels as metrics tags, subject to include/exclude filters
for k, v := range pod.Labels {
tags[k] = v
if models.ShouldPassFilters(p.podLabelIncludeFilter, p.podLabelExcludeFilter, k) {
tags[k] = v
}
}
podURL := p.AddressToURL(targetURL, targetURL.Hostname())

Expand Down
93 changes: 92 additions & 1 deletion plugins/inputs/prometheus/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package prometheus

import (
"k8s.io/client-go/tools/cache"
"testing"

"k8s.io/client-go/tools/cache"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -264,6 +265,96 @@ func TestInvalidFieldSelector(t *testing.T) {
require.NotEqual(t, err, nil)
}

func TestAnnotationFilters(t *testing.T) {
p := pod()
p.Annotations = map[string]string{
"prometheus.io/scrape": "true",
"includeme": "true",
"excludeme": "true",
"neutral": "true",
}

cases := []struct {
desc string
include []string
exclude []string
expectedTags []string
}{
{"Just include",
[]string{"includeme"},
nil,
[]string{"includeme"}},
{"Just exclude",
nil,
[]string{"excludeme"},
[]string{"includeme", "neutral"}},
{"Include & exclude",
[]string{"includeme"},
[]string{"exludeme"},
[]string{"includeme"}},
}

for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
prom.PodAnnotationInclude = tc.include
prom.PodAnnotationExclude = tc.exclude
require.NoError(t, prom.initFilters())
registerPod(p, prom)
for _, pd := range prom.kubernetesPods {
for _, tagKey := range tc.expectedTags {
require.Contains(t, pd.Tags, tagKey)
}
}
})
}
}

func TestLabelFilters(t *testing.T) {
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
p.Labels = map[string]string{
"includeme": "true",
"excludeme": "true",
"neutral": "true",
}

cases := []struct {
desc string
include []string
exclude []string
expectedTags []string
}{
{"Just include",
[]string{"includeme"},
nil,
[]string{"includeme"}},
{"Just exclude",
nil,
[]string{"excludeme"},
[]string{"includeme", "neutral"}},
{"Include & exclude",
[]string{"includeme"},
[]string{"exludeme"},
[]string{"includeme"}},
}

for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
prom.PodLabelInclude = tc.include
prom.PodLabelExclude = tc.exclude
require.NoError(t, prom.initFilters())
registerPod(p, prom)
for _, pd := range prom.kubernetesPods {
for _, tagKey := range tc.expectedTags {
require.Contains(t, pd.Tags, tagKey)
}
}
})
}
}

func pod() *corev1.Pod {
p := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{}, Status: corev1.PodStatus{}, Spec: corev1.PodSpec{}}
p.Status.PodIP = "127.0.0.1"
Expand Down
48 changes: 48 additions & 0 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"time"

"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/models"
"k8s.io/client-go/tools/cache"

Expand Down Expand Up @@ -119,6 +120,17 @@ type Prometheus struct {
NamespaceAnnotationPass map[string][]string `toml:"namespace_annotation_pass"`
NamespaceAnnotationDrop map[string][]string `toml:"namespace_annotation_drop"`

PodAnnotationInclude []string `toml:"pod_annotation_include"`
PodAnnotationExclude []string `toml:"pod_annotation_exclude"`

PodLabelInclude []string `toml:"pod_label_include"`
PodLabelExclude []string `toml:"pod_label_exclude"`

podAnnotationIncludeFilter filter.Filter
podAnnotationExcludeFilter filter.Filter
podLabelIncludeFilter filter.Filter
podLabelExcludeFilter filter.Filter

// Only for monitor_kubernetes_pods=true
CacheRefreshInterval int `toml:"cache_refresh_interval"`

Expand Down Expand Up @@ -193,6 +205,10 @@ func (p *Prometheus) Init() error {
p.nsAnnotationDrop = append(p.nsAnnotationDrop, tagFilter)
}

if err := p.initFilters(); err != nil {
return err
}

ctx := context.Background()
if p.ResponseTimeout != 0 {
p.HTTPClientConfig.Timeout = p.ResponseTimeout
Expand All @@ -211,6 +227,38 @@ func (p *Prometheus) Init() error {
return nil
}

func (p *Prometheus) initFilters() error {
if p.PodAnnotationExclude != nil {
podAnnotationExclude, err := filter.Compile(p.PodAnnotationExclude)
if err != nil {
return fmt.Errorf("error compiling 'pod_annotation_exclude': %w", err)
}
p.podAnnotationExcludeFilter = podAnnotationExclude
}
if p.PodAnnotationInclude != nil {
podAnnotationInclude, err := filter.Compile(p.PodAnnotationInclude)
if err != nil {
return fmt.Errorf("error compiling 'pod_annotation_include': %w", err)
}
p.podAnnotationIncludeFilter = podAnnotationInclude
}
if p.PodLabelExclude != nil {
podLabelExclude, err := filter.Compile(p.PodLabelExclude)
if err != nil {
return fmt.Errorf("error compiling 'pod_label_exclude': %w", err)
}
p.podLabelExcludeFilter = podLabelExclude
}
if p.PodLabelInclude != nil {
podLabelInclude, err := filter.Compile(p.PodLabelInclude)
if err != nil {
return fmt.Errorf("error compiling 'pod_label_include': %w", err)
}
p.podLabelIncludeFilter = podLabelInclude
}
return nil
}

func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL {
host := address
if u.Port() != "" {
Expand Down
7 changes: 7 additions & 0 deletions plugins/inputs/prometheus/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"

## Filter which pod annotations and labels will be added to metric tags
#
# pod_annotation_include = ["annotation-key-1"]
# pod_annotation_exclude = ["exclude-me"]
# pod_label_include = ["label-key-1"]
# pod_label_exclude = ["exclude-me"]

# cache refresh interval to set the interval for re-sync of pods list.
# Default is 60 minutes.
# cache_refresh_interval = 60
Expand Down

0 comments on commit 5fdeae1

Please sign in to comment.