Skip to content

Commit

Permalink
Add dynamodb_event_name metadata attribute, change mapping for Ddb IN…
Browse files Browse the repository at this point in the history
…SERT and MODIFY to be index bulk action (opensearch-project#3585)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Nov 5, 2023
1 parent fa69169 commit 60f84ad
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ public class MetadataKeyAttributes {

static final String SORT_KEY_METADATA_ATTRIBUTE = "sort_key";

static final String EVENT_TIMESTAMP_METADATA_ATTRIBUTE = "ts";
static final String EVENT_TIMESTAMP_METADATA_ATTRIBUTE = "dynamodb_timestamp";

static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "op";
static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "opensearch_action";

static final String DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE = "dynamodb_event_name";

static final String EVENT_TABLE_NAME_METADATA_ATTRIBUTE = "table_name";
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.Map;

import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
Expand Down Expand Up @@ -80,6 +81,7 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet, Map<String,

eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableInfo.getTableName());
eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTimeMillis);
eventMetadata.setAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE, eventName);
eventMetadata.setAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, mapStreamEventNameToBulkAction(eventName));
String partitionKey = getAttributeValue(keys, tableInfo.getMetadata().getPartitionKeyAttributeName());
eventMetadata.setAttribute(PARTITION_KEY_METADATA_ATTRIBUTE, partitionKey);
Expand Down Expand Up @@ -110,9 +112,8 @@ private String mapStreamEventNameToBulkAction(final String streamEventName) {

switch (streamEventName) {
case "INSERT":
return OpenSearchBulkActions.CREATE.toString();
case "MODIFY":
return OpenSearchBulkActions.UPSERT.toString();
return OpenSearchBulkActions.INDEX.toString();
case "REMOVE":
return OpenSearchBulkActions.DELETE.toString();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -39,6 +40,7 @@
import static org.mockito.Mockito.verifyNoInteractions;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSED_COUNT;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSING_ERROR_COUNT;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
Expand Down Expand Up @@ -141,5 +143,6 @@ void test_writeSingleRecordToBuffer() throws Exception {
assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(pk + "|" + sk));
assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString()));
assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), notNullValue());
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), nullValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
Expand Down Expand Up @@ -134,7 +135,8 @@ void test_writeSingleRecordToBuffer() throws Exception {
assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(partitionKey));
assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(sortKey));
assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(partitionKey + "|" + sortKey));
assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.CREATE.toString()));
assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString()));
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT"));
assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli()));

verifyNoInteractions(changeEventErrorCounter);
Expand Down

0 comments on commit 60f84ad

Please sign in to comment.