diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java index 2e46e89cdb..3f00b86bf0 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java @@ -4,6 +4,7 @@ */ package org.opensearch.dataprepper.model.peerforwarder; +import org.opensearch.dataprepper.model.event.Event; import java.util.Collection; @@ -18,4 +19,15 @@ public interface RequiresPeerForwarding { * @return A set of keys */ Collection getIdentificationKeys(); + + /** + * Determines if an event should be forwarded to the remote peer + * + * @param event input event + * + * @return true if the event should be forwarded to the peer + */ + default boolean isApplicableEventForPeerForwarding(Event event) { + return true; + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwardingTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwardingTest.java new file mode 100644 index 0000000000..c2edf502df --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwardingTest.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.peerforwarder; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +import java.util.Collection; + +class RequiresPeerForwardingTest { + + public class SimpleRequiresPeerForwarding implements RequiresPeerForwarding { + @Override + public Collection getIdentificationKeys() { + return null; + } + } + + @Test + void testRequiresPeerForwardingTest() { + Event event = mock(Event.class); + RequiresPeerForwarding requiresPeerForwarding = new SimpleRequiresPeerForwarding(); + assertThat(requiresPeerForwarding.isApplicableEventForPeerForwarding(event), equalTo(true)); + } +} + + diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java index c3c0f9977b..fa85146db2 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java @@ -15,6 +15,7 @@ import java.util.Collection; import java.util.Collections; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -79,13 +80,25 @@ private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, fina @Override public Collection> execute(final Collection> records) { - final Collection> recordsToProcessOnLocalPeer = peerForwarder.forwardRecords(records); + final Collection> recordsToProcess = new ArrayList<>(); + final Collection> recordsSkipped = new ArrayList<>(); + for (Record record: records) { + if (((RequiresPeerForwarding)innerProcessor).isApplicableEventForPeerForwarding(record.getData())) { + recordsToProcess.add(record); + } else { + recordsSkipped.add(record); + } + } + final Collection> recordsToProcessOnLocalPeer = peerForwarder.forwardRecords(recordsToProcess); + final Collection> receivedRecordsFromBuffer = peerForwarder.receiveRecords(); final Collection> recordsToProcessLocally = CollectionUtils.union( recordsToProcessOnLocalPeer, receivedRecordsFromBuffer); - return innerProcessor.execute(recordsToProcessLocally); + Collection> recordsOut = innerProcessor.execute(recordsToProcessLocally); + recordsOut.addAll(recordsSkipped); + return recordsOut; } @Override diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java index fb4effb413..8cfd1ddd43 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java @@ -93,8 +93,8 @@ public void run() { } } - private void processAcknowledgements(List inputEvents, Collection outputRecords) { - Set outputEventsSet = ((ArrayList>)outputRecords).stream().map(Record::getData).collect(Collectors.toSet()); + private void processAcknowledgements(List inputEvents, Collection> outputRecords) { + Set outputEventsSet = outputRecords.stream().map(Record::getData).collect(Collectors.toSet()); // For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it inputEvents.forEach(event -> { EventHandle eventHandle = event.getEventHandle(); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java index 3956c76efe..a9611178bc 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java @@ -29,6 +29,7 @@ import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.equalTo; import static org.mockito.ArgumentMatchers.anyCollection; @@ -97,7 +98,6 @@ void decorateProcessors_with_different_identification_key_should_throw() { assertThrows(RuntimeException.class, () -> createObjectUnderTesDecoratedProcessors(List.of(((Processor) requiresPeerForwarding), (Processor) requiresPeerForwardingCopy))); } - @Test void decorateProcessors_with_empty_processors_should_return_empty_list_of_processors() { final List processors = createObjectUnderTesDecoratedProcessors(Collections.emptyList()); @@ -129,9 +129,12 @@ void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentific @Test void PeerForwardingProcessingDecorator_execute_should_forwardRecords_with_correct_values() { + Event event = mock(Event.class); + when(record.getData()).thenReturn(event); + when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true); List> testData = Collections.singletonList(record); - when(peerForwarder.forwardRecords(testData)).thenReturn(testData); + when(peerForwarder.forwardRecords(anyCollection())).thenReturn(testData); when(processor.execute(testData)).thenReturn(testData); @@ -140,7 +143,7 @@ void PeerForwardingProcessingDecorator_execute_should_forwardRecords_with_correc final Collection> records = processors.get(0).execute(testData); verify(requiresPeerForwarding, times(2)).getIdentificationKeys(); - verify(peerForwarder).forwardRecords(testData); + verify(peerForwarder).forwardRecords(anyCollection()); Assertions.assertNotNull(records); assertThat(records.size(), equalTo(testData.size())); assertThat(records, equalTo(testData)); @@ -148,10 +151,13 @@ void PeerForwardingProcessingDecorator_execute_should_forwardRecords_with_correc @Test void PeerForwardingProcessingDecorator_execute_should_receiveRecords() { + Event event = mock(Event.class); + when(record.getData()).thenReturn(event); + when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true); Collection> forwardTestData = Collections.singletonList(record); Collection> receiveTestData = Collections.singletonList(mock(Record.class)); - when(peerForwarder.forwardRecords(forwardTestData)).thenReturn(forwardTestData); + when(peerForwarder.forwardRecords(anyCollection())).thenReturn(forwardTestData); when(peerForwarder.receiveRecords()).thenReturn(receiveTestData); final Collection> expectedRecordsToProcessLocally = CollectionUtils.union(forwardTestData, receiveTestData); @@ -163,7 +169,7 @@ void PeerForwardingProcessingDecorator_execute_should_receiveRecords() { final Collection> records = processors.get(0).execute(forwardTestData); verify(requiresPeerForwarding, times(2)).getIdentificationKeys(); - verify(peerForwarder).forwardRecords(forwardTestData); + verify(peerForwarder).forwardRecords(anyCollection()); verify(peerForwarder).receiveRecords(); Assertions.assertNotNull(records); assertThat(records.size(), equalTo(expectedRecordsToProcessLocally.size())); @@ -172,6 +178,9 @@ void PeerForwardingProcessingDecorator_execute_should_receiveRecords() { @Test void PeerForwardingProcessingDecorator_execute_will_call_inner_processors_execute() { + Event event = mock(Event.class); + when(record.getData()).thenReturn(event); + when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true); final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); Collection> testData = Collections.singletonList(record); @@ -180,6 +189,38 @@ void PeerForwardingProcessingDecorator_execute_will_call_inner_processors_execut verify(processor).execute(anyCollection()); } + @Test + void PeerForwardingProcessingDecorator_inner_processor_with_is_applicable_event_overridden() { + Event event1 = mock(Event.class); + Event event2 = mock(Event.class); + Event event3 = mock(Event.class); + Record record1 = mock(Record.class); + Record record2 = mock(Record.class); + Record record3 = mock(Record.class); + Record aggregatedRecord = mock(Record.class); + List aggregatedRecords = new ArrayList<>(); + aggregatedRecords.add(aggregatedRecord); + when(processor.execute(anyCollection())).thenReturn(aggregatedRecords); + when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event1)).thenReturn(true); + when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event2)).thenReturn(false); + when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event3)).thenReturn(true); + final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + when(record1.getData()).thenReturn(event1); + when(record2.getData()).thenReturn(event2); + when(record3.getData()).thenReturn(event3); + Collection> recordsIn = new ArrayList<>(); + recordsIn.add(record1); + recordsIn.add(record2); + recordsIn.add(record3); + + assertThat(processors.size(), equalTo(1)); + Collection> recordsOut = processors.get(0).execute(recordsIn); + verify(processor).execute(anyCollection()); + assertThat(recordsOut.size(), equalTo(2)); + assertTrue(recordsOut.contains(aggregatedRecord)); + assertTrue(recordsOut.contains(record2)); + } + @Test void PeerForwardingProcessingDecorator_prepareForShutdown_will_call_inner_processors_prepareForShutdown() { final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); @@ -208,4 +249,4 @@ void PeerForwardingProcessingDecorator_shutdown_will_call_inner_processors_shutd } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index f44592c3a1..e898840e2e 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -148,6 +148,14 @@ public void shutdown() { } + @Override + public boolean isApplicableEventForPeerForwarding(Event event) { + if (whenCondition == null) { + return true; + } + return expressionEvaluator.evaluateConditional(whenCondition, event); + } + @Override public Collection getIdentificationKeys() { return aggregateProcessorConfig.getIdentificationKeys(); diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java index ad0d763078..9a439efea2 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java @@ -228,6 +228,9 @@ void handleEvent_returning_with_condition_eliminates_one_record() { recordsIn.add(new Record(secondEvent)); recordsIn.add(new Record(event)); Collection> c = recordsIn; + assertThat(objectUnderTest.isApplicableEventForPeerForwarding(event), equalTo(true)); + assertThat(objectUnderTest.isApplicableEventForPeerForwarding(firstEvent), equalTo(true)); + assertThat(objectUnderTest.isApplicableEventForPeerForwarding(secondEvent), equalTo(false)); final List> recordsOut = (List>) objectUnderTest.doExecute(c); assertThat(recordsOut.size(), equalTo(2));