From 440d0b113bf95ef3afb5b1bf291b222c7638f601 Mon Sep 17 00:00:00 2001 From: dmathieu Date: Thu, 29 Aug 2024 10:29:46 +0200 Subject: [PATCH 1/9] implement span processor's OnEnding --- sdk/trace/span.go | 17 +++++++++++++---- sdk/trace/span_processor.go | 11 +++++++++++ sdk/trace/span_processor_test.go | 21 +++++++++++++++++++++ 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/sdk/trace/span.go b/sdk/trace/span.go index 4945f508303..c051e8f99b5 100644 --- a/sdk/trace/span.go +++ b/sdk/trace/span.go @@ -120,6 +120,9 @@ type recordingSpan struct { // value of time.Time until the span is ended. endTime time.Time + // hasEnded records whether the span is fully ended. + hasEnded bool + // status is the status of this span. status Status @@ -171,10 +174,8 @@ func (s *recordingSpan) IsRecording() bool { if s == nil { return false } - s.mu.Lock() - defer s.mu.Unlock() - return s.endTime.IsZero() + return !s.hasEnded } // SetStatus sets the status of the Span in the form of a code and a @@ -417,7 +418,6 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) { } s.mu.Lock() - // Setting endTime to non-zero marks the span as ended and not recording. if config.Timestamp().IsZero() { s.endTime = et } else { @@ -426,6 +426,15 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) { s.mu.Unlock() sps := s.tracer.provider.getSpanProcessors() + for _, sp := range sps { + if oesp, ok := sp.sp.(OnEndingSpanProcessor); ok { + oesp.OnEnding(s) + } + } + s.mu.Lock() + s.hasEnded = true + s.mu.Unlock() + if len(sps) == 0 { return } diff --git a/sdk/trace/span_processor.go b/sdk/trace/span_processor.go index af7f9177fc5..a71aa9de08b 100644 --- a/sdk/trace/span_processor.go +++ b/sdk/trace/span_processor.go @@ -49,6 +49,17 @@ type SpanProcessor interface { // must never be done outside of a new major release. } +// OnEndingSpanProcessor represents span processors that allow mutating spans +// just before they are ended and made immutable. +// +// NOT STABLE: This interface still has a status of "development", and may have +// breaking changes. +type OnEndingSpanProcessor interface { + // OnEnding is called while the span is finished, and while spans are still + // mutable. It is called synchronously and cannot block. + OnEnding(ReadWriteSpan) +} + type spanProcessorState struct { sp SpanProcessor state sync.Once diff --git a/sdk/trace/span_processor_test.go b/sdk/trace/span_processor_test.go index 9c8df1d54f9..e90385aa19c 100644 --- a/sdk/trace/span_processor_test.go +++ b/sdk/trace/span_processor_test.go @@ -46,6 +46,13 @@ func (t *testSpanProcessor) OnStart(parent context.Context, s sdktrace.ReadWrite t.spansStarted = append(t.spansStarted, s) } +func (t *testSpanProcessor) OnEnding(s sdktrace.ReadWriteSpan) { + if t == nil { + return + } + s.SetAttributes(attribute.Bool("OnEnding", true)) +} + func (t *testSpanProcessor) OnEnd(s sdktrace.ReadOnlySpan) { if t == nil { return @@ -130,6 +137,17 @@ func TestRegisterSpanProcessor(t *testing.T) { } } } + + onEndingOK := false + for _, kv := range sp.spansEnded[0].Attributes() { + switch kv.Key { + case "OnEnding": + onEndingOK = true + default: + continue + } + } + if c != len(spNames) { t.Errorf("%s: expected attributes(SpanProcessorName): got %d, want %d\n", name, c, len(spNames)) } @@ -139,6 +157,9 @@ func TestRegisterSpanProcessor(t *testing.T) { if !sidOK { t.Errorf("%s: expected attributes(ParentSpanID)\n", name) } + if !onEndingOK { + t.Errorf("%s: expected attributes(OnEnding)\n", name) + } } } From 999175ed0b9f4d29fd16fba808cf6133477d74df Mon Sep 17 00:00:00 2001 From: dmathieu Date: Thu, 29 Aug 2024 10:48:10 +0200 Subject: [PATCH 2/9] add changelog entry --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4969347e79e..6d663fc5e07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added + +- Support for the `OnEnding` callback in span processors. (#5756) + ### Fixed - Fix memory leak in the global `MeterProvider` when identical instruments are repeatedly created. (#5754) From fb6bba590d76d471ccd76406a66c685f78c7ab49 Mon Sep 17 00:00:00 2001 From: dmathieu Date: Fri, 30 Aug 2024 10:04:47 +0200 Subject: [PATCH 3/9] move the OnEnding processor into internal/x --- sdk/internal/x/onending_processor.go | 14 ++++++++++++++ sdk/trace/span.go | 3 ++- sdk/trace/span_processor.go | 11 ----------- sdk/trace/span_processor_test.go | 2 +- 4 files changed, 17 insertions(+), 13 deletions(-) create mode 100644 sdk/internal/x/onending_processor.go diff --git a/sdk/internal/x/onending_processor.go b/sdk/internal/x/onending_processor.go new file mode 100644 index 00000000000..8ee598a34f4 --- /dev/null +++ b/sdk/internal/x/onending_processor.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package x // import "go.opentelemetry.io/otel/sdk/internal/x" + +import "go.opentelemetry.io/otel/trace" + +// OnEndingSpanProcessor represents span processors that allow mutating spans +// just before they are ended and made immutable. +type OnEndingSpanProcessor interface { + // OnEnding is called while the span is finished, and while spans are still + // mutable. It is called synchronously and cannot block. + OnEnding(trace.Span) +} diff --git a/sdk/trace/span.go b/sdk/trace/span.go index c051e8f99b5..7d01436ed04 100644 --- a/sdk/trace/span.go +++ b/sdk/trace/span.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/internal/x" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" @@ -427,7 +428,7 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) { sps := s.tracer.provider.getSpanProcessors() for _, sp := range sps { - if oesp, ok := sp.sp.(OnEndingSpanProcessor); ok { + if oesp, ok := sp.sp.(x.OnEndingSpanProcessor); ok { oesp.OnEnding(s) } } diff --git a/sdk/trace/span_processor.go b/sdk/trace/span_processor.go index a71aa9de08b..af7f9177fc5 100644 --- a/sdk/trace/span_processor.go +++ b/sdk/trace/span_processor.go @@ -49,17 +49,6 @@ type SpanProcessor interface { // must never be done outside of a new major release. } -// OnEndingSpanProcessor represents span processors that allow mutating spans -// just before they are ended and made immutable. -// -// NOT STABLE: This interface still has a status of "development", and may have -// breaking changes. -type OnEndingSpanProcessor interface { - // OnEnding is called while the span is finished, and while spans are still - // mutable. It is called synchronously and cannot block. - OnEnding(ReadWriteSpan) -} - type spanProcessorState struct { sp SpanProcessor state sync.Once diff --git a/sdk/trace/span_processor_test.go b/sdk/trace/span_processor_test.go index e90385aa19c..49f5b481f15 100644 --- a/sdk/trace/span_processor_test.go +++ b/sdk/trace/span_processor_test.go @@ -46,7 +46,7 @@ func (t *testSpanProcessor) OnStart(parent context.Context, s sdktrace.ReadWrite t.spansStarted = append(t.spansStarted, s) } -func (t *testSpanProcessor) OnEnding(s sdktrace.ReadWriteSpan) { +func (t *testSpanProcessor) OnEnding(s trace.Span) { if t == nil { return } From 5b53cc526fea8c0847e76905e7c6eb7c98e688c3 Mon Sep 17 00:00:00 2001 From: dmathieu Date: Fri, 6 Sep 2024 09:58:35 +0200 Subject: [PATCH 4/9] add a bit more doc --- sdk/internal/x/onending_processor.go | 14 ------------ sdk/trace/doc.go | 3 +++ sdk/trace/internal/x/onending_processor.go | 25 ++++++++++++++++++++++ sdk/trace/span.go | 2 +- 4 files changed, 29 insertions(+), 15 deletions(-) delete mode 100644 sdk/internal/x/onending_processor.go create mode 100644 sdk/trace/internal/x/onending_processor.go diff --git a/sdk/internal/x/onending_processor.go b/sdk/internal/x/onending_processor.go deleted file mode 100644 index 8ee598a34f4..00000000000 --- a/sdk/internal/x/onending_processor.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package x // import "go.opentelemetry.io/otel/sdk/internal/x" - -import "go.opentelemetry.io/otel/trace" - -// OnEndingSpanProcessor represents span processors that allow mutating spans -// just before they are ended and made immutable. -type OnEndingSpanProcessor interface { - // OnEnding is called while the span is finished, and while spans are still - // mutable. It is called synchronously and cannot block. - OnEnding(trace.Span) -} diff --git a/sdk/trace/doc.go b/sdk/trace/doc.go index 1f60524e3ee..7a9d2f2545e 100644 --- a/sdk/trace/doc.go +++ b/sdk/trace/doc.go @@ -6,5 +6,8 @@ Package trace contains support for OpenTelemetry distributed tracing. The following assumes a basic familiarity with OpenTelemetry concepts. See https://opentelemetry.io. + +See [go.opentelemetry.io/otel/sdk/internal/x] for information about the +experimental features. */ package trace // import "go.opentelemetry.io/otel/sdk/trace" diff --git a/sdk/trace/internal/x/onending_processor.go b/sdk/trace/internal/x/onending_processor.go new file mode 100644 index 00000000000..e046d22751b --- /dev/null +++ b/sdk/trace/internal/x/onending_processor.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package x // import "go.opentelemetry.io/otel/sdk/trace/internal/x" + +import "go.opentelemetry.io/otel/trace" + +// OnEndingSpanProcessor represents span processors that allow mutating spans +// just before they are ended and made immutable. +// +// This is useful for custom processor implementations that want to mutate +// spans when they are finished, and before they are made immutable, such as +// implementing tail-based sampling. +type OnEndingSpanProcessor interface { + // OnEnding is called while the span is finished, and spans are still + // mutable. + // + // This method is called synchronously during the span's `End()` operation, + // therefore it should not block or throw an exception. + // If multiple [SpanProcessor] are registered, their `OnEnding` callbacks are + // invoked in the order they have been registered. + // + // [SpanProcessor]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/trace#SpanProcessor + OnEnding(trace.Span) +} diff --git a/sdk/trace/span.go b/sdk/trace/span.go index 7d01436ed04..6ba70437785 100644 --- a/sdk/trace/span.go +++ b/sdk/trace/span.go @@ -19,8 +19,8 @@ import ( "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/internal/x" "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace/internal/x" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/embedded" From 793d5fd45370fb3144a074fb2787d8545f6d3297 Mon Sep 17 00:00:00 2001 From: Damien Mathieu <42@dmathieu.com> Date: Fri, 6 Sep 2024 12:06:27 +0200 Subject: [PATCH 5/9] Update sdk/trace/internal/x/onending_processor.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Robert Pająk --- sdk/trace/internal/x/onending_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/trace/internal/x/onending_processor.go b/sdk/trace/internal/x/onending_processor.go index e046d22751b..09b90c07aa9 100644 --- a/sdk/trace/internal/x/onending_processor.go +++ b/sdk/trace/internal/x/onending_processor.go @@ -15,7 +15,7 @@ type OnEndingSpanProcessor interface { // OnEnding is called while the span is finished, and spans are still // mutable. // - // This method is called synchronously during the span's `End()` operation, + // This method is called synchronously during the span's End operation, // therefore it should not block or throw an exception. // If multiple [SpanProcessor] are registered, their `OnEnding` callbacks are // invoked in the order they have been registered. From 3613902016ab778110348523edf2ddf86b61232a Mon Sep 17 00:00:00 2001 From: Damien Mathieu <42@dmathieu.com> Date: Fri, 6 Sep 2024 12:06:34 +0200 Subject: [PATCH 6/9] Update sdk/trace/internal/x/onending_processor.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Robert Pająk --- sdk/trace/internal/x/onending_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/trace/internal/x/onending_processor.go b/sdk/trace/internal/x/onending_processor.go index 09b90c07aa9..fbb3543f411 100644 --- a/sdk/trace/internal/x/onending_processor.go +++ b/sdk/trace/internal/x/onending_processor.go @@ -17,7 +17,7 @@ type OnEndingSpanProcessor interface { // // This method is called synchronously during the span's End operation, // therefore it should not block or throw an exception. - // If multiple [SpanProcessor] are registered, their `OnEnding` callbacks are + // If multiple [SpanProcessor] are registered, their OnEnding callbacks are // invoked in the order they have been registered. // // [SpanProcessor]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/trace#SpanProcessor From 6c043185e5632e7686a7dbe1d91260e9ce471729 Mon Sep 17 00:00:00 2001 From: dmathieu Date: Fri, 6 Sep 2024 12:10:01 +0200 Subject: [PATCH 7/9] add readme --- sdk/trace/internal/x/README.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 sdk/trace/internal/x/README.md diff --git a/sdk/trace/internal/x/README.md b/sdk/trace/internal/x/README.md new file mode 100644 index 00000000000..c6f457110b5 --- /dev/null +++ b/sdk/trace/internal/x/README.md @@ -0,0 +1,33 @@ +# Experimental Features + +The Trace SDK contains features that have not yet stabilized. +These features are added to the OpenTelemetry Go Trace SDK prior to +stabilization so that users can start experimenting with them and provide +feedback. + +These feature may change in backwards incompatible ways as feedback is applied. +See the [Compatibility and Stability](#compatibility-and-stability) section for +more information. + +## Features + +- [OnEnding Processor](#onending-processor) + +### OnEnding Processor + +Processor implementations sometimes want to be able to modify a span after it +ended, but before it becomes immutable. +A processor that implements the `OnEnding` method can use that callback to +perform such modifications. + +It can be used to implement tail-based sampling for example. + +## Compatibility and Stability + +Experimental features do not fall within the scope of the OpenTelemetry Go +versioning and stability [policy](../../../../VERSIONING.md). +These features may be removed or modified in successive version releases, +including patch versions. + +When an experimental feature is promoted to a stable feature, a migration path +will be included in the changelog entry of the release. From 80c873af509e66e029c094c1efb1f359ceb29d59 Mon Sep 17 00:00:00 2001 From: dmathieu Date: Wed, 2 Oct 2024 19:56:53 +0200 Subject: [PATCH 8/9] bring back mutex lock in IsRecording --- sdk/trace/span.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/trace/span.go b/sdk/trace/span.go index 6ba70437785..920e328f09a 100644 --- a/sdk/trace/span.go +++ b/sdk/trace/span.go @@ -175,6 +175,8 @@ func (s *recordingSpan) IsRecording() bool { if s == nil { return false } + s.mu.Lock() + defer s.mu.Unlock() return !s.hasEnded } From f4ae0f44dc9b14ce0386df1b8b4c0a32573a9d98 Mon Sep 17 00:00:00 2001 From: dmathieu Date: Thu, 3 Oct 2024 10:26:44 +0200 Subject: [PATCH 9/9] avoid locking twice if there are no onending span processors --- sdk/trace/span.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/sdk/trace/span.go b/sdk/trace/span.go index 920e328f09a..337863befe6 100644 --- a/sdk/trace/span.go +++ b/sdk/trace/span.go @@ -420,23 +420,31 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) { s.executionTracerTaskEnd() } + sps := s.tracer.provider.getSpanProcessors() + var oesps []x.OnEndingSpanProcessor + for _, sp := range sps { + if oesp, ok := sp.sp.(x.OnEndingSpanProcessor); ok { + oesps = append(oesps, oesp) + } + } + s.mu.Lock() if config.Timestamp().IsZero() { s.endTime = et } else { s.endTime = config.Timestamp() } + s.hasEnded = len(oesps) == 0 s.mu.Unlock() - sps := s.tracer.provider.getSpanProcessors() - for _, sp := range sps { - if oesp, ok := sp.sp.(x.OnEndingSpanProcessor); ok { - oesp.OnEnding(s) + if len(oesps) > 0 { + for _, sp := range oesps { + sp.OnEnding(s) } + s.mu.Lock() + s.hasEnded = true + s.mu.Unlock() } - s.mu.Lock() - s.hasEnded = true - s.mu.Unlock() if len(sps) == 0 { return