Skip to content

Commit

Permalink
FIX: deal with event with object size zero (opensearch-project#3806)
Browse files Browse the repository at this point in the history
* FIX: deal with event with object size zero

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Dec 5, 2023
1 parent 48d0d72 commit ad62f74
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class SqsWorker implements Runnable {
static final String SQS_MESSAGES_DELETED_METRIC_NAME = "sqsMessagesDeleted";
static final String SQS_MESSAGES_FAILED_METRIC_NAME = "sqsMessagesFailed";
static final String SQS_MESSAGES_DELETE_FAILED_METRIC_NAME = "sqsMessagesDeleteFailed";
static final String S3_OBJECTS_EMPTY_METRIC_NAME = "s3ObjectsEmpty";
static final String SQS_MESSAGE_DELAY_METRIC_NAME = "sqsMessageDelay";
static final String SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangedCount";
static final String SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangeFailedCount";
Expand All @@ -67,6 +68,7 @@ public class SqsWorker implements Runnable {
private final Counter sqsMessagesReceivedCounter;
private final Counter sqsMessagesDeletedCounter;
private final Counter sqsMessagesFailedCounter;
private final Counter s3ObjectsEmptyCounter;
private final Counter sqsMessagesDeleteFailedCounter;
private final Counter acknowledgementSetCallbackCounter;
private final Counter sqsVisibilityTimeoutChangedCount;
Expand Down Expand Up @@ -102,6 +104,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager,
sqsMessagesReceivedCounter = pluginMetrics.counter(SQS_MESSAGES_RECEIVED_METRIC_NAME);
sqsMessagesDeletedCounter = pluginMetrics.counter(SQS_MESSAGES_DELETED_METRIC_NAME);
sqsMessagesFailedCounter = pluginMetrics.counter(SQS_MESSAGES_FAILED_METRIC_NAME);
s3ObjectsEmptyCounter = pluginMetrics.counter(S3_OBJECTS_EMPTY_METRIC_NAME);
sqsMessagesDeleteFailedCounter = pluginMetrics.counter(SQS_MESSAGES_DELETE_FAILED_METRIC_NAME);
sqsMessageDelayTimer = pluginMetrics.timer(SQS_MESSAGE_DELAY_METRIC_NAME);
acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME);
Expand Down Expand Up @@ -214,6 +217,12 @@ private List<DeleteMessageBatchRequestEntry> processS3EventNotificationRecords(f
if (s3SourceConfig.getOnErrorOption().equals(OnErrorOption.DELETE_MESSAGES)) {
deleteMessageBatchRequestEntryCollection.add(buildDeleteMessageBatchRequestEntry(parsedMessage.getMessage()));
}
} else if (parsedMessage.getObjectSize() == 0L) {
s3ObjectsEmptyCounter.increment();
LOG.debug("Received empty S3 object: {} in the SQS message. " +
"The S3 object is skipped and the SQS message will be deleted.",
parsedMessage.getObjectKey());
deleteMessageBatchRequestEntryCollection.add(buildDeleteMessageBatchRequestEntry(parsedMessage.getMessage()));
} else {
if (s3SourceConfig.getNotificationSource().equals(NotificationSourceOption.S3)
&& !parsedMessage.isEmptyNotification()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class ParsedMessage {
private boolean failedParsing;
private String bucketName;
private String objectKey;
private long objectSize;
private String eventName;
private DateTime eventTime;
private boolean emptyNotification;
Expand All @@ -33,6 +34,7 @@ public ParsedMessage(final Message message, final boolean failedParsing) {
this.message = message;
this.bucketName = notificationRecords.get(0).getS3().getBucket().getName();
this.objectKey = notificationRecords.get(0).getS3().getObject().getUrlDecodedKey();
this.objectSize = notificationRecords.get(0).getS3().getObject().getSizeAsLong();
this.eventName = notificationRecords.get(0).getEventName();
this.eventTime = notificationRecords.get(0).getEventTime();
this.failedParsing = false;
Expand All @@ -43,6 +45,7 @@ public ParsedMessage(final Message message, final boolean failedParsing) {
this.message = message;
this.bucketName = eventBridgeNotification.getDetail().getBucket().getName();
this.objectKey = eventBridgeNotification.getDetail().getObject().getUrlDecodedKey();
this.objectSize = eventBridgeNotification.getDetail().getObject().getSize();
this.detailType = eventBridgeNotification.getDetailType();
this.eventTime = eventBridgeNotification.getTime();
}
Expand All @@ -63,6 +66,10 @@ public String getObjectKey() {
return objectKey;
}

public long getObjectSize() {
return objectSize;
}

public String getEventName() {
return eventName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import static org.opensearch.dataprepper.plugins.source.s3.SqsWorker.SQS_MESSAGES_FAILED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.source.s3.SqsWorker.SQS_MESSAGES_RECEIVED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.source.s3.SqsWorker.SQS_MESSAGE_DELAY_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.source.s3.SqsWorker.S3_OBJECTS_EMPTY_METRIC_NAME;

class SqsWorkerTest {
private SqsWorker sqsWorker;
Expand All @@ -88,6 +89,7 @@ class SqsWorkerTest {
private Counter sqsMessagesDeletedCounter;
private Counter sqsMessagesFailedCounter;
private Counter sqsMessagesDeleteFailedCounter;
private Counter s3ObjectsEmptyCounter;
private Timer sqsMessageDelayTimer;
private AcknowledgementSetManager acknowledgementSetManager;
private AcknowledgementSet acknowledgementSet;
Expand Down Expand Up @@ -120,11 +122,13 @@ void setUp() {
sqsMessagesDeletedCounter = mock(Counter.class);
sqsMessagesFailedCounter = mock(Counter.class);
sqsMessagesDeleteFailedCounter = mock(Counter.class);
s3ObjectsEmptyCounter = mock(Counter.class);
sqsMessageDelayTimer = mock(Timer.class);
when(pluginMetrics.counter(SQS_MESSAGES_RECEIVED_METRIC_NAME)).thenReturn(sqsMessagesReceivedCounter);
when(pluginMetrics.counter(SQS_MESSAGES_DELETED_METRIC_NAME)).thenReturn(sqsMessagesDeletedCounter);
when(pluginMetrics.counter(SQS_MESSAGES_FAILED_METRIC_NAME)).thenReturn(sqsMessagesFailedCounter);
when(pluginMetrics.counter(SQS_MESSAGES_DELETE_FAILED_METRIC_NAME)).thenReturn(sqsMessagesDeleteFailedCounter);
when(pluginMetrics.counter(S3_OBJECTS_EMPTY_METRIC_NAME)).thenReturn(s3ObjectsEmptyCounter);
when(pluginMetrics.timer(SQS_MESSAGE_DELAY_METRIC_NAME)).thenReturn(sqsMessageDelayTimer);

sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff);
Expand All @@ -135,6 +139,7 @@ void cleanup() {
verifyNoMoreInteractions(sqsMessagesReceivedCounter);
verifyNoMoreInteractions(sqsMessagesDeletedCounter);
verifyNoMoreInteractions(sqsMessagesFailedCounter);
verifyNoMoreInteractions(s3ObjectsEmptyCounter);
verifyNoMoreInteractions(sqsMessageDelayTimer);
}

Expand Down Expand Up @@ -303,8 +308,41 @@ void processSqsMessages_should_not_interact_with_S3Service_and_delete_message_if

verify(sqsMessagesReceivedCounter).increment(1);
verify(sqsMessagesDeletedCounter).increment(1);
verify(s3ObjectsEmptyCounter).increment();
}

@Test
void processSqsMessages_should_not_interact_with_S3Service_and_delete_message_if_EmptyFolderEvent() {
final String messageId = UUID.randomUUID().toString();
final String receiptHandle = UUID.randomUUID().toString();
final Message message = mock(Message.class);
when(message.body()).thenReturn(createEmptyFolderEventNotification(Instant.now()));
when(message.messageId()).thenReturn(messageId);
when(message.receiptHandle()).thenReturn(receiptHandle);

final ReceiveMessageResponse receiveMessageResponse = mock(ReceiveMessageResponse.class);
when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResponse);
when(receiveMessageResponse.messages()).thenReturn(Collections.singletonList(message));

final int messagesProcessed = sqsWorker.processSqsMessages();
assertThat(messagesProcessed, equalTo(1));
verifyNoInteractions(s3Service);

final ArgumentCaptor<DeleteMessageBatchRequest> deleteMessageBatchRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
verify(sqsClient).deleteMessageBatch(deleteMessageBatchRequestArgumentCaptor.capture());
final DeleteMessageBatchRequest actualDeleteMessageBatchRequest = deleteMessageBatchRequestArgumentCaptor.getValue();

assertThat(actualDeleteMessageBatchRequest, notNullValue());
assertThat(actualDeleteMessageBatchRequest.entries().size(), equalTo(1));
assertThat(actualDeleteMessageBatchRequest.queueUrl(), equalTo(s3SourceConfig.getSqsOptions().getSqsUrl()));
assertThat(actualDeleteMessageBatchRequest.entries().get(0).id(), equalTo(messageId));
assertThat(actualDeleteMessageBatchRequest.entries().get(0).receiptHandle(), equalTo(receiptHandle));
assertThat(messagesProcessed, equalTo(1));

verify(sqsMessagesReceivedCounter).increment(1);
verify(sqsMessagesDeletedCounter).increment(1);
verify(s3ObjectsEmptyCounter).increment();
}

@ParameterizedTest
@ValueSource(strings = {"ObjectRemoved:Delete", "ObjectRemoved:DeleteMarkerCreated"})
Expand Down Expand Up @@ -598,6 +636,15 @@ private static String createPutNotification(final Instant startTime) {
return createEventNotification("ObjectCreated:Put", startTime);
}

private static String createEmptyFolderEventNotification(final Instant startTime) {
return "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\"," +
"\"eventTime\":\"" + startTime + "\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:AROAX:xxxxxx\"}," +
"\"requestParameters\":{\"sourceIPAddress\":\"52.95.4.23\"},\"responseElements\":{\"x-amz-request-id\":\"ABCD\"," +
"\"x-amz-id-2\":\"abcd\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"any-event\"," +
"\"bucket\":{\"name\":\"my-osi-bucket\",\"ownerIdentity\":{\"principalId\":\"ID\"},\"arn\":\"arn:aws:s3:::bucketName\"}," +
"\"object\":{\"key\":\"empty-folder/\",\"size\":0,\"eTag\":\"abcd\",\"sequencer\":\"ABCD\"}}}]}";
}

private static String createEventNotification(final String eventName, final Instant startTime) {
return "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\"," +
"\"eventTime\":\"" + startTime + "\",\"eventName\":\"" + eventName + "\",\"userIdentity\":{\"principalId\":\"AWS:AROAX:xxxxxx\"}," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import software.amazon.awssdk.services.sqs.model.Message;

import java.util.List;
import java.util.Random;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -16,6 +17,7 @@
import static org.mockito.Mockito.when;

class ParsedMessageTest {
private static final Random RANDOM = new Random();
private Message message;
private S3EventNotification.S3Entity s3Entity;
private S3EventNotification.S3BucketEntity s3BucketEntity;
Expand Down Expand Up @@ -53,10 +55,12 @@ void test_parsed_message_with_S3EventNotificationRecord() {
final String testDecodedObjectKey = UUID.randomUUID().toString();
final String testEventName = UUID.randomUUID().toString();
final DateTime testEventTime = DateTime.now();
final long testSize = RANDOM.nextLong();

when(s3EventNotificationRecord.getS3()).thenReturn(s3Entity);
when(s3Entity.getBucket()).thenReturn(s3BucketEntity);
when(s3Entity.getObject()).thenReturn(s3ObjectEntity);
when(s3ObjectEntity.getSizeAsLong()).thenReturn(testSize);
when(s3BucketEntity.getName()).thenReturn(testBucketName);
when(s3ObjectEntity.getUrlDecodedKey()).thenReturn(testDecodedObjectKey);
when(s3EventNotificationRecord.getEventName()).thenReturn(testEventName);
Expand All @@ -67,6 +71,7 @@ void test_parsed_message_with_S3EventNotificationRecord() {
assertThat(parsedMessage.getMessage(), equalTo(message));
assertThat(parsedMessage.getBucketName(), equalTo(testBucketName));
assertThat(parsedMessage.getObjectKey(), equalTo(testDecodedObjectKey));
assertThat(parsedMessage.getObjectSize(), equalTo(testSize));
assertThat(parsedMessage.getEventName(), equalTo(testEventName));
assertThat(parsedMessage.getEventTime(), equalTo(testEventTime));
assertThat(parsedMessage.isFailedParsing(), equalTo(false));
Expand All @@ -79,13 +84,15 @@ void test_parsed_message_with_S3EventBridgeNotification() {
final String testDecodedObjectKey = UUID.randomUUID().toString();
final String testDetailType = UUID.randomUUID().toString();
final DateTime testEventTime = DateTime.now();
final int testSize = RANDOM.nextInt();

when(s3EventBridgeNotification.getDetail()).thenReturn(detail);
when(s3EventBridgeNotification.getDetail().getBucket()).thenReturn(bucket);
when(s3EventBridgeNotification.getDetail().getObject()).thenReturn(object);

when(bucket.getName()).thenReturn(testBucketName);
when(object.getUrlDecodedKey()).thenReturn(testDecodedObjectKey);
when(object.getSize()).thenReturn(testSize);
when(s3EventBridgeNotification.getDetailType()).thenReturn(testDetailType);
when(s3EventBridgeNotification.getTime()).thenReturn(testEventTime);

Expand All @@ -94,6 +101,7 @@ void test_parsed_message_with_S3EventBridgeNotification() {
assertThat(parsedMessage.getMessage(), equalTo(message));
assertThat(parsedMessage.getBucketName(), equalTo(testBucketName));
assertThat(parsedMessage.getObjectKey(), equalTo(testDecodedObjectKey));
assertThat(parsedMessage.getObjectSize(), equalTo((long) testSize));
assertThat(parsedMessage.getDetailType(), equalTo(testDetailType));
assertThat(parsedMessage.getEventTime(), equalTo(testEventTime));
assertThat(parsedMessage.isFailedParsing(), equalTo(false));
Expand Down

0 comments on commit ad62f74

Please sign in to comment.