-
Notifications
You must be signed in to change notification settings - Fork 195
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
GitHub-Issue#2778: Added CloudWatchLogsSink (#3084)
GitHub-Issue#2778: Refactoring config files for CloudWatchLogs Sink (#4) --------- Signed-off-by: Taylor Gray <[email protected]> Signed-off-by: Marcos Gonzalez Mayedo <[email protected]> Signed-off-by: Marcos Gonzalez Mayedo <[email protected]> Co-authored-by: Taylor Gray <[email protected]> Co-authored-by: Marcos <[email protected]>
- Loading branch information
1 parent
3ab7831
commit 9cef0d5
Showing
27 changed files
with
284 additions
and
40 deletions.
There are no files selected for viewing
101 changes: 101 additions & 0 deletions
101
...main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs; | ||
|
||
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; | ||
import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; | ||
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; | ||
import org.opensearch.dataprepper.model.configuration.PluginSetting; | ||
import org.opensearch.dataprepper.model.event.Event; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import org.opensearch.dataprepper.model.sink.AbstractSink; | ||
import org.opensearch.dataprepper.model.sink.Sink; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.Buffer; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.BufferFactory; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.InMemoryBufferFactory; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsDispatcher; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsMetrics; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsService; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.exception.InvalidBufferTypeException; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits; | ||
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; | ||
|
||
import java.util.Collection; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.Executors; | ||
|
||
@DataPrepperPlugin(name = "cloudwatch_logs", pluginType = Sink.class, pluginConfigurationType = CloudWatchLogsSinkConfig.class) | ||
public class CloudWatchLogsSink extends AbstractSink<Record<Event>> { | ||
private final CloudWatchLogsService cloudWatchLogsService; | ||
private volatile boolean isInitialized; | ||
@DataPrepperPluginConstructor | ||
public CloudWatchLogsSink(final PluginSetting pluginSetting, | ||
final PluginMetrics pluginMetrics, | ||
final CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig, | ||
final AwsCredentialsSupplier awsCredentialsSupplier) { | ||
super(pluginSetting); | ||
|
||
AwsConfig awsConfig = cloudWatchLogsSinkConfig.getAwsConfig(); | ||
ThresholdConfig thresholdConfig = cloudWatchLogsSinkConfig.getThresholdConfig(); | ||
|
||
CloudWatchLogsMetrics cloudWatchLogsMetrics = new CloudWatchLogsMetrics(pluginMetrics); | ||
CloudWatchLogsLimits cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(), | ||
thresholdConfig.getMaxEventSizeBytes(), | ||
thresholdConfig.getMaxRequestSize(),thresholdConfig.getLogSendInterval()); | ||
|
||
CloudWatchLogsClient cloudWatchLogsClient = CloudWatchLogsClientFactory.createCwlClient(awsConfig, awsCredentialsSupplier); | ||
|
||
BufferFactory bufferFactory = null; | ||
if (cloudWatchLogsSinkConfig.getBufferType().equals("in_memory")) { | ||
bufferFactory = new InMemoryBufferFactory(); | ||
} | ||
|
||
Executor executor = Executors.newCachedThreadPool(); | ||
|
||
CloudWatchLogsDispatcher cloudWatchLogsDispatcher = CloudWatchLogsDispatcher.builder() | ||
.cloudWatchLogsClient(cloudWatchLogsClient) | ||
.cloudWatchLogsMetrics(cloudWatchLogsMetrics) | ||
.logGroup(cloudWatchLogsSinkConfig.getLogGroup()) | ||
.logStream(cloudWatchLogsSinkConfig.getLogStream()) | ||
.backOffTimeBase(thresholdConfig.getBackOffTime()) | ||
.retryCount(thresholdConfig.getRetryCount()) | ||
.executor(executor) | ||
.build(); | ||
|
||
Buffer buffer; | ||
try { | ||
buffer = bufferFactory.getBuffer(); | ||
} catch (NullPointerException e) { | ||
throw new InvalidBufferTypeException("Error loading buffer!"); | ||
} | ||
|
||
cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsLimits, cloudWatchLogsDispatcher); | ||
} | ||
|
||
@Override | ||
public void doInitialize() { | ||
isInitialized = Boolean.TRUE; | ||
} | ||
|
||
@Override | ||
public void doOutput(Collection<Record<Event>> records) { | ||
if (records.isEmpty()) { | ||
return; | ||
} | ||
|
||
cloudWatchLogsService.processLogEvents(records); | ||
} | ||
|
||
@Override | ||
public boolean isReady() { | ||
return isInitialized; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
7 changes: 7 additions & 0 deletions
7
...search/dataprepper/plugins/sink/cloudwatch_logs/exception/InvalidBufferTypeException.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.exception; | ||
|
||
public class InvalidBufferTypeException extends RuntimeException { | ||
public InvalidBufferTypeException(String message) { | ||
super(message); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
136 changes: 136 additions & 0 deletions
136
.../java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs; | ||
|
||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.mockito.MockedStatic; | ||
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; | ||
import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
import org.opensearch.dataprepper.model.configuration.PluginSetting; | ||
import org.opensearch.dataprepper.model.event.Event; | ||
import org.opensearch.dataprepper.model.event.JacksonEvent; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsMetrics; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig; | ||
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; | ||
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertTrue; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.Mockito.atLeast; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.mockStatic; | ||
import static org.mockito.Mockito.spy; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.when; | ||
|
||
class CloudWatchLogsSinkTest { | ||
private PluginSetting mockPluginSetting; | ||
private PluginMetrics mockPluginMetrics; | ||
private CloudWatchLogsSinkConfig mockCloudWatchLogsSinkConfig; | ||
private AwsCredentialsSupplier mockCredentialSupplier; | ||
private AwsConfig mockAwsConfig; | ||
private ThresholdConfig thresholdConfig; | ||
private CloudWatchLogsMetrics mockCloudWatchLogsMetrics; | ||
private CloudWatchLogsClient mockClient; | ||
private static final String TEST_LOG_GROUP = "testLogGroup"; | ||
private static final String TEST_LOG_STREAM= "testLogStream"; | ||
private static final String TEST_PLUGIN_NAME = "testPluginName"; | ||
private static final String TEST_PIPELINE_NAME = "testPipelineName"; | ||
private static final String TEST_BUFFER_TYPE = "in_memory"; | ||
@BeforeEach | ||
void setUp() { | ||
mockPluginSetting = mock(PluginSetting.class); | ||
mockPluginMetrics = mock(PluginMetrics.class); | ||
mockCloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class); | ||
mockCredentialSupplier = mock(AwsCredentialsSupplier.class); | ||
mockAwsConfig = mock(AwsConfig.class); | ||
thresholdConfig = new ThresholdConfig(); | ||
mockCloudWatchLogsMetrics = mock(CloudWatchLogsMetrics.class); | ||
mockClient = mock(CloudWatchLogsClient.class); | ||
|
||
when(mockCloudWatchLogsSinkConfig.getAwsConfig()).thenReturn(mockAwsConfig); | ||
when(mockCloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig); | ||
when(mockCloudWatchLogsSinkConfig.getLogGroup()).thenReturn(TEST_LOG_GROUP); | ||
when(mockCloudWatchLogsSinkConfig.getLogStream()).thenReturn(TEST_LOG_STREAM); | ||
when(mockCloudWatchLogsSinkConfig.getBufferType()).thenReturn(TEST_BUFFER_TYPE); | ||
|
||
when(mockPluginSetting.getName()).thenReturn(TEST_PLUGIN_NAME); | ||
when(mockPluginSetting.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); | ||
} | ||
|
||
CloudWatchLogsSink getTestCloudWatchSink() { | ||
return new CloudWatchLogsSink(mockPluginSetting, mockPluginMetrics, mockCloudWatchLogsSinkConfig, | ||
mockCredentialSupplier); | ||
} | ||
|
||
Collection<Record<Event>> getMockedRecords() { | ||
Collection<Record<Event>> testCollection = new ArrayList<>(); | ||
Record<Event> mockedEvent = new Record<>(JacksonEvent.fromMessage("")); | ||
Record<Event> spyEvent = spy(mockedEvent); | ||
|
||
testCollection.add(spyEvent); | ||
|
||
return testCollection; | ||
} | ||
|
||
@Test | ||
void WHEN_sink_is_initialized_THEN_sink_is_ready_returns_true() { | ||
try(MockedStatic<CloudWatchLogsClientFactory> mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { | ||
mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), | ||
any(AwsCredentialsSupplier.class))) | ||
.thenReturn(mockClient); | ||
|
||
CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink(); | ||
testCloudWatchSink.doInitialize(); | ||
assertTrue(testCloudWatchSink.isReady()); | ||
} | ||
} | ||
|
||
@Test | ||
void WHEN_given_sample_empty_records_THEN_records_are_processed() { | ||
try(MockedStatic<CloudWatchLogsClientFactory> mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { | ||
mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), | ||
any(AwsCredentialsSupplier.class))) | ||
.thenReturn(mockClient); | ||
|
||
CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink(); | ||
testCloudWatchSink.doInitialize(); | ||
Collection<Record<Event>> spyEvents = getMockedRecords(); | ||
|
||
testCloudWatchSink.doOutput(spyEvents); | ||
|
||
for (Record<Event> spyEvent : spyEvents) { | ||
verify(spyEvent, atLeast(1)).getData(); | ||
} | ||
} | ||
} | ||
|
||
@Test | ||
void WHEN_given_sample_empty_records_THEN_records_are_not_processed() { | ||
try(MockedStatic<CloudWatchLogsClientFactory> mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { | ||
mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), | ||
any(AwsCredentialsSupplier.class))) | ||
.thenReturn(mockClient); | ||
|
||
CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink(); | ||
testCloudWatchSink.doInitialize(); | ||
Collection<Record<Event>> spyEvents = spy(ArrayList.class); | ||
|
||
assertTrue(spyEvents.isEmpty()); | ||
|
||
testCloudWatchSink.doOutput(spyEvents); | ||
verify(spyEvents, times(2)).isEmpty(); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.