Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(agent): Use a unique WAL file for plugin instances of the same type #15966

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ The agent table configures Telegraf and the defaults used across all plugins.

- **buffer_directory**:
The directory to use when in `disk` buffer mode. Each output plugin will make
another subdirectory in this directory with the output plugin's name.
another subdirectory in this directory with the output plugin's ID.

## Plugins

Expand Down
4 changes: 2 additions & 2 deletions models/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type BufferStats struct {
}

// NewBuffer returns a new empty Buffer with the given capacity.
func NewBuffer(name string, alias string, capacity int, strategy string, path string) (Buffer, error) {
func NewBuffer(id, name, alias string, capacity int, strategy, path string) (Buffer, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need the alias still? Maybe name, id would be more intuitive?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alias is still used for the buffer stats for both buffer strategies, though it isn't used for either buffer configuration otherwise. I can swap ID and name here though

registerGob()

bs := NewBufferStats(name, alias, capacity)
Expand All @@ -58,7 +58,7 @@ func NewBuffer(name string, alias string, capacity int, strategy string, path st
case "", "memory":
return NewMemoryBuffer(capacity, bs)
case "disk":
return NewDiskBuffer(name, alias, path, bs)
return NewDiskBuffer(id, path, bs)
DStrand1 marked this conversation as resolved.
Show resolved Hide resolved
}
return nil, fmt.Errorf("invalid buffer strategy %q", strategy)
}
Expand Down
15 changes: 8 additions & 7 deletions models/buffer_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@ type DiskBuffer struct {
isEmpty bool
}

func NewDiskBuffer(name, alias, path string, stats BufferStats) (*DiskBuffer, error) {
pluginName := name
if alias != "" {
pluginName += "-" + alias
}

filePath := filepath.Join(path, pluginName)
func NewDiskBuffer(id, path string, stats BufferStats) (*DiskBuffer, error) {
DStrand1 marked this conversation as resolved.
Show resolved Hide resolved
filePath := filepath.Join(path, id)
walFile, err := wal.Open(filePath, nil)
if err != nil {
return nil, fmt.Errorf("failed to open wal file: %w", err)
}
//nolint:errcheck // cannot error here
if index, _ := walFile.FirstIndex(); index == 0 {
// simple way to test if the walfile is freshly initialized, meaning no existing file was found
log.Printf("I! wal file not found for plugin %s, this can safely be ignored if this is the first instance of this plugin", id)
DStrand1 marked this conversation as resolved.
Show resolved Hide resolved
}

buf := &DiskBuffer{
BufferStats: stats,
file: walFile,
Expand Down
2 changes: 1 addition & 1 deletion models/buffer_disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func newTestDiskBuffer(t testing.TB) Buffer {

func newTestDiskBufferWithPath(t testing.TB, name string, path string) Buffer {
t.Helper()
buf, err := NewBuffer(name, "", 0, "disk", path)
buf, err := NewBuffer(name, name, "", 0, "disk", path)
DStrand1 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
buf.Stats().MetricsAdded.Set(0)
buf.Stats().MetricsWritten.Set(0)
Expand Down
2 changes: 1 addition & 1 deletion models/buffer_mem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func newTestMemoryBuffer(t testing.TB, capacity int) Buffer {
t.Helper()
buf, err := NewBuffer("test", "", capacity, "memory", "")
buf, err := NewBuffer("test", "test", "", capacity, "memory", "")
DStrand1 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
buf.Stats().MetricsAdded.Set(0)
buf.Stats().MetricsWritten.Set(0)
Expand Down
2 changes: 1 addition & 1 deletion models/buffer_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func MetricTime(sec int64) telegraf.Metric {

func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer {
s.T().Helper()
buf, err := NewBuffer("test", "", capacity, s.bufferType, s.bufferPath)
buf, err := NewBuffer("test", "test", "", capacity, s.bufferType, s.bufferPath)
DStrand1 marked this conversation as resolved.
Show resolved Hide resolved
s.Require().NoError(err)
buf.Stats().MetricsAdded.Set(0)
buf.Stats().MetricsWritten.Set(0)
Expand Down
2 changes: 1 addition & 1 deletion models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func NewRunningOutput(
batchSize = DefaultMetricBatchSize
}

b, err := NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory)
b, err := NewBuffer(config.ID, config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory)
DStrand1 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
panic(err)
}
Expand Down
Loading