From 668ce11f2c0d603b0d54567524a0c968089d6e30 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Thu, 5 Sep 2024 16:50:30 +0300 Subject: [PATCH] sdk-exporter: implement gRPC exporter --- build.sbt | 1 + docs/sdk/configuration.md | 4 + .../exporter/otlp/HttpPayloadEncoding.scala | 16 +- .../otel4s/sdk/exporter/otlp/OtlpClient.scala | 378 +++++++++++++++++ .../sdk/exporter/otlp/OtlpHttpClient.scala | 233 ----------- .../sdk/exporter/otlp/OtlpProtocol.scala | 47 +++ .../exporter/otlp/PayloadCompression.scala | 43 ++ ...re.scala => OtlpClientAutoConfigure.scala} | 133 +++--- .../autoconfigure/ProtocolAutoConfigure.scala | 155 ------- .../sdk/exporter/otlp/grpc/GrpcCodecs.scala | 74 ++++ .../sdk/exporter/otlp/grpc/GrpcHeaders.scala | 86 ++++ .../GrpcStatusException.scala} | 13 +- .../otlp/grpc/LengthPrefixedMessage.scala | 36 ++ .../ProtocolAutoConfigureSuite.scala | 149 ------- .../sdk/exporter/otlp/OtlpProtocolSuite.scala | 49 +++ .../OtlpClientAutoConfigureSuite.scala | 393 ++++++++++++++++++ .../OtlpHttpClientAutoConfigureSuite.scala | 239 ----------- ...xporter.scala => OtlpMetricExporter.scala} | 85 ++-- .../OtlpMetricExporterAutoConfigure.scala | 67 +-- ...te.scala => OtlpMetricExporterSuite.scala} | 93 +++-- ...OtlpMetricExporterAutoConfigureSuite.scala | 42 +- ...nExporter.scala => OtlpSpanExporter.scala} | 87 ++-- .../OtlpSpanExporterAutoConfigure.scala | 51 +-- ...uite.scala => OtlpSpanExporterSuite.scala} | 79 +++- .../OtlpSpanExporterAutoConfigureSuite.scala | 42 +- 25 files changed, 1549 insertions(+), 1046 deletions(-) create mode 100644 sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpClient.scala delete mode 100644 sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpClient.scala create mode 100644 sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpProtocol.scala create mode 100644 sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/PayloadCompression.scala rename sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/{OtlpHttpClientAutoConfigure.scala => OtlpClientAutoConfigure.scala} (65%) delete mode 100644 sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/ProtocolAutoConfigure.scala create mode 100644 sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/GrpcCodecs.scala create mode 100644 sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/GrpcHeaders.scala rename sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/{Protocol.scala => grpc/GrpcStatusException.scala} (67%) create mode 100644 sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/LengthPrefixedMessage.scala delete mode 100644 sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/autoconfigure/ProtocolAutoConfigureSuite.scala create mode 100644 sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpProtocolSuite.scala create mode 100644 sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpClientAutoConfigureSuite.scala delete mode 100644 sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpHttpClientAutoConfigureSuite.scala rename sdk-exporter/metrics/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/{OtlpHttpMetricExporter.scala => OtlpMetricExporter.scala} (78%) rename sdk-exporter/metrics/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/{OtlpHttpMetricExporterSuite.scala => OtlpMetricExporterSuite.scala} (84%) rename sdk-exporter/trace/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/{OtlpHttpSpanExporter.scala => OtlpSpanExporter.scala} (68%) rename sdk-exporter/trace/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/{OtlpHttpSpanExporterSuite.scala => OtlpSpanExporterSuite.scala} (82%) diff --git a/build.sbt b/build.sbt index 8cbb10944..ee59346ee 100644 --- a/build.sbt +++ b/build.sbt @@ -360,6 +360,7 @@ lazy val `sdk-exporter-common` = name := "otel4s-sdk-exporter-common", startYear := Some(2023), libraryDependencies ++= Seq( + "co.fs2" %%% "fs2-scodec" % FS2Version, "org.http4s" %%% "http4s-ember-client" % Http4sVersion, "org.http4s" %%% "http4s-circe" % Http4sVersion, "io.github.scalapb-json" %%% "scalapb-circe" % ScalaPBCirceVersion, diff --git a/docs/sdk/configuration.md b/docs/sdk/configuration.md index 575a055ad..0c3ef731d 100644 --- a/docs/sdk/configuration.md +++ b/docs/sdk/configuration.md @@ -90,11 +90,13 @@ Target-specific properties are prioritized. E.g. `otel.exporter.otlp.metrics.end | System property | Environment variable | Description | |----------------------------------------|------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| otel.exporter.otlp.protocol | OTEL\\_EXPORTER\\_OTLP\\_PROTOCOL | The transport protocol to use. Options include `grpc`, `http/protobuf`, and `http/json`. Default is `http/protobuf`. | | otel.exporter.otlp.endpoint | OTEL\\_EXPORTER\\_OTLP\\_ENDPOINT | The OTLP traces, metrics, and logs endpoint to connect to. Must be a **base** URL with a scheme of either http or https based on the use of TLS. Default is `http://localhost:4318/`. | | otel.exporter.otlp.headers | OTEL\\_EXPORTER\\_OTLP\\_HEADERS | Key-value pairs separated by commas to pass as request headers on OTLP trace, metric, and log requests. | | otel.exporter.otlp.compression | OTEL\\_EXPORTER\\_OTLP\\_COMPRESSION | The compression type to use on OTLP trace, metric, and log requests. Options include gzip. By default, no compression will be used. | | otel.exporter.otlp.timeout | OTEL\\_EXPORTER\\_OTLP\\_TIMEOUT | The maximum waiting time to send each OTLP trace, metric, and log batch. Default is `10 seconds`. | | **Target specific:** | | | +| otel.exporter.otlp.metrics.protocol | OTEL\\_EXPORTER\\_OTLP\\_METRICS\\_PROTOCOL | The transport protocol to use. Options include `grpc`, `http/protobuf`, and `http/json`. Default is `http/protobuf`. | | otel.exporter.otlp.metrics.endpoint | OTEL\\_EXPORTER\\_OTLP\\_METRICS\\_ENDPOINT | The OTLP metrics endpoint to connect to. Default is `http://localhost:4318/v1/metrics`. | | otel.exporter.otlp.metrics.headers | OTEL\\_EXPORTER\\_OTLP\\_METRICS\\_HEADERS | Key-value pairs separated by commas to pass as request headers on OTLP trace requests. | | otel.exporter.otlp.metrics.compression | OTEL\\_EXPORTER\\_OTLP\\_METRICS\\_COMPRESSION | The compression type to use on OTLP trace requests. Options include gzip. By default, no compression will be used. | @@ -150,11 +152,13 @@ Target-specific properties are prioritized. E.g. `otel.exporter.otlp.traces.endp | System property | Environment variable | Description | |---------------------------------------|-----------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| otel.exporter.otlp.protocol | OTEL\\_EXPORTER\\_OTLP\\_PROTOCOL | The transport protocol to use. Options include `grpc`, `http/protobuf`, and `http/json`. Default is `http/protobuf`. | | otel.exporter.otlp.endpoint | OTEL\\_EXPORTER\\_OTLP\\_ENDPOINT | The OTLP traces, metrics, and logs endpoint to connect to. Must be a **base** URL with a scheme of either http or https based on the use of TLS. Default is `http://localhost:4318/`. | | otel.exporter.otlp.headers | OTEL\\_EXPORTER\\_OTLP\\_HEADERS | Key-value pairs separated by commas to pass as request headers on OTLP trace, metric, and log requests. | | otel.exporter.otlp.compression | OTEL\\_EXPORTER\\_OTLP\\_COMPRESSION | The compression type to use on OTLP trace, metric, and log requests. Options include gzip. By default, no compression will be used. | | otel.exporter.otlp.timeout | OTEL\\_EXPORTER\\_OTLP\\_TIMEOUT | The maximum waiting time to send each OTLP trace, metric, and log batch. Default is `10 seconds`. | | **Target specific:** | | | +| otel.exporter.otlp.metrics.protocol | OTEL\\_EXPORTER\\_OTLP\\_TRACES\\_PROTOCOL | The transport protocol to use. Options include `grpc`, `http/protobuf`, and `http/json`. Default is `http/protobuf`. | | otel.exporter.otlp.traces.endpoint | OTEL\\_EXPORTER\\_OTLP\\_TRACES\\_ENDPOINT | The OTLP traces endpoint to connect to. Default is `http://localhost:4318/v1/traces`. | | otel.exporter.otlp.traces.headers | OTEL\\_EXPORTER\\_OTLP\\_TRACES\\_HEADERS | Key-value pairs separated by commas to pass as request headers on OTLP trace requests. | | otel.exporter.otlp.traces.compression | OTEL\\_EXPORTER\\_OTLP\\_TRACES\\_COMPRESSION | The compression type to use on OTLP trace requests. Options include gzip. By default, no compression will be used. | diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/HttpPayloadEncoding.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/HttpPayloadEncoding.scala index 87d7344bf..133f353a4 100644 --- a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/HttpPayloadEncoding.scala +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/HttpPayloadEncoding.scala @@ -16,9 +16,21 @@ package org.typelevel.otel4s.sdk.exporter.otlp -sealed trait HttpPayloadEncoding +import cats.Show -object HttpPayloadEncoding { +private[otlp] sealed trait HttpPayloadEncoding { + override def toString: String = + Show[HttpPayloadEncoding].show(this) +} + +private[otlp] object HttpPayloadEncoding { case object Json extends HttpPayloadEncoding case object Protobuf extends HttpPayloadEncoding + + implicit val httpPayloadEncodingShow: Show[HttpPayloadEncoding] = + Show.show { + case Json => "json" + case Protobuf => "protobuf" + } + } diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpClient.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpClient.scala new file mode 100644 index 000000000..f5f032719 --- /dev/null +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpClient.scala @@ -0,0 +1,378 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp + +import cats.Foldable +import cats.effect.Async +import cats.effect.Resource +import cats.effect.Temporal +import cats.effect.std.Console +import cats.effect.syntax.temporal._ +import cats.syntax.applicative._ +import cats.syntax.applicativeError._ +import cats.syntax.flatMap._ +import cats.syntax.foldable._ +import cats.syntax.functor._ +import fs2.Chunk +import fs2.Stream +import fs2.compression.Compression +import fs2.io.net.Network +import fs2.io.net.tls.TLSContext +import io.opentelemetry.proto.collector.trace.v1.trace_service.ExportTraceServiceResponse +import org.http4s.ContentCoding +import org.http4s.EntityEncoder +import org.http4s.Header +import org.http4s.Headers +import org.http4s.HttpVersion +import org.http4s.Method +import org.http4s.ProductId +import org.http4s.Request +import org.http4s.Response +import org.http4s.Status +import org.http4s.Uri +import org.http4s.client.Client +import org.http4s.client.middleware.{RetryPolicy => HttpRetryPolicy} +import org.http4s.client.middleware.GZip +import org.http4s.client.middleware.Retry +import org.http4s.ember.client.EmberClientBuilder +import org.http4s.h2.H2Keys +import org.http4s.headers.`User-Agent` +import org.typelevel.ci._ +import org.typelevel.otel4s.sdk.BuildInfo +import org.typelevel.otel4s.sdk.exporter.RetryPolicy +import org.typelevel.otel4s.sdk.exporter.otlp.grpc.GrpcCodecs +import org.typelevel.otel4s.sdk.exporter.otlp.grpc.GrpcHeaders +import org.typelevel.otel4s.sdk.exporter.otlp.grpc.GrpcStatusException +import scalapb_circe.Printer +import scodec.Attempt +import scodec.DecodeResult +import scodec.Decoder +import scodec.Encoder +import scodec.bits.BitVector +import scodec.bits.ByteVector + +import scala.concurrent.TimeoutException +import scala.concurrent.duration.FiniteDuration +import scala.util.chaining._ + +private[otlp] abstract class OtlpClient[F[_]: Temporal: Console, A] private ( + config: OtlpClient.Config, + client: Client[F] +) { + import OtlpClient.Defaults + + private val userAgent = `User-Agent`( + ProductId(Defaults.UserAgentName, version = Some(BuildInfo.version)) + ) + + protected def toRequest[G[_]: Foldable](records: G[A]): Request[F] + + protected def handleResponse(response: Response[F]): F[Unit] + + final def doExport[G[_]: Foldable](records: G[A]): F[Unit] = + client + .run(toRequest(records).putHeaders(userAgent)) + .use(response => handleResponse(response)) + .timeoutTo( + config.timeout, + Temporal[F].unit >> Temporal[F].raiseError( + new TimeoutException( + s"OtlpClient(${config.protocol}): the export to [${config.endpoint}] has timed out after [${config.timeout}]" + ) + ) + ) + .handleErrorWith { e => + Console[F].errorln( + s"[OtlpClient(${config.protocol}) ${config.endpoint}]: cannot export: ${e.getMessage}\n${e.getStackTrace.mkString("\n")}\n" + ) + } + + override final def toString: String = { + val headers = + config.headers.mkString("headers={", ",", "}", Headers.SensitiveHeaders) + + "OtlpClient{" + + s"protocol=${config.protocol}, " + + s"endpoint=${config.endpoint}, " + + s"timeout=${config.timeout}, " + + s"compression=${config.compression}, " + + headers + + "}" + } +} + +private[otlp] object OtlpClient { + + private final case class Config( + protocol: OtlpProtocol, + endpoint: Uri, + timeout: FiniteDuration, + headers: Headers, + compression: PayloadCompression + ) + + private object Defaults { + val UserAgentName: String = "OTel-OTLP-Exporter-Scala-Otel4s" + } + + def create[F[_]: Async: Network: Compression: Console, A]( + protocol: OtlpProtocol, + endpoint: Uri, + headers: Headers, + compression: PayloadCompression, + timeout: FiniteDuration, + retryPolicy: RetryPolicy, + tlsContext: Option[TLSContext[F]], + customClient: Option[Client[F]] + )(implicit + encoder: ProtoEncoder.Message[List[A]], + printer: Printer + ): Resource[F, OtlpClient[F, A]] = { + val config = Config(protocol, endpoint, timeout, headers, compression) + + def createClient(enableHttp2: Boolean): Resource[F, Client[F]] = + customClient match { + case Some(client) => + Resource + .eval( + Console[F].println( + "You are using a custom http4s client with OtlpClient. " + + "'timeout' and 'tlsContext' settings are ignored." + + "If you are using the gRPC exporter, make sure the client has '.withHttp2' enabled." + ) + ) + .as(client) + + case None => + EmberClientBuilder + .default[F] + .withTimeout(timeout) + .pipe(builder => tlsContext.foldLeft(builder)(_.withTLSContext(_))) + .pipe { + case builder if enableHttp2 => builder.withHttp2 + case builder => builder + } + .build + } + + def backoff(attempt: Int): Option[FiniteDuration] = + Option.when(attempt < retryPolicy.maxAttempts) { + val next = + retryPolicy.initialBackoff * attempt.toLong * retryPolicy.backoffMultiplier + + val delay = + next.min(retryPolicy.maxBackoff) + + delay match { + case f: FiniteDuration => f + case _ => retryPolicy.maxBackoff + } + } + + protocol match { + case OtlpProtocol.Http(encoding) => + val gzip: Client[F] => Client[F] = + compression match { + case PayloadCompression.Gzip => GZip[F]() + case PayloadCompression.NoCompression => identity + } + + // see https://opentelemetry.io/docs/specs/otlp/#failures-1 + val retryable = Set( + Status.TooManyRequests, + Status.BadGateway, + Status.ServiceUnavailable, + Status.GatewayTimeout + ) + + def shouldRetry(result: Either[Throwable, Response[F]]): Boolean = + result match { + case Left(_) => true + case Right(response) => retryable.contains(response.status) + } + + val policy = HttpRetryPolicy[F](backoff, (_, res) => shouldRetry(res)) + + for { + client <- createClient(enableHttp2 = false) + } yield new Http[F, A](config, encoding, Retry(policy)(gzip(client))) + + case OtlpProtocol.Grpc => + // see https://opentelemetry.io/docs/specs/otlp/#failures + // https://grpc.github.io/grpc/core/md_doc_statuscodes.html + val retryable = Set( + 1, // CANCELLED + 4, // DEADLINE_EXCEEDED + 10, // ABORTED + 11, // OUT_OF_RANGE + 14, // UNAVAILABLE + 15 // DATA_LOSS + ) + + def shouldRetry(result: Either[Throwable, Response[F]]): Boolean = + result match { + case Left(GrpcStatusException(code, _)) => retryable.contains(code) + case Left(_) => true + case Right(_) => false + } + + val policy = HttpRetryPolicy[F](backoff, (_, res) => shouldRetry(res)) + + for { + client <- createClient(enableHttp2 = true) + } yield new Grpc[F, A](config, Retry(policy)(client)) + } + } + + private final class Http[F[_]: Temporal: Console, A]( + config: Config, + encoding: HttpPayloadEncoding, + client: Client[F] + )(implicit encoder: ProtoEncoder.Message[List[A]], printer: Printer) + extends OtlpClient[F, A](config, client) { + + private implicit val entityEncoder: EntityEncoder[F, List[A]] = + encoding match { + case HttpPayloadEncoding.Json => + import io.circe.Printer + import org.http4s.circe._ + jsonEncoderWithPrinter(Printer.noSpaces).contramap[List[A]] { spans => + ProtoEncoder.toJson(spans) + } + + case HttpPayloadEncoding.Protobuf => + val content = Header.Raw(ci"Content-Type", "application/x-protobuf") + EntityEncoder.simple(content) { spans => + Chunk.array(ProtoEncoder.toByteArray(spans)) + } + } + + protected def toRequest[G[_]: Foldable](records: G[A]): Request[F] = + Request[F](Method.POST, config.endpoint, HttpVersion.`HTTP/1.1`) + .withEntity(records.toList) + .putHeaders(config.headers) + + protected def handleResponse(response: Response[F]): F[Unit] = + logBody(response).unlessA(isSuccess(response.status)) + + private def isSuccess(status: Status): Boolean = + status.responseClass == Status.Successful + + private def logBody(response: Response[F]): F[Unit] = + for { + body <- response.bodyText.compile.string + _ <- Console[F].errorln( + s"[OtlpClient(${config.protocol}) ${config.endpoint}] the request failed with [${response.status}]. Body: $body" + ) + } yield () + } + + /** The implementation utilizes some ideas from: + * - https://github.com/http4s/http4s-grpc + * - https://github.com/typelevel/fs2-grpc + * + * We can consider migration to http4s-grpc once it reaches 1.x. + */ + private final class Grpc[F[_]: Temporal: Compression: Console, A]( + config: Config, + client: Client[F] + )(implicit encoder: ProtoEncoder.Message[List[A]]) + extends OtlpClient[F, A](config, client) { + + private val encode: Encoder[List[A]] = Encoder { a => + Attempt.successful(ByteVector.view(ProtoEncoder.toByteArray(a)).bits) + } + + private val decode: Decoder[ExportTraceServiceResponse] = Decoder { bits => + Attempt + .fromTry(ExportTraceServiceResponse.validate(bits.bytes.toArrayUnsafe)) + .map(a => DecodeResult(a, BitVector.empty)) + } + + private val bodyStreamEncoder = { + val isGzip = config.compression match { + case PayloadCompression.Gzip => true + case PayloadCompression.NoCompression => false + } + + GrpcCodecs.encode(encode, isGzip) + } + + private val bodyStreamDecoder = + GrpcCodecs.decode(decode) + + private val headers = { + val coding = config.compression match { + case PayloadCompression.Gzip => ContentCoding.gzip + case PayloadCompression.NoCompression => ContentCoding.identity + } + + val grpc = Headers( + GrpcHeaders.TE, + GrpcHeaders.GrpcEncoding(coding), + GrpcHeaders.GrpcAcceptEncoding(coding), + GrpcHeaders.ContentType + ) + + grpc ++ config.headers + } + + protected def toRequest[G[_]: Foldable](records: G[A]): Request[F] = + Request[F](Method.POST, config.endpoint, HttpVersion.`HTTP/2`) + .putHeaders(headers) + .withBodyStream(Stream(records.toList).through(bodyStreamEncoder)) + .withAttribute(H2Keys.Http2PriorKnowledge, ()) + + protected def handleResponse(response: Response[F]): F[Unit] = + for { + _ <- checkGrpcStatus(response.headers) + body <- decodeResponse(response) + _ <- checkExportStatus(body) + trailingHeaders <- response.trailerHeaders + _ <- checkGrpcStatus(trailingHeaders) + } yield () + + private def checkExportStatus(response: ExportTraceServiceResponse): F[Unit] = + response.partialSuccess + .filter(r => r.errorMessage.nonEmpty || r.rejectedSpans > 0) + .traverse_ { ps => + Console[F].errorln( + s"[OtlpClient(${config.protocol}) ${config.endpoint}]: some spans [${ps.rejectedSpans}] were rejected due to [${ps.errorMessage}]" + ) + } + + private def decodeResponse(response: Response[F]): F[ExportTraceServiceResponse] = + response.body.through(bodyStreamDecoder).take(1).compile.lastOrError + + private def checkGrpcStatus(headers: Headers): F[Unit] = { + val status = headers.get[GrpcHeaders.GrpcStatus] + val reason = headers.get[GrpcHeaders.GrpcMessage] + + status match { + case Some(GrpcHeaders.GrpcStatus(0)) => + Temporal[F].unit + + case Some(GrpcHeaders.GrpcStatus(status)) => + Temporal[F].raiseError(GrpcStatusException(status, reason.map(_.message))) + + case None => + Temporal[F].unit + } + } + } + +} diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpClient.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpClient.scala deleted file mode 100644 index ac6f1ce97..000000000 --- a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpClient.scala +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Copyright 2023 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.typelevel.otel4s.sdk.exporter.otlp - -import cats.Foldable -import cats.effect.Async -import cats.effect.Resource -import cats.effect.Temporal -import cats.effect.std.Console -import cats.effect.syntax.temporal._ -import cats.syntax.applicative._ -import cats.syntax.applicativeError._ -import cats.syntax.flatMap._ -import cats.syntax.foldable._ -import cats.syntax.functor._ -import fs2.Chunk -import fs2.compression.Compression -import fs2.io.net.Network -import fs2.io.net.tls.TLSContext -import org.http4s.EntityEncoder -import org.http4s.Header -import org.http4s.Headers -import org.http4s.HttpVersion -import org.http4s.Method -import org.http4s.ProductId -import org.http4s.Request -import org.http4s.Response -import org.http4s.Status -import org.http4s.Uri -import org.http4s.client.Client -import org.http4s.client.middleware.{RetryPolicy => HttpRetryPolicy} -import org.http4s.client.middleware.GZip -import org.http4s.client.middleware.Retry -import org.http4s.ember.client.EmberClientBuilder -import org.http4s.headers.`User-Agent` -import org.typelevel.ci._ -import org.typelevel.otel4s.sdk.BuildInfo -import org.typelevel.otel4s.sdk.exporter.RetryPolicy -import scalapb_circe.Printer - -import scala.concurrent.TimeoutException -import scala.concurrent.duration.FiniteDuration - -/** Exports spans via HTTP. Support `json` and `protobuf` encoding. - * - * @see - * [[https://opentelemetry.io/docs/specs/otel/protocol/exporter/]] - * - * @see - * [[https://opentelemetry.io/docs/concepts/sdk-configuration/otlp-exporter-configuration/]] - */ -private[otlp] final class OtlpHttpClient[F[_]: Temporal: Console, A] private ( - client: Client[F], - config: OtlpHttpClient.Config, -)(implicit encoder: ProtoEncoder.Message[List[A]], printer: Printer) { - - import OtlpHttpClient.Defaults - - private val userAgent = `User-Agent`( - ProductId(Defaults.UserAgentName, version = Some(BuildInfo.version)) - ) - - private implicit val entityEncoder: EntityEncoder[F, List[A]] = - config.encoding match { - case HttpPayloadEncoding.Json => - import io.circe.Printer - import org.http4s.circe._ - jsonEncoderWithPrinter(Printer.noSpaces).contramap[List[A]] { spans => - ProtoEncoder.toJson(spans) - } - - case HttpPayloadEncoding.Protobuf => - val content = Header.Raw(ci"Content-Type", "application/x-protobuf") - EntityEncoder.simple(content) { spans => - Chunk.array(ProtoEncoder.toByteArray(spans)) - } - } - - def doExport[G[_]: Foldable](records: G[A]): F[Unit] = { - val request = - Request[F](Method.POST, config.endpoint, HttpVersion.`HTTP/1.1`) - .withEntity(records.toList) - .putHeaders(userAgent) - .putHeaders(config.headers) - - client - .run(request) - .use(response => logBody(response).unlessA(isSuccess(response.status))) - .timeoutTo( - config.timeout, - Temporal[F].unit >> Temporal[F].raiseError( - new TimeoutException( - s"The export to [${config.endpoint}] has timed out after [${config.timeout}]" - ) - ) - ) - .handleErrorWith { e => - Console[F].errorln( - s"OtlpHttpClient: cannot export: ${e.getMessage}\n${e.getStackTrace.mkString("\n")}\n" - ) - } - } - - private def isSuccess(status: Status): Boolean = - status.responseClass == Status.Successful - - private def logBody(response: Response[F]): F[Unit] = - for { - body <- response.bodyText.compile.string - _ <- Console[F].println( - s"[OtlpHttpClient/${config.encoding} ${config.endpoint}] the request failed with [${response.status}]. Body: $body" - ) - } yield () - - override def toString: String = { - val headers = config.headers.mkString( - "headers={", - ",", - "}", - Headers.SensitiveHeaders - ) - - "OtlpHttpClient{" + - s"encoding=${config.encoding}, " + - s"endpoint=${config.endpoint}, " + - s"timeout=${config.timeout}, " + - s"gzipCompression=${config.gzipCompression}, " + - headers + - "}" - } -} - -private[otlp] object OtlpHttpClient { - - private final case class Config( - encoding: HttpPayloadEncoding, - endpoint: Uri, - timeout: FiniteDuration, - headers: Headers, - gzipCompression: Boolean - ) - - private object Defaults { - val UserAgentName: String = "OTel-OTLP-Exporter-Scala-Otel4s" - } - - def create[F[_]: Async: Network: Compression: Console, A]( - encoding: HttpPayloadEncoding, - endpoint: Uri, - timeout: FiniteDuration, - headers: Headers, - gzipCompression: Boolean, - retryPolicy: RetryPolicy, - tlsContext: Option[TLSContext[F]], - customClient: Option[Client[F]] - )(implicit - encoder: ProtoEncoder.Message[List[A]], - printer: Printer - ): Resource[F, OtlpHttpClient[F, A]] = { - val config = Config(encoding, endpoint, timeout, headers, gzipCompression) - - def createClient: Resource[F, Client[F]] = - customClient match { - case Some(client) => - Resource - .eval( - Console[F].println( - "You are using a custom http4s client with OtlpHttpClient. 'timeout' and 'tlsContext' settings are ignored." - ) - ) - .as(client) - - case None => - val builder = EmberClientBuilder - .default[F] - .withTimeout(timeout) - - tlsContext.foldLeft(builder)(_.withTLSContext(_)).build - } - - val gzip: Client[F] => Client[F] = - if (gzipCompression) GZip[F]() else identity - - def backoff(attempt: Int): Option[FiniteDuration] = - Option.when(attempt < retryPolicy.maxAttempts) { - val next = - retryPolicy.initialBackoff * attempt.toLong * retryPolicy.backoffMultiplier - - val delay = - next.min(retryPolicy.maxBackoff) - - delay match { - case f: FiniteDuration => f - case _ => retryPolicy.maxBackoff - } - } - - // see https://opentelemetry.io/docs/specs/otlp/#failures-1 - val retryableCodes = Set( - Status.TooManyRequests, - Status.BadGateway, - Status.ServiceUnavailable, - Status.GatewayTimeout - ) - - def shouldRetry(result: Either[Throwable, Response[F]]): Boolean = - result match { - case Left(_) => true - case Right(response) => retryableCodes.contains(response.status) - } - - val policy = HttpRetryPolicy[F](backoff, (_, res) => shouldRetry(res)) - - for { - client <- createClient - } yield new OtlpHttpClient[F, A](Retry(policy)(gzip(client)), config) - } - -} diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpProtocol.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpProtocol.scala new file mode 100644 index 000000000..8a9cce2f9 --- /dev/null +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpProtocol.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp + +import cats.Show +import cats.syntax.show._ + +sealed trait OtlpProtocol { + override final def toString: String = + Show[OtlpProtocol].show(this) +} + +object OtlpProtocol { + + def httpJson: OtlpProtocol = + Http(HttpPayloadEncoding.Json) + + def httpProtobuf: OtlpProtocol = + Http(HttpPayloadEncoding.Protobuf) + + def grpc: OtlpProtocol = + Grpc + + implicit val otlpProtocolShow: Show[OtlpProtocol] = + Show.show { + case Grpc => "grpc" + case Http(encoding) => show"http/$encoding" + } + + private[otlp] final case class Http(encoding: HttpPayloadEncoding) extends OtlpProtocol + private[otlp] case object Grpc extends OtlpProtocol + +} diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/PayloadCompression.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/PayloadCompression.scala new file mode 100644 index 000000000..fd34485ec --- /dev/null +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/PayloadCompression.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp + +import cats.Show + +sealed trait PayloadCompression { + override def toString: String = + Show[PayloadCompression].show(this) +} + +object PayloadCompression { + + def gzip: PayloadCompression = + Gzip + + def none: PayloadCompression = + NoCompression + + implicit val payloadCompressionSHow: Show[PayloadCompression] = + Show { + case Gzip => "gzip" + case NoCompression => "none" + } + + private[otlp] case object Gzip extends PayloadCompression + private[otlp] case object NoCompression extends PayloadCompression + +} diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpHttpClientAutoConfigure.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpClientAutoConfigure.scala similarity index 65% rename from sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpHttpClientAutoConfigure.scala rename to sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpClientAutoConfigure.scala index 8515b0221..84f2688b5 100644 --- a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpHttpClientAutoConfigure.scala +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpClientAutoConfigure.scala @@ -37,7 +37,7 @@ import scalapb_circe.Printer import scala.concurrent.duration.FiniteDuration -/** Autoconfigures [[OtlpHttpClient]]. +/** Autoconfigures [[OtlpClient]]. * * Target-specific properties are prioritized. E.g. `otel.exporter.otlp.traces.endpoint` is prioritized over * `otel.exporter.otlp.endpoint`. @@ -46,45 +46,68 @@ import scala.concurrent.duration.FiniteDuration * {{{ * | System property | Environment variable | Description | * |---------------------------------------|---------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| + * | otel.exporter.otlp.protocol | OTEL_EXPORTER_OTLP_PROTOCOL | The transport protocol to use. Options include `grpc`, `http/protobuf`, and `http/json`. Default is `http/protobuf`. | * | otel.exporter.otlp.endpoint | OTEL_EXPORTER_OTLP_ENDPOINT | The OTLP traces, metrics, and logs endpoint to connect to. Must be a URL with a scheme of either `http` or `https` based on the use of TLS. Default is `http://localhost:4318/`. | * | otel.exporter.otlp.headers | OTEL_EXPORTER_OTLP_HEADERS | Key-value pairs separated by commas to pass as request headers on OTLP trace, metric, and log requests. | * | otel.exporter.otlp.compression | OTEL_EXPORTER_OTLP_COMPRESSION | The compression type to use on OTLP trace, metric, and log requests. Options include `gzip`. By default no compression will be used. | * | otel.exporter.otlp.timeout | OTEL_EXPORTER_OTLP_TIMEOUT | The maximum waiting time to send each OTLP trace, metric, and log batch. Default is `10 seconds`. | * }}} * + * The metrics-specific configuration options: + * {{{ + * | System property | Environment variable | Description | + * |----------------------------------------|----------------------------------------|----------------------------------------------------------------------------------------------------------------------| + * | otel.exporter.otlp.metrics.protocol | OTEL_EXPORTER_OTLP_METRICS_PROTOCOL | The transport protocol to use. Options include `grpc`, `http/protobuf`, and `http/json`. Default is `http/protobuf`. | + * | otel.exporter.otlp.metrics.headers | OTEL_EXPORTER_OTLP_METRICS_HEADERS | Key-value pairs separated by commas to pass as request headers on OTLP trace requests. | + * | otel.exporter.otlp.metrics.endpoint | OTEL_EXPORTER_OTLP_METRICS_ENDPOINT | The OTLP traces endpoint to connect to. Default is `http://localhost:4318/v1/metrics`. | + * | otel.exporter.otlp.metrics.compression | OTEL_EXPORTER_OTLP_METRICS_COMPRESSION | The compression type to use on OTLP trace requests. Options include `gzip`. By default no compression will be used. | + * | otel.exporter.otlp.metrics.timeout | OTEL_EXPORTER_OTLP_METRICS_TIMEOUT | The maximum waiting time to send each OTLP trace batch. Default is `10 seconds`. | + * }}} + * * The traces-specific configuration options: * {{{ - * | System property | Environment variable | Description | - * |---------------------------------------|---------------------------------------|---------------------------------------------------------------------------------------------------------------------| - * | otel.exporter.otlp.traces.headers | OTEL_EXPORTER_OTLP_TRACES_HEADERS | Key-value pairs separated by commas to pass as request headers on OTLP trace requests. | - * | otel.exporter.otlp.traces.endpoint | OTEL_EXPORTER_OTLP_TRACES_ENDPOINT | The OTLP traces endpoint to connect to. Default is `http://localhost:4318/v1/traces`. | - * | otel.exporter.otlp.traces.compression | OTEL_EXPORTER_OTLP_TRACES_COMPRESSION | The compression type to use on OTLP trace requests. Options include `gzip`. By default no compression will be used. | - * | otel.exporter.otlp.traces.timeout | OTEL_EXPORTER_OTLP_TRACES_TIMEOUT | The maximum waiting time to send each OTLP trace batch. Default is `10 seconds`. | + * | System property | Environment variable | Description | + * |---------------------------------------|---------------------------------------|----------------------------------------------------------------------------------------------------------------------| + * | otel.exporter.otlp.traces.protocol | OTEL_EXPORTER_OTLP_TRACES_PROTOCOL | The transport protocol to use. Options include `grpc`, `http/protobuf`, and `http/json`. Default is `http/protobuf`. | + * | otel.exporter.otlp.traces.headers | OTEL_EXPORTER_OTLP_TRACES_HEADERS | Key-value pairs separated by commas to pass as request headers on OTLP trace requests. | + * | otel.exporter.otlp.traces.endpoint | OTEL_EXPORTER_OTLP_TRACES_ENDPOINT | The OTLP traces endpoint to connect to. Default is `http://localhost:4318/v1/traces`. | + * | otel.exporter.otlp.traces.compression | OTEL_EXPORTER_OTLP_TRACES_COMPRESSION | The compression type to use on OTLP trace requests. Options include `gzip`. By default no compression will be used. | + * | otel.exporter.otlp.traces.timeout | OTEL_EXPORTER_OTLP_TRACES_TIMEOUT | The maximum waiting time to send each OTLP trace batch. Default is `10 seconds`. | * }}} * * @see * [[https://opentelemetry.io/docs/languages/java/configuration/#otlp-exporter-span-metric-and-log-exporters]] */ -private final class OtlpHttpClientAutoConfigure[ +private final class OtlpClientAutoConfigure[ F[_]: Async: Network: Compression: Console, A ]( - specific: OtlpHttpClientAutoConfigure.ConfigKeys.Keys, - defaults: OtlpHttpClientAutoConfigure.Defaults, + specific: OtlpClientAutoConfigure.ConfigKeys.Keys, + defaults: OtlpClientAutoConfigure.Defaults, customClient: Option[Client[F]], configKeys: Set[Config.Key[_]] )(implicit encoder: ProtoEncoder.Message[List[A]], printer: Printer) - extends AutoConfigure.WithHint[F, OtlpHttpClient[F, A]]( - "OtlpHttpClient", + extends AutoConfigure.WithHint[F, OtlpClient[F, A]]( + "OtlpClient", configKeys ) { - import OtlpHttpClientAutoConfigure.{ConfigKeys, Defaults, PayloadCompression} + import OtlpClientAutoConfigure.{ConfigKeys, Defaults} - protected def fromConfig( - config: Config - ): Resource[F, OtlpHttpClient[F, A]] = { + private val protocols: Map[String, OtlpProtocol] = + Map( + "http/json" -> OtlpProtocol.httpJson, + "http/protobuf" -> OtlpProtocol.httpProtobuf, + "grpc" -> OtlpProtocol.grpc + ) + private val compressions: Map[String, PayloadCompression] = + Map( + "gzip" -> PayloadCompression.gzip, + "none" -> PayloadCompression.none + ) + + protected def fromConfig(config: Config): Resource[F, OtlpClient[F, A]] = { def get[V: Config.Reader]( select: ConfigKeys.Keys => Config.Key[V] ): Either[ConfigurationError, Option[V]] = @@ -119,16 +142,17 @@ private final class OtlpHttpClientAutoConfigure[ def tryLoad = for { + protocol <- getOrElse(_.Protocol, _.protocol) endpoint <- getEndpoint timeout <- getOrElse(_.Timeout, _.timeout) headers <- getOrElse(_.Headers, _.headers) - compression <- get(_.Compression) - } yield OtlpHttpClient.create( - defaults.encoding, + compression <- getOrElse(_.Compression, _.compression) + } yield OtlpClient.create( + protocol, endpoint, - timeout, headers, - compression.isDefined, + compression, + timeout, RetryPolicy.default, None, customClient @@ -145,21 +169,6 @@ private final class OtlpHttpClientAutoConfigure[ Uri.fromString(s).leftMap(e => ConfigurationError(e.message)) } - private implicit val compressionReader: Config.Reader[PayloadCompression] = - Config.Reader.decodeWithHint("Compression") { s => - s.trim.toLowerCase match { - case "gzip" => - Right(PayloadCompression.Gzip) - - case _ => - Left( - ConfigurationError( - "Unrecognized compression. Supported options [gzip]" - ) - ) - } - } - private implicit val headersReader: Config.Reader[Headers] = Config.Reader[Map[String, String]].map { value => val headers = value.map { case (key, value) => @@ -167,21 +176,40 @@ private final class OtlpHttpClientAutoConfigure[ } new Headers(headers.toList) } -} -private[exporter] object OtlpHttpClientAutoConfigure { + private implicit val compressionReader: Config.Reader[PayloadCompression] = + Config.Reader.decodeWithHint("Compression") { s => + compressions + .get(s.trim.toLowerCase) + .toRight( + ConfigurationError( + s"Unrecognized compression [$s]. Supported options [${compressions.keys.mkString(", ")}]" + ) + ) + } - private sealed trait PayloadCompression - private object PayloadCompression { - case object Gzip extends PayloadCompression - } + private implicit val protocolReader: Config.Reader[OtlpProtocol] = + Config.Reader.decodeWithHint("Protocol") { s => + protocols + .get(s.trim.toLowerCase) + .toRight( + ConfigurationError( + s"Unrecognized protocol [$s]. Supported options [${protocols.keys.mkString(", ")}]" + ) + ) + } + +} + +private[exporter] object OtlpClientAutoConfigure { - final case class Defaults( + private[otlp] final case class Defaults( + protocol: OtlpProtocol, endpoint: Uri, apiPath: String, headers: Headers, timeout: FiniteDuration, - encoding: HttpPayloadEncoding + compression: PayloadCompression ) private object ConfigKeys { @@ -191,6 +219,8 @@ private[exporter] object OtlpHttpClientAutoConfigure { object Traces extends Keys("otel.exporter.otlp.traces") abstract class Keys(namespace: String) { + val Protocol: Config.Key[OtlpProtocol] = + Config.Key(s"$namespace.protocol") val Endpoint: Config.Key[Uri] = Config.Key(s"$namespace.endpoint") val Headers: Config.Key[Headers] = @@ -200,11 +230,12 @@ private[exporter] object OtlpHttpClientAutoConfigure { val Timeout: Config.Key[FiniteDuration] = Config.Key(s"$namespace.timeout") - val All: Set[Config.Key[_]] = Set(Endpoint, Headers, Compression, Timeout) + val All: Set[Config.Key[_]] = + Set(Protocol, Endpoint, Headers, Compression, Timeout) } } - /** Autoconfigures [[OtlpHttpClient]] using `otel.exporter.otlp.metrics.{x}` and `otel.exporter.otlp.{x}` properties. + /** Autoconfigures [[OtlpClient]] using `otel.exporter.otlp.metrics.{x}` and `otel.exporter.otlp.{x}` properties. * * @param defaults * the default values to use as a fallback when property is missing in the config @@ -215,15 +246,15 @@ private[exporter] object OtlpHttpClientAutoConfigure { )(implicit encoder: ProtoEncoder.Message[List[A]], printer: Printer - ): AutoConfigure[F, OtlpHttpClient[F, A]] = - new OtlpHttpClientAutoConfigure[F, A]( + ): AutoConfigure[F, OtlpClient[F, A]] = + new OtlpClientAutoConfigure[F, A]( ConfigKeys.Metrics, defaults, customClient, ConfigKeys.General.All ++ ConfigKeys.Metrics.All ) - /** Autoconfigures [[OtlpHttpClient]] using `otel.exporter.otlp.traces.{x}` and `otel.exporter.otlp.{x}` properties. + /** Autoconfigures [[OtlpClient]] using `otel.exporter.otlp.traces.{x}` and `otel.exporter.otlp.{x}` properties. * * @param defaults * the default values to use as a fallback when property is missing in the config @@ -234,8 +265,8 @@ private[exporter] object OtlpHttpClientAutoConfigure { )(implicit encoder: ProtoEncoder.Message[List[A]], printer: Printer - ): AutoConfigure[F, OtlpHttpClient[F, A]] = - new OtlpHttpClientAutoConfigure[F, A]( + ): AutoConfigure[F, OtlpClient[F, A]] = + new OtlpClientAutoConfigure[F, A]( ConfigKeys.Traces, defaults, customClient, diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/ProtocolAutoConfigure.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/ProtocolAutoConfigure.scala deleted file mode 100644 index ba626b792..000000000 --- a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/ProtocolAutoConfigure.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright 2023 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.typelevel.otel4s.sdk.exporter.otlp -package autoconfigure - -import cats.MonadThrow -import cats.effect.Resource -import org.typelevel.otel4s.sdk.autoconfigure.AutoConfigure -import org.typelevel.otel4s.sdk.autoconfigure.Config -import org.typelevel.otel4s.sdk.autoconfigure.ConfigurationError - -/** Autoconfigures OTLP [[org.typelevel.otel4s.sdk.exporter.otlp.Protocol Protocol]]. - * - * The general configuration options: - * {{{ - * | System property | Environment variable | Description | - * |-----------------------------|-----------------------------|-------------------------------------------------------------------------------------------------------------| - * | otel.exporter.otlp.protocol | OTEL_EXPORTER_OTLP_PROTOCOL | The transport protocol to use. Options include `http/protobuf` and `http/json`. Default is `http/protobuf`. | - * }}} - * - * The metrics-specific configuration options: - * {{{ - * | System property | Environment variable | Description | - * |-------------------------------------|-------------------------------------|-------------------------------------------------------------------------------------------------------------| - * | otel.exporter.otlp.metrics.protocol | OTEL_EXPORTER_OTLP_METRICS_PROTOCOL | The transport protocol to use. Options include `http/protobuf` and `http/json`. Default is `http/protobuf`. | - * }}} - * - * The traces-specific configuration options: - * {{{ - * | System property | Environment variable | Description | - * |------------------------------------|------------------------------------|-------------------------------------------------------------------------------------------------------------| - * | otel.exporter.otlp.traces.protocol | OTEL_EXPORTER_OTLP_TRACES_PROTOCOL | The transport protocol to use. Options include `http/protobuf` and `http/json`. Default is `http/protobuf`. | - * }}} - * - * @see - * [[https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#otel_exporter_otlp_protocol]] - */ -private final class ProtocolAutoConfigure[F[_]: MonadThrow]( - targetSpecificKey: Config.Key[Protocol] -) extends AutoConfigure.WithHint[F, Protocol]( - "Protocol", - Set(ProtocolAutoConfigure.ConfigKeys.GeneralProtocol, targetSpecificKey) - ) { - - import ProtocolAutoConfigure.ConfigKeys - import ProtocolAutoConfigure.Defaults - - private val protocols: Map[String, Protocol] = - Map( - "http/json" -> Protocol.Http(HttpPayloadEncoding.Json), - "http/protobuf" -> Protocol.Http(HttpPayloadEncoding.Protobuf) - ) - - protected def fromConfig(config: Config): Resource[F, Protocol] = { - val protocol = config - .get(targetSpecificKey) - .flatMap { - case Some(value) => - Right(value) - - case None => - config.getOrElse(ConfigKeys.GeneralProtocol, Defaults.OtlpProtocol) - } - - Resource.eval(MonadThrow[F].fromEither(protocol)) - } - - private implicit val protocolReader: Config.Reader[Protocol] = - Config.Reader.decodeWithHint("Protocol") { s => - protocols - .get(s.trim.toLowerCase) - .toRight( - ConfigurationError( - s"Unrecognized protocol [$s]. Supported options [${protocols.keys.mkString(", ")}]" - ) - ) - } - -} - -private[exporter] object ProtocolAutoConfigure { - - private object ConfigKeys { - val GeneralProtocol: Config.Key[Protocol] = - Config.Key("otel.exporter.otlp.protocol") - - val MetricsProtocol: Config.Key[Protocol] = - Config.Key("otel.exporter.otlp.metrics.protocol") - - val TracesProtocol: Config.Key[Protocol] = - Config.Key("otel.exporter.otlp.traces.protocol") - } - - private object Defaults { - val OtlpProtocol: Protocol = Protocol.Http(HttpPayloadEncoding.Protobuf) - } - - /** Autoconfigures OTLP [[org.typelevel.otel4s.sdk.exporter.otlp.Protocol Protocol]]. - * - * The general configuration options: - * {{{ - * | System property | Environment variable | Description | - * |-----------------------------|-----------------------------|-------------------------------------------------------------------------------------------------------------| - * | otel.exporter.otlp.protocol | OTEL_EXPORTER_OTLP_PROTOCOL | The transport protocol to use. Options include `http/protobuf` and `http/json`. Default is `http/protobuf`. | - * }}} - * - * The metrics-specific configuration options: - * {{{ - * | System property | Environment variable | Description | - * |-------------------------------------|-------------------------------------|-------------------------------------------------------------------------------------------------------------| - * | otel.exporter.otlp.metrics.protocol | OTEL_EXPORTER_OTLP_METRICS_PROTOCOL | The transport protocol to use. Options include `http/protobuf` and `http/json`. Default is `http/protobuf`. | - * }}} - * - * @see - * [[https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#otel_exporter_otlp_protocol]] - */ - def metrics[F[_]: MonadThrow]: AutoConfigure[F, Protocol] = - new ProtocolAutoConfigure[F](ConfigKeys.MetricsProtocol) - - /** Autoconfigures OTLP [[org.typelevel.otel4s.sdk.exporter.otlp.Protocol Protocol]]. - * - * The general configuration options: - * {{{ - * | System property | Environment variable | Description | - * |-----------------------------|-----------------------------|-------------------------------------------------------------------------------------------------------------| - * | otel.exporter.otlp.protocol | OTEL_EXPORTER_OTLP_PROTOCOL | The transport protocol to use. Options include `http/protobuf` and `http/json`. Default is `http/protobuf`. | - * }}} - * - * The traces-specific configuration options: - * {{{ - * | System property | Environment variable | Description | - * |------------------------------------|------------------------------------|-------------------------------------------------------------------------------------------------------------| - * | otel.exporter.otlp.traces.protocol | OTEL_EXPORTER_OTLP_TRACES_PROTOCOL | The transport protocol to use. Options include `http/protobuf` and `http/json`. Default is `http/protobuf`. | - * }}} - * - * @see - * [[https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#otel_exporter_otlp_protocol]] - */ - def traces[F[_]: MonadThrow]: AutoConfigure[F, Protocol] = - new ProtocolAutoConfigure[F](ConfigKeys.TracesProtocol) -} diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/GrpcCodecs.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/GrpcCodecs.scala new file mode 100644 index 000000000..f2ca65e30 --- /dev/null +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/GrpcCodecs.scala @@ -0,0 +1,74 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp.grpc + +import fs2.Chunk +import fs2.Pipe +import fs2.RaiseThrowable +import fs2.Stream +import fs2.compression.Compression +import fs2.interop.scodec.StreamDecoder +import fs2.interop.scodec.StreamEncoder +import scodec.Decoder +import scodec.Encoder + +private[otlp] object GrpcCodecs { + + def decode[F[_]: RaiseThrowable: Compression, A]( + decoder: Decoder[A] + ): Pipe[F, Byte, A] = { + val entityDecoder: Pipe[F, Byte, A] = + StreamDecoder.once(decoder).toPipeByte + + val lpmDecoder: Pipe[F, Byte, LengthPrefixedMessage] = + StreamDecoder.once(LengthPrefixedMessage.codec).toPipeByte + + def decompress: Pipe[F, LengthPrefixedMessage, Byte] = + _.flatMap { lpm => + val payload = Stream.chunk(Chunk.byteVector(lpm.message)) + if (lpm.compressed) { + payload.through(Compression[F].gunzip()).flatMap(_.content) + } else { + payload + } + } + + _.through(lpmDecoder).through(decompress).through(entityDecoder) + } + + def encode[F[_]: RaiseThrowable: Compression, A]( + encoder: Encoder[A], + gzip: Boolean + ): Pipe[F, A, Byte] = { + val compression: Pipe[F, Byte, Byte] = + if (gzip) Compression[F].gzip() else identity + + val entityEncoder: Pipe[F, A, Byte] = + StreamEncoder.once(encoder).toPipeByte + + val lpmEncoder: Pipe[F, LengthPrefixedMessage, Byte] = + StreamEncoder.once(LengthPrefixedMessage.codec).toPipeByte + + _.through(entityEncoder) + .through(compression) + .chunks + .foldMonoid + .map(chunks => LengthPrefixedMessage(gzip, chunks.toByteVector)) + .through(lpmEncoder) + } + +} diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/GrpcHeaders.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/GrpcHeaders.scala new file mode 100644 index 000000000..5b4fdc565 --- /dev/null +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/GrpcHeaders.scala @@ -0,0 +1,86 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp.grpc + +import cats.syntax.either._ +import cats.syntax.show._ +import org.http4s.ContentCoding +import org.http4s.Header +import org.http4s.ParseFailure +import org.http4s.ParseResult +import org.typelevel.ci._ + +private[otlp] object GrpcHeaders { + + val ContentType: Header.Raw = + Header.Raw(ci"Content-Type", "application/grpc+proto") + + val TE: Header.Raw = + Header.Raw(ci"te", "trailers") + + final case class GrpcEncoding(coding: ContentCoding) + + object GrpcEncoding { + implicit val header: Header[GrpcEncoding, Header.Single] = + Header.create( + ci"grpc-encoding", + _.coding.coding, + s => ContentCoding.parse(s).map(c => GrpcEncoding(c)) + ) + } + + final case class GrpcAcceptEncoding(coding: ContentCoding) + + object GrpcAcceptEncoding { + implicit val header: Header[GrpcAcceptEncoding, Header.Single] = + Header.create( + ci"grpc-accept-encoding", + _.coding.coding, + s => ContentCoding.parse(s).map(c => GrpcAcceptEncoding(c)) + ) + } + + // https://grpc.github.io/grpc/core/md_doc_statuscodes.html + final case class GrpcStatus(statusCode: Int) + + object GrpcStatus { + private val parser = + cats.parse.Numbers.nonNegativeIntString.map(s => GrpcStatus(s.toInt)) + + implicit val header: Header[GrpcStatus, Header.Single] = + Header.create( + ci"grpc-status", + _.statusCode.toString, + s => + parser + .parseAll(s) + .leftMap(e => ParseFailure("Invalid GrpcStatus", e.show)) + ) + } + + final case class GrpcMessage(message: String) + + object GrpcMessage { + implicit val header: Header[GrpcMessage, Header.Single] = + Header.create( + ci"grpc-message", + _.message, + s => ParseResult.success(GrpcMessage(s)) + ) + } + +} diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/Protocol.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/GrpcStatusException.scala similarity index 67% rename from sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/Protocol.scala rename to sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/GrpcStatusException.scala index 3c43649cf..3fcab6392 100644 --- a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/Protocol.scala +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/GrpcStatusException.scala @@ -14,10 +14,13 @@ * limitations under the License. */ -package org.typelevel.otel4s.sdk.exporter.otlp +package org.typelevel.otel4s.sdk.exporter.otlp.grpc -private[exporter] sealed trait Protocol +import cats.syntax.foldable._ -private[exporter] object Protocol { - final case class Http(encoding: HttpPayloadEncoding) extends Protocol -} +private[otlp] final case class GrpcStatusException( + status: Int, + message: Option[String] +) extends RuntimeException( + s"Grpc error: status [$status]${message.foldMap(m => s", message [$m]")}" + ) diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/LengthPrefixedMessage.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/LengthPrefixedMessage.scala new file mode 100644 index 000000000..09e6d03cd --- /dev/null +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/grpc/LengthPrefixedMessage.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp.grpc + +import scodec._ +import scodec.bits._ +import scodec.codecs._ + +private[otlp] final case class LengthPrefixedMessage( + compressed: Boolean, + message: ByteVector +) + +private[otlp] object LengthPrefixedMessage { + + val codec: scodec.Codec[LengthPrefixedMessage] = + ( + uint8.xmap[Boolean](_ == 1, compressed => if (compressed) 1 else 0) :: + variableSizeBytesLong(uint32, bytes) + ).as[LengthPrefixedMessage] + +} diff --git a/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/autoconfigure/ProtocolAutoConfigureSuite.scala b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/autoconfigure/ProtocolAutoConfigureSuite.scala deleted file mode 100644 index 19b726db1..000000000 --- a/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/autoconfigure/ProtocolAutoConfigureSuite.scala +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright 2023 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.typelevel.otel4s.sdk.exporter.autoconfigure - -import cats.effect.IO -import cats.syntax.either._ -import munit.CatsEffectSuite -import org.typelevel.otel4s.sdk.autoconfigure.Config -import org.typelevel.otel4s.sdk.exporter.otlp.HttpPayloadEncoding -import org.typelevel.otel4s.sdk.exporter.otlp.Protocol -import org.typelevel.otel4s.sdk.exporter.otlp.autoconfigure.ProtocolAutoConfigure - -class ProtocolAutoConfigureSuite extends CatsEffectSuite { - - test("metrics - load from the config - empty config - load default") { - val config = Config.ofProps(Map.empty) - val expected = Protocol.Http(HttpPayloadEncoding.Protobuf) - - ProtocolAutoConfigure - .metrics[IO] - .configure(config) - .use(protocol => IO(assertEquals(protocol, expected))) - } - - test("metrics - load from the config - empty string - load default") { - val config = Config.ofProps( - Map( - "otel.exporter.otlp.protocol" -> "", - "otel.exporter.otlp.metrics.protocol" -> "" - ) - ) - - val expected = Protocol.Http(HttpPayloadEncoding.Protobuf) - - ProtocolAutoConfigure - .metrics[IO] - .configure(config) - .use(protocol => IO(assertEquals(protocol, expected))) - } - - test("metrics - load from the config - prioritize 'metrics' properties") { - val config = Config.ofProps( - Map( - "otel.exporter.otlp.protocol" -> "http/protobuf", - "otel.exporter.otlp.metrics.protocol" -> "http/json" - ) - ) - - val expected = Protocol.Http(HttpPayloadEncoding.Json) - - ProtocolAutoConfigure - .metrics[IO] - .configure(config) - .use(protocol => IO(assertEquals(protocol, expected))) - } - - test("metrics - load from the config - unknown protocol - fail") { - val config = Config.ofProps(Map("otel.exporter.otlp.protocol" -> "grpc")) - - ProtocolAutoConfigure - .metrics[IO] - .configure(config) - .use_ - .attempt - .map(_.leftMap(_.getMessage)) - .assertEquals( - Left("""Cannot autoconfigure [Protocol]. - |Cause: Unrecognized protocol [grpc]. Supported options [http/json, http/protobuf]. - |Config: - |1) `otel.exporter.otlp.metrics.protocol` - N/A - |2) `otel.exporter.otlp.protocol` - grpc""".stripMargin) - ) - } - - test("traces - load from the config - empty config - load default") { - val config = Config.ofProps(Map.empty) - val expected = Protocol.Http(HttpPayloadEncoding.Protobuf) - - ProtocolAutoConfigure - .traces[IO] - .configure(config) - .use(protocol => IO(assertEquals(protocol, expected))) - } - - test("traces - load from the config - empty string - load default") { - val config = Config.ofProps( - Map( - "otel.exporter.otlp.protocol" -> "", - "otel.exporter.otlp.traces.protocol" -> "" - ) - ) - - val expected = Protocol.Http(HttpPayloadEncoding.Protobuf) - - ProtocolAutoConfigure - .traces[IO] - .configure(config) - .use(protocol => IO(assertEquals(protocol, expected))) - } - - test("traces - load from the config - prioritize 'traces' properties") { - val config = Config.ofProps( - Map( - "otel.exporter.otlp.protocol" -> "http/protobuf", - "otel.exporter.otlp.traces.protocol" -> "http/json" - ) - ) - - val expected = Protocol.Http(HttpPayloadEncoding.Json) - - ProtocolAutoConfigure - .traces[IO] - .configure(config) - .use(protocol => IO(assertEquals(protocol, expected))) - } - - test("traces - load from the config - unknown protocol - fail") { - val config = Config.ofProps(Map("otel.exporter.otlp.protocol" -> "grpc")) - - ProtocolAutoConfigure - .traces[IO] - .configure(config) - .use_ - .attempt - .map(_.leftMap(_.getMessage)) - .assertEquals( - Left("""Cannot autoconfigure [Protocol]. - |Cause: Unrecognized protocol [grpc]. Supported options [http/json, http/protobuf]. - |Config: - |1) `otel.exporter.otlp.protocol` - grpc - |2) `otel.exporter.otlp.traces.protocol` - N/A""".stripMargin) - ) - } - -} diff --git a/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpProtocolSuite.scala b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpProtocolSuite.scala new file mode 100644 index 000000000..e8c5e482d --- /dev/null +++ b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpProtocolSuite.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp + +import cats.Show +import munit.ScalaCheckSuite +import org.scalacheck.Arbitrary +import org.scalacheck.Gen +import org.scalacheck.Prop + +class OtlpProtocolSuite extends ScalaCheckSuite { + + private implicit val protocolArbitrary: Arbitrary[OtlpProtocol] = + Arbitrary( + Gen.oneOf( + OtlpProtocol.grpc, + OtlpProtocol.httpJson, + OtlpProtocol.httpProtobuf + ) + ) + + test("Show[OtlpProtocol]") { + Prop.forAll { (protocol: OtlpProtocol) => + val expected = protocol match { + case OtlpProtocol.Http(HttpPayloadEncoding.Json) => "http/json" + case OtlpProtocol.Http(HttpPayloadEncoding.Protobuf) => "http/protobuf" + case OtlpProtocol.Grpc => "grpc" + } + + assertEquals(Show[OtlpProtocol].show(protocol), expected) + assertEquals(protocol.toString, expected) + } + } + +} diff --git a/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpClientAutoConfigureSuite.scala b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpClientAutoConfigureSuite.scala new file mode 100644 index 000000000..c26b7e06d --- /dev/null +++ b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpClientAutoConfigureSuite.scala @@ -0,0 +1,393 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter +package otlp +package autoconfigure + +import cats.effect.IO +import cats.syntax.either._ +import cats.syntax.traverse._ +import munit.CatsEffectSuite +import org.http4s.Headers +import org.http4s.Uri +import org.http4s.syntax.literals._ +import org.typelevel.otel4s.sdk.autoconfigure.Config +import scalapb_circe.Printer + +import scala.concurrent.duration._ + +class OtlpClientAutoConfigureSuite extends CatsEffectSuite with SuiteRuntimePlatform { + + import OtlpClientAutoConfigure.Defaults + + private sealed trait Payload + + private implicit val protoEncoder: ProtoEncoder.Message[List[Payload]] = + _ => ??? + + private implicit val jsonPrinter: Printer = new Printer() + + private val tracesDefaults = defaults("traces") + private val metricsDefaults = defaults("metrics") + + // + // Traces + // + + test("traces - empty config - load default") { + val config = Config.ofProps(Map.empty) + + val expected = + "OtlpClient{" + + "protocol=http/protobuf, " + + "endpoint=http://localhost:4318/v1/traces, " + + "timeout=10 seconds, " + + "compression=none, " + + "headers={}}" + + configureTraces(config).assertEquals(expected) + } + + test("traces - empty string - load default") { + val config = Config.ofProps( + Map( + "otel.exporter.otlp.compression" -> "", + "otel.exporter.otlp.endpoint" -> "", + "otel.exporter.otlp.headers" -> "", + "otel.exporter.otlp.timeout" -> "", + "otel.exporter.otlp.traces.compression" -> "", + "otel.exporter.otlp.traces.endpoint" -> "", + "otel.exporter.otlp.traces.headers" -> "", + "otel.exporter.otlp.traces.timeout" -> "" + ) + ) + + val expected = + "OtlpClient{" + + "protocol=http/protobuf, " + + "endpoint=http://localhost:4318/v1/traces, " + + "timeout=10 seconds, " + + "compression=none, " + + "headers={}}" + + configureTraces(config).assertEquals(expected) + } + + test("traces - use global properties") { + val config = Config.ofProps( + Map( + "otel.exporter.otlp.protocol" -> "grpc", + "otel.exporter.otlp.compression" -> "gzip", + "otel.exporter.otlp.endpoint" -> "http://localhost:1234/", + "otel.exporter.otlp.headers" -> "header1=value1", + "otel.exporter.otlp.timeout" -> "5 seconds" + ) + ) + + val expected = + "OtlpClient{" + + "protocol=grpc, " + + "endpoint=http://localhost:1234/v1/traces, " + + "timeout=5 seconds, " + + "compression=gzip, " + + "headers={header1: value1}}" + + configureTraces(config).assertEquals(expected) + } + + test("traces - prioritize 'traces' properties") { + val config = Config.ofProps( + Map( + "otel.exporter.otlp.protocol" -> "grpc", + "otel.exporter.otlp.compression" -> "", + "otel.exporter.otlp.endpoint" -> "", + "otel.exporter.otlp.headers" -> "header1=value1", + "otel.exporter.otlp.timeout" -> "5 seconds", + "otel.exporter.otlp.traces.protocol" -> "http/json", + "otel.exporter.otlp.traces.compression" -> "gzip", + "otel.exporter.otlp.traces.endpoint" -> "http://localhost:1234/v2/traces", + "otel.exporter.otlp.traces.headers" -> "header2=value2", + "otel.exporter.otlp.traces.timeout" -> "15 seconds" + ) + ) + + val expected = + "OtlpClient{" + + "protocol=http/json, " + + "endpoint=http://localhost:1234/v2/traces, " + + "timeout=15 seconds, " + + "compression=gzip, " + + "headers={header2: value2}}" + + configureTraces(config).assertEquals(expected) + } + + test("traces - invalid property - fail") { + val input = List( + "otel.exporter.otlp.protocol" -> "other", + "otel.exporter.otlp.compression" -> "unknown", + "otel.exporter.otlp.endpoint" -> "not\\//-a-url", + "otel.exporter.otlp.headers" -> "header1", + "otel.exporter.otlp.timeout" -> "5 hertz", + "otel.exporter.otlp.traces.protocol" -> "other1", + "otel.exporter.otlp.traces.compression" -> "gzipped", + "otel.exporter.otlp.traces.endpoint" -> "aa aa", + "otel.exporter.otlp.traces.headers" -> "not a header", + "otel.exporter.otlp.traces.timeout" -> "invalid" + ) + + val empty = Map( + "otel.exporter.otlp.protocol" -> "", + "otel.exporter.otlp.compression" -> "", + "otel.exporter.otlp.endpoint" -> "", + "otel.exporter.otlp.headers" -> "", + "otel.exporter.otlp.timeout" -> "", + "otel.exporter.otlp.traces.protocol" -> "", + "otel.exporter.otlp.traces.compression" -> "", + "otel.exporter.otlp.traces.endpoint" -> "", + "otel.exporter.otlp.traces.headers" -> "", + "otel.exporter.otlp.traces.timeout" -> "" + ) + + val errors = Map( + "otel.exporter.otlp.protocol" -> "Unrecognized protocol [other]. Supported options [http/json, http/protobuf, grpc]", + "otel.exporter.otlp.compression" -> "Unrecognized compression [unknown]. Supported options [gzip, none]", + "otel.exporter.otlp.endpoint" -> Uri + .fromString("not\\//-a-url") + .fold(_.message, _ => ""), + "otel.exporter.otlp.headers" -> "Invalid map property [header1]", + "otel.exporter.otlp.timeout" -> "Invalid value for property otel.exporter.otlp.timeout=5 hertz. Must be [FiniteDuration]", + "otel.exporter.otlp.traces.protocol" -> "Unrecognized protocol [other1]. Supported options [http/json, http/protobuf, grpc]", + "otel.exporter.otlp.traces.compression" -> "Unrecognized compression [gzipped]. Supported options [gzip, none]", + "otel.exporter.otlp.traces.endpoint" -> Uri + .fromString("aa aa") + .fold(_.message, _ => ""), + "otel.exporter.otlp.traces.headers" -> "Invalid map property [not a header]", + "otel.exporter.otlp.traces.timeout" -> "Invalid value for property otel.exporter.otlp.traces.timeout=invalid. Must be [FiniteDuration]" + ) + + input.traverse { case (key, value) => + val properties = empty.updated(key, value) + val config = Config.ofProps(properties) + val cause = errors(key) + + val prettyConfig = properties.toSeq.sorted.zipWithIndex + .map { case ((k, v), i) => + val idx = i + 1 + val value = if (v.isEmpty) "N/A" else v + s"$idx) `$k` - $value" + } + .mkString("\n") + + val expected = + s"Cannot autoconfigure [OtlpClient].\nCause: $cause.\nConfig:\n$prettyConfig" + + OtlpClientAutoConfigure + .traces[IO, Payload](tracesDefaults, None) + .configure(config) + .use_ + .attempt + .map(_.leftMap(_.getMessage)) + .assertEquals(Left(expected)) + } + } + + // + // Metrics + // + + test("metrics - empty config - load default") { + val config = Config.ofProps(Map.empty) + + val expected = + "OtlpClient{" + + "protocol=http/protobuf, " + + "endpoint=http://localhost:4318/v1/metrics, " + + "timeout=10 seconds, " + + "compression=none, " + + "headers={}}" + + configureMetrics(config).assertEquals(expected) + } + + test("metrics - empty string - load default") { + val config = Config.ofProps( + Map( + "otel.exporter.otlp.compression" -> "", + "otel.exporter.otlp.endpoint" -> "", + "otel.exporter.otlp.headers" -> "", + "otel.exporter.otlp.timeout" -> "", + "otel.exporter.otlp.metrics.compression" -> "", + "otel.exporter.otlp.metrics.endpoint" -> "", + "otel.exporter.otlp.metrics.headers" -> "", + "otel.exporter.otlp.metrics.timeout" -> "" + ) + ) + + val expected = + "OtlpClient{" + + "protocol=http/protobuf, " + + "endpoint=http://localhost:4318/v1/metrics, " + + "timeout=10 seconds, " + + "compression=none, " + + "headers={}}" + + configureMetrics(config).assertEquals(expected) + } + + test("metrics - use global properties") { + val config = Config.ofProps( + Map( + "otel.exporter.otlp.protocol" -> "grpc", + "otel.exporter.otlp.compression" -> "gzip", + "otel.exporter.otlp.endpoint" -> "http://localhost:1234/", + "otel.exporter.otlp.headers" -> "header1=value1", + "otel.exporter.otlp.timeout" -> "5 seconds" + ) + ) + + val expected = + "OtlpClient{" + + "protocol=grpc, " + + "endpoint=http://localhost:1234/v1/metrics, " + + "timeout=5 seconds, " + + "compression=gzip, " + + "headers={header1: value1}}" + + configureMetrics(config).assertEquals(expected) + } + + test("metrics - prioritize 'metrics' properties") { + val config = Config.ofProps( + Map( + "otel.exporter.otlp.protocol" -> "grpc", + "otel.exporter.otlp.compression" -> "", + "otel.exporter.otlp.endpoint" -> "", + "otel.exporter.otlp.headers" -> "header1=value1", + "otel.exporter.otlp.timeout" -> "5 seconds", + "otel.exporter.otlp.metrics.protocol" -> "http/json", + "otel.exporter.otlp.metrics.compression" -> "gzip", + "otel.exporter.otlp.metrics.endpoint" -> "http://localhost:1234/v2/metrics", + "otel.exporter.otlp.metrics.headers" -> "header2=value2", + "otel.exporter.otlp.metrics.timeout" -> "15 seconds" + ) + ) + + val expected = + "OtlpClient{" + + "protocol=http/json, " + + "endpoint=http://localhost:1234/v2/metrics, " + + "timeout=15 seconds, " + + "compression=gzip, " + + "headers={header2: value2}}" + + configureMetrics(config).assertEquals(expected) + } + + test("metrics - invalid property - fail") { + val input = List( + "otel.exporter.otlp.protocol" -> "other", + "otel.exporter.otlp.compression" -> "unknown", + "otel.exporter.otlp.endpoint" -> "not\\//-a-url", + "otel.exporter.otlp.headers" -> "header1", + "otel.exporter.otlp.timeout" -> "5 hertz", + "otel.exporter.otlp.metrics.protocol" -> "other1", + "otel.exporter.otlp.metrics.compression" -> "gzipped", + "otel.exporter.otlp.metrics.endpoint" -> "aa aa", + "otel.exporter.otlp.metrics.headers" -> "not a header", + "otel.exporter.otlp.metrics.timeout" -> "invalid" + ) + + val empty = Map( + "otel.exporter.otlp.protocol" -> "", + "otel.exporter.otlp.compression" -> "", + "otel.exporter.otlp.endpoint" -> "", + "otel.exporter.otlp.headers" -> "", + "otel.exporter.otlp.timeout" -> "", + "otel.exporter.otlp.metrics.protocol" -> "", + "otel.exporter.otlp.metrics.compression" -> "", + "otel.exporter.otlp.metrics.endpoint" -> "", + "otel.exporter.otlp.metrics.headers" -> "", + "otel.exporter.otlp.metrics.timeout" -> "" + ) + + val errors = Map( + "otel.exporter.otlp.protocol" -> "Unrecognized protocol [other]. Supported options [http/json, http/protobuf, grpc]", + "otel.exporter.otlp.compression" -> "Unrecognized compression [unknown]. Supported options [gzip, none]", + "otel.exporter.otlp.endpoint" -> Uri + .fromString("not\\//-a-url") + .fold(_.message, _ => ""), + "otel.exporter.otlp.headers" -> "Invalid map property [header1]", + "otel.exporter.otlp.timeout" -> "Invalid value for property otel.exporter.otlp.timeout=5 hertz. Must be [FiniteDuration]", + "otel.exporter.otlp.metrics.protocol" -> "Unrecognized protocol [other1]. Supported options [http/json, http/protobuf, grpc]", + "otel.exporter.otlp.metrics.compression" -> "Unrecognized compression [gzipped]. Supported options [gzip, none]", + "otel.exporter.otlp.metrics.endpoint" -> Uri + .fromString("aa aa") + .fold(_.message, _ => ""), + "otel.exporter.otlp.metrics.headers" -> "Invalid map property [not a header]", + "otel.exporter.otlp.metrics.timeout" -> "Invalid value for property otel.exporter.otlp.metrics.timeout=invalid. Must be [FiniteDuration]" + ) + + input.traverse { case (key, value) => + val properties = empty.updated(key, value) + val config = Config.ofProps(properties) + val cause = errors(key) + + val prettyConfig = properties.toSeq.sorted.zipWithIndex + .map { case ((k, v), i) => + val idx = i + 1 + val value = if (v.isEmpty) "N/A" else v + s"$idx) `$k` - $value" + } + .mkString("\n") + + val expected = + s"Cannot autoconfigure [OtlpClient].\nCause: $cause.\nConfig:\n$prettyConfig" + + OtlpClientAutoConfigure + .metrics[IO, Payload](metricsDefaults, None) + .configure(config) + .use_ + .attempt + .map(_.leftMap(_.getMessage)) + .assertEquals(Left(expected)) + } + } + + private def configureMetrics(config: Config): IO[String] = + OtlpClientAutoConfigure + .metrics[IO, Payload](metricsDefaults, None) + .configure(config) + .use(client => IO.pure(client.toString)) + + private def configureTraces(config: Config): IO[String] = + OtlpClientAutoConfigure + .traces[IO, Payload](tracesDefaults, None) + .configure(config) + .use(client => IO.pure(client.toString)) + + private def defaults(path: String) = + Defaults( + protocol = OtlpProtocol.httpProtobuf, + endpoint = uri"http://localhost:4318" / "v1" / path, + apiPath = s"v1/$path", + headers = Headers.empty, + timeout = 10.seconds, + compression = PayloadCompression.none + ) + +} diff --git a/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpHttpClientAutoConfigureSuite.scala b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpHttpClientAutoConfigureSuite.scala deleted file mode 100644 index 29268b533..000000000 --- a/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/autoconfigure/OtlpHttpClientAutoConfigureSuite.scala +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Copyright 2023 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.typelevel.otel4s.sdk.exporter -package otlp.autoconfigure - -import cats.effect.IO -import cats.syntax.either._ -import cats.syntax.traverse._ -import munit.CatsEffectSuite -import org.http4s.Headers -import org.http4s.Uri -import org.http4s.syntax.literals._ -import org.typelevel.otel4s.sdk.autoconfigure.Config -import org.typelevel.otel4s.sdk.exporter.otlp.HttpPayloadEncoding -import org.typelevel.otel4s.sdk.exporter.otlp.ProtoEncoder -import scalapb_circe.Printer - -import scala.concurrent.duration._ - -class OtlpHttpClientAutoConfigureSuite extends CatsEffectSuite with SuiteRuntimePlatform { - - import OtlpHttpClientAutoConfigure.Defaults - - private sealed trait Payload - - private implicit val protoEncoder: ProtoEncoder.Message[List[Payload]] = - _ => ??? - - private implicit val jsonPrinter: Printer = new Printer() - - private val tracesDefaults = Defaults( - uri"http://localhost:4318/v1/traces", - "v1/traces", - Headers.empty, - 10.seconds, - HttpPayloadEncoding.Protobuf - ) - - test("traces - empty config - load default") { - val config = Config.ofProps(Map.empty) - - val expected = - "OtlpHttpClient{" + - "encoding=Protobuf, " + - "endpoint=http://localhost:4318/v1/traces, " + - "timeout=10 seconds, " + - "gzipCompression=false, " + - "headers={}}" - - OtlpHttpClientAutoConfigure - .traces[IO, Payload](tracesDefaults, None) - .configure(config) - .use(client => IO(assertEquals(client.toString, expected))) - } - - test("traces - empty string - load default") { - val config = Config.ofProps( - Map( - "otel.exporter.otlp.compression" -> "", - "otel.exporter.otlp.endpoint" -> "", - "otel.exporter.otlp.headers" -> "", - "otel.exporter.otlp.timeout" -> "", - "otel.exporter.otlp.traces.compression" -> "", - "otel.exporter.otlp.traces.endpoint" -> "", - "otel.exporter.otlp.traces.headers" -> "", - "otel.exporter.otlp.traces.timeout" -> "" - ) - ) - - val expected = - "OtlpHttpClient{" + - "encoding=Protobuf, " + - "endpoint=http://localhost:4318/v1/traces, " + - "timeout=10 seconds, " + - "gzipCompression=false, " + - "headers={}}" - - OtlpHttpClientAutoConfigure - .traces[IO, Payload](tracesDefaults, None) - .configure(config) - .use(client => IO(assertEquals(client.toString, expected))) - } - - test("traces - use global properties") { - val config = Config.ofProps( - Map( - "otel.exporter.otlp.compression" -> "gzip", - "otel.exporter.otlp.endpoint" -> "http://localhost:1234/", - "otel.exporter.otlp.headers" -> "header1=value1", - "otel.exporter.otlp.timeout" -> "5 seconds" - ) - ) - - val expected = - "OtlpHttpClient{" + - "encoding=Protobuf, " + - "endpoint=http://localhost:1234/v1/traces, " + - "timeout=5 seconds, " + - "gzipCompression=true, " + - "headers={header1: value1}}" - - OtlpHttpClientAutoConfigure - .traces[IO, Payload](tracesDefaults, None) - .configure(config) - .use(client => IO(assertEquals(client.toString, expected))) - } - - test("traces - prioritize 'traces' properties") { - val config = Config.ofProps( - Map( - "otel.exporter.otlp.compression" -> "", - "otel.exporter.otlp.endpoint" -> "", - "otel.exporter.otlp.headers" -> "header1=value1", - "otel.exporter.otlp.timeout" -> "5 seconds", - "otel.exporter.otlp.traces.compression" -> "gzip", - "otel.exporter.otlp.traces.endpoint" -> "http://localhost:1234/v2/traces", - "otel.exporter.otlp.traces.headers" -> "header2=value2", - "otel.exporter.otlp.traces.timeout" -> "15 seconds" - ) - ) - - val expected = - "OtlpHttpClient{" + - "encoding=Protobuf, " + - "endpoint=http://localhost:1234/v2/traces, " + - "timeout=15 seconds, " + - "gzipCompression=true, " + - "headers={header2: value2}}" - - OtlpHttpClientAutoConfigure - .traces[IO, Payload](tracesDefaults, None) - .configure(config) - .use(client => IO(assertEquals(client.toString, expected))) - } - - test("traces - invalid property - fail") { - val input = List( - "otel.exporter.otlp.compression" -> "unknown", - "otel.exporter.otlp.endpoint" -> "not\\//-a-url", - "otel.exporter.otlp.headers" -> "header1", - "otel.exporter.otlp.timeout" -> "5 hertz", - "otel.exporter.otlp.traces.compression" -> "gzipped", - "otel.exporter.otlp.traces.endpoint" -> "aa aa", - "otel.exporter.otlp.traces.headers" -> "not a header", - "otel.exporter.otlp.traces.timeout" -> "invalid" - ) - - val empty = Map( - "otel.exporter.otlp.compression" -> "", - "otel.exporter.otlp.endpoint" -> "", - "otel.exporter.otlp.headers" -> "", - "otel.exporter.otlp.timeout" -> "", - "otel.exporter.otlp.traces.compression" -> "", - "otel.exporter.otlp.traces.endpoint" -> "", - "otel.exporter.otlp.traces.headers" -> "", - "otel.exporter.otlp.traces.timeout" -> "" - ) - - val errors = Map( - "otel.exporter.otlp.compression" -> "Unrecognized compression. Supported options [gzip]", - "otel.exporter.otlp.endpoint" -> Uri - .fromString("not\\//-a-url") - .fold(_.message, _ => ""), - "otel.exporter.otlp.headers" -> "Invalid map property [header1]", - "otel.exporter.otlp.timeout" -> "Invalid value for property otel.exporter.otlp.timeout=5 hertz. Must be [FiniteDuration]", - "otel.exporter.otlp.traces.compression" -> "Unrecognized compression. Supported options [gzip]", - "otel.exporter.otlp.traces.endpoint" -> Uri - .fromString("aa aa") - .fold(_.message, _ => ""), - "otel.exporter.otlp.traces.headers" -> "Invalid map property [not a header]", - "otel.exporter.otlp.traces.timeout" -> "Invalid value for property otel.exporter.otlp.traces.timeout=invalid. Must be [FiniteDuration]" - ) - - input.traverse { case (key, value) => - val properties = empty.updated(key, value) - val config = Config.ofProps(properties) - val cause = errors(key) - - val prettyConfig = properties.toSeq.sorted.zipWithIndex - .map { case ((k, v), i) => - val idx = i + 1 - val value = if (v.isEmpty) "N/A" else v - s"$idx) `$k` - $value" - } - .mkString("\n") - - val expected = - s"Cannot autoconfigure [OtlpHttpClient].\nCause: $cause.\nConfig:\n$prettyConfig" - - OtlpHttpClientAutoConfigure - .traces[IO, Payload](tracesDefaults, None) - .configure(config) - .use_ - .attempt - .map(_.leftMap(_.getMessage)) - .assertEquals(Left(expected)) - } - } - - test("traces - unknown protocol - fail") { - val config = - Config.ofProps(Map("otel.exporter.otlp.headers" -> "non-header")) - - OtlpHttpClientAutoConfigure - .traces[IO, Payload](tracesDefaults, None) - .configure(config) - .use_ - .attempt - .map(_.leftMap(_.getMessage)) - .assertEquals( - Left("""Cannot autoconfigure [OtlpHttpClient]. - |Cause: Invalid map property [non-header]. - |Config: - |1) `otel.exporter.otlp.compression` - N/A - |2) `otel.exporter.otlp.endpoint` - N/A - |3) `otel.exporter.otlp.headers` - non-header - |4) `otel.exporter.otlp.timeout` - N/A - |5) `otel.exporter.otlp.traces.compression` - N/A - |6) `otel.exporter.otlp.traces.endpoint` - N/A - |7) `otel.exporter.otlp.traces.headers` - N/A - |8) `otel.exporter.otlp.traces.timeout` - N/A""".stripMargin) - ) - } - -} diff --git a/sdk-exporter/metrics/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpHttpMetricExporter.scala b/sdk-exporter/metrics/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpMetricExporter.scala similarity index 78% rename from sdk-exporter/metrics/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpHttpMetricExporter.scala rename to sdk-exporter/metrics/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpMetricExporter.scala index fec73c1df..244d04022 100644 --- a/sdk-exporter/metrics/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpHttpMetricExporter.scala +++ b/sdk-exporter/metrics/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpMetricExporter.scala @@ -38,21 +38,29 @@ import org.typelevel.otel4s.sdk.metrics.exporter.MetricExporter import scala.concurrent.duration._ -/** Exports metrics via HTTP. Support `json` and `protobuf` encoding. +/** Exports metrics via HTTP. + * + * Supported protocols: + * - `grpc` + * - `http/json` + * - `http/protobuf` * * @see * [[https://opentelemetry.io/docs/specs/otel/protocol/exporter/]] * * @see * [[https://opentelemetry.io/docs/concepts/sdk-configuration/otlp-exporter-configuration/]] + * + * @see + * [[https://github.com/open-telemetry/opentelemetry-proto/blob/v1.3.2/opentelemetry/proto/collector/metrics/v1/metrics_service.proto]] */ -private final class OtlpHttpMetricExporter[F[_]: Applicative] private[otlp] ( - client: OtlpHttpClient[F, MetricData], +private final class OtlpMetricExporter[F[_]: Applicative] private[otlp] ( + client: OtlpClient[F, MetricData], val aggregationTemporalitySelector: AggregationTemporalitySelector, val defaultAggregationSelector: AggregationSelector, val defaultCardinalityLimitSelector: CardinalityLimitSelector ) extends MetricExporter.Push[F] { - val name: String = s"OtlpHttpMetricExporter{client=$client}" + val name: String = s"OtlpMetricExporter{client=$client}" def exportMetrics[G[_]: Foldable](metrics: G[MetricData]): F[Unit] = client.doExport(metrics) @@ -60,15 +68,17 @@ private final class OtlpHttpMetricExporter[F[_]: Applicative] private[otlp] ( def flush: F[Unit] = Applicative[F].unit } -object OtlpHttpMetricExporter { +object OtlpMetricExporter { private[otlp] object Defaults { - val Endpoint: Uri = uri"http://localhost:4318/v1/metrics" + val Protocol: OtlpProtocol = OtlpProtocol.httpProtobuf + val HttpEndpoint: Uri = uri"http://localhost:4318/v1/metrics" + val GrpcEndpoint: Uri = uri"http://localhost:4317/opentelemetry.proto.collector.metrics.v1.MetricsService/Export" val Timeout: FiniteDuration = 10.seconds - val GzipCompression: Boolean = false + val Compression: PayloadCompression = PayloadCompression.none } - /** A builder of [[OtlpHttpMetricExporter]] */ + /** A builder of [[OtlpMetricExporter]] */ sealed trait Builder[F[_]] { /** Sets the OTLP endpoint to connect to. @@ -85,14 +95,11 @@ object OtlpHttpMetricExporter { */ def withTimeout(timeout: FiniteDuration): Builder[F] - /** Enables Gzip compression. + /** Sets the compression to use. * - * The compression is disabled by default. + * Default protocol is [[PayloadCompression.none]]. */ - def withGzip: Builder[F] - - /** Disables Gzip compression. */ - def withoutGzip: Builder[F] + def withCompression(compression: PayloadCompression): Builder[F] /** Adds headers to requests. */ def addHeaders(headers: Headers): Builder[F] @@ -109,9 +116,9 @@ object OtlpHttpMetricExporter { /** Configures the exporter to use the given encoding. * - * Default encoding is `Protobuf`. + * Default protocol is [[OtlpProtocol.httpProtobuf]]. */ - def withEncoding(encoding: HttpPayloadEncoding): Builder[F] + def withProtocol(protocol: OtlpProtocol): Builder[F] /** Sets the aggregation temporality selector to use. * @@ -154,12 +161,12 @@ object OtlpHttpMetricExporter { */ def withClient(client: Client[F]): Builder[F] - /** Creates a [[OtlpHttpMetricExporter]] using the configuration of this builder. + /** Creates a [[OtlpMetricExporter]] using the configuration of this builder. */ def build: Resource[F, MetricExporter.Push[F]] } - /** Creates a [[Builder]] of [[OtlpHttpMetricExporter]] with the default configuration: + /** Creates a [[Builder]] of [[OtlpMetricExporter]] with the default configuration: * - encoding: `Protobuf` * - endpoint: `http://localhost:4318/v1/metrics` * - timeout: `10 seconds` @@ -167,9 +174,9 @@ object OtlpHttpMetricExporter { */ def builder[F[_]: Async: Network: Compression: Console]: Builder[F] = BuilderImpl( - encoding = HttpPayloadEncoding.Protobuf, - endpoint = Defaults.Endpoint, - gzipCompression = Defaults.GzipCompression, + protocol = Defaults.Protocol, + endpoint = None, + compression = Defaults.Compression, timeout = Defaults.Timeout, headers = Headers.empty, retryPolicy = RetryPolicy.default, @@ -183,9 +190,9 @@ object OtlpHttpMetricExporter { private final case class BuilderImpl[ F[_]: Async: Network: Compression: Console ]( - encoding: HttpPayloadEncoding, - endpoint: Uri, - gzipCompression: Boolean, + protocol: OtlpProtocol, + endpoint: Option[Uri], + compression: PayloadCompression, timeout: FiniteDuration, headers: Headers, retryPolicy: RetryPolicy, @@ -200,16 +207,13 @@ object OtlpHttpMetricExporter { copy(timeout = timeout) def withEndpoint(endpoint: Uri): Builder[F] = - copy(endpoint = endpoint) + copy(endpoint = Some(endpoint)) def addHeaders(headers: Headers): Builder[F] = copy(headers = this.headers ++ headers) - def withGzip: Builder[F] = - copy(gzipCompression = true) - - def withoutGzip: Builder[F] = - copy(gzipCompression = false) + def withCompression(compression: PayloadCompression): Builder[F] = + copy(compression = compression) def withTLSContext(context: TLSContext[F]): Builder[F] = copy(tlsContext = Some(context)) @@ -217,8 +221,8 @@ object OtlpHttpMetricExporter { def withRetryPolicy(policy: RetryPolicy): Builder[F] = copy(retryPolicy = policy) - def withEncoding(encoding: HttpPayloadEncoding): Builder[F] = - copy(encoding = encoding) + def withProtocol(protocol: OtlpProtocol): Builder[F] = + copy(protocol = protocol) def withAggregationTemporalitySelector( selector: AggregationTemporalitySelector @@ -242,18 +246,25 @@ object OtlpHttpMetricExporter { import MetricsProtoEncoder.exportMetricsRequest import MetricsProtoEncoder.jsonPrinter + val endpoint = this.endpoint.getOrElse { + protocol match { + case _: OtlpProtocol.Http => Defaults.HttpEndpoint + case OtlpProtocol.Grpc => Defaults.GrpcEndpoint + } + } + for { - client <- OtlpHttpClient.create[F, MetricData]( - encoding, + client <- OtlpClient.create[F, MetricData]( + protocol, endpoint, - timeout, headers, - gzipCompression, + compression, + timeout, retryPolicy, tlsContext, client ) - } yield new OtlpHttpMetricExporter[F]( + } yield new OtlpMetricExporter[F]( client, aggregationTemporalitySelector, defaultAggregationSelector, diff --git a/sdk-exporter/metrics/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/autoconfigure/OtlpMetricExporterAutoConfigure.scala b/sdk-exporter/metrics/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/autoconfigure/OtlpMetricExporterAutoConfigure.scala index 4f15f88df..edd1e1d91 100644 --- a/sdk-exporter/metrics/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/autoconfigure/OtlpMetricExporterAutoConfigure.scala +++ b/sdk-exporter/metrics/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/autoconfigure/OtlpMetricExporterAutoConfigure.scala @@ -25,11 +25,9 @@ import org.http4s.Headers import org.http4s.client.Client import org.typelevel.otel4s.sdk.autoconfigure.AutoConfigure import org.typelevel.otel4s.sdk.autoconfigure.Config -import org.typelevel.otel4s.sdk.exporter.otlp.Protocol -import org.typelevel.otel4s.sdk.exporter.otlp.autoconfigure.OtlpHttpClientAutoConfigure -import org.typelevel.otel4s.sdk.exporter.otlp.autoconfigure.ProtocolAutoConfigure +import org.typelevel.otel4s.sdk.exporter.otlp.autoconfigure.OtlpClientAutoConfigure import org.typelevel.otel4s.sdk.exporter.otlp.metrics.MetricsProtoEncoder -import org.typelevel.otel4s.sdk.exporter.otlp.metrics.OtlpHttpMetricExporter +import org.typelevel.otel4s.sdk.exporter.otlp.metrics.OtlpMetricExporter import org.typelevel.otel4s.sdk.metrics.data.MetricData import org.typelevel.otel4s.sdk.metrics.exporter.AggregationSelector import org.typelevel.otel4s.sdk.metrics.exporter.AggregationTemporalitySelector @@ -39,10 +37,7 @@ import org.typelevel.otel4s.sdk.metrics.exporter.MetricExporter /** Autoconfigures OTLP [[org.typelevel.otel4s.sdk.metrics.exporter.MetricExporter MetricExporter]]. * * @see - * [[ProtocolAutoConfigure]] for OTLP protocol configuration - * - * @see - * [[OtlpHttpClientAutoConfigure]] for OTLP HTTP client configuration + * [[OtlpClientAutoConfigure]] for OTLP client configuration * * @see * [[https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#otel_exporter_otlp_protocol]] @@ -58,31 +53,31 @@ private final class OtlpMetricExporterAutoConfigure[ def name: String = "otlp" - protected def fromConfig(config: Config): Resource[F, MetricExporter[F]] = - ProtocolAutoConfigure.metrics[F].configure(config).flatMap { case Protocol.Http(encoding) => - import MetricsProtoEncoder.exportMetricsRequest - import MetricsProtoEncoder.jsonPrinter + protected def fromConfig(config: Config): Resource[F, MetricExporter[F]] = { + import MetricsProtoEncoder.exportMetricsRequest + import MetricsProtoEncoder.jsonPrinter - val defaults = OtlpHttpClientAutoConfigure.Defaults( - OtlpHttpMetricExporter.Defaults.Endpoint, - OtlpHttpMetricExporter.Defaults.Endpoint.path.toString, - Headers.empty, - OtlpHttpMetricExporter.Defaults.Timeout, - encoding - ) + val defaults = OtlpClientAutoConfigure.Defaults( + OtlpMetricExporter.Defaults.Protocol, + OtlpMetricExporter.Defaults.HttpEndpoint, + OtlpMetricExporter.Defaults.HttpEndpoint.path.toString, + Headers.empty, + OtlpMetricExporter.Defaults.Timeout, + OtlpMetricExporter.Defaults.Compression + ) - OtlpHttpClientAutoConfigure - .metrics[F, MetricData](defaults, customClient) - .configure(config) - .map { client => - new OtlpHttpMetricExporter[F]( - client, - AggregationTemporalitySelector.alwaysCumulative, - AggregationSelector.default, - CardinalityLimitSelector.default - ) - } - } + OtlpClientAutoConfigure + .metrics[F, MetricData](defaults, customClient) + .configure(config) + .map { client => + new OtlpMetricExporter[F]( + client, + AggregationTemporalitySelector.alwaysCumulative, + AggregationSelector.default, + CardinalityLimitSelector.default + ) + } + } } @@ -92,7 +87,10 @@ object OtlpMetricExporterAutoConfigure { * * The configuration depends on the `otel.exporter.otlp.protocol` or `otel.exporter.otlp.metrics.protocol`. * - * The supported protocols: `http/json`, `http/protobuf`. + * Supported protocols: + * - `grpc` + * - `http/json` + * - `http/protobuf` * * @see * `OtlpHttpClientAutoConfigure` for the configuration details of the OTLP HTTP client @@ -107,7 +105,10 @@ object OtlpMetricExporterAutoConfigure { * * The configuration depends on the `otel.exporter.otlp.protocol` or `otel.exporter.otlp.metrics.protocol`. * - * The supported protocols: `http/json`, `http/protobuf`. + * Supported protocols: + * - `grpc` + * - `http/json` + * - `http/protobuf` * * @see * `OtlpHttpClientAutoConfigure` for the configuration details of the OTLP HTTP client diff --git a/sdk-exporter/metrics/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpHttpMetricExporterSuite.scala b/sdk-exporter/metrics/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpMetricExporterSuite.scala similarity index 84% rename from sdk-exporter/metrics/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpHttpMetricExporterSuite.scala rename to sdk-exporter/metrics/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpMetricExporterSuite.scala index cd9d7ff82..02a5fb66d 100644 --- a/sdk-exporter/metrics/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpHttpMetricExporterSuite.scala +++ b/sdk-exporter/metrics/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpMetricExporterSuite.scala @@ -14,7 +14,8 @@ * limitations under the License. */ -package org.typelevel.otel4s.sdk.exporter.otlp.metrics +package org.typelevel.otel4s.sdk.exporter.otlp +package metrics import cats.data.NonEmptyVector import cats.effect.IO @@ -36,7 +37,6 @@ import org.typelevel.otel4s.AttributeType import org.typelevel.otel4s.Attributes import org.typelevel.otel4s.sdk.exporter.RetryPolicy import org.typelevel.otel4s.sdk.exporter.SuiteRuntimePlatform -import org.typelevel.otel4s.sdk.exporter.otlp.HttpPayloadEncoding import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality import org.typelevel.otel4s.sdk.metrics.data.MetricData import org.typelevel.otel4s.sdk.metrics.data.MetricPoints @@ -45,36 +45,72 @@ import org.typelevel.otel4s.sdk.metrics.scalacheck.Arbitraries._ import scala.concurrent.duration._ -class OtlpHttpMetricExporterSuite extends CatsEffectSuite with ScalaCheckEffectSuite with SuiteRuntimePlatform { +class OtlpMetricExporterSuite extends CatsEffectSuite with ScalaCheckEffectSuite with SuiteRuntimePlatform { - import OtlpHttpMetricExporterSuite._ + import OtlpMetricExporterSuite._ - private implicit val encodingArbitrary: Arbitrary[HttpPayloadEncoding] = - Arbitrary(Gen.oneOf(HttpPayloadEncoding.Protobuf, HttpPayloadEncoding.Json)) + private implicit val protocolArbitrary: Arbitrary[OtlpProtocol] = + Arbitrary( + Gen.oneOf( + OtlpProtocol.httpJson, + OtlpProtocol.httpProtobuf, + OtlpProtocol.grpc + ) + ) - test("represent builder parameters in the name") { - PropF.forAllF { (encoding: HttpPayloadEncoding) => - val enc = encoding match { - case HttpPayloadEncoding.Json => "Json" - case HttpPayloadEncoding.Protobuf => "Protobuf" - } + private implicit val compressionArbitrary: Arbitrary[PayloadCompression] = + Arbitrary( + Gen.oneOf( + PayloadCompression.gzip, + PayloadCompression.none + ) + ) + test("represent builder parameters in the name") { + PropF.forAllF { (protocol: OtlpProtocol, compression: PayloadCompression) => val expected = - s"OtlpHttpMetricExporter{client=OtlpHttpClient{encoding=$enc, " + + s"OtlpMetricExporter{client=OtlpClient{protocol=$protocol, " + "endpoint=https://localhost:4318/api/v1/metrics, " + "timeout=5 seconds, " + - "gzipCompression=true, " + + s"compression=$compression, " + "headers={X-Forwarded-For: 127.0.0.1}}}" - OtlpHttpMetricExporter + OtlpMetricExporter .builder[IO] .addHeaders( Headers(`X-Forwarded-For`(IpAddress.fromString("127.0.0.1"))) ) .withEndpoint(uri"https://localhost:4318/api/v1/metrics") .withTimeout(5.seconds) - .withGzip - .withEncoding(encoding) + .withProtocol(protocol) + .withCompression(compression) + .build + .use { exporter => + IO(assertEquals(exporter.name, expected)) + } + } + } + + test("change endpoint according to the protocol") { + PropF.forAllF { (protocol: OtlpProtocol) => + val endpoint = protocol match { + case _: OtlpProtocol.Http => + "http://localhost:4318/v1/metrics" + + case OtlpProtocol.Grpc => + "http://localhost:4317/opentelemetry.proto.collector.metrics.v1.MetricsService/Export" + } + + val expected = + s"OtlpMetricExporter{client=OtlpClient{protocol=$protocol, " + + s"endpoint=$endpoint, " + + "timeout=10 seconds, " + + "compression=none, " + + "headers={}}}" + + OtlpMetricExporter + .builder[IO] + .withProtocol(protocol) .build .use { exporter => IO(assertEquals(exporter.name, expected)) @@ -83,7 +119,7 @@ class OtlpHttpMetricExporterSuite extends CatsEffectSuite with ScalaCheckEffectS } test("export metrics") { - PropF.forAllF { (md: MetricData, encoding: HttpPayloadEncoding) => + PropF.forAllF { (md: MetricData, protocol: OtlpProtocol, compression: PayloadCompression) => val metric = MetricData( md.resource, md.instrumentationScope, @@ -133,10 +169,19 @@ class OtlpHttpMetricExporterSuite extends CatsEffectSuite with ScalaCheckEffectS } } - OtlpHttpMetricExporter + val endpoint = protocol match { + case _: OtlpProtocol.Http => + uri"http://localhost:44318/v1/metrics" + + case OtlpProtocol.Grpc => + uri"http://localhost:44317/opentelemetry.proto.collector.metrics.v1.MetricsService/Export" + } + + OtlpMetricExporter .builder[IO] - .withEncoding(encoding) - .withEndpoint(uri"http://localhost:44318/v1/metrics") + .withProtocol(protocol) + .withCompression(compression) + .withEndpoint(endpoint) .withTimeout(20.seconds) .withRetryPolicy( RetryPolicy.builder @@ -176,10 +221,10 @@ class OtlpHttpMetricExporterSuite extends CatsEffectSuite with ScalaCheckEffectS for { body <- response.as[String] _ <- IO.println( - s"Cannot retrieve metrics due. Status ${response.status} body ${body}" + s"Cannot retrieve metrics due. Status ${response.status} body $body" ) } yield new RuntimeException( - s"Cannot retrieve metrics due. Status ${response.status} body ${body}" + s"Cannot retrieve metrics due. Status ${response.status} body $body" ) } .handleErrorWith { error => @@ -370,7 +415,7 @@ class OtlpHttpMetricExporterSuite extends CatsEffectSuite with ScalaCheckEffectS } -object OtlpHttpMetricExporterSuite { +object OtlpMetricExporterSuite { implicit val prometheusValueDecoder: Decoder[PrometheusValue] = Decoder.instance { cursor => diff --git a/sdk-exporter/metrics/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/autoconfigure/OtlpMetricExporterAutoConfigureSuite.scala b/sdk-exporter/metrics/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/autoconfigure/OtlpMetricExporterAutoConfigureSuite.scala index 047019f53..924d55be2 100644 --- a/sdk-exporter/metrics/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/autoconfigure/OtlpMetricExporterAutoConfigureSuite.scala +++ b/sdk-exporter/metrics/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/autoconfigure/OtlpMetricExporterAutoConfigureSuite.scala @@ -29,11 +29,11 @@ class OtlpMetricExporterAutoConfigureSuite extends CatsEffectSuite with SuiteRun val config = Config.ofProps(Map.empty) val expected = - "OtlpHttpMetricExporter{client=OtlpHttpClient{" + - "encoding=Protobuf, " + + "OtlpMetricExporter{client=OtlpClient{" + + "protocol=http/protobuf, " + "endpoint=http://localhost:4318/v1/metrics, " + "timeout=10 seconds, " + - "gzipCompression=false, " + + "compression=none, " + "headers={}}}" OtlpMetricExporterAutoConfigure[IO] @@ -50,11 +50,11 @@ class OtlpMetricExporterAutoConfigureSuite extends CatsEffectSuite with SuiteRun ) val expected = - "OtlpHttpMetricExporter{client=OtlpHttpClient{" + - "encoding=Protobuf, " + + "OtlpMetricExporter{client=OtlpClient{" + + "protocol=http/protobuf, " + "endpoint=http://localhost:4318/v1/metrics, " + "timeout=10 seconds, " + - "gzipCompression=false, " + + "compression=none, " + "headers={}}}" OtlpMetricExporterAutoConfigure[IO] @@ -71,11 +71,11 @@ class OtlpMetricExporterAutoConfigureSuite extends CatsEffectSuite with SuiteRun ) val expected = - "OtlpHttpMetricExporter{client=OtlpHttpClient{" + - "encoding=Json, " + + "OtlpMetricExporter{client=OtlpClient{" + + "protocol=http/json, " + "endpoint=http://localhost:4318/v1/metrics, " + "timeout=10 seconds, " + - "gzipCompression=false, " + + "compression=none, " + "headers={}}}" OtlpMetricExporterAutoConfigure[IO] @@ -90,11 +90,11 @@ class OtlpMetricExporterAutoConfigureSuite extends CatsEffectSuite with SuiteRun ) val expected = - "OtlpHttpMetricExporter{client=OtlpHttpClient{" + - "encoding=Protobuf, " + + "OtlpMetricExporter{client=OtlpClient{" + + "protocol=http/protobuf, " + "endpoint=http://localhost:4318/v1/metrics, " + "timeout=10 seconds, " + - "gzipCompression=false, " + + "compression=none, " + "headers={}}}" endpoints.traverse_ { endpoint => @@ -109,7 +109,7 @@ class OtlpMetricExporterAutoConfigureSuite extends CatsEffectSuite with SuiteRun } test("load from the config - unknown protocol - fail") { - val config = Config.ofProps(Map("otel.exporter.otlp.protocol" -> "grpc")) + val config = Config.ofProps(Map("otel.exporter.otlp.protocol" -> "mqtt")) OtlpMetricExporterAutoConfigure[IO] .configure(config) @@ -117,11 +117,19 @@ class OtlpMetricExporterAutoConfigureSuite extends CatsEffectSuite with SuiteRun .attempt .map(_.leftMap(_.getMessage)) .assertEquals( - Left("""Cannot autoconfigure [Protocol]. - |Cause: Unrecognized protocol [grpc]. Supported options [http/json, http/protobuf]. + Left("""Cannot autoconfigure [OtlpClient]. + |Cause: Unrecognized protocol [mqtt]. Supported options [http/json, http/protobuf, grpc]. |Config: - |1) `otel.exporter.otlp.metrics.protocol` - N/A - |2) `otel.exporter.otlp.protocol` - grpc""".stripMargin) + |1) `otel.exporter.otlp.compression` - N/A + |2) `otel.exporter.otlp.endpoint` - N/A + |3) `otel.exporter.otlp.headers` - N/A + |4) `otel.exporter.otlp.metrics.compression` - N/A + |5) `otel.exporter.otlp.metrics.endpoint` - N/A + |6) `otel.exporter.otlp.metrics.headers` - N/A + |7) `otel.exporter.otlp.metrics.protocol` - N/A + |8) `otel.exporter.otlp.metrics.timeout` - N/A + |9) `otel.exporter.otlp.protocol` - mqtt + |10) `otel.exporter.otlp.timeout` - N/A""".stripMargin) ) } diff --git a/sdk-exporter/trace/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporter.scala b/sdk-exporter/trace/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpSpanExporter.scala similarity index 68% rename from sdk-exporter/trace/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporter.scala rename to sdk-exporter/trace/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpSpanExporter.scala index dd8a8e218..be839da9a 100644 --- a/sdk-exporter/trace/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporter.scala +++ b/sdk-exporter/trace/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpSpanExporter.scala @@ -35,18 +35,26 @@ import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter import scala.concurrent.duration._ -/** Exports spans via HTTP. Supports `json` and `protobuf` encodings. +/** Exports spans using underlying OTLP client. + * + * Supported protocols: + * - `grpc` + * - `http/json` + * - `http/protobuf` * * @see * [[https://opentelemetry.io/docs/specs/otel/protocol/exporter/]] * * @see * [[https://opentelemetry.io/docs/concepts/sdk-configuration/otlp-exporter-configuration/]] + * + * @see + * [[https://github.com/open-telemetry/opentelemetry-proto/blob/v1.3.2/opentelemetry/proto/collector/trace/v1/trace_service.proto]] */ -private final class OtlpHttpSpanExporter[F[_]: Applicative] private[otlp] ( - client: OtlpHttpClient[F, SpanData] +private final class OtlpSpanExporter[F[_]: Applicative] private[otlp] ( + client: OtlpClient[F, SpanData] ) extends SpanExporter[F] { - val name: String = s"OtlpHttpSpanExporter{client=$client}" + val name: String = s"OtlpSpanExporter{client=$client}" def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] = client.doExport(spans) @@ -54,15 +62,17 @@ private final class OtlpHttpSpanExporter[F[_]: Applicative] private[otlp] ( def flush: F[Unit] = Applicative[F].unit } -object OtlpHttpSpanExporter { +object OtlpSpanExporter { private[otlp] object Defaults { - val Endpoint: Uri = uri"http://localhost:4318/v1/traces" + val Protocol: OtlpProtocol = OtlpProtocol.httpProtobuf + val HttpEndpoint: Uri = uri"http://localhost:4318/v1/traces" + val GrpcEndpoint: Uri = uri"http://localhost:4317/opentelemetry.proto.collector.trace.v1.TraceService/Export" val Timeout: FiniteDuration = 10.seconds - val GzipCompression: Boolean = false + val Compression: PayloadCompression = PayloadCompression.none } - /** A builder of [[OtlpHttpSpanExporter]] */ + /** A builder of [[OtlpSpanExporter]] */ sealed trait Builder[F[_]] { /** Sets the OTLP endpoint to connect to. @@ -79,14 +89,11 @@ object OtlpHttpSpanExporter { */ def withTimeout(timeout: FiniteDuration): Builder[F] - /** Enables Gzip compression. + /** Sets the compression to use. * - * The compression is disabled by default. + * Default protocol is [[PayloadCompression.none]]. */ - def withGzip: Builder[F] - - /** Disables Gzip compression. */ - def withoutGzip: Builder[F] + def withCompression(compression: PayloadCompression): Builder[F] /** Adds headers to requests. */ def addHeaders(headers: Headers): Builder[F] @@ -103,9 +110,9 @@ object OtlpHttpSpanExporter { /** Configures the exporter to use the given encoding. * - * Default encoding is `Protobuf`. + * Default protocol is [[OtlpProtocol.httpProtobuf]]. */ - def withEncoding(encoding: HttpPayloadEncoding): Builder[F] + def withProtocol(protocol: OtlpProtocol): Builder[F] /** Configures the exporter to use the given client. * @@ -117,22 +124,22 @@ object OtlpHttpSpanExporter { */ def withClient(client: Client[F]): Builder[F] - /** Creates a [[OtlpHttpSpanExporter]] using the configuration of this builder. + /** Creates a [[OtlpSpanExporter]] using the configuration of this builder. */ def build: Resource[F, SpanExporter[F]] } - /** Creates a [[Builder]] of [[OtlpHttpSpanExporter]] with the default configuration: - * - encoding: `Protobuf` + /** Creates a [[Builder]] of [[OtlpSpanExporter]] with the default configuration: + * - protocol: `http/protobuf` * - endpoint: `http://localhost:4318/v1/traces` * - timeout: `10 seconds` * - retry policy: 5 exponential attempts, initial backoff is `1 second`, max backoff is `5 seconds` */ def builder[F[_]: Async: Network: Compression: Console]: Builder[F] = BuilderImpl( - encoding = HttpPayloadEncoding.Protobuf, - endpoint = Defaults.Endpoint, - gzipCompression = Defaults.GzipCompression, + protocol = Defaults.Protocol, + endpoint = None, + compression = Defaults.Compression, timeout = Defaults.Timeout, headers = Headers.empty, retryPolicy = RetryPolicy.default, @@ -143,9 +150,9 @@ object OtlpHttpSpanExporter { private final case class BuilderImpl[ F[_]: Async: Network: Compression: Console ]( - encoding: HttpPayloadEncoding, - endpoint: Uri, - gzipCompression: Boolean, + protocol: OtlpProtocol, + endpoint: Option[Uri], + compression: PayloadCompression, timeout: FiniteDuration, headers: Headers, retryPolicy: RetryPolicy, @@ -157,16 +164,13 @@ object OtlpHttpSpanExporter { copy(timeout = timeout) def withEndpoint(endpoint: Uri): Builder[F] = - copy(endpoint = endpoint) + copy(endpoint = Some(endpoint)) def addHeaders(headers: Headers): Builder[F] = copy(headers = this.headers ++ headers) - def withGzip: Builder[F] = - copy(gzipCompression = true) - - def withoutGzip: Builder[F] = - copy(gzipCompression = false) + def withCompression(compression: PayloadCompression): Builder[F] = + copy(compression = compression) def withTLSContext(context: TLSContext[F]): Builder[F] = copy(tlsContext = Some(context)) @@ -174,8 +178,8 @@ object OtlpHttpSpanExporter { def withRetryPolicy(policy: RetryPolicy): Builder[F] = copy(retryPolicy = policy) - def withEncoding(encoding: HttpPayloadEncoding): Builder[F] = - copy(encoding = encoding) + def withProtocol(protocol: OtlpProtocol): Builder[F] = + copy(protocol = protocol) def withClient(client: Client[F]): Builder[F] = copy(client = Some(client)) @@ -184,18 +188,25 @@ object OtlpHttpSpanExporter { import SpansProtoEncoder.spanDataToRequest import SpansProtoEncoder.jsonPrinter + val endpoint = this.endpoint.getOrElse { + protocol match { + case _: OtlpProtocol.Http => Defaults.HttpEndpoint + case OtlpProtocol.Grpc => Defaults.GrpcEndpoint + } + } + for { - client <- OtlpHttpClient.create[F, SpanData]( - encoding, + client <- OtlpClient.create[F, SpanData]( + protocol, endpoint, - timeout, headers, - gzipCompression, + compression, + timeout, retryPolicy, tlsContext, client ) - } yield new OtlpHttpSpanExporter[F](client) + } yield new OtlpSpanExporter[F](client) } } diff --git a/sdk-exporter/trace/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/autoconfigure/OtlpSpanExporterAutoConfigure.scala b/sdk-exporter/trace/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/autoconfigure/OtlpSpanExporterAutoConfigure.scala index 4045ed786..6f7969615 100644 --- a/sdk-exporter/trace/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/autoconfigure/OtlpSpanExporterAutoConfigure.scala +++ b/sdk-exporter/trace/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/autoconfigure/OtlpSpanExporterAutoConfigure.scala @@ -26,19 +26,14 @@ import org.http4s.Headers import org.http4s.client.Client import org.typelevel.otel4s.sdk.autoconfigure.AutoConfigure import org.typelevel.otel4s.sdk.autoconfigure.Config -import org.typelevel.otel4s.sdk.exporter.otlp.Protocol -import org.typelevel.otel4s.sdk.exporter.otlp.autoconfigure.OtlpHttpClientAutoConfigure -import org.typelevel.otel4s.sdk.exporter.otlp.autoconfigure.ProtocolAutoConfigure +import org.typelevel.otel4s.sdk.exporter.otlp.autoconfigure.OtlpClientAutoConfigure import org.typelevel.otel4s.sdk.trace.data.SpanData import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter /** Autoconfigures OTLP [[org.typelevel.otel4s.sdk.trace.exporter.SpanExporter SpanExporter]]. * * @see - * [[ProtocolAutoConfigure]] for OTLP protocol configuration - * - * @see - * [[OtlpHttpClientAutoConfigure]] for OTLP HTTP client configuration + * [[OtlpClientAutoConfigure]] for OTLP client configuration * * @see * [[https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#otel_exporter_otlp_protocol]] @@ -54,24 +49,24 @@ private final class OtlpSpanExporterAutoConfigure[ def name: String = "otlp" - protected def fromConfig(config: Config): Resource[F, SpanExporter[F]] = - ProtocolAutoConfigure.traces[F].configure(config).flatMap { case Protocol.Http(encoding) => - import SpansProtoEncoder.spanDataToRequest - import SpansProtoEncoder.jsonPrinter + protected def fromConfig(config: Config): Resource[F, SpanExporter[F]] = { + import SpansProtoEncoder.spanDataToRequest + import SpansProtoEncoder.jsonPrinter - val defaults = OtlpHttpClientAutoConfigure.Defaults( - OtlpHttpSpanExporter.Defaults.Endpoint, - OtlpHttpSpanExporter.Defaults.Endpoint.path.toString, - Headers.empty, - OtlpHttpSpanExporter.Defaults.Timeout, - encoding - ) + val defaults = OtlpClientAutoConfigure.Defaults( + OtlpSpanExporter.Defaults.Protocol, + OtlpSpanExporter.Defaults.HttpEndpoint, + OtlpSpanExporter.Defaults.HttpEndpoint.path.toString, + Headers.empty, + OtlpSpanExporter.Defaults.Timeout, + OtlpSpanExporter.Defaults.Compression + ) - OtlpHttpClientAutoConfigure - .traces[F, SpanData](defaults, customClient) - .configure(config) - .map(client => new OtlpHttpSpanExporter[F](client)) - } + OtlpClientAutoConfigure + .traces[F, SpanData](defaults, customClient) + .configure(config) + .map(client => new OtlpSpanExporter(client)) + } } @@ -81,7 +76,10 @@ object OtlpSpanExporterAutoConfigure { * * The configuration depends on the `otel.exporter.otlp.protocol` or `otel.exporter.otlp.traces.protocol`. * - * The supported protocols: `http/json`, `http/protobuf`. + * Supported protocols: + * - `grpc` + * - `http/json` + * - `http/protobuf` * * @see * `OtlpHttpClientAutoConfigure` for the configuration details of the OTLP HTTP client @@ -95,7 +93,10 @@ object OtlpSpanExporterAutoConfigure { * * The configuration depends on the `otel.exporter.otlp.protocol` or `otel.exporter.otlp.traces.protocol`. * - * The supported protocols: `http/json`, `http/protobuf`. + * Supported protocols: + * - `grpc` + * - `http/json` + * - `http/protobuf` * * @see * `OtlpHttpClientAutoConfigure` for the configuration details of the OTLP HTTP client diff --git a/sdk-exporter/trace/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporterSuite.scala b/sdk-exporter/trace/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpSpanExporterSuite.scala similarity index 82% rename from sdk-exporter/trace/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporterSuite.scala rename to sdk-exporter/trace/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpSpanExporterSuite.scala index 27a748789..7b632155d 100644 --- a/sdk-exporter/trace/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporterSuite.scala +++ b/sdk-exporter/trace/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpSpanExporterSuite.scala @@ -43,36 +43,72 @@ import org.typelevel.otel4s.trace.StatusCode import java.util.Locale import scala.concurrent.duration._ -class OtlpHttpSpanExporterSuite extends CatsEffectSuite with ScalaCheckEffectSuite with SuiteRuntimePlatform { - - import OtlpHttpSpanExporterSuite._ - - private implicit val encodingArbitrary: Arbitrary[HttpPayloadEncoding] = - Arbitrary(Gen.oneOf(HttpPayloadEncoding.Protobuf, HttpPayloadEncoding.Json)) +class OtlpSpanExporterSuite extends CatsEffectSuite with ScalaCheckEffectSuite with SuiteRuntimePlatform { + + import OtlpSpanExporterSuite._ + + private implicit val protocolArbitrary: Arbitrary[OtlpProtocol] = + Arbitrary( + Gen.oneOf( + OtlpProtocol.httpJson, + OtlpProtocol.httpProtobuf, + OtlpProtocol.grpc + ) + ) + + private implicit val compressionArbitrary: Arbitrary[PayloadCompression] = + Arbitrary( + Gen.oneOf( + PayloadCompression.gzip, + PayloadCompression.none + ) + ) test("represent builder parameters in the name") { - PropF.forAllF { (encoding: HttpPayloadEncoding) => - val enc = encoding match { - case HttpPayloadEncoding.Json => "Json" - case HttpPayloadEncoding.Protobuf => "Protobuf" - } - + PropF.forAllF { (protocol: OtlpProtocol, compression: PayloadCompression) => val expected = - s"OtlpHttpSpanExporter{client=OtlpHttpClient{encoding=$enc, " + + s"OtlpSpanExporter{client=OtlpClient{protocol=$protocol, " + "endpoint=https://localhost:4318/api/v1/traces, " + "timeout=5 seconds, " + - "gzipCompression=true, " + + s"compression=$compression, " + "headers={X-Forwarded-For: 127.0.0.1}}}" - OtlpHttpSpanExporter + OtlpSpanExporter .builder[IO] .addHeaders( Headers(`X-Forwarded-For`(IpAddress.fromString("127.0.0.1"))) ) .withEndpoint(uri"https://localhost:4318/api/v1/traces") .withTimeout(5.seconds) - .withGzip - .withEncoding(encoding) + .withProtocol(protocol) + .withCompression(compression) + .build + .use { exporter => + IO(assertEquals(exporter.name, expected)) + } + } + } + + test("change endpoint according to the protocol") { + PropF.forAllF { (protocol: OtlpProtocol) => + val endpoint = protocol match { + case _: OtlpProtocol.Http => + "http://localhost:4318/v1/traces" + + case OtlpProtocol.Grpc => + "http://localhost:4317/opentelemetry.proto.collector.trace.v1.TraceService/Export" + } + + val expected = + s"OtlpSpanExporter{client=OtlpClient{protocol=$protocol, " + + s"endpoint=$endpoint, " + + "timeout=10 seconds, " + + "compression=none, " + + "headers={}}}" + + OtlpSpanExporter + .builder[IO] + .withProtocol(protocol) .build .use { exporter => IO(assertEquals(exporter.name, expected)) @@ -81,7 +117,7 @@ class OtlpHttpSpanExporterSuite extends CatsEffectSuite with ScalaCheckEffectSui } test("export spans") { - PropF.forAllF { (sd: SpanData, encoding: HttpPayloadEncoding) => + PropF.forAllF { (sd: SpanData, protocol: OtlpProtocol, compression: PayloadCompression) => IO.realTime.flatMap { now => // we need to tweak end timestamps and attributes, so we recreate the span data val span = SpanData( @@ -183,9 +219,10 @@ class OtlpHttpSpanExporterSuite extends CatsEffectSuite with ScalaCheckEffectSui JaegerResponse(List(jaegerTrace)) } - OtlpHttpSpanExporter + OtlpSpanExporter .builder[IO] - .withEncoding(encoding) + .withProtocol(protocol) + .withCompression(compression) .withTimeout(20.seconds) .withRetryPolicy( RetryPolicy.builder @@ -259,7 +296,7 @@ class OtlpHttpSpanExporterSuite extends CatsEffectSuite with ScalaCheckEffectSui } -object OtlpHttpSpanExporterSuite { +object OtlpSpanExporterSuite { case class JaegerRef(refType: String, traceID: String, spanID: String) case class JaegerTag(key: String, `type`: String, value: Json) case class JaegerLog(timestamp: Long) diff --git a/sdk-exporter/trace/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/autoconfigure/OtlpSpanExporterAutoConfigureSuite.scala b/sdk-exporter/trace/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/autoconfigure/OtlpSpanExporterAutoConfigureSuite.scala index d4f55610d..a92ae6bb9 100644 --- a/sdk-exporter/trace/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/autoconfigure/OtlpSpanExporterAutoConfigureSuite.scala +++ b/sdk-exporter/trace/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/autoconfigure/OtlpSpanExporterAutoConfigureSuite.scala @@ -29,11 +29,11 @@ class OtlpSpanExporterAutoConfigureSuite extends CatsEffectSuite with SuiteRunti val config = Config.ofProps(Map.empty) val expected = - "OtlpHttpSpanExporter{client=OtlpHttpClient{" + - "encoding=Protobuf, " + + "OtlpSpanExporter{client=OtlpClient{" + + "protocol=http/protobuf, " + "endpoint=http://localhost:4318/v1/traces, " + "timeout=10 seconds, " + - "gzipCompression=false, " + + "compression=none, " + "headers={}}}" OtlpSpanExporterAutoConfigure[IO] @@ -50,11 +50,11 @@ class OtlpSpanExporterAutoConfigureSuite extends CatsEffectSuite with SuiteRunti ) val expected = - "OtlpHttpSpanExporter{client=OtlpHttpClient{" + - "encoding=Protobuf, " + + "OtlpSpanExporter{client=OtlpClient{" + + "protocol=http/protobuf, " + "endpoint=http://localhost:4318/v1/traces, " + "timeout=10 seconds, " + - "gzipCompression=false, " + + "compression=none, " + "headers={}}}" OtlpSpanExporterAutoConfigure[IO] @@ -71,11 +71,11 @@ class OtlpSpanExporterAutoConfigureSuite extends CatsEffectSuite with SuiteRunti ) val expected = - "OtlpHttpSpanExporter{client=OtlpHttpClient{" + - "encoding=Json, " + + "OtlpSpanExporter{client=OtlpClient{" + + "protocol=http/json, " + "endpoint=http://localhost:4318/v1/traces, " + "timeout=10 seconds, " + - "gzipCompression=false, " + + "compression=none, " + "headers={}}}" OtlpSpanExporterAutoConfigure[IO] @@ -90,11 +90,11 @@ class OtlpSpanExporterAutoConfigureSuite extends CatsEffectSuite with SuiteRunti ) val expected = - "OtlpHttpSpanExporter{client=OtlpHttpClient{" + - "encoding=Protobuf, " + + "OtlpSpanExporter{client=OtlpClient{" + + "protocol=http/protobuf, " + "endpoint=http://localhost:4318/v1/traces, " + "timeout=10 seconds, " + - "gzipCompression=false, " + + "compression=none, " + "headers={}}}" endpoints.traverse_ { endpoint => @@ -109,7 +109,7 @@ class OtlpSpanExporterAutoConfigureSuite extends CatsEffectSuite with SuiteRunti } test("load from the config - unknown protocol - fail") { - val config = Config.ofProps(Map("otel.exporter.otlp.protocol" -> "grpc")) + val config = Config.ofProps(Map("otel.exporter.otlp.protocol" -> "mqtt")) OtlpSpanExporterAutoConfigure[IO] .configure(config) @@ -117,11 +117,19 @@ class OtlpSpanExporterAutoConfigureSuite extends CatsEffectSuite with SuiteRunti .attempt .map(_.leftMap(_.getMessage)) .assertEquals( - Left("""Cannot autoconfigure [Protocol]. - |Cause: Unrecognized protocol [grpc]. Supported options [http/json, http/protobuf]. + Left("""Cannot autoconfigure [OtlpClient]. + |Cause: Unrecognized protocol [mqtt]. Supported options [http/json, http/protobuf, grpc]. |Config: - |1) `otel.exporter.otlp.protocol` - grpc - |2) `otel.exporter.otlp.traces.protocol` - N/A""".stripMargin) + |1) `otel.exporter.otlp.compression` - N/A + |2) `otel.exporter.otlp.endpoint` - N/A + |3) `otel.exporter.otlp.headers` - N/A + |4) `otel.exporter.otlp.protocol` - mqtt + |5) `otel.exporter.otlp.timeout` - N/A + |6) `otel.exporter.otlp.traces.compression` - N/A + |7) `otel.exporter.otlp.traces.endpoint` - N/A + |8) `otel.exporter.otlp.traces.headers` - N/A + |9) `otel.exporter.otlp.traces.protocol` - N/A + |10) `otel.exporter.otlp.traces.timeout` - N/A""".stripMargin) ) }