Skip to content

Commit

Permalink
Add tryPut to FS2Producer (#167)
Browse files Browse the repository at this point in the history
Co-authored-by: Jakub Kozłowski <[email protected]>
  • Loading branch information
etspaceman and kubukoz authored Jun 26, 2023
1 parent 106915a commit 24e61c7
Showing 1 changed file with 43 additions and 2 deletions.
45 changes: 43 additions & 2 deletions shared/src/main/scala/kinesis4cats/producer/fs2/FS2Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ abstract class FS2Producer[F[_], PutReq, PutRes](implicit
* @param record
* [[kinesis4cats.producer.Record Record]]
* @return
* F of F of unit. Inner F represents a `deferred.get` call, which will
* complete when the record has been published.
* F of F of Producer.Result. Inner F represents a `deferred.get` call,
* which will complete when the record has been published.
*/
def put(record: Record): F[F[Producer.Result[PutRes]]] = {
val ctx = LogContext()
Expand All @@ -86,6 +86,47 @@ abstract class FS2Producer[F[_], PutReq, PutRes](implicit
} yield deferred.get.flatten
}

/** Attempts to put a record into the producer's buffer, to be batched and
* produced at a defined interval.
*
* @param record
* [[kinesis4cats.producer.Record Record]]
* @return
* F of Option of F of Producer.Result. Inner F represents a `deferred.get`
* call, which will complete when the record has been published. F[None]
* means the producer queue is full or has been shut down.
*/
def tryPut(record: Record): F[Option[F[Producer.Result[PutRes]]]] = {
val ctx = LogContext()

for {
_ <- logger.debug(ctx.context)("Received record to put")
deferred <- Deferred[F, F[Producer.Result[PutRes]]]
sendRes <- channel.trySend(record -> deferred)
res <- sendRes.fold(
_ =>
logger
.warn(ctx.context)(
"Producer has been shut down and will not accept further requests"
)
.as(none[F[Producer.Result[PutRes]]]),
wasEnqueued =>
if (wasEnqueued)
logger
.debug(ctx.context)(
"Successfully put record into processing queue"
)
.as(deferred.get.flatten.some)
else
logger
.warn(ctx.context)(
"Producer queue is full"
)
.as(none[F[Producer.Result[PutRes]]])
)
} yield res
}

/** Stop the processing of records
*/
private[kinesis4cats] def stop(f: Fiber[F, Throwable, Unit]): F[Unit] = {
Expand Down

0 comments on commit 24e61c7

Please sign in to comment.