Skip to content

Commit

Permalink
Default Clients for the KCL (#117)
Browse files Browse the repository at this point in the history
Co-authored-by: Eric Meisel <[email protected]>
  • Loading branch information
kubukoz and etspaceman authored May 5, 2023
1 parent 08ccc6e commit 8ea14b6
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 136 deletions.
11 changes: 5 additions & 6 deletions docs/kcl/circe.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import kinesis4cats.syntax.bytebuffer._

object MyApp extends ResourceApp.Forever {
override def run(args: List[String]) = for {
consumerBuilder <- KCLConsumer.Builder.default[IO](
new SingleStreamTracker("my-stream"),
"my-app-name",
)
consumer <- consumerBuilder
consumer <- KCLConsumer.Builder.default[IO](
new SingleStreamTracker("my-stream"),
"my-app-name",
)
.withCallback(
(records: List[CommittableRecord[IO]]) =>
(records: List[CommittableRecord[IO]]) =>
records.traverse_(r => IO.println(r.data.asString))
)
.configure(_.withLogEncoders(kclCirceEncoders))
Expand Down
34 changes: 16 additions & 18 deletions docs/kcl/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ import kinesis4cats.syntax.bytebuffer._

object MyApp extends ResourceApp.Forever {
override def run(args: List[String]) = for {
consumerBuilder <- KCLConsumer.Builder.default[IO](
new SingleStreamTracker("my-stream"),
"my-app-name",
)
consumer <- consumerBuilder.withCallback(
(records: List[CommittableRecord[IO]]) =>
records.traverse_(r => IO.println(r.data.asString))
).build
consumer <- KCLConsumer.Builder.default[IO](
new SingleStreamTracker("my-stream"),
"my-app-name",
).withCallback(
(records: List[CommittableRecord[IO]]) =>
records.traverse_(r => IO.println(r.data.asString))
).build
_ <- consumer.run()
} yield ()
}
Expand All @@ -52,7 +51,7 @@ import kinesis4cats.kcl.multistream._
import kinesis4cats.syntax.bytebuffer._

object MyApp extends ResourceApp.Forever {
override def run(args: List[String]) = {
override def run(args: List[String]) = {
val streamArn1 = StreamArn(AwsRegion.US_EAST_1, "my-stream-1", "123456789012")
val streamArn2 = StreamArn(AwsRegion.US_EAST_1, "my-stream-2", "123456789012")
val position = InitialPositionInStreamExtended
Expand All @@ -63,12 +62,12 @@ object MyApp extends ResourceApp.Forever {
kinesisClient,
Map(streamArn1 -> position, streamArn2 -> position)
).toResource
consumerBuilder <- KCLConsumer.Builder
consumer <- KCLConsumer.Builder
.default[IO](tracker, "my-app-name")
consumer <- consumerBuilder.withCallback(
(records: List[CommittableRecord[IO]]) =>
records.traverse_(r => IO.println(r.data.asString))
).build
.withCallback(
(records: List[CommittableRecord[IO]]) =>
records.traverse_(r => IO.println(r.data.asString))
).build
_ <- consumer.run()
} yield ()
}
Expand All @@ -90,11 +89,10 @@ import kinesis4cats.syntax.bytebuffer._

object MyApp extends ResourceApp.Forever {
override def run(args: List[String]) = for {
consumerBuilder <- KCLConsumerFS2.Builder.default[IO](
new SingleStreamTracker("my-stream"),
consumer <- KCLConsumerFS2.Builder.default[IO](
new SingleStreamTracker("my-stream"),
"my-app-name",
)
consumer <- consumerBuilder.build
).build
_ <- consumer
.stream()
.flatMap(stream =>
Expand Down
15 changes: 7 additions & 8 deletions docs/kcl/http4s.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ import kinesis4cats.syntax.bytebuffer._

object MyApp extends ResourceApp.Forever {
override def run(args: List[String]) = for {
consumerBuilder <- KCLConsumer.Builder.default[IO](
new SingleStreamTracker("my-stream"),
"my-app-name",
)
consumer <- consumerBuilder.withCallback(
(records: List[CommittableRecord[IO]]) =>
records.traverse_(r => IO.println(r.data.asString))
).build
consumer <- KCLConsumer.Builder.default[IO](
new SingleStreamTracker("my-stream"),
"my-app-name",
).withCallback(
(records: List[CommittableRecord[IO]]) =>
records.traverse_(r => IO.println(r.data.asString))
).build
_ <- KCLService.server[IO](consumer, port"8080", host"0.0.0.0")
} yield ()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,23 @@ object LocalstackKCLConsumerFS2 {
kinesisClient <- AwsClients.kinesisClientResource(localstackConfig)
cloudWatchClient <- AwsClients.cloudwatchClientResource(localstackConfig)
dynamoClient <- AwsClients.dynamoClientResource(localstackConfig)
default <- KCLConsumerFS2.Builder.default(
streamTracker,
appName,
kinesisClient,
dynamoClient,
cloudWatchClient,
false
)
retrievalConfig =
if (streamTracker.isMultiStream()) new PollingConfig(kinesisClient)
else
new PollingConfig(
streamTracker.streamConfigList.get(0).streamIdentifier.streamName,
kinesisClient
)
initial = default
.configure(x =>
x.configureLeaseManagementConfig(_.shardSyncIntervalMillis(1000L))
initial = KCLConsumerFS2.Builder
.default(
streamTracker,
appName
)
.withKinesisClient(kinesisClient)
.withDynamoClient(dynamoClient)
.withCloudWatchClient(cloudWatchClient)
.configure(
_.configureLeaseManagementConfig(_.shardSyncIntervalMillis(1000L))
.configureCoordinatorConfig(_.parentShardPollIntervalMillis(1000L))
.configureRetrievalConfig(
_.retrievalSpecificConfig(retrievalConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ object LocalstackKCLConsumer {
kinesisClient <- AwsClients.kinesisClientResource(localstackConfig)
cloudWatchClient <- AwsClients.cloudwatchClientResource(localstackConfig)
dynamoClient <- AwsClients.dynamoClientResource(localstackConfig)
default <- KCLConsumer.Builder.default(
streamTracker,
appName,
kinesisClient,
dynamoClient,
cloudWatchClient,
false
)
default = KCLConsumer.Builder
.default(
streamTracker,
appName
)
.withKinesisClient(kinesisClient)
.withDynamoClient(dynamoClient)
.withCloudWatchClient(cloudWatchClient)
retrievalConfig =
if (streamTracker.isMultiStream()) new PollingConfig(kinesisClient)
else
Expand Down
112 changes: 73 additions & 39 deletions kcl/src/main/scala/kinesis4cats/kcl/KCLConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package kinesis4cats.kcl

import cats.InvariantMonoidal
import cats.effect.Deferred
import cats.effect.kernel.Sync
import cats.effect.syntax.all._
import cats.effect.{Async, Ref, Resource}
import cats.syntax.all._
Expand Down Expand Up @@ -205,65 +207,97 @@ object KCLConsumer {
)(callback)
}

object BuilderConfig {
private[kinesis4cats] trait Make[F[_]] { self =>
def make(
kinesisClient: KinesisAsyncClient,
dynamoClient: DynamoDbAsyncClient,
cloudWatchClient: CloudWatchAsyncClient,
workerId: String
): BuilderConfig[F]

def andThen(f: BuilderConfig[F] => BuilderConfig[F]): Make[F] =
(k, d, c, wid) => f(self.make(k, d, c, wid))
}
private[kinesis4cats] object Make {
def default[F[_]: InvariantMonoidal](
appName: String,
streamTracker: StreamTracker
): Make[F] =
(kClient, dClient, cClient, workerId) =>
BuilderConfig(
new CheckpointConfig(),
new CoordinatorConfig(appName),
new LeaseManagementConfig(appName, dClient, kClient, workerId),
new LifecycleConfig(),
new MetricsConfig(cClient, appName),
new RetrievalConfig(kClient, streamTracker, appName),
ProcessConfig.default,
RecordProcessor.LogEncoders.show,
(_: List[CommittableRecord[F]]) => InvariantMonoidal[F].unit
)
}
}

final case class Builder[F[_]] private (
config: BuilderConfig[F]
config: BuilderConfig.Make[F],
mkKinesisClient: Resource[F, KinesisAsyncClient],
mkDynamoClient: Resource[F, DynamoDbAsyncClient],
mkCloudWatchClient: Resource[F, CloudWatchAsyncClient],
mkWorkerId: F[String]
)(implicit F: Async[F]) {

def configure(f: BuilderConfig[F] => BuilderConfig[F]): Builder[F] = copy(
config = f(config)
config = config.andThen(f)
)

def withCallback(
callback: List[CommittableRecord[F]] => F[Unit]
): Builder[F] =
copy(config = config.withCallback(callback))
configure(_.withCallback(callback))

def withKinesisClient(client: KinesisAsyncClient): Builder[F] =
copy(mkKinesisClient = Resource.pure(client))

def withDynamoClient(client: DynamoDbAsyncClient): Builder[F] =
copy(mkDynamoClient = Resource.pure(client))

def withCloudWatchClient(client: CloudWatchAsyncClient): Builder[F] =
copy(mkCloudWatchClient = Resource.pure(client))

def withWorkerId(workerId: String): Builder[F] =
copy(mkWorkerId = F.pure(workerId))

def build: Resource[F, KCLConsumer[F]] =
config.build.map(new KCLConsumer[F](_))
(
mkKinesisClient,
mkDynamoClient,
mkCloudWatchClient,
mkWorkerId.toResource
).mapN(config.make).flatMap(_.build).map(new KCLConsumer[F](_))
}

object Builder {

def default[F[_]](
streamTracker: StreamTracker,
appName: String,
kinesisClient: => KinesisAsyncClient =
KinesisAsyncClient.builder().build(),
dynamoClient: => DynamoDbAsyncClient =
DynamoDbAsyncClient.builder().build(),
cloudWatchClient: => CloudWatchAsyncClient =
CloudWatchAsyncClient.builder().build(),
managedClients: Boolean = true
appName: String
)(implicit
F: Async[F]
): Resource[F, Builder[F]] = for {
kClient <-
if (managedClients)
Resource.fromAutoCloseable(
F.delay(kinesisClient)
)
else Resource.pure[F, KinesisAsyncClient](kinesisClient)
dClient <-
if (managedClients) Resource.fromAutoCloseable(F.delay(dynamoClient))
else Resource.pure[F, DynamoDbAsyncClient](dynamoClient)
cClient <-
if (managedClients)
Resource.fromAutoCloseable(F.delay(cloudWatchClient))
else Resource.pure[F, CloudWatchAsyncClient](cloudWatchClient)
workerId = Utils.randomUUIDString
} yield Builder(
BuilderConfig(
new CheckpointConfig(),
new CoordinatorConfig(appName),
new LeaseManagementConfig(appName, dClient, kClient, workerId),
new LifecycleConfig(),
new MetricsConfig(cClient, appName),
new RetrievalConfig(kClient, streamTracker, appName),
ProcessConfig.default,
RecordProcessor.LogEncoders.show,
(_: List[CommittableRecord[F]]) => F.unit
): Builder[F] =
Builder(
config = BuilderConfig.Make.default(appName, streamTracker),
mkKinesisClient = Resource.fromAutoCloseable(
Sync[F].delay(KinesisAsyncClient.create())
),
mkDynamoClient = Resource.fromAutoCloseable(
Sync[F].delay(DynamoDbAsyncClient.create())
),
mkCloudWatchClient = Resource.fromAutoCloseable(
Sync[F].delay(CloudWatchAsyncClient.create())
),
mkWorkerId = F.delay(Utils.randomUUIDString)
)
)

@annotation.unused
private def unapply[F[_]](builder: Builder[F]): Unit = ()
Expand Down
Loading

0 comments on commit 8ea14b6

Please sign in to comment.