Skip to content

Commit

Permalink
Handle responses where all records were invalid (#392)
Browse files Browse the repository at this point in the history
Co-authored-by: Eric Meisel <[email protected]>
  • Loading branch information
theon and etspaceman authored Oct 11, 2024
1 parent 0957fec commit f632c2e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
3 changes: 1 addition & 2 deletions shared/src/main/scala/kinesis4cats/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ abstract class Producer[F[_], PutReq, PutRes] private[kinesis4cats] (
)
finalRes <- retryingOnFailuresAndAllErrors(
config.retryPolicy,
(x: Producer.Result[PutRes]) =>
F.pure(x.isSuccessful || (x.isPartiallySuccessful && !x.hasFailed)),
(x: Producer.Result[PutRes]) => F.pure(!x.hasFailed),
(x: Producer.Result[PutRes], details: RetryDetails) =>
for {
failed <- F.fromOption(
Expand Down
22 changes: 22 additions & 0 deletions shared/src/test/scala/kinesis4cats/producer/ProducerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,28 @@ class ProducerSpec extends munit.CatsEffectSuite {
assert(res === expected, s"res: $res\nexp: $expected")
}
}

fixture(false).test("It should not retry when all Results are invalid") {
producer =>
val tooSmallPartitionKey = ""
val record = Record(Array.fill(50)(1), tooSmallPartitionKey)

val data = NonEmptyList.of(record, record, record)

val expected: Producer.Result[MockPutResponse] = Producer.Result(
successful = Nil,
invalid = List(
Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey),
Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey),
Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey)
),
failed = Nil
)

producer.put(data).map { res =>
assert(res === expected, s"res: $res\nexp: $expected")
}
}
}

class MockProducer(
Expand Down

0 comments on commit f632c2e

Please sign in to comment.