Skip to content

Commit

Permalink
Builder Refactor (#109)
Browse files Browse the repository at this point in the history
  • Loading branch information
etspaceman authored Apr 21, 2023
1 parent 333cbd1 commit de54c60
Show file tree
Hide file tree
Showing 79 changed files with 2,564 additions and 3,775 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package kinesis4cats.localstack
package aws.v1

import scala.concurrent.duration._

import cats.effect.syntax.all._
import cats.effect.{Async, Resource}
import cats.syntax.all._
Expand All @@ -28,7 +26,6 @@ import com.amazonaws.services.dynamodbv2._
import com.amazonaws.services.kinesis._
import com.amazonaws.services.kinesis.model._

import kinesis4cats.compat.retry.RetryPolicies._
import kinesis4cats.compat.retry._

/** Helpers for constructing and leveraging AWS Java Client interfaces with
Expand Down Expand Up @@ -138,27 +135,21 @@ object AwsClients {
*/
def createStream[F[_]](
client: AmazonKinesisAsync,
streamName: String,
shardCount: Int,
describeRetries: Int,
describeRetryDuration: FiniteDuration
)(implicit F: Async[F]): F[Unit] = {
val retryPolicy = constantDelay(describeRetryDuration).join(
limitRetries(describeRetries)
)
config: TestStreamConfig[F]
)(implicit F: Async[F]): F[Unit] =
for {
_ <- F.interruptibleMany(
client.createStream(
new CreateStreamRequest()
.withStreamName(streamName)
.withShardCount(shardCount)
.withStreamName(config.streamName)
.withShardCount(config.shardCount)
.withStreamModeDetails(
new StreamModeDetails().withStreamMode(StreamMode.PROVISIONED)
)
)
)
_ <- retryingOnFailuresAndAllErrors(
retryPolicy,
config.describeRetryPolicy,
(x: DescribeStreamSummaryResult) =>
F.pure(
x.getStreamDescriptionSummary().getStreamStatus() === "ACTIVE"
Expand All @@ -168,12 +159,11 @@ object AwsClients {
)(
F.interruptibleMany(
client.describeStreamSummary(
new DescribeStreamSummaryRequest().withStreamName(streamName)
new DescribeStreamSummaryRequest().withStreamName(config.streamName)
)
)
)
} yield ()
}

/** Deletes a stream and awaits for the stream deletion to be finalized
*
Expand All @@ -193,17 +183,12 @@ object AwsClients {
*/
def deleteStream[F[_]](
client: AmazonKinesisAsync,
streamName: String,
describeRetries: Int,
describeRetryDuration: FiniteDuration
)(implicit F: Async[F]): F[Unit] = {
val retryPolicy = constantDelay(describeRetryDuration).join(
limitRetries(describeRetries)
)
config: TestStreamConfig[F]
)(implicit F: Async[F]): F[Unit] =
for {
_ <- F.interruptibleMany(client.deleteStream(streamName))
_ <- F.interruptibleMany(client.deleteStream(config.streamName))
_ <- retryingOnFailuresAndSomeErrors(
retryPolicy,
config.describeRetryPolicy,
(x: Either[Throwable, DescribeStreamSummaryResult]) =>
F.pure(
x.swap.exists {
Expand All @@ -221,12 +206,11 @@ object AwsClients {
)(
F.interruptibleMany(
client.describeStreamSummary(
new DescribeStreamSummaryRequest().withStreamName(streamName)
new DescribeStreamSummaryRequest().withStreamName(config.streamName)
)
).attempt
)
} yield ()
}

/** A resource that does the following:
*
Expand Down Expand Up @@ -256,27 +240,18 @@ object AwsClients {
* [[https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisAsync.html AmazonKinesisAsync]]
*/
def kinesisStreamResource[F[_]](
config: LocalstackConfig,
streamName: String,
shardCount: Int,
describeRetries: Int,
describeRetryDuration: FiniteDuration
localstackConfig: LocalstackConfig,
streamsToCreate: List[TestStreamConfig[F]]
)(implicit
F: Async[F]
): Resource[F, AmazonKinesisAsync] = for {
client <- kinesisClientResource(config)
result <- Resource.make(
createStream(
client,
streamName,
shardCount,
describeRetries,
describeRetryDuration
).as(client)
)(client =>
deleteStream(client, streamName, describeRetries, describeRetryDuration)
client <- kinesisClientResource(localstackConfig)
_ <- streamsToCreate.traverse_(config =>
Resource.make(
createStream(client, config).as(client)
)(client => deleteStream(client, config))
)
} yield result
} yield client

/** A resource that does the following:
*
Expand Down Expand Up @@ -307,22 +282,13 @@ object AwsClients {
* [[https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisAsync.html AmazonKinesisAsync]]
*/
def kinesisStreamResource[F[_]](
streamName: String,
shardCount: Int,
prefix: Option[String] = None,
describeRetries: Int = 5,
describeRetryDuration: FiniteDuration = 500.millis
streamsToCreate: List[TestStreamConfig[F]],
prefix: Option[String] = None
)(implicit
F: Async[F]
): Resource[F, AmazonKinesisAsync] = for {
config <- LocalstackConfig.resource(prefix)
result <- kinesisStreamResource(
config,
streamName,
shardCount,
describeRetries,
describeRetryDuration
)
localstackConfig <- LocalstackConfig.resource(prefix)
result <- kinesisStreamResource(localstackConfig, streamsToCreate)
} yield result

/** Builds a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package kinesis4cats.localstack
package aws.v2

import scala.concurrent.duration._

import cats.effect.syntax.all._
import cats.effect.{Async, Resource}
import cats.syntax.all._
Expand All @@ -32,7 +30,6 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model._
import software.amazon.awssdk.utils.AttributeMap

import kinesis4cats.compat.retry.RetryPolicies._
import kinesis4cats.compat.retry._

/** Helpers for constructing and leveraging AWS Java Client interfaces with
Expand Down Expand Up @@ -166,22 +163,16 @@ object AwsClients {
*/
def createStream[F[_]](
client: KinesisAsyncClient,
streamName: String,
shardCount: Int,
describeRetries: Int,
describeRetryDuration: FiniteDuration
)(implicit F: Async[F]): F[Unit] = {
val retryPolicy = constantDelay(describeRetryDuration).join(
limitRetries(describeRetries)
)
config: TestStreamConfig[F]
)(implicit F: Async[F]): F[Unit] =
for {
_ <- F.fromCompletableFuture(
F.delay(
client.createStream(
CreateStreamRequest
.builder()
.streamName(streamName)
.shardCount(shardCount)
.streamName(config.streamName)
.shardCount(config.shardCount)
.streamModeDetails(
StreamModeDetails
.builder()
Expand All @@ -193,7 +184,7 @@ object AwsClients {
)
)
_ <- retryingOnFailuresAndAllErrors(
retryPolicy,
config.describeRetryPolicy,
(x: DescribeStreamSummaryResponse) =>
F.pure(
x.streamDescriptionSummary()
Expand All @@ -207,14 +198,13 @@ object AwsClients {
client.describeStreamSummary(
DescribeStreamSummaryRequest
.builder()
.streamName(streamName)
.streamName(config.streamName)
.build()
)
)
)
)
} yield ()
}

/** Deletes a stream and awaits for the stream deletion to be finalized
*
Expand All @@ -234,23 +224,18 @@ object AwsClients {
*/
def deleteStream[F[_]](
client: KinesisAsyncClient,
streamName: String,
describeRetries: Int,
describeRetryDuration: FiniteDuration
)(implicit F: Async[F]): F[Unit] = {
val retryPolicy = constantDelay(describeRetryDuration).join(
limitRetries(describeRetries)
)
config: TestStreamConfig[F]
)(implicit F: Async[F]): F[Unit] =
for {
_ <- F.fromCompletableFuture(
F.delay(
client.deleteStream(
DeleteStreamRequest.builder().streamName(streamName).build()
DeleteStreamRequest.builder().streamName(config.streamName).build()
)
)
)
_ <- retryingOnFailuresAndSomeErrors(
retryPolicy,
config.describeRetryPolicy,
(x: Either[Throwable, DescribeStreamSummaryResponse]) =>
F.pure(
x.swap.exists {
Expand All @@ -271,14 +256,13 @@ object AwsClients {
client.describeStreamSummary(
DescribeStreamSummaryRequest
.builder()
.streamName(streamName)
.streamName(config.streamName)
.build()
)
)
).attempt
)
} yield ()
}

/** A resource that does the following:
*
Expand Down Expand Up @@ -308,27 +292,18 @@ object AwsClients {
* [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/KinesisAsyncClient.html KinesisAsyncClient]]
*/
def kinesisStreamResource[F[_]](
config: LocalstackConfig,
streamName: String,
shardCount: Int,
describeRetries: Int,
describeRetryDuration: FiniteDuration
localstackConfig: LocalstackConfig,
streamsToCreate: List[TestStreamConfig[F]]
)(implicit
F: Async[F]
): Resource[F, KinesisAsyncClient] = for {
client <- kinesisClientResource(config)
result <- Resource.make(
createStream(
client,
streamName,
shardCount,
describeRetries,
describeRetryDuration
).as(client)
)(client =>
deleteStream(client, streamName, describeRetries, describeRetryDuration)
client <- kinesisClientResource(localstackConfig)
_ <- streamsToCreate.traverse_(config =>
Resource.make(createStream(client, config).as(client))(client =>
deleteStream(client, config)
)
)
} yield result
} yield client

/** A resource that does the following:
*
Expand Down Expand Up @@ -359,22 +334,13 @@ object AwsClients {
* [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/KinesisAsyncClient.html KinesisAsyncClient]]
*/
def kinesisStreamResource[F[_]](
streamName: String,
shardCount: Int,
prefix: Option[String] = None,
describeRetries: Int = 5,
describeRetryDuration: FiniteDuration = 500.millis
streamsToCreate: List[TestStreamConfig[F]],
prefix: Option[String] = None
)(implicit
F: Async[F]
): Resource[F, KinesisAsyncClient] = for {
config <- LocalstackConfig.resource(prefix)
result <- kinesisStreamResource(
config,
streamName,
shardCount,
describeRetries,
describeRetryDuration
)
localstackConfig <- LocalstackConfig.resource(prefix)
result <- kinesisStreamResource(localstackConfig, streamsToCreate)
} yield result

/** Builds a
Expand Down
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ import laika.rewrite.link._

lazy val compat = projectMatrix
.settings(
description := "Code to maintain compatability across major scala versions"
description := "Code to maintain compatability across major scala versions",
scalacOptions --= Seq("-deprecation", "-Xlint:deprecation", "-Xsource:3"),
Compile / doc / sources := Seq.empty,
Compile / packageDoc / publishArtifact := false
)
.jvmPlatform(allScalaVersions)
.nativePlatform(allScalaVersions)
Expand Down Expand Up @@ -475,7 +478,6 @@ lazy val unidocs = projectMatrix
moduleName := name.value,
ScalaUnidoc / unidoc / unidocProjectFilter := inProjects(
List(
compat,
shared,
`shared-circe`,
`shared-ciris`,
Expand Down
3 changes: 1 addition & 2 deletions docs/client/circe.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-client-logging-ci

```scala mdoc:compile-only
import cats.effect._
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient

import kinesis4cats.client.KinesisClient
import kinesis4cats.client.logging.instances.circe._

KinesisClient[IO](KinesisAsyncClient.builder().build(), kinesisClientCirceEncoders)
KinesisClient.Builder.default[IO].withLogEncoders(kinesisClientCirceEncoders).build
```
Loading

0 comments on commit de54c60

Please sign in to comment.