Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PoC] Change Processor.OnEmit to accept a record pointer #5469

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `IsEmpty` method is added to the `Instrument` type in `go.opentelemetry.io/otel/sdk/metric`.
This method is used to check if an `Instrument` instance is a zero-value. (#5431)

### Changed

- `Processor` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5469)

### Fixed

- Log a warning to the OpenTelemetry internal logger when a `Record` in `go.opentelemetry.io/otel/sdk/log` drops an attribute due to a limit being reached. (#5376)
Expand Down
4 changes: 2 additions & 2 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,11 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
}

// OnEmit batches provided log record.
func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
if b.stopped.Load() || b.q == nil {
return nil
}
if n := b.q.Enqueue(r); n >= b.batchSize {
if n := b.q.Enqueue(*r); n >= b.batchSize {
select {
case b.pollTrigger <- struct{}{}:
default:
Expand Down
30 changes: 15 additions & 15 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ func TestEmptyBatchConfig(t *testing.T) {
assert.NotPanics(t, func() {
var bp BatchProcessor
ctx := context.Background()
var record Record
record := new(Record)
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
assert.False(t, bp.Enabled(ctx, record), "Enabled")
assert.False(t, bp.Enabled(ctx, *record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
})
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestBatchProcessor(t *testing.T) {
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, size) {
assert.NoError(t, b.OnEmit(ctx, r))
assert.NoError(t, b.OnEmit(ctx, &r))
}
var got []Record
assert.Eventually(t, func() bool {
Expand All @@ -221,7 +221,7 @@ func TestBatchProcessor(t *testing.T) {
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 10*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
assert.NoError(t, b.OnEmit(ctx, &r))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 1
Expand All @@ -244,7 +244,7 @@ func TestBatchProcessor(t *testing.T) {
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 2*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
assert.NoError(t, b.OnEmit(ctx, &r))
}

var n int
Expand All @@ -255,7 +255,7 @@ func TestBatchProcessor(t *testing.T) {

var err error
require.Eventually(t, func() bool {
err = b.OnEmit(ctx, Record{})
err = b.OnEmit(ctx, new(Record))
return true
}, time.Second, time.Microsecond, "OnEmit blocked")
assert.NoError(t, err)
Expand Down Expand Up @@ -303,15 +303,15 @@ func TestBatchProcessor(t *testing.T) {
assert.NoError(t, b.Shutdown(ctx))

want := e.ExportN()
assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, new(Record)))
assert.Equal(t, want, e.ExportN(), "Export called after shutdown")
})

t.Run("ForceFlush", func(t *testing.T) {
e := newTestExporter(nil)
b := NewBatchProcessor(e)

assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, new(Record)))
assert.NoError(t, b.Shutdown(ctx))

assert.NoError(t, b.ForceFlush(ctx))
Expand Down Expand Up @@ -344,7 +344,7 @@ func TestBatchProcessor(t *testing.T) {
)
t.Cleanup(func() { _ = b.Shutdown(ctx) })

var r Record
r := new(Record)
r.SetBody(log.BoolValue(true))
require.NoError(t, b.OnEmit(ctx, r))

Expand All @@ -353,7 +353,7 @@ func TestBatchProcessor(t *testing.T) {
if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") {
got := e.Records()
if assert.Len(t, got[0], 1, "records received") {
assert.Equal(t, r, got[0][0])
assert.Equal(t, *r, got[0][0])
}
}
})
Expand Down Expand Up @@ -381,7 +381,7 @@ func TestBatchProcessor(t *testing.T) {

// Enqueue 10 x "batch size" amount of records.
for i := 0; i < 10*batch; i++ {
require.NoError(t, b.OnEmit(ctx, Record{}))
require.NoError(t, b.OnEmit(ctx, new(Record)))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input)
Expand Down Expand Up @@ -423,7 +423,7 @@ func TestBatchProcessor(t *testing.T) {
b := NewBatchProcessor(e)
t.Cleanup(func() { _ = b.Shutdown(ctx) })

var r Record
r := new(Record)
r.SetBody(log.BoolValue(true))
_ = b.OnEmit(ctx, r)
t.Cleanup(func() { _ = b.Shutdown(ctx) })
Expand Down Expand Up @@ -453,7 +453,7 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
var r Record
r := new(Record)
// First record will be blocked by testExporter.Export
assert.NoError(t, b.OnEmit(ctx, r), "exported record")
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -497,7 +497,7 @@ func TestBatchProcessor(t *testing.T) {
case <-ctx.Done():
return
default:
assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, new(Record)))
// Ignore partial flush errors.
_ = b.ForceFlush(ctx)
}
Expand Down Expand Up @@ -642,7 +642,7 @@ func TestQueue(t *testing.T) {
}

func BenchmarkBatchProcessorOnEmit(b *testing.B) {
var r Record
r := new(Record)
body := log.BoolValue(true)
r.SetBody(body)

Expand Down
6 changes: 2 additions & 4 deletions sdk/log/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ type timestampDecorator struct {
Processor
}

func (e timestampDecorator) OnEmit(ctx context.Context, r Record) error {
r = r.Clone()
func (e timestampDecorator) OnEmit(ctx context.Context, r *Record) error {
r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC))
return e.Processor.OnEmit(ctx, r)
}
Expand All @@ -93,8 +92,7 @@ type attrDecorator struct {
Processor
}

func (e attrDecorator) OnEmit(ctx context.Context, r Record) error {
r = r.Clone()
func (e attrDecorator) OnEmit(ctx context.Context, r *Record) error {
r.SetAttributes(log.String("replace", "me"))
return e.Processor.OnEmit(ctx, r)
}
3 changes: 0 additions & 3 deletions sdk/log/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ type Exporter interface {
// Handler.
//
// Implementations must not retain the records slice.
//
// Before modifying a Record, the implementation must use Record.Clone
// to create a copy that shares no state with the original.
Export(ctx context.Context, records []Record) error
// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the exporter should be done in this call.
Expand Down
2 changes: 1 addition & 1 deletion sdk/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newLogger(p *LoggerProvider, scope instrumentation.Scope) *logger {
func (l *logger) Emit(ctx context.Context, r log.Record) {
newRecord := l.newRecord(ctx, r)
for _, p := range l.provider.processors {
if err := p.OnEmit(ctx, newRecord); err != nil {
if err := p.OnEmit(ctx, &newRecord); err != nil {
otel.Handle(err)
}
}
Expand Down
8 changes: 2 additions & 6 deletions sdk/log/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ type Processor interface {
// considered unrecoverable and will be reported to a configured error
// Handler.
//
// Before modifying a Record, the implementation must use Record.Clone
// to create a copy that shares no state with the original.
OnEmit(ctx context.Context, record Record) error
// Implementations must not modify the record after OnEmit returns.
OnEmit(ctx context.Context, record *Record) error
// Enabled returns whether the Processor will process for the given context
// and record.
//
Expand All @@ -43,9 +42,6 @@ type Processor interface {
// state. An implementation should default to returning true for an
// indeterminate state, but may return false if valid reasons in particular
// circumstances exist (e.g. performance, correctness).
//
// Before modifying a Record, the implementation must use Record.Clone
// to create a copy that shares no state with the original.
Enabled(ctx context.Context, record Record) bool
// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the exporter should be done in this call.
Expand Down
4 changes: 2 additions & 2 deletions sdk/log/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ func newProcessor(name string) *processor {
return &processor{Name: name, enabled: true}
}

func (p *processor) OnEmit(ctx context.Context, r Record) error {
func (p *processor) OnEmit(ctx context.Context, r *Record) error {
if p.Err != nil {
return p.Err
}

p.records = append(p.records, r)
p.records = append(p.records, *r)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions sdk/log/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimplePr
}

// OnEmit batches provided log record.
func (s *SimpleProcessor) OnEmit(ctx context.Context, r Record) error {
return s.exporter.Export(ctx, []Record{r})
func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
return s.exporter.Export(ctx, []Record{*r})
}

// Enabled returns true.
Expand Down
10 changes: 5 additions & 5 deletions sdk/log/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func TestSimpleProcessorOnEmit(t *testing.T) {
e := new(exporter)
s := log.NewSimpleProcessor(e)

var r log.Record
r := new(log.Record)
r.SetSeverityText("test")
_ = s.OnEmit(context.Background(), r)

require.True(t, e.exportCalled, "exporter Export not called")
assert.Equal(t, []log.Record{r}, e.records)
assert.Equal(t, []log.Record{*r}, e.records)
}

func TestSimpleProcessorEnabled(t *testing.T) {
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
wg.Add(goRoutineN)

var r log.Record
r := new(log.Record)
r.SetSeverityText("test")
ctx := context.Background()
s := log.NewSimpleProcessor(nil)
Expand All @@ -84,7 +84,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) {
defer wg.Done()

_ = s.OnEmit(ctx, r)
_ = s.Enabled(ctx, r)
_ = s.Enabled(ctx, *r)
_ = s.Shutdown(ctx)
_ = s.ForceFlush(ctx)
}()
Expand All @@ -94,7 +94,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) {
}

func BenchmarkSimpleProcessorOnEmit(b *testing.B) {
var r log.Record
r := new(log.Record)
r.SetSeverityText("test")
ctx := context.Background()
s := log.NewSimpleProcessor(nil)
Expand Down