From e47618fc36af51d17ecdcc7299bbf706397e1cb1 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 29 Aug 2024 13:30:20 -0400 Subject: [PATCH] Fix duplicate instrumentation memory leak (#5754) Fixes https://github.com/open-telemetry/opentelemetry-go/issues/5753 The added test fails on main, but passes after the fix. --------- Co-authored-by: Sam Xie --- CHANGELOG.md | 4 + internal/global/meter.go | 143 ++++++++++++++++++++++++++++++---- internal/global/meter_test.go | 16 +++- 3 files changed, 145 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2591358863c..4969347e79e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Fixed + +- Fix memory leak in the global `MeterProvider` when identical instruments are repeatedly created. (#5754) + ### Removed - Drop support for [Go 1.21]. (#5736, #5740) diff --git a/internal/global/meter.go b/internal/global/meter.go index cfd1df9bfa2..897e8b12bbc 100644 --- a/internal/global/meter.go +++ b/internal/global/meter.go @@ -5,6 +5,7 @@ package global // import "go.opentelemetry.io/otel/internal/global" import ( "container/list" + "reflect" "sync" "sync/atomic" @@ -76,7 +77,7 @@ func (p *meterProvider) Meter(name string, opts ...metric.MeterOption) metric.Me return val } - t := &meter{name: name, opts: opts} + t := &meter{name: name, opts: opts, instruments: make(map[instID]delegatedInstrument)} p.meters[key] = t return t } @@ -92,7 +93,7 @@ type meter struct { opts []metric.MeterOption mtx sync.Mutex - instruments []delegatedInstrument + instruments map[instID]delegatedInstrument registry list.List @@ -103,6 +104,18 @@ type delegatedInstrument interface { setDelegate(metric.Meter) } +// instID are the identifying properties of a instrument. +type instID struct { + // name is the name of the stream. + name string + // description is the description of the stream. + description string + // kind defines the functional group of the instrument. + kind reflect.Type + // unit is the unit of the stream. + unit string +} + // setDelegate configures m to delegate all Meter functionality to Meters // created by provider. // @@ -139,7 +152,14 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) m.mtx.Lock() defer m.mtx.Unlock() i := &siCounter{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewInt64CounterConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } @@ -150,7 +170,14 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou m.mtx.Lock() defer m.mtx.Unlock() i := &siUpDownCounter{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewInt64UpDownCounterConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } @@ -161,7 +188,14 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti m.mtx.Lock() defer m.mtx.Unlock() i := &siHistogram{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewInt64HistogramConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } @@ -172,7 +206,14 @@ func (m *meter) Int64Gauge(name string, options ...metric.Int64GaugeOption) (met m.mtx.Lock() defer m.mtx.Unlock() i := &siGauge{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewInt64GaugeConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } @@ -183,7 +224,14 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser m.mtx.Lock() defer m.mtx.Unlock() i := &aiCounter{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewInt64ObservableCounterConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } @@ -194,7 +242,14 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6 m.mtx.Lock() defer m.mtx.Unlock() i := &aiUpDownCounter{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewInt64ObservableUpDownCounterConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } @@ -205,7 +260,14 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa m.mtx.Lock() defer m.mtx.Unlock() i := &aiGauge{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewInt64ObservableGaugeConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } @@ -216,7 +278,14 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti m.mtx.Lock() defer m.mtx.Unlock() i := &sfCounter{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewFloat64CounterConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } @@ -227,7 +296,14 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow m.mtx.Lock() defer m.mtx.Unlock() i := &sfUpDownCounter{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewFloat64UpDownCounterConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } @@ -238,7 +314,14 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram m.mtx.Lock() defer m.mtx.Unlock() i := &sfHistogram{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewFloat64HistogramConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } @@ -249,7 +332,14 @@ func (m *meter) Float64Gauge(name string, options ...metric.Float64GaugeOption) m.mtx.Lock() defer m.mtx.Unlock() i := &sfGauge{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewFloat64GaugeConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } @@ -260,7 +350,14 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O m.mtx.Lock() defer m.mtx.Unlock() i := &afCounter{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewFloat64ObservableCounterConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } @@ -271,7 +368,14 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl m.mtx.Lock() defer m.mtx.Unlock() i := &afUpDownCounter{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } @@ -282,7 +386,14 @@ func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64Obs m.mtx.Lock() defer m.mtx.Unlock() i := &afGauge{name: name, opts: options} - m.instruments = append(m.instruments, i) + cfg := metric.NewFloat64ObservableGaugeConfig(options...) + id := instID{ + name: name, + kind: reflect.TypeOf(i), + description: cfg.Description(), + unit: cfg.Unit(), + } + m.instruments[id] = i return i, nil } diff --git a/internal/global/meter_test.go b/internal/global/meter_test.go index 144f89ef9c3..a99213e3a9b 100644 --- a/internal/global/meter_test.go +++ b/internal/global/meter_test.go @@ -42,7 +42,7 @@ var zeroCallback metric.Callback = func(ctx context.Context, or metric.Observer) } func TestMeterConcurrentSafe(t *testing.T) { - mtr := &meter{} + mtr := &meter{instruments: make(map[instID]delegatedInstrument)} wg := &sync.WaitGroup{} wg.Add(1) @@ -86,7 +86,7 @@ func TestMeterConcurrentSafe(t *testing.T) { } func TestUnregisterConcurrentSafe(t *testing.T) { - mtr := &meter{} + mtr := &meter{instruments: make(map[instID]delegatedInstrument)} reg, err := mtr.RegisterCallback(zeroCallback) require.NoError(t, err) @@ -176,6 +176,18 @@ func testCollect(t *testing.T, m metric.Meter) { tMeter.collect() } +func TestInstrumentIdentity(t *testing.T) { + globalMeterProvider := &meterProvider{} + m := globalMeterProvider.Meter("go.opentelemetry.io/otel/metric/internal/global/meter_test") + tMeter := m.(*meter) + testSetupAllInstrumentTypes(t, m) + assert.Len(t, tMeter.instruments, 14) + // Creating the same instruments multiple times should not increase the + // number of instruments. + testSetupAllInstrumentTypes(t, m) + assert.Len(t, tMeter.instruments, 14) +} + func TestMeterProviderDelegatesCalls(t *testing.T) { // The global MeterProvider should directly call the underlying MeterProvider // if it is set prior to Meter() being called.