Skip to content

Commit

Permalink
Add when condition to each geoip entry (opensearch-project#4034)
Browse files Browse the repository at this point in the history
* Add when condition

Signed-off-by: Asif Sohail Mohammed <[email protected]>

---------

Signed-off-by: Asif Sohail Mohammed <[email protected]>
  • Loading branch information
asifsmohammed authored Feb 12, 2024
1 parent 24a9d81 commit 41c4f39
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@
public class GeoIPProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(GeoIPProcessor.class);
//TODO: rename metrics
static final String GEO_IP_PROCESSING_MATCH = "geoIpProcessingMatch";
static final String GEO_IP_PROCESSING_MISMATCH = "geoIpProcessingMismatch";
private final Counter geoIpProcessingMatchCounter;
private final Counter geoIpProcessingMismatchCounter;
static final String GEO_IP_EVENTS_PROCESSED = "eventsProcessed";
static final String GEO_IP_EVENTS_FAILED_LOOKUP = "eventsFailedLookup";
static final String GEO_IP_EVENTS_FAILED_ENGINE_EXCEPTION = "eventsFailedEngineException";
private final Counter geoIpEventsProcessed;
private final Counter geoIpEventsFailedLookup;
private final Counter geoIpEventsFailedEngineException;
private final GeoIPProcessorConfig geoIPProcessorConfig;
private final List<String> tagsOnFailure;
private final GeoIPProcessorService geoIPProcessorService;
private final String whenCondition;
private final ExpressionEvaluator expressionEvaluator;

/**
Expand All @@ -63,10 +63,11 @@ public GeoIPProcessor(final PluginMetrics pluginMetrics,
this.geoIPProcessorConfig = geoIPProcessorConfig;
this.geoIPProcessorService = geoIpConfigSupplier.getGeoIPProcessorService();
this.tagsOnFailure = geoIPProcessorConfig.getTagsOnFailure();
this.whenCondition = geoIPProcessorConfig.getWhenCondition();
this.expressionEvaluator = expressionEvaluator;
this.geoIpProcessingMatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MATCH);
this.geoIpProcessingMismatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MISMATCH);
this.geoIpEventsProcessed = pluginMetrics.counter(GEO_IP_EVENTS_PROCESSED);
this.geoIpEventsFailedLookup = pluginMetrics.counter(GEO_IP_EVENTS_FAILED_LOOKUP);
//TODO: Use the exception metric for exceptions from service
this.geoIpEventsFailedEngineException = pluginMetrics.counter(GEO_IP_EVENTS_FAILED_ENGINE_EXCEPTION);
}

/**
Expand All @@ -80,10 +81,16 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor

for (final Record<Event> eventRecord : records) {
final Event event = eventRecord.getData();
final String whenCondition = geoIPProcessorConfig.getWhenCondition();

if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) {
continue;
}
for (EntryConfig entry : geoIPProcessorConfig.getEntries()) {

boolean isEventFailedLookup = false;
geoIpEventsProcessed.increment();

for (final EntryConfig entry : geoIPProcessorConfig.getEntries()) {
final String source = entry.getSource();
final List<String> attributes = entry.getFields();
final String ipAddress = event.get(source, String.class);
Expand All @@ -94,18 +101,23 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
if (IPValidationCheck.isPublicIpAddress(ipAddress)) {
geoData = geoIPProcessorService.getGeoData(InetAddress.getByName(ipAddress), attributes);
eventRecord.getData().put(entry.getTarget(), geoData);
geoIpProcessingMatchCounter.increment();
} else {
isEventFailedLookup = true;
}
} catch (final IOException | EnrichFailedException ex) {
geoIpProcessingMismatchCounter.increment();
event.getMetadata().addTags(tagsOnFailure);
isEventFailedLookup = true;
LOG.error(DataPrepperMarkers.EVENT, "Failed to get Geo data for event: [{}] for the IP address [{}]", event, ipAddress, ex);
}
} else {
//No Enrichment.
event.getMetadata().addTags(tagsOnFailure);
isEventFailedLookup = true;
}
}

if (isEventFailedLookup) {
geoIpEventsFailedLookup.increment();
event.getMetadata().addTags(tagsOnFailure);
}
}
return records;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public class GeoIPProcessorConfig {
@JsonProperty("geoip_when")
private String whenCondition;


/**
* Get List of entries
* @return List of EntryConfig
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.processor.GeoIPProcessor.GEO_IP_PROCESSING_MATCH;
import static org.opensearch.dataprepper.plugins.processor.GeoIPProcessor.GEO_IP_PROCESSING_MISMATCH;
import static org.opensearch.dataprepper.plugins.processor.GeoIPProcessor.GEO_IP_EVENTS_FAILED_LOOKUP;
import static org.opensearch.dataprepper.plugins.processor.GeoIPProcessor.GEO_IP_EVENTS_PROCESSED;

@ExtendWith(MockitoExtension.class)
class GeoIPProcessorTest {
Expand All @@ -62,21 +63,21 @@ class GeoIPProcessorTest {
@Mock
private PluginMetrics pluginMetrics;
@Mock
private Counter geoIpProcessingMatch;
private Counter geoIpEventsProcessed;
@Mock
private Counter geoIpProcessingMismatch;
private Counter geoIpEventsFailedLookup;

@BeforeEach
void setUp() {
when(geoIpConfigSupplier.getGeoIPProcessorService()).thenReturn(geoIPProcessorService);
lenient().when(pluginMetrics.counter(GEO_IP_PROCESSING_MATCH)).thenReturn(geoIpProcessingMatch);
lenient().when(pluginMetrics.counter(GEO_IP_PROCESSING_MISMATCH)).thenReturn(geoIpProcessingMismatch);
lenient().when(pluginMetrics.counter(GEO_IP_EVENTS_PROCESSED)).thenReturn(geoIpEventsProcessed);
lenient().when(pluginMetrics.counter(GEO_IP_EVENTS_FAILED_LOOKUP)).thenReturn(geoIpEventsFailedLookup);
}

@AfterEach
void tearDown() {
verifyNoMoreInteractions(geoIpProcessingMatch);
verifyNoMoreInteractions(geoIpProcessingMismatch);
verifyNoMoreInteractions(geoIpEventsProcessed);
verifyNoMoreInteractions(geoIpEventsFailedLookup);
}

private GeoIPProcessor createObjectUnderTest() {
Expand Down Expand Up @@ -121,11 +122,11 @@ void doExecuteTest_with_when_condition_should_only_enrich_events_that_match_when
final Event event = record.getData();
assertThat(event.get("/peer/status", String.class), equalTo("success"));
}
verify(geoIpProcessingMatch).increment();
verify(geoIpEventsProcessed, times(1)).increment();
}

@Test
void doExecuteTest() throws NoSuchFieldException, IllegalAccessException {
void doExecuteTest_should_add_geo_data_to_event_if_source_is_non_null() throws NoSuchFieldException, IllegalAccessException {
when(geoIPProcessorConfig.getEntries()).thenReturn(List.of(entry));
when(entry.getSource()).thenReturn(SOURCE);
when(entry.getTarget()).thenReturn(TARGET);
Expand All @@ -141,10 +142,26 @@ void doExecuteTest() throws NoSuchFieldException, IllegalAccessException {
final Event event = record.getData();
assertThat(event.get("/peer/ip", String.class), equalTo("136.226.242.205"));
assertThat(event.containsKey("geolocation"), equalTo(true));
verify(geoIpProcessingMatch).increment();
verify(geoIpEventsProcessed).increment();
}
}

@Test
void doExecuteTest_should_not_add_geo_data_to_event_if_source_is_null() throws NoSuchFieldException, IllegalAccessException {
when(geoIPProcessorConfig.getEntries()).thenReturn(List.of(entry));
when(entry.getSource()).thenReturn("ip");
when(entry.getFields()).thenReturn(setFields());

final GeoIPProcessor geoIPProcessor = createObjectUnderTest();

ReflectivelySetField.setField(GeoIPProcessor.class, geoIPProcessor,
"geoIPProcessorService", geoIPProcessorService);
Collection<Record<Event>> records = geoIPProcessor.doExecute(setEventQueue());

verify(geoIpEventsProcessed).increment();
verify(geoIpEventsFailedLookup).increment();
}

@Test
void test_tags_when_enrich_fails() {
when(entry.getSource()).thenReturn(SOURCE);
Expand All @@ -165,7 +182,8 @@ void test_tags_when_enrich_fails() {
for (final Record<Event> record : records) {
Event event = record.getData();
assertTrue(event.getMetadata().hasTags(testTags));
verify(geoIpProcessingMismatch).increment();
verify(geoIpEventsFailedLookup).increment();
verify(geoIpEventsProcessed).increment();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
import static org.opensearch.dataprepper.plugins.processor.configuration.EntryConfig.DEFAULT_TARGET;

class EntryConfigTest {
public static final String SOURCE_VALUE = "source";
public static final String TARGET_VALUE = "target";
public static final List<String> FIELDS_VALUE = List.of("city", "country");
private EntryConfig entryConfig;

@BeforeEach
Expand All @@ -38,12 +35,16 @@ void testDefaultConfig() {

@Test
void testCustomConfig() throws NoSuchFieldException, IllegalAccessException {
ReflectivelySetField.setField(EntryConfig.class, entryConfig, "source", SOURCE_VALUE);
ReflectivelySetField.setField(EntryConfig.class, entryConfig, "target", TARGET_VALUE);
ReflectivelySetField.setField(EntryConfig.class, entryConfig, "fields", FIELDS_VALUE);
final String sourceValue = "source";
final String targetValue = "target";
final List<String> fieldsValue = List.of("city", "country");

assertThat(entryConfig.getSource(), equalTo(SOURCE_VALUE));
assertThat(entryConfig.getTarget(), equalTo(TARGET_VALUE));
assertThat(entryConfig.getFields(), equalTo(FIELDS_VALUE));
ReflectivelySetField.setField(EntryConfig.class, entryConfig, "source", sourceValue);
ReflectivelySetField.setField(EntryConfig.class, entryConfig, "target", targetValue);
ReflectivelySetField.setField(EntryConfig.class, entryConfig, "fields", fieldsValue);

assertThat(entryConfig.getSource(), equalTo(sourceValue));
assertThat(entryConfig.getTarget(), equalTo(targetValue));
assertThat(entryConfig.getFields(), equalTo(fieldsValue));
}
}

This file was deleted.

0 comments on commit 41c4f39

Please sign in to comment.