[FLINK-34260][Connectors/AWS] Update flink-connector-aws to be compatible with updated SinkV2 interfaces
z3d1k committed Jan 29, 2024
1 parent 38aafb4 commit 54a0b4f
Showing 5 changed files with 63 additions and 43 deletions.
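
Every Java change in this commit follows the same migration: the tests stop calling the internal writer constructors and instead build the public sink, then obtain the writer through Sink#createWriter(InitContext), which in the updated SinkV2 interfaces declares IOException. What follows is a minimal sketch of that pattern, not code from the commit: it mirrors the Firehose test below, assumes it lives in the writer's own package (the writer classes are internal), and uses TestSinkInitContext, the test helper from flink-connector-base.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;

import java.io.IOException;
import java.util.Properties;

class SinkV2WriterCreationSketch {

    // Before this commit: new KinesisFirehoseSinkWriter<>(converter, context, ...).
    // After: build the public sink, then ask it for its writer.
    static KinesisFirehoseSinkWriter<String> createWriter(Properties clientProperties)
            throws IOException { // createWriter now declares IOException
        KinesisFirehoseSink<String> sink =
                KinesisFirehoseSink.<String>builder()
                        .setDeliveryStreamName("streamName")
                        .setSerializationSchema(new SimpleStringSchema())
                        .setFirehoseClientProperties(clientProperties)
                        .build();
        // The tests recover the concrete writer type with a cast.
        return (KinesisFirehoseSinkWriter<String>) sink.createWriter(new TestSinkInitContext());
    }
}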
9 changes: 2 additions & 7 deletions .github/workflows/push_pr.yml
@@ -23,15 +23,10 @@ concurrency:
   cancel-in-progress: true
 jobs:
   compile_and_test:
-    uses: ./.github/workflows/common.yml
+    uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     strategy:
       matrix:
-        flink: [1.17.1, 1.18.0]
-    with:
-      flink_version: ${{ matrix.flink }}
-      flink_url: https://archive.apache.org/dist/flink/flink-${{ matrix.flink }}/flink-${{ matrix.flink }}-bin-scala_2.12.tgz
-      cache_flink_binary: true
-    secrets: inherit
+        flink: [ 1.17.1, 1.18.0, 1.19-SNAPSHOT ]
 
   python_test:
     strategy:
KinesisFirehoseSinkWriterTest.java
@@ -48,22 +48,22 @@ public class KinesisFirehoseSinkWriterTest {
                     .build();
 
     @BeforeEach
-    void setup() {
+    void setup() throws IOException {
         TestSinkInitContext sinkInitContext = new TestSinkInitContext();
         Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
-        sinkWriter =
-                new KinesisFirehoseSinkWriter<>(
-                        ELEMENT_CONVERTER_PLACEHOLDER,
-                        sinkInitContext,
-                        50,
-                        16,
-                        10000,
-                        4 * 1024 * 1024,
-                        5000,
-                        1000 * 1024,
-                        true,
-                        "streamName",
-                        sinkProperties);
+        KinesisFirehoseSink<String> sink = KinesisFirehoseSink.<String>builder()
+                .setDeliveryStreamName("streamName")
+                .setSerializationSchema(new SimpleStringSchema())
+                .setMaxBatchSize(50)
+                .setMaxInFlightRequests(16)
+                .setMaxBufferedRequests(10000)
+                .setMaxBatchSizeInBytes(4 * 1024 * 1024)
+                .setMaxTimeInBufferMS(5000)
+                .setMaxRecordSizeInBytes(1000 * 1024)
+                .setFailOnError(true)
+                .setFirehoseClientProperties(sinkProperties)
+                .build();
+        sinkWriter = (KinesisFirehoseSinkWriter<String>) sink.createWriter(sinkInitContext);
     }
 
     @Test
KinesisStreamsSinkWriterTest.java
@@ -28,6 +28,7 @@
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
 
+import java.io.IOException;
 import java.util.Properties;
 
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -55,13 +56,12 @@ public class KinesisStreamsSinkWriterTest {
                     .build();
 
     @Test
-    void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters() {
-        Sink.InitContext sinkInitContext = new TestSinkInitContext();
+    void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters() throws IOException {
+        TestSinkInitContext sinkInitContext = new TestSinkInitContext();
         Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
-        sinkWriter =
-                new KinesisStreamsSinkWriter<String>(
+        KinesisStreamsSink<String> sink =
+                new KinesisStreamsSink<>(
                         ELEMENT_CONVERTER_PLACEHOLDER,
-                        sinkInitContext,
                         MAX_BATCH_SIZE,
                         MAX_INFLIGHT_REQUESTS,
                         MAX_BUFFERED_REQUESTS,
@@ -72,6 +72,7 @@ void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpect
                         "streamName",
                         "StreamARN",
                         sinkProperties);
+        sinkWriter = (KinesisStreamsSinkWriter<String>) sink.createWriter(sinkInitContext);
 
         assertThat(sinkWriter)
                 .extracting("rateLimitingStrategy")
DynamoDbSink.java
@@ -20,12 +20,16 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.base.sink.AsyncSinkBase;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.dynamodb.sink.client.DynamoDbAsyncClientProvider;
+import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -80,6 +84,7 @@ public class DynamoDbSink<InputT> extends AsyncSinkBase<InputT, DynamoDbWriteReq
     private final boolean failOnError;
     private final String tableName;
     private final List<String> overwriteByPartitionKeys;
+    private transient SdkClientProvider<DynamoDbAsyncClient> asyncClientSdkClientProviderOverride;
 
     protected DynamoDbSink(
             ElementConverter<InputT, DynamoDbWriteRequest> elementConverter,
@@ -152,7 +157,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<DynamoDbWriteRequest>> re
                 failOnError,
                 tableName,
                 overwriteByPartitionKeys,
-                new DynamoDbAsyncClientProvider(dynamoDbClientProperties),
+                getAsyncClientProvider(dynamoDbClientProperties),
                 recoveredState);
     }
 
@@ -162,4 +167,19 @@ public StatefulSinkWriter<InputT, BufferedRequestState<DynamoDbWriteRequest>> re
             getWriterStateSerializer() {
         return new DynamoDbWriterStateSerializer();
     }
+
+    private SdkClientProvider<DynamoDbAsyncClient> getAsyncClientProvider(
+            Properties clientProperties) {
+        if (asyncClientSdkClientProviderOverride != null) {
+            return asyncClientSdkClientProviderOverride;
+        }
+        return new DynamoDbAsyncClientProvider(clientProperties);
+    }
+
+    @Internal
+    @VisibleForTesting
+    void setDynamoDbAsyncClientProvider(
+            SdkClientProvider<DynamoDbAsyncClient> asyncClientSdkClientProviderOverride) {
+        this.asyncClientSdkClientProviderOverride = asyncClientSdkClientProviderOverride;
+    }
 }
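
The new package-private hook is a test seam: production code still defaults to DynamoDbAsyncClientProvider, while tests can swap in a stubbed client provider before the writer is created. Below is a sketch of the intended use, under stated assumptions: it lives in the org.apache.flink.connector.dynamodb.sink package (the hook is package-private), uses a Mockito stub where the real test below passes its own provider, and borrows TestDynamoDbElementConverter from the test sources.

import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

import java.io.IOException;
import java.util.Map;

import static org.mockito.Mockito.mock;

class DynamoDbClientOverrideSketch {

    @SuppressWarnings("unchecked")
    static DynamoDbSinkWriter<Map<String, AttributeValue>> writerWithStubbedClient()
            throws IOException {
        // Stub provider; a real test configures expectations on it.
        SdkClientProvider<DynamoDbAsyncClient> stubProvider = mock(SdkClientProvider.class);

        DynamoDbSink<Map<String, AttributeValue>> sink =
                DynamoDbSink.<Map<String, AttributeValue>>builder()
                        .setTableName("tableName") // hypothetical table name
                        .setElementConverter(new TestDynamoDbElementConverter())
                        .build();

        // Inject the stub before createWriter: getAsyncClientProvider then
        // returns the override instead of a new DynamoDbAsyncClientProvider.
        sink.setDynamoDbAsyncClientProvider(stubProvider);
        return (DynamoDbSinkWriter<Map<String, AttributeValue>>)
                sink.createWriter(new TestSinkInitContext());
    }
}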
DynamoDbSinkWriterTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.connector.dynamodb.sink;
 
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
 
@@ -39,6 +38,7 @@
 import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
 import software.amazon.awssdk.services.sts.model.StsException;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -354,21 +354,25 @@ private DynamoDbSinkWriter<Map<String, AttributeValue>> getDefaultSinkWriter(
             boolean failOnError,
             List<String> overwriteByPartitionKeys,
             SdkClientProvider<DynamoDbAsyncClient> dynamoDbAsyncClientProvider) {
-        Sink.InitContext initContext = new TestSinkInitContext();
-        return new DynamoDbSinkWriter(
-                new TestDynamoDbElementConverter(),
-                initContext,
-                2,
-                1,
-                10,
-                1024,
-                1000,
-                1024,
-                failOnError,
-                TABLE_NAME,
-                overwriteByPartitionKeys,
-                dynamoDbAsyncClientProvider,
-                Collections.emptyList());
+        TestSinkInitContext initContext = new TestSinkInitContext();
+        DynamoDbSink<Map<String, AttributeValue>> sink =
+                DynamoDbSink.<Map<String, AttributeValue>>builder()
+                        .setTableName(TABLE_NAME)
+                        .setOverwriteByPartitionKeys(overwriteByPartitionKeys)
+                        .setElementConverter(new TestDynamoDbElementConverter())
+                        .setMaxBatchSize(2)
+                        .setMaxInFlightRequests(1)
+                        .setMaxBufferedRequests(10)
+                        .setMaxTimeInBufferMS(1000)
+                        .setMaxRecordSizeInBytes(1024)
+                        .setFailOnError(failOnError)
+                        .build();
+        sink.setDynamoDbAsyncClientProvider(dynamoDbAsyncClientProvider);
+        try {
+            return (DynamoDbSinkWriter<Map<String, AttributeValue>>) sink.createWriter(initContext);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     private List<DynamoDbWriteRequest> getDefaultInputRequests() {
