diff --git a/CHANGELOG.md b/CHANGELOG.md index eb1b86f31ed..b1fdedc3104 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `IsEmpty` method is added to the `Instrument` type in `go.opentelemetry.io/otel/sdk/metric`. This method is used to check if an `Instrument` instance is a zero-value. (#5431) +### Changed + +- `Processor` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5469) + ### Fixed - Log a warning to the OpenTelemetry internal logger when a `Record` in `go.opentelemetry.io/otel/sdk/log` drops an attribute due to a limit being reached. (#5376) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 8e43b0e8f75..41b6065a09e 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -176,11 +176,11 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { } // OnEmit batches provided log record. -func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error { +func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { if b.stopped.Load() || b.q == nil { return nil } - if n := b.q.Enqueue(r); n >= b.batchSize { + if n := b.q.Enqueue(r.Clone()); n >= b.batchSize { select { case b.pollTrigger <- struct{}{}: default: diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 70b12ab04fa..a6f017dd165 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -45,9 +45,9 @@ func TestEmptyBatchConfig(t *testing.T) { assert.NotPanics(t, func() { var bp BatchProcessor ctx := context.Background() - var record Record + record := new(Record) assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit") - assert.False(t, bp.Enabled(ctx, record), "Enabled") + assert.False(t, bp.Enabled(ctx, *record), "Enabled") assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush") assert.NoError(t, bp.Shutdown(ctx), "Shutdown") }) @@ -198,7 +198,7 @@ func TestBatchProcessor(t *testing.T) { WithExportTimeout(time.Hour), ) for _, r := range make([]Record, size) { - assert.NoError(t, b.OnEmit(ctx, r)) + assert.NoError(t, b.OnEmit(ctx, &r)) } var got []Record assert.Eventually(t, func() bool { @@ -221,7 +221,7 @@ func TestBatchProcessor(t *testing.T) { WithExportTimeout(time.Hour), ) for _, r := range make([]Record, 10*batch) { - assert.NoError(t, b.OnEmit(ctx, r)) + assert.NoError(t, b.OnEmit(ctx, &r)) } assert.Eventually(t, func() bool { return e.ExportN() > 1 @@ -244,7 +244,7 @@ func TestBatchProcessor(t *testing.T) { WithExportTimeout(time.Hour), ) for _, r := range make([]Record, 2*batch) { - assert.NoError(t, b.OnEmit(ctx, r)) + assert.NoError(t, b.OnEmit(ctx, &r)) } var n int @@ -255,7 +255,7 @@ func TestBatchProcessor(t *testing.T) { var err error require.Eventually(t, func() bool { - err = b.OnEmit(ctx, Record{}) + err = b.OnEmit(ctx, new(Record)) return true }, time.Second, time.Microsecond, "OnEmit blocked") assert.NoError(t, err) @@ -303,7 +303,7 @@ func TestBatchProcessor(t *testing.T) { assert.NoError(t, b.Shutdown(ctx)) want := e.ExportN() - assert.NoError(t, b.OnEmit(ctx, Record{})) + assert.NoError(t, b.OnEmit(ctx, new(Record))) assert.Equal(t, want, e.ExportN(), "Export called after shutdown") }) @@ -311,7 +311,7 @@ func TestBatchProcessor(t *testing.T) { e := newTestExporter(nil) b := NewBatchProcessor(e) - assert.NoError(t, b.OnEmit(ctx, Record{})) + assert.NoError(t, b.OnEmit(ctx, new(Record))) assert.NoError(t, b.Shutdown(ctx)) assert.NoError(t, b.ForceFlush(ctx)) @@ -344,7 +344,7 @@ func TestBatchProcessor(t *testing.T) { ) t.Cleanup(func() { _ = b.Shutdown(ctx) }) - var r Record + r := new(Record) r.SetBody(log.BoolValue(true)) require.NoError(t, b.OnEmit(ctx, r)) @@ -353,7 +353,7 @@ func TestBatchProcessor(t *testing.T) { if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") { got := e.Records() if assert.Len(t, got[0], 1, "records received") { - assert.Equal(t, r, got[0][0]) + assert.Equal(t, *r, got[0][0]) } } }) @@ -381,7 +381,7 @@ func TestBatchProcessor(t *testing.T) { // Enqueue 10 x "batch size" amount of records. for i := 0; i < 10*batch; i++ { - require.NoError(t, b.OnEmit(ctx, Record{})) + require.NoError(t, b.OnEmit(ctx, new(Record))) } assert.Eventually(t, func() bool { return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input) @@ -423,7 +423,7 @@ func TestBatchProcessor(t *testing.T) { b := NewBatchProcessor(e) t.Cleanup(func() { _ = b.Shutdown(ctx) }) - var r Record + r := new(Record) r.SetBody(log.BoolValue(true)) _ = b.OnEmit(ctx, r) t.Cleanup(func() { _ = b.Shutdown(ctx) }) @@ -453,7 +453,7 @@ func TestBatchProcessor(t *testing.T) { WithExportInterval(time.Hour), WithExportTimeout(time.Hour), ) - var r Record + r := new(Record) // First record will be blocked by testExporter.Export assert.NoError(t, b.OnEmit(ctx, r), "exported record") require.Eventually(t, func() bool { @@ -497,7 +497,7 @@ func TestBatchProcessor(t *testing.T) { case <-ctx.Done(): return default: - assert.NoError(t, b.OnEmit(ctx, Record{})) + assert.NoError(t, b.OnEmit(ctx, new(Record))) // Ignore partial flush errors. _ = b.ForceFlush(ctx) } @@ -642,7 +642,7 @@ func TestQueue(t *testing.T) { } func BenchmarkBatchProcessorOnEmit(b *testing.B) { - var r Record + r := new(Record) body := log.BoolValue(true) r.SetBody(body) diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go index ff5d6fe2bfa..75b430b6d88 100644 --- a/sdk/log/bench_test.go +++ b/sdk/log/bench_test.go @@ -83,8 +83,7 @@ type timestampDecorator struct { Processor } -func (e timestampDecorator) OnEmit(ctx context.Context, r Record) error { - r = r.Clone() +func (e timestampDecorator) OnEmit(ctx context.Context, r *Record) error { r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC)) return e.Processor.OnEmit(ctx, r) } @@ -93,8 +92,7 @@ type attrDecorator struct { Processor } -func (e attrDecorator) OnEmit(ctx context.Context, r Record) error { - r = r.Clone() +func (e attrDecorator) OnEmit(ctx context.Context, r *Record) error { r.SetAttributes(log.String("replace", "me")) return e.Processor.OnEmit(ctx, r) } diff --git a/sdk/log/logger.go b/sdk/log/logger.go index 245867f3fd6..04c44ac5bb8 100644 --- a/sdk/log/logger.go +++ b/sdk/log/logger.go @@ -36,7 +36,7 @@ func newLogger(p *LoggerProvider, scope instrumentation.Scope) *logger { func (l *logger) Emit(ctx context.Context, r log.Record) { newRecord := l.newRecord(ctx, r) for _, p := range l.provider.processors { - if err := p.OnEmit(ctx, newRecord); err != nil { + if err := p.OnEmit(ctx, &newRecord); err != nil { otel.Handle(err) } } diff --git a/sdk/log/processor.go b/sdk/log/processor.go index f95ea949027..d2c99d90cc7 100644 --- a/sdk/log/processor.go +++ b/sdk/log/processor.go @@ -26,9 +26,12 @@ type Processor interface { // considered unrecoverable and will be reported to a configured error // Handler. // - // Before modifying a Record, the implementation must use Record.Clone - // to create a copy that shares no state with the original. - OnEmit(ctx context.Context, record Record) error + // Implementations may synchronously modify the record so that the changes + // are visible in the next registered processor. + // Implementations must not modify the record asynchronously as [Record] + // is not concurrent safe. + // Implementations must not retain the record. + OnEmit(ctx context.Context, record *Record) error // Enabled returns whether the Processor will process for the given context // and record. // @@ -46,6 +49,7 @@ type Processor interface { // // Before modifying a Record, the implementation must use Record.Clone // to create a copy that shares no state with the original. + // Implementations must not retain the record. Enabled(ctx context.Context, record Record) bool // Shutdown is called when the SDK shuts down. Any cleanup or release of // resources held by the exporter should be done in this call. diff --git a/sdk/log/provider_test.go b/sdk/log/provider_test.go index bfa8afcda1d..1ce1d832fae 100644 --- a/sdk/log/provider_test.go +++ b/sdk/log/provider_test.go @@ -36,12 +36,12 @@ func newProcessor(name string) *processor { return &processor{Name: name, enabled: true} } -func (p *processor) OnEmit(ctx context.Context, r Record) error { +func (p *processor) OnEmit(ctx context.Context, r *Record) error { if p.Err != nil { return p.Err } - p.records = append(p.records, r) + p.records = append(p.records, *r) return nil } diff --git a/sdk/log/simple.go b/sdk/log/simple.go index c7aa14b8706..a015cadfacf 100644 --- a/sdk/log/simple.go +++ b/sdk/log/simple.go @@ -31,8 +31,8 @@ func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimplePr } // OnEmit batches provided log record. -func (s *SimpleProcessor) OnEmit(ctx context.Context, r Record) error { - return s.exporter.Export(ctx, []Record{r}) +func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error { + return s.exporter.Export(ctx, []Record{*r}) } // Enabled returns true. diff --git a/sdk/log/simple_test.go b/sdk/log/simple_test.go index 805130465b0..dbc91a90156 100644 --- a/sdk/log/simple_test.go +++ b/sdk/log/simple_test.go @@ -42,12 +42,12 @@ func TestSimpleProcessorOnEmit(t *testing.T) { e := new(exporter) s := log.NewSimpleProcessor(e) - var r log.Record + r := new(log.Record) r.SetSeverityText("test") _ = s.OnEmit(context.Background(), r) require.True(t, e.exportCalled, "exporter Export not called") - assert.Equal(t, []log.Record{r}, e.records) + assert.Equal(t, []log.Record{*r}, e.records) } func TestSimpleProcessorEnabled(t *testing.T) { @@ -75,7 +75,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) { var wg sync.WaitGroup wg.Add(goRoutineN) - var r log.Record + r := new(log.Record) r.SetSeverityText("test") ctx := context.Background() s := log.NewSimpleProcessor(nil) @@ -84,7 +84,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) { defer wg.Done() _ = s.OnEmit(ctx, r) - _ = s.Enabled(ctx, r) + _ = s.Enabled(ctx, *r) _ = s.Shutdown(ctx) _ = s.ForceFlush(ctx) }() @@ -94,7 +94,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) { } func BenchmarkSimpleProcessorOnEmit(b *testing.B) { - var r log.Record + r := new(log.Record) r.SetSeverityText("test") ctx := context.Background() s := log.NewSimpleProcessor(nil)