Skip to content

Commit

Permalink
Use FS2 Streams for responses to SubscribeToShard + Paginated Requests (
Browse files Browse the repository at this point in the history
  • Loading branch information
etspaceman authored Mar 14, 2023
1 parent 12bc763 commit 1851288
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 90 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ lazy val `kinesis-client` = projectMatrix
Aws.V2.kinesis,
Aws.V2.dynamo,
Aws.V2.cloudwatch,
Log4Cats.slf4j
Log4Cats.slf4j,
FS2.reactiveStreams
)
)
.jvmPlatform(allScalaVersions)
Expand Down
3 changes: 0 additions & 3 deletions docs/client/localstack.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ import kinesis4cats.client.producer.localstack.LocalstackKinesisProducer
import kinesis4cats.client.producer.fs2.localstack.LocalstackFS2KinesisProducer
import kinesis4cats.producer.logging.instances.show._

// Load a KinesisClient as an effect
LocalstackKinesisClient.client[IO]()

// Load a KinesisClient as a Resource
LocalstackKinesisClient.clientResource[IO]()

Expand Down
2 changes: 1 addition & 1 deletion integration-tests/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</encoder>
</appender>
<logger name="kinesis4cats" level="ERROR"/>
<logger name="software.amazon.kinesis" level="ERROR"/>
<logger name="software.amazon" level="ERROR"/>
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="STDOUT" />
</appender>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import scala.jdk.CollectionConverters._

import cats.effect.{IO, SyncIO}
import cats.syntax.all._
import fs2.interop.reactivestreams._
import io.circe.parser._
import io.circe.syntax._
import org.scalacheck.Arbitrary
Expand Down Expand Up @@ -180,17 +179,48 @@ class KinesisClientSpec extends munit.CatsEffectSuite {
recordsParsed <- recordBytes.traverse(bytes =>
IO.fromEither(decode[TestData](bytes))
)
streams <- client.listStreams()
streams2 <- client.listStreamsPaginator().compile.toList
streams3 <- client
.listStreamsPaginator(
ListStreamsRequest.builder().build()
)
.compile
.toList
consumers <- client.listStreamConsumers(
ListStreamConsumersRequest.builder().streamARN(streamArn).build()
)
consumers2 <- client
.listStreamConsumersPaginator(
ListStreamConsumersRequest.builder().streamARN(streamArn).build()
)
.flatMap { x =>
fromPublisher[IO, ListStreamConsumersResponse](x, 10).compile.toList

}
.compile
.toList
records2 <- client
.subscribeToShard(
SubscribeToShardRequest
.builder()
.consumerARN(
consumers.consumers().asScala.toList.head.consumerARN()
)
.shardId(shard.shardId())
.startingPosition(
StartingPosition
.builder()
.`type`(ShardIteratorType.TRIM_HORIZON)
.build()
)
.build()
)
.take(3)
.compile
.toList
recordBytes2 = records2
.flatMap(_.records().asScala.toList)
.map(x => new String(x.data().asByteArray()))
recordsParsed2 <- recordBytes2.traverse(bytes =>
IO.fromEither(decode[TestData](bytes))
)
_ <- client.deregisterStreamConsumer(
DeregisterStreamConsumerRequest
.builder()
Expand Down Expand Up @@ -260,8 +290,21 @@ class KinesisClientSpec extends munit.CatsEffectSuite {
)
} yield {
assertEquals(List(record1, record2, record3), recordsParsed)
assertEquals(List(record1, record2, record3), recordsParsed2)
assert(consumers.consumers().size() === 1)
assert(consumers2.head.consumers().size() === 1)
assertEquals(
streams.streamNames().asScala.toList,
List(streamName, "test-kcl-service-stream")
)
assertEquals(
streams2.flatMap(_.streamNames().asScala.toList),
List(streamName, "test-kcl-service-stream")
)
assertEquals(
streams3.flatMap(_.streamNames().asScala.toList),
List(streamName, "test-kcl-service-stream")
)
assertEquals(
tags.tags().asScala.toList.map(x => (x.key(), x.value())),
List("foo" -> "bar")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package localstack

import scala.concurrent.duration._

import cats.effect.std.Dispatcher
import cats.effect.syntax.all._
import cats.effect.{Async, Resource}
import cats.syntax.all._
Expand All @@ -32,46 +33,6 @@ import kinesis4cats.localstack.aws.v2.AwsClients

object LocalstackKinesisClient {

/** Builds a [[kinesis4cats.client.KinesisClient KinesisClient]] that is
* compliant for Localstack usage.
*
* @param config
* [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]]
* @param F
* F with an [[cats.effect.Async Async]] instance
* @param LE
* [[kinesis4cats.client.KinesisClient.LogEncoders LogEncoders]]
* @return
* F of [[kinesis4cats.client.KinesisClient KinesisClient]]
*/
def client[F[_]](
config: LocalstackConfig
)(implicit F: Async[F], LE: KinesisClient.LogEncoders): F[KinesisClient[F]] =
for {
underlying <- AwsClients.kinesisClient(config)
logger <- Slf4jLogger.create[F]
} yield new KinesisClient(underlying, logger)

/** Builds a [[kinesis4cats.client.KinesisClient KinesisClient]] that is
* compliant for Localstack usage.
*
* @param prefix
* Optional prefix for parsing configuration. Default to None
* @param F
* F with an [[cats.effect.Async Async]] instance
* @param LE
* [[kinesis4cats.client.KinesisClient.LogEncoders LogEncoders]]
* @return
* F of [[kinesis4cats.client.KinesisClient KinesisClient]]
*/
def client[F[_]](
prefix: Option[String] = None
)(implicit F: Async[F], LE: KinesisClient.LogEncoders): F[KinesisClient[F]] =
for {
underlying <- AwsClients.kinesisClient(prefix)
logger <- Slf4jLogger.create[F]
} yield new KinesisClient(underlying, logger)

/** Builds a [[kinesis4cats.client.KinesisClient KinesisClient]] that is
* compliant for Localstack usage. Lifecycle is managed as a
* [[cats.effect.Resource Resource]].
Expand All @@ -89,8 +50,11 @@ object LocalstackKinesisClient {
def clientResource[F[_]](config: LocalstackConfig)(implicit
F: Async[F],
LE: KinesisClient.LogEncoders
): Resource[F, KinesisClient[F]] =
client[F](config).toResource
): Resource[F, KinesisClient[F]] = for {
underlying <- AwsClients.kinesisClient(config).toResource
logger <- Slf4jLogger.create[F].toResource
dispatcher <- Dispatcher.parallel[F]
} yield new KinesisClient(underlying, logger, dispatcher)

/** Builds a [[kinesis4cats.client.KinesisClient KinesisClient]] that is
* compliant for Localstack usage. Lifecycle is managed as a
Expand All @@ -110,7 +74,7 @@ object LocalstackKinesisClient {
F: Async[F],
LE: KinesisClient.LogEncoders
): Resource[F, KinesisClient[F]] =
client[F](prefix).toResource
LocalstackConfig.resource(prefix).flatMap(clientResource[F])

/** A resources that does the following:
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,25 @@ object circe {
Json.obj(fields.toSeq: _*)
}

implicit val sdkEventTypeEncoder
: Encoder[kin.SubscribeToShardEventStream.EventType] =
Encoder[String].contramap(_.toString())

implicit val subscribeToShardEventEncoder
: Encoder[kin.SubscribeToShardEvent] = x => {
val fields: Map[String, Json] = Map
.empty[String, Json]
.safeAdd("childShards", x.childShards())
.safeAdd("continuationSequenceNumber", x.continuationSequenceNumber())
.safeAdd("hasChildShards", x.hasChildShards())
.safeAdd("hasRecords", x.hasRecords())
.safeAdd("millisBehindLatest", x.millisBehindLatest())
.safeAdd("records", x.records())
.safeAdd("sdkEventType", x.sdkEventType())

Json.obj(fields.toSeq: _*)
}

implicit val kinesisClientLogEncoders: KinesisClient.LogEncoders =
new KinesisClient.LogEncoders()

Expand Down
Loading

0 comments on commit 1851288

Please sign in to comment.