From d330a3057e00292c80720cac9cddada62ef8dc5e Mon Sep 17 00:00:00 2001 From: Dane Strandboge <136023093+DStrand1@users.noreply.github.com> Date: Wed, 2 Oct 2024 10:54:00 -0500 Subject: [PATCH 1/4] fix(agent.buffer): Fix shared wal file between same named plugins --- models/buffer.go | 2 +- models/buffer_disk.go | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/models/buffer.go b/models/buffer.go index dc6363cc0747f..f78285f2b130c 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -58,7 +58,7 @@ func NewBuffer(name string, alias string, capacity int, strategy string, path st case "", "memory": return NewMemoryBuffer(capacity, bs) case "disk": - return NewDiskBuffer(name, path, bs) + return NewDiskBuffer(name, alias, path, bs) } return nil, fmt.Errorf("invalid buffer strategy %q", strategy) } diff --git a/models/buffer_disk.go b/models/buffer_disk.go index dabd377cd9e03..dd69783e20936 100644 --- a/models/buffer_disk.go +++ b/models/buffer_disk.go @@ -33,8 +33,13 @@ type DiskBuffer struct { isEmpty bool } -func NewDiskBuffer(name string, path string, stats BufferStats) (*DiskBuffer, error) { - filePath := filepath.Join(path, name) +func NewDiskBuffer(name, alias, path string, stats BufferStats) (*DiskBuffer, error) { + pluginName := name + if alias != "" { + pluginName += "-" + alias + } + + filePath := filepath.Join(path, pluginName) walFile, err := wal.Open(filePath, nil) if err != nil { return nil, fmt.Errorf("failed to open wal file: %w", err) From 10aa6cf5c8b232c3aee8d7fafafc5ba827f1f8df Mon Sep 17 00:00:00 2001 From: Dane Strandboge <136023093+DStrand1@users.noreply.github.com> Date: Tue, 8 Oct 2024 15:36:01 -0500 Subject: [PATCH 2/4] chore: switch from alias to id --- docs/CONFIGURATION.md | 2 +- models/buffer.go | 4 ++-- models/buffer_disk.go | 15 ++++++++------- models/buffer_disk_test.go | 2 +- models/buffer_mem_test.go | 2 +- models/buffer_suite_test.go | 2 +- models/running_output.go | 2 +- 7 files changed, 15 insertions(+), 14 deletions(-) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 313c6fa91163a..ded16750b078f 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -361,7 +361,7 @@ The agent table configures Telegraf and the defaults used across all plugins. - **buffer_directory**: The directory to use when in `disk` buffer mode. Each output plugin will make - another subdirectory in this directory with the output plugin's name. + another subdirectory in this directory with the output plugin's ID. ## Plugins diff --git a/models/buffer.go b/models/buffer.go index f78285f2b130c..3700a0a95773e 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -49,7 +49,7 @@ type BufferStats struct { } // NewBuffer returns a new empty Buffer with the given capacity. -func NewBuffer(name string, alias string, capacity int, strategy string, path string) (Buffer, error) { +func NewBuffer(id, name, alias string, capacity int, strategy, path string) (Buffer, error) { registerGob() bs := NewBufferStats(name, alias, capacity) @@ -58,7 +58,7 @@ func NewBuffer(name string, alias string, capacity int, strategy string, path st case "", "memory": return NewMemoryBuffer(capacity, bs) case "disk": - return NewDiskBuffer(name, alias, path, bs) + return NewDiskBuffer(id, path, bs) } return nil, fmt.Errorf("invalid buffer strategy %q", strategy) } diff --git a/models/buffer_disk.go b/models/buffer_disk.go index dd69783e20936..3ea8dc3531e80 100644 --- a/models/buffer_disk.go +++ b/models/buffer_disk.go @@ -33,17 +33,18 @@ type DiskBuffer struct { isEmpty bool } -func NewDiskBuffer(name, alias, path string, stats BufferStats) (*DiskBuffer, error) { - pluginName := name - if alias != "" { - pluginName += "-" + alias - } - - filePath := filepath.Join(path, pluginName) +func NewDiskBuffer(id, path string, stats BufferStats) (*DiskBuffer, error) { + filePath := filepath.Join(path, id) walFile, err := wal.Open(filePath, nil) if err != nil { return nil, fmt.Errorf("failed to open wal file: %w", err) } + //nolint:errcheck // cannot error here + if index, _ := walFile.FirstIndex(); index == 0 { + // simple way to test if the walfile is freshly initialized, meaning no existing file was found + log.Printf("I! wal file not found for plugin %s, this can safely be ignored if this is the first instance of this plugin", id) + } + buf := &DiskBuffer{ BufferStats: stats, file: walFile, diff --git a/models/buffer_disk_test.go b/models/buffer_disk_test.go index d650471ad249d..012276e172897 100644 --- a/models/buffer_disk_test.go +++ b/models/buffer_disk_test.go @@ -22,7 +22,7 @@ func newTestDiskBuffer(t testing.TB) Buffer { func newTestDiskBufferWithPath(t testing.TB, name string, path string) Buffer { t.Helper() - buf, err := NewBuffer(name, "", 0, "disk", path) + buf, err := NewBuffer(name, name, "", 0, "disk", path) require.NoError(t, err) buf.Stats().MetricsAdded.Set(0) buf.Stats().MetricsWritten.Set(0) diff --git a/models/buffer_mem_test.go b/models/buffer_mem_test.go index eec7c8b39c01f..63c4b9e7578e9 100644 --- a/models/buffer_mem_test.go +++ b/models/buffer_mem_test.go @@ -8,7 +8,7 @@ import ( func newTestMemoryBuffer(t testing.TB, capacity int) Buffer { t.Helper() - buf, err := NewBuffer("test", "", capacity, "memory", "") + buf, err := NewBuffer("test", "test", "", capacity, "memory", "") require.NoError(t, err) buf.Stats().MetricsAdded.Set(0) buf.Stats().MetricsWritten.Set(0) diff --git a/models/buffer_suite_test.go b/models/buffer_suite_test.go index fff061694b549..4875819c68a4e 100644 --- a/models/buffer_suite_test.go +++ b/models/buffer_suite_test.go @@ -84,7 +84,7 @@ func MetricTime(sec int64) telegraf.Metric { func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer { s.T().Helper() - buf, err := NewBuffer("test", "", capacity, s.bufferType, s.bufferPath) + buf, err := NewBuffer("test", "test", "", capacity, s.bufferType, s.bufferPath) s.Require().NoError(err) buf.Stats().MetricsAdded.Set(0) buf.Stats().MetricsWritten.Set(0) diff --git a/models/running_output.go b/models/running_output.go index c89f78c6dd963..34552ff9aa19b 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -104,7 +104,7 @@ func NewRunningOutput( batchSize = DefaultMetricBatchSize } - b, err := NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory) + b, err := NewBuffer(config.ID, config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory) if err != nil { panic(err) } From a651aefa53c7d8f096ee7d36eecb7d9ec34427d0 Mon Sep 17 00:00:00 2001 From: Dane Strandboge <136023093+DStrand1@users.noreply.github.com> Date: Wed, 9 Oct 2024 18:57:07 -0500 Subject: [PATCH 3/4] reviews --- models/buffer.go | 4 ++-- models/buffer_disk.go | 5 +++-- models/buffer_disk_test.go | 2 +- models/buffer_mem_test.go | 2 +- models/buffer_suite_test.go | 2 +- models/running_output.go | 2 +- 6 files changed, 9 insertions(+), 8 deletions(-) diff --git a/models/buffer.go b/models/buffer.go index 3700a0a95773e..2f0aac7ce493e 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -49,7 +49,7 @@ type BufferStats struct { } // NewBuffer returns a new empty Buffer with the given capacity. -func NewBuffer(id, name, alias string, capacity int, strategy, path string) (Buffer, error) { +func NewBuffer(name, id, alias string, capacity int, strategy, path string) (Buffer, error) { registerGob() bs := NewBufferStats(name, alias, capacity) @@ -58,7 +58,7 @@ func NewBuffer(id, name, alias string, capacity int, strategy, path string) (Buf case "", "memory": return NewMemoryBuffer(capacity, bs) case "disk": - return NewDiskBuffer(id, path, bs) + return NewDiskBuffer(name, id, path, bs) } return nil, fmt.Errorf("invalid buffer strategy %q", strategy) } diff --git a/models/buffer_disk.go b/models/buffer_disk.go index 3ea8dc3531e80..2031e57399eab 100644 --- a/models/buffer_disk.go +++ b/models/buffer_disk.go @@ -33,7 +33,7 @@ type DiskBuffer struct { isEmpty bool } -func NewDiskBuffer(id, path string, stats BufferStats) (*DiskBuffer, error) { +func NewDiskBuffer(name, id, path string, stats BufferStats) (*DiskBuffer, error) { filePath := filepath.Join(path, id) walFile, err := wal.Open(filePath, nil) if err != nil { @@ -42,7 +42,8 @@ func NewDiskBuffer(id, path string, stats BufferStats) (*DiskBuffer, error) { //nolint:errcheck // cannot error here if index, _ := walFile.FirstIndex(); index == 0 { // simple way to test if the walfile is freshly initialized, meaning no existing file was found - log.Printf("I! wal file not found for plugin %s, this can safely be ignored if this is the first instance of this plugin", id) + log.Printf("I! WAL file not found for plugin outputs.%s (%s), "+ + "this can safely be ignored if you added this plugin instance for the first time", name, id) } buf := &DiskBuffer{ diff --git a/models/buffer_disk_test.go b/models/buffer_disk_test.go index 012276e172897..3e959f52a0e5d 100644 --- a/models/buffer_disk_test.go +++ b/models/buffer_disk_test.go @@ -22,7 +22,7 @@ func newTestDiskBuffer(t testing.TB) Buffer { func newTestDiskBufferWithPath(t testing.TB, name string, path string) Buffer { t.Helper() - buf, err := NewBuffer(name, name, "", 0, "disk", path) + buf, err := NewBuffer(name, "123", "", 0, "disk", path) require.NoError(t, err) buf.Stats().MetricsAdded.Set(0) buf.Stats().MetricsWritten.Set(0) diff --git a/models/buffer_mem_test.go b/models/buffer_mem_test.go index 63c4b9e7578e9..014f626315924 100644 --- a/models/buffer_mem_test.go +++ b/models/buffer_mem_test.go @@ -8,7 +8,7 @@ import ( func newTestMemoryBuffer(t testing.TB, capacity int) Buffer { t.Helper() - buf, err := NewBuffer("test", "test", "", capacity, "memory", "") + buf, err := NewBuffer("test", "123", "", capacity, "memory", "") require.NoError(t, err) buf.Stats().MetricsAdded.Set(0) buf.Stats().MetricsWritten.Set(0) diff --git a/models/buffer_suite_test.go b/models/buffer_suite_test.go index 4875819c68a4e..39814d657825c 100644 --- a/models/buffer_suite_test.go +++ b/models/buffer_suite_test.go @@ -84,7 +84,7 @@ func MetricTime(sec int64) telegraf.Metric { func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer { s.T().Helper() - buf, err := NewBuffer("test", "test", "", capacity, s.bufferType, s.bufferPath) + buf, err := NewBuffer("test", "123", "", capacity, s.bufferType, s.bufferPath) s.Require().NoError(err) buf.Stats().MetricsAdded.Set(0) buf.Stats().MetricsWritten.Set(0) diff --git a/models/running_output.go b/models/running_output.go index 34552ff9aa19b..420af4d764092 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -104,7 +104,7 @@ func NewRunningOutput( batchSize = DefaultMetricBatchSize } - b, err := NewBuffer(config.ID, config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory) + b, err := NewBuffer(config.Name, config.ID, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory) if err != nil { panic(err) } From 2b345d49259d14a44d21388f2095bdaf31063918 Mon Sep 17 00:00:00 2001 From: Dane Strandboge <136023093+DStrand1@users.noreply.github.com> Date: Mon, 14 Oct 2024 09:13:09 -0500 Subject: [PATCH 4/4] fix test failure --- models/buffer_disk_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/models/buffer_disk_test.go b/models/buffer_disk_test.go index 3e959f52a0e5d..d7d97be812bb1 100644 --- a/models/buffer_disk_test.go +++ b/models/buffer_disk_test.go @@ -45,6 +45,7 @@ func TestBuffer_RetainsTrackingInformation(t *testing.T) { func TestBuffer_TrackingDroppedFromOldWal(t *testing.T) { path, err := os.MkdirTemp("", "*-buffer-test") require.NoError(t, err) + path = filepath.Join(path, "123") walfile, err := wal.Open(path, nil) require.NoError(t, err)