Skip to content

Commit

Permalink
Simplify the last-value aggregate (#4343)
Browse files Browse the repository at this point in the history
Instead of treating the returned *lastValue as an aggregator from
newLastValue, just use the type directly to construct the Measure and
ComputeAggregation functions returned from the Builder.

Accept a destination type for the underlying computeAggregation. This
allows memory reuse for collections which adds a considerable
optimization.

Simplify the integration testing of the last-value aggregate.

Update benchmarking.
  • Loading branch information
MrAlias authored Jul 21, 2023
1 parent a37cb05 commit cbc5890
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 106 deletions.
30 changes: 26 additions & 4 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ type Builder[N int64 | float64] struct {
Filter attribute.Filter
}

func (b Builder[N]) filter(f Measure[N]) Measure[N] {
if b.Filter != nil {
fltr := b.Filter // Copy to make it immutable after assignment.
return func(ctx context.Context, n N, a attribute.Set) {
fAttr, _ := a.Filter(fltr)
f(ctx, n, fAttr)
}
}
return f
}

func (b Builder[N]) input(agg aggregator[N]) Measure[N] {
if b.Filter != nil {
fltr := b.Filter // Copy to make it immutable after assignment.
Expand All @@ -63,11 +74,13 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// a last-value aggregate.
lv := newLastValue[N]()

return b.input(lv), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
*dest = lv.Aggregation()

return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory
// reuse of the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
lv.computeAggregation(&gData.DataPoints)
*dest = gData

return len(gData.DataPoints)
}
}
Expand Down Expand Up @@ -129,3 +142,12 @@ func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistog
return len(hData.DataPoints)
}
}

// reset ensures s has capacity and sets it length. If the capacity of s too
// small, a new slice is returned with the specified capacity and length.
func reset[T any](s []T, length, capacity int) []T {
if cap(s) < capacity {
return make([]T, length, capacity)
}
return s[:length]
}
113 changes: 110 additions & 3 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,43 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg

import (
"context"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

var (
keyUser = "user"
userAlice = attribute.String(keyUser, "Alice")
adminTrue = attribute.Bool("admin", true)
keyUser = "user"
userAlice = attribute.String(keyUser, "Alice")
userBob = attribute.String(keyUser, "Bob")
adminTrue = attribute.Bool("admin", true)
adminFalse = attribute.Bool("admin", false)

alice = attribute.NewSet(userAlice, adminTrue)
bob = attribute.NewSet(userBob, adminFalse)

// Filtered.
attrFltr = func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key(keyUser)
}
fltrAlice = attribute.NewSet(userAlice)
fltrBob = attribute.NewSet(userBob)

// Sat Jan 01 2000 00:00:00 GMT+0000.
staticTime = time.Unix(946684800, 0)
staticNowFunc = func() time.Time { return staticTime }
// Pass to t.Cleanup to override the now function with staticNowFunc and
// revert once the test completes. E.g. t.Cleanup(mockTime(now)).
mockTime = func(orig func() time.Time) (cleanup func()) {
now = staticNowFunc
return func() { now = orig }
}
)

type inputTester[N int64 | float64] struct {
Expand Down Expand Up @@ -73,3 +91,92 @@ func testBuilderInput[N int64 | float64]() func(t *testing.T) {
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice))
}
}

type arg[N int64 | float64] struct {
ctx context.Context

value N
attr attribute.Set
}

type output struct {
n int
agg metricdata.Aggregation
}

type teststep[N int64 | float64] struct {
input []arg[N]
expect output
}

func test[N int64 | float64](meas Measure[N], comp ComputeAggregation, steps []teststep[N]) func(*testing.T) {
return func(t *testing.T) {
t.Helper()

got := new(metricdata.Aggregation)
for i, step := range steps {
for _, args := range step.input {
meas(args.ctx, args.value, args.attr)
}

t.Logf("step: %d", i)
assert.Equal(t, step.expect.n, comp(got), "incorrect data size")
metricdatatest.AssertAggregationsEqual(t, step.expect.agg, *got)
}
}
}

func benchmarkAggregate[N int64 | float64](factory func() (Measure[N], ComputeAggregation)) func(*testing.B) {
counts := []int{1, 10, 100}
return func(b *testing.B) {
for _, n := range counts {
b.Run(strconv.Itoa(n), func(b *testing.B) {
benchmarkAggregateN(b, factory, n)
})
}
}
}

var bmarkRes metricdata.Aggregation

func benchmarkAggregateN[N int64 | float64](b *testing.B, factory func() (Measure[N], ComputeAggregation), count int) {
ctx := context.Background()
attrs := make([]attribute.Set, count)
for i := range attrs {
attrs[i] = attribute.NewSet(attribute.Int("value", i))
}

b.Run("Measure", func(b *testing.B) {
got := &bmarkRes
meas, comp := factory()
b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
for _, attr := range attrs {
meas(ctx, 1, attr)
}
}

comp(got)
})

b.Run("ComputeAggregation", func(b *testing.B) {
comps := make([]ComputeAggregation, b.N)
for n := range comps {
meas, comp := factory()
for _, attr := range attrs {
meas(ctx, 1, attr)
}
comps[n] = comp
}

got := &bmarkRes
b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
comps[n](got)
}
})
}
16 changes: 1 addition & 15 deletions sdk/metric/internal/aggregate/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -33,20 +32,7 @@ const (
defaultCycles = 3
)

var (
bob = attribute.NewSet(attribute.String(keyUser, "bob"), attribute.Bool("admin", false))
carol = attribute.NewSet(attribute.String(keyUser, "carol"), attribute.Bool("admin", false))

// Sat Jan 01 2000 00:00:00 GMT+0000.
staticTime = time.Unix(946684800, 0)
staticNowFunc = func() time.Time { return staticTime }
// Pass to t.Cleanup to override the now function with staticNowFunc and
// revert once the test completes. E.g. t.Cleanup(mockTime(now)).
mockTime = func(orig func() time.Time) (cleanup func()) {
now = staticNowFunc
return func() { now = orig }
}
)
var carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false))

func monoIncr[N int64 | float64]() setMap[N] {
return setMap[N]{alice: 1, bob: 10, carol: 2}
Expand Down
38 changes: 16 additions & 22 deletions sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"context"
"sync"
"time"

Expand All @@ -28,47 +29,40 @@ type datapoint[N int64 | float64] struct {
value N
}

func newLastValue[N int64 | float64]() *lastValue[N] {
return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
}

// lastValue summarizes a set of measurements as the last one made.
type lastValue[N int64 | float64] struct {
sync.Mutex

values map[attribute.Set]datapoint[N]
}

// newLastValue returns an Aggregator that summarizes a set of measurements as
// the last one made.
func newLastValue[N int64 | float64]() aggregator[N] {
return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
}

func (s *lastValue[N]) Aggregate(value N, attr attribute.Set) {
func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) {
d := datapoint[N]{timestamp: now(), value: value}
s.Lock()
s.values[attr] = d
s.Unlock()
}

func (s *lastValue[N]) Aggregation() metricdata.Aggregation {
func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
s.Lock()
defer s.Unlock()

if len(s.values) == 0 {
return nil
}
n := len(s.values)
*dest = reset(*dest, n, n)

gauge := metricdata.Gauge[N]{
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
}
var i int
for a, v := range s.values {
gauge.DataPoints = append(gauge.DataPoints, metricdata.DataPoint[N]{
Attributes: a,
// The event time is the only meaningful timestamp, StartTime is
// ignored.
Time: v.timestamp,
Value: v.value,
})
(*dest)[i].Attributes = a
// The event time is the only meaningful timestamp, StartTime is
// ignored.
(*dest)[i].Time = v.timestamp
(*dest)[i].Value = v.value
// Do not report stale values.
delete(s.values, a)
i++
}
return gauge
}
Loading

0 comments on commit cbc5890

Please sign in to comment.