diff --git a/shared/src/main/scala/kinesis4cats/producer/Producer.scala b/shared/src/main/scala/kinesis4cats/producer/Producer.scala index 8795596e..b7e9382d 100644 --- a/shared/src/main/scala/kinesis4cats/producer/Producer.scala +++ b/shared/src/main/scala/kinesis4cats/producer/Producer.scala @@ -216,7 +216,7 @@ abstract class Producer[F[_], PutReq, PutRes] private[kinesis4cats] ( ) )(ref.get.flatMap(x => _put(x.inputRecords, x.retrying))) _ <- - if (finalRes.hasFailed) { + if (finalRes.hasErrors) { if (config.raiseOnFailures) { finalRes.error.traverse(F.raiseError[Unit]).void } else { @@ -443,6 +443,9 @@ object Producer { } object Error { + implicit val producerErrorEq: Eq[Error] = + Eq.by(x => (x.invalid, x.failed)) + private def invalidRecordsMessage( records: List[InvalidRecord] ): String = { diff --git a/shared/src/test/scala/kinesis4cats/producer/ProducerSpec.scala b/shared/src/test/scala/kinesis4cats/producer/ProducerSpec.scala index bfb50ae0..8893fa32 100644 --- a/shared/src/test/scala/kinesis4cats/producer/ProducerSpec.scala +++ b/shared/src/test/scala/kinesis4cats/producer/ProducerSpec.scala @@ -33,64 +33,68 @@ import kinesis4cats.models.ShardId import kinesis4cats.models.StreamNameOrArn class ProducerSpec extends munit.CatsEffectSuite { - def fixture(aggregate: Boolean): SyncIO[FunFixture[MockProducer]] = + def fixture( + aggregate: Boolean, + raiseOnFailures: Boolean + ): SyncIO[FunFixture[MockProducer]] = ResourceFunFixture( - MockProducer(aggregate) + MockProducer(aggregate, raiseOnFailures) ) - fixture(false).test("It should retry methods and eventually produce") { - producer => - val record1 = Record(Array.fill(50)(1), "1") - val record2 = Record(Array.fill(50)(1), "2") - val record3 = Record(Array.fill(50)(1), "3") - val record4 = Record(Array.fill(50)(1), "4") - val record5 = Record(Array.fill(50)(1), "5") - - val data = NonEmptyList.of( - record1, - record2, - record3, - record4, - record5 - ) + fixture(aggregate = false, raiseOnFailures = true).test( + "It should retry methods and eventually produce" + ) { producer => + val record1 = Record(Array.fill(50)(1), "1") + val record2 = Record(Array.fill(50)(1), "2") + val record3 = Record(Array.fill(50)(1), "3") + val record4 = Record(Array.fill(50)(1), "4") + val record5 = Record(Array.fill(50)(1), "5") - val response1 = MockPutResponse( - NonEmptyList.one(record1), - List(record2, record3, record4, record5) - ) + val data = NonEmptyList.of( + record1, + record2, + record3, + record4, + record5 + ) - val response2 = MockPutResponse( - NonEmptyList.one(record2), - List(record3, record4, record5) - ) + val response1 = MockPutResponse( + NonEmptyList.one(record1), + List(record2, record3, record4, record5) + ) - val response3 = MockPutResponse( - NonEmptyList.one(record3), - List(record4, record5) - ) + val response2 = MockPutResponse( + NonEmptyList.one(record2), + List(record3, record4, record5) + ) - val response4 = MockPutResponse( - NonEmptyList.one(record4), - List(record5) - ) + val response3 = MockPutResponse( + NonEmptyList.one(record3), + List(record4, record5) + ) - val response5 = MockPutResponse( - NonEmptyList.one(record5), - List.empty - ) + val response4 = MockPutResponse( + NonEmptyList.one(record4), + List(record5) + ) - val expected: Producer.Result[MockPutResponse] = Producer.Result( - List(response1, response2, response3, response4, response5), - Nil, - Nil - ) + val response5 = MockPutResponse( + NonEmptyList.one(record5), + List.empty + ) - producer.put(data).map { res => - assert(res === expected, s"res: $res\nexp: $expected") - } + val expected: Producer.Result[MockPutResponse] = Producer.Result( + List(response1, response2, response3, response4, response5), + Nil, + Nil + ) + + producer.put(data).map { res => + assert(res === expected, s"res: $res\nexp: $expected") + } } - fixture(true).test( + fixture(aggregate = true, raiseOnFailures = true).test( "It should retry methods and eventually produce when aggregated" ) { producer => val record1 = Record("record 1".getBytes(), "1") @@ -143,26 +147,50 @@ class ProducerSpec extends munit.CatsEffectSuite { } } - fixture(false).test("It should not retry when all Results are invalid") { - producer => - val tooSmallPartitionKey = "" - val record = Record(Array.fill(50)(1), tooSmallPartitionKey) + fixture(aggregate = false, raiseOnFailures = false).test( + "It should not retry when all Results are invalid" + ) { producer => + val tooSmallPartitionKey = "" + val record = Record(Array.fill(50)(1), tooSmallPartitionKey) - val data = NonEmptyList.of(record, record, record) + val data = NonEmptyList.of(record, record, record) - val expected: Producer.Result[MockPutResponse] = Producer.Result( - successful = Nil, - invalid = List( - Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey), - Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey), - Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey) - ), - failed = Nil - ) + val expected: Producer.Result[MockPutResponse] = Producer.Result( + successful = Nil, + invalid = List( + Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey), + Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey), + Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey) + ), + failed = Nil + ) + + producer.put(data).map { res => + assert(res === expected, s"res: $res\nexp: $expected") + } + } - producer.put(data).map { res => - assert(res === expected, s"res: $res\nexp: $expected") - } + fixture(aggregate = false, raiseOnFailures = true).test( + "It should raise and error if there is only invalid records and 'raiseOnFailures' = true" + ) { producer => + val tooSmallPartitionKey = "" + val record = Record(Array.fill(50)(1), tooSmallPartitionKey) + + val data = NonEmptyList.of(record, record, record) + + val expected: Producer.Error = Producer.Error( + invalid = List( + Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey), + Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey), + Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey) + ), + failed = List.empty + ) + + producer.put(data).attempt.map { + case Left(err: Producer.Error) => assert(err === expected) + case other => fail(s"Expected Producer.Error but got: $other") + } } } @@ -211,7 +239,10 @@ class MockProducer( } object MockProducer { - def apply(aggregate: Boolean): Resource[IO, MockProducer] = for { + def apply( + aggregate: Boolean, + raiseOnFailures: Boolean + ): Resource[IO, MockProducer] = for { logger <- Resource.pure(NoOpLogger[IO]) shardMapCache <- ShardMapCache.Builder .default[IO]( @@ -235,7 +266,7 @@ object MockProducer { .default[IO](StreamNameOrArn.Name("foo")) .copy( retryPolicy = RetryPolicies.limitRetries[IO](5), - raiseOnFailures = true + raiseOnFailures = raiseOnFailures ) } yield new MockProducer( logger,