diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go
index 82a8571e88d..ac9dda5cbad 100644
--- a/sdk/metric/internal/aggregate/aggregate.go
+++ b/sdk/metric/internal/aggregate/aggregate.go
@@ -42,6 +42,17 @@ type Builder[N int64 | float64] struct {
 	Filter attribute.Filter
 }
 
+func (b Builder[N]) filter(f Measure[N]) Measure[N] {
+	if b.Filter != nil {
+		fltr := b.Filter // Copy to make it immutable after assignment.
+		return func(ctx context.Context, n N, a attribute.Set) {
+			fAttr, _ := a.Filter(fltr)
+			f(ctx, n, fAttr)
+		}
+	}
+	return f
+}
+
 func (b Builder[N]) input(agg aggregator[N]) Measure[N] {
 	if b.Filter != nil {
 		fltr := b.Filter // Copy to make it immutable after assignment.
@@ -63,11 +74,13 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
 	// a last-value aggregate.
 	lv := newLastValue[N]()
 
-	return b.input(lv), func(dest *metricdata.Aggregation) int {
-		// TODO (#4220): optimize memory reuse here.
-		*dest = lv.Aggregation()
-
+	return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
+		// Ignore if dest is not a metricdata.Gauge. The chance for memory
+		// reuse of the DataPoints is missed (better luck next time).
 		gData, _ := (*dest).(metricdata.Gauge[N])
+		lv.computeAggregation(&gData.DataPoints)
+		*dest = gData
+
 		return len(gData.DataPoints)
 	}
 }
@@ -129,3 +142,12 @@ func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistog
 		return len(hData.DataPoints)
 	}
 }
+
+// reset ensures s has capacity and sets its length. If the capacity of s is
+// too small, a new slice is returned with the specified capacity and length.
+func reset[T any](s []T, length, capacity int) []T {
+	if cap(s) < capacity {
+		return make([]T, length, capacity)
+	}
+	return s[:length]
+}
diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go
index 09c92ab9a28..76ba3a26eaf 100644
--- a/sdk/metric/internal/aggregate/aggregate_test.go
+++ b/sdk/metric/internal/aggregate/aggregate_test.go
@@ -16,25 +16,43 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
 
 import (
 	"context"
+	"strconv"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 
 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
 )
 
 var (
-	keyUser   = "user"
-	userAlice = attribute.String(keyUser, "Alice")
-	adminTrue = attribute.Bool("admin", true)
+	keyUser    = "user"
+	userAlice  = attribute.String(keyUser, "Alice")
+	userBob    = attribute.String(keyUser, "Bob")
+	adminTrue  = attribute.Bool("admin", true)
+	adminFalse = attribute.Bool("admin", false)
 
 	alice = attribute.NewSet(userAlice, adminTrue)
+	bob   = attribute.NewSet(userBob, adminFalse)
 
 	// Filtered.
 	attrFltr = func(kv attribute.KeyValue) bool {
 		return kv.Key == attribute.Key(keyUser)
 	}
 	fltrAlice = attribute.NewSet(userAlice)
+	fltrBob   = attribute.NewSet(userBob)
+
+	// Sat Jan 01 2000 00:00:00 GMT+0000.
+	staticTime    = time.Unix(946684800, 0)
+	staticNowFunc = func() time.Time { return staticTime }
+	// Pass to t.Cleanup to override the now function with staticNowFunc and
+	// revert once the test completes. E.g. t.Cleanup(mockTime(now)).
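+	// The package-level now func is the timestamp source for measurements
+	// (e.g. lastValue.measure), so overriding it keeps the expected Time
+	// values equal to the deterministic staticTime above.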
+	mockTime = func(orig func() time.Time) (cleanup func()) {
+		now = staticNowFunc
+		return func() { now = orig }
+	}
 )
 
 type inputTester[N int64 | float64] struct {
@@ -73,3 +91,92 @@ func testBuilderInput[N int64 | float64]() func(t *testing.T) {
 		t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice))
 	}
 }
+
+type arg[N int64 | float64] struct {
+	ctx context.Context
+
+	value N
+	attr  attribute.Set
+}
+
+type output struct {
+	n   int
+	agg metricdata.Aggregation
+}
+
+type teststep[N int64 | float64] struct {
+	input  []arg[N]
+	expect output
+}
+
+func test[N int64 | float64](meas Measure[N], comp ComputeAggregation, steps []teststep[N]) func(*testing.T) {
+	return func(t *testing.T) {
+		t.Helper()
+
+		got := new(metricdata.Aggregation)
+		for i, step := range steps {
+			for _, args := range step.input {
+				meas(args.ctx, args.value, args.attr)
+			}
+
+			t.Logf("step: %d", i)
+			assert.Equal(t, step.expect.n, comp(got), "incorrect data size")
+			metricdatatest.AssertAggregationsEqual(t, step.expect.agg, *got)
+		}
+	}
+}
+
+func benchmarkAggregate[N int64 | float64](factory func() (Measure[N], ComputeAggregation)) func(*testing.B) {
+	counts := []int{1, 10, 100}
+	return func(b *testing.B) {
+		for _, n := range counts {
+			b.Run(strconv.Itoa(n), func(b *testing.B) {
+				benchmarkAggregateN(b, factory, n)
+			})
+		}
+	}
+}
+
+var bmarkRes metricdata.Aggregation
+
+func benchmarkAggregateN[N int64 | float64](b *testing.B, factory func() (Measure[N], ComputeAggregation), count int) {
+	ctx := context.Background()
+	attrs := make([]attribute.Set, count)
+	for i := range attrs {
+		attrs[i] = attribute.NewSet(attribute.Int("value", i))
+	}
+
+	b.Run("Measure", func(b *testing.B) {
+		got := &bmarkRes
+		meas, comp := factory()
+		b.ReportAllocs()
+		b.ResetTimer()
+
+		for n := 0; n < b.N; n++ {
+			for _, attr := range attrs {
+				meas(ctx, 1, attr)
+			}
+		}
+
+		comp(got)
+	})
+
+	b.Run("ComputeAggregation", func(b *testing.B) {
+		comps := make([]ComputeAggregation, b.N)
+		for n := range comps {
+			meas, comp := factory()
+			for _, attr := range attrs {
+				meas(ctx, 1, attr)
+			}
+			comps[n] = comp
+		}
+
+		got := &bmarkRes
+		b.ReportAllocs()
+		b.ResetTimer()
+
+		for n := 0; n < b.N; n++ {
+			comps[n](got)
+		}
+	})
+}
diff --git a/sdk/metric/internal/aggregate/aggregator_test.go b/sdk/metric/internal/aggregate/aggregator_test.go
index 4f322694729..80882c7d641 100644
--- a/sdk/metric/internal/aggregate/aggregator_test.go
+++ b/sdk/metric/internal/aggregate/aggregator_test.go
@@ -18,7 +18,6 @@ import (
 	"strconv"
 	"sync"
 	"testing"
-	"time"
 
 	"github.com/stretchr/testify/assert"
 
@@ -33,20 +32,7 @@ const (
 	defaultCycles = 3
 )
 
-var (
-	bob   = attribute.NewSet(attribute.String(keyUser, "bob"), attribute.Bool("admin", false))
-	carol = attribute.NewSet(attribute.String(keyUser, "carol"), attribute.Bool("admin", false))
-
-	// Sat Jan 01 2000 00:00:00 GMT+0000.
-	staticTime    = time.Unix(946684800, 0)
-	staticNowFunc = func() time.Time { return staticTime }
-	// Pass to t.Cleanup to override the now function with staticNowFunc and
-	// revert once the test completes. E.g. t.Cleanup(mockTime(now)).
-	mockTime = func(orig func() time.Time) (cleanup func()) {
-		now = staticNowFunc
-		return func() { now = orig }
-	}
-)
+var carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false))
 
 func monoIncr[N int64 | float64]() setMap[N] {
 	return setMap[N]{alice: 1, bob: 10, carol: 2}
diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go
index 7a8e7d3b956..6af2d606141 100644
--- a/sdk/metric/internal/aggregate/lastvalue.go
+++ b/sdk/metric/internal/aggregate/lastvalue.go
@@ -15,6 +15,7 @@
 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
 
 import (
+	"context"
 	"sync"
 	"time"
 
@@ -28,6 +29,10 @@ type datapoint[N int64 | float64] struct {
 	value N
 }
 
+func newLastValue[N int64 | float64]() *lastValue[N] {
+	return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
+}
+
 // lastValue summarizes a set of measurements as the last one made.
 type lastValue[N int64 | float64] struct {
 	sync.Mutex
@@ -35,40 +40,29 @@ type lastValue[N int64 | float64] struct {
 	values map[attribute.Set]datapoint[N]
 }
 
-// newLastValue returns an Aggregator that summarizes a set of measurements as
-// the last one made.
-func newLastValue[N int64 | float64]() aggregator[N] {
-	return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
-}
-
-func (s *lastValue[N]) Aggregate(value N, attr attribute.Set) {
+func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) {
 	d := datapoint[N]{timestamp: now(), value: value}
 	s.Lock()
 	s.values[attr] = d
 	s.Unlock()
 }
 
-func (s *lastValue[N]) Aggregation() metricdata.Aggregation {
+func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
 	s.Lock()
 	defer s.Unlock()
 
-	if len(s.values) == 0 {
-		return nil
-	}
+	n := len(s.values)
+	*dest = reset(*dest, n, n)
 
-	gauge := metricdata.Gauge[N]{
-		DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
-	}
+	var i int
 	for a, v := range s.values {
-		gauge.DataPoints = append(gauge.DataPoints, metricdata.DataPoint[N]{
-			Attributes: a,
-			// The event time is the only meaningful timestamp, StartTime is
-			// ignored.
-			Time:  v.timestamp,
-			Value: v.value,
-		})
+		(*dest)[i].Attributes = a
+		// The event time is the only meaningful timestamp, StartTime is
+		// ignored.
+		(*dest)[i].Time = v.timestamp
+		(*dest)[i].Value = v.value
 		// Do not report stale values.
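+		// Deleting the entry also means a subsequent computeAggregation
+		// call only sees values recorded after this collection.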
 		delete(s.values, a)
+		i++
 	}
-	return gauge
 }
diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go
index caf0735ba26..c758eb370c7 100644
--- a/sdk/metric/internal/aggregate/lastvalue_test.go
+++ b/sdk/metric/internal/aggregate/lastvalue_test.go
@@ -15,12 +15,10 @@
 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
 
 import (
+	"context"
 	"testing"
 
-	"github.com/stretchr/testify/assert"
-
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
-	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
 )
 
 func TestLastValue(t *testing.T) {
@@ -31,66 +29,69 @@
 }
 
 func testLastValue[N int64 | float64]() func(*testing.T) {
-	tester := &aggregatorTester[N]{
-		GoroutineN:   defaultGoroutines,
-		MeasurementN: defaultMeasurements,
-		CycleN:       defaultCycles,
-	}
-
-	eFunc := func(increments setMap[N]) expectFunc {
-		data := make([]metricdata.DataPoint[N], 0, len(increments))
-		for a, v := range increments {
-			point := metricdata.DataPoint[N]{Attributes: a, Time: now(), Value: N(v)}
-			data = append(data, point)
-		}
-		gauge := metricdata.Gauge[N]{DataPoints: data}
-		return func(int) metricdata.Aggregation { return gauge }
-	}
-	incr := monoIncr[N]()
-	return tester.Run(newLastValue[N](), incr, eFunc(incr))
-}
-
-func testLastValueReset[N int64 | float64](t *testing.T) {
-	t.Cleanup(mockTime(now))
-
-	a := newLastValue[N]()
-	assert.Nil(t, a.Aggregation())
-
-	a.Aggregate(1, alice)
-	expect := metricdata.Gauge[N]{
-		DataPoints: []metricdata.DataPoint[N]{{
-			Attributes: alice,
-			Time:       now(),
-			Value:      1,
-		}},
-	}
-	metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
-
-	// The attr set should be forgotten once Aggregations is called.
-	expect.DataPoints = nil
-	assert.Nil(t, a.Aggregation())
-
-	// Aggregating another set should not affect the original (alice).
-	a.Aggregate(1, bob)
-	expect.DataPoints = []metricdata.DataPoint[N]{{
-		Attributes: bob,
-		Time:       now(),
-		Value:      1,
-	}}
-	metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
-}
-
-func TestLastValueReset(t *testing.T) {
-	t.Run("Int64", testLastValueReset[int64])
-	t.Run("Float64", testLastValueReset[float64])
-}
-
-func TestEmptyLastValueNilAggregation(t *testing.T) {
-	assert.Nil(t, newLastValue[int64]().Aggregation())
-	assert.Nil(t, newLastValue[float64]().Aggregation())
+	in, out := Builder[N]{Filter: attrFltr}.LastValue()
+	ctx := context.Background()
+	return test[N](in, out, []teststep[N]{
+		{
+			// Empty output if nothing is measured.
+			input:  []arg[N]{},
+			expect: output{n: 0, agg: metricdata.Gauge[N]{}},
+		}, {
+			input: []arg[N]{
+				{ctx, 1, alice},
+				{ctx, -1, bob},
+				{ctx, 1, fltrAlice},
+				{ctx, 2, alice},
+				{ctx, -10, bob},
+			},
+			expect: output{
+				n: 2,
+				agg: metricdata.Gauge[N]{
+					DataPoints: []metricdata.DataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							Time:       staticTime,
+							Value:      2,
+						},
+						{
+							Attributes: fltrBob,
+							Time:       staticTime,
+							Value:      -10,
+						},
+					},
+				},
+			},
+		}, {
+			// Everything resets, do not report old measurements.
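+			// A collection cycle with no new measurements must produce
+			// an empty Gauge rather than repeat the previous data points.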
+			input:  []arg[N]{},
+			expect: output{n: 0, agg: metricdata.Gauge[N]{}},
+		}, {
+			input: []arg[N]{
+				{ctx, 10, alice},
+				{ctx, 3, bob},
+			},
+			expect: output{
+				n: 2,
+				agg: metricdata.Gauge[N]{
+					DataPoints: []metricdata.DataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							Time:       staticTime,
+							Value:      10,
+						},
+						{
+							Attributes: fltrBob,
+							Time:       staticTime,
+							Value:      3,
+						},
+					},
+				},
+			},
+		},
+	})
 }
 
 func BenchmarkLastValue(b *testing.B) {
-	b.Run("Int64", benchmarkAggregator(newLastValue[int64]))
-	b.Run("Float64", benchmarkAggregator(newLastValue[float64]))
+	b.Run("Int64", benchmarkAggregate(Builder[int64]{}.LastValue))
+	b.Run("Float64", benchmarkAggregate(Builder[float64]{}.LastValue))
 }
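Not part of the patch: a minimal sketch of how the pieces above are intended to compose, written as if it lived inside the aggregate package. The function name, attribute values, and numbers are illustrative only.

package aggregate

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// lastValueSketch records one measurement and collects it into a reusable
// Gauge. Builder.LastValue wires lastValue.measure through Builder.filter, so
// the recorded attribute set is reduced to the "user" key before it is stored.
func lastValueSketch() int {
	meas, comp := Builder[int64]{
		Filter: func(kv attribute.KeyValue) bool { return kv.Key == "user" },
	}.LastValue()

	meas(context.Background(), 10, attribute.NewSet(
		attribute.String("user", "Alice"),
		attribute.Bool("admin", true),
	))

	// Pass the previous collection's Aggregation back in; computeAggregation
	// then reuses the Gauge's DataPoints backing array via reset instead of
	// allocating a new slice every cycle.
	var agg metricdata.Aggregation = metricdata.Gauge[int64]{}
	return comp(&agg)
}

This mirrors what the tests above exercise: filtered attribute sets key the last-value map, and repeated collections overwrite the same output slice rather than reporting stale points.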