[FLINK-33176][Connectors/Kinesis] Handle null value in RowDataKinesisDeserializationSchema
z3d1k committed Oct 4, 2023
1 parent 9743a29 commit 4ead337
Showing 3 changed files with 165 additions and 0 deletions.
RowDataKinesisDeserializationSchema.java
@@ -118,6 +118,11 @@ public RowData deserialize(
throws IOException {

RowData physicalRow = physicalDeserializer.deserialize(recordValue);
// If the message cannot be deserialized by the physicalDeserializer, return null to skip the record
if (physicalRow == null) {
return null;
}

GenericRowData metadataRow = new GenericRowData(requestedMetadataFields.size());

for (int i = 0; i < metadataRow.getArity(); i++) {
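For context, Flink's DeserializationSchema contract uses a null return from deserialize to mean "skip this record"; formats configured to ignore parse errors behave this way, and before this change the wrapper would have tried to append metadata to a null physical row. The following is a minimal, hypothetical sketch (not part of this commit; the class name and parsing logic are illustrative) of a physical deserializer whose null results the wrapper now propagates:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical physical deserializer that returns null for payloads it cannot parse. */
public class LenientRowDeserializationSchema implements DeserializationSchema<RowData> {

    private final TypeInformation<RowData> producedType;

    public LenientRowDeserializationSchema(TypeInformation<RowData> producedType) {
        this.producedType = producedType;
    }

    @Override
    public RowData deserialize(byte[] message) throws IOException {
        if (message == null || message.length == 0) {
            // Unparsable payload: returning null asks the caller to drop the record,
            // which RowDataKinesisDeserializationSchema now passes through unchanged.
            return null;
        }
        GenericRowData row = new GenericRowData(1);
        row.setField(0, StringData.fromString(new String(message, StandardCharsets.UTF_8)));
        return row;
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return producedType;
    }
}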
RowDataKinesisDeserializationSchemaTest.java
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kinesis.table;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.table.RowDataKinesisDeserializationSchema.Metadata;
import org.apache.flink.streaming.connectors.kinesis.testutils.StaticValueDeserializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;

import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for the {@link RowDataKinesisDeserializationSchema} class. */
public class RowDataKinesisDeserializationSchemaTest {
private static final String FIELD_NAME = "text_field";

@Test
public void testAddMetadataToDeserializedRecord() throws Exception {
long timestamp = Instant.now().toEpochMilli();
String shardName = "test shard";
String sequenceNumber = "sequence number";
String deserializedValue = "deserialized value";

GenericRowData rowData = new GenericRowData(RowKind.INSERT, 1);
rowData.setField(0, StringData.fromString(deserializedValue));
DataType dataType = DataTypes.ROW(DataTypes.FIELD(FIELD_NAME, DataTypes.STRING()));

ScanTableSource.ScanContext scanContext = ScanRuntimeProviderContext.INSTANCE;
TypeInformation<RowData> typeInformation = scanContext.createTypeInformation(dataType);

RowDataKinesisDeserializationSchema rowDataKinesisDeserializationSchema =
createSchema(rowData, typeInformation);

RowData row =
rowDataKinesisDeserializationSchema.deserialize(
deserializedValue.getBytes(StandardCharsets.UTF_8),
"partitionKey",
sequenceNumber,
timestamp,
"test_stream",
shardName);

assertThat(row).isNotNull();
assertThat(row.getString(0)).isEqualTo(StringData.fromString(deserializedValue));
assertThat(row.getTimestamp(1, 0)).isEqualTo(TimestampData.fromEpochMillis(timestamp));
assertThat(row.getString(2)).isEqualTo(StringData.fromString(sequenceNumber));
assertThat(row.getString(3)).isEqualTo(StringData.fromString(shardName));
}

@Test
public void testHandleNullDeserializationResult() throws Exception {
ScanTableSource.ScanContext scanContext = ScanRuntimeProviderContext.INSTANCE;
RowDataKinesisDeserializationSchema rowDataKinesisDeserializationSchema =
createSchema(
null, scanContext.createTypeInformation(DataTypes.ROW(DataTypes.STRING())));

RowData row =
rowDataKinesisDeserializationSchema.deserialize(
new byte[0],
"partitionKey",
"sequence number",
Instant.now().toEpochMilli(),
"test_stream",
"test shard");

assertThat(row).isNull();
}

private RowDataKinesisDeserializationSchema createSchema(
RowData deserializedValue, TypeInformation<RowData> typeInformation) {

DeserializationSchema<RowData> internalDeserializationSchema =
new StaticValueDeserializationSchema<>(deserializedValue, typeInformation);

return new RowDataKinesisDeserializationSchema(
internalDeserializationSchema, typeInformation, Arrays.asList(Metadata.values()));
}
}
StaticValueDeserializationSchema.java
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kinesis.testutils;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

/**
* A {@link DeserializationSchema} implementation that returns a predefined value whenever the
* deserialize method is called.
*/
public class StaticValueDeserializationSchema<T> implements DeserializationSchema<T> {
private final T value;
private final TypeInformation<T> typeInformation;

public StaticValueDeserializationSchema(T value, TypeInformation<T> typeInformation) {
this.value = value;
this.typeInformation = typeInformation;
}

@Override
public T deserialize(final byte[] bytes) throws IOException {
return value;
}

@Override
public boolean isEndOfStream(final T s) {
return false;
}

@Override
public TypeInformation<T> getProducedType() {
return typeInformation;
}
}
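As a usage note, this helper can stand in for any wrapped format deserializer when a test needs a fixed deserialization result. A minimal, hypothetical sketch (not part of this commit; it assumes the same JUnit 4 and AssertJ imports used in the test class above, plus org.apache.flink.api.common.typeinfo.Types):

@Test
public void stubAlwaysReturnsConfiguredValue() throws Exception {
    TypeInformation<String> typeInfo = Types.STRING;
    StaticValueDeserializationSchema<String> stub =
            new StaticValueDeserializationSchema<>("fixed value", typeInfo);

    // The payload bytes are ignored; the configured value is returned as-is.
    assertThat(stub.deserialize("ignored payload".getBytes(StandardCharsets.UTF_8)))
            .isEqualTo("fixed value");
    assertThat(stub.getProducedType()).isEqualTo(typeInfo);
}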
