Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.15] Fix notifications listener leak in threat intel monitor #1363

Merged
merged 3 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.opensearch.gradle.test.RestIntegTestTask

buildscript {
ext {
opensearch_version = System.getProperty("opensearch.version", "2.15.0-SNAPSHOT")
opensearch_version = System.getProperty("opensearch.version", "2.15.1-SNAPSHOT")
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
version_tokens = opensearch_version.tokenize('-')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void sendNotification(String configId, String severity, String subject, S
sendNotificationResponse -> {
if (sendNotificationResponse.getStatus() == RestStatus.OK) {
logger.info("Successfully sent a notification, Notification Event: " + sendNotificationResponse.getNotificationEvent());
listener.onResponse(null);
} else {
listener.onFailure(new Exception("Error while sending a notification, Notification Event: " + sendNotificationResponse.getNotificationEvent()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,19 @@ public void bulkIndexEntities(List<Entity> newEntityList, List<Entity> updatedEn
}
}
actionListener.onResponse(null);
}, actionListener::onFailure), bulkRequestList.size());
}, e1 -> {
log.error("Failed to bulk index " + getEntityName(), e1);
actionListener.onFailure(e1);
}), bulkRequestList.size());

for (BulkRequest req : bulkRequestList) {
try {
client.bulk(req, groupedListener); //todo why stash context here?
client.bulk(req, groupedListener);
} catch (Exception e) {
log.error(
() -> new ParameterizedMessage("Failed to bulk save {} {}.", req.batchSize(), getEntityName()),
e);
groupedListener.onFailure(e);
}
}
}, e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@ public void scanIoCs(IocScanContext<Data> iocScanContext,
(iocFindings, e1) -> {
if (e1 != null) {
log.error(
() -> new ParameterizedMessage("Threat intel monitor {}: Failed to create ioc findings/ ",
() -> new ParameterizedMessage("Threat intel monitor {}: Failed to create ioc findings",
iocScanContext.getMonitor().getId(), data.size()),
e1);
scanCallback.accept(null, e1);
scanCallback.accept(data, e1);
} else {
BiConsumer<List<ThreatIntelAlert>, Exception> triggerResultConsumer = (alerts, e2) -> {
if (e2 != null) {
log.error(
() -> new ParameterizedMessage("Threat intel monitor {}: Failed to execute threat intel triggers/ ",
iocScanContext.getMonitor().getId(), data.size()),
e2);
scanCallback.accept(null, e2);
return;
// if findings are generated successfully but alerts/notifications fail we mark execution as succeeded, so that duplicate findings are not created
scanCallback.accept(data, null);
} else {
scanCallback.accept(data, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private void executeTrigger(List<IocFinding> iocFindings,
} else {
fetchExistingAlertsForTrigger(monitor, triggerMatchedFindings, trigger, ActionListener.wrap(
existingAlerts -> {
executeActionsAndSaveAlerts(iocFindings, trigger, monitor, existingAlerts, triggerMatchedFindings, threatIntelTrigger, listener);
saveAlertsAndExecuteActions(iocFindings, trigger, monitor, existingAlerts, triggerMatchedFindings, threatIntelTrigger, listener);
},
e -> {
log.error(() -> new ParameterizedMessage(
Expand All @@ -132,7 +132,7 @@ private void executeTrigger(List<IocFinding> iocFindings,
}
}

private void executeActionsAndSaveAlerts(List<IocFinding> iocFindings,
private void saveAlertsAndExecuteActions(List<IocFinding> iocFindings,
Trigger trigger,
Monitor monitor,
List<ThreatIntelAlert> existingAlerts,
Expand All @@ -147,36 +147,38 @@ private void executeActionsAndSaveAlerts(List<IocFinding> iocFindings,
newAlerts,
existingAlerts);
if (false == trigger.getActions().isEmpty()) {
GroupedActionListener<Void> notifsListener = new GroupedActionListener<>(ActionListener.wrap(
r -> {
saveAlerts(new ArrayList<>(iocToUpdatedAlertsMap.values()),
newAlerts,
monitor,
(threatIntelAlerts, e) -> {
if (e != null) {
log.error(String.format("Threat intel monitor %s: Failed to save alerts for trigger {}", monitor.getId(), trigger.getId()), e);
listener.onFailure(e);
} else {
saveAlerts(new ArrayList<>(iocToUpdatedAlertsMap.values()),
newAlerts,
monitor,
(threatIntelAlerts, e) -> {
if (e != null) {
log.error(String.format("Threat intel monitor %s: Failed to save alerts for trigger %s", monitor.getId(), trigger.getId()), e);
listener.onFailure(e);
} else {
GroupedActionListener<Void> notifsListener = new GroupedActionListener<>(ActionListener.wrap(
r -> {
listener.onResponse(threatIntelAlerts);
}, ex -> {
log.error(String.format("Threat intel monitor {}: Failed to send notification for trigger {}", monitor.getId(), trigger.getId()), ex);
listener.onFailure(new SecurityAnalyticsException("Failed to send notification", RestStatus.INTERNAL_SERVER_ERROR, ex));
}
});
}, e -> {
log.error(String.format("Threat intel monitor %s: Failed to send notification for trigger {}", monitor.getId(), trigger.getId()), e);
listener.onFailure(new SecurityAnalyticsException("Failed to send notification", RestStatus.INTERNAL_SERVER_ERROR, e));
}
), trigger.getActions().size());
for (Action action : trigger.getActions()) {
try {
String transformedSubject = NotificationService.compileTemplate(ctx, action.getSubjectTemplate());
String transformedMessage = NotificationService.compileTemplate(ctx, action.getMessageTemplate());
String configId = action.getDestinationId();
notificationService.sendNotification(configId, trigger.getSeverity(), transformedSubject, transformedMessage, notifsListener);
} catch (Exception e) {
log.error(String.format("Threat intel monitor %s: Failed to send notification to %s for trigger %s", monitor.getId(), action.getDestinationId(), trigger.getId()), e);
notifsListener.onFailure(new SecurityAnalyticsException("Failed to send notification", RestStatus.INTERNAL_SERVER_ERROR, e));
}
), trigger.getActions().size());

for (Action action : trigger.getActions()) {
try {
String transformedSubject = NotificationService.compileTemplate(ctx, action.getSubjectTemplate());
String transformedMessage = NotificationService.compileTemplate(ctx, action.getMessageTemplate());
String configId = action.getDestinationId();
notificationService.sendNotification(configId, trigger.getSeverity(), transformedSubject, transformedMessage, notifsListener);
} catch (Exception ex) {
log.error(String.format("Threat intel monitor %s: Failed to send notification to %s for trigger %s", monitor.getId(), action.getDestinationId(), trigger.getId()), ex);
notifsListener.onFailure(new SecurityAnalyticsException("Failed to send notification", RestStatus.INTERNAL_SERVER_ERROR, ex));
}

}
}
});

}
} else {
saveAlerts(new ArrayList<>(iocToUpdatedAlertsMap.values()),
newAlerts,
Expand Down Expand Up @@ -235,7 +237,7 @@ private GroupedActionListener<List<ThreatIntelAlert>> getGroupedListenerForAllTr
r -> {
List<ThreatIntelAlert> list = new ArrayList<>();
r.forEach(list::addAll);
triggerResultConsumer.accept(list, null); //todo change emptylist to actual response
triggerResultConsumer.accept(list, null);
}, e -> {
log.error(() -> new ParameterizedMessage(
"Threat intel monitor {} Failed to execute triggers {}", monitor.getId()),
Expand Down
10 changes: 10 additions & 0 deletions src/test/java/org/opensearch/securityanalytics/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,16 @@ public static Action randomAction(String destinationId) {
return new Action(name, destinationId, template, template, throttleEnabled, throttle, OpenSearchRestTestCase.randomAlphaOfLength(10), null);
}

public static Action randomThreatInteMonitorAction(String destinationId) {
String name = OpenSearchRestTestCase.randomUnicodeOfLength(10);
Script template = randomTemplateScript("Threat intel Monitor {{ctx.monitor.name}} just entered alert status. Please investigate the issue.\n" +
" - Trigger: {{ctx.trigger.name}}\n" +
" - Severity: {{ctx.trigger.severity}}", null);
Boolean throttleEnabled = false;
Throttle throttle = randomThrottle(null, null);
return new Action(name, destinationId, template, template, throttleEnabled, throttle, OpenSearchRestTestCase.randomAlphaOfLength(10), null);
}

public static Script randomTemplateScript(String source, Map<String, Object> params) {
if (params == null) {
params = new HashMap<>();
Expand Down
Loading
Loading