Skip to content

Commit

Permalink
Separate methods for create/delete stream in localstack utilities (#90)
Browse files Browse the repository at this point in the history
  • Loading branch information
etspaceman authored Apr 4, 2023
1 parent b02721a commit 7bb743d
Show file tree
Hide file tree
Showing 3 changed files with 274 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,116 @@ object AwsClients {
): Resource[F, AmazonKinesisAsync] =
kinesisClient[F](prefix).toResource

/** Creates a stream and awaits for the status to be ready
*
* @param client
* [[https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisAsync.html AmazonKinesisAsync]]
* to use
* @param streamName
* Stream name
* @param shardCount
* Shard count for stream
* @param describeRetries
* How many times to retry DescribeStreamSummary when checking the stream
* status
* @param describeRetryDuration
* How long to delay between retries of the DescribeStreamSummary call
* @param F
* F with an [[cats.effect.Async Async]] instance
* @return
*/
def createStream[F[_]](
client: AmazonKinesisAsync,
streamName: String,
shardCount: Int,
describeRetries: Int,
describeRetryDuration: FiniteDuration
)(implicit F: Async[F]): F[Unit] = {
val retryPolicy = constantDelay(describeRetryDuration).join(
limitRetries(describeRetries)
)
for {
_ <- F.interruptibleMany(
client.createStream(
new CreateStreamRequest()
.withStreamName(streamName)
.withShardCount(shardCount)
.withStreamModeDetails(
new StreamModeDetails().withStreamMode(StreamMode.PROVISIONED)
)
)
)
_ <- retryingOnFailuresAndAllErrors(
retryPolicy,
(x: DescribeStreamSummaryResult) =>
F.pure(
x.getStreamDescriptionSummary().getStreamStatus() === "ACTIVE"
),
noop[F, DescribeStreamSummaryResult],
noop[F, Throwable]
)(
F.interruptibleMany(
client.describeStreamSummary(
new DescribeStreamSummaryRequest().withStreamName(streamName)
)
)
)
} yield ()
}

/** Deletes a stream and awaits for the stream deletion to be finalized
*
* @param client
* [[https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisAsync.html AmazonKinesisAsync]]
* to use
* @param streamName
* Stream name
* @param describeRetries
* How many times to retry DescribeStreamSummary when checking the stream
* status
* @param describeRetryDuration
* How long to delay between retries of the DescribeStreamSummary call
* @param F
* F with an [[cats.effect.Async Async]] instance
* @return
*/
def deleteStream[F[_]](
client: AmazonKinesisAsync,
streamName: String,
describeRetries: Int,
describeRetryDuration: FiniteDuration
)(implicit F: Async[F]): F[Unit] = {
val retryPolicy = constantDelay(describeRetryDuration).join(
limitRetries(describeRetries)
)
for {
_ <- F.interruptibleMany(client.deleteStream(streamName))
_ <- retryingOnFailuresAndSomeErrors(
retryPolicy,
(x: Either[Throwable, DescribeStreamSummaryResult]) =>
F.pure(
x.swap.exists {
case _: ResourceNotFoundException => true
case _ => false
}
),
(e: Throwable) =>
e match {
case _: ResourceNotFoundException => F.pure(false)
case _ => F.pure(true)
},
noop[F, Either[Throwable, DescribeStreamSummaryResult]],
noop[F, Throwable]
)(
F.interruptibleMany(
client.describeStreamSummary(
new DescribeStreamSummaryRequest().withStreamName(streamName)
)
).attempt
)
} yield ()
}

/** A resource that does the following:
*
* - Builds a
Expand Down Expand Up @@ -155,64 +265,16 @@ object AwsClients {
F: Async[F]
): Resource[F, AmazonKinesisAsync] = for {
client <- kinesisClientResource(config)
retryPolicy = constantDelay(describeRetryDuration).join(
limitRetries(describeRetries)
)
result <- Resource.make(
for {
_ <- F.interruptibleMany(
client.createStream(
new CreateStreamRequest()
.withStreamName(streamName)
.withShardCount(shardCount)
.withStreamModeDetails(
new StreamModeDetails().withStreamMode(StreamMode.PROVISIONED)
)
)
)
_ <- retryingOnFailuresAndAllErrors(
retryPolicy,
(x: DescribeStreamSummaryResult) =>
F.pure(
x.getStreamDescriptionSummary().getStreamStatus() === "ACTIVE"
),
noop[F, DescribeStreamSummaryResult],
noop[F, Throwable]
)(
F.interruptibleMany(
client.describeStreamSummary(
new DescribeStreamSummaryRequest().withStreamName(streamName)
)
)
)
} yield client
createStream(
client,
streamName,
shardCount,
describeRetries,
describeRetryDuration
).as(client)
)(client =>
for {
_ <- F.interruptibleMany(client.deleteStream(streamName))
_ <- retryingOnFailuresAndSomeErrors(
retryPolicy,
(x: Either[Throwable, DescribeStreamSummaryResult]) =>
F.pure(
x.swap.exists {
case _: ResourceNotFoundException => true
case _ => false
}
),
(e: Throwable) =>
e match {
case _: ResourceNotFoundException => F.pure(false)
case _ => F.pure(true)
},
noop[F, Either[Throwable, DescribeStreamSummaryResult]],
noop[F, Throwable]
)(
F.interruptibleMany(
client.describeStreamSummary(
new DescribeStreamSummaryRequest().withStreamName(streamName)
)
).attempt
)
} yield ()
deleteStream(client, streamName, describeRetries, describeRetryDuration)
)
} yield result

Expand Down
Loading

0 comments on commit 7bb743d

Please sign in to comment.