Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement span processor's OnEnding #5756

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Added

- Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes `Exemplar`, `Filter`, `SampledFilter`, `AlwaysOnFilter`, `HistogramReservoir`, `FixedSizeReservoir`, `Reservoir`, `Value` and `ValueType` types. These will be used for configuring the exemplar reservoir for the metrics sdk. (#5747)
- Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes `Exemplar`, `Filter`, `TraceBasedFilter`, `AlwaysOnFilter`, `HistogramReservoir`, `FixedSizeReservoir`, `Reservoir`, `Value` and `ValueType` types. These will be used for configuring the exemplar reservoir for the metrics sdk. (#5747, #5862)
- Support for the `OnEnding` callback in span processors. (#5756)

### Changed

Expand Down
3 changes: 3 additions & 0 deletions sdk/trace/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ Package trace contains support for OpenTelemetry distributed tracing.

The following assumes a basic familiarity with OpenTelemetry concepts.
See https://opentelemetry.io.

See [go.opentelemetry.io/otel/sdk/internal/x] for information about the
experimental features.
*/
package trace // import "go.opentelemetry.io/otel/sdk/trace"
33 changes: 33 additions & 0 deletions sdk/trace/internal/x/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Experimental Features

The Trace SDK contains features that have not yet stabilized.
These features are added to the OpenTelemetry Go Trace SDK prior to
stabilization so that users can start experimenting with them and provide
feedback.

These feature may change in backwards incompatible ways as feedback is applied.
See the [Compatibility and Stability](#compatibility-and-stability) section for
more information.

## Features

- [OnEnding Processor](#onending-processor)

### OnEnding Processor

Processor implementations sometimes want to be able to modify a span after it
ended, but before it becomes immutable.
A processor that implements the `OnEnding` method can use that callback to
perform such modifications.

It can be used to implement tail-based sampling for example.

## Compatibility and Stability

Experimental features do not fall within the scope of the OpenTelemetry Go
versioning and stability [policy](../../../../VERSIONING.md).
These features may be removed or modified in successive version releases,
including patch versions.

When an experimental feature is promoted to a stable feature, a migration path
will be included in the changelog entry of the release.
25 changes: 25 additions & 0 deletions sdk/trace/internal/x/onending_processor.go
dmathieu marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package x // import "go.opentelemetry.io/otel/sdk/trace/internal/x"

import "go.opentelemetry.io/otel/trace"

// OnEndingSpanProcessor represents span processors that allow mutating spans
// just before they are ended and made immutable.
//
// This is useful for custom processor implementations that want to mutate
// spans when they are finished, and before they are made immutable, such as
// implementing tail-based sampling.
type OnEndingSpanProcessor interface {
// OnEnding is called while the span is finished, and spans are still
// mutable.
//
// This method is called synchronously during the span's End operation,
// therefore it should not block or throw an exception.
// If multiple [SpanProcessor] are registered, their OnEnding callbacks are
// invoked in the order they have been registered.
//
// [SpanProcessor]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/trace#SpanProcessor
OnEnding(trace.Span)
}
26 changes: 23 additions & 3 deletions sdk/trace/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace/internal/x"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/embedded"
Expand Down Expand Up @@ -120,6 +121,9 @@ type recordingSpan struct {
// value of time.Time until the span is ended.
endTime time.Time

// hasEnded records whether the span is fully ended.
hasEnded bool

// status is the status of this span.
status Status

Expand Down Expand Up @@ -174,7 +178,7 @@ func (s *recordingSpan) IsRecording() bool {
s.mu.Lock()
defer s.mu.Unlock()

return s.endTime.IsZero()
return !s.hasEnded
dmathieu marked this conversation as resolved.
Show resolved Hide resolved
}

// SetStatus sets the status of the Span in the form of a code and a
Expand Down Expand Up @@ -416,16 +420,32 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) {
s.executionTracerTaskEnd()
}

sps := s.tracer.provider.getSpanProcessors()
var oesps []x.OnEndingSpanProcessor
for _, sp := range sps {
if oesp, ok := sp.sp.(x.OnEndingSpanProcessor); ok {
oesps = append(oesps, oesp)
}
}

s.mu.Lock()
// Setting endTime to non-zero marks the span as ended and not recording.
if config.Timestamp().IsZero() {
s.endTime = et
} else {
s.endTime = config.Timestamp()
}
s.hasEnded = len(oesps) == 0
s.mu.Unlock()

sps := s.tracer.provider.getSpanProcessors()
if len(oesps) > 0 {
for _, sp := range oesps {
sp.OnEnding(s)
}
s.mu.Lock()
s.hasEnded = true
s.mu.Unlock()
}

if len(sps) == 0 {
return
}
Expand Down
21 changes: 21 additions & 0 deletions sdk/trace/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ func (t *testSpanProcessor) OnStart(parent context.Context, s sdktrace.ReadWrite
t.spansStarted = append(t.spansStarted, s)
}

func (t *testSpanProcessor) OnEnding(s trace.Span) {
if t == nil {
return
}
s.SetAttributes(attribute.Bool("OnEnding", true))
}

func (t *testSpanProcessor) OnEnd(s sdktrace.ReadOnlySpan) {
if t == nil {
return
Expand Down Expand Up @@ -130,6 +137,17 @@ func TestRegisterSpanProcessor(t *testing.T) {
}
}
}

onEndingOK := false
for _, kv := range sp.spansEnded[0].Attributes() {
switch kv.Key {
case "OnEnding":
onEndingOK = true
default:
continue
}
}

if c != len(spNames) {
t.Errorf("%s: expected attributes(SpanProcessorName): got %d, want %d\n", name, c, len(spNames))
}
Expand All @@ -139,6 +157,9 @@ func TestRegisterSpanProcessor(t *testing.T) {
if !sidOK {
t.Errorf("%s: expected attributes(ParentSpanID)\n", name)
}
if !onEndingOK {
t.Errorf("%s: expected attributes(OnEnding)\n", name)
}
}
}

Expand Down
Loading