From a3648db0d39017b3da41c6105a4f4c75655071fe Mon Sep 17 00:00:00 2001 From: Eric Meisel Date: Wed, 22 Feb 2023 13:39:35 -0600 Subject: [PATCH] FS2 Kinesis Producers (#55) --- .github/workflows/ci.yml | 19 +- .mergify.yml | 24 +++ build.sbt | 35 +++- docs/client/directory.conf | 1 + docs/client/fs2.md | 43 +++++ docs/client/localstack.md | 4 + docs/smithy4s/directory.conf | 1 + docs/smithy4s/fs2.md | 49 +++++ docs/smithy4s/localstack.md | 14 ++ .../kinesis4cats/kcl/fs2/KCLConsumerFS2.scala | 2 +- .../producer/fs2/FS2KinesisProducer.scala | 111 +++++++++++ .../LocalstackFS2KinesisProducer.scala | 119 ++++++++++++ .../LocalstackKinesisProducer.scala | 45 ++++- .../KinesisProducerNoShardMapSpec.scala | 89 +++++++++ .../producer/fs2/FS2KinesisProducerSpec.scala | 74 ++++++++ .../client/producer/KinesisProducer.scala | 4 +- project/Kinesis4CatsPlugin.scala | 9 + .../producer/fs2/FS2Producer.scala | 166 +++++++++++++++++ .../kinesis4cats/producer/ProducerSpec.scala | 2 +- .../producer/fs2/FS2ProducerSpec.scala | 93 ++++++++++ .../kinesis4cats/producer/Producer.scala | 37 ++-- .../kinesis4cats/producer/ShardMapCache.scala | 13 +- .../producer/batching/AggregatedBatch.scala | 16 +- .../producer/batching/Batcher.scala | 9 +- .../batching/AggregatedBatchSpec.scala | 43 +++++ .../producer/fs2/FS2KinesisProducer.scala | 141 ++++++++++++++ .../LocalstackFS2KinesisProducer.scala | 175 ++++++++++++++++++ .../LocalstackKinesisProducer.scala | 36 +++- .../KinesisProducerNoShardMapSpec.scala | 115 ++++++++++++ .../producer/fs2/FS2KinesisProducerSpec.scala | 101 ++++++++++ .../client/middleware/ResponseLogger.scala | 3 +- 31 files changed, 1538 insertions(+), 55 deletions(-) create mode 100644 docs/client/fs2.md create mode 100644 docs/smithy4s/fs2.md create mode 100644 kinesis-client-fs2/src/main/scala/kinesis4cats/client/producer/fs2/FS2KinesisProducer.scala create mode 100644 kinesis-client-localstack/src/main/scala/kinesis4cats/client/producer/fs2/localstack/LocalstackFS2KinesisProducer.scala create mode 100644 kinesis-client-producer-tests/src/it/scala/kinesis4cats/client/producer/KinesisProducerNoShardMapSpec.scala create mode 100644 kinesis-client-producer-tests/src/it/scala/kinesis4cats/client/producer/fs2/FS2KinesisProducerSpec.scala create mode 100644 shared-fs2/src/main/scala/kinesis4cats/producer/fs2/FS2Producer.scala create mode 100644 shared-tests/src/main/scala/kinesis4cats/producer/fs2/FS2ProducerSpec.scala create mode 100644 smithy4s-client-fs2/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducer.scala create mode 100644 smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/localstack/LocalstackFS2KinesisProducer.scala create mode 100644 smithy4s-client-producer-tests/src/it/scala/kinesis4cats/smithy4s/client/producer/KinesisProducerNoShardMapSpec.scala create mode 100644 smithy4s-client-producer-tests/src/it/scala/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducerSpec.scala diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b3adbbc7..48abdc41 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -66,6 +66,11 @@ jobs: ~/Library/Caches/Coursier/v1 key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} + - name: Setup protoc + uses: arduino/setup-protoc@v1 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Check that workflows are up to date run: sbt githubWorkflowCheck @@ -87,11 +92,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p smithy4s-client-tests/target/jvm-2.13 shared-ciris/target/jvm-3 smithy4s-client-producer-tests/target/jvm-2.13 aws-v1-localstack/target/jvm-2.12 kpl/target/jvm-3 target shared/target/jvm-2.13 kpl/target/jvm-2.12 kcl-ciris/target/jvm-3 kpl-ciris/target/jvm-2.13 kinesis-client-tests/target/jvm-2.12 kpl-ciris/target/jvm-2.12 smithy4s-client-localstack/target/jvm-3 smithy4s-client/target/jvm-3 unidocs/target/jvm-2.13 kernel-tests/target/jvm-2.12 smithy4s-client-logging-circe/target/jvm-2.13 kcl-logging-circe/target/jvm-3 .js/target kcl-fs2-ciris/target/jvm-2.12 kinesis-client-logging-circe/target/jvm-3 aws-v1-localstack/target/jvm-3 kcl-tests/target/jvm-2.12 site/target/jvm-2.13 kinesis-client-localstack/target/jvm-3 shared/target/jvm-3 kpl-tests/target/jvm-2.13 kcl-localstack/target/jvm-2.13 kcl-fs2-ciris/target/jvm-3 kcl/target/jvm-3 kcl/target/jvm-2.12 unidocs/target/jvm-2.12 smithy4s-client-transformers/target/jvm-2.12 kpl-tests/target/jvm-3 shared-ciris/target/jvm-2.12 shared-localstack/target/jvm-2.12 kpl-tests/target/jvm-2.12 kcl-localstack/target/jvm-2.12 kpl/target/jvm-2.13 kinesis-client-producer-tests/target/jvm-3 shared-circe/target/jvm-2.12 shared-tests/target/jvm-3 kcl-logging-circe/target/jvm-2.13 kinesis-client-tests/target/jvm-3 kcl-tests/target/jvm-2.13 kcl-ciris/target/jvm-2.13 kinesis-client-logging-circe/target/jvm-2.13 unidocs/target/jvm-3 smithy4s-client-producer-tests/target/jvm-3 compat/target/jvm-3 kinesis-client-logging-circe/target/jvm-2.12 shared-circe/target/jvm-2.13 kinesis-client/target/jvm-2.12 kpl-logging-circe/target/jvm-2.13 kcl-tests/target/jvm-3 aws-v2-localstack/target/jvm-3 kernel-tests/target/jvm-2.13 kcl-http4s/target/jvm-2.12 aws-v2-localstack/target/jvm-2.12 kcl-logging-circe/target/jvm-2.12 kinesis-client/target/jvm-2.13 .jvm/target kcl/target/jvm-2.13 kpl-localstack/target/jvm-3 kpl-localstack/target/jvm-2.13 kcl-ciris/target/jvm-2.12 .native/target kinesis-client-tests/target/jvm-2.13 kcl-fs2/target/jvm-2.13 kcl-localstack/target/jvm-3 kcl-fs2/target/jvm-2.12 kinesis-client/target/jvm-3 shared-localstack/target/jvm-2.13 shared-tests/target/jvm-2.12 aws-v1-localstack/target/jvm-2.13 kinesis-client-producer-tests/target/jvm-2.13 shared-tests/target/jvm-2.13 kinesis-client-localstack/target/jvm-2.13 kernel-tests/target/jvm-3 kpl-logging-circe/target/jvm-3 smithy4s-client-localstack/target/jvm-2.13 aws-v2-localstack/target/jvm-2.13 kpl-logging-circe/target/jvm-2.12 kcl-fs2/target/jvm-3 shared/target/jvm-2.12 smithy4s-client-tests/target/jvm-3 shared-circe/target/jvm-3 kinesis-client-producer-tests/target/jvm-2.12 shared-ciris/target/jvm-2.13 shared-localstack/target/jvm-3 compat/target/jvm-2.12 kpl-ciris/target/jvm-3 smithy4s-client/target/jvm-2.13 kpl-localstack/target/jvm-2.12 smithy4s-client-logging-circe/target/jvm-3 kcl-http4s/target/jvm-2.13 kinesis-client-localstack/target/jvm-2.12 kcl-fs2-ciris/target/jvm-2.13 kcl-http4s/target/jvm-3 compat/target/jvm-2.13 project/target + run: mkdir -p smithy4s-client-tests/target/jvm-2.13 shared-ciris/target/jvm-3 smithy4s-client-producer-tests/target/jvm-2.13 aws-v1-localstack/target/jvm-2.12 kpl/target/jvm-3 target shared/target/jvm-2.13 kpl/target/jvm-2.12 kcl-ciris/target/jvm-3 kpl-ciris/target/jvm-2.13 kinesis-client-tests/target/jvm-2.12 kpl-ciris/target/jvm-2.12 smithy4s-client-localstack/target/jvm-3 smithy4s-client/target/jvm-3 unidocs/target/jvm-2.13 kernel-tests/target/jvm-2.12 smithy4s-client-logging-circe/target/jvm-2.13 kcl-logging-circe/target/jvm-3 .js/target kcl-fs2-ciris/target/jvm-2.12 kinesis-client-logging-circe/target/jvm-3 aws-v1-localstack/target/jvm-3 kcl-tests/target/jvm-2.12 site/target/jvm-2.13 kinesis-client-localstack/target/jvm-3 shared/target/jvm-3 kpl-tests/target/jvm-2.13 kcl-localstack/target/jvm-2.13 kcl-fs2-ciris/target/jvm-3 kcl/target/jvm-3 kcl/target/jvm-2.12 unidocs/target/jvm-2.12 smithy4s-client-transformers/target/jvm-2.12 kpl-tests/target/jvm-3 shared-ciris/target/jvm-2.12 shared-localstack/target/jvm-2.12 kpl-tests/target/jvm-2.12 kcl-localstack/target/jvm-2.12 kpl/target/jvm-2.13 kinesis-client-fs2/target/jvm-2.12 kinesis-client-producer-tests/target/jvm-3 shared-circe/target/jvm-2.12 shared-tests/target/jvm-3 kcl-logging-circe/target/jvm-2.13 kinesis-client-tests/target/jvm-3 kcl-tests/target/jvm-2.13 shared-fs2/target/jvm-2.13 kcl-ciris/target/jvm-2.13 kinesis-client-logging-circe/target/jvm-2.13 unidocs/target/jvm-3 smithy4s-client-producer-tests/target/jvm-3 compat/target/jvm-3 kinesis-client-logging-circe/target/jvm-2.12 shared-circe/target/jvm-2.13 kinesis-client/target/jvm-2.12 kpl-logging-circe/target/jvm-2.13 kcl-tests/target/jvm-3 aws-v2-localstack/target/jvm-3 kernel-tests/target/jvm-2.13 kcl-http4s/target/jvm-2.12 aws-v2-localstack/target/jvm-2.12 kcl-logging-circe/target/jvm-2.12 kinesis-client/target/jvm-2.13 kinesis-client-fs2/target/jvm-2.13 .jvm/target kcl/target/jvm-2.13 kpl-localstack/target/jvm-3 kpl-localstack/target/jvm-2.13 kcl-ciris/target/jvm-2.12 .native/target kinesis-client-tests/target/jvm-2.13 kcl-fs2/target/jvm-2.13 kcl-localstack/target/jvm-3 smithy4s-client-fs2/target/jvm-2.13 kcl-fs2/target/jvm-2.12 shared-fs2/target/jvm-2.12 kinesis-client-fs2/target/jvm-3 kinesis-client/target/jvm-3 shared-localstack/target/jvm-2.13 shared-tests/target/jvm-2.12 aws-v1-localstack/target/jvm-2.13 kinesis-client-producer-tests/target/jvm-2.13 shared-tests/target/jvm-2.13 kinesis-client-localstack/target/jvm-2.13 kernel-tests/target/jvm-3 kpl-logging-circe/target/jvm-3 smithy4s-client-localstack/target/jvm-2.13 aws-v2-localstack/target/jvm-2.13 kpl-logging-circe/target/jvm-2.12 kcl-fs2/target/jvm-3 smithy4s-client-fs2/target/jvm-3 shared/target/jvm-2.12 smithy4s-client-tests/target/jvm-3 shared-circe/target/jvm-3 kinesis-client-producer-tests/target/jvm-2.12 shared-ciris/target/jvm-2.13 shared-fs2/target/jvm-3 shared-localstack/target/jvm-3 compat/target/jvm-2.12 kpl-ciris/target/jvm-3 smithy4s-client/target/jvm-2.13 kpl-localstack/target/jvm-2.12 smithy4s-client-logging-circe/target/jvm-3 kcl-http4s/target/jvm-2.13 kinesis-client-localstack/target/jvm-2.12 kcl-fs2-ciris/target/jvm-2.13 kcl-http4s/target/jvm-3 compat/target/jvm-2.13 project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar smithy4s-client-tests/target/jvm-2.13 shared-ciris/target/jvm-3 smithy4s-client-producer-tests/target/jvm-2.13 aws-v1-localstack/target/jvm-2.12 kpl/target/jvm-3 target shared/target/jvm-2.13 kpl/target/jvm-2.12 kcl-ciris/target/jvm-3 kpl-ciris/target/jvm-2.13 kinesis-client-tests/target/jvm-2.12 kpl-ciris/target/jvm-2.12 smithy4s-client-localstack/target/jvm-3 smithy4s-client/target/jvm-3 unidocs/target/jvm-2.13 kernel-tests/target/jvm-2.12 smithy4s-client-logging-circe/target/jvm-2.13 kcl-logging-circe/target/jvm-3 .js/target kcl-fs2-ciris/target/jvm-2.12 kinesis-client-logging-circe/target/jvm-3 aws-v1-localstack/target/jvm-3 kcl-tests/target/jvm-2.12 site/target/jvm-2.13 kinesis-client-localstack/target/jvm-3 shared/target/jvm-3 kpl-tests/target/jvm-2.13 kcl-localstack/target/jvm-2.13 kcl-fs2-ciris/target/jvm-3 kcl/target/jvm-3 kcl/target/jvm-2.12 unidocs/target/jvm-2.12 smithy4s-client-transformers/target/jvm-2.12 kpl-tests/target/jvm-3 shared-ciris/target/jvm-2.12 shared-localstack/target/jvm-2.12 kpl-tests/target/jvm-2.12 kcl-localstack/target/jvm-2.12 kpl/target/jvm-2.13 kinesis-client-producer-tests/target/jvm-3 shared-circe/target/jvm-2.12 shared-tests/target/jvm-3 kcl-logging-circe/target/jvm-2.13 kinesis-client-tests/target/jvm-3 kcl-tests/target/jvm-2.13 kcl-ciris/target/jvm-2.13 kinesis-client-logging-circe/target/jvm-2.13 unidocs/target/jvm-3 smithy4s-client-producer-tests/target/jvm-3 compat/target/jvm-3 kinesis-client-logging-circe/target/jvm-2.12 shared-circe/target/jvm-2.13 kinesis-client/target/jvm-2.12 kpl-logging-circe/target/jvm-2.13 kcl-tests/target/jvm-3 aws-v2-localstack/target/jvm-3 kernel-tests/target/jvm-2.13 kcl-http4s/target/jvm-2.12 aws-v2-localstack/target/jvm-2.12 kcl-logging-circe/target/jvm-2.12 kinesis-client/target/jvm-2.13 .jvm/target kcl/target/jvm-2.13 kpl-localstack/target/jvm-3 kpl-localstack/target/jvm-2.13 kcl-ciris/target/jvm-2.12 .native/target kinesis-client-tests/target/jvm-2.13 kcl-fs2/target/jvm-2.13 kcl-localstack/target/jvm-3 kcl-fs2/target/jvm-2.12 kinesis-client/target/jvm-3 shared-localstack/target/jvm-2.13 shared-tests/target/jvm-2.12 aws-v1-localstack/target/jvm-2.13 kinesis-client-producer-tests/target/jvm-2.13 shared-tests/target/jvm-2.13 kinesis-client-localstack/target/jvm-2.13 kernel-tests/target/jvm-3 kpl-logging-circe/target/jvm-3 smithy4s-client-localstack/target/jvm-2.13 aws-v2-localstack/target/jvm-2.13 kpl-logging-circe/target/jvm-2.12 kcl-fs2/target/jvm-3 shared/target/jvm-2.12 smithy4s-client-tests/target/jvm-3 shared-circe/target/jvm-3 kinesis-client-producer-tests/target/jvm-2.12 shared-ciris/target/jvm-2.13 shared-localstack/target/jvm-3 compat/target/jvm-2.12 kpl-ciris/target/jvm-3 smithy4s-client/target/jvm-2.13 kpl-localstack/target/jvm-2.12 smithy4s-client-logging-circe/target/jvm-3 kcl-http4s/target/jvm-2.13 kinesis-client-localstack/target/jvm-2.12 kcl-fs2-ciris/target/jvm-2.13 kcl-http4s/target/jvm-3 compat/target/jvm-2.13 project/target + run: tar cf targets.tar smithy4s-client-tests/target/jvm-2.13 shared-ciris/target/jvm-3 smithy4s-client-producer-tests/target/jvm-2.13 aws-v1-localstack/target/jvm-2.12 kpl/target/jvm-3 target shared/target/jvm-2.13 kpl/target/jvm-2.12 kcl-ciris/target/jvm-3 kpl-ciris/target/jvm-2.13 kinesis-client-tests/target/jvm-2.12 kpl-ciris/target/jvm-2.12 smithy4s-client-localstack/target/jvm-3 smithy4s-client/target/jvm-3 unidocs/target/jvm-2.13 kernel-tests/target/jvm-2.12 smithy4s-client-logging-circe/target/jvm-2.13 kcl-logging-circe/target/jvm-3 .js/target kcl-fs2-ciris/target/jvm-2.12 kinesis-client-logging-circe/target/jvm-3 aws-v1-localstack/target/jvm-3 kcl-tests/target/jvm-2.12 site/target/jvm-2.13 kinesis-client-localstack/target/jvm-3 shared/target/jvm-3 kpl-tests/target/jvm-2.13 kcl-localstack/target/jvm-2.13 kcl-fs2-ciris/target/jvm-3 kcl/target/jvm-3 kcl/target/jvm-2.12 unidocs/target/jvm-2.12 smithy4s-client-transformers/target/jvm-2.12 kpl-tests/target/jvm-3 shared-ciris/target/jvm-2.12 shared-localstack/target/jvm-2.12 kpl-tests/target/jvm-2.12 kcl-localstack/target/jvm-2.12 kpl/target/jvm-2.13 kinesis-client-fs2/target/jvm-2.12 kinesis-client-producer-tests/target/jvm-3 shared-circe/target/jvm-2.12 shared-tests/target/jvm-3 kcl-logging-circe/target/jvm-2.13 kinesis-client-tests/target/jvm-3 kcl-tests/target/jvm-2.13 shared-fs2/target/jvm-2.13 kcl-ciris/target/jvm-2.13 kinesis-client-logging-circe/target/jvm-2.13 unidocs/target/jvm-3 smithy4s-client-producer-tests/target/jvm-3 compat/target/jvm-3 kinesis-client-logging-circe/target/jvm-2.12 shared-circe/target/jvm-2.13 kinesis-client/target/jvm-2.12 kpl-logging-circe/target/jvm-2.13 kcl-tests/target/jvm-3 aws-v2-localstack/target/jvm-3 kernel-tests/target/jvm-2.13 kcl-http4s/target/jvm-2.12 aws-v2-localstack/target/jvm-2.12 kcl-logging-circe/target/jvm-2.12 kinesis-client/target/jvm-2.13 kinesis-client-fs2/target/jvm-2.13 .jvm/target kcl/target/jvm-2.13 kpl-localstack/target/jvm-3 kpl-localstack/target/jvm-2.13 kcl-ciris/target/jvm-2.12 .native/target kinesis-client-tests/target/jvm-2.13 kcl-fs2/target/jvm-2.13 kcl-localstack/target/jvm-3 smithy4s-client-fs2/target/jvm-2.13 kcl-fs2/target/jvm-2.12 shared-fs2/target/jvm-2.12 kinesis-client-fs2/target/jvm-3 kinesis-client/target/jvm-3 shared-localstack/target/jvm-2.13 shared-tests/target/jvm-2.12 aws-v1-localstack/target/jvm-2.13 kinesis-client-producer-tests/target/jvm-2.13 shared-tests/target/jvm-2.13 kinesis-client-localstack/target/jvm-2.13 kernel-tests/target/jvm-3 kpl-logging-circe/target/jvm-3 smithy4s-client-localstack/target/jvm-2.13 aws-v2-localstack/target/jvm-2.13 kpl-logging-circe/target/jvm-2.12 kcl-fs2/target/jvm-3 smithy4s-client-fs2/target/jvm-3 shared/target/jvm-2.12 smithy4s-client-tests/target/jvm-3 shared-circe/target/jvm-3 kinesis-client-producer-tests/target/jvm-2.12 shared-ciris/target/jvm-2.13 shared-fs2/target/jvm-3 shared-localstack/target/jvm-3 compat/target/jvm-2.12 kpl-ciris/target/jvm-3 smithy4s-client/target/jvm-2.13 kpl-localstack/target/jvm-2.12 smithy4s-client-logging-circe/target/jvm-3 kcl-http4s/target/jvm-2.13 kinesis-client-localstack/target/jvm-2.12 kcl-fs2-ciris/target/jvm-2.13 kcl-http4s/target/jvm-3 compat/target/jvm-2.13 project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') @@ -144,6 +149,11 @@ jobs: ~/Library/Caches/Coursier/v1 key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} + - name: Setup protoc + uses: arduino/setup-protoc@v1 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Download target directories (2.12.17, rootJVM) uses: actions/download-artifact@v3 with: @@ -230,6 +240,11 @@ jobs: ~/Library/Caches/Coursier/v1 key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} + - name: Setup protoc + uses: arduino/setup-protoc@v1 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Generate site run: sbt '++ ${{ matrix.scala }}' docs/tlSite diff --git a/.mergify.yml b/.mergify.yml index b71844a4..bd6937ee 100644 --- a/.mergify.yml +++ b/.mergify.yml @@ -120,6 +120,14 @@ pull_request_rules: add: - kinesis-client remove: [] +- name: Label kinesis-client-fs2 PRs + conditions: + - files~=^kinesis-client-fs2/ + actions: + label: + add: + - kinesis-client-fs2 + remove: [] - name: Label kinesis-client-localstack PRs conditions: - files~=^kinesis-client-localstack/ @@ -216,6 +224,14 @@ pull_request_rules: add: - shared-ciris remove: [] +- name: Label shared-fs2 PRs + conditions: + - files~=^shared-fs2/ + actions: + label: + add: + - shared-fs2 + remove: [] - name: Label shared-localstack PRs conditions: - files~=^shared-localstack/ @@ -240,6 +256,14 @@ pull_request_rules: add: - smithy4s-client remove: [] +- name: Label smithy4s-client-fs2 PRs + conditions: + - files~=^smithy4s-client-fs2/ + actions: + label: + add: + - smithy4s-client-fs2 + remove: [] - name: Label smithy4s-client-localstack PRs conditions: - files~=^smithy4s-client-localstack/ diff --git a/build.sbt b/build.sbt index 7983be86..7bee3a26 100644 --- a/build.sbt +++ b/build.sbt @@ -25,6 +25,7 @@ lazy val shared = projectMatrix libraryDependencies ++= Seq( Aws.Aggregation.aggregator % Test, Aws.Aggregation.deaggregator % Test, + Aws.kcl % Test, Log4Cats.slf4j % Test ) ) @@ -32,6 +33,15 @@ lazy val shared = projectMatrix .enableIntegrationTests .dependsOn(compat) +lazy val `shared-fs2` = projectMatrix + .settings( + description := "Common code for FS2", + libraryDependencies ++= Seq(FS2.core) + ) + .jvmPlatform(allScalaVersions) + .enableIntegrationTests + .dependsOn(shared) + lazy val `shared-circe` = projectMatrix .settings( description := "Common shared utilities for Circe", @@ -69,7 +79,7 @@ lazy val `shared-tests` = projectMatrix ) .jvmPlatform(allScalaVersions) .enableIntegrationTests - .dependsOn(`shared-localstack`, `kernel-tests`) + .dependsOn(`shared-localstack`, `shared-fs2`, `kernel-tests`) lazy val `aws-v2-localstack` = projectMatrix .settings( @@ -292,6 +302,11 @@ lazy val `kinesis-client` = projectMatrix .enableIntegrationTests .dependsOn(shared) +lazy val `kinesis-client-fs2` = projectMatrix + .jvmPlatform(allScalaVersions) + .enableIntegrationTests + .dependsOn(`kinesis-client`, `shared-fs2`) + lazy val `kinesis-client-logging-circe` = projectMatrix .settings( description := "JSON structured logging instances for the Java Kinesis Client, via Circe" @@ -310,7 +325,7 @@ lazy val `kinesis-client-localstack` = projectMatrix ) .jvmPlatform(allScalaVersions) .enableIntegrationTests - .dependsOn(`aws-v2-localstack`, `kinesis-client`) + .dependsOn(`aws-v2-localstack`, `kinesis-client-fs2`) lazy val `kinesis-client-tests` = projectMatrix .enablePlugins(NoPublishPlugin) @@ -377,6 +392,11 @@ lazy val `smithy4s-client` = projectMatrix .enableIntegrationTests .dependsOn(shared) +lazy val `smithy4s-client-fs2` = projectMatrix + .jvmPlatform(last2ScalaVersions) + .enableIntegrationTests + .dependsOn(`smithy4s-client`, `shared-fs2`) + lazy val `smithy4s-client-logging-circe` = projectMatrix .enablePlugins(Smithy4sCodegenPlugin) .settings( @@ -392,7 +412,7 @@ lazy val `smithy4s-client-localstack` = projectMatrix description := "A test-kit for working with Kinesis and Localstack, via the Smithy4s Client project" ) .jvmPlatform(last2ScalaVersions) - .dependsOn(`shared-localstack`, `smithy4s-client`) + .dependsOn(`shared-localstack`, `smithy4s-client-fs2`) lazy val `smithy4s-client-tests` = projectMatrix .enablePlugins(NoPublishPlugin) @@ -472,6 +492,7 @@ lazy val docs = projectMatrix compat, `kernel-tests`, shared, + `shared-fs2`, `shared-circe`, `shared-ciris`, `shared-localstack`, @@ -490,9 +511,11 @@ lazy val docs = projectMatrix `kpl-logging-circe`, `kpl-localstack`, `kinesis-client`, + `kinesis-client-fs2`, `kinesis-client-logging-circe`, `kinesis-client-localstack`, `smithy4s-client`, + `smithy4s-client-fs2`, `smithy4s-client-logging-circe`, `smithy4s-client-localstack` ) @@ -508,6 +531,7 @@ lazy val unidocs = projectMatrix compat, `kernel-tests`, shared, + `shared-fs2`, `shared-circe`, `shared-ciris`, `shared-localstack`, @@ -526,9 +550,11 @@ lazy val unidocs = projectMatrix `kpl-logging-circe`, `kpl-localstack`, `kinesis-client`, + `kinesis-client-fs2`, `kinesis-client-logging-circe`, `kinesis-client-localstack`, `smithy4s-client`, + `smithy4s-client-fs2`, `smithy4s-client-logging-circe`, `smithy4s-client-localstack` ).flatMap(_.projectRefs): _* @@ -539,6 +565,7 @@ lazy val allProjects = Seq( compat, `kernel-tests`, shared, + `shared-fs2`, `shared-circe`, `shared-ciris`, `shared-localstack`, @@ -559,12 +586,14 @@ lazy val allProjects = Seq( `kpl-localstack`, `kpl-tests`, `kinesis-client`, + `kinesis-client-fs2`, `kinesis-client-logging-circe`, `kinesis-client-localstack`, `kinesis-client-tests`, `kinesis-client-producer-tests`, `smithy4s-client-transformers`, `smithy4s-client`, + `smithy4s-client-fs2`, `smithy4s-client-logging-circe`, `smithy4s-client-localstack`, `smithy4s-client-tests`, diff --git a/docs/client/directory.conf b/docs/client/directory.conf index 8be74960..75695579 100644 --- a/docs/client/directory.conf +++ b/docs/client/directory.conf @@ -1,6 +1,7 @@ laika.title = Kinesis Client laika.navigationOrder = [ getting-started.md + fs2.md circe.md localstack.md ] diff --git a/docs/client/fs2.md b/docs/client/fs2.md new file mode 100644 index 00000000..b1b61b9b --- /dev/null +++ b/docs/client/fs2.md @@ -0,0 +1,43 @@ +# FS2 + +This module provides a [KPL-like](https://github.com/awslabs/amazon-kinesis-producer) producer via implementing @:source(shared.src.main.scala.kinesis4cats.producer.fs2.FS2Producer). This interface receives records from a user, enqueues them into a Queue and puts them as batches to Kinesis on a configured interval. This leverages all of the functionality of the @:source(shared.src.main.scala.kinesis4cats.producer.Producer) interface, including batching, aggregation and retries. + +## Installation + +```scala +libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-client-fs2" % "@VERSION@" +``` + +## Usage + +```scala mdoc:compile-only +import cats.effect._ +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient + +import kinesis4cats.client.logging.instances.show._ +import kinesis4cats.client.producer.fs2.FS2KinesisProducer +import kinesis4cats.producer.logging.instances.show._ +import kinesis4cats.producer._ +import kinesis4cats.producer.fs2._ +import kinesis4cats.models.StreamNameOrArn + +object MyApp extends IOApp { + override def run(args: List[String]) = + FS2KinesisProducer[IO]( + FS2Producer.Config.default(StreamNameOrArn.Name("my-stream")), + KinesisAsyncClient.builder().build() + ).use(producer => + for { + _ <- producer.put( + Record("my-data".getBytes(), "some-partition-key") + ) + _ <- producer.put( + Record("my-data-2".getBytes(), "some-partition-key-2") + ) + _ <- producer.put( + Record("my-data-3".getBytes(), "some-partition-key-3") + ) + } yield ExitCode.Success + ) +} +``` diff --git a/docs/client/localstack.md b/docs/client/localstack.md index 81632400..9c06c5f5 100644 --- a/docs/client/localstack.md +++ b/docs/client/localstack.md @@ -16,6 +16,7 @@ import cats.effect.IO import kinesis4cats.client.localstack.LocalstackKinesisClient import kinesis4cats.client.logging.instances.show._ import kinesis4cats.client.producer.localstack.LocalstackKinesisProducer +import kinesis4cats.client.producer.fs2.localstack.LocalstackFS2KinesisProducer import kinesis4cats.producer.logging.instances.show._ // Load a KinesisClient as an effect @@ -30,4 +31,7 @@ LocalstackKinesisClient.streamResource[IO]("my-stream", 1) // Load a KinesisProducer as a resource LocalstackKinesisProducer.resource[IO]("my-stream") + +// Load a FS2KinesisProducer as a resource +LocalstackFS2KinesisProducer.resource[IO]("my-stream") ``` diff --git a/docs/smithy4s/directory.conf b/docs/smithy4s/directory.conf index cd68f538..f6fd8091 100644 --- a/docs/smithy4s/directory.conf +++ b/docs/smithy4s/directory.conf @@ -1,6 +1,7 @@ laika.title = Smithy4s laika.navigationOrder = [ getting-started.md + fs2.md circe.md localstack.md ] diff --git a/docs/smithy4s/fs2.md b/docs/smithy4s/fs2.md new file mode 100644 index 00000000..f52726bc --- /dev/null +++ b/docs/smithy4s/fs2.md @@ -0,0 +1,49 @@ +# FS2 + +This module provides a [KPL-like](https://github.com/awslabs/amazon-kinesis-producer) producer via implementing @:source(shared.src.main.scala.kinesis4cats.producer.fs2.FS2Producer). This interface receives records from a user, enqueues them into a Queue and puts them as batches to Kinesis on a configured interval. This leverages all of the functionality of the @:source(shared.src.main.scala.kinesis4cats.producer.Producer) interface, including batching, aggregation and retries. + +## Installation + +```scala +libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-smithy4s-client-fs2" % "@VERSION@" +``` + +## Usage + +```scala mdoc:compile-only +import cats.effect._ +import org.http4s.blaze.client.BlazeClientBuilder +import org.typelevel.log4cats.slf4j.Slf4jLogger +import smithy4s.aws._ + +import kinesis4cats.smithy4s.client.logging.instances.show._ +import kinesis4cats.smithy4s.client.producer.fs2.FS2KinesisProducer +import kinesis4cats.producer.logging.instances.show._ +import kinesis4cats.producer.fs2._ +import kinesis4cats.producer._ +import kinesis4cats.models.StreamNameOrArn + +object MyApp extends IOApp { + override def run(args: List[String]) = + BlazeClientBuilder[IO].resource.flatMap(client => + FS2KinesisProducer[IO]( + FS2Producer.Config.default(StreamNameOrArn.Name("my-stream")), + client, + IO.pure(AwsRegion.US_EAST_1), + loggerF = (_: Async[IO]) => Slf4jLogger.create[IO] + ) + ).use(producer => + for { + _ <- producer.put( + Record("my-data".getBytes(), "some-partition-key") + ) + _ <- producer.put( + Record("my-data-2".getBytes(), "some-partition-key-2") + ) + _ <- producer.put( + Record("my-data-3".getBytes(), "some-partition-key-3") + ) + } yield ExitCode.Success + ) +} +``` diff --git a/docs/smithy4s/localstack.md b/docs/smithy4s/localstack.md index 89a98cf5..71987cb6 100644 --- a/docs/smithy4s/localstack.md +++ b/docs/smithy4s/localstack.md @@ -20,6 +20,7 @@ import kinesis4cats.logging.instances.show._ import kinesis4cats.smithy4s.client.localstack.LocalstackKinesisClient import kinesis4cats.smithy4s.client.logging.instances.show._ import kinesis4cats.smithy4s.client.producer.localstack.LocalstackKinesisProducer +import kinesis4cats.smithy4s.client.producer.fs2.localstack.LocalstackFS2KinesisProducer import kinesis4cats.producer.logging.instances.show._ // Load a KinesisClient as a Resource val kinesisClientResource = for { @@ -45,4 +46,17 @@ val kinesisProducerResource = for { loggerF = (_: Async[IO]) => Slf4jLogger.create[IO] ) } yield producer + +// Load a FS2KinesisProducer as a Resource +val fs2KinesisProducerResource = for { + underlying <- BlazeClientBuilder[IO] + .withCheckEndpointAuthentication(false) + .resource + producer <- LocalstackFS2KinesisProducer.resource[IO]( + underlying, + "my-stream", + IO.pure(AwsRegion.US_EAST_1), + loggerF = (_: Async[IO]) => Slf4jLogger.create[IO] + ) +} yield producer ``` diff --git a/kcl-fs2/src/main/scala/kinesis4cats/kcl/fs2/KCLConsumerFS2.scala b/kcl-fs2/src/main/scala/kinesis4cats/kcl/fs2/KCLConsumerFS2.scala index abc82921..a4f1f2e1 100644 --- a/kcl-fs2/src/main/scala/kinesis4cats/kcl/fs2/KCLConsumerFS2.scala +++ b/kcl-fs2/src/main/scala/kinesis4cats/kcl/fs2/KCLConsumerFS2.scala @@ -690,8 +690,8 @@ object KCLConsumerFS2 { object FS2Config { val default = FS2Config( - 100, 1000, + 100, 10.seconds, 5, 0.seconds diff --git a/kinesis-client-fs2/src/main/scala/kinesis4cats/client/producer/fs2/FS2KinesisProducer.scala b/kinesis-client-fs2/src/main/scala/kinesis4cats/client/producer/fs2/FS2KinesisProducer.scala new file mode 100644 index 00000000..5729dfec --- /dev/null +++ b/kinesis-client-fs2/src/main/scala/kinesis4cats/client/producer/fs2/FS2KinesisProducer.scala @@ -0,0 +1,111 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.client +package producer +package fs2 + +import cats.effect.Async +import cats.effect.kernel.Resource +import cats.effect.std.Queue +import cats.effect.syntax.all._ +import org.typelevel.log4cats.StructuredLogger +import org.typelevel.log4cats.slf4j.Slf4jLogger +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse + +import kinesis4cats.producer._ +import kinesis4cats.producer.fs2.FS2Producer + +/** A buffered Kinesis producer which will produce batches of data at a + * configurable rate. + * + * @param config + * [[kinesis.producer.fs2.FS2Producer.Config FS2Producer.Config]] + * @param queue + * [[cats.effect.std.Queue Queue]] of + * [[kinesis4cats.producer.Record Records]] to produce. + * @param underlying + * [[kinesis4cats.smithy4s.client.producer.KinesisProducer KinesisProducer]] + * @param callback: + * Function that can be run after each of the put results from the underlying + * @param F + * [[cats.effect.Async Async]] + */ +final class FS2KinesisProducer[F[_]] private[kinesis4cats] ( + override val logger: StructuredLogger[F], + override val config: FS2Producer.Config, + override protected val queue: Queue[F, Option[Record]], + override protected val underlying: KinesisProducer[F] +)( + override protected val callback: ( + Producer.Res[PutRecordsResponse], + Async[F] + ) => F[Unit] +)(implicit + F: Async[F] +) extends FS2Producer[F, PutRecordsRequest, PutRecordsResponse] + +object FS2KinesisProducer { + + /** Basic constructor for the + * [[kinesis4cats.client.producer.fs2.FS2KinesisProducer FS2KinesisProducer]] + * + * @param config + * [[kinesis4cats.producer.fs2.FS2Producer.Config FS2Producer.Config]] + * @param _underlying + * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/KinesisAsyncClient.html KinesisAsyncClient]] + * instance + * @param callback + * Function that can be run after each of the put results from the + * underlying + * @param F + * [[cats.effect.Async Async]] + * @param LE + * [[kinesis4cats.producer.Producer.LogEncoders Producer.LogEncoders]] + * @param KLE + * [[kinesis4cats.client.KinesisClient.LogEncoders KinesisClient.LogEncoders]] + * @param SLE + * [[kinesis4cats.producer.ShardMapCache.LogEncoders ShardMapCache.LogEncoders]] + * @return + * [[cats.effect.Resource Resource]] of + * [[kinesis4cats.client.producer.fs2.FS2KinesisProducer FS2KinesisProducer]] + */ + def apply[F[_]]( + config: FS2Producer.Config, + _underlying: KinesisAsyncClient, + callback: (Producer.Res[PutRecordsResponse], Async[F]) => F[Unit] = + (_: Producer.Res[PutRecordsResponse], f: Async[F]) => f.unit + )(implicit + F: Async[F], + LE: Producer.LogEncoders, + KLE: KinesisClient.LogEncoders, + SLE: ShardMapCache.LogEncoders + ): Resource[F, FS2KinesisProducer[F]] = for { + logger <- Slf4jLogger.create[F].toResource + underlying <- KinesisProducer( + config.producerConfig, + _underlying + ) + queue <- Queue.bounded[F, Option[Record]](config.queueSize).toResource + producer = new FS2KinesisProducer[F](logger, config, queue, underlying)( + callback + ) + _ <- producer.start() + _ <- Resource.onFinalize(producer.stop()) + } yield producer +} diff --git a/kinesis-client-localstack/src/main/scala/kinesis4cats/client/producer/fs2/localstack/LocalstackFS2KinesisProducer.scala b/kinesis-client-localstack/src/main/scala/kinesis4cats/client/producer/fs2/localstack/LocalstackFS2KinesisProducer.scala new file mode 100644 index 00000000..bebeadd3 --- /dev/null +++ b/kinesis-client-localstack/src/main/scala/kinesis4cats/client/producer/fs2/localstack/LocalstackFS2KinesisProducer.scala @@ -0,0 +1,119 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.client.producer +package fs2 +package localstack + +import cats.effect._ +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse + +import kinesis4cats.client.KinesisClient +import kinesis4cats.localstack.LocalstackConfig +import kinesis4cats.localstack.aws.v2.AwsClients +import kinesis4cats.models.StreamNameOrArn +import kinesis4cats.producer.Producer +import kinesis4cats.producer.ShardMapCache +import kinesis4cats.producer.fs2.FS2Producer + +object LocalstackFS2KinesisProducer { + + /** Builds a [[kinesis4cats.client.producer.KinesisProducer KinesisProducer]] + * that is compliant for Localstack usage. Lifecycle is managed as a + * [[cats.effect.Resource Resource]]. + * + * @param producerConfig + * [[kinesis4cats.producer.fs2.FS2Producer.Config FS2Producer.Config]] + * @param config + * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] + * @param callback + * Function that can be run after each of the put results from the + * underlying + * @param F + * F with an [[cats.effect.Async Async]] instance + * @param LE + * [[kinesis4cats.client.KinesisClient.LogEncoders LogEncoders]] + * @param SLE + * [[kinesis4cats.producer.ShardMapCache.LogEncoders ShardMapCache.LogEncoders]] + * @param PLE + * [[kinesis4cats.producer.Producer.LogEncoders Producer.LogEncoders]] + * @return + * [[cats.effect.Resource Resource]] of + * [[kinesis4cats.client.producer.fs2.FS2KinesisProducer FS2KinesisProducer]] + */ + def resource[F[_]]( + producerConfig: FS2Producer.Config, + config: LocalstackConfig, + callback: (Producer.Res[PutRecordsResponse], Async[F]) => F[Unit] + )(implicit + F: Async[F], + LE: KinesisClient.LogEncoders, + SLE: ShardMapCache.LogEncoders, + PLE: Producer.LogEncoders + ): Resource[F, FS2KinesisProducer[F]] = AwsClients + .kinesisClientResource[F](config) + .flatMap(underlying => + FS2KinesisProducer[F]( + producerConfig, + underlying, + callback + ) + ) + + /** Builds a [[kinesis4cats.client.producer.KinesisProducer KinesisProducer]] + * that is compliant for Localstack usage. Lifecycle is managed as a + * [[cats.effect.Resource Resource]]. + * + * @param streamName + * Name of stream for the producer to produce to + * @param prefix + * Optional prefix for parsing configuration. Default to None + * @param producerConfig + * String => + * [[kinesis4cats.producer.fs2.FS2Producer.Config FS2Producer.Config]] + * function that creates configuration given a stream name. Defaults to + * Producer.Config.default + * @param callback + * Function that can be run after each of the put results from the + * underlying. Defaults to F.unit. + * @param F + * F with an [[cats.effect.Async Async]] instance + * @param LE + * [[kinesis4cats.client.KinesisClient.LogEncoders LogEncoders]] + * @param SLE + * [[kinesis4cats.producer.ShardMapCache.LogEncoders ShardMapCache.LogEncoders]] + * @param PLE + * [[kinesis4cats.producer.Producer.LogEncoders Producer.LogEncoders]] + * @return + * [[cats.effect.Resource Resource]] of + * [[kinesis4cats.client.producer.fs2.FS2KinesisProducer FS2KinesisProducer]] + */ + def resource[F[_]]( + streamName: String, + prefix: Option[String] = None, + producerConfig: String => FS2Producer.Config = (streamName: String) => + FS2Producer.Config.default(StreamNameOrArn.Name(streamName)), + callback: (Producer.Res[PutRecordsResponse], Async[F]) => F[Unit] = + (_: Producer.Res[PutRecordsResponse], f: Async[F]) => f.unit + )(implicit + F: Async[F], + LE: KinesisClient.LogEncoders, + SLE: ShardMapCache.LogEncoders, + PLE: Producer.LogEncoders + ): Resource[F, FS2KinesisProducer[F]] = LocalstackConfig + .resource[F](prefix) + .flatMap(resource[F](producerConfig(streamName), _, callback)) +} diff --git a/kinesis-client-localstack/src/main/scala/kinesis4cats/client/producer/localstack/LocalstackKinesisProducer.scala b/kinesis-client-localstack/src/main/scala/kinesis4cats/client/producer/localstack/LocalstackKinesisProducer.scala index a75a596a..5e3eaccc 100644 --- a/kinesis-client-localstack/src/main/scala/kinesis4cats/client/producer/localstack/LocalstackKinesisProducer.scala +++ b/kinesis-client-localstack/src/main/scala/kinesis4cats/client/producer/localstack/LocalstackKinesisProducer.scala @@ -17,6 +17,9 @@ package kinesis4cats.client.producer.localstack import cats.effect._ +import cats.effect.syntax.all._ +import cats.syntax.all._ +import org.typelevel.log4cats.slf4j.Slf4jLogger import kinesis4cats.client.KinesisClient import kinesis4cats.client.producer.KinesisProducer @@ -24,6 +27,7 @@ import kinesis4cats.localstack.LocalstackConfig import kinesis4cats.localstack.aws.v2.AwsClients import kinesis4cats.models.StreamNameOrArn import kinesis4cats.producer.Producer +import kinesis4cats.producer.ShardMap import kinesis4cats.producer.ShardMapCache object LocalstackKinesisProducer { @@ -50,7 +54,12 @@ object LocalstackKinesisProducer { */ def resource[F[_]]( producerConfig: Producer.Config, - config: LocalstackConfig + config: LocalstackConfig, + shardMapF: ( + KinesisClient[F], + StreamNameOrArn, + Async[F] + ) => F[Either[ShardMapCache.Error, ShardMap]] )(implicit F: Async[F], LE: KinesisClient.LogEncoders, @@ -58,11 +67,22 @@ object LocalstackKinesisProducer { PLE: Producer.LogEncoders ): Resource[F, KinesisProducer[F]] = AwsClients .kinesisClientResource[F](config) - .flatMap(underlying => - KinesisProducer[F]( - producerConfig, - underlying - ) + .flatMap(_underlying => + for { + logger <- Slf4jLogger.create[F].toResource + underlying <- KinesisClient[F](_underlying) + shardMapCache <- ShardMapCache[F]( + producerConfig.shardMapCacheConfig, + shardMapF(underlying, producerConfig.streamNameOrArn, F), + Slf4jLogger.create[F].widen + ) + producer = new KinesisProducer[F]( + logger, + shardMapCache, + producerConfig, + underlying + ) + } yield producer ) /** Builds a [[kinesis4cats.client.producer.KinesisProducer KinesisProducer]] @@ -93,7 +113,16 @@ object LocalstackKinesisProducer { streamName: String, prefix: Option[String] = None, producerConfig: String => Producer.Config = (streamName: String) => - Producer.Config.default(StreamNameOrArn.Name(streamName)) + Producer.Config.default(StreamNameOrArn.Name(streamName)), + shardMapF: ( + KinesisClient[F], + StreamNameOrArn, + Async[F] + ) => F[Either[ShardMapCache.Error, ShardMap]] = ( + client: KinesisClient[F], + streamNameOrArn: StreamNameOrArn, + f: Async[F] + ) => KinesisProducer.getShardMap(client, streamNameOrArn)(f) )(implicit F: Async[F], LE: KinesisClient.LogEncoders, @@ -101,5 +130,5 @@ object LocalstackKinesisProducer { PLE: Producer.LogEncoders ): Resource[F, KinesisProducer[F]] = LocalstackConfig .resource[F](prefix) - .flatMap(resource[F](producerConfig(streamName), _)) + .flatMap(resource[F](producerConfig(streamName), _, shardMapF)) } diff --git a/kinesis-client-producer-tests/src/it/scala/kinesis4cats/client/producer/KinesisProducerNoShardMapSpec.scala b/kinesis-client-producer-tests/src/it/scala/kinesis4cats/client/producer/KinesisProducerNoShardMapSpec.scala new file mode 100644 index 00000000..63fc9733 --- /dev/null +++ b/kinesis-client-producer-tests/src/it/scala/kinesis4cats/client/producer/KinesisProducerNoShardMapSpec.scala @@ -0,0 +1,89 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.client.producer + +import java.util.UUID + +import cats.effect._ +import cats.effect.syntax.all._ +import cats.syntax.all._ +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse + +import kinesis4cats.client.KinesisClient +import kinesis4cats.client.localstack.LocalstackKinesisClient +import kinesis4cats.client.logging.instances.show._ +import kinesis4cats.client.producer.localstack.LocalstackKinesisProducer +import kinesis4cats.kcl.CommittableRecord +import kinesis4cats.kcl.localstack.LocalstackKCLConsumer +import kinesis4cats.kcl.logging.instances.show._ +import kinesis4cats.models.StreamNameOrArn +import kinesis4cats.producer.Producer +import kinesis4cats.producer.ProducerSpec +import kinesis4cats.producer.ShardMapCache +import kinesis4cats.producer.logging.instances.show._ +import kinesis4cats.syntax.bytebuffer._ + +class KinesisProducerNoShardMapSpec + extends ProducerSpec[ + PutRecordsRequest, + PutRecordsResponse, + CommittableRecord[IO] + ]() { + override lazy val streamName: String = + s"kinesis-client-producer-spec-${UUID.randomUUID().toString()}" + override def producerResource + : Resource[IO, Producer[IO, PutRecordsRequest, PutRecordsResponse]] = + LocalstackKinesisProducer.resource[IO]( + streamName, + shardMapF = ( + _: KinesisClient[IO], + _: StreamNameOrArn, + _: Async[IO] + ) => + IO.pure( + ShardMapCache + .ListShardsError( + new RuntimeException("Expected Exception") + ) + .asLeft + ) + ) + + override def aAsBytes(a: CommittableRecord[IO]): Array[Byte] = a.data.asArray + + override def fixture( + shardCount: Int, + appName: String + ): SyncIO[FunFixture[ProducerSpec.Resources[ + IO, + PutRecordsRequest, + PutRecordsResponse, + CommittableRecord[IO] + ]]] = ResourceFixture( + for { + _ <- LocalstackKinesisClient.streamResource[IO](streamName, shardCount) + deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults( + streamName, + appName + )((_: List[CommittableRecord[IO]]) => IO.unit) + _ <- deferredWithResults.deferred.get.toResource + producer <- producerResource + } yield ProducerSpec.Resources(deferredWithResults.resultsQueue, producer) + ) + +} diff --git a/kinesis-client-producer-tests/src/it/scala/kinesis4cats/client/producer/fs2/FS2KinesisProducerSpec.scala b/kinesis-client-producer-tests/src/it/scala/kinesis4cats/client/producer/fs2/FS2KinesisProducerSpec.scala new file mode 100644 index 00000000..b7bc47c9 --- /dev/null +++ b/kinesis-client-producer-tests/src/it/scala/kinesis4cats/client/producer/fs2/FS2KinesisProducerSpec.scala @@ -0,0 +1,74 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.client.producer.fs2 + +import java.util.UUID + +import cats.effect._ +import cats.effect.syntax.all._ +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse + +import kinesis4cats.client.localstack.LocalstackKinesisClient +import kinesis4cats.client.logging.instances.show._ +import kinesis4cats.client.producer.fs2.localstack.LocalstackFS2KinesisProducer +import kinesis4cats.kcl.CommittableRecord +import kinesis4cats.kcl.localstack.LocalstackKCLConsumer +import kinesis4cats.kcl.logging.instances.show._ +import kinesis4cats.producer.fs2.FS2Producer +import kinesis4cats.producer.fs2.FS2ProducerSpec +import kinesis4cats.producer.logging.instances.show._ +import kinesis4cats.syntax.bytebuffer._ + +class KinesisFS2ProducerSpec + extends FS2ProducerSpec[ + PutRecordsRequest, + PutRecordsResponse, + CommittableRecord[IO] + ]() { + override lazy val streamName: String = + s"kinesis-client-fs2-producer-spec-${UUID.randomUUID().toString()}" + override def producerResource + : Resource[IO, FS2Producer[IO, PutRecordsRequest, PutRecordsResponse]] = + LocalstackFS2KinesisProducer.resource[IO](streamName) + + override def aAsBytes(a: CommittableRecord[IO]): Array[Byte] = a.data.asArray + + override def fixture( + shardCount: Int, + appName: String + ): SyncIO[FunFixture[FS2ProducerSpec.Resources[ + IO, + PutRecordsRequest, + PutRecordsResponse, + CommittableRecord[IO] + ]]] = ResourceFixture( + for { + _ <- LocalstackKinesisClient.streamResource[IO](streamName, shardCount) + deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults( + streamName, + appName + )((_: List[CommittableRecord[IO]]) => IO.unit) + _ <- deferredWithResults.deferred.get.toResource + producer <- producerResource + } yield FS2ProducerSpec.Resources( + deferredWithResults.resultsQueue, + producer + ) + ) + +} diff --git a/kinesis-client/src/main/scala/kinesis4cats/client/producer/KinesisProducer.scala b/kinesis-client/src/main/scala/kinesis4cats/client/producer/KinesisProducer.scala index 272740fd..cabab3a4 100644 --- a/kinesis-client/src/main/scala/kinesis4cats/client/producer/KinesisProducer.scala +++ b/kinesis-client/src/main/scala/kinesis4cats/client/producer/KinesisProducer.scala @@ -53,7 +53,7 @@ import kinesis4cats.syntax.id._ * @param LE * [[kinesis4cats.producer.Producer.LogEncoders Producer.LogEncoders]] */ -final class KinesisProducer[F[_]] private ( +final class KinesisProducer[F[_]] private[kinesis4cats] ( override val logger: StructuredLogger[F], override val shardMapCache: ShardMapCache[F], override val config: Producer.Config, @@ -107,7 +107,7 @@ final class KinesisProducer[F[_]] private ( object KinesisProducer { - private def getShardMap[F[_]]( + private[kinesis4cats] def getShardMap[F[_]]( client: KinesisClient[F], streamNameOrArn: models.StreamNameOrArn )(implicit diff --git a/project/Kinesis4CatsPlugin.scala b/project/Kinesis4CatsPlugin.scala index 08a63010..c9f23047 100644 --- a/project/Kinesis4CatsPlugin.scala +++ b/project/Kinesis4CatsPlugin.scala @@ -28,6 +28,7 @@ object Kinesis4CatsPlugin extends AutoPlugin { import de.heikoseeberger.sbtheader.HeaderPlugin.autoImport._ import org.scalafmt.sbt.ScalafmtPlugin.autoImport._ import sbtassembly.AssemblyPlugin.autoImport._ + import sbtprotobuf.ProtobufPlugin.autoImport._ import scalafix.sbt.ScalafixPlugin.autoImport._ private val primaryJavaOSCond = Def.setting { @@ -55,6 +56,14 @@ object Kinesis4CatsPlugin extends AutoPlugin { scalaVersion := Scala213, tlCiMimaBinaryIssueCheck := tlBaseVersion.value != "0.0", resolvers += "jitpack" at "https://jitpack.io", + protobufUseSystemProtoc := sys.env.get("CI").nonEmpty, + githubWorkflowJobSetup ++= List( + WorkflowStep.Use( + UseRef.Public("arduino", "setup-protoc", "v1"), + name = Some("Setup protoc"), + params = Map("repo-token" -> "${{ secrets.GITHUB_TOKEN }}") + ) + ), githubWorkflowBuild := { val style = (tlCiHeaderCheck.value, tlCiScalafmtCheck.value) match { case (true, true) => // headers + formatting diff --git a/shared-fs2/src/main/scala/kinesis4cats/producer/fs2/FS2Producer.scala b/shared-fs2/src/main/scala/kinesis4cats/producer/fs2/FS2Producer.scala new file mode 100644 index 00000000..f72cb572 --- /dev/null +++ b/shared-fs2/src/main/scala/kinesis4cats/producer/fs2/FS2Producer.scala @@ -0,0 +1,166 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.producer +package fs2 + +import scala.concurrent.duration._ + +import _root_.fs2.Stream +import cats.data.Ior +import cats.data.NonEmptyList +import cats.effect.Async +import cats.effect.Resource +import cats.effect.std.Queue +import cats.effect.syntax.all._ +import cats.syntax.all._ +import org.typelevel.log4cats.StructuredLogger + +import kinesis4cats.logging.LogContext +import kinesis4cats.models.StreamNameOrArn + +/** An interface that runs a [[kinesis4cats.producer.Producer Producer's]] + * putWithRetry method in the background against a stream of records, offered + * by the user. This is intended to be used in the same way that the + * [[https://github.com/awslabs/amazon-kinesis-producer KPL]]. + * + * @param F + * [[cats.effect.Async Async]] + * @tparam PutReq + * The class that represents a batch put request for the underlying client + * @tparam PutRes + * The class that represents a batch put response for the underlying client + */ +abstract class FS2Producer[F[_], PutReq, PutRes](implicit + F: Async[F] +) { + + def logger: StructuredLogger[F] + def config: FS2Producer.Config + + /** The underlying queue of records to process + */ + protected def queue: Queue[F, Option[Record]] + + /** A user defined function that can be run against the results of a request + */ + protected def callback + : (Ior[Producer.Error, NonEmptyList[PutRes]], Async[F]) => F[Unit] + + protected def underlying: Producer[F, PutReq, PutRes] + + /** Put a record into the producer's buffer, to be batched and produced at a + * defined interval + * + * @param record + * [[kinesis4cats.producer.Record Record]] + * @return + * F of Unit + */ + def put(record: Record): F[Unit] = queue.offer(Some(record)) + + /** Stop the processing of records + */ + private[kinesis4cats] def stop(): F[Unit] = { + val ctx = LogContext() + for { + _ <- logger.debug(ctx.context)("Stopping the FS2KinesisProducer") + _ <- queue.offer(None) + } yield () + } + + /** Start the processing of records + */ + private[kinesis4cats] def start(): Resource[F, Unit] = { + val ctx = LogContext() + for { + _ <- logger + .debug(ctx.context)("Starting the FS2KinesisProducer") + .toResource + _ <- Stream + .fromQueueNoneTerminated(queue) + .groupWithin(config.putMaxChunk, config.putMaxWait) + .evalMap { x => + val c = ctx.addEncoded("batchSize", x.size) + x.toNel.fold(F.unit) { records => + for { + _ <- logger.debug(c.context)( + "Received batch to process" + ) + _ <- underlying + .putWithRetry( + records, + config.putMaxRetries, + config.putRetryInterval + ) + .flatMap(callback(_, implicitly)) + .void + _ <- logger.debug(c.context)( + "Finished processing batch" + ) + } yield () + } + } + .compile + .drain + .background + .void + } yield () + + } + +} + +object FS2Producer { + + /** Configuration for the + * [[kinesis4cats.producer.fs2.FS2Producer FS2Producer]] + * + * @param queueSize + * Size of underlying buffer of records + * @param putMaxChunk + * Max records to buffer before running a put request + * @param putMaxWait + * Max time to wait before running a put request + * @param putMaxRetries + * Number of retries for the underlying put request. None means infinite + * retries. + * @param putRetryInterval + * Delay between retries + * @param producerConfig + * [[kinesis4cats.producer.Producer.Config Producer.Config]] + */ + final case class Config( + queueSize: Int, + putMaxChunk: Int, + putMaxWait: FiniteDuration, + putMaxRetries: Option[Int], + putRetryInterval: FiniteDuration, + producerConfig: Producer.Config + ) + + object Config { + def default(streamNameOrArn: StreamNameOrArn): Config = Config( + 1000, + 500, + 100.millis, + Some(5), + 0.seconds, + Producer.Config.default(streamNameOrArn) + ) + } + +} diff --git a/shared-tests/src/main/scala/kinesis4cats/producer/ProducerSpec.scala b/shared-tests/src/main/scala/kinesis4cats/producer/ProducerSpec.scala index 8a87e2df..1038993a 100644 --- a/shared-tests/src/main/scala/kinesis4cats/producer/ProducerSpec.scala +++ b/shared-tests/src/main/scala/kinesis4cats/producer/ProducerSpec.scala @@ -70,7 +70,7 @@ private[kinesis4cats] abstract class ProducerSpec[PutReq, PutRes, A] (x: Int) => IO(x === 50), noop[IO, Int] )(resources.resultsQueue.size) - _ <- IO(assert(size === 50)) + _ <- IO(assertEquals(size, 50)) results <- resources.resultsQueue.tryTakeN(None) resultRecords <- results.traverse { x => IO.fromEither(decode[TestData](new String(aAsBytes(x)))) diff --git a/shared-tests/src/main/scala/kinesis4cats/producer/fs2/FS2ProducerSpec.scala b/shared-tests/src/main/scala/kinesis4cats/producer/fs2/FS2ProducerSpec.scala new file mode 100644 index 00000000..eb48a674 --- /dev/null +++ b/shared-tests/src/main/scala/kinesis4cats/producer/fs2/FS2ProducerSpec.scala @@ -0,0 +1,93 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats +package producer +package fs2 + +import scala.concurrent.duration._ + +import java.util.UUID + +import cats.data.NonEmptyList +import cats.effect._ +import cats.effect.std.Queue +import cats.syntax.all._ +import io.circe.parser._ +import io.circe.syntax._ +import org.scalacheck.Arbitrary +import retry.RetryPolicies._ +import retry._ + +import kinesis4cats.syntax.scalacheck._ + +private[kinesis4cats] abstract class FS2ProducerSpec[PutReq, PutRes, A] + extends munit.CatsEffectSuite { + + def producerResource: Resource[IO, FS2Producer[IO, PutReq, PutRes]] + def streamName: String + def aAsBytes(a: A): Array[Byte] + + def fixture( + shardCount: Int, + appName: String + ): SyncIO[FunFixture[FS2ProducerSpec.Resources[IO, PutReq, PutRes, A]]] + + override def munitTimeout: Duration = 5.minutes + + def appName = streamName + + fixture(3, appName).test("It should produce records end to end") { + resources => + for { + data <- IO(Arbitrary.arbitrary[TestData].take(50).toList) + records = NonEmptyList.fromListUnsafe( + data.map(x => + Record( + x.asJson.noSpaces.getBytes(), + UUID.randomUUID().toString(), + None, + None + ) + ) + ) + _ <- records.traverse(resources.producer.put) + retryPolicy = limitRetries[IO](30).join(constantDelay(1.second)) + size <- retryingOnFailures( + retryPolicy, + (x: Int) => IO(x === 50), + noop[IO, Int] + )(resources.resultsQueue.size) + _ <- IO(assertEquals(size, 50)) + results <- resources.resultsQueue.tryTakeN(None) + resultRecords <- results.traverse { x => + IO.fromEither(decode[TestData](new String(aAsBytes(x)))) + } + } yield assert( + resultRecords.forall(data.contains), + s"res: ${resultRecords}\nexp: ${data}" + ) + } +} + +object FS2ProducerSpec { + + final case class Resources[F[_], PutReq, PutRes, A]( + resultsQueue: Queue[F, A], + producer: FS2Producer[F, PutReq, PutRes] + ) + +} diff --git a/shared/src/main/scala/kinesis4cats/producer/Producer.scala b/shared/src/main/scala/kinesis4cats/producer/Producer.scala index e4504836..9d33a7c5 100644 --- a/shared/src/main/scala/kinesis4cats/producer/Producer.scala +++ b/shared/src/main/scala/kinesis4cats/producer/Producer.scala @@ -196,7 +196,7 @@ abstract class Producer[F[_], PutReq, PutRes](implicit records: NonEmptyList[Record], retries: Option[Int], retryDuration: FiniteDuration - ): F[Ior[Producer.Error, NonEmptyList[PutRes]]] = { + ): F[Producer.Res[PutRes]] = { val ctx = LogContext() .addEncoded("retriesRemaining", retries.fold("Infinite")(_.toString())) .addEncoded("retryDuration", retryDuration) @@ -204,11 +204,15 @@ abstract class Producer[F[_], PutReq, PutRes](implicit case x if x.isRight => F.pure(x) case x if x.left.exists(e => e.errors.exists(_.isLeft)) => F.pure(x) case x if retries.exists(_ <= 0) => - logger - .warn(ctx.context)( - "All retries have been exhausted, and the final retry detected errors" - ) - .as(x) + if (config.raiseOnExhaustedRetries) { + x.leftTraverse(F.raiseError) + } else { + logger + .warn(ctx.context)( + "All retries have been exhausted, and the final retry detected errors" + ) + .as(x) + } case x => logger .debug(ctx.context)("Failures detected, retrying failed records") @@ -236,6 +240,9 @@ abstract class Producer[F[_], PutReq, PutRes](implicit object Producer { + type Res[A] = Ior[Producer.Error, NonEmptyList[A]] + type Errs = Ior[NonEmptyList[InvalidRecord], NonEmptyList[FailedRecord]] + /** [[kinesis4cats.logging.LogEncoder LogEncoder]] instances for the * [[kinesis4cats.producer.Producer]] * @@ -261,12 +268,16 @@ object Producer { * @param raiseOnFailures * If true, an exception will be raised if a * [[kinesis4cats.producer.Producer.Error Producer.Error]] is detected in - * one fo the batches + * one of the batches * @param shardMapCacheConfig * [[kinesis4cats.producer.ShardMapCache.Config ShardMapCache.Config]] * @param streamNameOrArn * [[kinesis4cats.models.StreamNameOrArn StreamNameOrArn]] either a stream * name or a stream ARN for the producer. + * @param raiseOnExhaustedRetries + * If true, an exception will be raised if a + * [[kinesis4cats.producer.Producer.Error Producer.Error]] is detected in + * the final batch of retried put requests */ final case class Config( warnOnShardCacheMisses: Boolean, @@ -275,7 +286,8 @@ object Producer { raiseOnFailures: Boolean, shardMapCacheConfig: ShardMapCache.Config, batcherConfig: Batcher.Config, - streamNameOrArn: StreamNameOrArn + streamNameOrArn: StreamNameOrArn, + raiseOnExhaustedRetries: Boolean ) object Config { @@ -296,7 +308,8 @@ object Producer { false, ShardMapCache.Config.default, Batcher.Config.default, - streamNameOrArn + streamNameOrArn, + false ) } @@ -312,11 +325,7 @@ object Producer { * which represent records that failed to produce to Kinesis within a * given batch */ - final case class Error( - errors: Option[ - Ior[NonEmptyList[InvalidRecord], NonEmptyList[FailedRecord]] - ] - ) extends Exception { + final case class Error(errors: Option[Errs]) extends Exception { private[kinesis4cats] def add(that: Error): Error = Error( errors.combine(that.errors) ) diff --git a/shared/src/main/scala/kinesis4cats/producer/ShardMapCache.scala b/shared/src/main/scala/kinesis4cats/producer/ShardMapCache.scala index 1bf8c4bd..b7f87e2f 100644 --- a/shared/src/main/scala/kinesis4cats/producer/ShardMapCache.scala +++ b/shared/src/main/scala/kinesis4cats/producer/ShardMapCache.scala @@ -101,10 +101,15 @@ private[kinesis4cats] class ShardMapCache[F[_]] private ( /** Start the cache */ - private def start() = refresh() - .flatMap(_ => F.sleep(config.refreshInterval)) - .foreverM - .background + private def start() = for { + _ <- refresh().toResource + _ <- F + .sleep(config.refreshInterval) + .flatMap(_ => refresh()) + .foreverM + .background + .void + } yield () } diff --git a/shared/src/main/scala/kinesis4cats/producer/batching/AggregatedBatch.scala b/shared/src/main/scala/kinesis4cats/producer/batching/AggregatedBatch.scala index cae1db80..d3b7333e 100644 --- a/shared/src/main/scala/kinesis4cats/producer/batching/AggregatedBatch.scala +++ b/shared/src/main/scala/kinesis4cats/producer/batching/AggregatedBatch.scala @@ -30,7 +30,11 @@ import kinesis4cats.producer.Producer import kinesis4cats.protobuf.Messages.AggregatedRecord /** Represents records that can be aggregated into a single record using the - * [[https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation KPL Aggregation Format]] + * [[https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation KPL Aggregation Format]]. + * We can aggregate records that share a single partition key. If the shard-map + * never changed, you could potentially aggregate records for an entire shard, + * however if a user scales the stream, then that shard map would be out of + * date. * * @param shardId * Shard ID for the batch @@ -45,10 +49,8 @@ import kinesis4cats.protobuf.Messages.AggregatedRecord * Map of partition kyes to their known index in the records * @param digest * [[https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/security/MessageDigest.html MessageDigest]] - * @param aggPartitionKey + * @param partitionKey * Partition Key for the aggregated record - * @param aggExplicitHashKey - * Explicit Hash Key for the aggregated record * @param config * [[kinesis4cats.producer.batching.Batcher.Config Batcher.Config]] */ @@ -59,8 +61,7 @@ private[kinesis4cats] final case class AggregatedBatch private ( explicitHashKeys: Map[String, Int], partitionKeys: Map[String, Int], digest: MessageDigest, - aggPartitionKey: String, - aggExplicitHashKey: Option[String], + partitionKey: String, config: Batcher.Config ) { @@ -135,7 +136,7 @@ private[kinesis4cats] final case class AggregatedBatch private ( } def asRecord: Record.WithShard = Record.WithShard( - Record(asBytes, aggPartitionKey, aggExplicitHashKey, None), + Record(asBytes, partitionKey, None, None), shardId ) } @@ -156,7 +157,6 @@ private[kinesis4cats] object AggregatedBatch { Map(record.record.partitionKey -> 0), digest, record.record.partitionKey, - None, config ) } diff --git a/shared/src/main/scala/kinesis4cats/producer/batching/Batcher.scala b/shared/src/main/scala/kinesis4cats/producer/batching/Batcher.scala index 529dad42..edae76a3 100644 --- a/shared/src/main/scala/kinesis4cats/producer/batching/Batcher.scala +++ b/shared/src/main/scala/kinesis4cats/producer/batching/Batcher.scala @@ -112,14 +112,15 @@ private[kinesis4cats] final class Batcher(config: Batcher.Config) { ): Option[NonEmptyList[Batch]] = { val aggregated = records - .groupByNem(_.predictedShard) + .groupByNem(_.record.partitionKey) .toNonEmptyList - .flatTraverse(_aggregateShard(_)) + .flatTraverse(_aggregatePartitionKey(_)) aggregated.flatMap(_batch(_)) } /** Aggregate a list of [[kinesis4cats.producer.Record.WithShard records]] + * that share the same partition key * * @param records * [[cats.data.NonEmptyList NonEmptyList]] of @@ -130,7 +131,7 @@ private[kinesis4cats] final class Batcher(config: Batcher.Config) { * List of batches */ @annotation.tailrec - private def _aggregateShard( + private def _aggregatePartitionKey( records: NonEmptyList[Record.WithShard], res: Option[NonEmptyList[AggregatedBatch]] = None ): Option[NonEmptyList[Record.WithShard]] = { @@ -146,7 +147,7 @@ private[kinesis4cats] final class Batcher(config: Batcher.Config) { NonEmptyList.fromList(records.tail) match { case None => newRes.map(_.map(_.asRecord).reverse) - case Some(recs) => _aggregateShard(recs, newRes) + case Some(recs) => _aggregatePartitionKey(recs, newRes) } } diff --git a/shared/src/test/scala/kinesis4cats/producer/batching/AggregatedBatchSpec.scala b/shared/src/test/scala/kinesis4cats/producer/batching/AggregatedBatchSpec.scala index dcd30f30..a476be68 100644 --- a/shared/src/test/scala/kinesis4cats/producer/batching/AggregatedBatchSpec.scala +++ b/shared/src/test/scala/kinesis4cats/producer/batching/AggregatedBatchSpec.scala @@ -17,11 +17,19 @@ package kinesis4cats.producer package batching +import scala.jdk.CollectionConverters._ + +import java.nio.ByteBuffer +import java.time.Instant + import cats.syntax.all._ import com.amazonaws.kinesis.agg.AggRecord +import software.amazon.kinesis.retrieval.AggregatorUtil +import software.amazon.kinesis.retrieval.KinesisClientRecord import kinesis4cats.instances.eq._ import kinesis4cats.models.ShardId +import kinesis4cats.syntax.bytebuffer._ @SuppressWarnings(Array("scalafix:DisableSyntax.null")) class AggregatedBatchSpec extends munit.CatsEffectSuite { @@ -105,4 +113,39 @@ class AggregatedBatchSpec extends munit.CatsEffectSuite { assert(agRecord.addUserRecord(partitionKey2, null, data2)) assert(!agRecord.addUserRecord(partitionKey3, null, data3)) } + + test("It should be able to deaggregate") { + val data1 = Array.fill[Byte](500)(1) + val partitionKey1 = "foo" + val data2 = Array.fill[Byte](500)(1) + val partitionKey2 = "wazzle" + + val batch = AggregatedBatch + .create( + Record.WithShard.fromOption(Record(data1, partitionKey1), None), + Batcher.Config.default + ) + .add(Record.WithShard.fromOption(Record(data2, partitionKey2), None)) + + val testBytes = batch.asBytes + + val kclRecord = KinesisClientRecord + .builder() + .data(ByteBuffer.wrap(testBytes)) + .partitionKey(batch.partitionKey) + .aggregated(true) + .approximateArrivalTimestamp(Instant.now()) + .build() + + val res = new AggregatorUtil() + .deaggregate(java.util.List.of[KinesisClientRecord](kclRecord)) + .asScala + .toList + + assert(res.head.data().asArray === data1) + assert(res.head.partitionKey() === partitionKey1) + assert(res(1).data().asArray === data2) + assert(res(1).partitionKey() === partitionKey2) + + } } diff --git a/smithy4s-client-fs2/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducer.scala b/smithy4s-client-fs2/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducer.scala new file mode 100644 index 00000000..ee6a4e09 --- /dev/null +++ b/smithy4s-client-fs2/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducer.scala @@ -0,0 +1,141 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.smithy4s.client +package producer +package fs2 + +import cats.effect.Async +import cats.effect.kernel.Resource +import cats.effect.std.Queue +import cats.effect.syntax.all._ +import com.amazonaws.kinesis.PutRecordsInput +import com.amazonaws.kinesis.PutRecordsOutput +import org.http4s.client.Client +import org.typelevel.log4cats.StructuredLogger +import org.typelevel.log4cats.noop.NoOpLogger +import smithy4s.aws.AwsCredentialsProvider +import smithy4s.aws.SimpleHttpClient +import smithy4s.aws.kernel.AwsCredentials +import smithy4s.aws.kernel.AwsRegion + +import kinesis4cats.producer._ +import kinesis4cats.producer.fs2.FS2Producer + +/** A buffered Kinesis producer which will produce batches of data at a + * configurable rate. + * + * @param config + * [[kinesis.producer.fs2.FS2Producer.Config FS2Producer.Config]] + * @param queue + * [[cats.effect.std.Queue Queue]] of + * [[kinesis4cats.producer.Record Records]] to produce. + * @param underlying + * [[kinesis4cats.smithy4s.client.producer.KinesisProducer KinesisProducer]] + * @param callback: + * Function that can be run after each of the put results from the underlying + * @param F + * [[cats.effect.Async Async]] + * @param LE + * [[kinesis4cats.producer.Producer.LogEncoders Producer.LogEncoders]] + */ +final class FS2KinesisProducer[F[_]] private[kinesis4cats] ( + override val logger: StructuredLogger[F], + override val config: FS2Producer.Config, + override protected val queue: Queue[F, Option[Record]], + override protected val underlying: KinesisProducer[F] +)( + override protected val callback: ( + Producer.Res[PutRecordsOutput], + Async[F] + ) => F[Unit] +)(implicit + F: Async[F] +) extends FS2Producer[F, PutRecordsInput, PutRecordsOutput] + +object FS2KinesisProducer { + + /** Basic constructor for the + * [[kinesis4cats.smithy4s.client.producer.fs2.FS2KinesisProducer FS2KinesisProducer]] + * + * @param config + * [[kinesis4cats.producer.fs2.FS2Producer.Config FS2Producer.Config]] + * @param client + * [[org.http4s.client.Client Client]] instance + * @param region + * [[https://github.com/disneystreaming/smithy4s/blob/series/0.17/modules/aws-kernel/src/smithy4s/aws/AwsRegion.scala AwsRegion]] + * @param loggerF + * [[cats.effect.Async Async]] => F of + * [[org.typelevel.log4cats.StructuredLogger StructuredLogger]]. Default + * uses [[org.typelevel.log4cats.noop.NoOpLogger NoOpLogger]] + * @param credsF + * ( + * [[https://github.com/disneystreaming/smithy4s/blob/series/0.17/modules/aws/src/smithy4s/aws/SimpleHttpClient.scala SimpleHttpClient]], + * [[cats.effect.Async Async]]) => F of + * [[https://github.com/disneystreaming/smithy4s/blob/series/0.17/modules/aws-kernel/src/smithy4s/aws/AwsCredentials.scala AwsCredentials]] + * Default uses + * [[https://github.com/disneystreaming/smithy4s/blob/series/0.17/modules/aws/src/smithy4s/aws/AwsCredentialsProvider.scala AwsCredentialsProvider.default]] + * @param callback + * Function that can be run after each of the put results from the + * underlying + * @param F + * [[cats.effect.Async Async]] + * @param LE + * [[kinesis4cats.producer.Producer.LogEncoders Producer.LogEncoders]] + * @param KLE + * [[kinesis4cats.smithy4s.client.KinesisClient.LogEncoders KinesisClient.LogEncoders]] + * @param SLE + * [[kinesis4cats.producer.ShardMapCache.LogEncoders ShardMapCache.LogEncoders]] + * @return + * [[cats.effect.Resource Resource]] of + * [[kinesis4cats.smithy4s.client.producer.KinesisProducer KinesisProducer]] + */ + def apply[F[_]]( + config: FS2Producer.Config, + client: Client[F], + region: F[AwsRegion], + loggerF: Async[F] => F[StructuredLogger[F]] = (f: Async[F]) => + f.pure(NoOpLogger[F](f)), + credsF: ( + SimpleHttpClient[F], + Async[F] + ) => Resource[F, F[AwsCredentials]] = + (x: SimpleHttpClient[F], f: Async[F]) => + AwsCredentialsProvider.default[F](x)(f), + callback: (Producer.Res[PutRecordsOutput], Async[F]) => F[Unit] = + (_: Producer.Res[PutRecordsOutput], f: Async[F]) => f.unit + )(implicit + F: Async[F], + LE: Producer.LogEncoders, + KLE: KinesisClient.LogEncoders[F], + SLE: ShardMapCache.LogEncoders + ): Resource[F, FS2KinesisProducer[F]] = for { + logger <- loggerF(F).toResource + underlying <- KinesisProducer( + config.producerConfig, + client, + region, + loggerF, + credsF + ) + queue <- Queue.bounded[F, Option[Record]](config.queueSize).toResource + producer = new FS2KinesisProducer[F](logger, config, queue, underlying)( + callback + ) + _ <- producer.start() + _ <- Resource.onFinalize(producer.stop()) + } yield producer +} diff --git a/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/localstack/LocalstackFS2KinesisProducer.scala b/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/localstack/LocalstackFS2KinesisProducer.scala new file mode 100644 index 00000000..949da589 --- /dev/null +++ b/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/localstack/LocalstackFS2KinesisProducer.scala @@ -0,0 +1,175 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.smithy4s.client +package producer +package fs2 +package localstack + +import cats.effect._ +import cats.effect.std.Queue +import cats.effect.syntax.all._ +import com.amazonaws.kinesis.PutRecordsOutput +import org.http4s.client.Client +import org.typelevel.log4cats.StructuredLogger +import org.typelevel.log4cats.slf4j.Slf4jLogger +import smithy4s.aws.kernel.AwsRegion + +import kinesis4cats.localstack.LocalstackConfig +import kinesis4cats.logging.LogEncoder +import kinesis4cats.models.StreamNameOrArn +import kinesis4cats.producer.Producer +import kinesis4cats.producer.Record +import kinesis4cats.producer.ShardMapCache +import kinesis4cats.producer.fs2.FS2Producer +import kinesis4cats.smithy4s.client.localstack.LocalstackKinesisClient + +/** Like KinesisProducer, but also includes the + * [[kinesis4cats.smithy4s.client.middleware.LocalstackProxy LocalstackProxy]] + * middleware, and leverages mock AWS credentials + */ +object LocalstackFS2KinesisProducer { + + /** Creates a [[cats.effect.Resource Resource]] of a + * [[kinesis4cats.smithy4s.client.producer.KinesisProducer KinesisProducer]] + * that is compatible with Localstack + * + * @param client + * [[https://http4s.org/v0.23/docs/client.html Client]] + * @param region + * [[https://github.com/disneystreaming/smithy4s/blob/series/0.17/modules/aws-kernel/src/smithy4s/aws/AwsRegion.scala AwsRegion]] + * @param producerConfig + * [[kinesis4cats.producer.Producer.Config Producer.Config]] + * @param config + * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] + * @param loggerF + * [[cats.effect.Async Async]] => [[cats.effect.Async Async]] of + * [[https://github.com/typelevel/log4cats/blob/main/core/shared/src/main/scala/org/typelevel/log4cats/StructuredLogger.scala StructuredLogger]]. + * @param callback + * Function that can be run after each of the put results from the + * underlying + * @param F + * [[cats.effect.Async Async]] + * @return + * [[https://github.com/disneystreaming/smithy4s/blob/series/0.17/modules/aws-kernel/src/smithy4s/aws/AwsEnvironment.scala AwsEnvironment]] + */ + def resource[F[_]]( + client: Client[F], + region: F[AwsRegion], + producerConfig: FS2Producer.Config, + config: LocalstackConfig, + loggerF: Async[F] => F[StructuredLogger[F]], + callback: (Producer.Res[PutRecordsOutput], Async[F]) => F[Unit] + )(implicit + F: Async[F], + LE: KinesisClient.LogEncoders[F], + LELC: LogEncoder[LocalstackConfig], + SLE: ShardMapCache.LogEncoders, + PLE: Producer.LogEncoders + ): Resource[F, FS2KinesisProducer[F]] = for { + logger <- loggerF(F).toResource + _underlying <- LocalstackKinesisClient + .clientResource[F](client, region, config, loggerF) + shardMapCache <- ShardMapCache[F]( + producerConfig.producerConfig.shardMapCacheConfig, + KinesisProducer.getShardMap( + _underlying, + producerConfig.producerConfig.streamNameOrArn + ), + loggerF(F) + ) + queue <- Queue + .bounded[F, Option[Record]](producerConfig.queueSize) + .toResource + underlying = new KinesisProducer[F]( + logger, + shardMapCache, + producerConfig.producerConfig, + _underlying + ) + producer = new FS2KinesisProducer[F]( + logger, + producerConfig, + queue, + underlying + )( + callback + ) + _ <- producer.start() + _ <- Resource.onFinalize(producer.stop()) + } yield producer + + /** Creates a [[cats.effect.Resource Resource]] of a + * [[kinesis4cats.smithy4s.client.producer.KinesisProducer KinesisProducer]] + * that is compatible with Localstack + * + * @param client + * [[https://http4s.org/v0.23/docs/client.html Client]] + * @param streamName + * Name of stream that this producer will produce to + * @param region + * [[https://github.com/disneystreaming/smithy4s/blob/series/0.17/modules/aws-kernel/src/smithy4s/aws/AwsRegion.scala AwsRegion]]. + * @param prefix + * Optional string prefix to apply when loading configuration. Default to + * None + * @param producerConfig + * String => [[kinesis4cats.producer.Producer.Config Producer.Config]] + * function that creates configuration given a stream name. Defaults to + * Producer.Config.default + * @param loggerF + * [[cats.effect.Async Async]] => [[cats.effect.Async Async]] of + * [[https://github.com/typelevel/log4cats/blob/main/core/shared/src/main/scala/org/typelevel/log4cats/StructuredLogger.scala StructuredLogger]]. + * Default is + * [[https://github.com/typelevel/log4cats/blob/main/noop/shared/src/main/scala/org/typelevel/log4cats/noop/NoOpLogger.scala NoOpLogger]] + * @param callback + * Function that can be run after each of the put results from the + * underlying. Default is F.unit + * @param F + * [[cats.effect.Async Async]] + * @return + * [[https://github.com/disneystreaming/smithy4s/blob/series/0.17/modules/aws-kernel/src/smithy4s/aws/AwsEnvironment.scala AwsEnvironment]] + */ + def resource[F[_]]( + client: Client[F], + streamName: String, + region: F[AwsRegion], + prefix: Option[String] = None, + producerConfig: String => FS2Producer.Config = streamName => + FS2Producer.Config + .default(StreamNameOrArn.Name(streamName)), + loggerF: Async[F] => F[StructuredLogger[F]] = (f: Async[F]) => + Slf4jLogger.create[F](f, implicitly), + callback: (Producer.Res[PutRecordsOutput], Async[F]) => F[Unit] = + (_: Producer.Res[PutRecordsOutput], f: Async[F]) => f.unit + )(implicit + F: Async[F], + LE: KinesisClient.LogEncoders[F], + LELC: LogEncoder[LocalstackConfig], + SLE: ShardMapCache.LogEncoders, + PLE: Producer.LogEncoders + ): Resource[F, FS2KinesisProducer[F]] = LocalstackConfig + .resource[F](prefix) + .flatMap( + resource[F]( + client, + region, + producerConfig(streamName), + _, + loggerF, + callback + ) + ) +} diff --git a/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/localstack/LocalstackKinesisProducer.scala b/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/localstack/LocalstackKinesisProducer.scala index b7435129..12980bac 100644 --- a/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/localstack/LocalstackKinesisProducer.scala +++ b/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/localstack/LocalstackKinesisProducer.scala @@ -14,7 +14,9 @@ * limitations under the License. */ -package kinesis4cats.smithy4s.client.producer.localstack +package kinesis4cats.smithy4s.client +package producer +package localstack import cats.effect._ import cats.effect.syntax.all._ @@ -27,10 +29,9 @@ import kinesis4cats.localstack.LocalstackConfig import kinesis4cats.logging.LogEncoder import kinesis4cats.models.StreamNameOrArn import kinesis4cats.producer.Producer +import kinesis4cats.producer.ShardMap import kinesis4cats.producer.ShardMapCache -import kinesis4cats.smithy4s.client.KinesisClient import kinesis4cats.smithy4s.client.localstack.LocalstackKinesisClient -import kinesis4cats.smithy4s.client.producer.KinesisProducer /** Like KinesisProducer, but also includes the * [[kinesis4cats.smithy4s.client.middleware.LocalstackProxy LocalstackProxy]] @@ -63,7 +64,12 @@ object LocalstackKinesisProducer { region: F[AwsRegion], producerConfig: Producer.Config, config: LocalstackConfig, - loggerF: Async[F] => F[StructuredLogger[F]] + loggerF: Async[F] => F[StructuredLogger[F]], + shardMapF: ( + KinesisClient[F], + StreamNameOrArn, + Async[F] + ) => F[Either[ShardMapCache.Error, ShardMap]] )(implicit F: Async[F], LE: KinesisClient.LogEncoders[F], @@ -76,7 +82,7 @@ object LocalstackKinesisProducer { .clientResource[F](client, region, config, loggerF) shardMapCache <- ShardMapCache[F]( producerConfig.shardMapCacheConfig, - KinesisProducer.getShardMap(underlying, producerConfig.streamNameOrArn), + shardMapF(underlying, producerConfig.streamNameOrArn, F), loggerF(F) ) producer = new KinesisProducer[F]( @@ -123,7 +129,16 @@ object LocalstackKinesisProducer { Producer.Config .default(StreamNameOrArn.Name(streamName)), loggerF: Async[F] => F[StructuredLogger[F]] = (f: Async[F]) => - Slf4jLogger.create[F](f, implicitly) + Slf4jLogger.create[F](f, implicitly), + shardMapF: ( + KinesisClient[F], + StreamNameOrArn, + Async[F] + ) => F[Either[ShardMapCache.Error, ShardMap]] = ( + client: KinesisClient[F], + streamNameOrArn: StreamNameOrArn, + f: Async[F] + ) => KinesisProducer.getShardMap(client, streamNameOrArn)(f) )(implicit F: Async[F], LE: KinesisClient.LogEncoders[F], @@ -133,6 +148,13 @@ object LocalstackKinesisProducer { ): Resource[F, KinesisProducer[F]] = LocalstackConfig .resource[F](prefix) .flatMap( - resource[F](client, region, producerConfig(streamName), _, loggerF) + resource[F]( + client, + region, + producerConfig(streamName), + _, + loggerF, + shardMapF + ) ) } diff --git a/smithy4s-client-producer-tests/src/it/scala/kinesis4cats/smithy4s/client/producer/KinesisProducerNoShardMapSpec.scala b/smithy4s-client-producer-tests/src/it/scala/kinesis4cats/smithy4s/client/producer/KinesisProducerNoShardMapSpec.scala new file mode 100644 index 00000000..d980e221 --- /dev/null +++ b/smithy4s-client-producer-tests/src/it/scala/kinesis4cats/smithy4s/client/producer/KinesisProducerNoShardMapSpec.scala @@ -0,0 +1,115 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.smithy4s.client.producer + +import java.util.UUID + +import cats.effect._ +import cats.effect.syntax.all._ +import cats.syntax.all._ +import com.amazonaws.kinesis.PutRecordsInput +import com.amazonaws.kinesis.PutRecordsOutput +import org.http4s.blaze.client.BlazeClientBuilder +import org.typelevel.log4cats.slf4j.Slf4jLogger +import smithy4s.aws.kernel.AwsRegion + +import kinesis4cats.kcl.CommittableRecord +import kinesis4cats.kcl.localstack.LocalstackKCLConsumer +import kinesis4cats.kcl.logging.instances.show._ +import kinesis4cats.logging.instances.show._ +import kinesis4cats.models.StreamNameOrArn +import kinesis4cats.producer.Producer +import kinesis4cats.producer.ProducerSpec +import kinesis4cats.producer.ShardMapCache +import kinesis4cats.producer.logging.instances.show._ +import kinesis4cats.smithy4s.client.KinesisClient +import kinesis4cats.smithy4s.client.localstack.LocalstackKinesisClient +import kinesis4cats.smithy4s.client.logging.instances.show._ +import kinesis4cats.smithy4s.client.producer.localstack.LocalstackKinesisProducer +import kinesis4cats.syntax.bytebuffer._ + +class KinesisProducerNoShardMapSpec + extends ProducerSpec[ + PutRecordsInput, + PutRecordsOutput, + CommittableRecord[IO] + ]() { + override lazy val streamName: String = + s"kinesis-client-producer-no-shard-map-spec-${UUID.randomUUID().toString()}" + + def http4sClientResource = + BlazeClientBuilder[IO].withCheckEndpointAuthentication(false).resource + + lazy val region = IO.pure(AwsRegion.US_EAST_1) + + override def producerResource + : Resource[IO, Producer[IO, PutRecordsInput, PutRecordsOutput]] = + for { + http4sClient <- http4sClientResource + producer <- LocalstackKinesisProducer + .resource[IO]( + http4sClient, + streamName, + region, + loggerF = (_: Async[IO]) => Slf4jLogger.create[IO], + shardMapF = ( + _: KinesisClient[IO], + _: StreamNameOrArn, + _: Async[IO] + ) => + IO.pure( + ShardMapCache + .ListShardsError( + new RuntimeException("Expected Exception") + ) + .asLeft + ) + ) + } yield producer + + override def aAsBytes(a: CommittableRecord[IO]): Array[Byte] = a.data.asArray + + override def fixture( + shardCount: Int, + appName: String + ): SyncIO[FunFixture[ProducerSpec.Resources[ + IO, + PutRecordsInput, + PutRecordsOutput, + CommittableRecord[IO] + ]]] = ResourceFixture( + for { + http4sClient <- http4sClientResource + _ <- LocalstackKinesisClient + .streamResource[IO]( + http4sClient, + region, + streamName, + shardCount, + loggerF = (f: Async[IO]) => Slf4jLogger.create[IO](f, implicitly) + ) + deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults( + streamName, + appName, + resultsQueueSize = 100 + )((_: List[CommittableRecord[IO]]) => IO.unit) + _ <- deferredWithResults.deferred.get.toResource + producer <- producerResource + } yield ProducerSpec.Resources(deferredWithResults.resultsQueue, producer) + ) + +} diff --git a/smithy4s-client-producer-tests/src/it/scala/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducerSpec.scala b/smithy4s-client-producer-tests/src/it/scala/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducerSpec.scala new file mode 100644 index 00000000..bd7d9e24 --- /dev/null +++ b/smithy4s-client-producer-tests/src/it/scala/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducerSpec.scala @@ -0,0 +1,101 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.smithy4s.client.producer.fs2 + +import java.util.UUID + +import cats.effect._ +import cats.effect.syntax.all._ +import com.amazonaws.kinesis.PutRecordsInput +import com.amazonaws.kinesis.PutRecordsOutput +import org.http4s.blaze.client.BlazeClientBuilder +import org.typelevel.log4cats.slf4j.Slf4jLogger +import smithy4s.aws.kernel.AwsRegion + +import kinesis4cats.kcl.CommittableRecord +import kinesis4cats.kcl.localstack.LocalstackKCLConsumer +import kinesis4cats.kcl.logging.instances.show._ +import kinesis4cats.logging.instances.show._ +import kinesis4cats.producer.fs2.FS2Producer +import kinesis4cats.producer.fs2.FS2ProducerSpec +import kinesis4cats.producer.logging.instances.show._ +import kinesis4cats.smithy4s.client.localstack.LocalstackKinesisClient +import kinesis4cats.smithy4s.client.logging.instances.show._ +import kinesis4cats.smithy4s.client.producer.fs2.localstack.LocalstackFS2KinesisProducer +import kinesis4cats.syntax.bytebuffer._ + +class FS2KinesisProducerSpec + extends FS2ProducerSpec[ + PutRecordsInput, + PutRecordsOutput, + CommittableRecord[IO] + ]() { + override lazy val streamName: String = + s"kinesis-client-fs2-producer-spec-${UUID.randomUUID().toString()}" + + def http4sClientResource = + BlazeClientBuilder[IO].withCheckEndpointAuthentication(false).resource + + lazy val region = IO.pure(AwsRegion.US_EAST_1) + + override def producerResource + : Resource[IO, FS2Producer[IO, PutRecordsInput, PutRecordsOutput]] = + for { + http4sClient <- http4sClientResource + producer <- LocalstackFS2KinesisProducer + .resource[IO]( + http4sClient, + streamName, + region, + loggerF = (_: Async[IO]) => Slf4jLogger.create[IO] + ) + } yield producer + + override def aAsBytes(a: CommittableRecord[IO]): Array[Byte] = a.data.asArray + + override def fixture( + shardCount: Int, + appName: String + ): SyncIO[FunFixture[FS2ProducerSpec.Resources[ + IO, + PutRecordsInput, + PutRecordsOutput, + CommittableRecord[IO] + ]]] = ResourceFixture( + for { + http4sClient <- http4sClientResource + _ <- LocalstackKinesisClient + .streamResource[IO]( + http4sClient, + region, + streamName, + shardCount, + loggerF = (f: Async[IO]) => Slf4jLogger.create[IO](f, implicitly) + ) + deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults( + streamName, + appName + )((_: List[CommittableRecord[IO]]) => IO.unit) + _ <- deferredWithResults.deferred.get.toResource + producer <- producerResource + } yield FS2ProducerSpec.Resources( + deferredWithResults.resultsQueue, + producer + ) + ) + +} diff --git a/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/middleware/ResponseLogger.scala b/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/middleware/ResponseLogger.scala index 85b8c028..7be397de 100644 --- a/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/middleware/ResponseLogger.scala +++ b/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/middleware/ResponseLogger.scala @@ -75,8 +75,9 @@ object ResponseLogger { for { _ <- logger.debug(ctx.context)("Successfully completed request") + body <- newResponseBody.as[String] _ <- logger - .trace(ctx.addEncoded("responseBody", newResponseBody).context)( + .trace(ctx.addEncoded("responseBody", body).context)( "Logging response body" ) .handleErrorWith(t =>