diff --git a/models/buffer_disk.go b/models/buffer_disk.go
index e5fc1b219cfda..dabd377cd9e03 100644
--- a/models/buffer_disk.go
+++ b/models/buffer_disk.go
@@ -4,7 +4,6 @@ import (
 	"errors"
 	"fmt"
 	"log"
-	"os"
 	"path/filepath"
 	"sync"
 
@@ -27,6 +26,11 @@ type DiskBuffer struct {
 	// Ending point of metrics read from disk on telegraf launch.
 	// Used to know whether to discard tracking metrics.
 	originalEnd uint64
+
+	// The WAL library currently has no way to "fully empty" the walfile. In this case,
+	// we have to do our best and track that the walfile "should" be empty, so that next
+	// write, we can remove the invalid entry (also skipping this entry if it is being read).
+	isEmpty bool
 }
 
 func NewDiskBuffer(name string, path string, stats BufferStats) (*DiskBuffer, error) {
@@ -53,6 +57,9 @@ func (b *DiskBuffer) Len() int {
 }
 
 func (b *DiskBuffer) length() int {
+	if b.isEmpty {
+		return 0
+	}
 	// Special case for when the read index is zero, it must be empty (otherwise it would be >= 1)
 	if b.readIndex() == 0 {
 		return 0
@@ -87,6 +94,8 @@ func (b *DiskBuffer) Add(metrics ...telegraf.Metric) int {
 		if !b.addSingleMetric(m) {
 			dropped++
 		}
+		// as soon as a new metric is added, if this was empty, try to flush the "empty" metric out
+		b.handleEmptyFile()
 	}
 	b.BufferSize.Set(int64(b.length()))
 	return dropped
@@ -169,7 +178,7 @@ func (b *DiskBuffer) Accept(batch []telegraf.Metric) {
 		b.metricWritten(m)
 	}
 	if b.length() == len(batch) {
-		b.resetWalFile()
+		b.emptyFile()
 	} else {
 		err := b.file.TruncateFront(b.batchFirst + uint64(len(batch)))
 		if err != nil {
@@ -205,15 +214,27 @@ func (b *DiskBuffer) resetBatch() {
 }
 
 // This is very messy and not ideal, but serves as the only way I can find currently
-// to actually clear the walfile completely if needed, since Truncate() calls require
+// to actually treat the walfile as empty if needed, since Truncate() calls require
 // that at least one entry remains in them otherwise they return an error.
// Related issue: https://github.com/tidwall/wal/issues/20
-func (b *DiskBuffer) resetWalFile() {
-	b.file.Close()
-	os.Remove(b.path)
-	walFile, err := wal.Open(b.path, nil)
-	if err != nil {
-		panic(err)
-	}
-	b.file = walFile
-}
+// handleEmptyFile drops the stale placeholder entry, but only once a newer entry exists.
+func (b *DiskBuffer) handleEmptyFile() {
+	if !b.isEmpty || b.writeIndex()-b.readIndex() < 2 {
+		return // after a failed write only the placeholder remains; truncating past it would panic
+	}
+	if err := b.file.TruncateFront(b.readIndex() + 1); err != nil {
+		log.Panicf("E! readIndex: %d, buffer len: %d: %v", b.readIndex(), b.length(), err)
+	}
+	b.isEmpty = false
+}
+
+// emptyFile keeps only the newest walfile entry and flags the buffer as logically empty.
+func (b *DiskBuffer) emptyFile() {
+	if b.isEmpty || b.length() == 0 {
+		return
+	}
+	if err := b.file.TruncateFront(b.writeIndex() - 1); err != nil {
+		log.Panicf("E! writeIndex: %d, buffer len: %d: %v", b.writeIndex(), b.length(), err)
+	}
+	b.isEmpty = true
+}
diff --git a/models/buffer_suite_test.go b/models/buffer_suite_test.go
index a984df41e1614..fff061694b549 100644
--- a/models/buffer_suite_test.go
+++ b/models/buffer_suite_test.go
@@ -809,3 +809,28 @@ func (s *BufferSuiteTest) TestBuffer_RejectEmptyBatch() {
 		s.NotNil(m)
 	}
 }
+
+// TestBuffer_FlushedPartial checks that accepting 2 of 3 added metrics leaves a length of 1.
+func (s *BufferSuiteTest) TestBuffer_FlushedPartial() {
+	b := s.newTestBuffer(5)
+	b.Add(MetricTime(1))
+	b.Add(MetricTime(2))
+	b.Add(MetricTime(3))
+	batch := b.Batch(2)
+	s.Len(batch, 2)
+
+	b.Accept(batch)
+	s.Equal(1, b.Len())
+}
+
+// TestBuffer_FlushedFull checks that accepting every added metric leaves a length of 0.
+func (s *BufferSuiteTest) TestBuffer_FlushedFull() {
+	b := s.newTestBuffer(5)
+	b.Add(MetricTime(1))
+	b.Add(MetricTime(2))
+	batch := b.Batch(2)
+	s.Len(batch, 2)
+
+	b.Accept(batch)
+	s.Equal(0, b.Len())
+}