Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into sns-sink-plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
udaych20 authored Jul 12, 2023
2 parents 643d06c + decccb9 commit dce26a7
Show file tree
Hide file tree
Showing 26 changed files with 1,096 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,12 @@ public interface AcknowledgementSet {
* @since 2.2
*/
public boolean release(final EventHandle eventHandle, final boolean result);

/**
 * Indicates that the addition of the initial set of events to
 * the acknowledgement set is completed.
 * It is possible that more events are added to the set as the
 * initial events make their way through the pipeline.
 */
public void complete();
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,18 @@ public void run() {
while (!isStopped) {
try {
final List<Record<Event>> records = inMemorySourceAccessor.read(testingKey);
if (records.size() == 0) {
Thread.sleep(1000);
continue;
}
AcknowledgementSet ackSet =
acknowledgementSetManager.create((result) ->
{
inMemorySourceAccessor.setAckReceived(result);
},
Duration.ofSeconds(15));
records.stream().forEach((record) -> { ackSet.add(record.getData()); });
ackSet.complete();
writeToBuffer(records);
} catch (final Exception ex) {
LOG.error("Error during source loop.", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class DefaultAcknowledgementSet implements AcknowledgementSet {
private final Map<EventHandle, AtomicInteger> pendingAcknowledgments;
private Future<?> callbackFuture;
private final DefaultAcknowledgementSetMetrics metrics;
private boolean completed;

public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer<Boolean> callback, final Duration expiryTime, final DefaultAcknowledgementSetMetrics metrics) {
this.callback = callback;
Expand All @@ -42,6 +43,7 @@ public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer<
this.expiryTime = Instant.now().plusMillis(expiryTime.toMillis());
this.callbackFuture = null;
this.metrics = metrics;
this.completed = false;
pendingAcknowledgments = new HashMap<>();
lock = new ReentrantLock(true);
}
Expand Down Expand Up @@ -84,6 +86,8 @@ public boolean isDone() {
if (Instant.now().isAfter(expiryTime)) {
if (callbackFuture != null) {
callbackFuture.cancel(true);
callbackFuture = null;
LOG.warn("AcknowledgementSet expired");
}
metrics.increment(DefaultAcknowledgementSetMetrics.EXPIRED_METRIC_NAME);
return true;
Expand All @@ -98,6 +102,19 @@ public Instant getExpiryTime() {
return expiryTime;
}

@Override
public void complete() {
    // Marks the initial population of this acknowledgement set as finished.
    // Must hold the lock so the completed flag and the pending-acknowledgment
    // check are consistent with concurrent release() calls.
    lock.lock();
    try {
        completed = true;
        // If every event added so far has already been acknowledged, fire the
        // callback now; otherwise release() fires it once the set drains.
        if (pendingAcknowledgments.isEmpty()) {
            callbackFuture = executor.submit(() -> callback.accept(this.result));
        }
    } finally {
        lock.unlock();
    }
}

@Override
public boolean release(final EventHandle eventHandle, final boolean result) {
lock.lock();
Expand All @@ -114,9 +131,11 @@ public boolean release(final EventHandle eventHandle, final boolean result) {
}
if (pendingAcknowledgments.get(eventHandle).decrementAndGet() == 0) {
pendingAcknowledgments.remove(eventHandle);
if (pendingAcknowledgments.size() == 0) {
if (completed && pendingAcknowledgments.size() == 0) {
callbackFuture = executor.submit(() -> callback.accept(this.result));
return true;
} else if (pendingAcknowledgments.size() == 0) {
LOG.warn("Acknowledgement set is not completed. Delaying callback until it is completed");
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ void setup() {
AcknowledgementSet acknowledgementSet1 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT_MS);
acknowledgementSet1.add(event1);
acknowledgementSet1.add(event2);
acknowledgementSet1.complete();
}

DefaultAcknowledgementSetManager createObjectUnderTest() {
Expand Down Expand Up @@ -98,6 +99,7 @@ void testMultipleAcknowledgementSets() throws InterruptedException {

AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT_MS);
acknowledgementSet2.add(event3);
acknowledgementSet2.complete();

acknowledgementSetManager.releaseEventReference(eventHandle2, true);
acknowledgementSetManager.releaseEventReference(eventHandle3, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ void setupEvent() {
@Test
void testDefaultAcknowledgementSetBasic() throws Exception {
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.complete();
assertThat(handle, not(equalTo(null)));
assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet));
assertThat(defaultAcknowledgementSet.release(handle, true), equalTo(true));
Expand All @@ -97,6 +98,7 @@ void testDefaultAcknowledgementSetBasic() throws Exception {
@Test
void testDefaultAcknowledgementSetMultipleAcquireAndRelease() throws Exception {
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.complete();
assertThat(handle, not(equalTo(null)));
assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet));
defaultAcknowledgementSet.acquire(handle);
Expand All @@ -111,6 +113,7 @@ void testDefaultAcknowledgementSetMultipleAcquireAndRelease() throws Exception {
@Test
void testDefaultAcknowledgementInvalidAcquire() {
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.complete();
DefaultAcknowledgementSet secondAcknowledgementSet = createObjectUnderTest();
DefaultEventHandle handle2 = new DefaultEventHandle(secondAcknowledgementSet);
defaultAcknowledgementSet.acquire(handle2);
Expand All @@ -120,6 +123,7 @@ void testDefaultAcknowledgementInvalidAcquire() {
@Test
void testDefaultAcknowledgementInvalidRelease() {
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.complete();
DefaultAcknowledgementSet secondAcknowledgementSet = createObjectUnderTest();
DefaultEventHandle handle2 = new DefaultEventHandle(secondAcknowledgementSet);
assertThat(defaultAcknowledgementSet.release(handle2, true), equalTo(false));
Expand All @@ -129,6 +133,7 @@ void testDefaultAcknowledgementInvalidRelease() {
@Test
void testDefaultAcknowledgementDuplicateReleaseError() throws Exception {
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.complete();
assertThat(handle, not(equalTo(null)));
assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet));
assertThat(defaultAcknowledgementSet.release(handle, true), equalTo(true));
Expand All @@ -144,6 +149,7 @@ void testDefaultAcknowledgementSetWithCustomCallback() throws Exception {
}
);
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.complete();
assertThat(handle, not(equalTo(null)));
assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet));
assertThat(defaultAcknowledgementSet.release(handle, true), equalTo(true));
Expand All @@ -162,6 +168,7 @@ void testDefaultAcknowledgementSetNegativeAcknowledgements() throws Exception {
}
);
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.complete();
assertThat(handle, not(equalTo(null)));
assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet));
defaultAcknowledgementSet.acquire(handle);
Expand Down Expand Up @@ -190,6 +197,7 @@ void testDefaultAcknowledgementSetExpirations() throws Exception {
}
);
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.complete();
assertThat(handle, not(equalTo(null)));
assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet));
assertThat(defaultAcknowledgementSet.release(handle, true), equalTo(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ public <T> void consumeRecords() throws Exception {
if (!acknowledgementsEnabled) {
offsets.forEach((partition, offsetRange) ->
updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1)));
} else {
acknowledgementSet.complete();
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ Default is null.

- `proxy`(optional): A String of the address of a forward HTTP proxy. The format is like "<host-name-or-ip>:\<port\>". Examples: "example.com:8100", "http://example.com:8100", "112.112.112.112:8100". Note: port number cannot be omitted.

- `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. Default is true.

- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This `index_type` tells the sink plugin which type of data it is handling.

```
Expand Down Expand Up @@ -153,6 +155,14 @@ If not provided, the sink will try to push the data to OpenSearch server indefin
all the records received from the upstream prepper at a time will be sent as a single bulk request.
If a single record turns out to be larger than the set bulk size, it will be sent as a bulk request of a single document.

- `estimate_bulk_size_using_compression` (optional): A boolean dictating whether to compress the bulk requests when estimating
the size. This option is ignored if request compression is not enabled for the OpenSearch client. This is an experimental
feature and makes no guarantees about the accuracy of the estimation. Default is false.

- `max_local_compressions_for_estimation` (optional): An integer of the maximum number of times to compress a partially packed
bulk request when estimating its size. Bulk size accuracy increases with this value but performance degrades. This setting is experimental
and is ignored unless `estimate_bulk_size_using_compression` is enabled. Default is 2.

- `flush_timeout` (optional): A long of the millisecond duration to try packing a bulk request up to the bulk_size before flushing.
If this timeout expires before a bulk request has reached the bulk_size, the request will be flushed as-is. Set to -1 to disable
the flush timeout and instead flush whatever is present at the end of each batch. Default is 60,000, or one minute.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
import org.apache.commons.lang3.RandomStringUtils;

import static org.mockito.Mockito.when;

import javax.ws.rs.HttpMethod;
Expand Down Expand Up @@ -215,16 +216,19 @@ public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws I
RuntimeException.class, () -> sink.doInitialize());
}

@Test
public void testOutputRawSpanDefault() throws IOException, InterruptedException {
@ParameterizedTest
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
public void testOutputRawSpanDefault(final boolean estimateBulkSizeUsingCompression,
final boolean isRequestCompressionEnabled) throws IOException, InterruptedException {
final String testDoc1 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1);
final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_2);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData1 = mapper.readValue(testDoc1, Map.class);
@SuppressWarnings("unchecked") final Map<String, Object> expData2 = mapper.readValue(testDoc2, Map.class);

final List<Record<Event>> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2));
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null);
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null,
estimateBulkSizeUsingCompression, isRequestCompressionEnabled);
final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);

Expand Down Expand Up @@ -275,20 +279,24 @@ public void testOutputRawSpanDefault() throws IOException, InterruptedException
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(773.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(773.0, 0));
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 773.0 : 2058.0;
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
}

@Test
public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException {
@ParameterizedTest
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
public void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompression,
final boolean isRequestCompressionEnabled) throws IOException, InterruptedException {
// TODO: write test case
final String testDoc1 = readDocFromFile("raw-span-error.json");
final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData = mapper.readValue(testDoc2, Map.class);

final List<Record<Event>> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2));
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null);
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null,
estimateBulkSizeUsingCompression, isRequestCompressionEnabled);
// generate temporary directory for dlq file
final File tempDirectory = Files.createTempDirectory("").toFile();
// add dlq file path into setting
Expand Down Expand Up @@ -331,8 +339,9 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(1066.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(1066.0, 0));
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 1066.0 : 2072.0;
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));

}

Expand All @@ -355,14 +364,17 @@ public void testInstantiateSinkServiceMapDefault() throws IOException {
}
}

@Test
public void testOutputServiceMapDefault() throws IOException, InterruptedException {
@ParameterizedTest
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
public void testOutputServiceMapDefault(final boolean estimateBulkSizeUsingCompression,
final boolean isRequestCompressionEnabled) throws IOException, InterruptedException {
final String testDoc = readDocFromFile(DEFAULT_SERVICE_MAP_FILE);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData = mapper.readValue(testDoc, Map.class);

final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(testDoc));
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null);
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null,
estimateBulkSizeUsingCompression, isRequestCompressionEnabled);
OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);
final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP);
Expand All @@ -388,8 +400,9 @@ public void testOutputServiceMapDefault() throws IOException, InterruptedExcepti
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(366.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(366.0, 0));
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 366.0 : 265.0;
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));

// Check restart for index already exists
sink = createObjectUnderTest(pluginSetting, true);
Expand Down Expand Up @@ -884,6 +897,15 @@ private PluginSetting generatePluginSetting(final String indexType, final String
return generatePluginSettingByMetadata(metadata);
}

/**
 * Builds a {@link PluginSetting} for the given index configuration, additionally
 * wiring in the bulk-size-estimation and request-compression flags.
 */
private PluginSetting generatePluginSetting(final String indexType, final String indexAlias,
                                            final String templateFilePath, final boolean estimateBulkSizeUsingCompression,
                                            final boolean requestCompressionEnabled) {
    final Map<String, Object> configurationMetadata =
            initializeConfigurationMetadata(indexType, indexAlias, templateFilePath);
    configurationMetadata.put(ConnectionConfiguration.REQUEST_COMPRESSION_ENABLED, requestCompressionEnabled);
    configurationMetadata.put(IndexConfiguration.ESTIMATE_BULK_SIZE_USING_COMPRESSION, estimateBulkSizeUsingCompression);
    return generatePluginSettingByMetadata(configurationMetadata);
}

private PluginSetting generatePluginSetting(final String indexType, final String indexAlias,
final String templateType,
final String templateFilePath) {
Expand Down
Loading

0 comments on commit dce26a7

Please sign in to comment.