From 52d743047bbdd64fc9d521d39a25898dec4390c1 Mon Sep 17 00:00:00 2001 From: Nathan Oorloff Date: Fri, 11 Oct 2024 11:02:57 -0700 Subject: [PATCH] Batch Mutable State Metrics --- common/metrics/metrics.go | 5 +++ common/metrics/metrics_mock.go | 16 ++++++++ common/metrics/noop_impl.go | 9 +++++ common/metrics/otel_metrics_handler.go | 9 +++++ common/metrics/tally_metrics_handler.go | 43 ++++++++++++-------- service/history/workflow/metrics.go | 54 +++++++++++++------------ 6 files changed, 94 insertions(+), 42 deletions(-) diff --git a/common/metrics/metrics.go b/common/metrics/metrics.go index 9743d946348..5900698d1f7 100644 --- a/common/metrics/metrics.go +++ b/common/metrics/metrics.go @@ -27,6 +27,7 @@ package metrics import ( + "io" "time" "go.temporal.io/server/common/log" @@ -58,6 +59,10 @@ type ( Histogram(string, MetricUnit) HistogramIface Stop(log.Logger) + + // BatchStart returns a Handler that, where supported, can emit a series of metrics as a single "wide event". + // If wide events aren't supported in the underlying implementation, metrics can still be sent individually. + BatchStart(string) (Handler, io.Closer) } // CounterIface is an ever-increasing counter. diff --git a/common/metrics/metrics_mock.go b/common/metrics/metrics_mock.go index 0760074ca33..0d2fd6b4fe0 100644 --- a/common/metrics/metrics_mock.go +++ b/common/metrics/metrics_mock.go @@ -34,6 +34,7 @@ package metrics import ( + io "io" reflect "reflect" time "time" @@ -64,6 +65,21 @@ func (m *MockHandler) EXPECT() *MockHandlerMockRecorder { return m.recorder } +// BatchStart mocks base method. +func (m *MockHandler) BatchStart(arg0 string) (Handler, io.Closer) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BatchStart", arg0) + ret0, _ := ret[0].(Handler) + ret1, _ := ret[1].(io.Closer) + return ret0, ret1 +} + +// BatchStart indicates an expected call of BatchStart. 
+func (mr *MockHandlerMockRecorder) BatchStart(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchStart", reflect.TypeOf((*MockHandler)(nil).BatchStart), arg0) +} + // Counter mocks base method. func (m *MockHandler) Counter(arg0 string) CounterIface { m.ctrl.T.Helper() diff --git a/common/metrics/noop_impl.go b/common/metrics/noop_impl.go index 081c75b51a8..d527f4aa9e5 100644 --- a/common/metrics/noop_impl.go +++ b/common/metrics/noop_impl.go @@ -25,6 +25,7 @@ package metrics import ( + "io" "time" "go.temporal.io/server/common/log" @@ -68,6 +69,14 @@ func (*noopMetricsHandler) Histogram(string, MetricUnit) HistogramIface { func (*noopMetricsHandler) Stop(log.Logger) {} +func (*noopMetricsHandler) Close() error { + return nil +} + +func (n *noopMetricsHandler) BatchStart(_ string) (Handler, io.Closer) { + return n, n +} + var NoopCounterMetricFunc = CounterFunc(func(i int64, t ...Tag) {}) var NoopGaugeMetricFunc = GaugeFunc(func(f float64, t ...Tag) {}) var NoopTimerMetricFunc = TimerFunc(func(d time.Duration, t ...Tag) {}) diff --git a/common/metrics/otel_metrics_handler.go b/common/metrics/otel_metrics_handler.go index 1247aedf46e..65c3bdf97fe 100644 --- a/common/metrics/otel_metrics_handler.go +++ b/common/metrics/otel_metrics_handler.go @@ -27,6 +27,7 @@ package metrics import ( "context" "fmt" + "io" "sync" "time" @@ -204,6 +205,14 @@ func (omp *otelMetricsHandler) Stop(l log.Logger) { omp.provider.Stop(l) } +func (omp *otelMetricsHandler) Close() error { + return nil +} + +func (omp *otelMetricsHandler) BatchStart(_ string) (Handler, io.Closer) { + return omp, omp +} + // makeSet returns an otel attribute.Set with the given tags merged with the // otelMetricsHandler's tags. 
func (omp *otelMetricsHandler) makeSet(tags []Tag) attribute.Set { diff --git a/common/metrics/tally_metrics_handler.go b/common/metrics/tally_metrics_handler.go index 806e2983bb0..c5150554c84 100644 --- a/common/metrics/tally_metrics_handler.go +++ b/common/metrics/tally_metrics_handler.go @@ -25,6 +25,7 @@ package metrics import ( + "io" "time" "github.com/uber-go/tally/v4" @@ -66,60 +67,68 @@ func NewTallyMetricsHandler(cfg ClientConfig, scope tally.Scope) *tallyMetricsHa // WithTags creates a new MetricProvder with provided []Tag // Tags are merged with registered Tags from the source MetricsHandler -func (tmp *tallyMetricsHandler) WithTags(tags ...Tag) Handler { +func (tmh *tallyMetricsHandler) WithTags(tags ...Tag) Handler { return &tallyMetricsHandler{ - scope: tmp.scope.Tagged(tagsToMap(tags, tmp.excludeTags)), - perUnitBuckets: tmp.perUnitBuckets, - excludeTags: tmp.excludeTags, + scope: tmh.scope.Tagged(tagsToMap(tags, tmh.excludeTags)), + perUnitBuckets: tmh.perUnitBuckets, + excludeTags: tmh.excludeTags, } } // Counter obtains a counter for the given name. -func (tmp *tallyMetricsHandler) Counter(counter string) CounterIface { +func (tmh *tallyMetricsHandler) Counter(counter string) CounterIface { return CounterFunc(func(i int64, t ...Tag) { - scope := tmp.scope + scope := tmh.scope if len(t) > 0 { - scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags)) + scope = tmh.scope.Tagged(tagsToMap(t, tmh.excludeTags)) } scope.Counter(counter).Inc(i) }) } // Gauge obtains a gauge for the given name. -func (tmp *tallyMetricsHandler) Gauge(gauge string) GaugeIface { +func (tmh *tallyMetricsHandler) Gauge(gauge string) GaugeIface { return GaugeFunc(func(f float64, t ...Tag) { - scope := tmp.scope + scope := tmh.scope if len(t) > 0 { - scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags)) + scope = tmh.scope.Tagged(tagsToMap(t, tmh.excludeTags)) } scope.Gauge(gauge).Update(f) }) } // Timer obtains a timer for the given name. 
-func (tmp *tallyMetricsHandler) Timer(timer string) TimerIface { +func (tmh *tallyMetricsHandler) Timer(timer string) TimerIface { return TimerFunc(func(d time.Duration, t ...Tag) { - scope := tmp.scope + scope := tmh.scope if len(t) > 0 { - scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags)) + scope = tmh.scope.Tagged(tagsToMap(t, tmh.excludeTags)) } scope.Timer(timer).Record(d) }) } // Histogram obtains a histogram for the given name. -func (tmp *tallyMetricsHandler) Histogram(histogram string, unit MetricUnit) HistogramIface { +func (tmh *tallyMetricsHandler) Histogram(histogram string, unit MetricUnit) HistogramIface { return HistogramFunc(func(i int64, t ...Tag) { - scope := tmp.scope + scope := tmh.scope if len(t) > 0 { - scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags)) + scope = tmh.scope.Tagged(tagsToMap(t, tmh.excludeTags)) } - scope.Histogram(histogram, tmp.perUnitBuckets[unit]).RecordValue(float64(i)) + scope.Histogram(histogram, tmh.perUnitBuckets[unit]).RecordValue(float64(i)) }) } func (*tallyMetricsHandler) Stop(log.Logger) {} +func (*tallyMetricsHandler) Close() error { + return nil +} + +func (tmh *tallyMetricsHandler) BatchStart(_ string) (Handler, io.Closer) { + return tmh, tmh +} + func tagsToMap(t1 []Tag, e excludeTags) map[string]string { if len(t1) == 0 { return nil diff --git a/service/history/workflow/metrics.go b/service/history/workflow/metrics.go index 7e39759dda8..b9a3c3c8dc7 100644 --- a/service/history/workflow/metrics.go +++ b/service/history/workflow/metrics.go @@ -60,36 +60,40 @@ func emitMutableStateStatus( if stats == nil { return } - metrics.MutableStateSize.With(metricsHandler).Record(int64(stats.TotalSize)) - metrics.ExecutionInfoSize.With(metricsHandler).Record(int64(stats.ExecutionInfoSize)) - metrics.ExecutionStateSize.With(metricsHandler).Record(int64(stats.ExecutionStateSize)) - metrics.ActivityInfoSize.With(metricsHandler).Record(int64(stats.ActivityInfoSize)) - 
metrics.ActivityInfoCount.With(metricsHandler).Record(int64(stats.ActivityInfoCount)) - metrics.TotalActivityCount.With(metricsHandler).Record(stats.TotalActivityCount) - metrics.TimerInfoSize.With(metricsHandler).Record(int64(stats.TimerInfoSize)) - metrics.TimerInfoCount.With(metricsHandler).Record(int64(stats.TimerInfoCount)) - metrics.TotalUserTimerCount.With(metricsHandler).Record(stats.TotalUserTimerCount) - metrics.ChildInfoSize.With(metricsHandler).Record(int64(stats.ChildInfoSize)) - metrics.ChildInfoCount.With(metricsHandler).Record(int64(stats.ChildInfoCount)) - metrics.TotalChildExecutionCount.With(metricsHandler).Record(stats.TotalChildExecutionCount) - metrics.RequestCancelInfoSize.With(metricsHandler).Record(int64(stats.RequestCancelInfoSize)) - metrics.RequestCancelInfoCount.With(metricsHandler).Record(int64(stats.RequestCancelInfoCount)) - metrics.TotalRequestCancelExternalCount.With(metricsHandler).Record(stats.TotalRequestCancelExternalCount) - metrics.SignalInfoSize.With(metricsHandler).Record(int64(stats.SignalInfoSize)) - metrics.SignalInfoCount.With(metricsHandler).Record(int64(stats.SignalInfoCount)) - metrics.TotalSignalExternalCount.With(metricsHandler).Record(stats.TotalSignalExternalCount) - metrics.SignalRequestIDSize.With(metricsHandler).Record(int64(stats.SignalRequestIDSize)) - metrics.SignalRequestIDCount.With(metricsHandler).Record(int64(stats.SignalRequestIDCount)) - metrics.TotalSignalCount.With(metricsHandler).Record(stats.TotalSignalCount) - metrics.BufferedEventsSize.With(metricsHandler).Record(int64(stats.BufferedEventsSize)) - metrics.BufferedEventsCount.With(metricsHandler).Record(int64(stats.BufferedEventsCount)) + + batchHandler, closer := metricsHandler.BatchStart("mutable_state_status") + defer closer.Close() + metrics.MutableStateSize.With(batchHandler).Record(int64(stats.TotalSize)) + metrics.ExecutionInfoSize.With(batchHandler).Record(int64(stats.ExecutionInfoSize)) + 
metrics.ExecutionStateSize.With(batchHandler).Record(int64(stats.ExecutionStateSize)) + metrics.ActivityInfoSize.With(batchHandler).Record(int64(stats.ActivityInfoSize)) + metrics.ActivityInfoCount.With(batchHandler).Record(int64(stats.ActivityInfoCount)) + metrics.TotalActivityCount.With(batchHandler).Record(stats.TotalActivityCount) + metrics.TimerInfoSize.With(batchHandler).Record(int64(stats.TimerInfoSize)) + metrics.TimerInfoCount.With(batchHandler).Record(int64(stats.TimerInfoCount)) + metrics.TotalUserTimerCount.With(batchHandler).Record(stats.TotalUserTimerCount) + metrics.ChildInfoSize.With(batchHandler).Record(int64(stats.ChildInfoSize)) + metrics.ChildInfoCount.With(batchHandler).Record(int64(stats.ChildInfoCount)) + metrics.TotalChildExecutionCount.With(batchHandler).Record(stats.TotalChildExecutionCount) + metrics.RequestCancelInfoSize.With(batchHandler).Record(int64(stats.RequestCancelInfoSize)) + metrics.RequestCancelInfoCount.With(batchHandler).Record(int64(stats.RequestCancelInfoCount)) + metrics.TotalRequestCancelExternalCount.With(batchHandler).Record(stats.TotalRequestCancelExternalCount) + metrics.SignalInfoSize.With(batchHandler).Record(int64(stats.SignalInfoSize)) + metrics.SignalInfoCount.With(batchHandler).Record(int64(stats.SignalInfoCount)) + metrics.TotalSignalExternalCount.With(batchHandler).Record(stats.TotalSignalExternalCount) + metrics.SignalRequestIDSize.With(batchHandler).Record(int64(stats.SignalRequestIDSize)) + metrics.SignalRequestIDCount.With(batchHandler).Record(int64(stats.SignalRequestIDCount)) + metrics.TotalSignalCount.With(batchHandler).Record(stats.TotalSignalCount) + metrics.BufferedEventsSize.With(batchHandler).Record(int64(stats.BufferedEventsSize)) + metrics.BufferedEventsCount.With(batchHandler).Record(int64(stats.BufferedEventsCount)) if stats.HistoryStatistics != nil { - metrics.HistorySize.With(metricsHandler).Record(int64(stats.HistoryStatistics.SizeDiff)) - 
metrics.HistoryCount.With(metricsHandler).Record(int64(stats.HistoryStatistics.CountDiff)) + metrics.HistorySize.With(batchHandler).Record(int64(stats.HistoryStatistics.SizeDiff)) + metrics.HistoryCount.With(batchHandler).Record(int64(stats.HistoryStatistics.CountDiff)) } for category, taskCount := range stats.TaskCountByCategory { + // We use the metricsHandler rather than the batchHandler here because the same metric is repeatedly sent metrics.TaskCount.With(metricsHandler). Record(int64(taskCount), metrics.TaskCategoryTag(category)) }