Skip to content

Commit

Permalink
Catch processor exceptions instead of shutting down (opensearch-proje…
Browse files Browse the repository at this point in the history
…ct#4155)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Feb 20, 2024
1 parent 94e9e65 commit e1f3167
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
Expand Down Expand Up @@ -102,10 +103,9 @@ private void processAcknowledgements(List<Event> inputEvents, Collection<Record<
InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle;
if (internalEventHandle.getAcknowledgementSet() != null && !outputEventsSet.contains(event)) {
eventHandle.release(true);
} else if (acknowledgementsEnabled) {
invalidEventHandlesCounter.increment();
}
} else if (eventHandle != null) {
invalidEventHandlesCounter.increment();
throw new RuntimeException("Unexpected EventHandle");
}
});
Expand All @@ -126,13 +126,25 @@ private void doRun() {
}
//Should Empty list from buffer should be sent to the processors? For now sending as the Stateful processors expects it.
for (final Processor processor : processors) {

List<Event> inputEvents = null;
if (acknowledgementsEnabled) {
inputEvents = ((ArrayList<Record<Event>>)records).stream().map(Record::getData).collect(Collectors.toList());
inputEvents = ((ArrayList<Record<Event>>) records).stream().map(Record::getData).collect(Collectors.toList());
}
records = processor.execute(records);
if (inputEvents != null) {
processAcknowledgements(inputEvents, records);

try {
records = processor.execute(records);
if (inputEvents != null) {
processAcknowledgements(inputEvents, records);
}
} catch (final Exception e) {
LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e);
if (inputEvents != null) {
processAcknowledgements(inputEvents, Collections.emptyList());
}

records = Collections.emptyList();
break;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package org.opensearch.dataprepper.pipeline;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.pipeline.common.FutureHelperResult;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class ProcessWorkerTest {

@Mock
private Pipeline pipeline;

@Mock
private Buffer buffer;

@Mock
private Source source;

private List<Future<Void>> sinkFutures;

private List<Processor> processors;

@BeforeEach
void setup() {
when(pipeline.isStopRequested()).thenReturn(false).thenReturn(true);
when(source.areAcknowledgementsEnabled()).thenReturn(false);
when(pipeline.getSource()).thenReturn(source);
when(buffer.isEmpty()).thenReturn(true);
when(pipeline.getPeerForwarderDrainTimeout()).thenReturn(Duration.ofMillis(100));
when(pipeline.getReadBatchTimeoutInMillis()).thenReturn(500);

final Future<Void> sinkFuture = mock(Future.class);
sinkFutures = List.of(sinkFuture);
when(pipeline.publishToSinks(any())).thenReturn(sinkFutures);
}

private ProcessWorker createObjectUnderTest() {
return new ProcessWorker(buffer, processors, pipeline);
}

@Test
void testProcessWorkerHappyPath() {

final List<Record> records = List.of(mock(Record.class));
final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor = mock(Processor.class);
when(processor.execute(records)).thenReturn(records);
when(processor.isReadyForShutdown()).thenReturn(true);
processors = List.of(processor);

final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());


try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
.thenReturn(futureHelperResult);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();
}
}

@Test
void testProcessWorkerHappyPathWithAcknowledgments() {

when(source.areAcknowledgementsEnabled()).thenReturn(true);

final List<Record<Event>> records = new ArrayList<>();
final Record<Event> mockRecord = mock(Record.class);
final Event mockEvent = mock(Event.class);
final EventHandle eventHandle = mock(DefaultEventHandle.class);
when(((DefaultEventHandle) eventHandle).getAcknowledgementSet()).thenReturn(mock(AcknowledgementSet.class));
when(mockRecord.getData()).thenReturn(mockEvent);
when(mockEvent.getEventHandle()).thenReturn(eventHandle);

records.add(mockRecord);

final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor = mock(Processor.class);
when(processor.execute(records)).thenReturn(records);
when(processor.isReadyForShutdown()).thenReturn(true);
processors = List.of(processor);

final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());


try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
.thenReturn(futureHelperResult);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();
}
}

@Test
void testProcessWorkerWithProcessorThrowingExceptionIsCaughtProperly() {

final List<Record> records = List.of(mock(Record.class));
final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor = mock(Processor.class);
when(processor.execute(records)).thenThrow(RuntimeException.class);
when(processor.isReadyForShutdown()).thenReturn(true);

final Processor skippedProcessor = mock(Processor.class);
when(skippedProcessor.isReadyForShutdown()).thenReturn(true);
processors = List.of(processor, skippedProcessor);

final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());


try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
.thenReturn(futureHelperResult);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();
}

verify(skippedProcessor, never()).execute(any());
}

@Test
void testProcessWorkerWithProcessorThrowingExceptionAndAcknowledgmentsEnabledIsHandledProperly() {

when(source.areAcknowledgementsEnabled()).thenReturn(true);

final List<Record<Event>> records = new ArrayList<>();
final Record<Event> mockRecord = mock(Record.class);
final Event mockEvent = mock(Event.class);
final EventHandle eventHandle = mock(DefaultEventHandle.class);
when(((DefaultEventHandle) eventHandle).getAcknowledgementSet()).thenReturn(mock(AcknowledgementSet.class));
doNothing().when(eventHandle).release(true);
when(mockRecord.getData()).thenReturn(mockEvent);
when(mockEvent.getEventHandle()).thenReturn(eventHandle);

records.add(mockRecord);

final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor = mock(Processor.class);
when(processor.execute(records)).thenThrow(RuntimeException.class);
when(processor.isReadyForShutdown()).thenReturn(true);

final Processor skippedProcessor = mock(Processor.class);
when(skippedProcessor.isReadyForShutdown()).thenReturn(true);
processors = List.of(processor, skippedProcessor);

final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());


try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
.thenReturn(futureHelperResult);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();
}

verify(skippedProcessor, never()).execute(any());
}
}

0 comments on commit e1f3167

Please sign in to comment.