From 040bf54ab9d872fcd6cd3119a6d170a71e7cd1e1 Mon Sep 17 00:00:00 2001 From: Jaakko Pallari Date: Mon, 19 Aug 2024 18:14:53 +0300 Subject: [PATCH] Fix approximateArrivalTimestamp decoder (#367) Co-authored-by: Eric Meisel --- .../consumer/feral/KinesisStreamEvent.scala | 8 +- .../feral/KinesisStreamRecordSpec.scala | 73 +++++++++++++++++++ 2 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 feral/src/test/scala/kinesis4cats/consumer/feral/KinesisStreamRecordSpec.scala diff --git a/feral/src/main/scala/kinesis4cats/consumer/feral/KinesisStreamEvent.scala b/feral/src/main/scala/kinesis4cats/consumer/feral/KinesisStreamEvent.scala index 21e04fe5..ef5e7b4e 100644 --- a/feral/src/main/scala/kinesis4cats/consumer/feral/KinesisStreamEvent.scala +++ b/feral/src/main/scala/kinesis4cats/consumer/feral/KinesisStreamEvent.scala @@ -50,11 +50,13 @@ final case class KinesisStreamRecordPayload( object KinesisStreamRecordPayload { implicit private val instantCirceDecoder: Decoder[Instant] = - Decoder.decodeBigDecimal.emapTry { millis => + Decoder.decodeBigDecimal.emapTry { secondsDecimal => def round(x: BigDecimal) = x.setScale(0, BigDecimal.RoundingMode.DOWN) Try { - val seconds = round(millis / 1000).toLongExact - val nanos = round((millis % 1000) * 1e6).toLongExact + val roundedSecondsDecimal = round(secondsDecimal) + val seconds = roundedSecondsDecimal.toLongExact + val nanos = + round((secondsDecimal - roundedSecondsDecimal) * 1e9).toLongExact Instant.ofEpochSecond(seconds, nanos) } } diff --git a/feral/src/test/scala/kinesis4cats/consumer/feral/KinesisStreamRecordSpec.scala b/feral/src/test/scala/kinesis4cats/consumer/feral/KinesisStreamRecordSpec.scala new file mode 100644 index 00000000..9824c9e0 --- /dev/null +++ b/feral/src/test/scala/kinesis4cats/consumer/feral/KinesisStreamRecordSpec.scala @@ -0,0 +1,73 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.consumer + +import java.time.Instant + +import io.circe.parser.decode +import scodec.bits.ByteVector + +import kinesis4cats.consumer.feral.KinesisStreamRecord +import kinesis4cats.consumer.feral.KinesisStreamRecordPayload + +class KinesisStreamRecordSpec extends munit.ScalaCheckSuite { + test("It should parse the event correctly from JSON") { + val recordJson = """ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", + "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "approximateArrivalTimestamp": 1723739794.987 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } + """ + + val record = KinesisStreamRecord( + awsRegion = "us-east-2", + eventID = + "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + eventName = "aws:kinesis:record", + eventSource = "aws:kinesis", + eventSourceArn = + "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", + eventVersion = "1.0", + invokeIdentityArn = "arn:aws:iam::123456789012:role/lambda-role", + kinesis = KinesisStreamRecordPayload( + approximateArrivalTimestamp = + Instant.ofEpochSecond(1723739794L, 987000000L), + data = ByteVector.fromValidBase64("SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=="), + kinesisSchemaVersion = "1.0", + partitionKey = "1", + encryptionType = None, + sequenceNumber = + "49590338271490256608559692538361571095921575989136588898" + ) + ) + + val parsedRecord = decode[KinesisStreamRecord](recordJson) + assertEquals(parsedRecord, Right(record)) + } +}