From af7d428c37565b2e3d2927774b9150a43fc1b9f6 Mon Sep 17 00:00:00 2001
From: Maksim Kremenev
Date: Sun, 29 Sep 2024 09:32:26 +0200
Subject: [PATCH] statsd: aggregate metrics from a chunk in batch

Process all metrics in a chunk together. This is necessary for custom
Prometheus-like histogram metrics. For example, with an
`http_response_duration_seconds` metric using buckets 1...100 and
Telegraf's flush interval set to 30 seconds, the old code processed each
sample separately: if a flush fired mid-chunk, only the samples already
processed were flushed with the current timestamp, and the remaining
samples were flushed in the following interval with a different
timestamp (+30s), breaking the histogram calculation. In the current
version, all samples of a chunk are processed together.
---
 plugins/inputs/statsd/statsd.go | 208 ++++++++++++++++----------------
 1 file changed, 108 insertions(+), 100 deletions(-)

diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go
index 7cbf7df319c8d..ef87860c6caf3 100644
--- a/plugins/inputs/statsd/statsd.go
+++ b/plugins/inputs/statsd/statsd.go
@@ -529,6 +529,7 @@ func (s *Statsd) parser() error {
 			start := time.Now()
 			lines := strings.Split(in.Buffer.String(), "\n")
 			s.bufPool.Put(in.Buffer)
+			metrics := make([]*metric, 0, len(lines))
 			for _, line := range lines {
 				line = strings.TrimSpace(line)
 				switch {
@@ -542,15 +543,21 @@ func (s *Statsd) parser() error {
 						s.Log.Debugf("  line was: %s", line)
 					}
 				default:
-					if err := s.parseStatsdLine(line); err != nil {
+					m, err := s.parseStatsdLine(line)
+					if err != nil {
 						if !errors.Is(err, errParsing) {
 							// Ignore parsing errors but error out on
 							// everything else...
 							return err
 						}
 					}
+					// m is nil when a parsing error was ignored above
+					if m != nil {
+						metrics = append(metrics, m)
+					}
 				}
 			}
+			s.aggregate(metrics...)
 			elapsed := time.Since(start)
 			s.ParseTimeNS.Set(elapsed.Nanoseconds())
 		}
@@ -559,7 +566,7 @@
 
 // parseStatsdLine will parse the given statsd line, validating it as it goes.
 // If the line is valid, it will be cached for the next call to Gather()
-func (s *Statsd) parseStatsdLine(line string) error {
+func (s *Statsd) parseStatsdLine(line string) (*metric, error) {
 	lineTags := make(map[string]string)
 	if s.DataDogExtensions {
 		recombinedSegments := make([]string, 0)
@@ -584,7 +591,7 @@ func (s *Statsd) parseStatsdLine(line string)
 	bits := strings.Split(line, ":")
 	if len(bits) < 2 {
 		s.Log.Errorf("Splitting ':', unable to parse metric: %s", line)
-		return errParsing
+		return nil, errParsing
 	}
 
 	// Extract bucket name from individual metric bits
@@ -600,7 +607,7 @@ func (s *Statsd) parseStatsdLine(line string)
 
 		pipesplit := strings.Split(bit, "|")
 		if len(pipesplit) < 2 {
 			s.Log.Errorf("Splitting '|', unable to parse metric: %s", line)
-			return errParsing
+			return nil, errParsing
 		} else if len(pipesplit) > 2 {
 			sr := pipesplit[2]
@@ -624,14 +631,14 @@ func (s *Statsd) parseStatsdLine(line string)
 			m.mtype = pipesplit[1]
 		default:
 			s.Log.Errorf("Metric type %q unsupported", pipesplit[1])
-			return errParsing
+			return nil, errParsing
 		}
 
 		// Parse the value
 		if strings.HasPrefix(pipesplit[0], "-") || strings.HasPrefix(pipesplit[0], "+") {
 			if m.mtype != "g" && m.mtype != "c" {
 				s.Log.Errorf("+- values are only supported for gauges & counters, unable to parse metric: %s", line)
-				return errParsing
+				return nil, errParsing
 			}
 			m.additive = true
 		}
@@ -641,7 +648,7 @@ func (s *Statsd) parseStatsdLine(line string)
 			v, err := strconv.ParseFloat(pipesplit[0], 64)
 			if err != nil {
 				s.Log.Errorf("Parsing value to float64, unable to parse metric: %s", line)
-				return errParsing
+				return nil, errParsing
 			}
 			m.floatvalue = v
 		case "c":
@@ -651,7 +658,7 @@ func (s *Statsd) parseStatsdLine(line string)
 				v2, err2 := strconv.ParseFloat(pipesplit[0], 64)
 				if err2 != nil {
 					s.Log.Errorf("Parsing value to int64, unable to parse metric: %s", line)
-					return errParsing
+					return nil, errParsing
 				}
 				v = int64(v2)
 			}
@@ -702,11 +709,10 @@ func (s *Statsd) parseStatsdLine(line string)
 		sort.Strings(tg)
 		tg = append(tg, m.name)
 		m.hash = strings.Join(tg, "")
-
-		s.aggregate(m)
+		return &m, nil
 	}
 
-	return nil
+	return nil, nil
 }
 
 // parseName parses the given bucket name with the list of bucket maps in the
@@ -790,107 +796,109 @@ func parseKeyValue(keyValue string) (key string, val string)
-// aggregate takes in a metric. It then
+// aggregate takes in one or more metrics. It then
 // aggregates and caches the current value(s). It does not deal with the
 // Delete* options, because those are dealt with in the Gather function.
-func (s *Statsd) aggregate(m metric) {
+func (s *Statsd) aggregate(metrics ...*metric) {
 	s.Lock()
 	defer s.Unlock()
 
-	switch m.mtype {
-	case "d":
-		if s.DataDogExtensions && s.DataDogDistributions {
-			cached := cacheddistributions{
-				name:  m.name,
-				value: m.floatvalue,
-				tags:  m.tags,
+	for _, m := range metrics {
+		switch m.mtype {
+		case "d":
+			if s.DataDogExtensions && s.DataDogDistributions {
+				cached := cacheddistributions{
+					name:  m.name,
+					value: m.floatvalue,
+					tags:  m.tags,
+				}
+				s.distributions = append(s.distributions, cached)
 			}
-			s.distributions = append(s.distributions, cached)
-		}
-	case "ms", "h":
-		// Check if the measurement exists
-		cached, ok := s.timings[m.hash]
-		if !ok {
-			cached = cachedtimings{
-				name:   m.name,
-				fields: make(map[string]RunningStats),
-				tags:   m.tags,
+		case "ms", "h":
+			// Check if the measurement exists
+			cached, ok := s.timings[m.hash]
+			if !ok {
+				cached = cachedtimings{
+					name:   m.name,
+					fields: make(map[string]RunningStats),
+					tags:   m.tags,
+				}
 			}
-		}
-		// Check if the field exists. If we've not enabled multiple fields per timer
-		// this will be the default field name, eg. "value"
-		field, ok := cached.fields[m.field]
-		if !ok {
-			field = RunningStats{
-				PercLimit: s.PercentileLimit,
+			// Check if the field exists. If we've not enabled multiple fields per timer
+			// this will be the default field name, eg. "value"
+			field, ok := cached.fields[m.field]
+			if !ok {
+				field = RunningStats{
+					PercLimit: s.PercentileLimit,
+				}
 			}
-		}
-		if m.samplerate > 0 {
-			for i := 0; i < int(1.0/m.samplerate); i++ {
+			if m.samplerate > 0 {
+				for i := 0; i < int(1.0/m.samplerate); i++ {
+					field.AddValue(m.floatvalue)
+				}
+			} else {
 				field.AddValue(m.floatvalue)
 			}
-		} else {
-			field.AddValue(m.floatvalue)
-		}
-		cached.fields[m.field] = field
-		cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
-		s.timings[m.hash] = cached
-	case "c":
-		// check if the measurement exists
-		cached, ok := s.counters[m.hash]
-		if !ok {
-			cached = cachedcounter{
-				name:   m.name,
-				fields: make(map[string]interface{}),
-				tags:   m.tags,
+			cached.fields[m.field] = field
+			cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
+			s.timings[m.hash] = cached
+		case "c":
+			// check if the measurement exists
+			cached, ok := s.counters[m.hash]
+			if !ok {
+				cached = cachedcounter{
+					name:   m.name,
+					fields: make(map[string]interface{}),
+					tags:   m.tags,
+				}
 			}
-		}
-		// check if the field exists
-		_, ok = cached.fields[m.field]
-		if !ok {
-			cached.fields[m.field] = int64(0)
-		}
-		cached.fields[m.field] = cached.fields[m.field].(int64) + m.intvalue
-		cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
-		s.counters[m.hash] = cached
-	case "g":
-		// check if the measurement exists
-		cached, ok := s.gauges[m.hash]
-		if !ok {
-			cached = cachedgauge{
-				name:   m.name,
-				fields: make(map[string]interface{}),
-				tags:   m.tags,
+			// check if the field exists
+			_, ok = cached.fields[m.field]
+			if !ok {
+				cached.fields[m.field] = int64(0)
+			}
+			cached.fields[m.field] = cached.fields[m.field].(int64) + m.intvalue
+			cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
+			s.counters[m.hash] = cached
+		case "g":
+			// check if the measurement exists
+			cached, ok := s.gauges[m.hash]
+			if !ok {
+				cached = cachedgauge{
+					name:   m.name,
+					fields: make(map[string]interface{}),
+					tags:   m.tags,
+				}
+			}
+			// check if the field exists
+			_, ok = cached.fields[m.field]
+			if !ok {
+				cached.fields[m.field] = float64(0)
+			}
+			if m.additive {
+				cached.fields[m.field] = cached.fields[m.field].(float64) + m.floatvalue
+			} else {
+				cached.fields[m.field] = m.floatvalue
 			}
-		}
-		// check if the field exists
-		_, ok = cached.fields[m.field]
-		if !ok {
-			cached.fields[m.field] = float64(0)
-		}
-		if m.additive {
-			cached.fields[m.field] = cached.fields[m.field].(float64) + m.floatvalue
-		} else {
-			cached.fields[m.field] = m.floatvalue
-		}
-		cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
-		s.gauges[m.hash] = cached
-	case "s":
-		// check if the measurement exists
-		cached, ok := s.sets[m.hash]
-		if !ok {
-			cached = cachedset{
-				name:   m.name,
-				fields: make(map[string]map[string]bool),
-				tags:   m.tags,
+			cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
+			s.gauges[m.hash] = cached
+		case "s":
+			// check if the measurement exists
+			cached, ok := s.sets[m.hash]
+			if !ok {
+				cached = cachedset{
+					name:   m.name,
+					fields: make(map[string]map[string]bool),
+					tags:   m.tags,
+				}
 			}
+			// check if the field exists
+			_, ok = cached.fields[m.field]
+			if !ok {
+				cached.fields[m.field] = make(map[string]bool)
+			}
+			cached.fields[m.field][m.strvalue] = true
+			cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
+			s.sets[m.hash] = cached
 		}
-		// check if the field exists
-		_, ok = cached.fields[m.field]
-		if !ok {
-			cached.fields[m.field] = make(map[string]bool)
-		}
-		cached.fields[m.field][m.strvalue] = true
-		cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
-		s.sets[m.hash] = cached
 	}
 }
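
As a minimal illustration (not part of the patch; chunkAggregator and its
methods are hypothetical stand-ins for the plugin's caches and the
Gather-side flush), the sketch below shows why aggregating a whole chunk
inside one critical section keeps its samples in one flush window: a
concurrent flush can observe all of a chunk's samples or none of them,
never a partial chunk. The variadic s.aggregate(metrics...) above behaves
the same way, holding s.Lock() for the duration of the chunk.

package main

import (
	"fmt"
	"sync"
)

// chunkAggregator is a stand-in for the plugin's internal caches.
// aggregate() adds every sample of a chunk inside a single critical
// section; flush() swaps the buffer out under the same mutex, so a
// flush sees whole chunks only and every sample of a chunk gets the
// same flush timestamp.
type chunkAggregator struct {
	mu      sync.Mutex
	samples []float64
}

func (a *chunkAggregator) aggregate(metrics ...float64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.samples = append(a.samples, metrics...)
}

func (a *chunkAggregator) flush() []float64 {
	a.mu.Lock()
	defer a.mu.Unlock()
	out := a.samples
	a.samples = nil
	return out
}

func main() {
	var a chunkAggregator
	chunk := []float64{0.1, 0.2, 0.3} // one received payload worth of samples
	a.aggregate(chunk...)             // batched: one lock acquisition per chunk
	fmt.Println(a.flush())            // [0.1 0.2 0.3], the chunk stays whole
}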