Skip to content

Commit

Permalink
export explicit histos from prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
Isaac Zinda committed Sep 21, 2024
1 parent 8c0a5bf commit ef972a4
Show file tree
Hide file tree
Showing 4 changed files with 310 additions and 10 deletions.
1 change: 1 addition & 0 deletions lightstep/sdk/metric/exporters/otlp/otelcol/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (c *client) ExportMetrics(ctx context.Context, data data.Metrics) error {
c.counter,
&c.ResourceMap,
c.exporter,
true, // use exponential histograms
)
}

Expand Down
11 changes: 10 additions & 1 deletion lightstep/sdk/metric/exporters/prom/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,16 @@ func (c *client) String() string {

// ExportMetrics implements PushExporter.
func (c *client) ExportMetrics(ctx context.Context, data data.Metrics) error {
return export.ExportMetrics(ctx, data, c.tracer, c.counter, &c.ResourceMap, c.exporter)
return export.ExportMetrics(
ctx,
data,
c.tracer,
c.counter,
&c.ResourceMap,
c.exporter,
// don't use exponential histograms, since the prometheus exporter doesn't support them
false,
)
}

// ShutdownMetrics implements PushExporter.
Expand Down
113 changes: 104 additions & 9 deletions lightstep/sdk/metric/internal/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
otelcodes "go.opentelemetry.io/otel/codes"
metricapi "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"math"
)

func toTemporality(t aggregation.Temporality) pmetric.AggregationTemporality {
Expand Down Expand Up @@ -94,7 +95,92 @@ func copyGaugePoints(m pmetric.Metric, inM data.Instrument) {
}
}

func copyHistogramPoints(m pmetric.Metric, inM data.Instrument) {
func copyExplicitHistogramPoints(m pmetric.Metric, inM data.Instrument) {
s := m.SetEmptyHistogram()
s.SetAggregationTemporality(toTemporality(inM.Points[0].Temporality))

for _, inP := range inM.Points {
dp := s.DataPoints().AppendEmpty()

dp.SetStartTimestamp(pcommon.NewTimestampFromTime(inP.Start))
dp.SetTimestamp(pcommon.NewTimestampFromTime(inP.End))
internal.CopyAttributes(dp.Attributes(), inP.Attributes)

switch t := inP.Aggregation.(type) {
case *histogram.Int64:
dp.SetSum(t.Sum().CoerceToFloat64(number.Int64Kind))
dp.SetCount(t.Count())

if t.Count() != 0 {
dp.SetMax(t.Max().CoerceToFloat64(number.Int64Kind))
dp.SetMin(t.Min().CoerceToFloat64(number.Int64Kind))
}

copyExplicitHistogramBuckets(t.Positive(), t.ZeroCount(), t.Scale(), dp)

case *histogram.Float64:
dp.SetSum(number.ToFloat64(t.Sum()))
dp.SetCount(t.Count())
if t.Count() != 0 {
dp.SetMax(number.ToFloat64(t.Max()))
dp.SetMin(number.ToFloat64(t.Min()))
}

copyExplicitHistogramBuckets(t.Positive(), t.ZeroCount(), t.Scale(), dp)
default:
panic("unhandled case")
}
}
}

// copyExplicitHistogramBuckets converts a Lightstep exponential histogram to an OTel histogram with
// explicitly defined buckets.
func copyExplicitHistogramBuckets(
sourcePositiveBuckets aggregation.Buckets,
sourceZeroCount uint64,
sourceScale int32,
dest pmetric.HistogramDataPoint,
) {
// add the zero bucket in: (-Inf, 0]
dest.ExplicitBounds().Append(0)
dest.BucketCounts().Append(sourceZeroCount)

if sourcePositiveBuckets.Len() > 0 {
positiveOffset := sourcePositiveBuckets.Offset()
positiveNumElements := int32(sourcePositiveBuckets.Len())
for element := int32(0); element < positiveNumElements; element++ {
index := element + positiveOffset

leftBound, rightBound := indexToBucketBounds(index, sourceScale, true)

// We have to add a bucket to get from zero to the start of the first user-defined bucket.
// This has no count.
if element == 0 {
dest.ExplicitBounds().Append(leftBound)
dest.BucketCounts().Append(0)
}

dest.ExplicitBounds().Append(rightBound)
dest.BucketCounts().Append(sourcePositiveBuckets.At(uint32(element)))
}
}

// There are no elements in the (..., +Inf] bucket.
dest.BucketCounts().Append(0)
}

// indexToBucketBounds returns (left_bound, right_bound] for the bucket with the given index.
func indexToBucketBounds(index int32, scale int32, isPositive bool) (float64, float64) {
base := math.Pow(2, math.Pow(2, -float64(scale)))

if isPositive {
return math.Pow(base, float64(index)), math.Pow(base, float64(index)+1)
} else {
return -math.Pow(base, float64(index)+1), -math.Pow(base, float64(index))
}
}

func copyExponentialHistogramPoints(m pmetric.Metric, inM data.Instrument) {
s := m.SetEmptyExponentialHistogram()
s.SetAggregationTemporality(toTemporality(inM.Points[0].Temporality))

Expand All @@ -117,10 +203,10 @@ func copyHistogramPoints(m pmetric.Metric, inM data.Instrument) {
dp.SetMin(t.Min().CoerceToFloat64(number.Int64Kind))
}
if t.Positive().Len() != 0 {
copyHistogramBuckets(dp.Positive(), t.Positive())
copyExponentialHistogramBuckets(dp.Positive(), t.Positive())
}
if t.Negative().Len() != 0 {
copyHistogramBuckets(dp.Negative(), t.Negative())
copyExponentialHistogramBuckets(dp.Negative(), t.Negative())
}
case *histogram.Float64:
dp.SetSum(number.ToFloat64(t.Sum()))
Expand All @@ -132,18 +218,18 @@ func copyHistogramPoints(m pmetric.Metric, inM data.Instrument) {
dp.SetMin(number.ToFloat64(t.Min()))
}
if t.Positive().Len() != 0 {
copyHistogramBuckets(dp.Positive(), t.Positive())
copyExponentialHistogramBuckets(dp.Positive(), t.Positive())
}
if t.Negative().Len() != 0 {
copyHistogramBuckets(dp.Negative(), t.Negative())
copyExponentialHistogramBuckets(dp.Negative(), t.Negative())
}
default:
panic("unhandled case")
}
}
}

func copyHistogramBuckets(dest pmetric.ExponentialHistogramDataPointBuckets, src aggregation.Buckets) {
func copyExponentialHistogramBuckets(dest pmetric.ExponentialHistogramDataPointBuckets, src aggregation.Buckets) {
if src.Len() == 0 {
return
}
Expand Down Expand Up @@ -187,7 +273,11 @@ func copyMMSCPoints(m pmetric.Metric, inM data.Instrument) {
}
}

func d2pd(resourceMap *internal.ResourceMap, in data.Metrics) pmetric.Metrics {
func d2pd(
resourceMap *internal.ResourceMap,
in data.Metrics,
useExponentialHistogram bool,
) pmetric.Metrics {
out := pmetric.NewMetrics()
rm := out.ResourceMetrics().AppendEmpty()

Expand Down Expand Up @@ -217,7 +307,11 @@ func d2pd(resourceMap *internal.ResourceMap, in data.Metrics) pmetric.Metrics {
case *gauge.Int64, *gauge.Float64:
copyGaugePoints(m, inM)
case *histogram.Int64, *histogram.Float64:
copyHistogramPoints(m, inM)
if useExponentialHistogram {
copyExponentialHistogramPoints(m, inM)
} else {
copyExplicitHistogramPoints(m, inM)
}
case *minmaxsumcount.Int64, *minmaxsumcount.Float64:
copyMMSCPoints(m, inM)
}
Expand All @@ -234,14 +328,15 @@ func ExportMetrics(
telemetryItemsCounter metricapi.Int64Counter,
resourceMap *internal.ResourceMap,
exporter exporter.Metrics,
useExponentialHistogram bool,
) error {
ctx, span := tracer.Start(
ctx,
"otelsdk_export_metrics",
)
defer span.End()

converted := d2pd(resourceMap, data)
converted := d2pd(resourceMap, data, useExponentialHistogram)
points := int64(converted.DataPointCount())

err := exporter.ConsumeMetrics(ctx, converted)
Expand Down
Loading

0 comments on commit ef972a4

Please sign in to comment.