Skip to content

Commit

Permalink
Catch exceptions when writing to the output codec and drop the event. (
Browse files Browse the repository at this point in the history
…#3210)

Catch exceptions when writing to the output codec and drop the event. Correctly release failed events in the S3 sink.

Signed-off-by: David Venable <[email protected]>
(cherry picked from commit c1cbb22)
  • Loading branch information
dlvenable authored and github-actions[bot] committed Aug 22, 2023
1 parent 1dd6de4 commit 379438e
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,28 +112,32 @@ void output(Collection<Record<Event>> records) {
try {
for (Record<Event> record : records) {

if (currentBuffer.getEventCount() == 0) {
final Event eventForSchemaAutoGenerate = record.getData();
codec.start(currentBuffer.getOutputStream(), eventForSchemaAutoGenerate, codecContext);
}

final Event event = record.getData();
codec.writeEvent(event, currentBuffer.getOutputStream());
int count = currentBuffer.getEventCount() + 1;
currentBuffer.setEventCount(count);

if (event.getEventHandle() != null) {
bufferedEventHandles.add(event.getEventHandle());
try {
if (currentBuffer.getEventCount() == 0) {
codec.start(currentBuffer.getOutputStream(), event, codecContext);
}

codec.writeEvent(event, currentBuffer.getOutputStream());
int count = currentBuffer.getEventCount() + 1;
currentBuffer.setEventCount(count);

if (event.getEventHandle() != null) {
bufferedEventHandles.add(event.getEventHandle());
}
} catch (Exception ex) {
if (event.getEventHandle() != null) {
event.getEventHandle().release(false);
}
LOG.error("Unable to add event to buffer. Dropping this event.");
}

flushToS3IfNeeded();
}
} catch (IOException e) {
LOG.error("Exception while write event into buffer :", e);
flushToS3IfNeeded();
} finally {
reentrantLock.unlock();
}

flushToS3IfNeeded();

reentrantLock.unlock();
}

private void releaseEventHandles(final boolean result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.configuration.PluginModel;
Expand Down Expand Up @@ -54,6 +55,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -471,6 +473,38 @@ void output_will_release_only_new_handles_since_a_flush() throws IOException {
}
}

@Test
void output_will_skip_and_drop_failed_records() throws IOException {
bufferFactory = mock(BufferFactory.class);
final Buffer buffer = mock(Buffer.class);
when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer);

final long objectSize = random.nextInt(1_000_000) + 10_000;
when(buffer.getSize()).thenReturn(objectSize);

final OutputStream outputStream = mock(OutputStream.class);
when(buffer.getOutputStream()).thenReturn(outputStream);


List<Record<Event>> records = generateEventRecords(2);
Event event1 = records.get(0).getData();
Event event2 = records.get(1).getData();

doThrow(RuntimeException.class).when(codec).writeEvent(event1, outputStream);

createObjectUnderTest().output(records);

InOrder inOrder = inOrder(codec);
inOrder.verify(codec).start(eq(outputStream), eq(event1), any());
inOrder.verify(codec).writeEvent(event1, outputStream);
inOrder.verify(codec).writeEvent(event2, outputStream);

verify(event1.getEventHandle()).release(false);
verify(event1.getEventHandle(), never()).release(true);
verify(event2.getEventHandle()).release(true);
verify(event2.getEventHandle(), never()).release(false);
}

@Test
void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws IOException {
bufferFactory = mock(BufferFactory.class);
Expand Down

0 comments on commit 379438e

Please sign in to comment.