From 41c4f39722cc5477fbd777ca07bfae6bbb231d4b Mon Sep 17 00:00:00 2001 From: Asif Sohail Mohammed Date: Mon, 12 Feb 2024 10:13:45 -0600 Subject: [PATCH] Add when condition to each geoip entry (#4034) * Add when condition Signed-off-by: Asif Sohail Mohammed --------- Signed-off-by: Asif Sohail Mohammed --- .../plugins/processor/GeoIPProcessor.java | 40 +++++++++++------- .../processor/GeoIPProcessorConfig.java | 1 - .../processor/loadtype/LoadTypeOptions.java | 37 ---------------- .../plugins/processor/GeoIPProcessorTest.java | 42 +++++++++++++------ .../configuration/EntryConfigTest.java | 19 +++++---- .../loadtype/LoadTypeOptionsTest.java | 30 ------------- 6 files changed, 66 insertions(+), 103 deletions(-) delete mode 100644 data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/loadtype/LoadTypeOptions.java delete mode 100644 data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/loadtype/LoadTypeOptionsTest.java diff --git a/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessor.java b/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessor.java index 3b3e42f9d0..1d04ede2e8 100644 --- a/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessor.java +++ b/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessor.java @@ -37,15 +37,15 @@ public class GeoIPProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(GeoIPProcessor.class); - //TODO: rename metrics - static final String GEO_IP_PROCESSING_MATCH = "geoIpProcessingMatch"; - static final String GEO_IP_PROCESSING_MISMATCH = "geoIpProcessingMismatch"; - private final Counter geoIpProcessingMatchCounter; - private final Counter geoIpProcessingMismatchCounter; + static final String GEO_IP_EVENTS_PROCESSED = "eventsProcessed"; + static final String GEO_IP_EVENTS_FAILED_LOOKUP = "eventsFailedLookup"; + static final String GEO_IP_EVENTS_FAILED_ENGINE_EXCEPTION = "eventsFailedEngineException"; + private final Counter geoIpEventsProcessed; + private final Counter geoIpEventsFailedLookup; + private final Counter geoIpEventsFailedEngineException; private final GeoIPProcessorConfig geoIPProcessorConfig; private final List tagsOnFailure; private final GeoIPProcessorService geoIPProcessorService; - private final String whenCondition; private final ExpressionEvaluator expressionEvaluator; /** @@ -63,10 +63,11 @@ public GeoIPProcessor(final PluginMetrics pluginMetrics, this.geoIPProcessorConfig = geoIPProcessorConfig; this.geoIPProcessorService = geoIpConfigSupplier.getGeoIPProcessorService(); this.tagsOnFailure = geoIPProcessorConfig.getTagsOnFailure(); - this.whenCondition = geoIPProcessorConfig.getWhenCondition(); this.expressionEvaluator = expressionEvaluator; - this.geoIpProcessingMatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MATCH); - this.geoIpProcessingMismatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MISMATCH); + this.geoIpEventsProcessed = pluginMetrics.counter(GEO_IP_EVENTS_PROCESSED); + this.geoIpEventsFailedLookup = pluginMetrics.counter(GEO_IP_EVENTS_FAILED_LOOKUP); + //TODO: Use the exception metric for exceptions from service + this.geoIpEventsFailedEngineException = pluginMetrics.counter(GEO_IP_EVENTS_FAILED_ENGINE_EXCEPTION); } /** @@ -80,10 +81,16 @@ public Collection> doExecute(final Collection> recor for (final Record eventRecord : records) { final Event event = eventRecord.getData(); + final String whenCondition = geoIPProcessorConfig.getWhenCondition(); + if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) { continue; } - for (EntryConfig entry : geoIPProcessorConfig.getEntries()) { + + boolean isEventFailedLookup = false; + geoIpEventsProcessed.increment(); + + for (final EntryConfig entry : geoIPProcessorConfig.getEntries()) { final String source = entry.getSource(); final List attributes = entry.getFields(); final String ipAddress = event.get(source, String.class); @@ -94,18 +101,23 @@ public Collection> doExecute(final Collection> recor if (IPValidationCheck.isPublicIpAddress(ipAddress)) { geoData = geoIPProcessorService.getGeoData(InetAddress.getByName(ipAddress), attributes); eventRecord.getData().put(entry.getTarget(), geoData); - geoIpProcessingMatchCounter.increment(); + } else { + isEventFailedLookup = true; } } catch (final IOException | EnrichFailedException ex) { - geoIpProcessingMismatchCounter.increment(); - event.getMetadata().addTags(tagsOnFailure); + isEventFailedLookup = true; LOG.error(DataPrepperMarkers.EVENT, "Failed to get Geo data for event: [{}] for the IP address [{}]", event, ipAddress, ex); } } else { //No Enrichment. - event.getMetadata().addTags(tagsOnFailure); + isEventFailedLookup = true; } } + + if (isEventFailedLookup) { + geoIpEventsFailedLookup.increment(); + event.getMetadata().addTags(tagsOnFailure); + } } return records; } diff --git a/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorConfig.java b/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorConfig.java index ca0b00e8e7..222a8698e4 100644 --- a/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorConfig.java +++ b/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorConfig.java @@ -30,7 +30,6 @@ public class GeoIPProcessorConfig { @JsonProperty("geoip_when") private String whenCondition; - /** * Get List of entries * @return List of EntryConfig diff --git a/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/loadtype/LoadTypeOptions.java b/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/loadtype/LoadTypeOptions.java deleted file mode 100644 index 38fe132a07..0000000000 --- a/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/loadtype/LoadTypeOptions.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.processor.loadtype; - -import com.fasterxml.jackson.annotation.JsonCreator; - -import java.util.Arrays; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * LoadTypeOptions enumeration - */ -public enum LoadTypeOptions { - - INMEMORY("memory_map"), - CACHE("cache"); - - private final String option; - private static final Map OPTIONS_MAP = Arrays.stream(LoadTypeOptions.values()) - .collect(Collectors.toMap( - value -> value.option, - value -> value - )); - - LoadTypeOptions(final String option) { - this.option = option; - } - - @JsonCreator - static LoadTypeOptions fromOptionValue(final String option) { - return OPTIONS_MAP.get(option); - } -} \ No newline at end of file diff --git a/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorTest.java b/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorTest.java index d6b8a7bf01..55d28b566b 100644 --- a/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorTest.java +++ b/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorTest.java @@ -39,11 +39,12 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import static org.opensearch.dataprepper.plugins.processor.GeoIPProcessor.GEO_IP_PROCESSING_MATCH; -import static org.opensearch.dataprepper.plugins.processor.GeoIPProcessor.GEO_IP_PROCESSING_MISMATCH; +import static org.opensearch.dataprepper.plugins.processor.GeoIPProcessor.GEO_IP_EVENTS_FAILED_LOOKUP; +import static org.opensearch.dataprepper.plugins.processor.GeoIPProcessor.GEO_IP_EVENTS_PROCESSED; @ExtendWith(MockitoExtension.class) class GeoIPProcessorTest { @@ -62,21 +63,21 @@ class GeoIPProcessorTest { @Mock private PluginMetrics pluginMetrics; @Mock - private Counter geoIpProcessingMatch; + private Counter geoIpEventsProcessed; @Mock - private Counter geoIpProcessingMismatch; + private Counter geoIpEventsFailedLookup; @BeforeEach void setUp() { when(geoIpConfigSupplier.getGeoIPProcessorService()).thenReturn(geoIPProcessorService); - lenient().when(pluginMetrics.counter(GEO_IP_PROCESSING_MATCH)).thenReturn(geoIpProcessingMatch); - lenient().when(pluginMetrics.counter(GEO_IP_PROCESSING_MISMATCH)).thenReturn(geoIpProcessingMismatch); + lenient().when(pluginMetrics.counter(GEO_IP_EVENTS_PROCESSED)).thenReturn(geoIpEventsProcessed); + lenient().when(pluginMetrics.counter(GEO_IP_EVENTS_FAILED_LOOKUP)).thenReturn(geoIpEventsFailedLookup); } @AfterEach void tearDown() { - verifyNoMoreInteractions(geoIpProcessingMatch); - verifyNoMoreInteractions(geoIpProcessingMismatch); + verifyNoMoreInteractions(geoIpEventsProcessed); + verifyNoMoreInteractions(geoIpEventsFailedLookup); } private GeoIPProcessor createObjectUnderTest() { @@ -121,11 +122,11 @@ void doExecuteTest_with_when_condition_should_only_enrich_events_that_match_when final Event event = record.getData(); assertThat(event.get("/peer/status", String.class), equalTo("success")); } - verify(geoIpProcessingMatch).increment(); + verify(geoIpEventsProcessed, times(1)).increment(); } @Test - void doExecuteTest() throws NoSuchFieldException, IllegalAccessException { + void doExecuteTest_should_add_geo_data_to_event_if_source_is_non_null() throws NoSuchFieldException, IllegalAccessException { when(geoIPProcessorConfig.getEntries()).thenReturn(List.of(entry)); when(entry.getSource()).thenReturn(SOURCE); when(entry.getTarget()).thenReturn(TARGET); @@ -141,10 +142,26 @@ void doExecuteTest() throws NoSuchFieldException, IllegalAccessException { final Event event = record.getData(); assertThat(event.get("/peer/ip", String.class), equalTo("136.226.242.205")); assertThat(event.containsKey("geolocation"), equalTo(true)); - verify(geoIpProcessingMatch).increment(); + verify(geoIpEventsProcessed).increment(); } } + @Test + void doExecuteTest_should_not_add_geo_data_to_event_if_source_is_null() throws NoSuchFieldException, IllegalAccessException { + when(geoIPProcessorConfig.getEntries()).thenReturn(List.of(entry)); + when(entry.getSource()).thenReturn("ip"); + when(entry.getFields()).thenReturn(setFields()); + + final GeoIPProcessor geoIPProcessor = createObjectUnderTest(); + + ReflectivelySetField.setField(GeoIPProcessor.class, geoIPProcessor, + "geoIPProcessorService", geoIPProcessorService); + Collection> records = geoIPProcessor.doExecute(setEventQueue()); + + verify(geoIpEventsProcessed).increment(); + verify(geoIpEventsFailedLookup).increment(); + } + @Test void test_tags_when_enrich_fails() { when(entry.getSource()).thenReturn(SOURCE); @@ -165,7 +182,8 @@ void test_tags_when_enrich_fails() { for (final Record record : records) { Event event = record.getData(); assertTrue(event.getMetadata().hasTags(testTags)); - verify(geoIpProcessingMismatch).increment(); + verify(geoIpEventsFailedLookup).increment(); + verify(geoIpEventsProcessed).increment(); } } diff --git a/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/configuration/EntryConfigTest.java b/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/configuration/EntryConfigTest.java index 3863663c7a..129dcb7370 100644 --- a/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/configuration/EntryConfigTest.java +++ b/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/configuration/EntryConfigTest.java @@ -19,9 +19,6 @@ import static org.opensearch.dataprepper.plugins.processor.configuration.EntryConfig.DEFAULT_TARGET; class EntryConfigTest { - public static final String SOURCE_VALUE = "source"; - public static final String TARGET_VALUE = "target"; - public static final List FIELDS_VALUE = List.of("city", "country"); private EntryConfig entryConfig; @BeforeEach @@ -38,12 +35,16 @@ void testDefaultConfig() { @Test void testCustomConfig() throws NoSuchFieldException, IllegalAccessException { - ReflectivelySetField.setField(EntryConfig.class, entryConfig, "source", SOURCE_VALUE); - ReflectivelySetField.setField(EntryConfig.class, entryConfig, "target", TARGET_VALUE); - ReflectivelySetField.setField(EntryConfig.class, entryConfig, "fields", FIELDS_VALUE); + final String sourceValue = "source"; + final String targetValue = "target"; + final List fieldsValue = List.of("city", "country"); - assertThat(entryConfig.getSource(), equalTo(SOURCE_VALUE)); - assertThat(entryConfig.getTarget(), equalTo(TARGET_VALUE)); - assertThat(entryConfig.getFields(), equalTo(FIELDS_VALUE)); + ReflectivelySetField.setField(EntryConfig.class, entryConfig, "source", sourceValue); + ReflectivelySetField.setField(EntryConfig.class, entryConfig, "target", targetValue); + ReflectivelySetField.setField(EntryConfig.class, entryConfig, "fields", fieldsValue); + + assertThat(entryConfig.getSource(), equalTo(sourceValue)); + assertThat(entryConfig.getTarget(), equalTo(targetValue)); + assertThat(entryConfig.getFields(), equalTo(fieldsValue)); } } diff --git a/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/loadtype/LoadTypeOptionsTest.java b/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/loadtype/LoadTypeOptionsTest.java deleted file mode 100644 index 999a970482..0000000000 --- a/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/loadtype/LoadTypeOptionsTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.processor.loadtype; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -@ExtendWith(MockitoExtension.class) -class LoadTypeOptionsTest { - - @Test - void notNull_test() { - assertNotNull(LoadTypeOptions.INMEMORY); - } - - @Test - void fromOptionValue_test() { - LoadTypeOptions loadTypeOptions = LoadTypeOptions.fromOptionValue("memory_map"); - assertNotNull(loadTypeOptions); - assertThat(loadTypeOptions.toString(), equalTo("INMEMORY")); - } -}