Skip to content

Commit

Permalink
[FLINK-33181][Connectors/Kinesis] Do not validate source & sink options …
Browse files Browse the repository at this point in the history
…when creating dynamic tables (apache#105)

Co-authored-by: Khanh <[email protected]>
  • Loading branch information
vtkhanh and Khanh authored Oct 19, 2023
1 parent c3395b7 commit 76d791f
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class KinesisStreamsConnectorOptionsUtils {
/** Key for accessing kinesisAsyncClient properties. */
public static final String KINESIS_CLIENT_PROPERTIES_KEY = "sink.client.properties";

public static final String CONSUMER_PREFIX = "scan.";

private final AsyncClientOptionsUtils asyncClientOptionsUtils;
private final AsyncSinkConfigurationValidator asyncSinkconfigurationValidator;
private final Map<String, String> resolvedOptions;
Expand All @@ -75,8 +77,8 @@ public class KinesisStreamsConnectorOptionsUtils {
private static final String[] NON_VALIDATED_PREFIXES =
new String[] {
AWSOptionUtils.AWS_PROPERTIES_PREFIX,
AsyncClientOptionsUtils.SINK_CLIENT_PREFIX,
KinesisProducerOptionsMapper.KINESIS_PRODUCER_PREFIX
KinesisProducerOptionsMapper.PRODUCER_PREFIX,
CONSUMER_PREFIX
};

public KinesisStreamsConnectorOptionsUtils(
Expand Down Expand Up @@ -121,6 +123,9 @@ public List<String> getNonValidatedPrefixes() {
public static class KinesisProducerOptionsMapper {
private static final Logger LOG =
LoggerFactory.getLogger(KinesisProducerOptionsMapper.class);

public static final String PRODUCER_PREFIX = "sink.";

/** prefix for deprecated producer options fallback keys. */
public static final String KINESIS_PRODUCER_PREFIX = "sink.producer.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS;
import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_FAIL_ON_ERROR;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link KinesisDynamicSink} created by {@link KinesisDynamicTableSinkFactory}. */
class KinesisDynamicTableSinkFactoryTest {
Expand Down Expand Up @@ -173,6 +174,46 @@ void testGoodTableSinkForNonPartitionedTableWithSinkOptions() {
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class);
}

@Test
void testGoodTableSinkForNonPartitionedTableWithSinkAndConsumerOptions() {
    // Table definition mixing sink.* options with consumer (scan.*) options: the sink
    // factory must tolerate the scan.* keys instead of failing validation (FLINK-33181).
    ResolvedSchema schema = defaultSinkSchema();
    Map<String, String> options = defaultTableOptionsWithSinkAndConsumerOptions().build();

    // Sink actually produced through FactoryUtil.
    KinesisDynamicSink constructedSink = (KinesisDynamicSink) createTableSink(schema, options);

    // Reference sink assembled directly through the builder under test.
    KinesisDynamicSink referenceSink =
            getDefaultSinkBuilder()
                    .setConsumedDataType(schema.toPhysicalRowDataType())
                    .setStream(STREAM_NAME)
                    .setKinesisClientProperties(defaultProducerProperties())
                    .setEncodingFormat(new TestFormatFactory.EncodingFormatMock(","))
                    .setPartitioner(new RandomKinesisPartitionKeyGenerator<>())
                    .build();

    Assertions.assertThat(constructedSink).isEqualTo(referenceSink);

    // The runtime provider must still hand out a KinesisStreamsSink instance.
    DynamicTableSink.SinkRuntimeProvider provider =
            constructedSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
    Sink<RowData> producedSink = ((SinkV2Provider) provider).createSink();
    Assertions.assertThat(producedSink).isInstanceOf(KinesisStreamsSink.class);
}

@Test
void testBadTableSinkWithUnsupportedOptions() {
    // An option matching none of the non-validated prefixes must still be rejected.
    ResolvedSchema schema = defaultSinkSchema();
    TableOptionsBuilder builder = defaultTableOptions().withTableOption("invalid.option", "some_value");
    Map<String, String> options = builder.build();

    assertThatThrownBy(() -> createTableSink(schema, options))
            .hasCauseInstanceOf(ValidationException.class)
            .hasStackTraceContaining("Unsupported options:")
            .hasStackTraceContaining("invalid.option");
}

@Test
void testGoodTableSinkForNonPartitionedTableWithProducerOptions() {
ResolvedSchema sinkSchema = defaultSinkSchema();
Expand Down Expand Up @@ -258,6 +299,13 @@ private TableOptionsBuilder defaultTableOptionsWithSinkOptions() {
.withTableOption(FLUSH_BUFFER_TIMEOUT.key(), "1000");
}

/** Default sink table options augmented with consumer (scan.*) options the sink must ignore. */
private TableOptionsBuilder defaultTableOptionsWithSinkAndConsumerOptions() {
    TableOptionsBuilder builder = defaultTableOptionsWithSinkOptions();
    builder = builder.withTableOption("scan.stream.initpos", "AT_TIMESTAMP");
    builder = builder.withTableOption("scan.stream.initpos-timestamp-format", "yyyy-MM-dd'T'HH:mm:ss");
    return builder.withTableOption("scan.stream.initpos-timestamp", "2022-10-22T12:00:00");
}

private TableOptionsBuilder defaultTableOptionsWithDeprecatedOptions() {
return defaultTableOptions()
.withTableOption("sink.producer.record-max-buffered-time", "1000")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.aws.table.util.AWSOptionUtils;
import org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;

import java.util.Arrays;
Expand Down Expand Up @@ -60,7 +61,10 @@ public Map<String, String> getProcessedResolvedOptions() {

@Override
public List<String> getNonValidatedPrefixes() {
    // Skip generic validation for AWS client, consumer (scan.*) and sink (sink.*) options;
    // each prefix is checked by its own dedicated option handler.
    return Arrays.asList(
            AWS_PROPERTIES_PREFIX,
            CONSUMER_PREFIX,
            KinesisStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper.PRODUCER_PREFIX);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.util.UniformShardAssigner;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.WatermarkSpec;
Expand Down Expand Up @@ -59,6 +60,7 @@
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* Test for {@link KinesisDynamicSource} and {@link KinesisDynamicSink} created by {@link
Expand Down Expand Up @@ -245,6 +247,49 @@ public void testUnrecognizedShardAssignerTypeInConfig() {
assertThat(kinesisConsumer.getShardAssigner().getClass().equals(defaultShardAssignerClass));
}

@Test
public void testGoodTableSourceWithSinkOptions() {
    ResolvedSchema sourceSchema = defaultSourceSchema();
    Map<String, String> tableOptions = defaultTableOptionsWithSinkOptions().build();

    // Construct actual DynamicTableSource using FactoryUtil; the sink.* options in the
    // table definition must be tolerated by the source factory instead of failing validation
    KinesisDynamicSource actualSource =
            (KinesisDynamicSource) createTableSource(sourceSchema, tableOptions);

    // Construct expected DynamicTableSource directly (not "Sink" — this is the source test)
    KinesisDynamicSource expectedSource =
            new KinesisDynamicSource(
                    sourceSchema.toPhysicalRowDataType(),
                    STREAM_NAME,
                    DEFAULT_SHARD_ASSIGNER_ID,
                    defaultConsumerProperties(),
                    new TestFormatFactory.DecodingFormatMock(",", true));

    // verify that the constructed DynamicTableSource is as expected
    assertThat(actualSource).isEqualTo(expectedSource);

    // verify produced source
    ScanTableSource.ScanRuntimeProvider functionProvider =
            actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
    SourceFunction<RowData> sourceFunction =
            as(functionProvider, SourceFunctionProvider.class).createSourceFunction();
    assertThat(sourceFunction).isInstanceOf(FlinkKinesisConsumer.class);
}

@Test
public void testBadTableSourceWithUnsupportedOptions() {
    // Unknown options outside every non-validated prefix must still fail for sources.
    ResolvedSchema schema = defaultSourceSchema();
    TableOptionsBuilder builder =
            defaultTableOptionsWithSinkOptions().withTableOption("invalid.option", "some_value");
    Map<String, String> options = builder.build();

    assertThatThrownBy(() -> createTableSource(schema, options))
            .hasCauseInstanceOf(ValidationException.class)
            .hasStackTraceContaining("Unsupported options:")
            .hasStackTraceContaining("invalid.option");
}

// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -292,6 +337,16 @@ private TableOptionsBuilder defaultTableOptions() {
.withFormatOption(TestFormatFactory.FAIL_ON_MISSING, "true");
}

/** Default source table options augmented with sink.* options the source factory must ignore. */
private TableOptionsBuilder defaultTableOptionsWithSinkOptions() {
    TableOptionsBuilder builder = defaultTableOptions();
    builder = builder.withTableOption("sink.fail-on-error", "true");
    builder = builder.withTableOption("sink.batch.max-size", "100");
    builder = builder.withTableOption("sink.requests.max-inflight", "100");
    builder = builder.withTableOption("sink.requests.max-buffered", "100");
    builder = builder.withTableOption("sink.flush-buffer.size", "1000");
    return builder.withTableOption("sink.flush-buffer.timeout", "1000");
}

private TableOptionsBuilder defaultSinkTableOptions() {
String connector = KinesisDynamicTableFactory.IDENTIFIER;
String format = TestFormatFactory.IDENTIFIER;
Expand Down

0 comments on commit 76d791f

Please sign in to comment.