Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raise when any records are invalid #394

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion shared/src/main/scala/kinesis4cats/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ abstract class Producer[F[_], PutReq, PutRes] private[kinesis4cats] (
)
)(ref.get.flatMap(x => _put(x.inputRecords, x.retrying)))
_ <-
if (finalRes.hasFailed) {
if (finalRes.hasErrors) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a change in behaviour. Before, you would only see invalid records if there was at least one failed record also. Does this change align with the documentation?

A user can configure the producer to raise an exception if any of the error paths are detected (including partially failed records).

* @param raiseOnFailures
* If true, an exception will be raised if a
* [[kinesis4cats.producer.Producer.Error Producer.Error]] is detected in
* one of the batches

if (config.raiseOnFailures) {
finalRes.error.traverse(F.raiseError[Unit]).void
} else {
Expand Down Expand Up @@ -443,6 +443,9 @@ object Producer {
}

object Error {
implicit val producerErrorEq: Eq[Error] =
Eq.by(x => (x.invalid, x.failed))

private def invalidRecordsMessage(
records: List[InvalidRecord]
): String = {
Expand Down
161 changes: 96 additions & 65 deletions shared/src/test/scala/kinesis4cats/producer/ProducerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,64 +33,68 @@ import kinesis4cats.models.ShardId
import kinesis4cats.models.StreamNameOrArn

class ProducerSpec extends munit.CatsEffectSuite {
def fixture(aggregate: Boolean): SyncIO[FunFixture[MockProducer]] =
def fixture(
aggregate: Boolean,
raiseOnFailures: Boolean
): SyncIO[FunFixture[MockProducer]] =
ResourceFunFixture(
MockProducer(aggregate)
MockProducer(aggregate, raiseOnFailures)
)

fixture(false).test("It should retry methods and eventually produce") {
producer =>
val record1 = Record(Array.fill(50)(1), "1")
val record2 = Record(Array.fill(50)(1), "2")
val record3 = Record(Array.fill(50)(1), "3")
val record4 = Record(Array.fill(50)(1), "4")
val record5 = Record(Array.fill(50)(1), "5")

val data = NonEmptyList.of(
record1,
record2,
record3,
record4,
record5
)
fixture(aggregate = false, raiseOnFailures = true).test(
"It should retry methods and eventually produce"
) { producer =>
val record1 = Record(Array.fill(50)(1), "1")
val record2 = Record(Array.fill(50)(1), "2")
val record3 = Record(Array.fill(50)(1), "3")
val record4 = Record(Array.fill(50)(1), "4")
val record5 = Record(Array.fill(50)(1), "5")

val response1 = MockPutResponse(
NonEmptyList.one(record1),
List(record2, record3, record4, record5)
)
val data = NonEmptyList.of(
record1,
record2,
record3,
record4,
record5
)

val response2 = MockPutResponse(
NonEmptyList.one(record2),
List(record3, record4, record5)
)
val response1 = MockPutResponse(
NonEmptyList.one(record1),
List(record2, record3, record4, record5)
)

val response3 = MockPutResponse(
NonEmptyList.one(record3),
List(record4, record5)
)
val response2 = MockPutResponse(
NonEmptyList.one(record2),
List(record3, record4, record5)
)

val response4 = MockPutResponse(
NonEmptyList.one(record4),
List(record5)
)
val response3 = MockPutResponse(
NonEmptyList.one(record3),
List(record4, record5)
)

val response5 = MockPutResponse(
NonEmptyList.one(record5),
List.empty
)
val response4 = MockPutResponse(
NonEmptyList.one(record4),
List(record5)
)

val expected: Producer.Result[MockPutResponse] = Producer.Result(
List(response1, response2, response3, response4, response5),
Nil,
Nil
)
val response5 = MockPutResponse(
NonEmptyList.one(record5),
List.empty
)

producer.put(data).map { res =>
assert(res === expected, s"res: $res\nexp: $expected")
}
val expected: Producer.Result[MockPutResponse] = Producer.Result(
List(response1, response2, response3, response4, response5),
Nil,
Nil
)

producer.put(data).map { res =>
assert(res === expected, s"res: $res\nexp: $expected")
}
}

fixture(true).test(
fixture(aggregate = true, raiseOnFailures = true).test(
"It should retry methods and eventually produce when aggregated"
) { producer =>
val record1 = Record("record 1".getBytes(), "1")
Expand Down Expand Up @@ -143,26 +147,50 @@ class ProducerSpec extends munit.CatsEffectSuite {
}
}

fixture(false).test("It should not retry when all Results are invalid") {
producer =>
val tooSmallPartitionKey = ""
val record = Record(Array.fill(50)(1), tooSmallPartitionKey)
fixture(aggregate = false, raiseOnFailures = false).test(
"It should not retry when all Results are invalid"
) { producer =>
val tooSmallPartitionKey = ""
val record = Record(Array.fill(50)(1), tooSmallPartitionKey)

val data = NonEmptyList.of(record, record, record)
val data = NonEmptyList.of(record, record, record)

val expected: Producer.Result[MockPutResponse] = Producer.Result(
successful = Nil,
invalid = List(
Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey),
Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey),
Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey)
),
failed = Nil
)
val expected: Producer.Result[MockPutResponse] = Producer.Result(
successful = Nil,
invalid = List(
Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey),
Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey),
Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey)
),
failed = Nil
)

producer.put(data).map { res =>
assert(res === expected, s"res: $res\nexp: $expected")
}
}

producer.put(data).map { res =>
assert(res === expected, s"res: $res\nexp: $expected")
}
fixture(aggregate = false, raiseOnFailures = true).test(
"It should raise and error if there is only invalid records and 'raiseOnFailures' = true"
) { producer =>
val tooSmallPartitionKey = ""
val record = Record(Array.fill(50)(1), tooSmallPartitionKey)

val data = NonEmptyList.of(record, record, record)

val expected: Producer.Error = Producer.Error(
invalid = List(
Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey),
Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey),
Producer.InvalidRecord.InvalidPartitionKey(tooSmallPartitionKey)
),
failed = List.empty
)

producer.put(data).attempt.map {
case Left(err: Producer.Error) => assert(err === expected)
case other => fail(s"Expected Producer.Error but got: $other")
}
}
}

Expand Down Expand Up @@ -211,7 +239,10 @@ class MockProducer(
}

object MockProducer {
def apply(aggregate: Boolean): Resource[IO, MockProducer] = for {
def apply(
aggregate: Boolean,
raiseOnFailures: Boolean
): Resource[IO, MockProducer] = for {
logger <- Resource.pure(NoOpLogger[IO])
shardMapCache <- ShardMapCache.Builder
.default[IO](
Expand All @@ -235,7 +266,7 @@ object MockProducer {
.default[IO](StreamNameOrArn.Name("foo"))
.copy(
retryPolicy = RetryPolicies.limitRetries[IO](5),
raiseOnFailures = true
raiseOnFailures = raiseOnFailures
)
} yield new MockProducer(
logger,
Expand Down