
Commit

GitHub-issue#1994: Implementation of CloudWatch metrics source plugin configuration, JUnit test cases, and source coordinator.

Signed-off-by: venkataraopasyavula <[email protected]>
venkataraopasyavula committed Aug 18, 2023
1 parent 21b26c5 commit 4ec407f
Showing 11 changed files with 698 additions and 74 deletions.
3 changes: 2 additions & 1 deletion data-prepper-plugins/cloudwatch-metrics-source/build.gradle
@@ -13,7 +13,8 @@ dependencies {
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2'

testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-plugins:blocking-buffer')
}

test {
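The new jackson-dataformat-yaml test dependency suggests the added JUnit tests deserialize YAML pipeline snippets into the plugin's configuration classes. A minimal sketch of that approach, assuming Jackson's YAML module on the test classpath; the resource path below is a hypothetical example and may not match the actual test resources in this commit:

package org.opensearch.dataprepper.plugins.source;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import java.io.File;
import java.io.IOException;

// Sketch only: loads a YAML pipeline snippet into the source configuration class.
public class CloudwatchConfigDeserializationSketch {
    public static void main(final String[] args) throws IOException {
        final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
        final CloudwatchMetricsSourceConfig sourceConfig = yamlMapper.readValue(
                new File("src/test/resources/cloudwatch-metrics-pipeline.yaml"),
                CloudwatchMetricsSourceConfig.class);
        // The configured namespaces drive the partitions created by CloudwatchMetricsWorker.
        System.out.println("Configured namespaces: " + sourceConfig.getNamespacesListConfig().size());
    }
}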
CloudwatchMetricsSource.java
@@ -2,12 +2,8 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source;

import org.opensearch.dataprepper.plugins.source.configuration.NamespacesListConfig;
import org.opensearch.dataprepper.plugins.source.configuration.MetricDataQueriesConfig;
import org.opensearch.dataprepper.plugins.source.configuration.DimensionsListConfig;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
@@ -21,46 +17,53 @@
import java.time.Duration;
import java.util.Collection;

import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.UsesSourceCoordination;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.model.Metric;
import software.amazon.awssdk.services.cloudwatch.model.MetricStat;
import software.amazon.awssdk.services.cloudwatch.model.MetricDataQuery;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataRequest;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataResponse;
import software.amazon.awssdk.services.cloudwatch.model.MetricDataResult;
import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException;
import software.amazon.awssdk.services.cloudwatch.model.ScanBy;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;


/**
 * A CloudWatch metrics source that scrapes metrics using the GetMetricData API.
 */
@DataPrepperPlugin(name = "cloudwatch", pluginType = Source.class, pluginConfigurationType = CloudwatchMetricsSourceConfig.class)
public class CloudwatchMetricsSource implements Source<Record<Event>> {
public class CloudwatchMetricsSource implements Source<Record<Event>>, UsesSourceCoordination {

private static final Logger LOG = LoggerFactory.getLogger(CloudwatchMetricsSource.class);

private final Collection<Dimension> dimensionCollection;
private final CloudwatchMetricsWorker cloudwatchMetricsWorker;

private final PluginMetrics pluginMetrics;

private CloudwatchMetricsWorker cloudwatchMetricsWorker;

private final CloudwatchMetricsSourceConfig cloudwatchMetricsSourceConfig;

private final AwsCredentialsSupplier awsCredentialsSupplier;

private static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(30);

private Thread cloudwatchMetricsWorkerThread;

private SourceCoordinator<CloudwatchSourceProgressState> sourceCoordinator;



@DataPrepperPluginConstructor
public CloudwatchMetricsSource(
final PluginMetrics pluginMetrics,
final CloudwatchMetricsSourceConfig cloudwatchMetricsSourceConfig,
final AwsCredentialsSupplier awsCredentialsSupplier) {
this.cloudwatchMetricsSourceConfig = cloudwatchMetricsSourceConfig;
this.awsCredentialsSupplier = awsCredentialsSupplier;
cloudwatchMetricsWorker = new CloudwatchMetricsWorker();
dimensionCollection = new ArrayList<>();
this.dimensionCollection = new ArrayList<>();
this.pluginMetrics = pluginMetrics;

}

@Override
Expand All @@ -84,60 +87,32 @@ public void start(Buffer<Record<Event>> buffer) {
.credentialsProvider(credentialsProvider)
.build();

for (NamespacesListConfig namespacesListConfig : cloudwatchMetricsSourceConfig.getNamespacesListConfig()) {
dimensionCollection.clear();
for (MetricDataQueriesConfig metricDataQueriesConfig : namespacesListConfig.getNamespaceConfig().getMetricDataQueriesConfig()) {

for (DimensionsListConfig dimensionsListConfig : metricDataQueriesConfig.getMetricsConfig().getDimensionsListConfigs()) {

dimensionCollection.add(Dimension.builder()
.name(dimensionsListConfig.getDimensionConfig().getName())
.value(dimensionsListConfig.getDimensionConfig().getValue()).build());
}
try {

Metric met = Metric.builder()
.metricName(metricDataQueriesConfig.getMetricsConfig().getName())
.namespace(namespacesListConfig.getNamespaceConfig().getName())
.dimensions(dimensionCollection)
.build();

MetricStat metStat = MetricStat.builder()
.stat(metricDataQueriesConfig.getMetricsConfig().getStat())
.unit(metricDataQueriesConfig.getMetricsConfig().getUnit())
.period(metricDataQueriesConfig.getMetricsConfig().getPeriod())
.metric(met)
.build();

MetricDataQuery dataQUery = MetricDataQuery.builder()
.metricStat(metStat)
.id(metricDataQueriesConfig.getMetricsConfig().getId())
.returnData(true)
.build();

List<MetricDataQuery> dataQueries = new ArrayList<>();
dataQueries.add(dataQUery);

GetMetricDataRequest getMetReq = GetMetricDataRequest.builder()
.maxDatapoints(10)
.scanBy(ScanBy.TIMESTAMP_DESCENDING)
.startTime(Instant.parse(namespacesListConfig.getNamespaceConfig().getStartTime()))
.endTime(Instant.parse(namespacesListConfig.getNamespaceConfig().getEndTime()))
.metricDataQueries(dataQueries)
.build();

GetMetricDataResponse response = cloudWatchClient.getMetricData(getMetReq);
List<MetricDataResult> data = response.metricDataResults();
cloudwatchMetricsWorker.writeToBuffer(data, bufferAccumulator, null);

} catch (CloudWatchException | DateTimeParseException ex) {
LOG.error("Exception Occurred while scraping the metrics {0}", ex);
}
}
}
cloudwatchMetricsWorkerThread = new Thread(new CloudwatchMetricsWorker(cloudWatchClient,
bufferAccumulator,
cloudwatchMetricsSourceConfig,
dimensionCollection,
pluginMetrics,
sourceCoordinator));

cloudwatchMetricsWorkerThread.start();
}

@Override
public void stop() {
LOG.info("Stopped Cloudwatch source.");
cloudwatchMetricsWorkerThread.interrupt();
if (Objects.nonNull(sourceCoordinator)) {
sourceCoordinator.giveUpPartitions();
}
}

@Override
public <T> void setSourceCoordinator(SourceCoordinator<T> sourceCoordinator) {
this.sourceCoordinator = (SourceCoordinator<CloudwatchSourceProgressState>) sourceCoordinator;
sourceCoordinator.initialize();
}

@Override
public Class<?> getPartitionProgressStateClass() {
return CloudwatchSourceProgressState.class;
}
}
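The source now implements UsesSourceCoordination: setSourceCoordinator stores and initializes the coordinator, start hands it to the worker thread, and stop interrupts the worker and gives up any owned partitions. A minimal sketch, not the actual test file from this commit, of how a JUnit 5/Mockito test might verify the coordinator wiring (assuming Mockito can mock PluginMetrics and the config class):

package org.opensearch.dataprepper.plugins.source;

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

class CloudwatchMetricsSourceCoordinationSketchTest {

    @Test
    void setSourceCoordinator_initializes_coordinator_and_exposes_progress_state_class() {
        // Collaborators required by the plugin constructor, mocked for the sketch.
        final PluginMetrics pluginMetrics = mock(PluginMetrics.class);
        final CloudwatchMetricsSourceConfig sourceConfig = mock(CloudwatchMetricsSourceConfig.class);
        final AwsCredentialsSupplier awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);

        final CloudwatchMetricsSource source =
                new CloudwatchMetricsSource(pluginMetrics, sourceConfig, awsCredentialsSupplier);

        @SuppressWarnings("unchecked")
        final SourceCoordinator<CloudwatchSourceProgressState> sourceCoordinator =
                (SourceCoordinator<CloudwatchSourceProgressState>) mock(SourceCoordinator.class);

        // setSourceCoordinator stores the coordinator and calls initialize() on it.
        source.setSourceCoordinator(sourceCoordinator);
        verify(sourceCoordinator).initialize();

        // The progress-state class reported to the source coordination framework.
        assertEquals(CloudwatchSourceProgressState.class, source.getPartitionProgressStateClass());
    }
}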
CloudwatchMetricsWorker.java
@@ -2,31 +2,168 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartition;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException;
import org.opensearch.dataprepper.plugins.source.configuration.DimensionsListConfig;
import org.opensearch.dataprepper.plugins.source.configuration.MetricDataQueriesConfig;
import org.opensearch.dataprepper.plugins.source.configuration.NamespacesListConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataRequest;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataResponse;
import software.amazon.awssdk.services.cloudwatch.model.Metric;
import software.amazon.awssdk.services.cloudwatch.model.MetricDataQuery;
import software.amazon.awssdk.services.cloudwatch.model.MetricDataResult;
import software.amazon.awssdk.services.cloudwatch.model.MetricStat;
import software.amazon.awssdk.services.cloudwatch.model.ScanBy;

import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* implements the CloudwatchMetricsWorker to read and metrics data message and push to buffer.
*/
/**
 * A CloudWatch metrics source worker that reads metric data and writes it to the buffer.
 */
public class CloudwatchMetricsWorker {
public class CloudwatchMetricsWorker implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(CloudwatchMetricsWorker.class);

private static final int STANDARD_BACKOFF_MILLIS = 30_000;

private final BufferAccumulator<Record<Event>> bufferAccumulator;

private final CloudWatchClient cloudWatchClient;

private final CloudwatchMetricsSourceConfig cloudwatchMetricsSourceConfig;

private final Collection<Dimension> dimensionCollection;

private final PluginMetrics pluginMetrics;

private final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier;

private final SourceCoordinator<CloudwatchSourceProgressState> sourceCoordinator;

public CloudwatchMetricsWorker(final CloudWatchClient cloudWatchClient,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final CloudwatchMetricsSourceConfig cloudwatchMetricsSourceConfig,
final Collection<Dimension> dimensionCollection,
final PluginMetrics pluginMetrics,
final SourceCoordinator<CloudwatchSourceProgressState> sourceCoordinator) {
this.bufferAccumulator = bufferAccumulator;
this.cloudWatchClient = cloudWatchClient;
this.cloudwatchMetricsSourceConfig = cloudwatchMetricsSourceConfig;
this.dimensionCollection = dimensionCollection;
this.pluginMetrics = pluginMetrics;
this.sourceCoordinator = sourceCoordinator;
final List<String> namespaceMetricNames = cloudwatchMetricsSourceConfig.getNamespacesListConfig().stream()
.map(NamespacesListConfig::getNamespaceConfig).flatMap(namespaceConfig -> namespaceConfig.getMetricDataQueriesConfig()
.stream().map(metricName -> namespaceConfig.getName() + "@" + metricName.getMetricsConfig().getName())).collect(Collectors.toList());
this.partitionCreationSupplier = new CloudwatchPartitionCreationSupplier(namespaceMetricNames);
}

void startProcessingCloudwatchMetrics(final int waitTimeMillis) {
final Optional<SourcePartition<CloudwatchSourceProgressState>> objectToProcess = sourceCoordinator.getNextPartition(partitionCreationSupplier);

if (objectToProcess.isEmpty()) {
try {
Thread.sleep(waitTimeMillis);
} catch (final InterruptedException e) {
    LOG.warn("Interrupted while waiting for a CloudWatch metrics partition", e);
    Thread.currentThread().interrupt();
}
return;
}
final String[] partitionKeyParts = objectToProcess.get().getPartitionKey().split("@");
final String nameSpaceName = partitionKeyParts[0];
final String metricName = partitionKeyParts[1];
try {
processCloudwatchMetrics(nameSpaceName, metricName);
sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey());
} catch (final PartitionNotOwnedException | PartitionNotFoundException | PartitionUpdateException e) {
LOG.warn("Cloud watch source received an exception from the source coordinator. There is a potential for duplicate data from {}, giving up partition and getting next partition: {}",objectToProcess.get().getPartitionKey() , e.getMessage());
sourceCoordinator.giveUpPartitions();
}
}

protected void processCloudwatchMetrics(final String nameSpaceName, final String metricName) {
for (NamespacesListConfig namespacesListConfig : cloudwatchMetricsSourceConfig.getNamespacesListConfig().stream()
        .filter(obj -> obj.getNamespaceConfig().getName().equals(nameSpaceName)).collect(Collectors.toList())) {
    dimensionCollection.clear();
    for (MetricDataQueriesConfig metricDataQueriesConfig : namespacesListConfig.getNamespaceConfig().getMetricDataQueriesConfig().stream()
            .filter(obj -> obj.getMetricsConfig().getName().equals(metricName)).collect(Collectors.toList())) {

for (DimensionsListConfig dimensionsListConfig : metricDataQueriesConfig.getMetricsConfig().getDimensionsListConfigs()) {

dimensionCollection.add(Dimension.builder()
.name(dimensionsListConfig.getDimensionConfig().getName())
.value(dimensionsListConfig.getDimensionConfig().getValue()).build());
}
try {
Metric met = Metric.builder()
.metricName(metricDataQueriesConfig.getMetricsConfig().getName())
.namespace(namespacesListConfig.getNamespaceConfig().getName())
.dimensions(dimensionCollection)
.build();

MetricStat metStat = MetricStat.builder()
.stat(metricDataQueriesConfig.getMetricsConfig().getStat())
.unit(metricDataQueriesConfig.getMetricsConfig().getUnit())
.period(metricDataQueriesConfig.getMetricsConfig().getPeriod())
.metric(met)
.build();

MetricDataQuery dataQuery = MetricDataQuery.builder()
.metricStat(metStat)
.id(metricDataQueriesConfig.getMetricsConfig().getId())
.returnData(true)
.build();

List<MetricDataQuery> dataQueries = new ArrayList<>();
dataQueries.add(dataQuery);

GetMetricDataRequest getMetReq = GetMetricDataRequest.builder()
.maxDatapoints(10)
.scanBy(ScanBy.TIMESTAMP_DESCENDING)
.startTime(Instant.parse(namespacesListConfig.getNamespaceConfig().getStartTime()))
.endTime(Instant.parse(namespacesListConfig.getNamespaceConfig().getEndTime()))
.metricDataQueries(dataQueries)
.build();

GetMetricDataResponse response = cloudWatchClient.getMetricData(getMetReq);
List<MetricDataResult> data = response.metricDataResults();
writeToBuffer(data, bufferAccumulator, null);

} catch (CloudWatchException | DateTimeParseException ex) {
LOG.error("Exception Occurred while scraping the metrics {0}", ex);
}
}
}
}

/**
 * Writes metrics data to the buffer and sends end-to-end acknowledgements after successful processing.
 * @param metricsData metricsData
@@ -53,4 +190,9 @@ public void writeToBuffer(final List<MetricDataResult> metricsData,
LOG.error("Exception while flushing record events to buffer {0}", ex);
}
}

@Override
public void run() {
startProcessingCloudwatchMetrics(STANDARD_BACKOFF_MILLIS);
}
}
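The worker's unit of work is encoded as a partition key of the form <namespace>@<metricName>: the constructor builds one key per configured metric, and startProcessingCloudwatchMetrics splits the key back into the namespace and metric name before querying CloudWatch. A small standalone sketch of that convention, using hypothetical helper names that are not part of the plugin:

// Hypothetical helpers mirroring the "<namespace>@<metricName>" partition-key
// convention used by CloudwatchMetricsWorker; not part of the plugin's API.
public final class CloudwatchPartitionKeySketch {

    private static final String SEPARATOR = "@";

    static String encode(final String namespace, final String metricName) {
        return namespace + SEPARATOR + metricName;
    }

    static String[] decode(final String partitionKey) {
        // index 0 = namespace, index 1 = metric name
        return partitionKey.split(SEPARATOR);
    }

    public static void main(final String[] args) {
        final String key = encode("AWS/EC2", "CPUUtilization");
        final String[] parts = decode(key);
        System.out.println(parts[0] + " / " + parts[1]); // prints: AWS/EC2 / CPUUtilization
    }
}

One consequence of this encoding is that a namespace or metric name containing the '@' character would split into more than two parts; the code assumes the separator never appears in either value.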
CloudwatchPartitionCreationSupplier.java
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source;

import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class CloudwatchPartitionCreationSupplier implements Function<Map<String, Object>, List<PartitionIdentifier>> {

private List<String> metrics;

public CloudwatchPartitionCreationSupplier(final List<String> metrics) {
this.metrics = metrics;
}

@Override
public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap) {
final List<PartitionIdentifier> objectsToProcess = new ArrayList<>();
for (final String metric : metrics) {
objectsToProcess.add(PartitionIdentifier.builder().withPartitionKey(metric).build());
}
return objectsToProcess;
}
}
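A brief usage sketch for the new partition creation supplier, assuming the namespace@metric keys built by the worker constructor and that PartitionIdentifier exposes the key via getPartitionKey():

package org.opensearch.dataprepper.plugins.source;

import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;

import java.util.Collections;
import java.util.List;

// Sketch only: the supplier turns each "<namespace>@<metricName>" key into a
// PartitionIdentifier; the global state map is not used by this supplier.
public class CloudwatchPartitionCreationSupplierSketch {
    public static void main(final String[] args) {
        final CloudwatchPartitionCreationSupplier supplier = new CloudwatchPartitionCreationSupplier(
                List.of("AWS/EC2@CPUUtilization", "AWS/S3@NumberOfObjects"));

        final List<PartitionIdentifier> partitions = supplier.apply(Collections.emptyMap());

        // Each configured namespace@metric pair becomes one partition to process.
        partitions.forEach(partition -> System.out.println(partition.getPartitionKey()));
    }
}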

