Skip to content

Commit

Permalink
Batch Mutable State Metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
njo committed Oct 11, 2024
1 parent c20f6a1 commit 52d7430
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 42 deletions.
5 changes: 5 additions & 0 deletions common/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
package metrics

import (
"io"
"time"

"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -58,6 +59,10 @@ type (
Histogram(string, MetricUnit) HistogramIface

Stop(log.Logger)

// BatchStart returns a Handler that where supported, can emit a series of metrics as a single "wide event".
// If wide events aren't supported in the underlying implementation, metrics can still be sent individually.
BatchStart(string) (Handler, io.Closer)
}

// CounterIface is an ever-increasing counter.
Expand Down
16 changes: 16 additions & 0 deletions common/metrics/metrics_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions common/metrics/noop_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package metrics

import (
"io"
"time"

"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -68,6 +69,14 @@ func (*noopMetricsHandler) Histogram(string, MetricUnit) HistogramIface {

func (*noopMetricsHandler) Stop(log.Logger) {}

func (*noopMetricsHandler) Close() error {
return nil
}

func (n *noopMetricsHandler) BatchStart(_ string) (Handler, io.Closer) {
return n, n
}

var NoopCounterMetricFunc = CounterFunc(func(i int64, t ...Tag) {})
var NoopGaugeMetricFunc = GaugeFunc(func(f float64, t ...Tag) {})
var NoopTimerMetricFunc = TimerFunc(func(d time.Duration, t ...Tag) {})
Expand Down
9 changes: 9 additions & 0 deletions common/metrics/otel_metrics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package metrics
import (
"context"
"fmt"
"io"
"sync"
"time"

Expand Down Expand Up @@ -204,6 +205,14 @@ func (omp *otelMetricsHandler) Stop(l log.Logger) {
omp.provider.Stop(l)
}

func (omp *otelMetricsHandler) Close() error {
return nil
}

func (omp *otelMetricsHandler) BatchStart(_ string) (Handler, io.Closer) {
return omp, omp
}

// makeSet returns an otel attribute.Set with the given tags merged with the
// otelMetricsHandler's tags.
func (omp *otelMetricsHandler) makeSet(tags []Tag) attribute.Set {
Expand Down
43 changes: 26 additions & 17 deletions common/metrics/tally_metrics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package metrics

import (
"io"
"time"

"github.com/uber-go/tally/v4"
Expand Down Expand Up @@ -66,60 +67,68 @@ func NewTallyMetricsHandler(cfg ClientConfig, scope tally.Scope) *tallyMetricsHa

// WithTags creates a new MetricProvder with provided []Tag
// Tags are merged with registered Tags from the source MetricsHandler
func (tmp *tallyMetricsHandler) WithTags(tags ...Tag) Handler {
func (tmh *tallyMetricsHandler) WithTags(tags ...Tag) Handler {
return &tallyMetricsHandler{
scope: tmp.scope.Tagged(tagsToMap(tags, tmp.excludeTags)),
perUnitBuckets: tmp.perUnitBuckets,
excludeTags: tmp.excludeTags,
scope: tmh.scope.Tagged(tagsToMap(tags, tmh.excludeTags)),
perUnitBuckets: tmh.perUnitBuckets,
excludeTags: tmh.excludeTags,
}
}

// Counter obtains a counter for the given name.
func (tmp *tallyMetricsHandler) Counter(counter string) CounterIface {
func (tmh *tallyMetricsHandler) Counter(counter string) CounterIface {
return CounterFunc(func(i int64, t ...Tag) {
scope := tmp.scope
scope := tmh.scope
if len(t) > 0 {
scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags))
scope = tmh.scope.Tagged(tagsToMap(t, tmh.excludeTags))
}
scope.Counter(counter).Inc(i)
})
}

// Gauge obtains a gauge for the given name.
func (tmp *tallyMetricsHandler) Gauge(gauge string) GaugeIface {
func (tmh *tallyMetricsHandler) Gauge(gauge string) GaugeIface {
return GaugeFunc(func(f float64, t ...Tag) {
scope := tmp.scope
scope := tmh.scope
if len(t) > 0 {
scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags))
scope = tmh.scope.Tagged(tagsToMap(t, tmh.excludeTags))
}
scope.Gauge(gauge).Update(f)
})
}

// Timer obtains a timer for the given name.
func (tmp *tallyMetricsHandler) Timer(timer string) TimerIface {
func (tmh *tallyMetricsHandler) Timer(timer string) TimerIface {
return TimerFunc(func(d time.Duration, t ...Tag) {
scope := tmp.scope
scope := tmh.scope
if len(t) > 0 {
scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags))
scope = tmh.scope.Tagged(tagsToMap(t, tmh.excludeTags))
}
scope.Timer(timer).Record(d)
})
}

// Histogram obtains a histogram for the given name.
func (tmp *tallyMetricsHandler) Histogram(histogram string, unit MetricUnit) HistogramIface {
func (tmh *tallyMetricsHandler) Histogram(histogram string, unit MetricUnit) HistogramIface {
return HistogramFunc(func(i int64, t ...Tag) {
scope := tmp.scope
scope := tmh.scope
if len(t) > 0 {
scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags))
scope = tmh.scope.Tagged(tagsToMap(t, tmh.excludeTags))
}
scope.Histogram(histogram, tmp.perUnitBuckets[unit]).RecordValue(float64(i))
scope.Histogram(histogram, tmh.perUnitBuckets[unit]).RecordValue(float64(i))
})
}

func (*tallyMetricsHandler) Stop(log.Logger) {}

func (*tallyMetricsHandler) Close() error {
return nil
}

func (tmh *tallyMetricsHandler) BatchStart(_ string) (Handler, io.Closer) {
return tmh, tmh
}

func tagsToMap(t1 []Tag, e excludeTags) map[string]string {
if len(t1) == 0 {
return nil
Expand Down
54 changes: 29 additions & 25 deletions service/history/workflow/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,36 +60,40 @@ func emitMutableStateStatus(
if stats == nil {
return
}
metrics.MutableStateSize.With(metricsHandler).Record(int64(stats.TotalSize))
metrics.ExecutionInfoSize.With(metricsHandler).Record(int64(stats.ExecutionInfoSize))
metrics.ExecutionStateSize.With(metricsHandler).Record(int64(stats.ExecutionStateSize))
metrics.ActivityInfoSize.With(metricsHandler).Record(int64(stats.ActivityInfoSize))
metrics.ActivityInfoCount.With(metricsHandler).Record(int64(stats.ActivityInfoCount))
metrics.TotalActivityCount.With(metricsHandler).Record(stats.TotalActivityCount)
metrics.TimerInfoSize.With(metricsHandler).Record(int64(stats.TimerInfoSize))
metrics.TimerInfoCount.With(metricsHandler).Record(int64(stats.TimerInfoCount))
metrics.TotalUserTimerCount.With(metricsHandler).Record(stats.TotalUserTimerCount)
metrics.ChildInfoSize.With(metricsHandler).Record(int64(stats.ChildInfoSize))
metrics.ChildInfoCount.With(metricsHandler).Record(int64(stats.ChildInfoCount))
metrics.TotalChildExecutionCount.With(metricsHandler).Record(stats.TotalChildExecutionCount)
metrics.RequestCancelInfoSize.With(metricsHandler).Record(int64(stats.RequestCancelInfoSize))
metrics.RequestCancelInfoCount.With(metricsHandler).Record(int64(stats.RequestCancelInfoCount))
metrics.TotalRequestCancelExternalCount.With(metricsHandler).Record(stats.TotalRequestCancelExternalCount)
metrics.SignalInfoSize.With(metricsHandler).Record(int64(stats.SignalInfoSize))
metrics.SignalInfoCount.With(metricsHandler).Record(int64(stats.SignalInfoCount))
metrics.TotalSignalExternalCount.With(metricsHandler).Record(stats.TotalSignalExternalCount)
metrics.SignalRequestIDSize.With(metricsHandler).Record(int64(stats.SignalRequestIDSize))
metrics.SignalRequestIDCount.With(metricsHandler).Record(int64(stats.SignalRequestIDCount))
metrics.TotalSignalCount.With(metricsHandler).Record(stats.TotalSignalCount)
metrics.BufferedEventsSize.With(metricsHandler).Record(int64(stats.BufferedEventsSize))
metrics.BufferedEventsCount.With(metricsHandler).Record(int64(stats.BufferedEventsCount))

batchHandler, closer := metricsHandler.BatchStart("mutable_state_status")
defer closer.Close()
metrics.MutableStateSize.With(batchHandler).Record(int64(stats.TotalSize))
metrics.ExecutionInfoSize.With(batchHandler).Record(int64(stats.ExecutionInfoSize))
metrics.ExecutionStateSize.With(batchHandler).Record(int64(stats.ExecutionStateSize))
metrics.ActivityInfoSize.With(batchHandler).Record(int64(stats.ActivityInfoSize))
metrics.ActivityInfoCount.With(batchHandler).Record(int64(stats.ActivityInfoCount))
metrics.TotalActivityCount.With(batchHandler).Record(stats.TotalActivityCount)
metrics.TimerInfoSize.With(batchHandler).Record(int64(stats.TimerInfoSize))
metrics.TimerInfoCount.With(batchHandler).Record(int64(stats.TimerInfoCount))
metrics.TotalUserTimerCount.With(batchHandler).Record(stats.TotalUserTimerCount)
metrics.ChildInfoSize.With(batchHandler).Record(int64(stats.ChildInfoSize))
metrics.ChildInfoCount.With(batchHandler).Record(int64(stats.ChildInfoCount))
metrics.TotalChildExecutionCount.With(batchHandler).Record(stats.TotalChildExecutionCount)
metrics.RequestCancelInfoSize.With(batchHandler).Record(int64(stats.RequestCancelInfoSize))
metrics.RequestCancelInfoCount.With(batchHandler).Record(int64(stats.RequestCancelInfoCount))
metrics.TotalRequestCancelExternalCount.With(batchHandler).Record(stats.TotalRequestCancelExternalCount)
metrics.SignalInfoSize.With(batchHandler).Record(int64(stats.SignalInfoSize))
metrics.SignalInfoCount.With(batchHandler).Record(int64(stats.SignalInfoCount))
metrics.TotalSignalExternalCount.With(batchHandler).Record(stats.TotalSignalExternalCount)
metrics.SignalRequestIDSize.With(batchHandler).Record(int64(stats.SignalRequestIDSize))
metrics.SignalRequestIDCount.With(batchHandler).Record(int64(stats.SignalRequestIDCount))
metrics.TotalSignalCount.With(batchHandler).Record(stats.TotalSignalCount)
metrics.BufferedEventsSize.With(batchHandler).Record(int64(stats.BufferedEventsSize))
metrics.BufferedEventsCount.With(batchHandler).Record(int64(stats.BufferedEventsCount))

if stats.HistoryStatistics != nil {
metrics.HistorySize.With(metricsHandler).Record(int64(stats.HistoryStatistics.SizeDiff))
metrics.HistoryCount.With(metricsHandler).Record(int64(stats.HistoryStatistics.CountDiff))
metrics.HistorySize.With(batchHandler).Record(int64(stats.HistoryStatistics.SizeDiff))
metrics.HistoryCount.With(batchHandler).Record(int64(stats.HistoryStatistics.CountDiff))
}

for category, taskCount := range stats.TaskCountByCategory {
// We use the metricsHandler rather than the batchHandler here because the same metric is repeatedly sent
metrics.TaskCount.With(metricsHandler).
Record(int64(taskCount), metrics.TaskCategoryTag(category))
}
Expand Down

0 comments on commit 52d7430

Please sign in to comment.