From d4e324593a8f269f8f66081237e94ce6fcd828fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Koz=C5=82owski?= Date: Fri, 6 Oct 2023 20:52:40 +0200 Subject: [PATCH] Update to smithy4s 0.18 (#227) Co-authored-by: Eric Meisel --- .github/workflows/ci.yml | 29 ++++++++++--------- .jvmopts | 2 +- .mergify.yml | 18 ++++++------ build.sbt | 10 +++---- docker/docker-compose.yml | 2 +- docs/smithy4s/getting-started.md | 14 ++++----- .../smithy4s/client/KinesisClientSpec.scala | 11 +++---- project/Kinesis4CatsPlugin.scala | 17 +++++++++-- project/plugins.sbt | 2 +- .../localstack/LocalstackConfig.scala | 2 +- .../localstack/LocalstackKinesisClient.scala | 12 ++++---- .../LocalstackFS2KinesisProducer.scala | 12 ++++---- .../LocalstackKinesisProducer.scala | 12 ++++---- .../smithy4s/client/KinesisClient.scala | 22 +++++++------- .../client/producer/KinesisProducer.scala | 17 ++++++----- .../producer/fs2/FS2KinesisProducer.scala | 13 +++++---- 16 files changed, 103 insertions(+), 92 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 722cd9ea..97ad721b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,7 +28,7 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest] + os: [macos-latest] scala: [2.13] java: [temurin@17] project: [root-jvm-212, root-jvm-213, root-jvm-3, root-js-212, root-js-213, root-js-3, root-native-212, root-native-213, root-native-3] @@ -53,19 +53,22 @@ jobs: if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false' run: sbt +update - - name: Install brew formulae (ubuntu) - if: startsWith(matrix.os, 'ubuntu') - run: /home/linuxbrew/.linuxbrew/bin/brew install openssl s2n + - name: Install brew formulae (macOS) + if: startsWith(matrix.os, 'macos') + run: brew install openssl s2n + + - name: Setup Docker + run: brew install docker docker-compose && colima start && sudo ln -sf $HOME/.colima/default/docker.sock /var/run/docker.sock - name: Check that workflows are up to date run: sbt githubWorkflowCheck - name: Check headers and formatting - if: matrix.java == 'temurin@17' && matrix.os == 'ubuntu-latest' + if: matrix.java == 'temurin@17' && matrix.os == 'macos-latest' run: sbt 'project ${{ matrix.project }}' headerCheckAll fmtCheck - name: Docker Compose Up - if: matrix.java == 'temurin@17' && matrix.os == 'ubuntu-latest' + if: matrix.java == 'temurin@17' && matrix.os == 'macos-latest' env: GITHUB_API_TOKEN: ${{ secrets.GITHUB_TOKEN }} uses: nick-fields/retry@v2 @@ -77,11 +80,11 @@ jobs: max_attempts: 3 - name: Link JS - if: matrix.java == 'temurin@17' && matrix.os == 'ubuntu-latest' && startsWith(matrix.project, 'root-js') + if: matrix.java == 'temurin@17' && matrix.os == 'macos-latest' && startsWith(matrix.project, 'root-js') run: sbt 'project ${{ matrix.project }}' Test/fastLinkJS - name: Link Native - if: matrix.java == 'temurin@17' && matrix.os == 'ubuntu-latest' && startsWith(matrix.project, 'root-native') + if: matrix.java == 'temurin@17' && matrix.os == 'macos-latest' && startsWith(matrix.project, 'root-native') env: GITHUB_API_TOKEN: ${{ secrets.GITHUB_TOKEN }} uses: nick-fields/retry@v2 @@ -92,7 +95,7 @@ jobs: retry_on: error - name: Test - if: matrix.java == 'temurin@17' && matrix.os == 'ubuntu-latest' + if: matrix.java == 'temurin@17' && matrix.os == 'macos-latest' run: sbt 'project ${{ matrix.project }}' test - name: Print docker logs and container listing @@ -100,15 +103,15 @@ jobs: run: sbt 'project ${{ matrix.project }}' dockerComposePs dockerComposeLogs - name: Remove docker containers - if: matrix.java == 'temurin@17' && matrix.os == 'ubuntu-latest' + if: matrix.java == 'temurin@17' && matrix.os == 'macos-latest' run: sbt 'project ${{ matrix.project }}' dockerComposeDown - name: Check scalafix lints - if: matrix.java == 'temurin@17' && matrix.os == 'ubuntu-latest' && matrix.scala != '3.3.1' + if: matrix.java == 'temurin@17' && matrix.os == 'macos-latest' && matrix.scala != '3.3.1' run: sbt 'project ${{ matrix.project }}' fixCheck - name: Generate API documentation - if: matrix.java == 'temurin@17' && matrix.os == 'ubuntu-latest' + if: matrix.java == 'temurin@17' && matrix.os == 'macos-latest' run: sbt 'project ${{ matrix.project }}' doc - name: Make target directories @@ -132,7 +135,7 @@ jobs: if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') strategy: matrix: - os: [ubuntu-latest] + os: [macos-latest] java: [temurin@17] runs-on: ${{ matrix.os }} steps: diff --git a/.jvmopts b/.jvmopts index caccb30d..be1e4a78 100644 --- a/.jvmopts +++ b/.jvmopts @@ -1,6 +1,6 @@ -Dfile.encoding=UTF8 -Xms1G --Xmx6G +-Xmx8G -Xss4M -XX:ReservedCodeCacheSize=250M -XX:+TieredCompilation diff --git a/.mergify.yml b/.mergify.yml index 45a5576d..147a13d2 100644 --- a/.mergify.yml +++ b/.mergify.yml @@ -10,15 +10,15 @@ pull_request_rules: conditions: - author=scala-steward - body~=labels:.*early-semver-patch - - status-success=Build and Test (ubuntu-latest, 2.13, temurin@17, root-jvm-212) - - status-success=Build and Test (ubuntu-latest, 2.13, temurin@17, root-jvm-213) - - status-success=Build and Test (ubuntu-latest, 2.13, temurin@17, root-jvm-3) - - status-success=Build and Test (ubuntu-latest, 2.13, temurin@17, root-js-212) - - status-success=Build and Test (ubuntu-latest, 2.13, temurin@17, root-js-213) - - status-success=Build and Test (ubuntu-latest, 2.13, temurin@17, root-js-3) - - status-success=Build and Test (ubuntu-latest, 2.13, temurin@17, root-native-212) - - status-success=Build and Test (ubuntu-latest, 2.13, temurin@17, root-native-213) - - status-success=Build and Test (ubuntu-latest, 2.13, temurin@17, root-native-3) + - status-success=Build and Test (macos-latest, 2.13, temurin@17, root-jvm-212) + - status-success=Build and Test (macos-latest, 2.13, temurin@17, root-jvm-213) + - status-success=Build and Test (macos-latest, 2.13, temurin@17, root-jvm-3) + - status-success=Build and Test (macos-latest, 2.13, temurin@17, root-js-212) + - status-success=Build and Test (macos-latest, 2.13, temurin@17, root-js-213) + - status-success=Build and Test (macos-latest, 2.13, temurin@17, root-js-3) + - status-success=Build and Test (macos-latest, 2.13, temurin@17, root-native-212) + - status-success=Build and Test (macos-latest, 2.13, temurin@17, root-native-213) + - status-success=Build and Test (macos-latest, 2.13, temurin@17, root-native-3) actions: merge: method: squash diff --git a/build.sbt b/build.sbt index 8ecab1f2..d23536c5 100644 --- a/build.sbt +++ b/build.sbt @@ -508,12 +508,10 @@ lazy val docs = projectMatrix ) ), laikaConfig := LaikaConfig.defaults.withConfigValue( - LinkConfig(sourceLinks = - Seq( - SourceLinks( - baseUri = "https://github.com/etspaceman/kinesis4cats/blob/main/", - suffix = "scala" - ) + LinkConfig.empty.addSourceLinks( + SourceLinks( + baseUri = "https://github.com/etspaceman/kinesis4cats/blob/main/", + suffix = "scala" ) ) ) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 5e41e48e..87835e08 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -1,7 +1,7 @@ version: '3.8' services: localstack: - image: "localstack/localstack:2.2.0" + image: "localstack/localstack:2.3.0" environment: - "SERVICES=cloudwatch,kinesis,dynamodb,sts" - "KINESIS_LATENCY=0" diff --git a/docs/smithy4s/getting-started.md b/docs/smithy4s/getting-started.md index 03314dcb..d856393a 100644 --- a/docs/smithy4s/getting-started.md +++ b/docs/smithy4s/getting-started.md @@ -9,7 +9,7 @@ This module intends to be a native Scala implementation of a Kinesis Client usin Some known issues: - [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) will not work properly as it uses the http2 protocl. -- Updates to the smithy file(s) in this module are not intended to be backwards compatible. +- Updates to the smithy file(s) in this module are not intended to be backwards compatible. ## Installation @@ -25,7 +25,7 @@ import com.amazonaws.kinesis._ import org.http4s.blaze.client.BlazeClientBuilder import org.typelevel.log4cats.slf4j.Slf4jLogger import smithy4s.aws._ -import smithy4s.ByteArray +import smithy4s.Blob import kinesis4cats.smithy4s.client.KinesisClient @@ -40,7 +40,7 @@ object MyApp extends IOApp { for { _ <- client.createStream(StreamName("my-stream"), Some(1)) _ <- client.putRecord( - Data(ByteArray("my-data".getBytes())), + Data(Blob("my-data".getBytes())), PartitionKey("some-partitionk-key"), Some(StreamName("my-stream")) ) @@ -114,7 +114,7 @@ The Producer offering here allows users to handle these failure paths in multipl #### Retrying failures -A user can supply a @:source(compat.src.main.scala.kinesis4cats.compat.retry.RetryPolicy) that can be used to retry both error paths until a fully successful response is received. +A user can supply a @:source(compat.src.main.scala.kinesis4cats.compat.retry.RetryPolicy) that can be used to retry both error paths until a fully successful response is received. In the event of a partially-failed response, the retry routine will only retry the failed records. @@ -200,7 +200,7 @@ BlazeClientBuilder[IO].resource.flatMap(client => ## FS2 -This package provides a [KPL-like](https://github.com/awslabs/amazon-kinesis-producer) producer via implementing @:source(shared.src.main.scala.kinesis4cats.producer.fs2.FS2Producer). This interface receives records from a user, enqueues them into a Queue and puts them as batches to Kinesis on a configured interval. This leverages all of the functionality of the @:source(shared.src.main.scala.kinesis4cats.producer.Producer) interface, including batching, aggregation and retries. +This package provides a [KPL-like](https://github.com/awslabs/amazon-kinesis-producer) producer via implementing @:source(shared.src.main.scala.kinesis4cats.producer.fs2.FS2Producer). This interface receives records from a user, enqueues them into a Queue and puts them as batches to Kinesis on a configured interval. This leverages all of the functionality of the @:source(shared.src.main.scala.kinesis4cats.producer.Producer) interface, including batching, aggregation and retries. ### Usage @@ -215,7 +215,7 @@ import kinesis4cats.models.StreamNameOrArn import kinesis4cats.producer.Record object MyApp extends IOApp { - override def run(args: List[String]) = + override def run(args: List[String]) = BlazeClientBuilder[IO].resource.flatMap(client => FS2KinesisProducer.Builder .default[IO]( @@ -258,7 +258,7 @@ import kinesis4cats.models.StreamNameOrArn import kinesis4cats.producer.Record object MyApp extends IOApp { - override def run(args: List[String]) = + override def run(args: List[String]) = BlazeClientBuilder[IO].resource.flatMap(client => FS2KinesisProducer.Builder .default[IO]( diff --git a/integration-tests/src/test/scala/kinesis4cats/smithy4s/client/KinesisClientSpec.scala b/integration-tests/src/test/scala/kinesis4cats/smithy4s/client/KinesisClientSpec.scala index 8af83ee6..f7c19c0b 100644 --- a/integration-tests/src/test/scala/kinesis4cats/smithy4s/client/KinesisClientSpec.scala +++ b/integration-tests/src/test/scala/kinesis4cats/smithy4s/client/KinesisClientSpec.scala @@ -19,11 +19,12 @@ package smithy4s.client import scala.concurrent.duration._ -import _root_.smithy4s.ByteArray +import _root_.smithy4s.Blob import _root_.smithy4s.aws.AwsRegion import cats.effect._ import cats.syntax.all._ import com.amazonaws.kinesis._ +import fs2.io.compression._ import fs2.io.net.tls.TLSContext import io.circe.jawn._ import io.circe.syntax._ @@ -123,7 +124,7 @@ abstract class KinesisClientSpec extends munit.CatsEffectSuite { record1 <- IO(Arbitrary.arbitrary[TestData].one) _ <- client .putRecord( - Data(ByteArray(record1.asJson.noSpaces.getBytes())), + Data(Blob(record1.asJson.noSpaces.getBytes())), PartitionKey("foo"), Some(StreamName(streamName)) ) @@ -133,11 +134,11 @@ abstract class KinesisClientSpec extends munit.CatsEffectSuite { .putRecords( List( PutRecordsRequestEntry( - Data(ByteArray(record2.asJson.noSpaces.getBytes())), + Data(Blob(record2.asJson.noSpaces.getBytes())), PartitionKey("foo") ), PutRecordsRequestEntry( - Data(ByteArray(record3.asJson.noSpaces.getBytes())), + Data(Blob(record3.asJson.noSpaces.getBytes())), PartitionKey("foo") ) ), @@ -158,7 +159,7 @@ abstract class KinesisClientSpec extends munit.CatsEffectSuite { streamARN = Some(StreamARN(streamArn)) ) recordBytes = records.records - .map(x => new String(x.data.value.array)) + .map(x => x.data.value.toUTF8String) recordsParsed <- recordBytes.traverse(bytes => IO.fromEither(decode[TestData](bytes)) ) diff --git a/project/Kinesis4CatsPlugin.scala b/project/Kinesis4CatsPlugin.scala index 14e4c484..02baab61 100644 --- a/project/Kinesis4CatsPlugin.scala +++ b/project/Kinesis4CatsPlugin.scala @@ -75,9 +75,18 @@ object Kinesis4CatsPlugin extends AutoPlugin { tlSonatypeUseLegacyHost := true, resolvers += "s01 snapshots" at "https://s01.oss.sonatype.org/content/repositories/snapshots/", resolvers += "jitpack" at "https://jitpack.io", - Global / concurrentRestrictions += Tags.limit(NativeTags.Link, 1), - githubWorkflowBuildPreamble ++= nativeBrewInstallWorkflowSteps.value, + githubWorkflowBuildPreamble ++= nativeBrewInstallWorkflowSteps.value ++ Seq( + WorkflowStep.Run( + List( + "brew install docker docker-compose && " + + "colima start && " + + "sudo ln -sf $HOME/.colima/default/docker.sock /var/run/docker.sock" + ), + name = Some("Setup Docker") + ) + ), githubWorkflowBuildMatrixFailFast := Some(false), + githubWorkflowOSes := Seq("macos-latest"), githubWorkflowBuild := { val style = (tlCiHeaderCheck.value, tlCiScalafmtCheck.value) match { case (true, true) => // headers + formatting @@ -310,6 +319,10 @@ object Kinesis4CatsPlugin extends AutoPlugin { ";clean;coverage;test;coverageReport;coverageOff" ) ).flatten + + override def globalSettings: Seq[Setting[_]] = Seq( + concurrentRestrictions += Tags.limit(NativeTags.Link, 1) + ) } object Kinesis4CatsPluginKeys { diff --git a/project/plugins.sbt b/project/plugins.sbt index 36b22c5d..714ba0d2 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -6,7 +6,7 @@ addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.11.1") addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.3") addSbtPlugin("org.portable-scala" % "sbt-crossproject" % "1.3.2") addSbtPlugin( - "com.disneystreaming.smithy4s" % "smithy4s-sbt-codegen" % "0.17.19" + "com.disneystreaming.smithy4s" % "smithy4s-sbt-codegen" % "0.18.0" ) addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0") addSbtPlugin("com.eed3si9n" % "sbt-projectmatrix" % "0.9.1") diff --git a/shared-localstack/src/main/scala/kinesis4cats/localstack/LocalstackConfig.scala b/shared-localstack/src/main/scala/kinesis4cats/localstack/LocalstackConfig.scala index 6017c944..a7d1d0d3 100644 --- a/shared-localstack/src/main/scala/kinesis4cats/localstack/LocalstackConfig.scala +++ b/shared-localstack/src/main/scala/kinesis4cats/localstack/LocalstackConfig.scala @@ -165,7 +165,7 @@ object LocalstackConfig { def defaultHost(prefix: Option[String] = None): ConfigValue[Effect, String] = CirisReader.readDefaulted( List("localstack", "host"), - "localhost", + "127.0.0.1", prefix ) diff --git a/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/localstack/LocalstackKinesisClient.scala b/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/localstack/LocalstackKinesisClient.scala index c9af9105..14ab96df 100644 --- a/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/localstack/LocalstackKinesisClient.scala +++ b/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/localstack/LocalstackKinesisClient.scala @@ -21,6 +21,8 @@ import cats.effect.Async import cats.effect.Resource import cats.syntax.all._ import com.amazonaws.kinesis._ +import fs2.compression.Compression +import fs2.io.file.Files import org.http4s.client.Client import org.typelevel.log4cats.StructuredLogger import org.typelevel.log4cats.noop.NoOpLogger @@ -51,7 +53,7 @@ object LocalstackKinesisClient { ) } - final case class Builder[F[_]] private ( + final case class Builder[F[_]: Compression: Files] private ( client: Client[F], region: AwsRegion, localstackConfig: LocalstackConfig, @@ -101,22 +103,18 @@ object LocalstackKinesisClient { } object Builder { - def default[F[_]]( + def default[F[_]: Async: Compression: Files]( client: Client[F], region: AwsRegion, prefix: Option[String] = None - )(implicit - F: Async[F] ): F[Builder[F]] = LocalstackConfig .load(prefix) .map(default(client, region, _)) - def default[F[_]]( + def default[F[_]: Async: Compression: Files]( client: Client[F], region: AwsRegion, config: LocalstackConfig - )(implicit - F: Async[F] ): Builder[F] = Builder[F]( client, diff --git a/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/localstack/LocalstackFS2KinesisProducer.scala b/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/localstack/LocalstackFS2KinesisProducer.scala index 3fd0e860..9aa919bc 100644 --- a/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/localstack/LocalstackFS2KinesisProducer.scala +++ b/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/localstack/LocalstackFS2KinesisProducer.scala @@ -19,7 +19,9 @@ package producer package fs2 package localstack +import _root_.fs2.compression.Compression import _root_.fs2.concurrent.Channel +import _root_.fs2.io.file.Files import cats.effect._ import cats.effect.kernel.DeferredSink import cats.effect.syntax.all._ @@ -45,7 +47,7 @@ import kinesis4cats.smithy4s.client.producer.localstack.LocalstackKinesisProduce * middleware, and leverages mock AWS credentials */ object LocalstackFS2KinesisProducer { - final case class Builder[F[_]] private ( + final case class Builder[F[_]: Compression: Files] private ( client: Client[F], region: AwsRegion, localstackConfig: LocalstackConfig, @@ -120,24 +122,20 @@ object LocalstackFS2KinesisProducer { } object Builder { - def default[F[_]]( + def default[F[_]: Async: Compression: Files]( client: Client[F], region: AwsRegion, streamNameOrArn: StreamNameOrArn, prefix: Option[String] = None - )(implicit - F: Async[F] ): F[Builder[F]] = LocalstackConfig .load(prefix) .map(default(client, region, streamNameOrArn, _)) - def default[F[_]]( + def default[F[_]: Async: Compression: Files]( client: Client[F], region: AwsRegion, streamNameOrArn: StreamNameOrArn, config: LocalstackConfig - )(implicit - F: Async[F] ): Builder[F] = Builder[F]( client, diff --git a/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/localstack/LocalstackKinesisProducer.scala b/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/localstack/LocalstackKinesisProducer.scala index 62cb87e5..33bdfbeb 100644 --- a/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/localstack/LocalstackKinesisProducer.scala +++ b/smithy4s-client-localstack/src/main/scala/kinesis4cats/smithy4s/client/producer/localstack/LocalstackKinesisProducer.scala @@ -18,6 +18,8 @@ package kinesis4cats.smithy4s.client package producer package localstack +import _root_.fs2.compression.Compression +import _root_.fs2.io.file.Files import cats.effect._ import cats.syntax.all._ import org.http4s.client.Client @@ -51,7 +53,7 @@ object LocalstackKinesisProducer { ) } - final case class Builder[F[_]] private ( + final case class Builder[F[_]: Compression: Files] private ( client: Client[F], region: AwsRegion, localstackConfig: LocalstackConfig, @@ -125,24 +127,20 @@ object LocalstackKinesisProducer { } object Builder { - def default[F[_]]( + def default[F[_]: Async: Compression: Files]( client: Client[F], region: AwsRegion, streamNameOrArn: StreamNameOrArn, prefix: Option[String] = None - )(implicit - F: Async[F] ): F[Builder[F]] = LocalstackConfig .load(prefix) .map(default(client, region, streamNameOrArn, _)) - def default[F[_]]( + def default[F[_]: Async: Compression: Files]( client: Client[F], region: AwsRegion, streamNameOrArn: StreamNameOrArn, config: LocalstackConfig - )(implicit - F: Async[F] ): Builder[F] = Builder[F]( client, diff --git a/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/KinesisClient.scala b/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/KinesisClient.scala index 564b614b..a2ccbc2b 100644 --- a/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/KinesisClient.scala +++ b/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/KinesisClient.scala @@ -18,11 +18,12 @@ package kinesis4cats package smithy4s.client import _root_.smithy4s.aws._ -import _root_.smithy4s.aws.http4s._ import cats.Show import cats.effect.{Async, Resource} import cats.syntax.all._ import com.amazonaws.kinesis._ +import fs2.compression.Compression +import fs2.io.file.Files import org.http4s.client.Client import org.http4s.{Request, Response} import org.typelevel.log4cats.StructuredLogger @@ -47,11 +48,11 @@ import kinesis4cats.smithy4s.client.middleware.RequestResponseLogger */ object KinesisClient { - final case class Builder[F[_]] private ( + final case class Builder[F[_]: Compression: Files] private ( client: Client[F], region: AwsRegion, logger: StructuredLogger[F], - credentialsResourceF: SimpleHttpClient[F] => Resource[F, F[ + credentialsResourceF: Client[F] => Resource[F, F[ AwsCredentials ]], encoders: LogEncoders[F], @@ -63,7 +64,7 @@ object KinesisClient { def withLogger(logger: StructuredLogger[F]): Builder[F] = copy(logger = logger) def withCredentials( - credentialsResourceF: SimpleHttpClient[F] => Resource[F, F[ + credentialsResourceF: Client[F] => Resource[F, F[ AwsCredentials ]] ): Builder[F] = @@ -80,24 +81,23 @@ object KinesisClient { if (logRequestsResponses) RequestResponseLogger(logger, encoders)(client) else client - val backend = - AwsHttp4sBackend[F](RequestResponseLogger(logger, encoders)(clnt)) for { - credentials <- credentialsResourceF(backend) + credentials <- credentialsResourceF(client) environment = AwsEnvironment.make[F]( - backend, + clnt, F.pure(region), credentials, F.realTime.map(_.toSeconds).map(Timestamp(_, 0)) ) - awsClient <- AwsClient.simple(Kinesis.service, environment) + awsClient <- AwsClient(Kinesis.service, environment) } yield awsClient } } object Builder { - def default[F[_]](client: Client[F], region: AwsRegion)(implicit - F: Async[F] + def default[F[_]: Async: Compression: Files]( + client: Client[F], + region: AwsRegion ): Builder[F] = Builder[F]( client, diff --git a/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/producer/KinesisProducer.scala b/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/producer/KinesisProducer.scala index 6bd5e21f..1b7e2f87 100644 --- a/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/producer/KinesisProducer.scala +++ b/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/producer/KinesisProducer.scala @@ -19,6 +19,8 @@ package producer import java.time.Instant +import _root_.fs2.compression.Compression +import _root_.fs2.io.file.Files import cats.data.NonEmptyList import cats.effect.Resource import cats.effect._ @@ -27,9 +29,8 @@ import com.amazonaws.kinesis._ import org.http4s.client.Client import org.typelevel.log4cats.StructuredLogger import org.typelevel.log4cats.noop.NoOpLogger -import smithy4s.ByteArray +import smithy4s.Blob import smithy4s.aws.AwsCredentialsProvider -import smithy4s.aws.SimpleHttpClient import smithy4s.aws.kernel.AwsCredentials import smithy4s.aws.kernel.AwsRegion @@ -69,7 +70,7 @@ final class KinesisProducer[F[_]] private[kinesis4cats] ( def toEntry(record: Rec): PutRecordsRequestEntry = PutRecordsRequestEntry( - Data(ByteArray(record.data)), + Data(Blob(record.data)), PartitionKey(record.partitionKey), record.explicitHashKey.map(HashKey(_)) ) @@ -103,12 +104,12 @@ final class KinesisProducer[F[_]] private[kinesis4cats] ( object KinesisProducer { - final case class Builder[F[_]] private ( + final case class Builder[F[_]: Compression: Files] private ( config: Producer.Config[F], client: Client[F], region: AwsRegion, logger: StructuredLogger[F], - credentialsResourceF: SimpleHttpClient[F] => Resource[F, F[ + credentialsResourceF: Client[F] => Resource[F, F[ AwsCredentials ]], encoders: LogEncoders[F], @@ -124,7 +125,7 @@ object KinesisProducer { def withLogger(logger: StructuredLogger[F]): Builder[F] = copy(logger = logger) def withCredentials( - credentialsResourceF: SimpleHttpClient[F] => Resource[F, F[ + credentialsResourceF: Client[F] => Resource[F, F[ AwsCredentials ]] ): Builder[F] = @@ -158,11 +159,11 @@ object KinesisProducer { } object Builder { - def default[F[_]]( + def default[F[_]: Async: Compression: Files]( streamNameOrArn: models.StreamNameOrArn, client: Client[F], region: AwsRegion - )(implicit F: Async[F]): Builder[F] = Builder[F]( + ): Builder[F] = Builder[F]( Producer.Config.default(streamNameOrArn), client, region, diff --git a/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducer.scala b/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducer.scala index f6029a29..4f8bc1cb 100644 --- a/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducer.scala +++ b/smithy4s-client/src/main/scala/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducer.scala @@ -18,7 +18,9 @@ package kinesis4cats.smithy4s.client package producer package fs2 +import _root_.fs2.compression.Compression import _root_.fs2.concurrent.Channel +import _root_.fs2.io.file.Files import cats.effect._ import cats.effect.kernel.DeferredSink import cats.effect.syntax.all._ @@ -28,7 +30,6 @@ import org.http4s.client.Client import org.typelevel.log4cats.StructuredLogger import org.typelevel.log4cats.noop.NoOpLogger import smithy4s.aws.AwsCredentialsProvider -import smithy4s.aws.SimpleHttpClient import smithy4s.aws.kernel.AwsCredentials import smithy4s.aws.kernel.AwsRegion @@ -67,12 +68,12 @@ final class FS2KinesisProducer[F[_]] private[kinesis4cats] ( object FS2KinesisProducer { - final case class Builder[F[_]] private ( + final case class Builder[F[_]: Compression: Files] private ( config: FS2Producer.Config[F], client: Client[F], region: AwsRegion, logger: StructuredLogger[F], - credentialsResourceF: SimpleHttpClient[F] => Resource[F, F[ + credentialsResourceF: Client[F] => Resource[F, F[ AwsCredentials ]], encoders: KinesisProducer.LogEncoders[F], @@ -90,7 +91,7 @@ object FS2KinesisProducer { def withLogger(logger: StructuredLogger[F]): Builder[F] = copy(logger = logger) def withCredentials( - credentialsResourceF: SimpleHttpClient[F] => Resource[F, F[ + credentialsResourceF: Client[F] => Resource[F, F[ AwsCredentials ]] ): Builder[F] = @@ -127,11 +128,11 @@ object FS2KinesisProducer { } object Builder { - def default[F[_]]( + def default[F[_]: Async: Compression: Files]( streamNameOrArn: models.StreamNameOrArn, client: Client[F], region: AwsRegion - )(implicit F: Async[F]): Builder[F] = Builder[F]( + ): Builder[F] = Builder[F]( FS2Producer.Config.default(streamNameOrArn), client, region,