From 76d791fe72c328fc80f3efe26f57a33e8b67dc23 Mon Sep 17 00:00:00 2001 From: Khanh Vu Date: Thu, 19 Oct 2023 12:16:50 +0100 Subject: [PATCH] [FLINK-33181][Connectors/Kinesis] Not validate source & sink options when creating dynamic tables (#105) Co-authored-by: Khanh --- .../KinesisStreamsConnectorOptionsUtils.java | 9 ++- .../KinesisDynamicTableSinkFactoryTest.java | 48 ++++++++++++++++ .../table/KinesisConsumerOptionsUtil.java | 6 +- .../table/KinesisDynamicTableFactoryTest.java | 55 +++++++++++++++++++ 4 files changed, 115 insertions(+), 3 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java index 4b30fe05..3d74361f 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java @@ -62,6 +62,8 @@ public class KinesisStreamsConnectorOptionsUtils { /** Key for accessing kinesisAsyncClient properties. */ public static final String KINESIS_CLIENT_PROPERTIES_KEY = "sink.client.properties"; + public static final String CONSUMER_PREFIX = "scan."; + private final AsyncClientOptionsUtils asyncClientOptionsUtils; private final AsyncSinkConfigurationValidator asyncSinkconfigurationValidator; private final Map resolvedOptions; @@ -75,8 +77,8 @@ public class KinesisStreamsConnectorOptionsUtils { private static final String[] NON_VALIDATED_PREFIXES = new String[] { AWSOptionUtils.AWS_PROPERTIES_PREFIX, - AsyncClientOptionsUtils.SINK_CLIENT_PREFIX, - KinesisProducerOptionsMapper.KINESIS_PRODUCER_PREFIX + KinesisProducerOptionsMapper.PRODUCER_PREFIX, + CONSUMER_PREFIX }; public KinesisStreamsConnectorOptionsUtils( @@ -121,6 +123,9 @@ public List getNonValidatedPrefixes() { public static class KinesisProducerOptionsMapper { private static final Logger LOG = LoggerFactory.getLogger(KinesisProducerOptionsMapper.class); + + public static final String PRODUCER_PREFIX = "sink."; + /** prefix for deprecated producer options fallback keys. */ public static final String KINESIS_PRODUCER_PREFIX = "sink.producer."; diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java index ef939cd3..edc2e6e5 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java @@ -48,6 +48,7 @@ import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS; import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_FAIL_ON_ERROR; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link KinesisDynamicSink} created by {@link KinesisDynamicTableSinkFactory}. */ class KinesisDynamicTableSinkFactoryTest { @@ -173,6 +174,46 @@ void testGoodTableSinkForNonPartitionedTableWithSinkOptions() { Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class); } + @Test + void testGoodTableSinkForNonPartitionedTableWithSinkAndConsumerOptions() { + ResolvedSchema sinkSchema = defaultSinkSchema(); + Map tableOptions = defaultTableOptionsWithSinkAndConsumerOptions().build(); + + // Construct actual DynamicTableSink using FactoryUtil + KinesisDynamicSink actualSink = + (KinesisDynamicSink) createTableSink(sinkSchema, tableOptions); + + // Construct expected DynamicTableSink using factory under test + KinesisDynamicSink expectedSink = + getDefaultSinkBuilder() + .setConsumedDataType(sinkSchema.toPhysicalRowDataType()) + .setStream(STREAM_NAME) + .setKinesisClientProperties(defaultProducerProperties()) + .setEncodingFormat(new TestFormatFactory.EncodingFormatMock(",")) + .setPartitioner(new RandomKinesisPartitionKeyGenerator<>()) + .build(); + + Assertions.assertThat(actualSink).isEqualTo(expectedSink); + + // verify the produced sink + DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider = + actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); + Sink sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink(); + Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class); + } + + @Test + void testBadTableSinkWithUnsupportedOptions() { + ResolvedSchema sinkSchema = defaultSinkSchema(); + Map tableOptions = + defaultTableOptions().withTableOption("invalid.option", "some_value").build(); + + assertThatThrownBy(() -> createTableSink(sinkSchema, tableOptions)) + .hasCauseInstanceOf(ValidationException.class) + .hasStackTraceContaining("Unsupported options:") + .hasStackTraceContaining("invalid.option"); + } + @Test void testGoodTableSinkForNonPartitionedTableWithProducerOptions() { ResolvedSchema sinkSchema = defaultSinkSchema(); @@ -258,6 +299,13 @@ private TableOptionsBuilder defaultTableOptionsWithSinkOptions() { .withTableOption(FLUSH_BUFFER_TIMEOUT.key(), "1000"); } + private TableOptionsBuilder defaultTableOptionsWithSinkAndConsumerOptions() { + return defaultTableOptionsWithSinkOptions() + .withTableOption("scan.stream.initpos", "AT_TIMESTAMP") + .withTableOption("scan.stream.initpos-timestamp-format", "yyyy-MM-dd'T'HH:mm:ss") + .withTableOption("scan.stream.initpos-timestamp", "2022-10-22T12:00:00"); + } + private TableOptionsBuilder defaultTableOptionsWithDeprecatedOptions() { return defaultTableOptions() .withTableOption("sink.producer.record-max-buffered-time", "1000") diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisConsumerOptionsUtil.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisConsumerOptionsUtil.java index c3c2307b..6bdb5687 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisConsumerOptionsUtil.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisConsumerOptionsUtil.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.connector.aws.table.util.AWSOptionUtils; +import org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import java.util.Arrays; @@ -60,7 +61,10 @@ public Map getProcessedResolvedOptions() { @Override public List getNonValidatedPrefixes() { - return Arrays.asList(AWS_PROPERTIES_PREFIX, CONSUMER_PREFIX); + return Arrays.asList( + AWS_PROPERTIES_PREFIX, + CONSUMER_PREFIX, + KinesisStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper.PRODUCER_PREFIX); } @Override diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactoryTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactoryTest.java index bf1a6dc2..0309af94 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactoryTest.java +++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactoryTest.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner; import org.apache.flink.streaming.connectors.kinesis.util.UniformShardAssigner; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.WatermarkSpec; @@ -59,6 +60,7 @@ import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Test for {@link KinesisDynamicSource} and {@link KinesisDynamicSink} created by {@link @@ -245,6 +247,49 @@ public void testUnrecognizedShardAssignerTypeInConfig() { assertThat(kinesisConsumer.getShardAssigner().getClass().equals(defaultShardAssignerClass)); } + @Test + public void testGoodTableSourceWithSinkOptions() { + ResolvedSchema sourceSchema = defaultSourceSchema(); + Map tableOptions = defaultTableOptionsWithSinkOptions().build(); + + // Construct actual DynamicTableSource using FactoryUtil + KinesisDynamicSource actualSource = + (KinesisDynamicSource) createTableSource(sourceSchema, tableOptions); + + // Construct expected DynamicTableSink using factory under test + KinesisDynamicSource expectedSource = + new KinesisDynamicSource( + sourceSchema.toPhysicalRowDataType(), + STREAM_NAME, + DEFAULT_SHARD_ASSIGNER_ID, + defaultConsumerProperties(), + new TestFormatFactory.DecodingFormatMock(",", true)); + + // verify that the constructed DynamicTableSink is as expected + assertThat(actualSource).isEqualTo(expectedSource); + + // verify produced source + ScanTableSource.ScanRuntimeProvider functionProvider = + actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + SourceFunction sourceFunction = + as(functionProvider, SourceFunctionProvider.class).createSourceFunction(); + assertThat(sourceFunction).isInstanceOf(FlinkKinesisConsumer.class); + } + + @Test + public void testBadTableSourceWithUnsupportedOptions() { + ResolvedSchema sourceSchema = defaultSourceSchema(); + Map tableOptions = + defaultTableOptionsWithSinkOptions() + .withTableOption("invalid.option", "some_value") + .build(); + + assertThatThrownBy(() -> createTableSource(sourceSchema, tableOptions)) + .hasCauseInstanceOf(ValidationException.class) + .hasStackTraceContaining("Unsupported options:") + .hasStackTraceContaining("invalid.option"); + } + // -------------------------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------------------------- @@ -292,6 +337,16 @@ private TableOptionsBuilder defaultTableOptions() { .withFormatOption(TestFormatFactory.FAIL_ON_MISSING, "true"); } + private TableOptionsBuilder defaultTableOptionsWithSinkOptions() { + return defaultTableOptions() + .withTableOption("sink.fail-on-error", "true") + .withTableOption("sink.batch.max-size", "100") + .withTableOption("sink.requests.max-inflight", "100") + .withTableOption("sink.requests.max-buffered", "100") + .withTableOption("sink.flush-buffer.size", "1000") + .withTableOption("sink.flush-buffer.timeout", "1000"); + } + private TableOptionsBuilder defaultSinkTableOptions() { String connector = KinesisDynamicTableFactory.IDENTIFIER; String format = TestFormatFactory.IDENTIFIER;