Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into feature-prometheus-sink-d…
Browse files Browse the repository at this point in the history
…raft
  • Loading branch information
mallikagogoi7 authored Aug 17, 2023
2 parents 7b5f1c4 + b7661e6 commit 3d3cca6
Show file tree
Hide file tree
Showing 109 changed files with 2,933 additions and 633 deletions.
4 changes: 2 additions & 2 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
[Describe what this change achieves]

### Issues Resolved
[List any issues this PR will resolve]
Resolves #[Issue number to be closed when this PR is merged]

### Check List
- [ ] New functionality includes testing.
- [ ] New functionality has been documented.
- [ ] New functionality has a documentation issue. Please link to it in this PR.
- [ ] New functionality has javadoc added
- [ ] Commits are signed with a real name per the DCO

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ subprojects {

configure(subprojects.findAll {it.name != 'data-prepper-api'}) {
dependencies {
implementation platform('software.amazon.awssdk:bom:2.17.264')
implementation platform('software.amazon.awssdk:bom:2.20.67')
implementation 'jakarta.validation:jakarta.validation-api:3.0.2'
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class InMemorySinkAccessor {
public List<Record<Event>> get(final String testingKey) {
lock.lock();
try {
return recordsMap.getOrDefault(testingKey, Collections.emptyList());
return new ArrayList<>(recordsMap.getOrDefault(testingKey, Collections.emptyList()));
} finally {
lock.unlock();
}
Expand All @@ -49,7 +49,7 @@ public List<Record<Event>> get(final String testingKey) {
public List<Record<Event>> getAndClear(final String testingKey) {
lock.lock();
try {
final List<Record<Event>> records = recordsMap.getOrDefault(testingKey, Collections.emptyList());
final List<Record<Event>> records = get(testingKey);

recordsMap.remove(testingKey);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class HeapCircuitBreaker implements InnerCircuitBreaker, AutoCloseable {
scheduledExecutorService
.scheduleAtFixedRate(this::checkMemory, 0L, checkInterval.toMillis(), TimeUnit.MILLISECONDS);

LOG.info("Heap circuit breaker with usage of {} bytes.", usageBytes);
LOG.info("Circuit breaker heap limit is set to {} bytes.", usageBytes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public synchronized void shutdown() {
stopRequested.set(true);
} catch (Exception ex) {
LOG.error("Pipeline [{}] - Encountered exception while stopping the source, " +
"proceeding with termination of process workers", name);
"proceeding with termination of process workers", name, ex);
}

shutdownExecutorService(processorExecutorService, processorShutdownTimeout.toMillis(), "processor");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ void testSimpleRelationalOperatorExpressionWithInValidLiteralType() {
@Test
void testSimpleRelationalOperatorExpressionWithJsonPointerTypeValidValue() {
final String testKey = "testKey";
final int testValue = random.nextInt(1000);
final int testValue = random.nextInt(1000) + 2;
final Map<String, Integer> data = Map.of(testKey, testValue);
final Event testEvent = createTestEvent(data);
final String greaterThanStatement = String.format(" /%s > %d", testKey, testValue - 1);
Expand All @@ -207,7 +207,7 @@ void testSimpleRelationalOperatorExpressionWithJsonPointerTypeValidValue() {
}

@Test
void testSimpleRelationalOperatorExpressionWithJsonPointerTypeInValidValue() {
void testSimpleRelationalOperatorExpressionWithJsonPointerTypeInValidValueWithPositiveInteger() {
final String testKey = "testKey";
final boolean testValue = true;
final Map<String, Boolean> data = Map.of(testKey, testValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -49,7 +50,7 @@ private AggregateActionSynchronizer(final AggregateAction aggregateAction, final
this.actionConcludeGroupEventsProcessingErrors = pluginMetrics.counter(ACTION_CONCLUDE_GROUP_EVENTS_PROCESSING_ERRORS);
}

AggregateActionOutput concludeGroup(final AggregateIdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup, final boolean forceConclude) {
AggregateActionOutput concludeGroup(final IdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup, final boolean forceConclude) {
final Lock concludeGroupLock = aggregateGroup.getConcludeGroupLock();
final Lock handleEventForGroupLock = aggregateGroup.getHandleEventForGroupLock();

Expand All @@ -74,7 +75,7 @@ AggregateActionOutput concludeGroup(final AggregateIdentificationKeysHasher.Iden
return actionOutput;
}

AggregateActionResponse handleEventForGroup(final Event event, final AggregateIdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup) {
AggregateActionResponse handleEventForGroup(final Event event, final IdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup) {
final Lock concludeGroupLock = aggregateGroup.getConcludeGroupLock();
final Lock handleEventForGroupLock = aggregateGroup.getHandleEventForGroupLock();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.aggregate;

import com.google.common.collect.Maps;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -14,33 +15,33 @@

class AggregateGroupManager {

private final Map<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> allGroups = Maps.newConcurrentMap();
private final Map<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> allGroups = Maps.newConcurrentMap();
private final Duration groupDuration;

AggregateGroupManager(final Duration groupDuration) {
this.groupDuration = groupDuration;
}

AggregateGroup getAggregateGroup(final AggregateIdentificationKeysHasher.IdentificationKeysMap identificationKeysMap) {
AggregateGroup getAggregateGroup(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap) {
return allGroups.computeIfAbsent(identificationKeysMap, (hash) -> new AggregateGroup(identificationKeysMap.getKeyMap()));
}

List<Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> getGroupsToConclude(final boolean forceConclude) {
final List<Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = new ArrayList<>();
for (final Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : allGroups.entrySet()) {
List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> getGroupsToConclude(final boolean forceConclude) {
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = new ArrayList<>();
for (final Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : allGroups.entrySet()) {
if (groupEntry.getValue().shouldConcludeGroup(groupDuration) || forceConclude) {
groupsToConclude.add(groupEntry);
}
}
return groupsToConclude;
}

void closeGroup(final AggregateIdentificationKeysHasher.IdentificationKeysMap hashKeyMap, final AggregateGroup group) {
void closeGroup(final IdentificationKeysHasher.IdentificationKeysMap hashKeyMap, final AggregateGroup group) {
allGroups.remove(hashKeyMap, group);
group.resetGroup();
}

void putGroupWithHash(final AggregateIdentificationKeysHasher.IdentificationKeysMap hashKeyMap, final AggregateGroup group) {
void putGroupWithHash(final IdentificationKeysHasher.IdentificationKeysMap hashKeyMap, final AggregateGroup group) {
allGroups.put(hashKeyMap, group);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;

import java.util.Collection;
import java.util.LinkedList;
Expand All @@ -41,7 +42,7 @@ public class AggregateProcessor extends AbstractProcessor<Record<Event>, Record<
private final AggregateProcessorConfig aggregateProcessorConfig;
private final AggregateGroupManager aggregateGroupManager;
private final AggregateActionSynchronizer aggregateActionSynchronizer;
private final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher;
private final IdentificationKeysHasher identificationKeysHasher;
private final AggregateAction aggregateAction;

private boolean forceConclude = false;
Expand All @@ -51,15 +52,15 @@ public class AggregateProcessor extends AbstractProcessor<Record<Event>, Record<
@DataPrepperPluginConstructor
public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final ExpressionEvaluator expressionEvaluator) {
this(aggregateProcessorConfig, pluginMetrics, pluginFactory, new AggregateGroupManager(aggregateProcessorConfig.getGroupDuration()),
new AggregateIdentificationKeysHasher(aggregateProcessorConfig.getIdentificationKeys()), new AggregateActionSynchronizer.AggregateActionSynchronizerProvider(), expressionEvaluator);
new IdentificationKeysHasher(aggregateProcessorConfig.getIdentificationKeys()), new AggregateActionSynchronizer.AggregateActionSynchronizerProvider(), expressionEvaluator);
}
public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final AggregateGroupManager aggregateGroupManager,
final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher, final AggregateActionSynchronizer.AggregateActionSynchronizerProvider aggregateActionSynchronizerProvider, final ExpressionEvaluator expressionEvaluator) {
final IdentificationKeysHasher identificationKeysHasher, final AggregateActionSynchronizer.AggregateActionSynchronizerProvider aggregateActionSynchronizerProvider, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.aggregateProcessorConfig = aggregateProcessorConfig;
this.aggregateGroupManager = aggregateGroupManager;
this.expressionEvaluator = expressionEvaluator;
this.aggregateIdentificationKeysHasher = aggregateIdentificationKeysHasher;
this.identificationKeysHasher = identificationKeysHasher;
this.aggregateAction = loadAggregateAction(pluginFactory);
this.aggregateActionSynchronizer = aggregateActionSynchronizerProvider.provide(aggregateAction, aggregateGroupManager, pluginMetrics);

Expand All @@ -82,8 +83,8 @@ private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) {
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
final List<Record<Event>> recordsOut = new LinkedList<>();

final List<Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(forceConclude);
for (final Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : groupsToConclude) {
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(forceConclude);
for (final Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : groupsToConclude) {
final AggregateActionOutput actionOutput = aggregateActionSynchronizer.concludeGroup(groupEntry.getKey(), groupEntry.getValue(), forceConclude);

final List<Event> concludeGroupEvents = actionOutput != null ? actionOutput.getEvents() : null;
Expand All @@ -105,7 +106,7 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
handleEventsDropped++;
continue;
}
final AggregateIdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = aggregateIdentificationKeysHasher.createIdentificationKeysMapFromEvent(event);
final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event);
final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysMap);

final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;

import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -47,7 +48,7 @@ public class AggregateActionSynchronizerTest {
private AggregateGroup aggregateGroup;

@Mock
private AggregateIdentificationKeysHasher.IdentificationKeysMap identificationKeysMap;
private IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap;

@Mock
private AggregateActionResponse aggregateActionResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;

import java.time.Duration;
import java.util.Collections;
Expand All @@ -28,7 +29,7 @@ public class AggregateGroupManagerTest {

private AggregateGroupManager aggregateGroupManager;

private AggregateIdentificationKeysHasher.IdentificationKeysMap identificationKeysMap;
private IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap;

private static final Duration TEST_GROUP_DURATION = Duration.ofSeconds(new Random().nextInt(10) + 10);

Expand All @@ -37,7 +38,7 @@ void setup() {
final Map<Object, Object> identificationKeysHash = new HashMap<>();
identificationKeysHash.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());

identificationKeysMap = new AggregateIdentificationKeysHasher.IdentificationKeysMap(identificationKeysHash);
identificationKeysMap = new IdentificationKeysHasher.IdentificationKeysMap(identificationKeysHash);
}

private AggregateGroupManager createObjectUnderTest() {
Expand Down Expand Up @@ -92,16 +93,16 @@ void getGroupsToConclude_returns_correct_group() {

final AggregateGroup groupToConclude = mock(AggregateGroup.class);
when(groupToConclude.shouldConcludeGroup(TEST_GROUP_DURATION)).thenReturn(true);
final AggregateIdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude = mock(AggregateIdentificationKeysHasher.IdentificationKeysMap.class);
final IdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude = mock(IdentificationKeysHasher.IdentificationKeysMap.class);

final AggregateGroup groupToNotConclude = mock(AggregateGroup.class);
when(groupToNotConclude.shouldConcludeGroup(TEST_GROUP_DURATION)).thenReturn(false);
final AggregateIdentificationKeysHasher.IdentificationKeysMap hashForGroupToNotConclude = mock(AggregateIdentificationKeysHasher.IdentificationKeysMap.class);
final IdentificationKeysHasher.IdentificationKeysMap hashForGroupToNotConclude = mock(IdentificationKeysHasher.IdentificationKeysMap.class);

aggregateGroupManager.putGroupWithHash(hashForGroupToConclude, groupToConclude);
aggregateGroupManager.putGroupWithHash(hashForGroupToNotConclude, groupToNotConclude);

final List<Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(false);
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(false);

assertThat(groupsToConclude.size(), equalTo(1));
assertThat(groupsToConclude.get(0), notNullValue());
Expand All @@ -114,15 +115,15 @@ void getGroupsToConclude_with_force_conclude_return_all() {
aggregateGroupManager = createObjectUnderTest();

final AggregateGroup groupToConclude1 = mock(AggregateGroup.class);
final AggregateIdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude1 = mock(AggregateIdentificationKeysHasher.IdentificationKeysMap.class);
final IdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude1 = mock(IdentificationKeysHasher.IdentificationKeysMap.class);

final AggregateGroup groupToConclude2 = mock(AggregateGroup.class);
final AggregateIdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude2 = mock(AggregateIdentificationKeysHasher.IdentificationKeysMap.class);
final IdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude2 = mock(IdentificationKeysHasher.IdentificationKeysMap.class);

aggregateGroupManager.putGroupWithHash(hashForGroupToConclude1, groupToConclude1);
aggregateGroupManager.putGroupWithHash(hashForGroupToConclude2, groupToConclude2);

final List<Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(true);
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(true);

assertThat(groupsToConclude.size(), equalTo(2));
assertThat(groupsToConclude.get(0), notNullValue());
Expand Down
Loading

0 comments on commit 3d3cca6

Please sign in to comment.