From dc7339ebff0a3fdf5c1ac68799498bb5dafe2444 Mon Sep 17 00:00:00 2001 From: jradhakrishnan Date: Thu, 13 Jul 2023 11:34:15 +0530 Subject: [PATCH] Batch tasks list input to EcsClient.describeTasks [ch16129] * Batch task list * Switch to `Guava` `Lists.partition(List, int)` for batching everywhere * Reduce ECS discovery interval from `5m` to `1m` --- .../ai/asserts/aws/MetadataTaskManager.java | 2 +- .../aws/cloudwatch/query/QueryBatcher.java | 23 +----- .../asserts/aws/exporter/ECSTaskProvider.java | 73 +++++++++++-------- .../cloudwatch/query/QueryBatcherTest.java | 4 +- .../aws/exporter/ECSTaskProviderTest.java | 33 ++++++--- .../aws/exporter/RedshiftExporterTest.java | 3 - 6 files changed, 74 insertions(+), 64 deletions(-) diff --git a/src/main/java/ai/asserts/aws/MetadataTaskManager.java b/src/main/java/ai/asserts/aws/MetadataTaskManager.java index 2a603ea3..ce2839fd 100644 --- a/src/main/java/ai/asserts/aws/MetadataTaskManager.java +++ b/src/main/java/ai/asserts/aws/MetadataTaskManager.java @@ -176,7 +176,7 @@ public void updateMetadata() { } @SuppressWarnings("unused") - @Scheduled(fixedRateString = "${aws.metadata.scrape.manager.task.fixedDelay:300000}", + @Scheduled(fixedRateString = "${aws.metadata.scrape.manager.task.fixedDelay:60000}", initialDelayString = "${aws.metadata.scrape.manager.task.initialDelay:5000}") @Timed(description = "Time spent scraping AWS Resource meta data from all regions", histogram = true) public void perMinute() { diff --git a/src/main/java/ai/asserts/aws/cloudwatch/query/QueryBatcher.java b/src/main/java/ai/asserts/aws/cloudwatch/query/QueryBatcher.java index 75d1ed4e..2996d965 100644 --- a/src/main/java/ai/asserts/aws/cloudwatch/query/QueryBatcher.java +++ b/src/main/java/ai/asserts/aws/cloudwatch/query/QueryBatcher.java @@ -1,40 +1,23 @@ package ai.asserts.aws.cloudwatch.query; +import com.google.common.collect.Lists; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import java.util.ArrayList; import java.util.List; @Component public class QueryBatcher { private final int queryCountLimit; - private final int dataLimit; - public QueryBatcher(@Value("${aws.metric.data.query.count.limit:500}") int queryCountLimit, - @Value("${aws.metric.data.limit:100800}") int dataLimit) { + public QueryBatcher(@Value("${aws.metric.data.query.count.limit:500}") int queryCountLimit) { this.queryCountLimit = queryCountLimit; - this.dataLimit = dataLimit; } public List> splitIntoBatches(List queries) { // Split queries into batches to meet the following limits // Max of 500 metrics per API call - // Max of 100,800 result samples per API call - List> batches = new ArrayList<>(); - batches.add(new ArrayList<>()); - queries.forEach(metricQuery -> { - List currentBatch = batches.get(batches.size() - 1); - int sumTillNow = currentBatch.size(); - if (sumTillNow + 1 < dataLimit && currentBatch.size() < queryCountLimit) { - currentBatch.add(metricQuery); - } else { - currentBatch = new ArrayList<>(); - currentBatch.add(metricQuery); - batches.add(currentBatch); - } - }); - return batches; + return Lists.partition(queries, queryCountLimit); } } diff --git a/src/main/java/ai/asserts/aws/exporter/ECSTaskProvider.java b/src/main/java/ai/asserts/aws/exporter/ECSTaskProvider.java index 7009afea..997546c6 100644 --- a/src/main/java/ai/asserts/aws/exporter/ECSTaskProvider.java +++ b/src/main/java/ai/asserts/aws/exporter/ECSTaskProvider.java @@ -18,12 +18,14 @@ import ai.asserts.aws.resource.ResourceMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Lists; import io.prometheus.client.Collector; import io.prometheus.client.Collector.MetricFamilySamples.Sample; import io.prometheus.client.CollectorRegistry; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import software.amazon.awssdk.services.ecs.EcsClient; import software.amazon.awssdk.services.ecs.model.DescribeTasksRequest; @@ -69,6 +71,7 @@ public class ECSTaskProvider extends Collector implements Runnable, Initializing private final TaskExecutorUtil taskExecutorUtil; private final CollectorRegistry collectorRegistry; private final SnakeCaseUtil snakeCaseUtil; + private final int describeTasksBatchSize; @Getter @VisibleForTesting @@ -78,7 +81,8 @@ public ECSTaskProvider(AWSClientProvider awsClientProvider, ScrapeConfigProvider AccountProvider accountProvider, RateLimiter rateLimiter, ResourceMapper resourceMapper, ECSClusterProvider ecsClusterProvider, ECSTaskUtil ecsTaskUtil, MetricSampleBuilder sampleBuilder, CollectorRegistry collectorRegistry, - TaskExecutorUtil taskExecutorUtil, SnakeCaseUtil snakeCaseUtil) { + TaskExecutorUtil taskExecutorUtil, SnakeCaseUtil snakeCaseUtil, + @Value("${aws.ecs.describeTasks.batch_size:100}") int describeTaskBatchSize) { this.awsClientProvider = awsClientProvider; this.scrapeConfigProvider = scrapeConfigProvider; this.accountProvider = accountProvider; @@ -90,6 +94,7 @@ public ECSTaskProvider(AWSClientProvider awsClientProvider, ScrapeConfigProvider this.collectorRegistry = collectorRegistry; this.taskExecutorUtil = taskExecutorUtil; this.snakeCaseUtil = snakeCaseUtil; + this.describeTasksBatchSize = describeTaskBatchSize; } @Override @@ -181,6 +186,7 @@ void discoverNewTasks(Map> clusterWiseNewTasks, EcsClie // Build list of new tasks for which we need to make the describeTasks call if (tasksResponse.hasTaskArns()) { + log.info("Found {} tasks in cluster {}", tasksResponse.taskArns().size(), cluster.getArn()); // Build the current task list latestTasks.addAll(tasksResponse.taskArns().stream() .map(resourceMapper::map) @@ -195,6 +201,9 @@ void discoverNewTasks(Map> clusterWiseNewTasks, EcsClie // Retain only new tasks current.entrySet().removeIf(entry -> !latestTasks.contains(entry.getKey())); latestTasks.removeIf(current::containsKey); + if (latestTasks.size() > 0) { + log.info("Found {} new tasks in cluster {}", latestTasks.size(), cluster.getArn()); + } clusterWiseNewTasks.computeIfAbsent(cluster, k -> new ArrayList<>()).addAll(latestTasks); } @@ -205,35 +214,41 @@ void buildNewTargets(AWSAccount account, ScrapeConfig scrapeConfig, // Make the describeTasks call for the new tasks and build the scrape targets clusterWiseNewTasks.forEach((cluster, tasks) -> { if (!tasks.isEmpty()) { + // Batch in sizes of 100 String operationName = "EcsClient/describeTasks"; - DescribeTasksRequest request = - DescribeTasksRequest.builder() - .cluster(cluster.getName()) - .tasks(tasks.stream().map(Resource::getArn).collect(Collectors.toList())) - .build(); - DescribeTasksResponse taskResponse = rateLimiter.doWithRateLimit(operationName, - ImmutableSortedMap.of( - SCRAPE_ACCOUNT_ID_LABEL, cluster.getAccount(), - SCRAPE_REGION_LABEL, cluster.getRegion(), - SCRAPE_OPERATION_LABEL, operationName, - SCRAPE_NAMESPACE_LABEL, "AWS/ECS"), - () -> ecsClient.describeTasks(request)); - if (taskResponse.hasTasks()) { - taskResponse.tasks().stream() - .filter(ecsTaskUtil::hasAllInfo) - .forEach(task -> resourceMapper.map(task.taskArn()).ifPresent(taskResource -> { - String tenantName = account.getTenant(); - List staticConfigs = - ecsTaskUtil.buildScrapeTargets( - account, - scrapeConfigProvider.getScrapeConfig(tenantName), - ecsClient, - cluster, - getService(task), task); - Map> clusterTargets = - tasksByCluster.computeIfAbsent(cluster, k -> new HashMap<>()); - clusterTargets.put(taskResource, staticConfigs); - })); + + List allTasks = new ArrayList<>(tasks); + List> batches = Lists.partition(allTasks, describeTasksBatchSize); + for(List batch : batches) { + DescribeTasksRequest request = + DescribeTasksRequest.builder() + .cluster(cluster.getName()) + .tasks(batch.stream().map(Resource::getArn).collect(Collectors.toList())) + .build(); + DescribeTasksResponse taskResponse = rateLimiter.doWithRateLimit(operationName, + ImmutableSortedMap.of( + SCRAPE_ACCOUNT_ID_LABEL, cluster.getAccount(), + SCRAPE_REGION_LABEL, cluster.getRegion(), + SCRAPE_OPERATION_LABEL, operationName, + SCRAPE_NAMESPACE_LABEL, "AWS/ECS"), + () -> ecsClient.describeTasks(request)); + if (taskResponse.hasTasks()) { + taskResponse.tasks().stream() + .filter(ecsTaskUtil::hasAllInfo) + .forEach(task -> resourceMapper.map(task.taskArn()).ifPresent(taskResource -> { + String tenantName = account.getTenant(); + List staticConfigs = + ecsTaskUtil.buildScrapeTargets( + account, + scrapeConfigProvider.getScrapeConfig(tenantName), + ecsClient, + cluster, + getService(task), task); + Map> clusterTargets = + tasksByCluster.computeIfAbsent(cluster, k -> new HashMap<>()); + clusterTargets.put(taskResource, staticConfigs); + })); + } } } }); diff --git a/src/test/java/ai/asserts/aws/cloudwatch/query/QueryBatcherTest.java b/src/test/java/ai/asserts/aws/cloudwatch/query/QueryBatcherTest.java index 35d6f662..245b5590 100644 --- a/src/test/java/ai/asserts/aws/cloudwatch/query/QueryBatcherTest.java +++ b/src/test/java/ai/asserts/aws/cloudwatch/query/QueryBatcherTest.java @@ -21,7 +21,7 @@ public void setup() { @Test void noSplit() { - QueryBatcher queryBatcher = new QueryBatcher(5, 10); + QueryBatcher queryBatcher = new QueryBatcher(5); List queries = new ArrayList<>(); for (int i = 0; i < 3; i++) { queries.add(metricQuery); @@ -35,7 +35,7 @@ void noSplit() { @Test void split() { - QueryBatcher queryBatcher = new QueryBatcher(2, 10); + QueryBatcher queryBatcher = new QueryBatcher(2); List queries = new ArrayList<>(); for (int i = 0; i < 3; i++) { queries.add(metricQuery); diff --git a/src/test/java/ai/asserts/aws/exporter/ECSTaskProviderTest.java b/src/test/java/ai/asserts/aws/exporter/ECSTaskProviderTest.java index ea710527..5b9a1670 100644 --- a/src/test/java/ai/asserts/aws/exporter/ECSTaskProviderTest.java +++ b/src/test/java/ai/asserts/aws/exporter/ECSTaskProviderTest.java @@ -101,7 +101,7 @@ public void setup() { testClass = new ECSTaskProvider(awsClientProvider, scrapeConfigProvider, accountProvider, new RateLimiter(basicMetricCollector, (account) -> "acme"), resourceMapper, ecsClusterProvider, ecsTaskUtil, sampleBuilder, - collectorRegistry, taskExecutorUtil, snakeCaseUtil); + collectorRegistry, taskExecutorUtil, snakeCaseUtil, 1); } @@ -245,7 +245,7 @@ public void getScrapeTargets() { testClass = new ECSTaskProvider(awsClientProvider, scrapeConfigProvider, accountProvider, new RateLimiter(basicMetricCollector, (account) -> "acme"), resourceMapper, ecsClusterProvider, ecsTaskUtil, sampleBuilder, - collectorRegistry, taskExecutorUtil, snakeCaseUtil) { + collectorRegistry, taskExecutorUtil, snakeCaseUtil, 2) { @Override void discoverNewTasks(Map> clusterWiseNewTasks, EcsClient ecsClient, Resource cluster) { @@ -370,11 +370,19 @@ public void buildNewTargets() { .build(); expect(ecsClient.describeTasks(DescribeTasksRequest.builder() .cluster("cluster1") - .tasks("service1-arn", "service2-arn") + .tasks("service1-arn") .build())).andReturn(DescribeTasksResponse.builder() - .tasks(task1, task2) + .tasks(task1) .build()); expect(ecsTaskUtil.hasAllInfo(task1)).andReturn(true); + basicMetricCollector.recordLatency(eq("aws_exporter_milliseconds"), anyObject(SortedMap.class), anyLong()); + + expect(ecsClient.describeTasks(DescribeTasksRequest.builder() + .cluster("cluster1") + .tasks("service2-arn") + .build())).andReturn(DescribeTasksResponse.builder() + .tasks(task2) + .build()); expect(ecsTaskUtil.hasAllInfo(task2)).andReturn(true); basicMetricCollector.recordLatency(eq("aws_exporter_milliseconds"), anyObject(SortedMap.class), anyLong()); @@ -398,14 +406,23 @@ public void buildNewTargets() { .group("service:service4") .taskArn("service4-task4-arn") .build(); + expect(ecsClient.describeTasks(DescribeTasksRequest.builder() .cluster("cluster2") - .tasks("service3-arn", "service4-arn") + .tasks("service3-arn") .build())).andReturn(DescribeTasksResponse.builder() - .tasks(task3, task4) + .tasks(task3) .build()); - + basicMetricCollector.recordLatency(eq("aws_exporter_milliseconds"), anyObject(SortedMap.class), anyLong()); expect(ecsTaskUtil.hasAllInfo(task3)).andReturn(true); + + expect(ecsClient.describeTasks(DescribeTasksRequest.builder() + .cluster("cluster2") + .tasks("service4-arn") + .build())).andReturn(DescribeTasksResponse.builder() + .tasks(task4) + .build()); + basicMetricCollector.recordLatency(eq("aws_exporter_milliseconds"), anyObject(SortedMap.class), anyLong()); expect(ecsTaskUtil.hasAllInfo(task4)).andReturn(true); Resource task3Resource = Resource.builder().name("task3").build(); @@ -420,8 +437,6 @@ public void buildNewTargets() { task4)) .andReturn(ImmutableList.of(mockStaticConfig, mockStaticConfig)); - basicMetricCollector.recordLatency(eq("aws_exporter_milliseconds"), anyObject(SortedMap.class), anyLong()); - replayAll(); assertTrue(testClass.getTasksByCluster().isEmpty()); testClass.buildNewTargets(account, scrapeConfig, clusterWiseARNs, ecsClient); diff --git a/src/test/java/ai/asserts/aws/exporter/RedshiftExporterTest.java b/src/test/java/ai/asserts/aws/exporter/RedshiftExporterTest.java index d2b1aeb1..309f3e60 100644 --- a/src/test/java/ai/asserts/aws/exporter/RedshiftExporterTest.java +++ b/src/test/java/ai/asserts/aws/exporter/RedshiftExporterTest.java @@ -17,7 +17,6 @@ import com.google.common.collect.ImmutableSet; import io.prometheus.client.Collector; import io.prometheus.client.CollectorRegistry; -import org.easymock.Capture; import org.easymock.EasyMockSupport; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,7 +26,6 @@ import software.amazon.awssdk.services.redshift.model.Tag; import software.amazon.awssdk.utils.ImmutableMap; -import java.util.Map; import java.util.Optional; import java.util.SortedMap; import java.util.TreeMap; @@ -36,7 +34,6 @@ import static ai.asserts.aws.MetricNameUtil.SCRAPE_LATENCY_METRIC; import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall;