Skip to content

Commit

Permalink
fix(plugins.input.redis): Discard invalid errorstat lines (influxdata…
Browse files Browse the repository at this point in the history
…#15311)

Some badly-behaved sources (such as dragonfly) may send error messages
which are not in the expected format. If errorstat lines are found
which do not include an `=` sign, silently discard them.

Somewhere between 1.26 and 1.28.5, a change was made which caused such
missing '=' symbols to cause a panic.

In addition to ignoring such entries, add debug-level logging for this
and a similar problem, where the errorstat values are not base-10.
  • Loading branch information
nicois committed May 7, 2024
1 parent 46fb0af commit f0f9981
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 14 deletions.
22 changes: 15 additions & 7 deletions plugins/inputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,15 @@ func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error {
}

rdr := strings.NewReader(info)
return gatherInfoOutput(rdr, acc, client.BaseTags())
return gatherInfoOutput(rdr, acc, client.BaseTags(), r.Log)
}

// gatherInfoOutput gathers
func gatherInfoOutput(
rdr io.Reader,
acc telegraf.Accumulator,
tags map[string]string,
log telegraf.Logger,
) error {
var section string
var keyspaceHits, keyspaceMisses int64
Expand Down Expand Up @@ -427,7 +428,7 @@ func gatherInfoOutput(
}
if section == "Errorstats" {
kline := strings.TrimSpace(parts[1])
gatherErrorstatsLine(name, kline, acc, tags)
gatherErrorstatsLine(name, kline, acc, tags, log)
continue
}

Expand Down Expand Up @@ -658,17 +659,24 @@ func gatherErrorstatsLine(
line string,
acc telegraf.Accumulator,
globalTags map[string]string,
log telegraf.Logger,
) {
tags := make(map[string]string, len(globalTags)+1)
for k, v := range globalTags {
tags[k] = v
}
tags["err"] = strings.TrimPrefix(name, "errorstat_")
kv := strings.Split(line, "=")
ival, err := strconv.ParseInt(kv[1], 10, 64)
if err == nil {
fields := map[string]interface{}{"total": ival}
acc.AddFields("redis_errorstat", fields, tags)
if strings.Contains(line, "=") {
kv := strings.Split(line, "=")
ival, err := strconv.ParseInt(kv[1], 10, 64)
if err == nil {
fields := map[string]interface{}{"total": ival}
acc.AddFields("redis_errorstat", fields, tags)
} else if log != nil {
log.Debugf("Error while parsing %v: %v", line, err)
}
} else if log != nil {
log.Debugf("Missing '=' in line %v (name:%v).", line, name)
}
}

Expand Down
33 changes: 26 additions & 7 deletions plugins/inputs/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ import (
"github.com/influxdata/telegraf/testutil"
)

type testClient struct {
}
type testClient struct{}

func (t *testClient) BaseTags() map[string]string {
return map[string]string{"host": "redis.net"}
Expand Down Expand Up @@ -94,7 +93,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
tags := map[string]string{"host": "redis.net"}
rdr := bufio.NewReader(strings.NewReader(testOutput))

err := gatherInfoOutput(rdr, &acc, tags)
err := gatherInfoOutput(rdr, &acc, tags, nil)
require.NoError(t, err)

tags = map[string]string{"host": "redis.net", "replication_role": "master"}
Expand Down Expand Up @@ -314,7 +313,7 @@ func TestRedis_ParseFloatOnInts(t *testing.T) {
var acc testutil.Accumulator
tags := map[string]string{"host": "redis.net"}
rdr := bufio.NewReader(strings.NewReader(strings.Replace(testOutput, "mem_fragmentation_ratio:0.81", "mem_fragmentation_ratio:1", 1)))
err := gatherInfoOutput(rdr, &acc, tags)
err := gatherInfoOutput(rdr, &acc, tags, nil)
require.NoError(t, err)
var m *testutil.Metric
for i := range acc.Metrics {
Expand All @@ -333,7 +332,7 @@ func TestRedis_ParseIntOnFloats(t *testing.T) {
var acc testutil.Accumulator
tags := map[string]string{"host": "redis.net"}
rdr := bufio.NewReader(strings.NewReader(strings.Replace(testOutput, "clients_in_timeout_table:0", "clients_in_timeout_table:0.0", 1)))
err := gatherInfoOutput(rdr, &acc, tags)
err := gatherInfoOutput(rdr, &acc, tags, nil)
require.NoError(t, err)
var m *testutil.Metric
for i := range acc.Metrics {
Expand All @@ -352,7 +351,7 @@ func TestRedis_ParseStringOnInts(t *testing.T) {
var acc testutil.Accumulator
tags := map[string]string{"host": "redis.net"}
rdr := bufio.NewReader(strings.NewReader(strings.Replace(testOutput, "maxmemory_policy:no-eviction", "maxmemory_policy:1", 1)))
err := gatherInfoOutput(rdr, &acc, tags)
err := gatherInfoOutput(rdr, &acc, tags, nil)
require.NoError(t, err)
var m *testutil.Metric
for i := range acc.Metrics {
Expand All @@ -371,7 +370,7 @@ func TestRedis_ParseIntOnString(t *testing.T) {
var acc testutil.Accumulator
tags := map[string]string{"host": "redis.net"}
rdr := bufio.NewReader(strings.NewReader(strings.Replace(testOutput, "clients_in_timeout_table:0", `clients_in_timeout_table:""`, 1)))
err := gatherInfoOutput(rdr, &acc, tags)
err := gatherInfoOutput(rdr, &acc, tags, nil)
require.NoError(t, err)
var m *testutil.Metric
for i := range acc.Metrics {
Expand All @@ -386,6 +385,26 @@ func TestRedis_ParseIntOnString(t *testing.T) {
require.IsType(t, int64(0), clientsInTimeout)
}

func TestRedis_GatherErrorstatsLine(t *testing.T) {
var acc testutil.Accumulator
var mockLogger testutil.CaptureLogger
globalTags := map[string]string{}

mockLogger.Clear()
gatherErrorstatsLine("FOO", "BAR", &acc, globalTags, &mockLogger)
require.Equal(t, 1, mockLogger.NMessages())
require.Equal(t, "Missing '=' in line BAR (name:FOO).", mockLogger.Messages()[0].Text)

mockLogger.Clear()
gatherErrorstatsLine("FOO", "BAR=a", &acc, globalTags, &mockLogger)
require.Equal(t, 1, mockLogger.NMessages())
require.Equal(t, "Error while parsing BAR=a: strconv.ParseInt: parsing \"a\": invalid syntax", mockLogger.Messages()[0].Text)

mockLogger.Clear()
gatherErrorstatsLine("FOO", "BAR=77", &acc, globalTags, &mockLogger)
require.Equal(t, 0, mockLogger.NMessages())
}

const testOutput = `# Server
redis_version:6.0.9
redis_git_sha1:00000000
Expand Down

0 comments on commit f0f9981

Please sign in to comment.