diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dcb5d202..96efd735 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -80,6 +80,8 @@ jobs: - name: Docker Compose Up if: matrix.java == 'temurin@17' && matrix.os == 'ubuntu-latest' + env: + GITHUB_API_TOKEN: ${{ secrets.GITHUB_TOKEN }} uses: nick-fields/retry@v2 with: timeout_minutes: 15 @@ -94,6 +96,8 @@ jobs: - name: Link Native if: matrix.java == 'temurin@17' && matrix.os == 'ubuntu-latest' && startsWith(matrix.project, 'root-native') + env: + GITHUB_API_TOKEN: ${{ secrets.GITHUB_TOKEN }} uses: nick-fields/retry@v2 with: timeout_minutes: 25 @@ -123,11 +127,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p root-js-3/target shared-ciris/target/jvm-3 root-jvm-213/target shared-localstack/target/js-2.12 smithy4s-client-logging-circe/target/js-2.13 shared-ciris/target/js-2.12 shared/target/native-2.13 aws-v1-localstack/target/jvm-2.12 compat/target/js-2.12 root-native-213/target shared-circe/target/native-2.13 kpl/target/jvm-3 shared-circe/target/js-2.12 target shared/target/jvm-2.13 shared-localstack/target/js-2.13 kpl/target/jvm-2.12 kcl-ciris/target/jvm-3 kpl-ciris/target/jvm-2.13 kpl-ciris/target/jvm-2.12 shared-circe/target/js-3 smithy4s-client-localstack/target/jvm-3 smithy4s-client-localstack/target/js-3 smithy4s-client/target/jvm-3 shared-circe/target/js-2.13 unidocs/target/jvm-2.13 compat/target/native-2.12 smithy4s-client-logging-circe/target/jvm-2.13 kcl-logging-circe/target/jvm-3 kinesis-client-logging-circe/target/jvm-3 aws-v1-localstack/target/jvm-3 shared/target/js-2.13 site/target/jvm-2.13 kinesis-client-localstack/target/jvm-3 shared/target/native-2.12 shared/target/jvm-3 kcl-localstack/target/jvm-2.13 kcl/target/jvm-3 kcl/target/jvm-2.12 root-js-212/target smithy4s-client-transformers/target/jvm-2.12 shared-ciris/target/jvm-2.12 root-jvm-3/target shared-localstack/target/native-2.13 shared-localstack/target/jvm-2.12 kcl-localstack/target/jvm-2.12 shared-ciris/target/js-3 kpl/target/jvm-2.13 smithy4s-client-localstack/target/native-3 smithy4s-client/target/js-2.13 shared-circe/target/jvm-2.12 root-native-3/target kcl-logging-circe/target/jvm-2.13 shared/target/native-3 root-js-213/target smithy4s-client-logging-circe/target/js-3 kcl-ciris/target/jvm-2.13 kinesis-client-logging-circe/target/jvm-2.13 smithy4s-client/target/js-3 compat/target/jvm-3 kinesis-client-logging-circe/target/jvm-2.12 shared-circe/target/jvm-2.13 kinesis-client/target/jvm-2.12 kpl-logging-circe/target/jvm-2.13 root-jvm-212/target shared-ciris/target/native-3 shared-localstack/target/native-3 aws-v2-localstack/target/jvm-3 compat/target/js-3 compat/target/js-2.13 kcl-http4s/target/jvm-2.12 aws-v2-localstack/target/jvm-2.12 root-native-212/target kcl-logging-circe/target/jvm-2.12 kinesis-client/target/jvm-2.13 shared/target/js-2.12 kcl/target/jvm-2.13 kpl-localstack/target/jvm-3 kpl-localstack/target/jvm-2.13 kcl-ciris/target/jvm-2.12 shared-circe/target/native-2.12 kcl-localstack/target/jvm-3 kinesis-client/target/jvm-3 shared-localstack/target/jvm-2.13 aws-v1-localstack/target/jvm-2.13 shared-ciris/target/js-2.13 kinesis-client-localstack/target/jvm-2.13 smithy4s-client/target/native-3 kpl-logging-circe/target/jvm-3 smithy4s-client-localstack/target/jvm-2.13 aws-v2-localstack/target/jvm-2.13 shared-localstack/target/js-3 compat/target/native-2.13 compat/target/native-3 kpl-logging-circe/target/jvm-2.12 integration-tests/target/js-3 shared/target/js-3 smithy4s-client-logging-circe/target/native-3 shared/target/jvm-2.12 integration-tests/target/jvm-3 integration-tests/target/jvm-2.13 smithy4s-client-localstack/target/js-2.13 shared-circe/target/jvm-3 shared-ciris/target/jvm-2.13 shared-localstack/target/jvm-3 compat/target/jvm-2.12 kpl-ciris/target/jvm-3 smithy4s-client/target/jvm-2.13 kpl-localstack/target/jvm-2.12 smithy4s-client-logging-circe/target/jvm-3 integration-tests/target/js-2.13 kcl-http4s/target/jvm-2.13 shared-circe/target/native-3 shared-ciris/target/native-2.13 kinesis-client-localstack/target/jvm-2.12 kcl-http4s/target/jvm-3 compat/target/jvm-2.13 integration-tests/target/native-3 project/target + run: mkdir -p root-js-3/target shared-ciris/target/jvm-3 root-jvm-213/target shared-localstack/target/js-2.12 smithy4s-client-logging-circe/target/js-2.13 shared-ciris/target/js-2.12 shared/target/native-2.13 aws-v1-localstack/target/jvm-2.12 compat/target/js-2.12 root-native-213/target shared-circe/target/native-2.13 kpl/target/jvm-3 shared-circe/target/js-2.12 target shared/target/jvm-2.13 shared-localstack/target/js-2.13 kpl/target/jvm-2.12 kcl-ciris/target/jvm-3 kpl-ciris/target/jvm-2.13 kpl-ciris/target/jvm-2.12 shared-circe/target/js-3 smithy4s-client-localstack/target/jvm-3 smithy4s-client-localstack/target/js-3 smithy4s-client/target/jvm-3 shared-circe/target/js-2.13 unidocs/target/jvm-2.13 compat/target/native-2.12 smithy4s-client-logging-circe/target/jvm-2.13 kcl-logging-circe/target/jvm-3 kinesis-client-logging-circe/target/jvm-3 aws-v1-localstack/target/jvm-3 shared/target/js-2.13 feral/target/jvm-3 site/target/jvm-2.13 kinesis-client-localstack/target/jvm-3 shared/target/native-2.12 shared/target/jvm-3 kcl-localstack/target/jvm-2.13 kcl/target/jvm-3 kcl/target/jvm-2.12 root-js-212/target smithy4s-client-transformers/target/jvm-2.12 shared-ciris/target/jvm-2.12 root-jvm-3/target shared-localstack/target/native-2.13 shared-localstack/target/jvm-2.12 kcl-localstack/target/jvm-2.12 shared-ciris/target/js-3 kpl/target/jvm-2.13 smithy4s-client-localstack/target/native-3 smithy4s-client/target/js-2.13 shared-circe/target/jvm-2.12 root-native-3/target kcl-logging-circe/target/jvm-2.13 shared/target/native-3 root-js-213/target smithy4s-client-logging-circe/target/js-3 kcl-ciris/target/jvm-2.13 kinesis-client-logging-circe/target/jvm-2.13 smithy4s-client/target/js-3 compat/target/jvm-3 kinesis-client-logging-circe/target/jvm-2.12 shared-circe/target/jvm-2.13 kinesis-client/target/jvm-2.12 kpl-logging-circe/target/jvm-2.13 root-jvm-212/target shared-ciris/target/native-3 shared-localstack/target/native-3 aws-v2-localstack/target/jvm-3 compat/target/js-3 compat/target/js-2.13 kcl-http4s/target/jvm-2.12 aws-v2-localstack/target/jvm-2.12 root-native-212/target kcl-logging-circe/target/jvm-2.12 kinesis-client/target/jvm-2.13 shared/target/js-2.12 kcl/target/jvm-2.13 feral/target/jvm-2.13 kpl-localstack/target/jvm-3 kpl-localstack/target/jvm-2.13 kcl-ciris/target/jvm-2.12 shared-circe/target/native-2.12 kcl-localstack/target/jvm-3 kinesis-client/target/jvm-3 shared-localstack/target/jvm-2.13 aws-v1-localstack/target/jvm-2.13 shared-ciris/target/js-2.13 kinesis-client-localstack/target/jvm-2.13 smithy4s-client/target/native-3 kpl-logging-circe/target/jvm-3 smithy4s-client-localstack/target/jvm-2.13 aws-v2-localstack/target/jvm-2.13 shared-localstack/target/js-3 compat/target/native-2.13 compat/target/native-3 kpl-logging-circe/target/jvm-2.12 integration-tests/target/js-3 shared/target/js-3 smithy4s-client-logging-circe/target/native-3 shared/target/jvm-2.12 integration-tests/target/jvm-3 integration-tests/target/jvm-2.13 smithy4s-client-localstack/target/js-2.13 shared-circe/target/jvm-3 feral/target/js-2.13 shared-ciris/target/jvm-2.13 shared-localstack/target/jvm-3 compat/target/jvm-2.12 kpl-ciris/target/jvm-3 smithy4s-client/target/jvm-2.13 kpl-localstack/target/jvm-2.12 smithy4s-client-logging-circe/target/jvm-3 integration-tests/target/js-2.13 kcl-http4s/target/jvm-2.13 shared-circe/target/native-3 shared-ciris/target/native-2.13 feral/target/js-3 kinesis-client-localstack/target/jvm-2.12 kcl-http4s/target/jvm-3 compat/target/jvm-2.13 integration-tests/target/native-3 project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar root-js-3/target shared-ciris/target/jvm-3 root-jvm-213/target shared-localstack/target/js-2.12 smithy4s-client-logging-circe/target/js-2.13 shared-ciris/target/js-2.12 shared/target/native-2.13 aws-v1-localstack/target/jvm-2.12 compat/target/js-2.12 root-native-213/target shared-circe/target/native-2.13 kpl/target/jvm-3 shared-circe/target/js-2.12 target shared/target/jvm-2.13 shared-localstack/target/js-2.13 kpl/target/jvm-2.12 kcl-ciris/target/jvm-3 kpl-ciris/target/jvm-2.13 kpl-ciris/target/jvm-2.12 shared-circe/target/js-3 smithy4s-client-localstack/target/jvm-3 smithy4s-client-localstack/target/js-3 smithy4s-client/target/jvm-3 shared-circe/target/js-2.13 unidocs/target/jvm-2.13 compat/target/native-2.12 smithy4s-client-logging-circe/target/jvm-2.13 kcl-logging-circe/target/jvm-3 kinesis-client-logging-circe/target/jvm-3 aws-v1-localstack/target/jvm-3 shared/target/js-2.13 site/target/jvm-2.13 kinesis-client-localstack/target/jvm-3 shared/target/native-2.12 shared/target/jvm-3 kcl-localstack/target/jvm-2.13 kcl/target/jvm-3 kcl/target/jvm-2.12 root-js-212/target smithy4s-client-transformers/target/jvm-2.12 shared-ciris/target/jvm-2.12 root-jvm-3/target shared-localstack/target/native-2.13 shared-localstack/target/jvm-2.12 kcl-localstack/target/jvm-2.12 shared-ciris/target/js-3 kpl/target/jvm-2.13 smithy4s-client-localstack/target/native-3 smithy4s-client/target/js-2.13 shared-circe/target/jvm-2.12 root-native-3/target kcl-logging-circe/target/jvm-2.13 shared/target/native-3 root-js-213/target smithy4s-client-logging-circe/target/js-3 kcl-ciris/target/jvm-2.13 kinesis-client-logging-circe/target/jvm-2.13 smithy4s-client/target/js-3 compat/target/jvm-3 kinesis-client-logging-circe/target/jvm-2.12 shared-circe/target/jvm-2.13 kinesis-client/target/jvm-2.12 kpl-logging-circe/target/jvm-2.13 root-jvm-212/target shared-ciris/target/native-3 shared-localstack/target/native-3 aws-v2-localstack/target/jvm-3 compat/target/js-3 compat/target/js-2.13 kcl-http4s/target/jvm-2.12 aws-v2-localstack/target/jvm-2.12 root-native-212/target kcl-logging-circe/target/jvm-2.12 kinesis-client/target/jvm-2.13 shared/target/js-2.12 kcl/target/jvm-2.13 kpl-localstack/target/jvm-3 kpl-localstack/target/jvm-2.13 kcl-ciris/target/jvm-2.12 shared-circe/target/native-2.12 kcl-localstack/target/jvm-3 kinesis-client/target/jvm-3 shared-localstack/target/jvm-2.13 aws-v1-localstack/target/jvm-2.13 shared-ciris/target/js-2.13 kinesis-client-localstack/target/jvm-2.13 smithy4s-client/target/native-3 kpl-logging-circe/target/jvm-3 smithy4s-client-localstack/target/jvm-2.13 aws-v2-localstack/target/jvm-2.13 shared-localstack/target/js-3 compat/target/native-2.13 compat/target/native-3 kpl-logging-circe/target/jvm-2.12 integration-tests/target/js-3 shared/target/js-3 smithy4s-client-logging-circe/target/native-3 shared/target/jvm-2.12 integration-tests/target/jvm-3 integration-tests/target/jvm-2.13 smithy4s-client-localstack/target/js-2.13 shared-circe/target/jvm-3 shared-ciris/target/jvm-2.13 shared-localstack/target/jvm-3 compat/target/jvm-2.12 kpl-ciris/target/jvm-3 smithy4s-client/target/jvm-2.13 kpl-localstack/target/jvm-2.12 smithy4s-client-logging-circe/target/jvm-3 integration-tests/target/js-2.13 kcl-http4s/target/jvm-2.13 shared-circe/target/native-3 shared-ciris/target/native-2.13 kinesis-client-localstack/target/jvm-2.12 kcl-http4s/target/jvm-3 compat/target/jvm-2.13 integration-tests/target/native-3 project/target + run: tar cf targets.tar root-js-3/target shared-ciris/target/jvm-3 root-jvm-213/target shared-localstack/target/js-2.12 smithy4s-client-logging-circe/target/js-2.13 shared-ciris/target/js-2.12 shared/target/native-2.13 aws-v1-localstack/target/jvm-2.12 compat/target/js-2.12 root-native-213/target shared-circe/target/native-2.13 kpl/target/jvm-3 shared-circe/target/js-2.12 target shared/target/jvm-2.13 shared-localstack/target/js-2.13 kpl/target/jvm-2.12 kcl-ciris/target/jvm-3 kpl-ciris/target/jvm-2.13 kpl-ciris/target/jvm-2.12 shared-circe/target/js-3 smithy4s-client-localstack/target/jvm-3 smithy4s-client-localstack/target/js-3 smithy4s-client/target/jvm-3 shared-circe/target/js-2.13 unidocs/target/jvm-2.13 compat/target/native-2.12 smithy4s-client-logging-circe/target/jvm-2.13 kcl-logging-circe/target/jvm-3 kinesis-client-logging-circe/target/jvm-3 aws-v1-localstack/target/jvm-3 shared/target/js-2.13 feral/target/jvm-3 site/target/jvm-2.13 kinesis-client-localstack/target/jvm-3 shared/target/native-2.12 shared/target/jvm-3 kcl-localstack/target/jvm-2.13 kcl/target/jvm-3 kcl/target/jvm-2.12 root-js-212/target smithy4s-client-transformers/target/jvm-2.12 shared-ciris/target/jvm-2.12 root-jvm-3/target shared-localstack/target/native-2.13 shared-localstack/target/jvm-2.12 kcl-localstack/target/jvm-2.12 shared-ciris/target/js-3 kpl/target/jvm-2.13 smithy4s-client-localstack/target/native-3 smithy4s-client/target/js-2.13 shared-circe/target/jvm-2.12 root-native-3/target kcl-logging-circe/target/jvm-2.13 shared/target/native-3 root-js-213/target smithy4s-client-logging-circe/target/js-3 kcl-ciris/target/jvm-2.13 kinesis-client-logging-circe/target/jvm-2.13 smithy4s-client/target/js-3 compat/target/jvm-3 kinesis-client-logging-circe/target/jvm-2.12 shared-circe/target/jvm-2.13 kinesis-client/target/jvm-2.12 kpl-logging-circe/target/jvm-2.13 root-jvm-212/target shared-ciris/target/native-3 shared-localstack/target/native-3 aws-v2-localstack/target/jvm-3 compat/target/js-3 compat/target/js-2.13 kcl-http4s/target/jvm-2.12 aws-v2-localstack/target/jvm-2.12 root-native-212/target kcl-logging-circe/target/jvm-2.12 kinesis-client/target/jvm-2.13 shared/target/js-2.12 kcl/target/jvm-2.13 feral/target/jvm-2.13 kpl-localstack/target/jvm-3 kpl-localstack/target/jvm-2.13 kcl-ciris/target/jvm-2.12 shared-circe/target/native-2.12 kcl-localstack/target/jvm-3 kinesis-client/target/jvm-3 shared-localstack/target/jvm-2.13 aws-v1-localstack/target/jvm-2.13 shared-ciris/target/js-2.13 kinesis-client-localstack/target/jvm-2.13 smithy4s-client/target/native-3 kpl-logging-circe/target/jvm-3 smithy4s-client-localstack/target/jvm-2.13 aws-v2-localstack/target/jvm-2.13 shared-localstack/target/js-3 compat/target/native-2.13 compat/target/native-3 kpl-logging-circe/target/jvm-2.12 integration-tests/target/js-3 shared/target/js-3 smithy4s-client-logging-circe/target/native-3 shared/target/jvm-2.12 integration-tests/target/jvm-3 integration-tests/target/jvm-2.13 smithy4s-client-localstack/target/js-2.13 shared-circe/target/jvm-3 feral/target/js-2.13 shared-ciris/target/jvm-2.13 shared-localstack/target/jvm-3 compat/target/jvm-2.12 kpl-ciris/target/jvm-3 smithy4s-client/target/jvm-2.13 kpl-localstack/target/jvm-2.12 smithy4s-client-logging-circe/target/jvm-3 integration-tests/target/js-2.13 kcl-http4s/target/jvm-2.13 shared-circe/target/native-3 shared-ciris/target/native-2.13 feral/target/js-3 kinesis-client-localstack/target/jvm-2.12 kcl-http4s/target/jvm-3 compat/target/jvm-2.13 integration-tests/target/native-3 project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') diff --git a/.mergify.yml b/.mergify.yml index da05b3c4..606bc6ff 100644 --- a/.mergify.yml +++ b/.mergify.yml @@ -150,6 +150,38 @@ pull_request_rules: add: - docs remove: [] +- name: Label feral PRs + conditions: + - files~=^feral/ + actions: + label: + add: + - feral + remove: [] +- name: Label feral3 PRs + conditions: + - files~=^.sbt/matrix/feral3/ + actions: + label: + add: + - feral3 + remove: [] +- name: Label feralJS PRs + conditions: + - files~=^.sbt/matrix/feralJS/ + actions: + label: + add: + - feralJS + remove: [] +- name: Label feralJS3 PRs + conditions: + - files~=^.sbt/matrix/feralJS3/ + actions: + label: + add: + - feralJS3 + remove: [] - name: Label integration-tests PRs conditions: - files~=^integration-tests/ diff --git a/build.sbt b/build.sbt index e823fecf..8f973039 100644 --- a/build.sbt +++ b/build.sbt @@ -66,7 +66,8 @@ lazy val `shared-circe` = projectMatrix description := "Common shared utilities for Circe", libraryDependencies ++= Seq( Circe.core.value, - Circe.parser.value + Circe.parser.value, + Circe.scodec.value ) ) .jvmPlatform(allScalaVersions) @@ -362,6 +363,19 @@ lazy val integrationTestsJvmSettings: Seq[Setting[_]] = Seq( tlJdkRelease := Some(11) ) +lazy val feral = projectMatrix + .settings( + description := "Interfaces for constructing AWS Lambda functions via Feral", + libraryDependencies ++= Seq( + Circe.core.value, + Circe.scodec.value, + Feral.lambda.value + ) + ) + .jvmPlatform(last2ScalaVersions) + .jsPlatform(last2ScalaVersions) + .dependsOn(`shared-circe`) + lazy val integrationTestsJvmDependencies = List( `kcl-http4s`, `kcl-localstack` @@ -459,7 +473,8 @@ lazy val docs = projectMatrix ), "circe" -> url("https://circe.github.io/circe/"), "ciris" -> url("https://cir.is/"), - "localstack" -> url("https://localstack.cloud/") + "localstack" -> url("https://localstack.cloud/"), + "feral" -> url("https://github.com/typelevel/feral") ), laikaConfig := LaikaConfig.defaults.withConfigValue( LinkConfig(sourceLinks = @@ -495,7 +510,8 @@ lazy val docs = projectMatrix `kinesis-client-localstack`, `smithy4s-client`, `smithy4s-client-logging-circe`, - `smithy4s-client-localstack` + `smithy4s-client-localstack`, + feral ) lazy val unidocs = projectMatrix @@ -526,7 +542,8 @@ lazy val unidocs = projectMatrix `kinesis-client-localstack`, `smithy4s-client`, `smithy4s-client-logging-circe`, - `smithy4s-client-localstack` + `smithy4s-client-localstack`, + feral ).map(_.jvm(Scala213).project): _* ) ) @@ -555,6 +572,7 @@ lazy val allProjects = Seq( `smithy4s-client`, `smithy4s-client-logging-circe`, `smithy4s-client-localstack`, + feral, `integration-tests`, unidocs ) diff --git a/docs/lambda/directory.conf b/docs/lambda/directory.conf new file mode 100644 index 00000000..70d928f2 --- /dev/null +++ b/docs/lambda/directory.conf @@ -0,0 +1,4 @@ +laika.title = Feral +laika.navigationOrder = [ + getting-started.md +] diff --git a/docs/lambda/getting-started.md b/docs/lambda/getting-started.md new file mode 100644 index 00000000..2d2848ac --- /dev/null +++ b/docs/lambda/getting-started.md @@ -0,0 +1,30 @@ +# Feral + +AWS offers the ability to consume Kinesis events via an [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html). + +kinesis4cats offers an @:source(shared.src.main.scala.kinesis4cats.consumer.lambda.KinesisStreamRecordPayload) that maps to the event structure received in the lambda. This can be used in a [Feral](https://github.com/typelevel/feral) instance. + +Feral does offer a similar event, in which you can see an example usage [here](https://github.com/typelevel/feral/blob/main/examples/src/main/scala/feral/examples/KinesisLambda.scala). However, one common problem that users have with Kinesis Lambda functions is that they do not perform [record deaggregation](https://github.com/awslabs/kinesis-aggregation#deaggregation). kinesis4cats offers a `deaggregate` function in its `KinesisStreamEvent` class, which users can leverage to resolve this problem. + +## Installation + +```scala +libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-feral" % "@VERSION@" +``` + +## Usage + +```scala mdoc:compile-only +import cats.effect._ +import feral.lambda._ + +import kinesis4cats.consumer.feral.KinesisStreamEvent + +object kinesisHandler extends IOLambda.Simple[KinesisStreamEvent, INothing] { + type Init + def apply(event: KinesisStreamEvent, context: Context[IO], init: Init) = for { + records <- IO.fromTry(event.deaggregate) + _ <- IO.println(s"Received event with ${records.size} records").as(None) + } yield None +} +``` diff --git a/feral/src/main/scala/kinesis4cats/consumer/feral/KinesisStreamEvent.scala b/feral/src/main/scala/kinesis4cats/consumer/feral/KinesisStreamEvent.scala new file mode 100644 index 00000000..21e04fe5 --- /dev/null +++ b/feral/src/main/scala/kinesis4cats/consumer/feral/KinesisStreamEvent.scala @@ -0,0 +1,112 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.consumer +package feral + +import scala.util.Try + +import java.time.Instant + +import _root_.feral.lambda.KernelSource +import io.circe.Decoder +import io.circe.scodec._ +import scodec.bits.ByteVector + +import kinesis4cats.models.EncryptionType +import kinesis4cats.models.instances.circe._ + +final case class KinesisStreamRecordPayload( + approximateArrivalTimestamp: Instant, + data: ByteVector, + kinesisSchemaVersion: String, + partitionKey: String, + encryptionType: Option[EncryptionType], + sequenceNumber: String +) { + def asRecord = Record( + sequenceNumber, + approximateArrivalTimestamp, + data, + partitionKey, + encryptionType, + None, + None + ) +} + +object KinesisStreamRecordPayload { + implicit private val instantCirceDecoder: Decoder[Instant] = + Decoder.decodeBigDecimal.emapTry { millis => + def round(x: BigDecimal) = x.setScale(0, BigDecimal.RoundingMode.DOWN) + Try { + val seconds = round(millis / 1000).toLongExact + val nanos = round((millis % 1000) * 1e6).toLongExact + Instant.ofEpochSecond(seconds, nanos) + } + } + + implicit val kinesisStreamRecordPayloadCirceDecoder + : Decoder[KinesisStreamRecordPayload] = + Decoder.forProduct6( + "approximateArrivalTimestamp", + "data", + "kinesisSchemaVersion", + "partitionKey", + "encryptionType", + "sequenceNumber" + )(KinesisStreamRecordPayload.apply) +} + +final case class KinesisStreamRecord( + awsRegion: String, + eventID: String, + eventName: String, + eventSource: String, + eventSourceArn: String, + eventVersion: String, + invokeIdentityArn: String, + kinesis: KinesisStreamRecordPayload +) + +object KinesisStreamRecord { + implicit val kinesisStreamRecordCirceDecoder: Decoder[KinesisStreamRecord] = + Decoder.forProduct8( + "awsRegion", + "eventID", + "eventName", + "eventSource", + "eventSourceARN", + "eventVersion", + "invokeIdentityArn", + "kinesis" + )(KinesisStreamRecord.apply) +} + +final case class KinesisStreamEvent( + records: List[KinesisStreamRecord] +) { + def deaggregate: Try[List[Record]] = + Record.deaggregate(records.map(_.kinesis.asRecord)) +} + +object KinesisStreamEvent { + implicit val kinesisStreamEventCirceDecoder: Decoder[KinesisStreamEvent] = + Decoder.forProduct1("Records")(KinesisStreamEvent.apply) + + implicit def kinesisStreamEventKernelSource + : KernelSource[KinesisStreamEvent] = KernelSource.emptyKernelSource +} diff --git a/project/Kinesis4CatsPlugin.scala b/project/Kinesis4CatsPlugin.scala index 5f0640db..bb9c8e32 100644 --- a/project/Kinesis4CatsPlugin.scala +++ b/project/Kinesis4CatsPlugin.scala @@ -119,7 +119,8 @@ object Kinesis4CatsPlugin extends AutoPlugin { "command" -> "sbt 'project ${{ matrix.project }}' dockerComposeUp", "retry_on" -> "error", "on_retry_command" -> "sbt 'project ${{ matrix.project }}' dockerComposeDown" - ) + ), + env = Map("GITHUB_API_TOKEN" -> "${{ secrets.GITHUB_TOKEN }}") ), WorkflowStep.Sbt( List("Test/fastLinkJS"), @@ -135,7 +136,8 @@ object Kinesis4CatsPlugin extends AutoPlugin { "max_attempts" -> "3", "command" -> "sbt 'project ${{ matrix.project }}' Test/nativeLink", "retry_on" -> "error" - ) + ), + env = Map("GITHUB_API_TOKEN" -> "${{ secrets.GITHUB_TOKEN }}") ), WorkflowStep.Sbt( List( diff --git a/project/LibraryDependencies.scala b/project/LibraryDependencies.scala index 2d87ca69..257cd767 100644 --- a/project/LibraryDependencies.scala +++ b/project/LibraryDependencies.scala @@ -80,6 +80,7 @@ object LibraryDependencies { val circeVersion = "0.14.5" val core = Def.setting("io.circe" %%% "circe-core" % circeVersion) val parser = Def.setting("io.circe" %%% "circe-parser" % circeVersion) + val scodec = Def.setting("io.circe" %%% "circe-scodec" % circeVersion) } object Ciris { @@ -87,6 +88,11 @@ object LibraryDependencies { val core = Def.setting("is.cir" %%% "ciris" % cirisVersion) } + object Feral { + val feralVersion = "0.2.2" + val lambda = Def.setting("org.typelevel" %%% "feral-lambda" % feralVersion) + } + object Http4s { val http4sVersion = "0.23.22" val emberServer = diff --git a/shared-circe/src/main/scala/kinesis4cats/models/instances/circe.scala b/shared-circe/src/main/scala/kinesis4cats/models/instances/circe.scala new file mode 100644 index 00000000..6314caf2 --- /dev/null +++ b/shared-circe/src/main/scala/kinesis4cats/models/instances/circe.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.models +package instances + +import io.circe.Decoder + +object circe { + implicit val encryptionTypeCirceDecoder: Decoder[EncryptionType] = + Decoder[String].emap { + case EncryptionType.None.value => Right(EncryptionType.None) + case EncryptionType.Kms.value => Right(EncryptionType.Kms) + case x => + Left( + s"Unexpected Encryption Type value '$x' received. Expected values are [${EncryptionType.None.value}, ${EncryptionType.Kms.value}]" + ) + } +} diff --git a/shared/src/main/scala/kinesis4cats/Aggregation.scala b/shared/src/main/scala/kinesis4cats/Aggregation.scala new file mode 100644 index 00000000..23ae822a --- /dev/null +++ b/shared/src/main/scala/kinesis4cats/Aggregation.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats + +import cats.syntax.all._ +import scodec.bits.ByteVector + +private[kinesis4cats] object Aggregation { + // From https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md + val magicBytes: Array[Byte] = + Array(0xf3, 0x89, 0x9a, 0xc2).map(_.toByte) + + val magicByteVector: ByteVector = ByteVector(magicBytes) + + val digestSize = 16 // MD5 digest length + + val aggregatedByteSize: Int = magicBytes.length + digestSize +} diff --git a/shared/src/main/scala/kinesis4cats/consumer/Record.scala b/shared/src/main/scala/kinesis4cats/consumer/Record.scala new file mode 100644 index 00000000..5f9b8381 --- /dev/null +++ b/shared/src/main/scala/kinesis4cats/consumer/Record.scala @@ -0,0 +1,87 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats +package consumer + +import scala.util.Success +import scala.util.Try + +import java.time.Instant + +import cats.data.Chain +import cats.syntax.all._ +import scodec.bits.ByteVector + +import kinesis4cats.models.EncryptionType +import kinesis4cats.protobuf.messages + +final case class Record( + sequenceNumber: String, + approximateArrivalTimestamp: Instant, + data: ByteVector, + partitionKey: String, + encryptionType: Option[EncryptionType], + subSequenceNumber: Option[Long], + explicitHashKey: Option[String] +) { + val isAggregated: Boolean = + if (data.length >= Aggregation.aggregatedByteSize) { + data.startsWith(Aggregation.magicByteVector) + } else false + + private val dataSize = data.length.toInt - Aggregation.aggregatedByteSize + + private val dataArray = { + val arr = new Array[Byte](dataSize) + data.copyToArray(arr, 0, Aggregation.magicBytes.length.toLong, dataSize) + arr + } +} + +object Record { + def deaggregate(records: List[Record]): Try[List[Record]] = + records.flatTraverse { + case record if !record.isAggregated => Success(List(record)) + case record => + Try(messages.AggregatedRecord.parseFrom(record.dataArray)).flatMap { + ar => + val pks = ar.partitionKeyTable.toList + val ehks = ar.explicitHashKeyTable.toList + + Chain + .traverseViaChain(ar.records.toIndexedSeq.zipWithIndex) { + case (r, i) => + Try { + val partitionKey = pks(r.partitionKeyIndex.toInt) + val explicitHashKey = + r.explicitHashKeyIndex.map(x => ehks(x.toInt)) + + Record( + record.sequenceNumber, + record.approximateArrivalTimestamp, + ByteVector(r.data.toByteArray()), + partitionKey, + record.encryptionType, + Some(i.toLong), + explicitHashKey + ) + } + } + .map(_.toList) + } + } +} diff --git a/shared/src/main/scala/kinesis4cats/models/EncryptionType.scala b/shared/src/main/scala/kinesis4cats/models/EncryptionType.scala new file mode 100644 index 00000000..901207c3 --- /dev/null +++ b/shared/src/main/scala/kinesis4cats/models/EncryptionType.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.models + +sealed abstract class EncryptionType(val value: String) + extends Product + with Serializable + +object EncryptionType { + case object None extends EncryptionType("NONE") + case object Kms extends EncryptionType("KMS") +} diff --git a/shared/src/main/scala/kinesis4cats/producer/batching/AggregatedBatch.scala b/shared/src/main/scala/kinesis4cats/producer/batching/AggregatedBatch.scala index f8d2de69..d3908b95 100644 --- a/shared/src/main/scala/kinesis4cats/producer/batching/AggregatedBatch.scala +++ b/shared/src/main/scala/kinesis4cats/producer/batching/AggregatedBatch.scala @@ -14,13 +14,13 @@ * limitations under the License. */ -package kinesis4cats.producer +package kinesis4cats +package producer package batching import java.io.ByteArrayOutputStream import cats.data.NonEmptyList -import cats.syntax.all._ import kinesis4cats.Utils import kinesis4cats.models.ShardId @@ -61,9 +61,9 @@ private[kinesis4cats] final case class AggregatedBatch private ( // See https://github.com/awslabs/kinesis-aggregation/blob/2.0.3/java/KinesisAggregatorV2/src/main/java/com/amazonaws/kinesis/agg/AggRecord.java#L127 def getSizeBytes: Int = - AggregatedBatch.magicBytes.length + + Aggregation.magicBytes.length + aggregatedMessageSize + - 16 // MD5 digest length + Aggregation.digestSize // See https://github.com/awslabs/kinesis-aggregation/blob/2.0.3/java/KinesisAggregatorV2/src/main/java/com/amazonaws/kinesis/agg/AggRecord.java#L330 def canAdd(record: Record.WithShard): Boolean = @@ -113,9 +113,9 @@ private[kinesis4cats] final case class AggregatedBatch private ( val baos: ByteArrayOutputStream = new ByteArrayOutputStream(getSizeBytes) baos.write( - AggregatedBatch.magicBytes, + Aggregation.magicBytes, 0, - AggregatedBatch.magicBytes.length + Aggregation.magicBytes.length ) baos.write(messageBody, 0, messageBody.length) baos.write(messageDigest, 0, messageDigest.length) @@ -148,8 +148,4 @@ private[kinesis4cats] object AggregatedBatch { config ) } - - // From https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md - val magicBytes: Array[Byte] = - Array(0xf3, 0x89, 0x9a, 0xc2).map(_.toByte) } diff --git a/shared/src/test/scala/kinesis4cats/consumer/RecordSpec.scala b/shared/src/test/scala/kinesis4cats/consumer/RecordSpec.scala new file mode 100644 index 00000000..311655a5 --- /dev/null +++ b/shared/src/test/scala/kinesis4cats/consumer/RecordSpec.scala @@ -0,0 +1,65 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.consumer + +import java.time.Instant + +import cats.syntax.all._ +import scodec.bits.ByteVector + +import kinesis4cats.producer + +final class RecordSpec extends munit.FunSuite { + val config = producer.batching.Batcher.Config.default + + test("It should be able to deaggregate") { + val data1 = Array.fill[Byte](500)(1) + val partitionKey1 = "foo" + val data2 = Array.fill[Byte](500)(1) + val partitionKey2 = "wazzle" + + val batch = producer.batching.AggregatedBatch + .create( + producer.Record.WithShard + .fromOption(producer.Record(data1, partitionKey1), None), + producer.batching.Batcher.Config.default + ) + .add( + producer.Record.WithShard + .fromOption(producer.Record(data2, partitionKey2), None) + ) + + val testBytes = batch.asBytes + + val record = Record( + "foo", + Instant.now(), + ByteVector(testBytes), + batch.partitionKey, + None, + None, + None + ) + + val res = Record.deaggregate(List(record)).get + + assert(res.head.data.toArray.sameElements(data1)) + assert(res.head.partitionKey === partitionKey1) + assert(res(1).data.toArray.sameElements(data2)) + assert(res(1).partitionKey === partitionKey2) + } +} diff --git a/shared/src/test/scalajvm/kinesis4cats/producer/batching/AggregatedBatchSpec.scala b/shared/src/test/scalajvm/kinesis4cats/producer/batching/AggregatedBatchSpec.scala index 129680ed..eb50a521 100644 --- a/shared/src/test/scalajvm/kinesis4cats/producer/batching/AggregatedBatchSpec.scala +++ b/shared/src/test/scalajvm/kinesis4cats/producer/batching/AggregatedBatchSpec.scala @@ -33,7 +33,7 @@ import kinesis4cats.models.ShardId import kinesis4cats.syntax.bytebuffer._ @SuppressWarnings(Array("scalafix:DisableSyntax.null")) -class AggregatedBatchSpec extends munit.CatsEffectSuite { +class AggregatedBatchSpec extends munit.FunSuite { val config = Batcher.Config.default test("It calculate the record sizes to be the same") { val data1 = Array.fill[Byte](500)(1)