From 4ead3377447f0db6fe377a3a499e89689f5d2380 Mon Sep 17 00:00:00 2001 From: Aleksandr Pilipenko Date: Mon, 2 Oct 2023 19:23:25 +0100 Subject: [PATCH] [FLINK-33176][Connectors/Kinesis] Handle null value in RowDataKinesisDeserializationSchema --- .../RowDataKinesisDeserializationSchema.java | 5 + ...wDataKinesisDeserializationSchemaTest.java | 108 ++++++++++++++++++ .../StaticValueDeserializationSchema.java | 52 +++++++++ 3 files changed, 165 insertions(+) create mode 100644 flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchemaTest.java create mode 100644 flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/StaticValueDeserializationSchema.java diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchema.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchema.java index 3ffd0518..4d9023d5 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchema.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchema.java @@ -118,6 +118,11 @@ public RowData deserialize( throws IOException { RowData physicalRow = physicalDeserializer.deserialize(recordValue); + // If message can not be deserialized by physicalDeserializer - return null to skip record + if (physicalRow == null) { + return null; + } + GenericRowData metadataRow = new GenericRowData(requestedMetadataFields.size()); for (int i = 0; i < metadataRow.getArity(); i++) { diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchemaTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchemaTest.java new file mode 100644 index 00000000..711b5352 --- /dev/null +++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchemaTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.table; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.kinesis.table.RowDataKinesisDeserializationSchema.Metadata; +import org.apache.flink.streaming.connectors.kinesis.testutils.StaticValueDeserializationSchema; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for methods in {@link RowDataKinesisDeserializationSchema} class. 
*/
+public class RowDataKinesisDeserializationSchemaTest {
+    private static final String FIELD_NAME = "text_field";
+
+    @Test
+    public void testAddMetadataToDeserializedRecord() throws Exception {
+        long timestamp = Instant.now().toEpochMilli();
+        String shardName = "test shard";
+        String sequenceNumber = "sequence number";
+        String deserializedValue = "deserialized value";
+
+        GenericRowData rowData = new GenericRowData(RowKind.INSERT, 1);
+        rowData.setField(0, StringData.fromString(deserializedValue));
+        DataType dataType = DataTypes.ROW(DataTypes.FIELD(FIELD_NAME, DataTypes.STRING()));
+
+        ScanTableSource.ScanContext scanContext = ScanRuntimeProviderContext.INSTANCE;
+        TypeInformation<RowData> typeInformation = scanContext.createTypeInformation(dataType);
+
+        RowDataKinesisDeserializationSchema rowDataKinesisDeserializationSchema =
+                createSchema(rowData, typeInformation);
+
+        RowData row =
+                rowDataKinesisDeserializationSchema.deserialize(
+                        deserializedValue.getBytes(StandardCharsets.UTF_8),
+                        "partitionKey",
+                        sequenceNumber,
+                        timestamp,
+                        "test_stream",
+                        shardName);
+
+        assertThat(row).isNotNull();
+        assertThat(row.getString(0)).isEqualTo(StringData.fromString(deserializedValue));
+        assertThat(row.getTimestamp(1, 0)).isEqualTo(TimestampData.fromEpochMillis(timestamp));
+        assertThat(row.getString(2)).isEqualTo(StringData.fromString(sequenceNumber));
+        assertThat(row.getString(3)).isEqualTo(StringData.fromString(shardName));
+    }
+
+    @Test
+    public void testHandleNullDeserializationResult() throws Exception {
+        ScanTableSource.ScanContext scanContext = ScanRuntimeProviderContext.INSTANCE;
+        RowDataKinesisDeserializationSchema rowDataKinesisDeserializationSchema =
+                createSchema(
+                        null, scanContext.createTypeInformation(DataTypes.ROW(DataTypes.STRING())));
+
+        RowData row =
+                rowDataKinesisDeserializationSchema.deserialize(
+                        new byte[0],
+                        "partitionKey",
+                        "sequence number",
+                        Instant.now().toEpochMilli(),
+                        "test_stream",
+                        "test shard");
+
+        assertThat(row).isNull();
+    }
+
+    private RowDataKinesisDeserializationSchema createSchema(
+            RowData deserializedValue, TypeInformation<RowData> typeInformation) {
+
+        DeserializationSchema<RowData> internalDeserializationSchema =
+                new StaticValueDeserializationSchema<>(deserializedValue, typeInformation);
+
+        return new RowDataKinesisDeserializationSchema(
+                internalDeserializationSchema, typeInformation, Arrays.asList(Metadata.values()));
+    }
+}
diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/StaticValueDeserializationSchema.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/StaticValueDeserializationSchema.java
new file mode 100644
index 00000000..4349bed0
--- /dev/null
+++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/StaticValueDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * A {@link DeserializationSchema} test stub that ignores the input bytes and always returns the
+ * value (possibly {@code null}) passed to the constructor; it never signals end of stream.
+ */
+public class StaticValueDeserializationSchema<T> implements DeserializationSchema<T> {
+    private final T value;
+    private final TypeInformation<T> typeInformation;
+
+    public StaticValueDeserializationSchema(T value, TypeInformation<T> typeInformation) {
+        this.value = value;
+        this.typeInformation = typeInformation;
+    }
+
+    @Override
+    public T deserialize(final byte[] bytes) throws IOException {
+        return value;
+    }
+
+    @Override
+    public boolean isEndOfStream(final T s) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return typeInformation;
+    }
+}