diff --git a/kinesis-client/src/main/scala/kinesis4cats/client/producer/metrics/CloudwatchMetricsReporter.scala b/kinesis-client/src/main/scala/kinesis4cats/client/producer/metrics/CloudwatchMetricsReporter.scala
new file mode 100644
index 00000000..f7a1195b
--- /dev/null
+++ b/kinesis-client/src/main/scala/kinesis4cats/client/producer/metrics/CloudwatchMetricsReporter.scala
@@ -0,0 +1,58 @@
+package kinesis4cats.client.producer.metrics
+
+import cats.effect.Async
+import fs2.Chunk
+import fs2.concurrent.Channel
+import org.typelevel.log4cats.StructuredLogger
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
+import software.amazon.awssdk.services.cloudwatch.model.Dimension
+import software.amazon.awssdk.services.cloudwatch.model.MetricDatum
+import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest
+import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse
+
+import kinesis4cats.producer.metrics
+
+class CloudwatchMetricsReporter[F[_]](
+    client: CloudWatchAsyncClient,
+    namespace: String,
+    encoders: metrics.MetricsReporter.LogEncoders,
+    override val logger: StructuredLogger[F],
+    override val config: metrics.MetricsReporter.Config[F],
+    override protected val channel: Channel[F, metrics.Metric]
+)(
+    override protected val callback: PutMetricDataResponse => F[Unit]
+)(implicit F: Async[F])
+    extends metrics.MetricsReporter[F, PutMetricDataResponse](encoders) {
+
+  override def _put(x: Chunk[metrics.Metric]): F[PutMetricDataResponse] = F
+    .fromCompletableFuture(
+      F.delay(
+        client.putMetricData(
+          PutMetricDataRequest
+            .builder()
+            .namespace(namespace)
+            .metricData(x.toList.map(CloudwatchMetricsReporter.asAws): _*)
+            .build()
+        )
+      )
+    )
+
+}
+
+object CloudwatchMetricsReporter {
+  private def asAwsDimension(dimension: metrics.Dimension): Dimension =
+    Dimension
+      .builder()
+      .name(dimension.name)
+      .value(dimension.value)
+      .build()
+
+  def asAws(metric: metrics.Metric): MetricDatum = MetricDatum
+    .builder()
+    .metricName(metric.name)
+    .timestamp(metric.timestamp)
+    .dimensions(metric.dimensions.map(asAwsDimension): _*)
+    .unit(metric.unit.value)
+    .value(metric.value)
+    .build()
+}
diff --git a/shared/src/main/scala/kinesis4cats/producer/fs2/FS2Producer.scala b/shared/src/main/scala/kinesis4cats/producer/fs2/FS2Producer.scala
index 8296e476..1f2e5e10 100644
--- a/shared/src/main/scala/kinesis4cats/producer/fs2/FS2Producer.scala
+++ b/shared/src/main/scala/kinesis4cats/producer/fs2/FS2Producer.scala
@@ -142,11 +142,6 @@ object FS2Producer {
     *   Size of underlying buffer of records
     * @param putMaxChunk
     *   Max records to buffer before running a put request
     * @param putMaxWait
     *   Max time to wait before running a put request
-    * @param putMaxRetries
-    *   Number of retries for the underlying put request. None means infinite
-    *   retries.
-    * @param putRetryInterval
-    *   Delay between retries
     * @param producerConfig
@@ -156,8 +151,6 @@ object FS2Producer {
       queueSize: Int,
       putMaxChunk: Int,
       putMaxWait: FiniteDuration,
-      putMaxRetries: Option[Int],
-      putRetryInterval: FiniteDuration,
       producerConfig: Producer.Config[F],
       gracefulShutdownWait: FiniteDuration
   )
@@ -169,8 +162,6 @@ object FS2Producer {
       1000,
       500,
       100.millis,
-      Some(5),
-      0.seconds,
       Producer.Config.default[F](streamNameOrArn),
       30.seconds
     )
diff --git a/shared/src/main/scala/kinesis4cats/producer/metrics/Dimension.scala b/shared/src/main/scala/kinesis4cats/producer/metrics/Dimension.scala
new file mode 100644
index 00000000..2eb6d0df
--- /dev/null
+++ b/shared/src/main/scala/kinesis4cats/producer/metrics/Dimension.scala
@@ -0,0 +1,16 @@
+package kinesis4cats.producer.metrics
+
+import kinesis4cats.models.ShardId
+import kinesis4cats.models.StreamArn
+
+sealed abstract class Dimension(val name: String, val value: String)
+
+object Dimension {
+  final case class Stream(arn: StreamArn)
+      extends Dimension("StreamARN", arn.streamArn)
+
+  final case class Shard(id: ShardId) extends Dimension("ShardID", id.shardId)
+
+  final case class ErrorCode(override val value: String)
+      extends Dimension("ErrorCode", value)
+}
diff --git a/shared/src/main/scala/kinesis4cats/producer/metrics/Granularity.scala b/shared/src/main/scala/kinesis4cats/producer/metrics/Granularity.scala
new file mode 100644
index 00000000..5903092b
--- /dev/null
+++ b/shared/src/main/scala/kinesis4cats/producer/metrics/Granularity.scala
@@ -0,0 +1,9 @@
+package kinesis4cats.producer.metrics
+
+sealed abstract class Granularity(val value: String)
+
+object Granularity {
+  case object Global extends Granularity("GLOBAL")
+  case object Stream extends Granularity("STREAM")
+  case object Shard extends Granularity("SHARD")
+}
diff --git a/shared/src/main/scala/kinesis4cats/producer/metrics/Level.scala b/shared/src/main/scala/kinesis4cats/producer/metrics/Level.scala
new file mode 100644
index 00000000..d60766e9
--- /dev/null
+++ b/shared/src/main/scala/kinesis4cats/producer/metrics/Level.scala
@@ -0,0 +1,21 @@
+package kinesis4cats.producer.metrics
+
+sealed abstract class Level(val value: String) {
+  def isDetailed: Boolean
+  def isSummary: Boolean
+}
+
+object Level {
+  case object None extends Level("NONE") {
+    override def isDetailed: Boolean = false
+    override def isSummary: Boolean = false
+  }
+  case object Summary extends Level("SUMMARY") {
+    override def isDetailed: Boolean = false
+    override def isSummary: Boolean = true
+  }
+  case object Detailed extends Level("DETAILED") {
+    override def isDetailed: Boolean = true
+    override def isSummary: Boolean = true
+  }
+}
diff --git a/shared/src/main/scala/kinesis4cats/producer/metrics/Metric.scala b/shared/src/main/scala/kinesis4cats/producer/metrics/Metric.scala
new file mode 100644
index 00000000..a7b28a90
--- /dev/null
+++ b/shared/src/main/scala/kinesis4cats/producer/metrics/Metric.scala
@@ -0,0 +1,363 @@
+package kinesis4cats.producer.metrics
+
+import scala.concurrent.duration.FiniteDuration
+
+import java.time.Instant
+
+import cats.effect.Async
+import cats.syntax.all._
+
+import kinesis4cats.models.ShardId
+import kinesis4cats.models.StreamArn
+
+final case class Metric(
+    name: String,
+    dimensions: List[Dimension],
+    timestamp: Instant,
+    value: Double,
+    unit: StandardUnit
+)
+
+object Metric {
+  private def getDimensions(
+      granularity: Granularity,
+      streamArn: StreamArn,
+      shardId: ShardId,
+      includeShard: Boolean = true
+  ): List[Dimension] =
+    granularity match {
+      case Granularity.Global => Nil
+      case Granularity.Stream => List(Dimension.Stream(streamArn))
+      case Granularity.Shard =>
+        if (includeShard)
+          List(Dimension.Stream(streamArn), Dimension.Shard(shardId))
+        else List(Dimension.Stream(streamArn))
+    }
+
+  def userRecordsReceived[F[_]](
+      x: Int,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isDetailed)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "UserRecordsReceived",
+              getDimensions(granularity, streamArn, shardId, false),
+              now,
+              x.toDouble,
+              StandardUnit.Count
+            )
+          )
+        )
+    else F.pure(None)
+
+  def userRecordsPending[F[_]](
+      x: Int,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isDetailed)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "UserRecordsPending",
+              getDimensions(granularity, streamArn, shardId, false),
+              now,
+              x.toDouble,
+              StandardUnit.Count
+            )
+          )
+        )
+    else F.pure(None)
+
+  def userRecordsPut[F[_]](
+      x: Int,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isSummary)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "UserRecordsPut",
+              getDimensions(granularity, streamArn, shardId),
+              now,
+              x.toDouble,
+              StandardUnit.Count
+            )
+          )
+        )
+    else F.pure(None)
+
+  def userRecordsDataPut[F[_]](
+      x: Int,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isDetailed)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "UserRecordsDataPut",
+              getDimensions(granularity, streamArn, shardId),
+              now,
+              x.toDouble,
+              StandardUnit.Bytes
+            )
+          )
+        )
+    else F.pure(None)
+
+  def kinesisRecordsPut[F[_]](
+      x: Int,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isSummary)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "KinesisRecordsPut",
+              getDimensions(granularity, streamArn, shardId),
+              now,
+              x.toDouble,
+              StandardUnit.Count
+            )
+          )
+        )
+    else F.pure(None)
+
+  def kinesisRecordsDataPut[F[_]](
+      x: Int,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isDetailed)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "KinesisRecordsDataPut",
+              getDimensions(granularity, streamArn, shardId),
+              now,
+              x.toDouble,
+              StandardUnit.Bytes
+            )
+          )
+        )
+    else F.pure(None)
+
+  def errorsByCode[F[_]](
+      x: Int,
+      code: String,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isSummary)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "ErrorsByCode",
+              Dimension
+                .ErrorCode(code) +: getDimensions(
+                granularity,
+                streamArn,
+                shardId
+              ),
+              now,
+              x.toDouble,
+              StandardUnit.Count
+            )
+          )
+        )
+    else F.pure(None)
+
+  def allErrors[F[_]](
+      x: Int,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isSummary)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "AllErrors",
+              getDimensions(granularity, streamArn, shardId),
+              now,
+              x.toDouble,
+              StandardUnit.Count
+            )
+          )
+        )
+    else F.pure(None)
+
+  def retriesPerRecord[F[_]](
+      x: Int,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isDetailed)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "RetriesPerRecord",
+              getDimensions(granularity, streamArn, shardId),
+              now,
+              x.toDouble,
+              StandardUnit.Count
+            )
+          )
+        )
+    else F.pure(None)
+
+  def bufferingTime[F[_]](
+      x: FiniteDuration,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isSummary)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "BufferingTime",
+              getDimensions(granularity, streamArn, shardId),
+              now,
+              x.toMillis.toDouble,
+              StandardUnit.Milliseconds
+            )
+          )
+        )
+    else F.pure(None)
+
+  def requestTime[F[_]](
+      x: FiniteDuration,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isDetailed)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "Request Time",
+              getDimensions(granularity, streamArn, shardId),
+              now,
+              x.toMillis.toDouble,
+              StandardUnit.Milliseconds
+            )
+          )
+        )
+    else F.pure(None)
+
+  def userRecordsPerKinesisRecord[F[_]](
+      x: Int,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isDetailed)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "User Records per Kinesis Record",
+              getDimensions(granularity, streamArn, shardId),
+              now,
+              x.toDouble,
+              StandardUnit.Count
+            )
+          )
+        )
+    else F.pure(None)
+
+  def amazonKinesisRecordsPerPutRecordsRequest[F[_]](
+      x: Int,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isDetailed)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "Amazon Kinesis Records per PutRecordsRequest",
+              getDimensions(granularity, streamArn, shardId, false),
+              now,
+              x.toDouble,
+              StandardUnit.Count
+            )
+          )
+        )
+    else F.pure(None)
+
+  def userRecordsPerPutRecordsRequest[F[_]](
+      x: Int,
+      granularity: Granularity,
+      level: Level,
+      streamArn: StreamArn,
+      shardId: ShardId
+  )(implicit F: Async[F]): F[Option[Metric]] =
+    if (level.isDetailed)
+      F.realTime
+        .map(d => Instant.EPOCH.plusNanos(d.toNanos))
+        .map(now =>
+          Some(
+            Metric(
+              "User Records per PutRecordsRequest",
+              getDimensions(granularity, streamArn, shardId, false),
+              now,
+              x.toDouble,
+              StandardUnit.Count
+            )
+          )
+        )
+    else F.pure(None)
+}
diff --git a/shared/src/main/scala/kinesis4cats/producer/metrics/MetricsReporter.scala b/shared/src/main/scala/kinesis4cats/producer/metrics/MetricsReporter.scala
new file mode 100644
index 00000000..8ec01d26
--- /dev/null
+++ b/shared/src/main/scala/kinesis4cats/producer/metrics/MetricsReporter.scala
@@ -0,0 +1,153 @@
+package kinesis4cats.producer.metrics
+
+import scala.concurrent.duration._
+
+import cats.Applicative
+import cats.effect.Async
+import cats.effect.Fiber
+import cats.effect.Resource
+import cats.effect.syntax.all._
+import cats.syntax.all._
+import fs2.Chunk
+import fs2.concurrent.Channel
+import org.typelevel.log4cats.StructuredLogger
+
+import kinesis4cats.compat.retry._
+import kinesis4cats.logging._
+
+abstract class MetricsReporter[F[_], PutRes](
+    encoders: MetricsReporter.LogEncoders
+)(implicit F: Async[F]) {
+  def config: MetricsReporter.Config[F]
+  def logger: StructuredLogger[F]
+  protected def channel: Channel[F, Metric]
+
+  import encoders._
+
+  /** A user-defined function that is run against the result of each request
+    */
+  protected def callback: PutRes => F[Unit]
+
+  def _put(metrics: Chunk[Metric]): F[PutRes]
+
+  def put(metric: Metric): F[Unit] = {
+    val ctx = LogContext()
+
+    for {
+      _ <- logger.debug(ctx.context)("Received metric to put")
+      res <- channel.send(metric)
+      _ <- res.bitraverse(
+        _ =>
+          logger.warn(ctx.context)(
+            "MetricsReporter has been shut down and will not accept further requests"
+          ),
+        _ =>
+          logger.debug(ctx.context)(
+            "Successfully put metric into processing queue"
+          )
+      )
+    } yield ()
+  }
+
+  /** Stop the processing of metrics
+    */
+  private[kinesis4cats] def stop(f: Fiber[F, Throwable, Unit]): F[Unit] = {
+    val ctx = LogContext()
+    for {
+      _ <- logger.debug(ctx.context)("Stopping the MetricsReporter")
+      _ <- channel.close
+      _ <- f.join.void.timeoutTo(config.gracefulShutdownWait, f.cancel)
+    } yield ()
+  }
+
+  /** Start the processing of metrics
+    */
+  private[kinesis4cats] def start(): F[Unit] = {
+    val ctx = LogContext()
+
+    for {
+      _ <- logger
+        .debug(ctx.context)("Starting the MetricsReporter")
+      _ <- channel.stream
+        .groupWithin(config.putMaxChunk, config.putMaxWait)
+        .evalMap { x =>
+          val c = ctx.addEncoded("batchSize", x.size)
+          for {
+            _ <- logger.debug(c.context)(
+              "Received metrics batch to process"
+            )
+            res <- retryingOnAllErrors(
+              config.retryPolicy,
+              (e: Throwable, details: RetryDetails) =>
+                logger
+                  .error(ctx.addEncoded("retryDetails", details).context, e)(
+                    "Exception when putting metrics, retrying"
+                  )
+            )(_put(x)).attempt
+            _ <- res.leftTraverse { e =>
+              if (config.raiseOnExhaustedRetries) F.raiseError(e).void
+              else if (config.warnOnFailures)
+                logger.warn(ctx.context, e)(
+                  "Batch of metrics failed to upload"
+                )
+              else F.unit
+            }
+            _ <- res.traverse(callback)
+            _ <- logger.debug(c.context)(
+              "Finished processing metrics batch"
+            )
+          } yield ()
+        }
+        .compile
+        .drain
+    } yield ()
+  }
+
+  private[kinesis4cats] def resource: Resource[F, Unit] =
+    Resource.make(start().start)(stop).void
+}
+
+object MetricsReporter {
+
+  /** [[kinesis4cats.logging.LogEncoder LogEncoder]] instances for the
+    * [[kinesis4cats.producer.metrics.MetricsReporter MetricsReporter]]
+    *
+    * @param retryDetailsEncoder
+    */
+  final class LogEncoders(implicit
+      val retryDetailsEncoder: LogEncoder[RetryDetails]
+  )
+
+  object LogEncoders {
+    val show = {
+      import kinesis4cats.logging.instances.show._
+      new LogEncoders()
+    }
+  }
+
+  final case class Config[F[_]](
+      queueSize: Int,
+      putMaxChunk: Int,
+      putMaxWait: FiniteDuration,
+      level: Level,
+      granularity: Granularity,
+      gracefulShutdownWait: FiniteDuration,
+      raiseOnExhaustedRetries: Boolean,
+      warnOnFailures: Boolean,
+      retryPolicy: RetryPolicy[F]
+  )
+
+  object Config {
+    def default[F[_]](implicit F: Applicative[F]): Config[F] = Config[F](
+      1000,
+      500,
+      100.millis,
+      Level.Detailed,
+      Granularity.Shard,
+      30.seconds,
+      false,
+      false,
+      RetryPolicies.alwaysGiveUp[F]
+    )
+  }
+}
diff --git a/shared/src/main/scala/kinesis4cats/producer/metrics/StandardUnit.scala b/shared/src/main/scala/kinesis4cats/producer/metrics/StandardUnit.scala
new file mode 100644
index 00000000..298d8a62
--- /dev/null
+++ b/shared/src/main/scala/kinesis4cats/producer/metrics/StandardUnit.scala
@@ -0,0 +1,9 @@
+package kinesis4cats.producer.metrics
+
+sealed abstract class StandardUnit(val value: String)
+
+object StandardUnit {
+  case object Count extends StandardUnit("Count")
+  case object Bytes extends StandardUnit("Bytes")
+  case object Milliseconds extends StandardUnit("Milliseconds")
+}
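
For reviewers, here is a minimal sketch of how these pieces could be wired together. It is illustrative only: the diff ships no smart constructor, so the `Channel`, logger, config, and callback are assembled by hand, the `ReporterExample`/`reporterResource` names are made up, and log4cats-slf4j is assumed to be on the classpath for the logger.

```scala
// Hypothetical wiring sketch, not part of this diff. It is placed inside the
// kinesis4cats namespace so it can call the package-private `resource`.
package kinesis4cats.client.producer.metrics

import cats.effect.{Async, Resource}
import fs2.concurrent.Channel
import org.typelevel.log4cats.slf4j.Slf4jLogger
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient

import kinesis4cats.producer.metrics

object ReporterExample {

  // Builds a CloudwatchMetricsReporter with the default config, a bounded
  // channel sized from that config, and a no-op callback, then starts the
  // background batching loop via `resource`.
  def reporterResource[F[_]](
      client: CloudWatchAsyncClient,
      namespace: String
  )(implicit F: Async[F]): Resource[F, CloudwatchMetricsReporter[F]] = {
    val config = metrics.MetricsReporter.Config.default[F]
    for {
      logger <- Resource.eval(Slf4jLogger.create[F])
      channel <- Resource.eval(
        Channel.bounded[F, metrics.Metric](config.queueSize)
      )
      reporter = new CloudwatchMetricsReporter[F](
        client,
        namespace,
        metrics.MetricsReporter.LogEncoders.show,
        logger,
        config,
        channel
      )(_ => F.unit) // callback: ignore each PutMetricDataResponse
      _ <- reporter.resource // starts (and on release stops) the fiber
    } yield reporter
  }
}
```

Callers would then invoke `reporter.put` with values built from the `Metric` companion helpers, which return `F[Option[Metric]]` and yield `None` whenever the supplied `Level` filters the metric out.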