statsd: aggregate metrics from a chunk in batch
Process all metrics in a chunk together.

This is necessary for custom Prometheus-like histogram metrics. For
example, with an `http_response_duration_seconds` metric using buckets
1...100 and a Telegraf flush interval of 30 seconds, the old code
processed each sample separately: if a flush fired mid-chunk, only the
samples processed so far were flushed with the current timestamp, and
the remaining samples were flushed in the following interval with a
different timestamp (+30s), breaking the histogram calculation. With
this change, all samples in a chunk are processed together.
ezotrank committed Sep 29, 2024
1 parent 6400904 commit af7d428
1 changed file: plugins/inputs/statsd/statsd.go (104 additions, 99 deletions)
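Conceptually, the patch moves aggregation out of the per-line loop: a chunk is parsed into a slice first and handed to the aggregator in a single call, so the aggregator's lock is taken once per chunk rather than once per sample. Below is a minimal, self-contained sketch of that flow; `metric`, `parse`, and `aggregate` here are simplified stand-ins for the plugin's `metric` struct, `parseStatsdLine`, and `aggregate`, not the plugin's actual API.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// metric is a simplified stand-in for the plugin's internal metric struct.
type metric struct {
	name  string
	value string
}

var (
	mu    sync.Mutex
	cache []*metric
)

// parse stands in for parseStatsdLine: one statsd line in, one metric out.
func parse(line string) *metric {
	name, value, _ := strings.Cut(line, ":")
	return &metric{name: name, value: value}
}

// aggregate caches a whole batch under one lock acquisition, so a concurrent
// flush sees either none or all of a chunk's samples.
func aggregate(metrics ...*metric) {
	mu.Lock()
	defer mu.Unlock()
	cache = append(cache, metrics...)
}

func main() {
	chunk := "http_response_duration_seconds:0.2|ms\nhttp_response_duration_seconds:1.7|ms"
	lines := strings.Split(chunk, "\n")

	// New flow: parse the whole chunk first...
	metrics := make([]*metric, 0, len(lines))
	for _, line := range lines {
		metrics = append(metrics, parse(line))
	}
	// ...then aggregate every sample in one call, instead of per line.
	aggregate(metrics...)

	fmt.Printf("%d samples cached together\n", len(cache))
}
```

With the old per-sample flow, a flush could land between two `aggregate` calls for the same chunk and stamp the remaining samples with the next interval's timestamp; batching closes that window.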
```diff
@@ -529,6 +529,7 @@ func (s *Statsd) parser() error {
 			start := time.Now()
 			lines := strings.Split(in.Buffer.String(), "\n")
 			s.bufPool.Put(in.Buffer)
+			metrics := make([]*metric, 0, len(lines))
 			for _, line := range lines {
 				line = strings.TrimSpace(line)
 				switch {
@@ -542,15 +543,18 @@ func (s *Statsd) parser() error {
 					s.Log.Debugf("  line was: %s", line)
 				}
 			default:
-				if err := s.parseStatsdLine(line); err != nil {
+				m, err := s.parseStatsdLine(line)
+				if err != nil {
 					if !errors.Is(err, errParsing) {
 						// Ignore parsing errors but error out on
 						// everything else...
 						return err
 					}
 				}
+				metrics = append(metrics, m)
 			}
 		}
+		s.aggregate(metrics...)
 		elapsed := time.Since(start)
 		s.ParseTimeNS.Set(elapsed.Nanoseconds())
 	}
```
```diff
@@ -559,7 +563,7 @@ func (s *Statsd) parser() error {
 
 // parseStatsdLine will parse the given statsd line, validating it as it goes.
 // If the line is valid, it will be cached for the next call to Gather()
-func (s *Statsd) parseStatsdLine(line string) error {
+func (s *Statsd) parseStatsdLine(line string) (*metric, error) {
 	lineTags := make(map[string]string)
 	if s.DataDogExtensions {
 		recombinedSegments := make([]string, 0)
@@ -584,7 +588,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
 	bits := strings.Split(line, ":")
 	if len(bits) < 2 {
 		s.Log.Errorf("Splitting ':', unable to parse metric: %s", line)
-		return errParsing
+		return nil, errParsing
 	}
 
 	// Extract bucket name from individual metric bits
@@ -600,7 +604,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
 		pipesplit := strings.Split(bit, "|")
 		if len(pipesplit) < 2 {
 			s.Log.Errorf("Splitting '|', unable to parse metric: %s", line)
-			return errParsing
+			return nil, errParsing
 		} else if len(pipesplit) > 2 {
 			sr := pipesplit[2]
 
@@ -624,14 +628,14 @@ func (s *Statsd) parseStatsdLine(line string) error {
 			m.mtype = pipesplit[1]
 		default:
 			s.Log.Errorf("Metric type %q unsupported", pipesplit[1])
-			return errParsing
+			return nil, errParsing
 		}
 
 		// Parse the value
 		if strings.HasPrefix(pipesplit[0], "-") || strings.HasPrefix(pipesplit[0], "+") {
 			if m.mtype != "g" && m.mtype != "c" {
 				s.Log.Errorf("+- values are only supported for gauges & counters, unable to parse metric: %s", line)
-				return errParsing
+				return nil, errParsing
 			}
 			m.additive = true
 		}
@@ -641,7 +645,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
 			v, err := strconv.ParseFloat(pipesplit[0], 64)
 			if err != nil {
 				s.Log.Errorf("Parsing value to float64, unable to parse metric: %s", line)
-				return errParsing
+				return nil, errParsing
 			}
 			m.floatvalue = v
 		case "c":
@@ -651,7 +655,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
 			v2, err2 := strconv.ParseFloat(pipesplit[0], 64)
 			if err2 != nil {
 				s.Log.Errorf("Parsing value to int64, unable to parse metric: %s", line)
-				return errParsing
+				return nil, errParsing
 			}
 			v = int64(v2)
 		}
@@ -702,11 +706,10 @@ func (s *Statsd) parseStatsdLine(line string) error {
 		sort.Strings(tg)
 		tg = append(tg, m.name)
 		m.hash = strings.Join(tg, "")
-
-		s.aggregate(m)
+		return &m, nil
 	}
 
-	return nil
+	return nil, nil
 }
 
 // parseName parses the given bucket name with the list of bucket maps in the
```
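For context, `parseStatsdLine` consumes the plain statsd wire format `bucket:value|type|@samplerate`, splitting first on `:` and then on `|`, as the hunks above show. A rough, runnable sketch of just that split logic in isolation (variable names are illustrative and error handling is elided):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	line := "http_response_duration_seconds:0.25|ms|@0.5"

	// The first split on ':' separates the bucket name from the payload,
	// mirroring the diff's strings.Split(line, ":").
	bits := strings.Split(line, ":")
	bucket, payload := bits[0], bits[1]

	// Each payload is then split on '|' into value, type, and an optional
	// sample rate, mirroring strings.Split(bit, "|").
	pipesplit := strings.Split(payload, "|")
	value, _ := strconv.ParseFloat(pipesplit[0], 64)
	mtype := pipesplit[1]

	samplerate := 1.0
	if len(pipesplit) > 2 {
		samplerate, _ = strconv.ParseFloat(strings.TrimPrefix(pipesplit[2], "@"), 64)
	}

	fmt.Printf("bucket=%s value=%g type=%s rate=%g\n", bucket, value, mtype, samplerate)
}
```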
```diff
@@ -790,107 +793,109 @@ func parseKeyValue(keyValue string) (key string, val string) {
 // aggregate takes in a metric. It then
 // aggregates and caches the current value(s). It does not deal with the
 // Delete* options, because those are dealt with in the Gather function.
-func (s *Statsd) aggregate(m metric) {
+func (s *Statsd) aggregate(metrics ...*metric) {
 	s.Lock()
 	defer s.Unlock()
 
-	switch m.mtype {
-	case "d":
-		if s.DataDogExtensions && s.DataDogDistributions {
-			cached := cacheddistributions{
-				name:  m.name,
-				value: m.floatvalue,
-				tags:  m.tags,
-			}
-			s.distributions = append(s.distributions, cached)
-		}
-	case "ms", "h":
-		// Check if the measurement exists
-		cached, ok := s.timings[m.hash]
-		if !ok {
-			cached = cachedtimings{
-				name:   m.name,
-				fields: make(map[string]RunningStats),
-				tags:   m.tags,
-			}
-		}
-		// Check if the field exists. If we've not enabled multiple fields per timer
-		// this will be the default field name, eg. "value"
-		field, ok := cached.fields[m.field]
-		if !ok {
-			field = RunningStats{
-				PercLimit: s.PercentileLimit,
-			}
-		}
-		if m.samplerate > 0 {
-			for i := 0; i < int(1.0/m.samplerate); i++ {
-				field.AddValue(m.floatvalue)
-			}
-		} else {
-			field.AddValue(m.floatvalue)
-		}
-		cached.fields[m.field] = field
-		cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
-		s.timings[m.hash] = cached
-	case "c":
-		// check if the measurement exists
-		cached, ok := s.counters[m.hash]
-		if !ok {
-			cached = cachedcounter{
-				name:   m.name,
-				fields: make(map[string]interface{}),
-				tags:   m.tags,
-			}
-		}
-		// check if the field exists
-		_, ok = cached.fields[m.field]
-		if !ok {
-			cached.fields[m.field] = int64(0)
-		}
-		cached.fields[m.field] = cached.fields[m.field].(int64) + m.intvalue
-		cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
-		s.counters[m.hash] = cached
-	case "g":
-		// check if the measurement exists
-		cached, ok := s.gauges[m.hash]
-		if !ok {
-			cached = cachedgauge{
-				name:   m.name,
-				fields: make(map[string]interface{}),
-				tags:   m.tags,
-			}
-		}
-		// check if the field exists
-		_, ok = cached.fields[m.field]
-		if !ok {
-			cached.fields[m.field] = float64(0)
-		}
-		if m.additive {
-			cached.fields[m.field] = cached.fields[m.field].(float64) + m.floatvalue
-		} else {
-			cached.fields[m.field] = m.floatvalue
-		}
-
-		cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
-		s.gauges[m.hash] = cached
-	case "s":
-		// check if the measurement exists
-		cached, ok := s.sets[m.hash]
-		if !ok {
-			cached = cachedset{
-				name:   m.name,
-				fields: make(map[string]map[string]bool),
-				tags:   m.tags,
-			}
-		}
-		// check if the field exists
-		_, ok = cached.fields[m.field]
-		if !ok {
-			cached.fields[m.field] = make(map[string]bool)
-		}
-		cached.fields[m.field][m.strvalue] = true
-		cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
-		s.sets[m.hash] = cached
-	}
+	for _, m := range metrics {
+		switch m.mtype {
+		case "d":
+			if s.DataDogExtensions && s.DataDogDistributions {
+				cached := cacheddistributions{
+					name:  m.name,
+					value: m.floatvalue,
+					tags:  m.tags,
+				}
+				s.distributions = append(s.distributions, cached)
+			}
+		case "ms", "h":
+			// Check if the measurement exists
+			cached, ok := s.timings[m.hash]
+			if !ok {
+				cached = cachedtimings{
+					name:   m.name,
+					fields: make(map[string]RunningStats),
+					tags:   m.tags,
+				}
+			}
+			// Check if the field exists. If we've not enabled multiple fields per timer
+			// this will be the default field name, eg. "value"
+			field, ok := cached.fields[m.field]
+			if !ok {
+				field = RunningStats{
+					PercLimit: s.PercentileLimit,
+				}
+			}
+			if m.samplerate > 0 {
+				for i := 0; i < int(1.0/m.samplerate); i++ {
+					field.AddValue(m.floatvalue)
+				}
+			} else {
+				field.AddValue(m.floatvalue)
+			}
+			cached.fields[m.field] = field
+			cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
+			s.timings[m.hash] = cached
+		case "c":
+			// check if the measurement exists
+			cached, ok := s.counters[m.hash]
+			if !ok {
+				cached = cachedcounter{
+					name:   m.name,
+					fields: make(map[string]interface{}),
+					tags:   m.tags,
+				}
+			}
+			// check if the field exists
+			_, ok = cached.fields[m.field]
+			if !ok {
+				cached.fields[m.field] = int64(0)
+			}
+			cached.fields[m.field] = cached.fields[m.field].(int64) + m.intvalue
+			cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
+			s.counters[m.hash] = cached
+		case "g":
+			// check if the measurement exists
+			cached, ok := s.gauges[m.hash]
+			if !ok {
+				cached = cachedgauge{
+					name:   m.name,
+					fields: make(map[string]interface{}),
+					tags:   m.tags,
+				}
+			}
+			// check if the field exists
+			_, ok = cached.fields[m.field]
+			if !ok {
+				cached.fields[m.field] = float64(0)
+			}
+			if m.additive {
+				cached.fields[m.field] = cached.fields[m.field].(float64) + m.floatvalue
+			} else {
+				cached.fields[m.field] = m.floatvalue
+			}
+
+			cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
+			s.gauges[m.hash] = cached
+		case "s":
+			// check if the measurement exists
+			cached, ok := s.sets[m.hash]
+			if !ok {
+				cached = cachedset{
+					name:   m.name,
+					fields: make(map[string]map[string]bool),
+					tags:   m.tags,
+				}
+			}
+			// check if the field exists
+			_, ok = cached.fields[m.field]
+			if !ok {
+				cached.fields[m.field] = make(map[string]bool)
+			}
+			cached.fields[m.field][m.strvalue] = true
+			cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
+			s.sets[m.hash] = cached
+		}
 	}
 }
 
```
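One detail in the `ms`/`h` case above is worth a worked example: the sample rate scales how many times a value is added, so a timing received with `@0.1` is counted as `int(1.0/0.1) = 10` observations. A minimal sketch, with `runningMean` as a simplified stand-in for the plugin's `RunningStats`:

```go
package main

import "fmt"

// runningMean is a simplified stand-in for the plugin's RunningStats.
type runningMean struct {
	n   int
	sum float64
}

func (r *runningMean) AddValue(v float64) { r.n++; r.sum += v }

func main() {
	var field runningMean
	value, samplerate := 0.25, 0.1

	// Mirrors the timing case above: a rate of 0.1 means each received
	// sample stands for int(1.0/0.1) = 10 observations.
	if samplerate > 0 {
		for i := 0; i < int(1.0/samplerate); i++ {
			field.AddValue(value)
		}
	} else {
		field.AddValue(value)
	}

	fmt.Printf("observations=%d mean=%g\n", field.n, field.sum/float64(field.n))
}
```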
