diff --git a/aws-v1-localstack/src/main/scala/kinesis4cats/localstack/aws/v1/AwsClients.scala b/aws-v1-localstack/src/main/scala/kinesis4cats/localstack/aws/v1/AwsClients.scala index 6ffad7eb..326e7242 100644 --- a/aws-v1-localstack/src/main/scala/kinesis4cats/localstack/aws/v1/AwsClients.scala +++ b/aws-v1-localstack/src/main/scala/kinesis4cats/localstack/aws/v1/AwsClients.scala @@ -118,6 +118,116 @@ object AwsClients { ): Resource[F, AmazonKinesisAsync] = kinesisClient[F](prefix).toResource + /** Creates a stream and awaits for the status to be ready + * + * @param client + * [[https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisAsync.html AmazonKinesisAsync]] + * to use + * @param streamName + * Stream name + * @param shardCount + * Shard count for stream + * @param describeRetries + * How many times to retry DescribeStreamSummary when checking the stream + * status + * @param describeRetryDuration + * How long to delay between retries of the DescribeStreamSummary call + * @param F + * F with an [[cats.effect.Async Async]] instance + * @return + */ + def createStream[F[_]]( + client: AmazonKinesisAsync, + streamName: String, + shardCount: Int, + describeRetries: Int, + describeRetryDuration: FiniteDuration + )(implicit F: Async[F]): F[Unit] = { + val retryPolicy = constantDelay(describeRetryDuration).join( + limitRetries(describeRetries) + ) + for { + _ <- F.interruptibleMany( + client.createStream( + new CreateStreamRequest() + .withStreamName(streamName) + .withShardCount(shardCount) + .withStreamModeDetails( + new StreamModeDetails().withStreamMode(StreamMode.PROVISIONED) + ) + ) + ) + _ <- retryingOnFailuresAndAllErrors( + retryPolicy, + (x: DescribeStreamSummaryResult) => + F.pure( + x.getStreamDescriptionSummary().getStreamStatus() === "ACTIVE" + ), + noop[F, DescribeStreamSummaryResult], + noop[F, Throwable] + )( + F.interruptibleMany( + client.describeStreamSummary( + new DescribeStreamSummaryRequest().withStreamName(streamName) + ) + ) + ) + } yield () + } + + /** Deletes a stream and awaits for the stream deletion to be finalized + * + * @param client + * [[https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisAsync.html AmazonKinesisAsync]] + * to use + * @param streamName + * Stream name + * @param describeRetries + * How many times to retry DescribeStreamSummary when checking the stream + * status + * @param describeRetryDuration + * How long to delay between retries of the DescribeStreamSummary call + * @param F + * F with an [[cats.effect.Async Async]] instance + * @return + */ + def deleteStream[F[_]]( + client: AmazonKinesisAsync, + streamName: String, + describeRetries: Int, + describeRetryDuration: FiniteDuration + )(implicit F: Async[F]): F[Unit] = { + val retryPolicy = constantDelay(describeRetryDuration).join( + limitRetries(describeRetries) + ) + for { + _ <- F.interruptibleMany(client.deleteStream(streamName)) + _ <- retryingOnFailuresAndSomeErrors( + retryPolicy, + (x: Either[Throwable, DescribeStreamSummaryResult]) => + F.pure( + x.swap.exists { + case _: ResourceNotFoundException => true + case _ => false + } + ), + (e: Throwable) => + e match { + case _: ResourceNotFoundException => F.pure(false) + case _ => F.pure(true) + }, + noop[F, Either[Throwable, DescribeStreamSummaryResult]], + noop[F, Throwable] + )( + F.interruptibleMany( + client.describeStreamSummary( + new DescribeStreamSummaryRequest().withStreamName(streamName) + ) + ).attempt + ) + } yield () + } + /** A resource that does the following: * * - Builds a @@ -155,64 +265,16 @@ object AwsClients { F: Async[F] ): Resource[F, AmazonKinesisAsync] = for { client <- kinesisClientResource(config) - retryPolicy = constantDelay(describeRetryDuration).join( - limitRetries(describeRetries) - ) result <- Resource.make( - for { - _ <- F.interruptibleMany( - client.createStream( - new CreateStreamRequest() - .withStreamName(streamName) - .withShardCount(shardCount) - .withStreamModeDetails( - new StreamModeDetails().withStreamMode(StreamMode.PROVISIONED) - ) - ) - ) - _ <- retryingOnFailuresAndAllErrors( - retryPolicy, - (x: DescribeStreamSummaryResult) => - F.pure( - x.getStreamDescriptionSummary().getStreamStatus() === "ACTIVE" - ), - noop[F, DescribeStreamSummaryResult], - noop[F, Throwable] - )( - F.interruptibleMany( - client.describeStreamSummary( - new DescribeStreamSummaryRequest().withStreamName(streamName) - ) - ) - ) - } yield client + createStream( + client, + streamName, + shardCount, + describeRetries, + describeRetryDuration + ).as(client) )(client => - for { - _ <- F.interruptibleMany(client.deleteStream(streamName)) - _ <- retryingOnFailuresAndSomeErrors( - retryPolicy, - (x: Either[Throwable, DescribeStreamSummaryResult]) => - F.pure( - x.swap.exists { - case _: ResourceNotFoundException => true - case _ => false - } - ), - (e: Throwable) => - e match { - case _: ResourceNotFoundException => F.pure(false) - case _ => F.pure(true) - }, - noop[F, Either[Throwable, DescribeStreamSummaryResult]], - noop[F, Throwable] - )( - F.interruptibleMany( - client.describeStreamSummary( - new DescribeStreamSummaryRequest().withStreamName(streamName) - ) - ).attempt - ) - } yield () + deleteStream(client, streamName, describeRetries, describeRetryDuration) ) } yield result diff --git a/aws-v2-localstack/src/main/scala/kinesis4cats/localstack/aws/v2/AwsClients.scala b/aws-v2-localstack/src/main/scala/kinesis4cats/localstack/aws/v2/AwsClients.scala index 4f31ecef..08ceb253 100644 --- a/aws-v2-localstack/src/main/scala/kinesis4cats/localstack/aws/v2/AwsClients.scala +++ b/aws-v2-localstack/src/main/scala/kinesis4cats/localstack/aws/v2/AwsClients.scala @@ -146,6 +146,140 @@ object AwsClients { ): Resource[F, KinesisAsyncClient] = kinesisClient[F](prefix).toResource + /** Creates a stream and awaits for the status to be ready + * + * @param client + * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/KinesisAsyncClient.html KinesisAsyncClient]] + * to use + * @param streamName + * Stream name + * @param shardCount + * Shard count for stream + * @param describeRetries + * How many times to retry DescribeStreamSummary when checking the stream + * status + * @param describeRetryDuration + * How long to delay between retries of the DescribeStreamSummary call + * @param F + * F with an [[cats.effect.Async Async]] instance + * @return + */ + def createStream[F[_]]( + client: KinesisAsyncClient, + streamName: String, + shardCount: Int, + describeRetries: Int, + describeRetryDuration: FiniteDuration + )(implicit F: Async[F]): F[Unit] = { + val retryPolicy = constantDelay(describeRetryDuration).join( + limitRetries(describeRetries) + ) + for { + _ <- F.fromCompletableFuture( + F.delay( + client.createStream( + CreateStreamRequest + .builder() + .streamName(streamName) + .shardCount(shardCount) + .streamModeDetails( + StreamModeDetails + .builder() + .streamMode(StreamMode.PROVISIONED) + .build() + ) + .build() + ) + ) + ) + _ <- retryingOnFailuresAndAllErrors( + retryPolicy, + (x: DescribeStreamSummaryResponse) => + F.pure( + x.streamDescriptionSummary() + .streamStatus() == StreamStatus.ACTIVE + ), + noop[F, DescribeStreamSummaryResponse], + noop[F, Throwable] + )( + F.fromCompletableFuture( + F.delay( + client.describeStreamSummary( + DescribeStreamSummaryRequest + .builder() + .streamName(streamName) + .build() + ) + ) + ) + ) + } yield () + } + + /** Deletes a stream and awaits for the stream deletion to be finalized + * + * @param client + * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/KinesisAsyncClient.html KinesisAsyncClient]] + * to use + * @param streamName + * Stream name + * @param describeRetries + * How many times to retry DescribeStreamSummary when checking the stream + * status + * @param describeRetryDuration + * How long to delay between retries of the DescribeStreamSummary call + * @param F + * F with an [[cats.effect.Async Async]] instance + * @return + */ + def deleteStream[F[_]]( + client: KinesisAsyncClient, + streamName: String, + describeRetries: Int, + describeRetryDuration: FiniteDuration + )(implicit F: Async[F]): F[Unit] = { + val retryPolicy = constantDelay(describeRetryDuration).join( + limitRetries(describeRetries) + ) + for { + _ <- F.fromCompletableFuture( + F.delay( + client.deleteStream( + DeleteStreamRequest.builder().streamName(streamName).build() + ) + ) + ) + _ <- retryingOnFailuresAndSomeErrors( + retryPolicy, + (x: Either[Throwable, DescribeStreamSummaryResponse]) => + F.pure( + x.swap.exists { + case _: ResourceNotFoundException => true + case _ => false + } + ), + (e: Throwable) => + e match { + case _: ResourceNotFoundException => F.pure(false) + case _ => F.pure(true) + }, + noop[F, Either[Throwable, DescribeStreamSummaryResponse]], + noop[F, Throwable] + )( + F.fromCompletableFuture( + F.delay( + client.describeStreamSummary( + DescribeStreamSummaryRequest + .builder() + .streamName(streamName) + .build() + ) + ) + ).attempt + ) + } yield () + } + /** A resource that does the following: * * - Builds a @@ -183,88 +317,16 @@ object AwsClients { F: Async[F] ): Resource[F, KinesisAsyncClient] = for { client <- kinesisClientResource(config) - retryPolicy = constantDelay(describeRetryDuration).join( - limitRetries(describeRetries) - ) result <- Resource.make( - for { - _ <- F.fromCompletableFuture( - F.delay( - client.createStream( - CreateStreamRequest - .builder() - .streamName(streamName) - .shardCount(shardCount) - .streamModeDetails( - StreamModeDetails - .builder() - .streamMode(StreamMode.PROVISIONED) - .build() - ) - .build() - ) - ) - ) - _ <- retryingOnFailuresAndAllErrors( - retryPolicy, - (x: DescribeStreamSummaryResponse) => - F.pure( - x.streamDescriptionSummary() - .streamStatus() == StreamStatus.ACTIVE - ), - noop[F, DescribeStreamSummaryResponse], - noop[F, Throwable] - )( - F.fromCompletableFuture( - F.delay( - client.describeStreamSummary( - DescribeStreamSummaryRequest - .builder() - .streamName(streamName) - .build() - ) - ) - ) - ) - } yield client + createStream( + client, + streamName, + shardCount, + describeRetries, + describeRetryDuration + ).as(client) )(client => - for { - _ <- F.fromCompletableFuture( - F.delay( - client.deleteStream( - DeleteStreamRequest.builder().streamName(streamName).build() - ) - ) - ) - _ <- retryingOnFailuresAndSomeErrors( - retryPolicy, - (x: Either[Throwable, DescribeStreamSummaryResponse]) => - F.pure( - x.swap.exists { - case _: ResourceNotFoundException => true - case _ => false - } - ), - (e: Throwable) => - e match { - case _: ResourceNotFoundException => F.pure(false) - case _ => F.pure(true) - }, - noop[F, Either[Throwable, DescribeStreamSummaryResponse]], - noop[F, Throwable] - )( - F.fromCompletableFuture( - F.delay( - client.describeStreamSummary( - DescribeStreamSummaryRequest - .builder() - .streamName(streamName) - .build() - ) - ) - ).attempt - ) - } yield () + deleteStream(client, streamName, describeRetries, describeRetryDuration) ) } yield result diff --git a/kinesis-client-localstack/src/main/scala/kinesis4cats/client/localstack/LocalstackKinesisClient.scala b/kinesis-client-localstack/src/main/scala/kinesis4cats/client/localstack/LocalstackKinesisClient.scala index 2eccfbcc..72f89b02 100644 --- a/kinesis-client-localstack/src/main/scala/kinesis4cats/client/localstack/LocalstackKinesisClient.scala +++ b/kinesis-client-localstack/src/main/scala/kinesis4cats/client/localstack/LocalstackKinesisClient.scala @@ -24,10 +24,7 @@ import cats.effect.syntax.all._ import cats.effect.{Async, Resource} import cats.syntax.all._ import org.typelevel.log4cats.slf4j.Slf4jLogger -import software.amazon.awssdk.services.kinesis.model._ -import kinesis4cats.compat.retry.RetryPolicies._ -import kinesis4cats.compat.retry._ import kinesis4cats.localstack.LocalstackConfig import kinesis4cats.localstack.aws.v2.AwsClients @@ -115,74 +112,24 @@ object LocalstackKinesisClient { LE: KinesisClient.LogEncoders ): Resource[F, KinesisClient[F]] = for { client <- clientResource(config) - retryPolicy = constantDelay(describeRetryDuration).join( - limitRetries(describeRetries) - ) result <- Resource.make( - for { - _ <- client.createStream( - CreateStreamRequest - .builder() - .streamName(streamName) - .shardCount(shardCount) - .streamModeDetails( - StreamModeDetails - .builder() - .streamMode(StreamMode.PROVISIONED) - .build() - ) - .build() - ) - _ <- retryingOnFailuresAndAllErrors( - retryPolicy, - (x: DescribeStreamSummaryResponse) => - F.pure( - x.streamDescriptionSummary() - .streamStatus() == StreamStatus.ACTIVE - ), - noop[F, DescribeStreamSummaryResponse], - noop[F, Throwable] - )( - client.describeStreamSummary( - DescribeStreamSummaryRequest - .builder() - .streamName(streamName) - .build() - ) + AwsClients + .createStream( + client.client, + streamName, + shardCount, + describeRetries, + describeRetryDuration ) - } yield client + .as(client) )(client => - for { - _ <- client.deleteStream( - DeleteStreamRequest.builder().streamName(streamName).build() - ) - _ <- retryingOnFailuresAndSomeErrors( - retryPolicy, - (x: Either[Throwable, DescribeStreamSummaryResponse]) => - F.pure( - x.swap.exists { - case _: ResourceNotFoundException => true - case _ => false - } - ), - (e: Throwable) => - e match { - case _: ResourceNotFoundException => F.pure(false) - case _ => F.pure(true) - }, - noop[F, Either[Throwable, DescribeStreamSummaryResponse]], - noop[F, Throwable] - )( - client - .describeStreamSummary( - DescribeStreamSummaryRequest - .builder() - .streamName(streamName) - .build() - ) - .attempt + AwsClients + .deleteStream( + client.client, + streamName, + describeRetries, + describeRetryDuration ) - } yield () ) } yield result