diff --git a/plugins/serializers/prometheusremotewrite/README.md b/plugins/serializers/prometheusremotewrite/README.md
index f44f95203fa46..882f49f314557 100644
--- a/plugins/serializers/prometheusremotewrite/README.md
+++ b/plugins/serializers/prometheusremotewrite/README.md
@@ -42,3 +42,4 @@ it is not included in the final metric name.
 Prometheus labels are produced for each tag.
 
 **Note:** String fields are ignored and do not produce Prometheus metrics.
+Set **log_level** to `trace` to see all serialization issues.
diff --git a/plugins/serializers/prometheusremotewrite/prometheusremotewrite.go b/plugins/serializers/prometheusremotewrite/prometheusremotewrite.go
index 255f9e721be58..3f281eaed75a5 100644
--- a/plugins/serializers/prometheusremotewrite/prometheusremotewrite.go
+++ b/plugins/serializers/prometheusremotewrite/prometheusremotewrite.go
@@ -20,8 +20,9 @@ import (
 type MetricKey uint64
 
 type Serializer struct {
-	SortMetrics   bool `toml:"prometheus_sort_metrics"`
-	StringAsLabel bool `toml:"prometheus_string_as_label"`
+	SortMetrics   bool            `toml:"prometheus_sort_metrics"`
+	StringAsLabel bool            `toml:"prometheus_string_as_label"`
+	Log           telegraf.Logger `toml:"-"`
 }
 
 func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
@@ -29,8 +30,15 @@ func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
 }
 
 func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
-	var buf bytes.Buffer
+	var lastErr error
+	// traceAndKeepErr logs every passed error at trace level.
+	// It also updates lastErr on each call, so the most recent error can be logged later at a higher level.
+	traceAndKeepErr := func(format string, a ...any) {
+		lastErr = fmt.Errorf(format, a...)
+		s.Log.Trace(lastErr)
+	}
+
+	var buf bytes.Buffer
 	var entries = make(map[MetricKey]prompb.TimeSeries)
 	var labels = make([]prompb.Label, 0)
 	for _, metric := range metrics {
@@ -41,6 +49,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
 			metricName := prometheus.MetricName(metric.Name(), field.Key, metric.Type())
 			metricName, ok := prometheus.SanitizeMetricName(metricName)
 			if !ok {
+				traceAndKeepErr("failed to parse metric name %q", metricName)
 				continue
 			}
 
@@ -52,6 +61,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
 			case telegraf.Untyped:
 				value, ok := prometheus.SampleValue(field.Value)
 				if !ok {
+					traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
 					continue
 				}
 				metrickey, promts = getPromTS(metricName, labels, value, metric.Time())
@@ -78,14 +88,17 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
 
 					le, ok := metric.GetTag("le")
 					if !ok {
+						traceAndKeepErr("failed to parse %q: can't find `le` label", metricName)
 						continue
 					}
 					bound, err := strconv.ParseFloat(le, 64)
 					if err != nil {
+						traceAndKeepErr("failed to parse %q: can't parse %q value: %w", metricName, le, err)
 						continue
 					}
 					count, ok := prometheus.SampleCount(field.Value)
 					if !ok {
+						traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
 						continue
 					}
 
@@ -97,6 +110,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
 				case strings.HasSuffix(field.Key, "_sum"):
 					sum, ok := prometheus.SampleSum(field.Value)
 					if !ok {
+						traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
 						continue
 					}
 
@@ -104,6 +118,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
 				case strings.HasSuffix(field.Key, "_count"):
 					count, ok := prometheus.SampleCount(field.Value)
 					if !ok {
+						traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
 						continue
 					}
 
@@ -119,6 +134,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
 
 					metrickey, promts = getPromTS(metricName+"_count", labels, float64(count), metric.Time())
 				default:
+					traceAndKeepErr("failed to parse %q: series %q should have `_count`, `_sum` or `_bucket` suffix", metricName, field.Key)
 					continue
 				}
 			case telegraf.Summary:
@@ -126,6 +142,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
 				case strings.HasSuffix(field.Key, "_sum"):
 					sum, ok := prometheus.SampleSum(field.Value)
 					if !ok {
+						traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
 						continue
 					}
 
@@ -133,6 +150,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
 				case strings.HasSuffix(field.Key, "_count"):
 					count, ok := prometheus.SampleCount(field.Value)
 					if !ok {
+						traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
 						continue
 					}
 
@@ -140,14 +158,17 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
 				default:
 					quantileTag, ok := metric.GetTag("quantile")
 					if !ok {
+						traceAndKeepErr("failed to parse %q: can't find `quantile` label", metricName)
 						continue
 					}
 					quantile, err := strconv.ParseFloat(quantileTag, 64)
 					if err != nil {
+						traceAndKeepErr("failed to parse %q: can't parse %q value: %w", metricName, quantileTag, err)
 						continue
 					}
 					value, ok := prometheus.SampleValue(field.Value)
 					if !ok {
+						traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
 						continue
 					}
 
@@ -162,11 +183,12 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
 			}
 
 			// A batch of metrics can contain multiple values for a single
-			// Prometheus sample.  If this metric is older than the existing
+			// Prometheus sample. If this metric is older than the existing
 			// sample then we can skip over it.
 			m, ok := entries[metrickey]
 			if ok {
 				if metric.Time().Before(time.Unix(0, m.Samples[0].Timestamp*1_000_000)) {
+					traceAndKeepErr("metric %q has a sample with timestamp %v older than the one already registered", metric.Name(), metric.Time())
 					continue
 				}
 			}
@@ -174,6 +196,12 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
 		}
 	}
 
+	if lastErr != nil {
+		// Log only the last recorded error in the batch: a batch can produce many errors, and logging
+		// each one would be too verbose. The following log line still gives the user enough information to act on.
+ s.Log.Errorf("some series were dropped, %d series left to send; last recorded error: %v", len(entries), lastErr) + } + var promTS = make([]prompb.TimeSeries, len(entries)) var i int for _, promts := range entries { diff --git a/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go b/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go index 3c3ba2bb34ad0..1eac455c99f3a 100644 --- a/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go +++ b/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go @@ -33,7 +33,7 @@ func BenchmarkRemoteWrite(b *testing.B) { time.Unix(0, 0), ) } - s := &Serializer{} + s := &Serializer{Log: &testutil.CaptureLogger{}} for n := 0; n < b.N; n++ { //nolint:errcheck // Benchmarking so skip the error check to avoid the unnecessary operations s.SerializeBatch(batch) @@ -188,6 +188,7 @@ http_request_duration_seconds_bucket{le="0.5"} 129389 for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := &Serializer{ + Log: &testutil.CaptureLogger{}, SortMetrics: true, } data, err := s.Serialize(tt.metric) @@ -201,6 +202,83 @@ http_request_duration_seconds_bucket{le="0.5"} 129389 } } +func TestRemoteWriteSerializeNegative(t *testing.T) { + clog := &testutil.CaptureLogger{} + s := &Serializer{Log: clog} + + assert := func(msg string, err error) { + t.Helper() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + lastMsg := clog.LastError() + if lastMsg == "" { + t.Fatal("expected non-empty last message") + } + if !strings.Contains(lastMsg, msg) { + t.Fatalf("expected to have log message %q; got %q instead", msg, lastMsg) + } + // reset logger so it can be reused again + clog.Clear() + } + + m := testutil.MustMetric("@@!!", nil, map[string]interface{}{"!!": "@@"}, time.Unix(0, 0)) + _, err := s.Serialize(m) + assert("failed to parse metric name", err) + + m = testutil.MustMetric("prometheus", nil, + map[string]interface{}{ + "http_requests_total": "asd", + }, + time.Unix(0, 0), + ) + _, err = s.Serialize(m) + assert("bad sample", err) + + m = testutil.MustMetric( + "prometheus", + map[string]string{ + "le": "0.5", + }, + map[string]interface{}{ + "http_request_duration_seconds_bucket": "asd", + }, + time.Unix(0, 0), + telegraf.Histogram, + ) + _, err = s.Serialize(m) + assert("bad sample", err) + + m = testutil.MustMetric( + "prometheus", + map[string]string{ + "code": "400", + "method": "post", + }, + map[string]interface{}{ + "http_requests_total": 3.0, + "http_requests_errors_total": "3.0", + }, + time.Unix(0, 0), + telegraf.Gauge, + ) + _, err = s.Serialize(m) + assert("bad sample", err) + + m = testutil.MustMetric( + "prometheus", + map[string]string{"quantile": "0.01a"}, + map[string]interface{}{ + "rpc_duration_seconds": 3102.0, + }, + time.Unix(0, 0), + telegraf.Summary, + ) + _, err = s.Serialize(m) + assert("failed to parse", err) +} + func TestRemoteWriteSerializeBatch(t *testing.T) { tests := []struct { name string @@ -679,6 +757,7 @@ rpc_duration_seconds_sum 17560473 for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := &Serializer{ + Log: &testutil.CaptureLogger{}, SortMetrics: true, StringAsLabel: tt.stringAsLabel, } @@ -733,7 +812,7 @@ func protoToSamples(req *prompb.WriteRequest) model.Samples { } func BenchmarkSerialize(b *testing.B) { - s := &Serializer{} + s := &Serializer{Log: &testutil.CaptureLogger{}} metrics := serializers.BenchmarkMetrics(b) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -743,7 +822,7 @@ func BenchmarkSerialize(b *testing.B) { } 
 
 func BenchmarkSerializeBatch(b *testing.B) {
-	s := &Serializer{}
+	s := &Serializer{Log: &testutil.CaptureLogger{}}
 	m := serializers.BenchmarkMetrics(b)
 	metrics := m[:]
 	b.ResetTimer()
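
The detailed diagnostics added above are only emitted at trace level, so here is a minimal configuration sketch for trying the change out. It assumes the agent-level `log_level` option referenced in the README note; the `outputs.http` plugin and its URL are illustrative placeholders, not part of this patch:

```toml
[agent]
  ## Assumption: agent log level option as referenced in the README note above.
  ## At the default level only the per-batch summary from s.Log.Errorf() appears;
  ## "trace" additionally surfaces every individual traceAndKeepErr() message.
  log_level = "trace"

[[outputs.http]]
  ## Hypothetical Prometheus remote-write endpoint.
  url = "http://localhost:9090/api/v1/write"
  ## Serialize request bodies with this serializer plugin.
  data_format = "prometheusremotewrite"
```

With a configuration like this, a batch containing unparseable series produces one `some series were dropped, ...` error line per batch at any log level, plus one line per dropped series at trace level.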