From 4ec407f703b707880ac5ba8f094ee30f12d7769f Mon Sep 17 00:00:00 2001 From: venkataraopasyavula Date: Fri, 18 Aug 2023 20:25:04 +0530 Subject: [PATCH] GitHub-issue#1994 : Implementation Of Cloudwatch metrics source plugin configuration Junit test cases and source coordinator. Signed-off-by: venkataraopasyavula --- .../cloudwatch-metrics-source/build.gradle | 3 +- .../source/CloudwatchMetricsSource.java | 117 +++++-------- .../source/CloudwatchMetricsWorker.java | 146 +++++++++++++++- .../CloudwatchPartitionCreationSupplier.java | 31 ++++ .../source/CloudwatchSourceProgressState.java | 16 ++ .../source/AwsAuthenticationAdapterTest.java | 86 ++++++++++ .../CloudwatchMetricsSourceConfigTest.java | 74 +++++++++ .../source/CloudwatchMetricsSourceTest.java | 86 ++++++++++ .../source/CloudwatchMetricsWorkerTest.java | 157 ++++++++++++++++++ .../AwsAuthenticationOptionsTest.java | 53 ++++++ .../org.mockito.plugins.MockMaker | 3 + 11 files changed, 698 insertions(+), 74 deletions(-) create mode 100644 data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchPartitionCreationSupplier.java create mode 100644 data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchSourceProgressState.java create mode 100644 data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/AwsAuthenticationAdapterTest.java create mode 100644 data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsSourceConfigTest.java create mode 100644 data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsSourceTest.java create mode 100644 data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsWorkerTest.java create mode 100644 data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/AwsAuthenticationOptionsTest.java create mode 100644 data-prepper-plugins/cloudwatch-metrics-source/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/data-prepper-plugins/cloudwatch-metrics-source/build.gradle b/data-prepper-plugins/cloudwatch-metrics-source/build.gradle index 3e6f8876a0..41f542f2a6 100644 --- a/data-prepper-plugins/cloudwatch-metrics-source/build.gradle +++ b/data-prepper-plugins/cloudwatch-metrics-source/build.gradle @@ -13,7 +13,8 @@ dependencies { implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2' - + testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' + testImplementation project(':data-prepper-plugins:blocking-buffer') } test { diff --git a/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsSource.java b/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsSource.java index 69a472cb69..9e108bc499 100644 --- a/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsSource.java +++ b/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsSource.java @@ -2,12 +2,8 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ - package org.opensearch.dataprepper.plugins.source; -import org.opensearch.dataprepper.plugins.source.configuration.NamespacesListConfig; -import org.opensearch.dataprepper.plugins.source.configuration.MetricDataQueriesConfig; -import org.opensearch.dataprepper.plugins.source.configuration.DimensionsListConfig; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -21,37 +17,43 @@ import java.time.Duration; import java.util.Collection; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.UsesSourceCoordination; import software.amazon.awssdk.services.cloudwatch.model.Dimension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; -import software.amazon.awssdk.services.cloudwatch.model.Metric; -import software.amazon.awssdk.services.cloudwatch.model.MetricStat; -import software.amazon.awssdk.services.cloudwatch.model.MetricDataQuery; -import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataRequest; -import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataResponse; -import software.amazon.awssdk.services.cloudwatch.model.MetricDataResult; -import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException; -import software.amazon.awssdk.services.cloudwatch.model.ScanBy; -import java.time.Instant; -import java.time.format.DateTimeParseException; import java.util.ArrayList; -import java.util.List; +import java.util.Objects; /** * An implementation of cloudwatch metrics source class to Scrape the metrics using GetMetricData API */ @DataPrepperPlugin(name = "cloudwatch", pluginType = Source.class, pluginConfigurationType = CloudwatchMetricsSourceConfig.class) -public class CloudwatchMetricsSource implements Source> { +public class CloudwatchMetricsSource implements Source>, UsesSourceCoordination { private static final Logger LOG = LoggerFactory.getLogger(CloudwatchMetricsSource.class); + private final Collection dimensionCollection; - private final CloudwatchMetricsWorker cloudwatchMetricsWorker; + + private final PluginMetrics pluginMetrics; + + private CloudwatchMetricsWorker cloudwatchMetricsWorker; + private final CloudwatchMetricsSourceConfig cloudwatchMetricsSourceConfig; + private final AwsCredentialsSupplier awsCredentialsSupplier; + private static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(30); + + private Thread cloudwatchMetricsWorkerThread; + + private SourceCoordinator sourceCoordinator; + + + @DataPrepperPluginConstructor public CloudwatchMetricsSource( final PluginMetrics pluginMetrics, @@ -59,8 +61,9 @@ public CloudwatchMetricsSource( final AwsCredentialsSupplier awsCredentialsSupplier) { this.cloudwatchMetricsSourceConfig = cloudwatchMetricsSourceConfig; this.awsCredentialsSupplier = awsCredentialsSupplier; - cloudwatchMetricsWorker = new CloudwatchMetricsWorker(); - dimensionCollection = new ArrayList<>(); + this.dimensionCollection = new ArrayList<>(); + this.pluginMetrics = pluginMetrics; + } @Override @@ -84,60 +87,32 @@ public void start(Buffer> buffer) { .credentialsProvider(credentialsProvider) .build(); - for (NamespacesListConfig namespacesListConfig : cloudwatchMetricsSourceConfig.getNamespacesListConfig()) { - dimensionCollection.clear(); - for (MetricDataQueriesConfig metricDataQueriesConfig : namespacesListConfig.getNamespaceConfig().getMetricDataQueriesConfig()) { - - for (DimensionsListConfig dimensionsListConfig : metricDataQueriesConfig.getMetricsConfig().getDimensionsListConfigs()) { - - dimensionCollection.add(Dimension.builder() - .name(dimensionsListConfig.getDimensionConfig().getName()) - .value(dimensionsListConfig.getDimensionConfig().getValue()).build()); - } - try { - - Metric met = Metric.builder() - .metricName(metricDataQueriesConfig.getMetricsConfig().getName()) - .namespace(namespacesListConfig.getNamespaceConfig().getName()) - .dimensions(dimensionCollection) - .build(); - - MetricStat metStat = MetricStat.builder() - .stat(metricDataQueriesConfig.getMetricsConfig().getStat()) - .unit(metricDataQueriesConfig.getMetricsConfig().getUnit()) - .period(metricDataQueriesConfig.getMetricsConfig().getPeriod()) - .metric(met) - .build(); - - MetricDataQuery dataQUery = MetricDataQuery.builder() - .metricStat(metStat) - .id(metricDataQueriesConfig.getMetricsConfig().getId()) - .returnData(true) - .build(); - - List dataQueries = new ArrayList<>(); - dataQueries.add(dataQUery); - - GetMetricDataRequest getMetReq = GetMetricDataRequest.builder() - .maxDatapoints(10) - .scanBy(ScanBy.TIMESTAMP_DESCENDING) - .startTime(Instant.parse(namespacesListConfig.getNamespaceConfig().getStartTime())) - .endTime(Instant.parse(namespacesListConfig.getNamespaceConfig().getEndTime())) - .metricDataQueries(dataQueries) - .build(); - - GetMetricDataResponse response = cloudWatchClient.getMetricData(getMetReq); - List data = response.metricDataResults(); - cloudwatchMetricsWorker.writeToBuffer(data, bufferAccumulator, null); - - } catch (CloudWatchException | DateTimeParseException ex) { - LOG.error("Exception Occurred while scraping the metrics {0}", ex); - } - } - } + cloudwatchMetricsWorkerThread = new Thread(new CloudwatchMetricsWorker(cloudWatchClient, + bufferAccumulator, + cloudwatchMetricsSourceConfig, + dimensionCollection, + pluginMetrics, + sourceCoordinator)); + + cloudwatchMetricsWorkerThread.start(); } + @Override public void stop() { - LOG.info("Stopped Cloudwatch source."); + cloudwatchMetricsWorkerThread.interrupt(); + if (Objects.nonNull(sourceCoordinator)) { + sourceCoordinator.giveUpPartitions(); + } + } + + @Override + public void setSourceCoordinator(SourceCoordinator sourceCoordinator) { + this.sourceCoordinator = (SourceCoordinator) sourceCoordinator; + sourceCoordinator.initialize(); + } + + @Override + public Class getPartitionProgressStateClass() { + return CloudwatchSourceProgressState.class; } } diff --git a/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsWorker.java b/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsWorker.java index 35b235a82d..6341268b95 100644 --- a/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsWorker.java +++ b/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsWorker.java @@ -2,20 +2,46 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ - package org.opensearch.dataprepper.plugins.source; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; +import org.opensearch.dataprepper.plugins.source.configuration.DimensionsListConfig; +import org.opensearch.dataprepper.plugins.source.configuration.MetricDataQueriesConfig; +import org.opensearch.dataprepper.plugins.source.configuration.NamespacesListConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; +import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException; +import software.amazon.awssdk.services.cloudwatch.model.Dimension; +import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataRequest; +import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataResponse; +import software.amazon.awssdk.services.cloudwatch.model.Metric; +import software.amazon.awssdk.services.cloudwatch.model.MetricDataQuery; import software.amazon.awssdk.services.cloudwatch.model.MetricDataResult; +import software.amazon.awssdk.services.cloudwatch.model.MetricStat; +import software.amazon.awssdk.services.cloudwatch.model.ScanBy; +import java.time.Instant; +import java.time.format.DateTimeParseException; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; /** * implements the CloudwatchMetricsWorker to read and metrics data message and push to buffer. @@ -23,10 +49,121 @@ /** * An implementation of cloudwatch metrics source worker class to write the metric to Buffer */ -public class CloudwatchMetricsWorker { +public class CloudwatchMetricsWorker implements Runnable{ private static final Logger LOG = LoggerFactory.getLogger(CloudwatchMetricsWorker.class); + private static final int STANDARD_BACKOFF_MILLIS = 30_000; + + private final BufferAccumulator> bufferAccumulator; + + private final CloudWatchClient cloudWatchClient; + + private final CloudwatchMetricsSourceConfig cloudwatchMetricsSourceConfig; + + private final Collection dimensionCollection; + + private final PluginMetrics pluginMetrics; + + private final Function, List> partitionCreationSupplier; + + private final SourceCoordinator sourceCoordinator; + + public CloudwatchMetricsWorker(final CloudWatchClient cloudWatchClient, + final BufferAccumulator> bufferAccumulator, + final CloudwatchMetricsSourceConfig cloudwatchMetricsSourceConfig, + final Collection dimensionCollection, + final PluginMetrics pluginMetrics, + final SourceCoordinator sourceCoordinator) { + this.bufferAccumulator = bufferAccumulator; + this.cloudWatchClient = cloudWatchClient; + this.cloudwatchMetricsSourceConfig = cloudwatchMetricsSourceConfig; + this.dimensionCollection = dimensionCollection; + this.pluginMetrics = pluginMetrics; + this.sourceCoordinator = sourceCoordinator; + final List namespaceMetricNames = cloudwatchMetricsSourceConfig.getNamespacesListConfig().stream() + .map(NamespacesListConfig::getNamespaceConfig).flatMap(namespaceConfig -> namespaceConfig.getMetricDataQueriesConfig() + .stream().map(metricName -> namespaceConfig.getName() + "@" + metricName.getMetricsConfig().getName())).collect(Collectors.toList()); + this.partitionCreationSupplier = new CloudwatchPartitionCreationSupplier(namespaceMetricNames); + } + + void startProcessingCloudwatchMetrics(final int waitTimeMillis){ + final Optional> objectToProcess = sourceCoordinator.getNextPartition(partitionCreationSupplier); + + if (objectToProcess.isEmpty()) { + try { + Thread.sleep(waitTimeMillis); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return; + } + final String nameSpaceName = objectToProcess.get().getPartitionKey().split("@")[0]; + final String metricName = objectToProcess.get().getPartitionKey().split("@")[1]; + try { + processCloudwatchMetrics(nameSpaceName,metricName); + sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey()); + } catch (final PartitionNotOwnedException | PartitionNotFoundException | PartitionUpdateException e) { + LOG.warn("Cloud watch source received an exception from the source coordinator. There is a potential for duplicate data from {}, giving up partition and getting next partition: {}",objectToProcess.get().getPartitionKey() , e.getMessage()); + sourceCoordinator.giveUpPartitions(); + } + } + + protected void processCloudwatchMetrics(final String nameSpaceName, final String metricName) { + for(NamespacesListConfig namespacesListConfig : cloudwatchMetricsSourceConfig.getNamespacesListConfig().stream() + .filter(obj -> obj.getNamespaceConfig().getName().equals(nameSpaceName)).collect(Collectors.toList())) { + dimensionCollection.clear(); + for (MetricDataQueriesConfig metricDataQueriesConfig : namespacesListConfig.getNamespaceConfig().getMetricDataQueriesConfig().stream(). + filter(obj -> obj.getMetricsConfig().getName().equals(metricName)).collect(Collectors.toList())) { + + for (DimensionsListConfig dimensionsListConfig : metricDataQueriesConfig.getMetricsConfig().getDimensionsListConfigs()) { + + dimensionCollection.add(Dimension.builder() + .name(dimensionsListConfig.getDimensionConfig().getName()) + .value(dimensionsListConfig.getDimensionConfig().getValue()).build()); + } + try { + Metric met = Metric.builder() + .metricName(metricDataQueriesConfig.getMetricsConfig().getName()) + .namespace(namespacesListConfig.getNamespaceConfig().getName()) + .dimensions(dimensionCollection) + .build(); + + MetricStat metStat = MetricStat.builder() + .stat(metricDataQueriesConfig.getMetricsConfig().getStat()) + .unit(metricDataQueriesConfig.getMetricsConfig().getUnit()) + .period(metricDataQueriesConfig.getMetricsConfig().getPeriod()) + .metric(met) + .build(); + + MetricDataQuery dataQuery = MetricDataQuery.builder() + .metricStat(metStat) + .id(metricDataQueriesConfig.getMetricsConfig().getId()) + .returnData(true) + .build(); + + List dataQueries = new ArrayList<>(); + dataQueries.add(dataQuery); + + GetMetricDataRequest getMetReq = GetMetricDataRequest.builder() + .maxDatapoints(10) + .scanBy(ScanBy.TIMESTAMP_DESCENDING) + .startTime(Instant.parse(namespacesListConfig.getNamespaceConfig().getStartTime())) + .endTime(Instant.parse(namespacesListConfig.getNamespaceConfig().getEndTime())) + .metricDataQueries(dataQueries) + .build(); + + GetMetricDataResponse response = cloudWatchClient.getMetricData(getMetReq); + List data = response.metricDataResults(); + writeToBuffer(data, bufferAccumulator, null); + + } catch (CloudWatchException | DateTimeParseException ex) { + LOG.error("Exception Occurred while scraping the metrics {0}", ex); + } + } + } + } + /** * Helps to write metrics data to buffer and to send end to end acknowledgements after successful processing * @param metricsData metricsData @@ -53,4 +190,9 @@ public void writeToBuffer(final List metricsData, LOG.error("Exception while flushing record events to buffer {0}", ex); } } + + @Override + public void run() { + startProcessingCloudwatchMetrics(STANDARD_BACKOFF_MILLIS); + } } diff --git a/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchPartitionCreationSupplier.java b/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchPartitionCreationSupplier.java new file mode 100644 index 0000000000..9a8241b46d --- /dev/null +++ b/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchPartitionCreationSupplier.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source; + +import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +public class CloudwatchPartitionCreationSupplier implements Function, List> { + + private List metrics; + + public CloudwatchPartitionCreationSupplier(final List metrics) { + this.metrics = metrics; + } + + @Override + public List apply(final Map globalStateMap) { + final List objectsToProcess = new ArrayList<>(); + for ( String metric : metrics) { + objectsToProcess.add(PartitionIdentifier.builder().withPartitionKey(metric).build()); + } + return objectsToProcess; + } +} diff --git a/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchSourceProgressState.java b/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchSourceProgressState.java new file mode 100644 index 0000000000..e44dd7677d --- /dev/null +++ b/data-prepper-plugins/cloudwatch-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/CloudwatchSourceProgressState.java @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source; + +import com.fasterxml.jackson.annotation.JsonCreator; + +public class CloudwatchSourceProgressState { + + + @JsonCreator + CloudwatchSourceProgressState() { + } +} diff --git a/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/AwsAuthenticationAdapterTest.java b/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/AwsAuthenticationAdapterTest.java new file mode 100644 index 0000000000..7b76c9af1a --- /dev/null +++ b/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/AwsAuthenticationAdapterTest.java @@ -0,0 +1,86 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.source.configuration.AwsAuthenticationOptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.Region; + +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class AwsAuthenticationAdapterTest { + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + @Mock + private CloudwatchMetricsSourceConfig cloudwatchMetricsSourceConfig; + + @Mock + private AwsAuthenticationOptions awsAuthenticationOptions; + private String stsRoleArn; + + @BeforeEach + void setUp() { + when(cloudwatchMetricsSourceConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + + stsRoleArn = UUID.randomUUID().toString(); + when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn); + } + + private AwsAuthenticationAdapter createObjectUnderTest() { + return new AwsAuthenticationAdapter(awsCredentialsSupplier, cloudwatchMetricsSourceConfig); + } + + @Test + void getCredentialsProvider_returns_AwsCredentialsProvider_from_AwsCredentialsSupplier() { + final AwsCredentialsProvider expectedProvider = mock(AwsCredentialsProvider.class); + when(awsCredentialsSupplier.getProvider(any(AwsCredentialsOptions.class))) + .thenReturn(expectedProvider); + + assertThat(createObjectUnderTest().getCredentialsProvider(), equalTo(expectedProvider)); + } + + @ParameterizedTest + @ValueSource(strings = {"us-east-1", "eu-west-1"}) + void getCredentialsProvider_creates_expected_AwsCredentialsOptions(final String regionString) { + final String externalId = UUID.randomUUID().toString(); + final Region region = Region.of(regionString); + + final Map headerOverrides = Collections.singletonMap(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(region); + + createObjectUnderTest().getCredentialsProvider(); + + final ArgumentCaptor credentialsOptionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class); + verify(awsCredentialsSupplier).getProvider(credentialsOptionsArgumentCaptor.capture()); + + final AwsCredentialsOptions actualOptions = credentialsOptionsArgumentCaptor.getValue(); + + assertThat(actualOptions, notNullValue()); + assertThat(actualOptions.getStsRoleArn(), equalTo(stsRoleArn)); + assertThat(actualOptions.getRegion(), equalTo(region)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsSourceConfigTest.java b/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsSourceConfigTest.java new file mode 100644 index 0000000000..78bc043c9d --- /dev/null +++ b/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsSourceConfigTest.java @@ -0,0 +1,74 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.configuration.DimensionConfig; +import org.opensearch.dataprepper.plugins.source.configuration.MetricsConfig; +import org.opensearch.dataprepper.plugins.source.configuration.NamespaceConfig; +import software.amazon.awssdk.regions.Region; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class CloudwatchMetricsSourceConfigTest { + + private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + @Test + public void cloud_watch_metrics_configuration_test() throws JsonProcessingException { + + String cloudMetricsYaml = " aws:\n" + + " region: ap-south-1\n" + + " sts_role_arn: \"arn:aws:iam::524239988944:role/app-test\"\n" + + " batch_size: 2000\n" + + " namespaces:\n" + + " - namespace:\n" + + " name: \"AWS/S3\"\n" + + " start_time: \"2023-05-19T18:35:24z\"\n" + + " end_time: \"2023-08-07T18:35:24z\"\n" + + " metricDataQueries:\n" + + " - metric:\n" + + " name: BucketSizeBytes\n" + + " id: \"q1\"\n" + + " period: 86400\n" + + " stat: \"Average\"\n" + + " unit: \"Bytes\"\n" + + " dimensions:\n" + + " - dimension:\n" + + " name: \"StorageType\"\n" + + " value: \"StandardStorage\""; + final CloudwatchMetricsSourceConfig metricsSourceConfig = objectMapper.readValue(cloudMetricsYaml, CloudwatchMetricsSourceConfig.class); + assertThat(metricsSourceConfig.getBatchSize(), equalTo(2000)); + assertThat(metricsSourceConfig.getAwsAuthenticationOptions().getAwsRegion(), equalTo(Region.AP_SOUTH_1)); + assertThat(metricsSourceConfig.getAwsAuthenticationOptions().getAwsStsRoleArn(), equalTo("arn:aws:iam::524239988944:role/app-test")); + + final NamespaceConfig namespaceConfig = metricsSourceConfig.getNamespacesListConfig().get(0).getNamespaceConfig(); + assertThat(namespaceConfig.getName(), equalTo("AWS/S3")); + assertThat(namespaceConfig.getEndTime(), equalTo("2023-08-07T18:35:24z")); + assertThat(namespaceConfig.getStartTime(), equalTo("2023-05-19T18:35:24z")); + assertThat(namespaceConfig.getMetricNames(), nullValue()); + final MetricsConfig metricsConfig = namespaceConfig.getMetricDataQueriesConfig().get(0).getMetricsConfig(); + assertThat(metricsConfig.getName(), equalTo("BucketSizeBytes")); + assertThat(metricsConfig.getId(), equalTo("q1")); + assertThat(metricsConfig.getPeriod(), equalTo(86400)); + assertThat(metricsConfig.getStat(), equalTo("Average")); + assertThat(metricsConfig.getUnit(), equalTo("Bytes")); + final DimensionConfig dimensionConfig = + metricsConfig.getDimensionsListConfigs().get(0).getDimensionConfig(); + assertThat(dimensionConfig.getName(), equalTo("StorageType")); + assertThat(dimensionConfig.getValue(), equalTo("StandardStorage")); + } + + @Test + public void cloud_watch_default_batch_size_test(){ + assertThat(new CloudwatchMetricsSourceConfig().getBatchSize(),equalTo(1000)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsSourceTest.java b/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsSourceTest.java new file mode 100644 index 0000000000..e1fa431d4b --- /dev/null +++ b/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsSourceTest.java @@ -0,0 +1,86 @@ +package org.opensearch.dataprepper.plugins.source; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import org.opensearch.dataprepper.plugins.source.configuration.AwsAuthenticationOptions; +import software.amazon.awssdk.regions.Region; + +import java.time.Duration; +import java.util.HashMap; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +public class CloudwatchMetricsSourceTest { + + private final String PLUGIN_NAME = "cloudwatch"; + private final String TEST_PIPELINE_NAME = "cloudwatch-test-pipeline"; + + private CloudwatchMetricsSource cloudwatchMetricsSource; + private PluginMetrics pluginMetrics; + private CloudwatchMetricsSourceConfig cloudwatchMetricsSourceConfig; + private PluginFactory pluginFactory; + private AcknowledgementSetManager acknowledgementSetManager; + private AwsCredentialsSupplier awsCredentialsSupplier; + + private int recordsToAccumulate = 100; + + private Duration bufferTimeout = Duration.ofSeconds(10); + + private AwsAuthenticationOptions awsAuthenticationOptions; + + + @BeforeEach + void setUp() { + pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); + cloudwatchMetricsSourceConfig = mock(CloudwatchMetricsSourceConfig.class); + pluginFactory = mock(PluginFactory.class); + acknowledgementSetManager = mock(AcknowledgementSetManager.class); + awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); + awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.AF_SOUTH_1); + when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn("test-arn"); + when(cloudwatchMetricsSourceConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + cloudwatchMetricsSource = new CloudwatchMetricsSource(pluginMetrics, cloudwatchMetricsSourceConfig,awsCredentialsSupplier); + } + + @Test + void start_should_throw_IllegalStateException_when_buffer_is_null() { + assertThrows(IllegalStateException.class, () -> cloudwatchMetricsSource.start(null)); + } + + private BlockingBuffer> getBuffer() { + final HashMap integerHashMap = new HashMap<>(); + integerHashMap.put("buffer_size", 2); + integerHashMap.put("batch_size", 2); + final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap); + pluginSetting.setPipelineName("pipeline"); + return new BlockingBuffer<>(pluginSetting); + } + + @Test + void start_should_read_data_from_cloud_watch_and_push_to_buffer(){ + final BlockingBuffer> buffer = getBuffer(); + final BufferAccumulator bufferAccumulator = mock(BufferAccumulator.class); + try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) + .thenReturn(bufferAccumulator); + cloudwatchMetricsSource.start(buffer); + } + assertThat(buffer.isEmpty(),equalTo(true)); + } +} diff --git a/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsWorkerTest.java b/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsWorkerTest.java new file mode 100644 index 0000000000..6df4fba9d7 --- /dev/null +++ b/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/CloudwatchMetricsWorkerTest.java @@ -0,0 +1,157 @@ +package org.opensearch.dataprepper.plugins.source; + + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; +import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import org.opensearch.dataprepper.plugins.source.configuration.DimensionConfig; +import org.opensearch.dataprepper.plugins.source.configuration.DimensionsListConfig; +import org.opensearch.dataprepper.plugins.source.configuration.MetricDataQueriesConfig; +import org.opensearch.dataprepper.plugins.source.configuration.MetricsConfig; +import org.opensearch.dataprepper.plugins.source.configuration.NamespaceConfig; +import org.opensearch.dataprepper.plugins.source.configuration.NamespacesListConfig; +import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; +import software.amazon.awssdk.services.cloudwatch.model.Dimension; +import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataRequest; +import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataResponse; +import software.amazon.awssdk.services.cloudwatch.model.MetricDataResult; +import software.amazon.awssdk.services.cloudwatch.model.MetricStat; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CloudwatchMetricsWorkerTest { + + private CloudWatchClient cloudWatchClient; + + private BufferAccumulator> bufferAccumulator; + + private CloudwatchMetricsSourceConfig cloudwatchMetricsSourceConfig; + + private Collection dimensionCollection; + + private NamespacesListConfig namespacesListConfig; + + private NamespaceConfig namespaceConfig; + + private PluginMetrics pluginMetrics; + + private GetMetricDataResponse getMetricDataResponse; + + private MetricDataQueriesConfig metricDataQueriesConfig; + + private MetricsConfig metricsConfig; + + private DimensionsListConfig dimensionsListConfig; + + private SourceCoordinator sourceCoordinator; + + MetricDataResult metricDataResultMock = MetricDataResult.builder() + .id("BucketSizeBytes") + .statusCode("SUCCESS") + .values(1.10) + .build(); + + @BeforeEach + public void setup(){ + final String namespaceName = "AWS/S3"; + this.sourceCoordinator = mock(SourceCoordinator.class); + this.cloudWatchClient = mock(CloudWatchClient.class); + this.dimensionCollection = new ArrayList<>(); + this.dimensionCollection.add(Dimension.builder().name("StorageType").value("StandardStorage").build()); + this.namespacesListConfig = mock(NamespacesListConfig.class); + this.namespaceConfig = mock(NamespaceConfig.class); + when(namespaceConfig.getName()).thenReturn(namespaceName); + when(namespacesListConfig.getNamespaceConfig()).thenReturn(namespaceConfig); + this.cloudwatchMetricsSourceConfig = mock(CloudwatchMetricsSourceConfig.class); + this.metricDataQueriesConfig = mock(MetricDataQueriesConfig.class); + this.metricsConfig = mock(MetricsConfig.class); + DimensionConfig dimensionConfig = mock(DimensionConfig.class); + when(dimensionConfig.getName()).thenReturn("StorageType"); + when(dimensionConfig.getValue()).thenReturn("StandardStorage"); + this.dimensionsListConfig = mock(DimensionsListConfig.class); + when(dimensionsListConfig.getDimensionConfig()).thenReturn(dimensionConfig); + when(metricsConfig.getName()).thenReturn("test-metric"); + when(metricsConfig.getDimensionsListConfigs()).thenReturn(List.of(dimensionsListConfig)); + when(metricDataQueriesConfig.getMetricsConfig()).thenReturn(metricsConfig); + when(namespaceConfig.getStartTime()).thenReturn("2023-05-19T18:35:24z"); + when(namespaceConfig.getEndTime()).thenReturn("2023-08-07T18:35:24z"); + when(namespaceConfig.getMetricDataQueriesConfig()).thenReturn(List.of(metricDataQueriesConfig)); + when(cloudwatchMetricsSourceConfig.getNamespacesListConfig()).thenReturn(List.of(namespacesListConfig)); + this.getMetricDataResponse = mock(GetMetricDataResponse.class); + this.bufferAccumulator = mock(BufferAccumulator.class); + when(getMetricDataResponse.metricDataResults()).thenReturn(List.of(metricDataResultMock)); + when(cloudWatchClient.getMetricData(any(GetMetricDataRequest.class))).thenReturn(getMetricDataResponse); + } + + private BlockingBuffer> getBuffer() { + final HashMap integerHashMap = new HashMap<>(); + integerHashMap.put("buffer_size", 2); + integerHashMap.put("batch_size", 2); + final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap); + pluginSetting.setPipelineName("pipeline"); + return new BlockingBuffer<>(pluginSetting); + } + + private CloudwatchMetricsWorker createObjectUnderTest(final BufferAccumulator bufferAccumulator1){ + return new CloudwatchMetricsWorker(cloudWatchClient,bufferAccumulator1,cloudwatchMetricsSourceConfig,dimensionCollection,pluginMetrics, sourceCoordinator); + } + + @Test + public void cloud_watch_metrics_test() throws Exception { + final String partitionKey = "AWS/S3" + "@" + "test-metric"; + final SourcePartition partitionToProcess = SourcePartition.builder(CloudwatchSourceProgressState.class) + .withPartitionKey(partitionKey) + .withPartitionClosedCount(0L) + .build(); + + given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess)); + final BlockingBuffer> buffer = getBuffer(); + int recordsToAccumulate = 100; + Duration bufferTimeout = Duration.ofSeconds(10); + final ArgumentCaptor getMetricDataRequestCaptor = ArgumentCaptor.forClass(GetMetricDataRequest.class); + try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) + .thenReturn(bufferAccumulator); + createObjectUnderTest(bufferAccumulator).run(); + } + verify(cloudWatchClient).getMetricData(getMetricDataRequestCaptor.capture()); + final GetMetricDataRequest getMetricDataRequestCaptorValue = getMetricDataRequestCaptor.getValue(); + assertThat(getMetricDataRequestCaptorValue.endTime().toString(),equalTo("2023-08-07T18:35:24Z")); + assertThat(getMetricDataRequestCaptorValue.startTime().toString(),equalTo("2023-05-19T18:35:24Z")); + final MetricStat metricStat = getMetricDataRequestCaptorValue.metricDataQueries().get(0).metricStat(); + assertThat(metricStat.metric().namespace(),equalTo("AWS/S3")); + assertThat(metricStat.metric().metricName(),equalTo("test-metric")); + assertThat(metricStat.metric().dimensions().get(0).name(),equalTo("StorageType")); + assertThat(metricStat.metric().dimensions().get(0).value(),equalTo("StandardStorage")); + verify(bufferAccumulator,times(1)).add(any(Record.class)); + verify(bufferAccumulator,times(1)).flush(); + } + + + +} diff --git a/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/AwsAuthenticationOptionsTest.java b/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/AwsAuthenticationOptionsTest.java new file mode 100644 index 0000000000..7c02103f3f --- /dev/null +++ b/data-prepper-plugins/cloudwatch-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/AwsAuthenticationOptionsTest.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.configuration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import software.amazon.awssdk.regions.Region; + +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +class AwsAuthenticationOptionsTest { + private ObjectMapper objectMapper = new ObjectMapper(); + @ParameterizedTest + @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"}) + void getAwsRegion_returns_Region_of(final String regionString) { + final Region expectedRegionObject = Region.of(regionString); + final Map jsonMap = Map.of("region", regionString); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertThat(objectUnderTest.getAwsRegion(), equalTo(expectedRegionObject)); + } + + @Test + void getAwsRegion_returns_null_when_region_is_null() { + final Map jsonMap = Collections.emptyMap(); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertThat(objectUnderTest.getAwsRegion(), nullValue()); + } + + @Test + void getAwsStsRoleArn_returns_value_from_deserialized_JSON() { + final String stsRoleArn = UUID.randomUUID().toString(); + final Map jsonMap = Map.of("sts_role_arn", stsRoleArn); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertThat(objectUnderTest.getAwsStsRoleArn(), equalTo(stsRoleArn)); + } + + @Test + void getAwsStsRoleArn_returns_null_if_not_in_JSON() { + final Map jsonMap = Collections.emptyMap(); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertThat(objectUnderTest.getAwsStsRoleArn(), nullValue()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/cloudwatch-metrics-source/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/data-prepper-plugins/cloudwatch-metrics-source/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000..23c33feb6d --- /dev/null +++ b/data-prepper-plugins/cloudwatch-metrics-source/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1,3 @@ +# To enable mocking of final classes with vanilla Mockito +# https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods +mock-maker-inline