Skip to content

Commit

Permalink
Graceful Producer Shutdown (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
etspaceman authored Feb 28, 2023
1 parent 782d468 commit 0bb546a
Show file tree
Hide file tree
Showing 16 changed files with 69 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kcl

import scala.concurrent.duration._

import cats.effect.kernel.Deferred
import cats.effect.Deferred
import cats.effect.std.Queue
import cats.effect.{IO, Resource, SyncIO}
import cats.syntax.all._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package fs2
import scala.concurrent.duration._

import _root_.fs2.Stream
import cats.effect.kernel.Deferred
import cats.effect.Deferred
import cats.effect.{IO, Resource, SyncIO}
import cats.syntax.all._
import io.circe.parser._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package kcl.fs2.multistream
import scala.concurrent.duration._

import _root_.fs2.Stream
import cats.effect.kernel.Deferred
import cats.effect.Deferred
import cats.effect.syntax.all._
import cats.effect.{IO, Resource, SyncIO}
import cats.syntax.all._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package multistream

import scala.concurrent.duration._

import cats.effect.kernel.Deferred
import cats.effect.Deferred
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.effect.{IO, Resource, SyncIO}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package kinesis4cats.smithy4s.client

import cats.effect.Async
import cats.effect.IO
import cats.effect.SyncIO
import cats.effect.kernel.Async
import com.amazonaws.kinesis.Kinesis
import org.http4s.blaze.client.BlazeClientBuilder

Expand Down
2 changes: 1 addition & 1 deletion kcl/src/main/scala/kinesis4cats/kcl/KCLConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package kinesis4cats.kcl

import cats.effect.kernel.Deferred
import cats.effect.Deferred
import cats.effect.syntax.all._
import cats.effect.{Async, Ref, Resource}
import cats.syntax.all._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kinesis4cats.kcl.fs2
import scala.concurrent.duration._

import cats.Parallel
import cats.effect.kernel.Deferred
import cats.effect.Deferred
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.effect.{Async, Ref, Resource}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import scala.jdk.CollectionConverters._
import java.time.Instant

import cats.data.NonEmptyList
import cats.effect.Resource
import cats.effect._
import cats.effect.kernel.Resource
import cats.effect.syntax.all._
import cats.syntax.all._
import org.typelevel.log4cats.StructuredLogger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ package kinesis4cats.client
package producer
package fs2

import cats.effect.Async
import cats.effect.kernel.Resource
import cats.effect.std.Queue
import _root_.fs2.concurrent.Channel
import cats.effect._
import cats.effect.syntax.all._
import org.typelevel.log4cats.StructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
Expand All @@ -36,9 +35,9 @@ import kinesis4cats.producer.fs2.FS2Producer
*
* @param config
* [[kinesis.producer.fs2.FS2Producer.Config FS2Producer.Config]]
* @param queue
* [[cats.effect.std.Queue Queue]] of
* [[kinesis4cats.producer.Record Records]] to produce.
* @param channel
* [[https://github.com/typelevel/fs2/blob/main/core/shared/src/main/scala/fs2/concurrent/Channel.scala Channel]]
* of [[kinesis4cats.producer.Record Records]] to produce.
* @param underlying
* [[kinesis4cats.smithy4s.client.producer.KinesisProducer KinesisProducer]]
* @param callback:
Expand All @@ -49,7 +48,7 @@ import kinesis4cats.producer.fs2.FS2Producer
final class FS2KinesisProducer[F[_]] private[kinesis4cats] (
override val logger: StructuredLogger[F],
override val config: FS2Producer.Config,
override protected val queue: Queue[F, Option[Record]],
override protected val channel: Channel[F, Record],
override protected val underlying: KinesisProducer[F]
)(
override protected val callback: (
Expand Down Expand Up @@ -101,11 +100,10 @@ object FS2KinesisProducer {
config.producerConfig,
_underlying
)
queue <- Queue.bounded[F, Option[Record]](config.queueSize).toResource
producer = new FS2KinesisProducer[F](logger, config, queue, underlying)(
channel <- Channel.bounded[F, Record](config.queueSize).toResource
producer = new FS2KinesisProducer[F](logger, config, channel, underlying)(
callback
)
_ <- producer.start()
_ <- Resource.onFinalize(producer.stop())
_ <- producer.resource
} yield producer
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.concurrent.duration._

import _root_.ciris._
import cats.effect.Async
import cats.effect.kernel.Resource
import cats.effect.Resource
import cats.syntax.all._
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.regions.Regions
Expand Down
51 changes: 34 additions & 17 deletions shared/src/main/scala/kinesis4cats/producer/fs2/FS2Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ package fs2

import scala.concurrent.duration._

import _root_.fs2.Stream
import _root_.fs2.concurrent.Channel
import cats.data.Ior
import cats.data.NonEmptyList
import cats.effect.Async
import cats.effect.Resource
import cats.effect.std.Queue
import cats.effect._
import cats.effect.syntax.all._
import cats.syntax.all._
import org.typelevel.log4cats.StructuredLogger
Expand Down Expand Up @@ -53,7 +51,7 @@ abstract class FS2Producer[F[_], PutReq, PutRes](implicit

/** The underlying queue of records to process
*/
protected def queue: Queue[F, Option[Record]]
protected def channel: Channel[F, Record]

/** A user defined function that can be run against the results of a request
*/
Expand All @@ -70,28 +68,45 @@ abstract class FS2Producer[F[_], PutReq, PutRes](implicit
* @return
* F of Unit
*/
def put(record: Record): F[Unit] = queue.offer(Some(record))
def put(record: Record): F[Unit] = {
val ctx = LogContext()

for {
_ <- logger.debug(ctx.context)("Received record to put")
res <- channel.send(record)
_ <- res.bitraverse(
_ =>
logger.warn(ctx.context)(
"Producer has been shut down and will not accept further requests"
),
_ =>
logger.debug(ctx.context)(
"Successfully put record into processing queue"
)
)
} yield ()
}

/** Stop the processing of records
*/
private[kinesis4cats] def stop(): F[Unit] = {
private[kinesis4cats] def stop(f: Fiber[F, Throwable, Unit]): F[Unit] = {
val ctx = LogContext()
for {
_ <- logger.debug(ctx.context)("Stopping the FS2KinesisProducer")
_ <- queue.offer(None)
_ <- channel.close
_ <- f.join.void.timeoutTo(config.gracefulShutdownWait, f.cancel)
} yield ()
}

/** Start the processing of records
*/
private[kinesis4cats] def start(): Resource[F, Unit] = {
private[kinesis4cats] def start(): F[Unit] = {
val ctx = LogContext()

for {
_ <- logger
.debug(ctx.context)("Starting the FS2KinesisProducer")
.toResource
_ <- Stream
.fromQueueNoneTerminated(queue)
_ <- channel.stream
.groupWithin(config.putMaxChunk, config.putMaxWait)
.evalMap { x =>
val c = ctx.addEncoded("batchSize", x.size)
Expand All @@ -116,12 +131,12 @@ abstract class FS2Producer[F[_], PutReq, PutRes](implicit
}
.compile
.drain
.background
.void
} yield ()

}

private[kinesis4cats] def resource: Resource[F, Unit] =
Resource.make(start().start)(stop).void

}

object FS2Producer {
Expand Down Expand Up @@ -149,7 +164,8 @@ object FS2Producer {
putMaxWait: FiniteDuration,
putMaxRetries: Option[Int],
putRetryInterval: FiniteDuration,
producerConfig: Producer.Config
producerConfig: Producer.Config,
gracefulShutdownWait: FiniteDuration
)

object Config {
Expand All @@ -159,7 +175,8 @@ object FS2Producer {
100.millis,
Some(5),
0.seconds,
Producer.Config.default(streamNameOrArn)
Producer.Config.default(streamNameOrArn),
30.seconds
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package localstack
import scala.concurrent.duration._

import cats.effect.Async
import cats.effect.kernel.Resource
import cats.effect.Resource
import cats.effect.syntax.all._
import cats.syntax.all._
import com.amazonaws.kinesis._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package producer
package fs2
package localstack

import _root_.fs2.concurrent.Channel
import cats.effect._
import cats.effect.std.Queue
import cats.effect.syntax.all._
import com.amazonaws.kinesis.PutRecordsOutput
import org.http4s.client.Client
Expand Down Expand Up @@ -91,8 +91,8 @@ object LocalstackFS2KinesisProducer {
),
loggerF(F)
)
queue <- Queue
.bounded[F, Option[Record]](producerConfig.queueSize)
channel <- Channel
.bounded[F, Record](producerConfig.queueSize)
.toResource
underlying = new KinesisProducer[F](
logger,
Expand All @@ -103,13 +103,12 @@ object LocalstackFS2KinesisProducer {
producer = new FS2KinesisProducer[F](
logger,
producerConfig,
queue,
channel,
underlying
)(
callback
)
_ <- producer.start()
_ <- Resource.onFinalize(producer.stop())
_ <- producer.resource
} yield producer

/** Creates a [[cats.effect.Resource Resource]] of a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package kinesis4cats.smithy4s.client.middleware

import cats.effect.kernel.Async
import cats.effect.Async
import org.http4s.client.Client
import org.typelevel.log4cats.StructuredLogger

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package producer
import java.time.Instant

import cats.data.NonEmptyList
import cats.effect.Resource
import cats.effect._
import cats.effect.kernel.Resource
import cats.effect.syntax.all._
import cats.syntax.all._
import com.amazonaws.kinesis._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ package kinesis4cats.smithy4s.client
package producer
package fs2

import cats.effect.Async
import cats.effect.kernel.Resource
import cats.effect.std.Queue
import _root_.fs2.concurrent.Channel
import cats.effect._
import cats.effect.syntax.all._
import com.amazonaws.kinesis.PutRecordsInput
import com.amazonaws.kinesis.PutRecordsOutput
Expand All @@ -40,9 +39,9 @@ import kinesis4cats.producer.fs2.FS2Producer
*
* @param config
* [[kinesis.producer.fs2.FS2Producer.Config FS2Producer.Config]]
* @param queue
* [[cats.effect.std.Queue Queue]] of
* [[kinesis4cats.producer.Record Records]] to produce.
* @param channel
* [[https://github.com/typelevel/fs2/blob/main/core/shared/src/main/scala/fs2/concurrent/Channel.scala Channel]]
* of [[kinesis4cats.producer.Record Records]] to produce.
* @param underlying
* [[kinesis4cats.smithy4s.client.producer.KinesisProducer KinesisProducer]]
* @param callback:
Expand All @@ -55,7 +54,7 @@ import kinesis4cats.producer.fs2.FS2Producer
final class FS2KinesisProducer[F[_]] private[kinesis4cats] (
override val logger: StructuredLogger[F],
override val config: FS2Producer.Config,
override protected val queue: Queue[F, Option[Record]],
override protected val channel: Channel[F, Record],
override protected val underlying: KinesisProducer[F]
)(
override protected val callback: (
Expand Down Expand Up @@ -131,11 +130,10 @@ object FS2KinesisProducer {
loggerF,
credsF
)
queue <- Queue.bounded[F, Option[Record]](config.queueSize).toResource
producer = new FS2KinesisProducer[F](logger, config, queue, underlying)(
channel <- Channel.bounded[F, Record](config.queueSize).toResource
producer = new FS2KinesisProducer[F](logger, config, channel, underlying)(
callback
)
_ <- producer.start()
_ <- Resource.onFinalize(producer.stop())
_ <- producer.resource
} yield producer
}

0 comments on commit 0bb546a

Please sign in to comment.