Skip to content

Commit

Permalink
Merge pull request #307 from asserts/radha/sc-16129
Browse files Browse the repository at this point in the history
Batch tasks list input to EcsClient.describeTasks
  • Loading branch information
jradhakrishnan authored Jul 13, 2023
2 parents 2cca099 + dc7339e commit 6ebb3c3
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 64 deletions.
2 changes: 1 addition & 1 deletion src/main/java/ai/asserts/aws/MetadataTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void updateMetadata() {
}

@SuppressWarnings("unused")
@Scheduled(fixedRateString = "${aws.metadata.scrape.manager.task.fixedDelay:300000}",
@Scheduled(fixedRateString = "${aws.metadata.scrape.manager.task.fixedDelay:60000}",
initialDelayString = "${aws.metadata.scrape.manager.task.initialDelay:5000}")
@Timed(description = "Time spent scraping AWS Resource meta data from all regions", histogram = true)
public void perMinute() {
Expand Down
23 changes: 3 additions & 20 deletions src/main/java/ai/asserts/aws/cloudwatch/query/QueryBatcher.java
Original file line number Diff line number Diff line change
@@ -1,40 +1,23 @@

package ai.asserts.aws.cloudwatch.query;

import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Component
public class QueryBatcher {
private final int queryCountLimit;
private final int dataLimit;

public QueryBatcher(@Value("${aws.metric.data.query.count.limit:500}") int queryCountLimit,
@Value("${aws.metric.data.limit:100800}") int dataLimit) {
public QueryBatcher(@Value("${aws.metric.data.query.count.limit:500}") int queryCountLimit) {
this.queryCountLimit = queryCountLimit;
this.dataLimit = dataLimit;
}

public List<List<MetricQuery>> splitIntoBatches(List<MetricQuery> queries) {
// Split queries into batches to meet the following limits
// Max of 500 metrics per API call
// Max of 100,800 result samples per API call
List<List<MetricQuery>> batches = new ArrayList<>();
batches.add(new ArrayList<>());
queries.forEach(metricQuery -> {
List<MetricQuery> currentBatch = batches.get(batches.size() - 1);
int sumTillNow = currentBatch.size();
if (sumTillNow + 1 < dataLimit && currentBatch.size() < queryCountLimit) {
currentBatch.add(metricQuery);
} else {
currentBatch = new ArrayList<>();
currentBatch.add(metricQuery);
batches.add(currentBatch);
}
});
return batches;
return Lists.partition(queries, queryCountLimit);
}
}
73 changes: 44 additions & 29 deletions src/main/java/ai/asserts/aws/exporter/ECSTaskProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import ai.asserts.aws.resource.ResourceMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Lists;
import io.prometheus.client.Collector;
import io.prometheus.client.Collector.MetricFamilySamples.Sample;
import io.prometheus.client.CollectorRegistry;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.ecs.EcsClient;
import software.amazon.awssdk.services.ecs.model.DescribeTasksRequest;
Expand Down Expand Up @@ -69,6 +71,7 @@ public class ECSTaskProvider extends Collector implements Runnable, Initializing
private final TaskExecutorUtil taskExecutorUtil;
private final CollectorRegistry collectorRegistry;
private final SnakeCaseUtil snakeCaseUtil;
private final int describeTasksBatchSize;

@Getter
@VisibleForTesting
Expand All @@ -78,7 +81,8 @@ public ECSTaskProvider(AWSClientProvider awsClientProvider, ScrapeConfigProvider
AccountProvider accountProvider, RateLimiter rateLimiter, ResourceMapper resourceMapper,
ECSClusterProvider ecsClusterProvider, ECSTaskUtil ecsTaskUtil,
MetricSampleBuilder sampleBuilder, CollectorRegistry collectorRegistry,
TaskExecutorUtil taskExecutorUtil, SnakeCaseUtil snakeCaseUtil) {
TaskExecutorUtil taskExecutorUtil, SnakeCaseUtil snakeCaseUtil,
@Value("${aws.ecs.describeTasks.batch_size:100}") int describeTaskBatchSize) {
this.awsClientProvider = awsClientProvider;
this.scrapeConfigProvider = scrapeConfigProvider;
this.accountProvider = accountProvider;
Expand All @@ -90,6 +94,7 @@ public ECSTaskProvider(AWSClientProvider awsClientProvider, ScrapeConfigProvider
this.collectorRegistry = collectorRegistry;
this.taskExecutorUtil = taskExecutorUtil;
this.snakeCaseUtil = snakeCaseUtil;
this.describeTasksBatchSize = describeTaskBatchSize;
}

@Override
Expand Down Expand Up @@ -181,6 +186,7 @@ void discoverNewTasks(Map<Resource, List<Resource>> clusterWiseNewTasks, EcsClie

// Build list of new tasks for which we need to make the describeTasks call
if (tasksResponse.hasTaskArns()) {
log.info("Found {} tasks in cluster {}", tasksResponse.taskArns().size(), cluster.getArn());
// Build the current task list
latestTasks.addAll(tasksResponse.taskArns().stream()
.map(resourceMapper::map)
Expand All @@ -195,6 +201,9 @@ void discoverNewTasks(Map<Resource, List<Resource>> clusterWiseNewTasks, EcsClie
// Retain only new tasks
current.entrySet().removeIf(entry -> !latestTasks.contains(entry.getKey()));
latestTasks.removeIf(current::containsKey);
if (latestTasks.size() > 0) {
log.info("Found {} new tasks in cluster {}", latestTasks.size(), cluster.getArn());
}
clusterWiseNewTasks.computeIfAbsent(cluster, k -> new ArrayList<>()).addAll(latestTasks);
}

Expand All @@ -205,35 +214,41 @@ void buildNewTargets(AWSAccount account, ScrapeConfig scrapeConfig,
// Make the describeTasks call for the new tasks and build the scrape targets
clusterWiseNewTasks.forEach((cluster, tasks) -> {
if (!tasks.isEmpty()) {
// Batch in sizes of 100
String operationName = "EcsClient/describeTasks";
DescribeTasksRequest request =
DescribeTasksRequest.builder()
.cluster(cluster.getName())
.tasks(tasks.stream().map(Resource::getArn).collect(Collectors.toList()))
.build();
DescribeTasksResponse taskResponse = rateLimiter.doWithRateLimit(operationName,
ImmutableSortedMap.of(
SCRAPE_ACCOUNT_ID_LABEL, cluster.getAccount(),
SCRAPE_REGION_LABEL, cluster.getRegion(),
SCRAPE_OPERATION_LABEL, operationName,
SCRAPE_NAMESPACE_LABEL, "AWS/ECS"),
() -> ecsClient.describeTasks(request));
if (taskResponse.hasTasks()) {
taskResponse.tasks().stream()
.filter(ecsTaskUtil::hasAllInfo)
.forEach(task -> resourceMapper.map(task.taskArn()).ifPresent(taskResource -> {
String tenantName = account.getTenant();
List<StaticConfig> staticConfigs =
ecsTaskUtil.buildScrapeTargets(
account,
scrapeConfigProvider.getScrapeConfig(tenantName),
ecsClient,
cluster,
getService(task), task);
Map<Resource, List<StaticConfig>> clusterTargets =
tasksByCluster.computeIfAbsent(cluster, k -> new HashMap<>());
clusterTargets.put(taskResource, staticConfigs);
}));

List<Resource> allTasks = new ArrayList<>(tasks);
List<List<Resource>> batches = Lists.partition(allTasks, describeTasksBatchSize);
for(List<Resource> batch : batches) {
DescribeTasksRequest request =
DescribeTasksRequest.builder()
.cluster(cluster.getName())
.tasks(batch.stream().map(Resource::getArn).collect(Collectors.toList()))
.build();
DescribeTasksResponse taskResponse = rateLimiter.doWithRateLimit(operationName,
ImmutableSortedMap.of(
SCRAPE_ACCOUNT_ID_LABEL, cluster.getAccount(),
SCRAPE_REGION_LABEL, cluster.getRegion(),
SCRAPE_OPERATION_LABEL, operationName,
SCRAPE_NAMESPACE_LABEL, "AWS/ECS"),
() -> ecsClient.describeTasks(request));
if (taskResponse.hasTasks()) {
taskResponse.tasks().stream()
.filter(ecsTaskUtil::hasAllInfo)
.forEach(task -> resourceMapper.map(task.taskArn()).ifPresent(taskResource -> {
String tenantName = account.getTenant();
List<StaticConfig> staticConfigs =
ecsTaskUtil.buildScrapeTargets(
account,
scrapeConfigProvider.getScrapeConfig(tenantName),
ecsClient,
cluster,
getService(task), task);
Map<Resource, List<StaticConfig>> clusterTargets =
tasksByCluster.computeIfAbsent(cluster, k -> new HashMap<>());
clusterTargets.put(taskResource, staticConfigs);
}));
}
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void setup() {

@Test
void noSplit() {
QueryBatcher queryBatcher = new QueryBatcher(5, 10);
QueryBatcher queryBatcher = new QueryBatcher(5);
List<MetricQuery> queries = new ArrayList<>();
for (int i = 0; i < 3; i++) {
queries.add(metricQuery);
Expand All @@ -35,7 +35,7 @@ void noSplit() {

@Test
void split() {
QueryBatcher queryBatcher = new QueryBatcher(2, 10);
QueryBatcher queryBatcher = new QueryBatcher(2);
List<MetricQuery> queries = new ArrayList<>();
for (int i = 0; i < 3; i++) {
queries.add(metricQuery);
Expand Down
33 changes: 24 additions & 9 deletions src/test/java/ai/asserts/aws/exporter/ECSTaskProviderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void setup() {
testClass = new ECSTaskProvider(awsClientProvider, scrapeConfigProvider, accountProvider,
new RateLimiter(basicMetricCollector, (account) -> "acme"), resourceMapper, ecsClusterProvider,
ecsTaskUtil, sampleBuilder,
collectorRegistry, taskExecutorUtil, snakeCaseUtil);
collectorRegistry, taskExecutorUtil, snakeCaseUtil, 1);
}


Expand Down Expand Up @@ -245,7 +245,7 @@ public void getScrapeTargets() {
testClass = new ECSTaskProvider(awsClientProvider, scrapeConfigProvider, accountProvider,
new RateLimiter(basicMetricCollector, (account) -> "acme"), resourceMapper, ecsClusterProvider,
ecsTaskUtil, sampleBuilder,
collectorRegistry, taskExecutorUtil, snakeCaseUtil) {
collectorRegistry, taskExecutorUtil, snakeCaseUtil, 2) {
@Override
void discoverNewTasks(Map<Resource, List<Resource>> clusterWiseNewTasks, EcsClient ecsClient,
Resource cluster) {
Expand Down Expand Up @@ -370,11 +370,19 @@ public void buildNewTargets() {
.build();
expect(ecsClient.describeTasks(DescribeTasksRequest.builder()
.cluster("cluster1")
.tasks("service1-arn", "service2-arn")
.tasks("service1-arn")
.build())).andReturn(DescribeTasksResponse.builder()
.tasks(task1, task2)
.tasks(task1)
.build());
expect(ecsTaskUtil.hasAllInfo(task1)).andReturn(true);
basicMetricCollector.recordLatency(eq("aws_exporter_milliseconds"), anyObject(SortedMap.class), anyLong());

expect(ecsClient.describeTasks(DescribeTasksRequest.builder()
.cluster("cluster1")
.tasks("service2-arn")
.build())).andReturn(DescribeTasksResponse.builder()
.tasks(task2)
.build());
expect(ecsTaskUtil.hasAllInfo(task2)).andReturn(true);
basicMetricCollector.recordLatency(eq("aws_exporter_milliseconds"), anyObject(SortedMap.class), anyLong());

Expand All @@ -398,14 +406,23 @@ public void buildNewTargets() {
.group("service:service4")
.taskArn("service4-task4-arn")
.build();

expect(ecsClient.describeTasks(DescribeTasksRequest.builder()
.cluster("cluster2")
.tasks("service3-arn", "service4-arn")
.tasks("service3-arn")
.build())).andReturn(DescribeTasksResponse.builder()
.tasks(task3, task4)
.tasks(task3)
.build());

basicMetricCollector.recordLatency(eq("aws_exporter_milliseconds"), anyObject(SortedMap.class), anyLong());
expect(ecsTaskUtil.hasAllInfo(task3)).andReturn(true);

expect(ecsClient.describeTasks(DescribeTasksRequest.builder()
.cluster("cluster2")
.tasks("service4-arn")
.build())).andReturn(DescribeTasksResponse.builder()
.tasks(task4)
.build());
basicMetricCollector.recordLatency(eq("aws_exporter_milliseconds"), anyObject(SortedMap.class), anyLong());
expect(ecsTaskUtil.hasAllInfo(task4)).andReturn(true);

Resource task3Resource = Resource.builder().name("task3").build();
Expand All @@ -420,8 +437,6 @@ public void buildNewTargets() {
task4))
.andReturn(ImmutableList.of(mockStaticConfig, mockStaticConfig));

basicMetricCollector.recordLatency(eq("aws_exporter_milliseconds"), anyObject(SortedMap.class), anyLong());

replayAll();
assertTrue(testClass.getTasksByCluster().isEmpty());
testClass.buildNewTargets(account, scrapeConfig, clusterWiseARNs, ecsClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.google.common.collect.ImmutableSet;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import org.easymock.Capture;
import org.easymock.EasyMockSupport;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -27,7 +26,6 @@
import software.amazon.awssdk.services.redshift.model.Tag;
import software.amazon.awssdk.utils.ImmutableMap;

import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
Expand All @@ -36,7 +34,6 @@
import static ai.asserts.aws.MetricNameUtil.SCRAPE_LATENCY_METRIC;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
Expand Down

0 comments on commit 6ebb3c3

Please sign in to comment.