-
Notifications
You must be signed in to change notification settings - Fork 195
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'opensearch-project:main' into feature-httpsink-connection
- Loading branch information
Showing
62 changed files
with
2,858 additions
and
250 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
37 changes: 37 additions & 0 deletions
37
.../cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/Buffer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.sink.buffer; | ||
|
||
import java.util.ArrayList; | ||
|
||
/** | ||
* Buffer that handles the temporary storage of | ||
* events. It isolates the implementation of system storage. | ||
* 1. Reads in a String. | ||
* 2. Transforms to Byte type. | ||
* 3. Returns a Byte type. | ||
*/ | ||
public interface Buffer { | ||
/** | ||
* Size of buffer in events. | ||
* @return int | ||
*/ | ||
int getEventCount(); | ||
|
||
/** | ||
* Size of buffer in bytes. | ||
* @return int | ||
*/ | ||
int getBufferSize(); | ||
|
||
void writeEvent(byte[] event); | ||
|
||
byte[] popEvent(); | ||
|
||
ArrayList<byte[]> getBufferedData(); | ||
|
||
void clearBuffer(); | ||
} |
14 changes: 14 additions & 0 deletions
14
...atch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/BufferFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.sink.buffer; | ||
|
||
/** | ||
* BufferFactory will act as a means for decoupling the rest of | ||
* the code from the type of buffer being used. | ||
*/ | ||
public interface BufferFactory { | ||
Buffer getBuffer(); | ||
} |
50 changes: 50 additions & 0 deletions
50
...tch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBuffer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.sink.buffer; | ||
|
||
import java.util.ArrayList; | ||
|
||
public class InMemoryBuffer implements Buffer { | ||
private final ArrayList<byte[]> eventsBuffered; | ||
private int bufferSize = 0; | ||
|
||
InMemoryBuffer() { | ||
eventsBuffered = new ArrayList<>(); | ||
} | ||
|
||
@Override | ||
public int getEventCount() { | ||
return eventsBuffered.size(); | ||
} | ||
|
||
@Override | ||
public int getBufferSize() { | ||
return bufferSize; | ||
} | ||
|
||
@Override | ||
public void writeEvent(final byte[] event) { | ||
eventsBuffered.add(event); | ||
bufferSize += event.length; | ||
} | ||
|
||
@Override | ||
public byte[] popEvent() { | ||
bufferSize -= eventsBuffered.get(0).length; | ||
return eventsBuffered.remove(0); | ||
} | ||
|
||
@Override | ||
public ArrayList<byte[]> getBufferedData() { | ||
return eventsBuffered; | ||
} | ||
|
||
@Override | ||
public void clearBuffer() { | ||
bufferSize = 0; | ||
eventsBuffered.clear(); | ||
} | ||
} |
13 changes: 13 additions & 0 deletions
13
...s/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBufferFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.sink.buffer; | ||
|
||
public class InMemoryBufferFactory implements BufferFactory{ | ||
@Override | ||
public Buffer getBuffer() { | ||
return new InMemoryBuffer(); | ||
} | ||
} |
56 changes: 56 additions & 0 deletions
56
...main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsClientFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.sink.client; | ||
|
||
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; | ||
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; | ||
import org.opensearch.dataprepper.plugins.sink.config.AwsConfig; | ||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; | ||
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; | ||
import software.amazon.awssdk.core.retry.RetryPolicy; | ||
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; | ||
|
||
/** | ||
* CwlClientFactory is in charge of reading in | ||
* aws config parameters to return a working | ||
* client for interfacing with | ||
* CloudWatchLogs services. | ||
*/ | ||
public final class CloudWatchLogsClientFactory { | ||
|
||
/** | ||
* Generates a CloudWatchLogs Client based on STS role ARN system credentials. | ||
* @param awsConfig - AwsConfig specifying region, roles, and header overrides. | ||
* @param awsCredentialsSupplier - AwsCredentialsSupplier Interface for which to create CredentialsProvider for Client config. | ||
* @return CloudWatchLogsClient - used to interact with CloudWatch Logs services. | ||
*/ | ||
public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, final AwsCredentialsSupplier awsCredentialsSupplier) { | ||
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialOptions(awsConfig); | ||
final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions); | ||
|
||
return CloudWatchLogsClient.builder() | ||
.region(awsConfig.getAwsRegion()) | ||
.credentialsProvider(awsCredentialsProvider) | ||
.overrideConfiguration(createOverrideConfiguration()).build(); | ||
} | ||
|
||
private static ClientOverrideConfiguration createOverrideConfiguration() { | ||
final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS).build(); | ||
|
||
return ClientOverrideConfiguration.builder() | ||
.retryPolicy(retryPolicy) | ||
.build(); | ||
} | ||
|
||
private static AwsCredentialsOptions convertToCredentialOptions(final AwsConfig awsConfig) { | ||
return AwsCredentialsOptions.builder() | ||
.withRegion(awsConfig.getAwsRegion()) | ||
.withStsRoleArn(awsConfig.getAwsStsRoleArn()) | ||
.withStsExternalId(awsConfig.getAwsStsExternalId()) | ||
.withStsHeaderOverrides(awsConfig.getAwsStsHeaderOverrides()) | ||
.build(); | ||
} | ||
} |
5 changes: 5 additions & 0 deletions
5
...oudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/AwsConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
97 changes: 97 additions & 0 deletions
97
...-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/threshold/ThresholdCheck.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.sink.threshold; | ||
/** | ||
* ThresholdCheck receives parameters for which to reference the | ||
* limits of a buffer and CloudWatchLogsClient before making a | ||
* PutLogEvent request to AWS. | ||
*/ | ||
public class ThresholdCheck { | ||
private final int batchSize; | ||
private final int maxEventSizeBytes; | ||
private final int maxRequestSizeBytes; | ||
private final long logSendInterval; | ||
|
||
public ThresholdCheck(final int batchSize, final int maxEventSizeBytes, final int maxRequestSizeBytes, final int logSendInterval) { | ||
this.batchSize = batchSize; | ||
this.maxEventSizeBytes = maxEventSizeBytes; | ||
this.maxRequestSizeBytes = maxRequestSizeBytes; | ||
this.logSendInterval = logSendInterval; | ||
} | ||
|
||
/** | ||
* Checks to see if we exceed any of the threshold conditions. | ||
* @param currentTime - (long) denoting the time in seconds. | ||
* @param currentRequestSize - size of request in bytes. | ||
* @param batchSize - size of batch in events. | ||
* @return boolean - true if we exceed the threshold events or false otherwise. | ||
*/ | ||
public boolean isGreaterThanThresholdReached(final long currentTime, final int currentRequestSize, final int batchSize) { | ||
return ((isGreaterThanBatchSize(batchSize) || isGreaterEqualToLogSendInterval(currentTime) | ||
|| isGreaterThanMaxRequestSize(currentRequestSize)) && (batchSize > 0)); | ||
} | ||
|
||
/** | ||
* Checks to see if we equal any of the threshold conditions. | ||
* @param currentRequestSize - size of request in bytes. | ||
* @param batchSize - size of batch in events. | ||
* @return boolean - true if we equal the threshold events or false otherwise. | ||
*/ | ||
public boolean isEqualToThresholdReached(final int currentRequestSize, final int batchSize) { | ||
return ((isEqualBatchSize(batchSize) || isEqualMaxRequestSize(currentRequestSize)) && (batchSize > 0)); | ||
} | ||
|
||
/** | ||
* Checks if the interval passed in is equal to or greater | ||
* than the threshold interval for sending PutLogEvents. | ||
* @param currentTimeSeconds int denoting seconds. | ||
* @return boolean - true if greater than or equal to logInterval, false otherwise. | ||
*/ | ||
private boolean isGreaterEqualToLogSendInterval(final long currentTimeSeconds) { | ||
return currentTimeSeconds >= logSendInterval; | ||
} | ||
|
||
/** | ||
* Determines if the event size is greater than the max event size. | ||
* @param eventSize int denoting size of event. | ||
* @return boolean - true if greater than MaxEventSize, false otherwise. | ||
*/ | ||
public boolean isGreaterThanMaxEventSize(final int eventSize) { | ||
return eventSize > maxEventSizeBytes; | ||
} | ||
|
||
/** | ||
* Checks if the request size is greater than or equal to the current size passed in. | ||
* @param currentRequestSize int denoting size of request(Sum of PutLogEvent messages). | ||
* @return boolean - true if greater than Max request size, smaller otherwise. | ||
*/ | ||
private boolean isGreaterThanMaxRequestSize(final int currentRequestSize) { | ||
return currentRequestSize > maxRequestSizeBytes; | ||
} | ||
|
||
/** | ||
* Checks if the current batch size is greater to the threshold | ||
* batch size. | ||
* @param batchSize int denoting the size of the batch of PutLogEvents. | ||
* @return boolean - true if greater, false otherwise. | ||
*/ | ||
private boolean isGreaterThanBatchSize(final int batchSize) { | ||
return batchSize > this.batchSize; | ||
} | ||
|
||
/** | ||
* Checks if the request size is greater than or equal to the current size passed in. | ||
* @param currentRequestSize int denoting size of request(Sum of PutLogEvent messages). | ||
* @return boolean - true if equal Max request size, smaller otherwise. | ||
*/ | ||
private boolean isEqualMaxRequestSize(final int currentRequestSize) { | ||
return currentRequestSize == maxRequestSizeBytes; | ||
} | ||
|
||
private boolean isEqualBatchSize(final int batchSize) { | ||
return batchSize == this.batchSize; | ||
} | ||
} |
21 changes: 21 additions & 0 deletions
21
...c/test/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBufferFactoryTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.sink.buffer; | ||
|
||
import org.junit.jupiter.api.Test; | ||
|
||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.notNullValue; | ||
import static org.hamcrest.Matchers.typeCompatibleWith; | ||
|
||
public class InMemoryBufferFactoryTest { | ||
@Test | ||
void check_buffer_not_null() { | ||
Buffer buffer = new InMemoryBufferFactory().getBuffer(); | ||
assertThat(buffer, notNullValue()); | ||
assertThat(buffer.getClass(), typeCompatibleWith(Buffer.class)); | ||
} | ||
} |
Oops, something went wrong.