diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index cfbe8c06..fb1c7119 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -23,15 +23,10 @@ concurrency: cancel-in-progress: true jobs: compile_and_test: - uses: ./.github/workflows/common.yml + uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils strategy: matrix: - flink: [1.17.1, 1.18.0] - with: - flink_version: ${{ matrix.flink }} - flink_url: https://archive.apache.org/dist/flink/flink-${{ matrix.flink }}/flink-${{ matrix.flink }}-bin-scala_2.12.tgz - cache_flink_binary: true - secrets: inherit + flink_version: [ 1.17.1, 1.18.0, 1.19-SNAPSHOT ] python_test: strategy: diff --git a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java index 29160f7d..77f8787a 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java @@ -48,22 +48,22 @@ public class KinesisFirehoseSinkWriterTest { .build(); @BeforeEach - void setup() { + void setup() throws IOException { TestSinkInitContext sinkInitContext = new TestSinkInitContext(); Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint"); - sinkWriter = - new KinesisFirehoseSinkWriter<>( - ELEMENT_CONVERTER_PLACEHOLDER, - sinkInitContext, - 50, - 16, - 10000, - 4 * 1024 * 1024, - 5000, - 1000 * 1024, - true, - "streamName", - sinkProperties); + KinesisFirehoseSink sink = KinesisFirehoseSink.builder() + .setDeliveryStreamName("streamName") + .setSerializationSchema(new SimpleStringSchema()) + .setMaxBatchSize(50) + .setMaxInFlightRequests(16) + .setMaxBufferedRequests(10000) + .setMaxBatchSizeInBytes(4 * 1024 * 1024) + .setMaxTimeInBufferMS(5000) + .setMaxRecordSizeInBytes(1000 * 1024) + .setFailOnError(true) + .setFirehoseClientProperties(sinkProperties) + .build(); + sinkWriter = (KinesisFirehoseSinkWriter) sink.createWriter(sinkInitContext); } @Test diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java index eccfe0ac..0d5b6eda 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import java.io.IOException; import java.util.Properties; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -55,13 +56,12 @@ public class KinesisStreamsSinkWriterTest { .build(); @Test - void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters() { - Sink.InitContext sinkInitContext = new TestSinkInitContext(); + void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters() throws IOException { + TestSinkInitContext sinkInitContext = new TestSinkInitContext(); Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint"); - sinkWriter = - new KinesisStreamsSinkWriter( + KinesisStreamsSink sink = + new KinesisStreamsSink<>( ELEMENT_CONVERTER_PLACEHOLDER, - sinkInitContext, MAX_BATCH_SIZE, MAX_INFLIGHT_REQUESTS, MAX_BUFFERED_REQUESTS, @@ -72,6 +72,7 @@ void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpect "streamName", "StreamARN", sinkProperties); + sinkWriter = (KinesisStreamsSinkWriter) sink.createWriter(sinkInitContext); assertThat(sinkWriter) .extracting("rateLimitingStrategy") diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java index c90aa196..8f64a679 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java @@ -20,12 +20,16 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.connector.base.sink.AsyncSinkBase; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.connector.dynamodb.sink.client.DynamoDbAsyncClientProvider; +import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider; import org.apache.flink.core.io.SimpleVersionedSerializer; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; + import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -80,6 +84,7 @@ public class DynamoDbSink extends AsyncSinkBase overwriteByPartitionKeys; + private transient SdkClientProvider asyncClientSdkClientProviderOverride; protected DynamoDbSink( ElementConverter elementConverter, @@ -152,7 +157,7 @@ public StatefulSinkWriter> re failOnError, tableName, overwriteByPartitionKeys, - new DynamoDbAsyncClientProvider(dynamoDbClientProperties), + getAsyncClientProvider(dynamoDbClientProperties), recoveredState); } @@ -162,4 +167,19 @@ public StatefulSinkWriter> re getWriterStateSerializer() { return new DynamoDbWriterStateSerializer(); } + + private SdkClientProvider getAsyncClientProvider( + Properties clientProperties) { + if (asyncClientSdkClientProviderOverride != null) { + return asyncClientSdkClientProviderOverride; + } + return new DynamoDbAsyncClientProvider(clientProperties); + } + + @Internal + @VisibleForTesting + void setDynamoDbAsyncClientProvider( + SdkClientProvider asyncClientSdkClientProviderOverride) { + this.asyncClientSdkClientProviderOverride = asyncClientSdkClientProviderOverride; + } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java index d37e184b..3ed51cd4 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java @@ -18,7 +18,6 @@ package org.apache.flink.connector.dynamodb.sink; -import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider; @@ -39,6 +38,7 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest; import software.amazon.awssdk.services.sts.model.StsException; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -354,21 +354,25 @@ private DynamoDbSinkWriter> getDefaultSinkWriter( boolean failOnError, List overwriteByPartitionKeys, SdkClientProvider dynamoDbAsyncClientProvider) { - Sink.InitContext initContext = new TestSinkInitContext(); - return new DynamoDbSinkWriter( - new TestDynamoDbElementConverter(), - initContext, - 2, - 1, - 10, - 1024, - 1000, - 1024, - failOnError, - TABLE_NAME, - overwriteByPartitionKeys, - dynamoDbAsyncClientProvider, - Collections.emptyList()); + TestSinkInitContext initContext = new TestSinkInitContext(); + DynamoDbSink> sink = + DynamoDbSink.>builder() + .setTableName(TABLE_NAME) + .setOverwriteByPartitionKeys(overwriteByPartitionKeys) + .setElementConverter(new TestDynamoDbElementConverter()) + .setMaxBatchSize(2) + .setMaxInFlightRequests(1) + .setMaxBufferedRequests(10) + .setMaxTimeInBufferMS(1000) + .setMaxRecordSizeInBytes(1024) + .setFailOnError(failOnError) + .build(); + sink.setDynamoDbAsyncClientProvider(dynamoDbAsyncClientProvider); + try { + return (DynamoDbSinkWriter>) sink.createWriter(initContext); + } catch (IOException e) { + throw new RuntimeException(e); + } } private List getDefaultInputRequests() {