Skip to content

Commit

Permalink
Refactor Producer Results + Better FS2 Error Handling + Public Retry (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
etspaceman authored Jun 6, 2023
1 parent 01f2353 commit 1edbea8
Show file tree
Hide file tree
Showing 24 changed files with 417 additions and 443 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package kinesis4cats.compat.retry

import scala.concurrent.duration.FiniteDuration

private[kinesis4cats] sealed trait PolicyDecision
sealed trait PolicyDecision

private[kinesis4cats] object PolicyDecision {
object PolicyDecision {
case object GiveUp extends PolicyDecision

final case class DelayAndRetry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ package kinesis4cats.compat.retry

import scala.concurrent.duration.FiniteDuration

private[kinesis4cats] sealed trait RetryDetails {
sealed trait RetryDetails {
def retriesSoFar: Int
def cumulativeDelay: FiniteDuration
def givingUp: Boolean
def upcomingDelay: Option[FiniteDuration]
}

private[kinesis4cats] object RetryDetails {
object RetryDetails {
final case class GivingUp(
totalRetries: Int,
totalDelay: FiniteDuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import cats.syntax.show._

import kinesis4cats.compat.retry.PolicyDecision._

private[kinesis4cats] object RetryPolicies {
object RetryPolicies {
private val LongMax: BigInt = BigInt(Long.MaxValue)

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import cats.{Applicative, Apply, Functor, Monad}

import kinesis4cats.compat.retry.PolicyDecision._

private[kinesis4cats] case class RetryPolicy[M[_]](
case class RetryPolicy[M[_]](
decideNextRetry: RetryStatus => M[PolicyDecision]
) {
def show: String = toString
Expand Down Expand Up @@ -105,7 +105,7 @@ private[kinesis4cats] case class RetryPolicy[M[_]](
)
}

private[kinesis4cats] object RetryPolicy {
object RetryPolicy {
def lift[M[_]](
f: RetryStatus => PolicyDecision
)(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package kinesis4cats.compat.retry

import scala.concurrent.duration.{Duration, FiniteDuration}

private[kinesis4cats] final case class RetryStatus(
final case class RetryStatus(
retriesSoFar: Int,
cumulativeDelay: FiniteDuration,
previousDelay: Option[FiniteDuration]
Expand All @@ -30,6 +30,6 @@ private[kinesis4cats] final case class RetryStatus(
)
}

private[kinesis4cats] object RetryStatus {
object RetryStatus {
val NoRetriesYet = RetryStatus(0, Duration.Zero, None)
}
1 change: 0 additions & 1 deletion docs/client/localstack.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-kinesis-client-lo

```scala mdoc:compile-only
import cats.effect.IO
import cats.effect.syntax.all._

import kinesis4cats.client.localstack.LocalstackKinesisClient
import kinesis4cats.client.producer.localstack.LocalstackKinesisProducer
Expand Down
1 change: 0 additions & 1 deletion docs/kcl/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ It is not recommended to use this in production as scaling the application becom

```scala mdoc:compile-only
import cats.effect._
import cats.effect.syntax.all._
import cats.syntax.all._
import software.amazon.kinesis.common._

Expand Down
1 change: 0 additions & 1 deletion docs/kpl/localstack.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-kpl-localstack" %

```scala mdoc:compile-only
import cats.effect.IO
import cats.effect.syntax.all._

import kinesis4cats.localstack.TestStreamConfig
import kinesis4cats.kpl.localstack.LocalstackKPLProducer
Expand Down
1 change: 0 additions & 1 deletion docs/smithy4s/localstack.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-smithy4s-client-l

```scala mdoc:compile-only
import cats.effect._
import cats.effect.syntax.all._
import org.http4s.blaze.client.BlazeClientBuilder
import org.typelevel.log4cats.slf4j.Slf4jLogger
import smithy4s.aws._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object LocalstackFS2KinesisProducer {
KinesisClient[F],
StreamNameOrArn
) => F[Either[ShardMapCache.Error, ShardMap]],
callback: Producer.Res[PutRecordsResponse] => F[Unit]
callback: Producer.Result[PutRecordsResponse] => F[Unit]
)(implicit F: Async[F]) {

def withLocalstackConfig(localstackConfig: LocalstackConfig): Builder[F] =
Expand Down Expand Up @@ -139,7 +139,7 @@ object LocalstackFS2KinesisProducer {
Nil,
(client: KinesisClient[F], snoa: StreamNameOrArn) =>
KinesisProducer.getShardMap(client, snoa),
(_: Producer.Res[PutRecordsResponse]) => F.unit
(_: Producer.Result[PutRecordsResponse]) => F.unit
)

@annotation.unused
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ final class FS2KinesisProducer[F[_]] private[kinesis4cats] (
override protected val channel: Channel[F, Record],
override protected val underlying: KinesisProducer[F]
)(
override protected val callback: Producer.Res[PutRecordsResponse] => F[Unit]
override protected val callback: Producer.Result[PutRecordsResponse] => F[
Unit
]
)(implicit
F: Async[F]
) extends FS2Producer[F, PutRecordsRequest, PutRecordsResponse]
Expand All @@ -63,7 +65,7 @@ object FS2KinesisProducer {
clientResource: Resource[F, KinesisClient[F]],
encoders: KinesisProducer.LogEncoders,
logger: StructuredLogger[F],
callback: Producer.Res[PutRecordsResponse] => F[Unit]
callback: Producer.Result[PutRecordsResponse] => F[Unit]
)(implicit F: Async[F]) {
def withConfig(config: FS2Producer.Config[F]): Builder[F] = copy(
config = config
Expand Down Expand Up @@ -108,7 +110,7 @@ object FS2KinesisProducer {
KinesisClient.Builder.default.build,
KinesisProducer.LogEncoders.show,
Slf4jLogger.getLogger,
(_: Producer.Res[PutRecordsResponse]) => F.unit
(_: Producer.Result[PutRecordsResponse]) => F.unit
)

@annotation.unused
Expand Down
Loading

0 comments on commit 1edbea8

Please sign in to comment.