diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java index 5c8093597f..a2043991bc 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; +import java.math.BigDecimal; import java.time.Instant; import java.util.Map; @@ -56,7 +57,11 @@ public RecordConverter(final BufferAccumulator> bufferAccumulator, */ private String getAttributeValue(final Map data, String attributeName) { if (data.containsKey(attributeName)) { - return String.valueOf(data.get(attributeName)); + final Object value = data.get(attributeName); + if (value instanceof Number) { + return new BigDecimal(value.toString()).toPlainString(); + } + return String.valueOf(value); } return null; } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java index d996c67d8e..c2e9ed1051 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java @@ -10,6 +10,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -26,6 +29,7 @@ import java.util.List; import java.util.Random; import java.util.UUID; +import java.util.stream.Stream; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -44,9 +48,9 @@ import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSED_COUNT; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSING_ERROR_COUNT; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE; @@ -164,4 +168,41 @@ void test_writeSingleRecordToBuffer() throws Exception { verify(bytesReceivedSummary, times(1)).record(line.getBytes().length); verify(bytesProcessedSummary, times(1)).record(line.getBytes().length); } + + @ParameterizedTest + @MethodSource("decimalFormatKeysArgumentProvider") + void writing_record_to_buffer_with_ion_formatted_decimals_creates_expected_partition_and_sort_key( + final String partitionKey, + final String sortKey, + final String expectedPartitionKey, + final String expectedSortKey + ) throws Exception { + + final ArgumentCaptor recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + final String line = " $ion_1_0 {Item:{PK:" + partitionKey + ",SK:" + sortKey + "}}"; + + ExportRecordConverter objectUnderTest = new ExportRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); + doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture()); + + objectUnderTest.writeToBuffer(eq(null), List.of(line)); + verify(bufferAccumulator).add(any(Record.class)); + verify(bufferAccumulator).flush(); + assertThat(recordArgumentCaptor.getValue().getData(), notNullValue()); + JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData(); + + assertThat(event.getMetadata(), notNullValue()); + + assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(expectedPartitionKey)); + assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(expectedSortKey)); + assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(expectedPartitionKey + "|" + expectedSortKey)); + } + + private static Stream decimalFormatKeysArgumentProvider() { + return Stream.of( + Arguments.of("86067d1", "39.29", "860670", "39.29"), + Arguments.of("212d9", "0d0", "212000000000", "0"), + Arguments.of("0.", "4.2d1", "0", "42"), + Arguments.of("0.420d2", "42d0", "42.0", "42") + ); + } } \ No newline at end of file