Skip to content

Commit

Permalink
Merge branch 'main' into issue-1688
Browse files Browse the repository at this point in the history
  • Loading branch information
xuan-cao-swi authored Sep 3, 2024
2 parents 34271c8 + 75a4885 commit 4372ada
Show file tree
Hide file tree
Showing 10 changed files with 467 additions and 30 deletions.
29 changes: 9 additions & 20 deletions exporter/otlp-metrics/.rubocop.yml
Original file line number Diff line number Diff line change
@@ -1,32 +1,21 @@
inherit_from: ../../contrib/rubocop.yml

AllCops:
TargetRubyVersion: "3.0"
NewCops: disable
SuggestExtensions: false
Exclude:
- "lib/opentelemetry/proto/**/*"
- "vendor/**/*"

Bundler/OrderedGems:
Exclude:
- gemfiles/**/*
Lint/UnusedMethodArgument:
Enabled: false
Lint/MissingSuper:
Enabled: false
Lint/ConstantDefinitionInBlock:
Exclude:
- "test/**/*"
Style/StringConcatenation:
Exclude:
- "test/**/*"
Metrics/AbcSize:
Metrics/CyclomaticComplexity:
Enabled: false
Layout/LineLength:
Metrics/PerceivedComplexity:
Enabled: false
Metrics/MethodLength:
Max: 20
Metrics/ParameterLists:
Enabled: false
Metrics/ClassLength:
Enabled: false
Bundler/OrderedGems:
Exclude:
- gemfiles/**/*
Style/FrozenStringLiteralComment:
Exclude:
- gemfiles/**/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module Exporter
module OTLP
module Metrics
# An OpenTelemetry metrics exporter that sends metrics over HTTP as Protobuf encoded OTLP ExportMetricsServiceRequest.
class MetricsExporter < ::OpenTelemetry::SDK::Metrics::Export::MetricReader # rubocop:disable Metrics/ClassLength
class MetricsExporter < ::OpenTelemetry::SDK::Metrics::Export::MetricReader
include Util

attr_reader :metric_snapshots
Expand Down Expand Up @@ -89,7 +89,7 @@ def export(metrics, timeout: nil)
end
end

def send_bytes(bytes, timeout:) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
def send_bytes(bytes, timeout:)
return FAILURE if bytes.nil?

request = Net::HTTP::Post.new(@path)
Expand Down Expand Up @@ -183,7 +183,7 @@ def send_bytes(bytes, timeout:) # rubocop:disable Metrics/CyclomaticComplexity,
@http.write_timeout = @timeout
end

def encode(metrics_data) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
def encode(metrics_data)
Opentelemetry::Proto::Collector::Metrics::V1::ExportMetricsServiceRequest.encode(
Opentelemetry::Proto::Collector::Metrics::V1::ExportMetricsServiceRequest.new(
resource_metrics: metrics_data
Expand Down Expand Up @@ -217,7 +217,7 @@ def encode(metrics_data) # rubocop:disable Metrics/MethodLength, Metrics/Cycloma
# current metric sdk only implements instrument: :counter -> :sum, :histogram -> :histogram
#
# metrics [MetricData]
def as_otlp_metrics(metrics) # rubocop:disable Metrics/MethodLength
def as_otlp_metrics(metrics)
case metrics.instrument_kind
when :observable_gauge
Opentelemetry::Proto::Metrics::V1::Metric.new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def parse_headers(raw)
end
end

def backoff?(retry_count:, reason:, retry_after: nil) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def backoff?(retry_count:, reason:, retry_after: nil)
return false if retry_count > RETRY_COUNT

sleep_interval = nil
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'opentelemetry/common'
require 'opentelemetry/sdk'
require 'net/http'
require 'csv'
require 'zlib'

require 'google/rpc/status_pb'

require 'opentelemetry/proto/common/v1/common_pb'
require 'opentelemetry/proto/resource/v1/resource_pb'
require 'opentelemetry/proto/metrics/v1/metrics_pb'
require 'opentelemetry/proto/collector/metrics/v1/metrics_service_pb'

require 'opentelemetry/metrics'
require 'opentelemetry/sdk/metrics'

require_relative './util'

module OpenTelemetry
module Exporter
module OTLP
# An OpenTelemetry metrics exporter that sends metrics over HTTP as Protobuf encoded OTLP ExportMetricsServiceRequest.
class MetricsExporter < ::OpenTelemetry::SDK::Metrics::Export::MetricReader
include Util

attr_reader :metric_snapshots

SUCCESS = OpenTelemetry::SDK::Metrics::Export::SUCCESS
FAILURE = OpenTelemetry::SDK::Metrics::Export::FAILURE
private_constant(:SUCCESS, :FAILURE)

WRITE_TIMEOUT_SUPPORTED = Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('2.6')
private_constant(:WRITE_TIMEOUT_SUPPORTED)

def self.ssl_verify_mode
if ENV.key?('OTEL_RUBY_EXPORTER_OTLP_SSL_VERIFY_PEER')
OpenSSL::SSL::VERIFY_PEER
elsif ENV.key?('OTEL_RUBY_EXPORTER_OTLP_SSL_VERIFY_NONE')
OpenSSL::SSL::VERIFY_NONE
else
OpenSSL::SSL::VERIFY_PEER
end
end

def initialize(endpoint: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_ENDPOINT', 'OTEL_EXPORTER_OTLP_ENDPOINT', default: 'http://localhost:4318/v1/metrics'),
certificate_file: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE', 'OTEL_EXPORTER_OTLP_CERTIFICATE'),
ssl_verify_mode: MetricsExporter.ssl_verify_mode,
headers: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_HEADERS', 'OTEL_EXPORTER_OTLP_HEADERS', default: {}),
compression: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_COMPRESSION', 'OTEL_EXPORTER_OTLP_COMPRESSION', default: 'gzip'),
timeout: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_TIMEOUT', 'OTEL_EXPORTER_OTLP_TIMEOUT', default: 10))
raise ArgumentError, "invalid url for OTLP::MetricsExporter #{endpoint}" unless OpenTelemetry::Common::Utilities.valid_url?(endpoint)
raise ArgumentError, "unsupported compression key #{compression}" unless compression.nil? || %w[gzip none].include?(compression)

# create the MetricStore object
super()

@uri = if endpoint == ENV['OTEL_EXPORTER_OTLP_ENDPOINT']
URI.join(endpoint, 'v1/metrics')
else
URI(endpoint)
end

@http = http_connection(@uri, ssl_verify_mode, certificate_file)

@path = @uri.path
@headers = prepare_headers(headers)
@timeout = timeout.to_f
@compression = compression
@mutex = Mutex.new
@shutdown = false
end

# consolidate the metrics data into the form of MetricData
#
# return MetricData
def pull
export(collect)
end

# metrics Array[MetricData]
def export(metrics, timeout: nil)
@mutex.synchronize do
send_bytes(encode(metrics), timeout: timeout)
end
end

def send_bytes(bytes, timeout:)
return FAILURE if bytes.nil?

request = Net::HTTP::Post.new(@path)

if @compression == 'gzip'
request.add_field('Content-Encoding', 'gzip')
body = Zlib.gzip(bytes)
else
body = bytes
end

request.body = body
request.add_field('Content-Type', 'application/x-protobuf')
@headers.each { |key, value| request.add_field(key, value) }

retry_count = 0
timeout ||= @timeout
start_time = OpenTelemetry::Common::Utilities.timeout_timestamp

around_request do
remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)
return FAILURE if remaining_timeout.zero?

@http.open_timeout = remaining_timeout
@http.read_timeout = remaining_timeout
@http.write_timeout = remaining_timeout if WRITE_TIMEOUT_SUPPORTED
@http.start unless @http.started?
response = measure_request_duration { @http.request(request) }
case response
when Net::HTTPOK
response.body # Read and discard body
SUCCESS
when Net::HTTPServiceUnavailable, Net::HTTPTooManyRequests
response.body # Read and discard body
redo if backoff?(retry_after: response['Retry-After'], retry_count: retry_count += 1, reason: response.code)
OpenTelemetry.logger.warn('Net::HTTPServiceUnavailable/Net::HTTPTooManyRequests in MetricsExporter#send_bytes')
FAILURE
when Net::HTTPRequestTimeOut, Net::HTTPGatewayTimeOut, Net::HTTPBadGateway
response.body # Read and discard body
redo if backoff?(retry_count: retry_count += 1, reason: response.code)
OpenTelemetry.logger.warn('Net::HTTPRequestTimeOut/Net::HTTPGatewayTimeOut/Net::HTTPBadGateway in MetricsExporter#send_bytes')
FAILURE
when Net::HTTPNotFound
OpenTelemetry.handle_error(message: "OTLP metrics_exporter received http.code=404 for uri: '#{@path}'")
FAILURE
when Net::HTTPBadRequest, Net::HTTPClientError, Net::HTTPServerError
log_status(response.body)
OpenTelemetry.logger.warn('Net::HTTPBadRequest/Net::HTTPClientError/Net::HTTPServerError in MetricsExporter#send_bytes')
FAILURE
when Net::HTTPRedirection
@http.finish
handle_redirect(response['location'])
redo if backoff?(retry_after: 0, retry_count: retry_count += 1, reason: response.code)
else
@http.finish
OpenTelemetry.logger.warn("Unexpected error in OTLP::MetricsExporter#send_bytes - #{response.message}")
FAILURE
end
rescue Net::OpenTimeout, Net::ReadTimeout
retry if backoff?(retry_count: retry_count += 1, reason: 'timeout')
OpenTelemetry.logger.warn('Net::OpenTimeout/Net::ReadTimeout in MetricsExporter#send_bytes')
return FAILURE
rescue OpenSSL::SSL::SSLError
retry if backoff?(retry_count: retry_count += 1, reason: 'openssl_error')
OpenTelemetry.logger.warn('OpenSSL::SSL::SSLError in MetricsExporter#send_bytes')
return FAILURE
rescue SocketError
retry if backoff?(retry_count: retry_count += 1, reason: 'socket_error')
OpenTelemetry.logger.warn('SocketError in MetricsExporter#send_bytes')
return FAILURE
rescue SystemCallError => e
retry if backoff?(retry_count: retry_count += 1, reason: e.class.name)
OpenTelemetry.logger.warn('SystemCallError in MetricsExporter#send_bytes')
return FAILURE
rescue EOFError
retry if backoff?(retry_count: retry_count += 1, reason: 'eof_error')
OpenTelemetry.logger.warn('EOFError in MetricsExporter#send_bytes')
return FAILURE
rescue Zlib::DataError
retry if backoff?(retry_count: retry_count += 1, reason: 'zlib_error')
OpenTelemetry.logger.warn('Zlib::DataError in MetricsExporter#send_bytes')
return FAILURE
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'unexpected error in OTLP::MetricsExporter#send_bytes')
return FAILURE
end
ensure
# Reset timeouts to defaults for the next call.
@http.open_timeout = @timeout
@http.read_timeout = @timeout
@http.write_timeout = @timeout if WRITE_TIMEOUT_SUPPORTED
end

def encode(metrics_data)
Opentelemetry::Proto::Collector::Metrics::V1::ExportMetricsServiceRequest.encode(
Opentelemetry::Proto::Collector::Metrics::V1::ExportMetricsServiceRequest.new(
resource_metrics: metrics_data
.group_by(&:resource)
.map do |resource, scope_metrics|
Opentelemetry::Proto::Metrics::V1::ResourceMetrics.new(
resource: Opentelemetry::Proto::Resource::V1::Resource.new(
attributes: resource.attribute_enumerator.map { |key, value| as_otlp_key_value(key, value) }
),
scope_metrics: scope_metrics
.group_by(&:instrumentation_scope)
.map do |instrumentation_scope, metrics|
Opentelemetry::Proto::Metrics::V1::ScopeMetrics.new(
scope: Opentelemetry::Proto::Common::V1::InstrumentationScope.new(
name: instrumentation_scope.name,
version: instrumentation_scope.version
),
metrics: metrics.map { |sd| as_otlp_metrics(sd) }
)
end
)
end
)
)
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'unexpected error in OTLP::MetricsExporter#encode')
nil
end

# metrics_pb has following type of data: :gauge, :sum, :histogram, :exponential_histogram, :summary
# current metric sdk only implements instrument: :counter -> :sum, :histogram -> :histogram
#
# metrics [MetricData]
def as_otlp_metrics(metrics)
case metrics.instrument_kind
when :observable_gauge
Opentelemetry::Proto::Metrics::V1::Metric.new(
name: metrics.name,
description: metrics.description,
unit: metrics.unit,
gauge: Opentelemetry::Proto::Metrics::V1::Gauge.new(
aggregation_temporality: as_otlp_aggregation_temporality(metrics.aggregation_temporality),
data_points: metrics.data_points.map do |ndp|
number_data_point(ndp)
end
)
)

when :counter, :up_down_counter
Opentelemetry::Proto::Metrics::V1::Metric.new(
name: metrics.name,
description: metrics.description,
unit: metrics.unit,
sum: Opentelemetry::Proto::Metrics::V1::Sum.new(
aggregation_temporality: as_otlp_aggregation_temporality(metrics.aggregation_temporality),
data_points: metrics.data_points.map do |ndp|
number_data_point(ndp)
end
)
)

when :histogram
Opentelemetry::Proto::Metrics::V1::Metric.new(
name: metrics.name,
description: metrics.description,
unit: metrics.unit,
histogram: Opentelemetry::Proto::Metrics::V1::Histogram.new(
aggregation_temporality: as_otlp_aggregation_temporality(metrics.aggregation_temporality),
data_points: metrics.data_points.map do |hdp|
histogram_data_point(hdp)
end
)
)
end
end

def as_otlp_aggregation_temporality(type)
case type
when :delta then Opentelemetry::Proto::Metrics::V1::AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA
when :cumulative then Opentelemetry::Proto::Metrics::V1::AggregationTemporality::AGGREGATION_TEMPORALITY_CUMULATIVE
else Opentelemetry::Proto::Metrics::V1::AggregationTemporality::AGGREGATION_TEMPORALITY_UNSPECIFIED
end
end

def histogram_data_point(hdp)
Opentelemetry::Proto::Metrics::V1::HistogramDataPoint.new(
attributes: hdp.attributes.map { |k, v| as_otlp_key_value(k, v) },
start_time_unix_nano: hdp.start_time_unix_nano,
time_unix_nano: hdp.time_unix_nano,
count: hdp.count,
sum: hdp.sum,
bucket_counts: hdp.bucket_counts,
explicit_bounds: hdp.explicit_bounds,
exemplars: hdp.exemplars,
min: hdp.min,
max: hdp.max
)
end

def number_data_point(ndp)
Opentelemetry::Proto::Metrics::V1::NumberDataPoint.new(
attributes: ndp.attributes.map { |k, v| as_otlp_key_value(k, v) },
as_int: ndp.value,
start_time_unix_nano: ndp.start_time_unix_nano,
time_unix_nano: ndp.time_unix_nano,
exemplars: ndp.exemplars # exemplars not implemented yet from metrics sdk
)
end

# may not need this
def reset
SUCCESS
end

def shutdown(timeout: nil)
@shutdown = true
SUCCESS
end
end
end
end
end
Loading

0 comments on commit 4372ada

Please sign in to comment.