From 275a9fdd1d2319808778f6a50d9b848b551669a6 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 14 Oct 2024 17:21:21 +0200 Subject: [PATCH] fix(outputs.syslog): Trim field-names belonging to explicit SDIDs correctly --- plugins/outputs/syslog/syslog.go | 9 +- plugins/outputs/syslog/syslog_mapper.go | 19 ++- plugins/outputs/syslog/syslog_test.go | 156 ++++++++++++++++++ .../syslog/testcases/issue_16012/expected.out | 1 + .../syslog/testcases/issue_16012/input.influx | 1 + .../testcases/issue_16012/telegraf.conf | 4 + 6 files changed, 178 insertions(+), 12 deletions(-) create mode 100644 plugins/outputs/syslog/testcases/issue_16012/expected.out create mode 100644 plugins/outputs/syslog/testcases/issue_16012/input.influx create mode 100644 plugins/outputs/syslog/testcases/issue_16012/telegraf.conf diff --git a/plugins/outputs/syslog/syslog.go b/plugins/outputs/syslog/syslog.go index fc599fce8a413..80dfda42c3fb1 100644 --- a/plugins/outputs/syslog/syslog.go +++ b/plugins/outputs/syslog/syslog.go @@ -121,13 +121,14 @@ func (s *Syslog) Write(metrics []telegraf.Metric) (err error) { } } for _, metric := range metrics { - var msg *rfc5424.SyslogMessage - if msg, err = s.mapper.MapMetricToSyslogMessage(metric); err != nil { + msg, err := s.mapper.MapMetricToSyslogMessage(metric) + if err != nil { s.Log.Errorf("Failed to create syslog message: %v", err) continue } - var msgBytesWithFraming []byte - if msgBytesWithFraming, err = s.getSyslogMessageBytesWithFraming(msg); err != nil { + + msgBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(msg) + if err != nil { s.Log.Errorf("Failed to convert syslog message with framing: %v", err) continue } diff --git a/plugins/outputs/syslog/syslog_mapper.go b/plugins/outputs/syslog/syslog_mapper.go index 07a93248feea0..3e60d42a19599 100644 --- a/plugins/outputs/syslog/syslog_mapper.go +++ b/plugins/outputs/syslog/syslog_mapper.go @@ -53,22 +53,25 @@ func (sm *SyslogMapper) mapStructuredData(metric telegraf.Metric, msg *rfc5424.S } func (sm *SyslogMapper) mapStructuredDataItem(key, value string, msg *rfc5424.SyslogMessage) { + // Do not add already reserved keys if sm.reservedKeys[key] { return } - isExplicitSdid := false + + // Add keys matching one of the sd-IDs for _, sdid := range sm.Sdids { - k := strings.TrimLeft(key, sdid+sm.Separator) - if len(key) > len(k) { - isExplicitSdid = true + if k := strings.TrimPrefix(key, sdid+sm.Separator); key != k { msg.SetParameter(sdid, k, value) - break + return } } - if !isExplicitSdid && len(sm.DefaultSdid) > 0 { - k := strings.TrimPrefix(key, sm.DefaultSdid+sm.Separator) - msg.SetParameter(sm.DefaultSdid, k, value) + + // Add remaining keys with the default sd-ID if configured + if sm.DefaultSdid == "" { + return } + k := strings.TrimPrefix(key, sm.DefaultSdid+sm.Separator) + msg.SetParameter(sm.DefaultSdid, k, value) } func (sm *SyslogMapper) mapAppname(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { diff --git a/plugins/outputs/syslog/syslog_test.go b/plugins/outputs/syslog/syslog_test.go index 6e98372dc45c1..40dbe0a1cafcb 100644 --- a/plugins/outputs/syslog/syslog_test.go +++ b/plugins/outputs/syslog/syslog_test.go @@ -1,7 +1,10 @@ package syslog import ( + "bytes" "net" + "os" + "path/filepath" "sync" "testing" "time" @@ -9,9 +12,12 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/models" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" "github.com/leodido/go-syslog/v4/nontransparent" ) @@ -428,3 +434,153 @@ func TestStartupErrorBehaviorRetry(t *testing.T) { wg.Wait() require.NotEmpty(t, string(buf)) } + +func TestCases(t *testing.T) { + // Get all testcase directories + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + // Register the plugin + outputs.Add("syslog", func() telegraf.Output { return newSyslog() }) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + + t.Run(f.Name(), func(t *testing.T) { + testcasePath := filepath.Join("testcases", f.Name()) + configFilename := filepath.Join(testcasePath, "telegraf.conf") + inputFilename := filepath.Join(testcasePath, "input.influx") + expectedFilename := filepath.Join(testcasePath, "expected.out") + expectedErrorFilename := filepath.Join(testcasePath, "expected.err") + + // Get parser to parse input and expected output + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + // Load the input data + input, err := testutil.ParseMetricsFromFile(inputFilename, parser) + require.NoError(t, err) + + // Read the expected output if any + var expected []byte + if _, err := os.Stat(expectedFilename); err == nil { + expected, err = os.ReadFile(expectedFilename) + require.NoError(t, err) + } + + // Read the expected output if any + var expectedError string + if _, err := os.Stat(expectedErrorFilename); err == nil { + expectedErrors, err := testutil.ParseLinesFromFile(expectedErrorFilename) + require.NoError(t, err) + require.Len(t, expectedErrors, 1) + expectedError = expectedErrors[0] + } + + // Configure the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfig(configFilename)) + require.Len(t, cfg.Outputs, 1) + + // Create a mock-server to receive the data + server, err := newMockServer() + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + server.listen() + }() + defer server.close() + + // Setup the plugin + plugin := cfg.Outputs[0].Output.(*Syslog) + plugin.Address = "udp://" + server.address() + plugin.Log = testutil.Logger{} + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Write the data and wait for it to arrive + err = plugin.Write(input) + if expectedError != "" { + require.ErrorContains(t, err, expectedError) + return + } + require.NoError(t, err) + require.NoError(t, plugin.Close()) + + require.Eventuallyf(t, func() bool { + return server.len() >= len(expected) + }, 3*time.Second, 100*time.Millisecond, "received %q", server.message()) + + // Check the received data + require.Equal(t, string(expected), server.message()) + }) + } +} + +type mockServer struct { + conn *net.UDPConn + + data bytes.Buffer + err error + + sync.Mutex +} + +func newMockServer() (*mockServer, error) { + addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + if err != nil { + return nil, err + } + + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + + return &mockServer{conn: conn}, nil +} + +func (s *mockServer) address() string { + return s.conn.LocalAddr().String() +} + +func (s *mockServer) listen() { + buf := make([]byte, 2048) + for { + n, err := s.conn.Read(buf) + if err != nil { + s.err = err + return + } + s.Lock() + _, _ = s.data.Write(buf[:n]) + s.Unlock() + } +} + +func (s *mockServer) close() error { + if s.conn == nil { + return nil + } + + return s.conn.Close() +} + +func (s *mockServer) message() string { + s.Lock() + defer s.Unlock() + return s.data.String() +} + +func (s *mockServer) len() int { + s.Lock() + defer s.Unlock() + return s.data.Len() +} diff --git a/plugins/outputs/syslog/testcases/issue_16012/expected.out b/plugins/outputs/syslog/testcases/issue_16012/expected.out new file mode 100644 index 0000000000000..d1dfa79ae8748 --- /dev/null +++ b/plugins/outputs/syslog/testcases/issue_16012/expected.out @@ -0,0 +1 @@ +342 <13>1 2024-10-11T21:30:04Z draco Telegraf - scc-change-logs [additional entityUid="6b580296-7199-47b5-9736-9b91329c284e" lastEventDate="2024-10-09T19:26:13Z" status="COMPLETED" uid="544ee602-1f4c-4f5f-bbd2-365d865d78b3"][events action="UPDATE" date="2024-10-09T19:26:08Z" description="Changed ASA Config" diff="" username="user@mydomain.com"] \ No newline at end of file diff --git a/plugins/outputs/syslog/testcases/issue_16012/input.influx b/plugins/outputs/syslog/testcases/issue_16012/input.influx new file mode 100644 index 0000000000000..cf241ae719e82 --- /dev/null +++ b/plugins/outputs/syslog/testcases/issue_16012/input.influx @@ -0,0 +1 @@ +scc-change-logs,host=draco events_description="Changed ASA Config",events_diff="@@ -5,1 +5,1 @@\\n-: Written by lockhart at 18:53:02.210 UTC Tue Oct 8 2024\\n+: Written by lockhart at 19:24:54.048 UTC Wed Oct 9 2024\\n@@ -135,2 +135,0 @@\\n-object network 1.1.1.1\\n-host 1.1.1.1\\n@@ -239,0 +237,2 @@\\n+object network 1.1.1.1\\n+host 1.1.1.1\\n@@ -1108,1 +1108,1 @@\\n-Cryptochecksum:b06f479add1a10f8388a2958d0ee0018\\n+Cryptochecksum:b858dfb10323f3dbc9694a49b8c94168",events_username="user@mydomain.com",events_date="2024-10-09T19:26:08Z",events_action="UPDATE",uid="544ee602-1f4c-4f5f-bbd2-365d865d78b3",status="COMPLETED",lastEventDate="2024-10-09T19:26:13Z",entityUid="6b580296-7199-47b5-9736-9b91329c284e" 1728682204000000000 diff --git a/plugins/outputs/syslog/testcases/issue_16012/telegraf.conf b/plugins/outputs/syslog/testcases/issue_16012/telegraf.conf new file mode 100644 index 0000000000000..b2db24a0510e6 --- /dev/null +++ b/plugins/outputs/syslog/testcases/issue_16012/telegraf.conf @@ -0,0 +1,4 @@ +[[outputs.syslog]] + address = "udp://127.0.0.1:0" + default_sdid = "additional" + sdids = ["events"]