[FLINK-34260][Connectors/AWS] Update flink-connector-aws to be compatible with updated SinkV2 interfaces
z3d1k committed Feb 1, 2024
1 parent 05fdf37 commit f39b39e
Showing 14 changed files with 116 additions and 60 deletions.
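The recurring change in the test files below: rather than constructing a connector's SinkWriter directly from a Sink.InitContext, the tests now build the sink itself and obtain the writer through createWriter(...), which declares IOException under the updated SinkV2 interfaces. The following minimal Java sketch illustrates that pattern only; TestContext, ExampleSink, ExampleSinkWriter and WriterTestSetup are hypothetical stand-ins, not classes from this commit or from Flink.

import java.io.IOException;

// Hypothetical stand-in for a test init context such as TestSinkInitContext.
class TestContext {}

// Hypothetical stand-in for a connector sink writer (e.g. KinesisFirehoseSinkWriter).
class ExampleSinkWriter {
    ExampleSinkWriter(TestContext context) {}
}

// Hypothetical stand-in for the sink itself; under the updated SinkV2 interfaces,
// writer creation is declared to throw IOException.
class ExampleSink {
    ExampleSinkWriter createWriter(TestContext context) throws IOException {
        return new ExampleSinkWriter(context);
    }
}

class WriterTestSetup {
    private ExampleSinkWriter sinkWriter;

    // Old pattern (removed below): sinkWriter = new ExampleSinkWriter(context, ...);
    // New pattern (added below): build the sink, then ask it for its writer.
    void setup() throws IOException {
        TestContext sinkInitContext = new TestContext();
        ExampleSink sink = new ExampleSink();
        sinkWriter = sink.createWriter(sinkInitContext);
    }
}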
@@ -71,6 +71,12 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
@@ -48,22 +48,22 @@ public class KinesisFirehoseSinkWriterTest {
.build();

@BeforeEach
void setup() {
void setup() throws IOException {
TestSinkInitContext sinkInitContext = new TestSinkInitContext();
Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
sinkWriter =
new KinesisFirehoseSinkWriter<>(
KinesisFirehoseSink<String> sink =
new KinesisFirehoseSink<>(
ELEMENT_CONVERTER_PLACEHOLDER,
sinkInitContext,
50,
16,
10000,
4 * 1024 * 1024,
5000,
1000 * 1024,
4 * 1024 * 1024L,
5000L,
1000 * 1024L,
true,
"streamName",
sinkProperties);
sinkWriter = (KinesisFirehoseSinkWriter<String>) sink.createWriter(sinkInitContext);
}

@Test
@@ -18,7 +18,6 @@
package org.apache.flink.connector.kinesis.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
@@ -28,6 +27,7 @@
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;

import java.io.IOException;
import java.util.Properties;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -55,13 +55,13 @@ public class KinesisStreamsSinkWriterTest {
.build();

@Test
void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters() {
Sink.InitContext sinkInitContext = new TestSinkInitContext();
void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters()
throws IOException {
TestSinkInitContext sinkInitContext = new TestSinkInitContext();
Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
sinkWriter =
new KinesisStreamsSinkWriter<String>(
KinesisStreamsSink<String> sink =
new KinesisStreamsSink<>(
ELEMENT_CONVERTER_PLACEHOLDER,
sinkInitContext,
MAX_BATCH_SIZE,
MAX_INFLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
@@ -70,8 +70,9 @@ void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpect
MAX_RECORD_SIZE,
FAIL_ON_ERROR,
"streamName",
"StreamARN",
"arn:aws:kinesis:us-east-1:000000000000:stream/streamName",
sinkProperties);
sinkWriter = (KinesisStreamsSinkWriter<String>) sink.createWriter(sinkInitContext);

assertThat(sinkWriter)
.extracting("rateLimitingStrategy")
6 changes: 6 additions & 0 deletions flink-connector-aws/flink-connector-dynamodb/pom.xml
@@ -87,6 +87,12 @@ under the License.
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
@@ -20,12 +20,16 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.dynamodb.sink.client.DynamoDbAsyncClientProvider;
import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -80,6 +84,7 @@ public class DynamoDbSink<InputT> extends AsyncSinkBase<InputT, DynamoDbWriteReq
private final boolean failOnError;
private final String tableName;
private final List<String> overwriteByPartitionKeys;
private transient SdkClientProvider<DynamoDbAsyncClient> asyncClientSdkClientProviderOverride;

protected DynamoDbSink(
ElementConverter<InputT, DynamoDbWriteRequest> elementConverter,
@@ -152,7 +157,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<DynamoDbWriteRequest>> re
failOnError,
tableName,
overwriteByPartitionKeys,
new DynamoDbAsyncClientProvider(dynamoDbClientProperties),
getAsyncClientProvider(dynamoDbClientProperties),
recoveredState);
}

@@ -162,4 +167,19 @@ public StatefulSinkWriter<InputT, BufferedRequestState<DynamoDbWriteRequest>> re
getWriterStateSerializer() {
return new DynamoDbWriterStateSerializer();
}

private SdkClientProvider<DynamoDbAsyncClient> getAsyncClientProvider(
Properties clientProperties) {
if (asyncClientSdkClientProviderOverride != null) {
return asyncClientSdkClientProviderOverride;
}
return new DynamoDbAsyncClientProvider(clientProperties);
}

@Internal
@VisibleForTesting
void setDynamoDbAsyncClientProvider(
SdkClientProvider<DynamoDbAsyncClient> asyncClientSdkClientProviderOverride) {
this.asyncClientSdkClientProviderOverride = asyncClientSdkClientProviderOverride;
}
}
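Because the sink writer tests in this commit obtain the writer from the sink (see the DynamoDbSinkWriterTest changes that follow), DynamoDbSink needs a seam for injecting a mock client: the diff above adds a transient override field, a @VisibleForTesting setter, and a lookup that falls back to the production provider. The Java sketch below shows that pattern in isolation under stated assumptions; ClientProvider, ProductionClientProvider and ExampleSink are hypothetical stand-ins for SdkClientProvider<DynamoDbAsyncClient>, DynamoDbAsyncClientProvider and the sink class, not the actual Flink code.

import java.util.Properties;

// Hypothetical stand-in for SdkClientProvider<DynamoDbAsyncClient>.
interface ClientProvider {
    Object getClient();
}

// Hypothetical stand-in for the production DynamoDbAsyncClientProvider.
class ProductionClientProvider implements ClientProvider {
    private final Properties clientProperties;

    ProductionClientProvider(Properties clientProperties) {
        this.clientProperties = clientProperties;
    }

    @Override
    public Object getClient() {
        // A real implementation would build the async client from clientProperties.
        return new Object();
    }
}

class ExampleSink {
    // transient: the override is test-only and not part of the serialized sink.
    private transient ClientProvider clientProviderOverride;

    // Mirrors getAsyncClientProvider(...): prefer the injected test provider,
    // otherwise build the production provider from the client properties.
    ClientProvider getClientProvider(Properties clientProperties) {
        if (clientProviderOverride != null) {
            return clientProviderOverride;
        }
        return new ProductionClientProvider(clientProperties);
    }

    // Mirrors the @VisibleForTesting setter: lets tests inject a mock or tracking client provider.
    void setClientProviderOverride(ClientProvider override) {
        this.clientProviderOverride = override;
    }
}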
@@ -18,7 +18,6 @@

package org.apache.flink.connector.dynamodb.sink;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;

@@ -39,13 +38,15 @@
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import software.amazon.awssdk.services.sts.model.StsException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -174,7 +175,7 @@ public void testMultipleKeyDeduplication() throws Exception {
}

@Test
public void testRetryableExceptionWhenFailOnErrorOnWillNotRetry() {
public void testRetryableExceptionWhenFailOnErrorOnWillNotRetry() throws IOException {
Optional<Exception> exceptionToThrow = getGenericRetryableException();
boolean failOnError = true;

@@ -234,39 +235,39 @@ public void testPartiallyFailedRequestRetriesFailedRecords() throws Exception {
}

@Test
public void testNonRetryableExceptionWhenFailOnErrorOnWillNotRetry() {
public void testNonRetryableExceptionWhenFailOnErrorOnWillNotRetry() throws IOException {
Optional<Exception> exceptionToThrow = getGenericNonRetryableException();
boolean failOnError = true;

assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
}

@Test
public void testNonRetryableExceptionWhenFailOnErrorOffWillNotRetry() {
public void testNonRetryableExceptionWhenFailOnErrorOffWillNotRetry() throws IOException {
Optional<Exception> exceptionToThrow = getGenericNonRetryableException();
boolean failOnError = false;

assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
}

@Test
public void testInterruptedExceptionIsNonRetryable() {
public void testInterruptedExceptionIsNonRetryable() throws IOException {
Optional<Exception> exceptionToThrow = Optional.of(new InterruptedException());
boolean failOnError = false;

assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
}

@Test
public void testInvalidCredentialsExceptionIsNonRetryable() {
public void testInvalidCredentialsExceptionIsNonRetryable() throws IOException {
Optional<Exception> exceptionToThrow = Optional.of(StsException.builder().build());
boolean failOnError = false;

assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
}

@Test
public void testResourceNotFoundExceptionIsNonRetryable() {
public void testResourceNotFoundExceptionIsNonRetryable() throws IOException {
Optional<Exception> exceptionToThrow =
Optional.of(ResourceNotFoundException.builder().build());
boolean failOnError = false;
@@ -275,7 +276,7 @@ public void testResourceNotFoundExceptionIsNonRetryable() {
}

@Test
public void testConditionalCheckFailedExceptionIsNonRetryable() {
public void testConditionalCheckFailedExceptionIsNonRetryable() throws IOException {
Optional<Exception> exceptionToThrow =
Optional.of(ConditionalCheckFailedException.builder().build());
boolean failOnError = false;
@@ -284,7 +285,7 @@ public void testConditionalCheckFailedExceptionIsNonRetryable() {
}

@Test
public void testValidationExceptionIsNonRetryable() {
public void testValidationExceptionIsNonRetryable() throws IOException {
Optional<Exception> exceptionToThrow =
Optional.of(
DynamoDbException.builder()
@@ -299,23 +300,23 @@ public void testValidationExceptionIsNonRetryable() {
}

@Test
public void testSdkClientExceptionIsNonRetryable() {
public void testSdkClientExceptionIsNonRetryable() throws IOException {
Optional<Exception> exceptionToThrow = Optional.of(SdkClientException.builder().build());
boolean failOnError = false;

assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
}

@Test
public void testGetSizeInBytesNotImplemented() {
public void testGetSizeInBytesNotImplemented() throws IOException {
DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
getDefaultSinkWriter(
false, Collections.emptyList(), () -> new TrackingDynamoDbAsyncClient());
assertThat(dynamoDbSinkWriter.getSizeInBytes(sinkPutRequest(item("pk", "1")))).isEqualTo(0);
}

@Test
public void testClientClosesWhenWriterIsClosed() {
public void testClientClosesWhenWriterIsClosed() throws IOException {
TestAsyncDynamoDbClientProvider testAsyncDynamoDbClientProvider =
new TestAsyncDynamoDbClientProvider(new TrackingDynamoDbAsyncClient());
DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
@@ -327,7 +328,7 @@ public void testClientClosesWhenWriterIsClosed() {
}

private void assertThatRequestsAreNotRetried(
boolean failOnError, Optional<Exception> exceptionToThrow) {
boolean failOnError, Optional<Exception> exceptionToThrow) throws IOException {
ThrowingDynamoDbAsyncClient<Exception> throwingDynamoDbAsyncClient =
new ThrowingDynamoDbAsyncClient<>(exceptionToThrow, str -> true);
DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
@@ -343,7 +344,8 @@ private void assertThatRequestsAreNotRetried(
private DynamoDbSinkWriter<Map<String, AttributeValue>> getDefaultSinkWriter(
boolean failOnError,
List<String> overwriteByPartitionKeys,
Supplier<DynamoDbAsyncClient> clientSupplier) {
Supplier<DynamoDbAsyncClient> clientSupplier)
throws IOException {
return getDefaultSinkWriter(
failOnError,
overwriteByPartitionKeys,
@@ -353,22 +355,24 @@ private DynamoDbSinkWriter<Map<String, AttributeValue>> getDefaultSinkWriter(
private DynamoDbSinkWriter<Map<String, AttributeValue>> getDefaultSinkWriter(
boolean failOnError,
List<String> overwriteByPartitionKeys,
SdkClientProvider<DynamoDbAsyncClient> dynamoDbAsyncClientProvider) {
Sink.InitContext initContext = new TestSinkInitContext();
return new DynamoDbSinkWriter(
new TestDynamoDbElementConverter(),
initContext,
2,
1,
10,
1024,
1000,
1024,
failOnError,
TABLE_NAME,
overwriteByPartitionKeys,
dynamoDbAsyncClientProvider,
Collections.emptyList());
SdkClientProvider<DynamoDbAsyncClient> dynamoDbAsyncClientProvider)
throws IOException {
TestSinkInitContext initContext = new TestSinkInitContext();
DynamoDbSink<Map<String, AttributeValue>> sink =
new DynamoDbSink<>(
new TestDynamoDbElementConverter(),
2,
1,
10,
1024,
1000,
1024,
failOnError,
TABLE_NAME,
overwriteByPartitionKeys,
new Properties());
sink.setDynamoDbAsyncClientProvider(dynamoDbAsyncClientProvider);
return (DynamoDbSinkWriter<Map<String, AttributeValue>>) sink.createWriter(initContext);
}

private List<DynamoDbWriteRequest> getDefaultInputRequests() {
6 changes: 6 additions & 0 deletions flink-connector-aws/flink-connector-kinesis/pom.xml
@@ -137,6 +137,12 @@ under the License.
</dependency>

<!-- Flink ecosystem -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
@@ -284,7 +284,7 @@ public void testListStateChangedAfterSnapshotState() throws Exception {
new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
FlinkKinesisConsumer<?> mockedConsumer = spy(consumer);

RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 1);
RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 0);

mockedConsumer.setRuntimeContext(context);
mockedConsumer.initializeState(initializationContext);
@@ -25,7 +25,6 @@
import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializationFacade;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import lombok.NonNull;
import org.apache.avro.Schema;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -208,9 +207,7 @@ public MockAWSSchemaRegistryClient() {

@Override
public UUID getSchemaVersionIdByDefinition(
@NonNull String schemaDefinition,
@NonNull String schemaName,
@NonNull String dataFormat) {
String schemaDefinition, String schemaName, String dataFormat) {
EntityNotFoundException entityNotFoundException =
EntityNotFoundException.builder()
.message(AWSSchemaRegistryConstants.SCHEMA_NOT_FOUND_MSG)
@@ -26,7 +26,6 @@
import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.COMPRESSION;
import lombok.NonNull;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
@@ -293,7 +292,7 @@ public MockGlueSchemaRegistryDeserializationFacade(
}

@Override
public String getSchemaDefinition(@NonNull byte[] data) {
public String getSchemaDefinition(byte[] data) {
return schema.getSchemaDefinition();
}
