Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add selector of exemplar reservoir providers to metric.Stream configuration #5861

Merged
merged 8 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add `go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter`, which can be used to disable exemplar recording. (#5850)
- Add `go.opentelemetry.io/otel/sdk/metric.WithExemplarFilter`, which can be used to configure the exemplar filter used by the metrics SDK. (#5850)
- Add `ExemplarReservoirProviderSelector` and `DefaultExemplarReservoirProviderSelector` to `go.opentelemetry.io/otel/sdk/metric`, which defines the exemplar reservoir to use based on the aggregation of the metric. (#5861)
- Add `ExemplarReservoirProviderSelector` to `go.opentelemetry.io/otel/sdk/metric.Stream` to allow using views to configure the exemplar reservoir to use for a metric. (#5861)
- Add `ReservoirProvider`, `HistogramReservoirProvider` and `FixedSizeReservoirProvider` to `go.opentelemetry.io/otel/sdk/metric/exemplar` to make it convenient to use providers of Reservoirs. (#5861)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
25 changes: 25 additions & 0 deletions sdk/metric/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,31 @@ func ExampleNewView_exponentialHistogram() {
)
}

func ExampleNewView_exemplarreservoirproviderselector() {
// Create a view that makes all metrics use a different exemplar reservoir.
view := metric.NewView(
metric.Instrument{Name: "*"},
metric.Stream{
ExemplarReservoirProviderSelector: func(agg metric.Aggregation) exemplar.ReservoirProvider {
// This example uses a fixed-size reservoir with a size of 10
// for explicit bucket histograms instead of the default
// bucket-aligned reservoir.
if _, ok := agg.(metric.AggregationExplicitBucketHistogram); ok {
dashpole marked this conversation as resolved.
Show resolved Hide resolved
return exemplar.FixedSizeReservoirProvider(10)
}
// Fall back to the default reservoir otherwise.
return metric.DefaultExemplarReservoirProviderSelector(agg)
},
},
)

// The created view can then be registered with the OpenTelemetry metric
// SDK using the WithView option.
_ = metric.NewMeterProvider(
metric.WithView(view),
)
}

func ExampleWithExemplarFilter_disabled() {
// Use exemplar.AlwaysOffFilter to disable exemplar collection.
_ = metric.NewMeterProvider(
Expand Down
41 changes: 31 additions & 10 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,48 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"runtime"
"slices"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

// ExemplarReservoirProviderSelector selects the
// [exemplar.ReservoirProvider] to use
// based on the [Aggregation] of the metric.
type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and filter configuration.
func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func() aggregate.FilteredExemplarReservoir[N] {
func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs))
}
}

// DefaultExemplarReservoirProviderSelector returns the default
// [exemplar.ReservoirProvider] for the
// provided [Aggregation].
//
// For explicit bucket histograms with more than 1 bucket, it uses the
// [exemplar.HistogramReservoirProvider].
// For exponential histograms, it uses the
// [exemplar.FixedSizeReservoirProvider]
// with a size of min(20, max_buckets).
// For all other aggregations, it uses the
// [exemplar.FixedSizeReservoirProvider]
// with a size equal to the number of CPUs.
//
// Exemplar default reservoirs MAY change in a minor version bump. No
// guarantees are made on the shape or statistical properties of returned
// exemplars.
func DefaultExemplarReservoirProviderSelector(agg Aggregation) exemplar.ReservoirProvider {
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() aggregate.FilteredExemplarReservoir[N] {
bounds := cp
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewHistogramReservoir(bounds))
}
return exemplar.HistogramReservoirProvider(a.Boundaries)
}

var n int
Expand All @@ -50,7 +73,5 @@ func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) f
}
}

return func() aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewFixedSizeReservoir(n))
}
return exemplar.FixedSizeReservoirProvider(n)
}
7 changes: 7 additions & 0 deletions sdk/metric/exemplar/fixed_size_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir].
func FixedSizeReservoirProvider(k int) ReservoirProvider {
return func(_ attribute.Set) Reservoir {
return NewFixedSizeReservoir(k)
}
}

// NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most
// k exemplars. If there are k or less measurements made, the Reservoir will
// sample each one. If there are more than k, the Reservoir will then randomly
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/exemplar/fixed_size_reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
)

func TestNewFixedSizeReservoir(t *testing.T) {
t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir, int) {
return NewFixedSizeReservoir(n), n
t.Run("Int64", ReservoirTest[int64](func(n int) (ReservoirProvider, int) {
return FixedSizeReservoirProvider(n), n
}))

t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir, int) {
return NewFixedSizeReservoir(n), n
t.Run("Float64", ReservoirTest[float64](func(n int) (ReservoirProvider, int) {
return FixedSizeReservoirProvider(n), n
}))
}

Expand Down
12 changes: 10 additions & 2 deletions sdk/metric/exemplar/histogram_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// HistogramReservoirProvider is a provider of [HistogramReservoir].
func HistogramReservoirProvider(bounds []float64) ReservoirProvider {
cp := slices.Clone(bounds)
slices.Sort(cp)
return func(_ attribute.Set) Reservoir {
return NewHistogramReservoir(cp)
}
}

// NewHistogramReservoir returns a [HistogramReservoir] that samples the last
// measurement that falls within a histogram bucket. The histogram bucket
// upper-boundaries are define by bounds.
//
// The passed bounds will be sorted by this function.
// The passed bounds must be sorted before calling this function.
func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
slices.Sort(bounds)
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
return &HistogramReservoir{
bounds: bounds,
storage: newStorage(len(bounds) + 1),
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/exemplar/histogram_reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import "testing"

func TestHist(t *testing.T) {
bounds := []float64{0, 100}
t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir, int) {
return NewHistogramReservoir(bounds), len(bounds)
t.Run("Int64", ReservoirTest[int64](func(int) (ReservoirProvider, int) {
return HistogramReservoirProvider(bounds), len(bounds)
}))

t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir, int) {
return NewHistogramReservoir(bounds), len(bounds)
t.Run("Float64", ReservoirTest[float64](func(int) (ReservoirProvider, int) {
return HistogramReservoirProvider(bounds), len(bounds)
}))
}
8 changes: 8 additions & 0 deletions sdk/metric/exemplar/reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,11 @@ type Reservoir interface {
// The Reservoir state is preserved after this call.
Collect(dest *[]Exemplar)
}

// ReservoirProvider creates new [Reservoir]s.
//
// The attributes provided are attributes which are kept by the aggregation, and
// are exclusive with attributes passed to Offer. The combination of these
// attributes and the attributes passed to Offer is the complete set of
// attributes a measurement was made with.
type ReservoirProvider func(attr attribute.Set) Reservoir
17 changes: 11 additions & 6 deletions sdk/metric/exemplar/reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
// Sat Jan 01 2000 00:00:00 GMT+0000.
var staticTime = time.Unix(946684800, 0)

type factory func(requestedCap int) (r Reservoir, actualCap int)
type factory func(requestedCap int) (r ReservoirProvider, actualCap int)

func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
return func(t *testing.T) {
Expand All @@ -29,10 +29,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("CaptureSpanContext", func(t *testing.T) {
t.Helper()

r, n := f(1)
rp, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}
r := rp(*attribute.EmptySet())

tID, sID := trace.TraceID{0x01}, trace.SpanID{0x01}
sc := trace.NewSpanContext(trace.SpanContextConfig{
Expand Down Expand Up @@ -60,10 +61,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("FilterAttributes", func(t *testing.T) {
t.Helper()

r, n := f(1)
rp, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}
r := rp(*attribute.EmptySet())

adminTrue := attribute.Bool("admin", true)
r.Offer(ctx, staticTime, NewValue(N(10)), []attribute.KeyValue{adminTrue})
Expand All @@ -83,10 +85,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("CollectLessThanN", func(t *testing.T) {
t.Helper()

r, n := f(2)
rp, n := f(2)
if n < 2 {
t.Skip("skipping, reservoir capacity less than 2:", n)
}
r := rp(*attribute.EmptySet())

r.Offer(ctx, staticTime, NewValue(N(10)), nil)

Expand All @@ -99,10 +102,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("MultipleOffers", func(t *testing.T) {
t.Helper()

r, n := f(3)
rp, n := f(3)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}
r := rp(*attribute.EmptySet())

for i := 0; i < n+1; i++ {
v := NewValue(N(i))
Expand All @@ -127,10 +131,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("DropAll", func(t *testing.T) {
t.Helper()

r, n := f(0)
rp, n := f(0)
if n > 0 {
t.Skip("skipping, reservoir capacity greater than 0:", n)
}
r := rp(*attribute.EmptySet())

r.Offer(context.Background(), staticTime, NewValue(N(10)), nil)

Expand Down
6 changes: 6 additions & 0 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ type Stream struct {
// Use NewAllowKeysFilter from "go.opentelemetry.io/otel/attribute" to
// provide an allow-list of attribute keys here.
AttributeFilter attribute.Filter
// ExemplarReservoirProvider selects the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] based
// on the [Aggregation].
//
// If unspecified, [DefaultExemplarReservoirProviderSelector] is used.
ExemplarReservoirProviderSelector ExemplarReservoirProviderSelector
}

// instID are the identifying properties of a instrument.
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Builder[N int64 | float64] struct {
//
// If this is not provided a default factory function that returns an
// dropReservoir reservoir will be used.
ReservoirFunc func() FilteredExemplarReservoir[N]
ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
Expand All @@ -49,7 +49,7 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) resFunc() func() FilteredExemplarReservoir[N] {
func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (c *clock) Register() (unregister func()) {
return func() { now = orig }
}

func dropExemplars[N int64 | float64]() FilteredExemplarReservoir[N] {
return dropReservoir[N]()
func dropExemplars[N int64 | float64](attr attribute.Set) FilteredExemplarReservoir[N] {
return dropReservoir[N](attr)
}

func TestBuilderFilter(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion sdk/metric/internal/aggregate/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
)

// dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
func dropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} }
func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] {
return &dropRes[N]{}
}

type dropRes[N int64 | float64] struct{}

Expand Down
3 changes: 2 additions & 1 deletion sdk/metric/internal/aggregate/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

Expand All @@ -17,7 +18,7 @@ func TestDrop(t *testing.T) {
}

func testDropFiltered[N int64 | float64](t *testing.T) {
r := dropReservoir[N]()
r := dropReservoir[N](*attribute.EmptySet())

var dest []exemplar.Exemplar
r.Collect(&dest)
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
Expand All @@ -306,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int32

newRes func() FilteredExemplarReservoir[N]
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
Expand All @@ -327,7 +327,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
v, ok := e.values[attr.Equivalent()]
if !ok {
v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
v.res = e.newRes()
v.res = e.newRes(attr)

e.values[attr.Equivalent()] = v
}
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

newRes func() FilteredExemplarReservoir[N]
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
Expand Down Expand Up @@ -93,7 +93,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
b = newBuckets[N](attr, len(s.bounds)+1)
b.res = s.newRes()
b.res = s.newRes(attr)

// Ensure min and max are recorded values (not zero), for new buckets.
b.min, b.max = value, value
Expand All @@ -108,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute

// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,
Expand Down
Loading
Loading