Skip to content

Commit

Permalink
Merge pull request #308 from asserts/radha/sc-16131
Browse files Browse the repository at this point in the history
Switch to Guava RateLimiter
  • Loading branch information
jradhakrishnan authored Jul 13, 2023
2 parents 6ebb3c3 + 686cd48 commit be0cde7
Show file tree
Hide file tree
Showing 61 changed files with 305 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@

import ai.asserts.aws.account.AccountTenantMapper;
import ai.asserts.aws.exporter.BasicMetricCollector;
import lombok.AllArgsConstructor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;

import static ai.asserts.aws.MetricNameUtil.ASSERTS_ERROR_TYPE;
import static ai.asserts.aws.MetricNameUtil.SCRAPE_ACCOUNT_ID_LABEL;
Expand All @@ -28,28 +26,42 @@
import static java.lang.String.format;
import static java.util.stream.Collectors.joining;

@Component
@Slf4j
@AllArgsConstructor
public class RateLimiter {
@SuppressWarnings("UnstableApiUsage")
public class AWSApiCallRateLimiter {
private final BasicMetricCollector metricCollector;
private final AccountTenantMapper accountTenantMapper;
private final double defaultRateLimit;

private final ThreadLocal<Map<String, Integer>> apiCallCounts = ThreadLocal.withInitial(TreeMap::new);

private final Map<String, Semaphore> semaphores = new LinkedHashMap<>();
private final Map<String, RateLimiter> rateLimiters = new HashMap<>();

@VisibleForTesting
public AWSApiCallRateLimiter(BasicMetricCollector metricCollector, AccountTenantMapper accountTenantMapper) {
this(metricCollector, accountTenantMapper, 20);
}

public AWSApiCallRateLimiter(BasicMetricCollector metricCollector, AccountTenantMapper accountTenantMapper,
double defaultRateLimit) {
this.metricCollector = metricCollector;
this.accountTenantMapper = accountTenantMapper;
this.defaultRateLimit = defaultRateLimit;
}

public <K extends AWSAPICall<V>, V> V doWithRateLimit(String api, SortedMap<String, String> labels, K k) {
String accountId = labels.get(SCRAPE_ACCOUNT_ID_LABEL);
String region = labels.get(SCRAPE_REGION_LABEL);
String clientType = api.split("/")[0];
String regionKey = accountId + "/" + region;
String fullKey = regionKey + "/" + clientType;
Semaphore theSemaphore = semaphores.computeIfAbsent(fullKey, s -> new Semaphore(2));
String fullKey = regionKey + "/" + api;
RateLimiter rateLimiter = rateLimiters.computeIfAbsent(fullKey, s -> RateLimiter.create(defaultRateLimit));
long tick = System.currentTimeMillis();
String tenantName = accountTenantMapper.getTenantName(labels.get(SCRAPE_ACCOUNT_ID_LABEL));
try {
theSemaphore.acquire();
double waitTime = rateLimiter.acquire();
if (waitTime > 0.5) {
log.warn("Operation {} throttled for {} seconds", fullKey, waitTime);
}
Map<String, Integer> callCounts = apiCallCounts.get();
String operationName = labels.getOrDefault(SCRAPE_OPERATION_LABEL, "unknown");
String callCountKey = regionKey + "/" + operationName;
Expand All @@ -74,7 +86,6 @@ public <K extends AWSAPICall<V>, V> V doWithRateLimit(String api, SortedMap<Stri
labels.put(TENANT, tenantName);
}
metricCollector.recordLatency(SCRAPE_LATENCY_METRIC, labels, tick);
theSemaphore.release();
}
}

Expand Down
14 changes: 12 additions & 2 deletions src/main/java/ai/asserts/aws/AwsExporterBeanConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
package ai.asserts.aws;

import ai.asserts.aws.account.AccountProvider;
import ai.asserts.aws.account.AccountTenantMapper;
import ai.asserts.aws.account.HekateDistributedAccountProvider;
import ai.asserts.aws.account.NoopAccountProvider;
import ai.asserts.aws.account.SingleInstanceAccountProvider;
import ai.asserts.aws.cluster.HekateCluster;
import ai.asserts.aws.exporter.AccountIDProvider;
import ai.asserts.aws.exporter.BasicMetricCollector;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -19,6 +22,13 @@
@Configuration
@SuppressWarnings("unused")
public class AwsExporterBeanConfiguration {
@Bean
public AWSApiCallRateLimiter getRateLimiter(BasicMetricCollector metricCollector,
AccountTenantMapper accountTenantMapper,
@Value("${aws_exporter.aws_api_calls_rate_limit:5}") double rateLimit) {
return new AWSApiCallRateLimiter(metricCollector, accountTenantMapper, rateLimit);
}

@Bean
@ConditionalOnProperty(name = "aws_exporter.tenant_mode", havingValue = "single", matchIfMissing = true)
public RestTemplate restTemplate() {
Expand All @@ -41,13 +51,13 @@ public TaskThreadPool awsAPICallsPool(MeterRegistry meterRegistry) {
}

@Bean
@ConditionalOnProperty(name = "aws_exporter.deployment_mode", havingValue = "single-tenant-distributed")
@ConditionalOnProperty(name = "aws_exporter.deployment_mode", havingValue = "single-tenant-distributed")
public HekateCluster hekateCluster() {
return new HekateCluster();
}

@Bean
@ConditionalOnProperty(name = "aws_exporter.deployment_mode", havingValue = "single-tenant-distributed")
@ConditionalOnProperty(name = "aws_exporter.deployment_mode", havingValue = "single-tenant-distributed")
public AccountProvider getDistributedAccountProvider(HekateCluster hekateCluster,
EnvironmentConfig environmentConfig,
AccountIDProvider accountIDProvider,
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/ai/asserts/aws/TaskExecutorUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
public class TaskExecutorUtil {
private final TaskThreadPool taskThreadPool;

private final RateLimiter rateLimiter;
private final AWSApiCallRateLimiter rateLimiter;
private static final ThreadLocal<String> tenantName = new ThreadLocal<>();


public TaskExecutorUtil(@Qualifier("aws-api-calls-thread-pool") TaskThreadPool taskThreadPool, RateLimiter rateLimiter) {
public TaskExecutorUtil(@Qualifier("aws-api-calls-thread-pool") TaskThreadPool taskThreadPool, AWSApiCallRateLimiter rateLimiter) {
this.taskThreadPool = taskThreadPool;
this.rateLimiter = rateLimiter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.stream.Collectors;

@Slf4j
@SuppressWarnings("UnstableApiUsage")
public class HekateDistributedAccountProvider implements AccountProvider {
private static final HashFunction hashFunction = Hashing.murmur3_128();
private final HekateCluster hekateCluster;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import ai.asserts.aws.AWSClientProvider;
import ai.asserts.aws.CollectionBuilderTask;
import ai.asserts.aws.RateLimiter;
import ai.asserts.aws.AWSApiCallRateLimiter;
import ai.asserts.aws.ScrapeConfigProvider;
import ai.asserts.aws.TaskExecutorUtil;
import ai.asserts.aws.EnvironmentConfig;
Expand Down Expand Up @@ -46,7 +46,7 @@
public class AlarmFetcher extends Collector implements InitializingBean {
public final CollectorRegistry collectorRegistry;
private final AccountProvider accountProvider;
private final RateLimiter rateLimiter;
private final AWSApiCallRateLimiter rateLimiter;
private final AWSClientProvider awsClientProvider;
private final AlarmMetricConverter alarmMetricConverter;
private final MetricSampleBuilder sampleBuilder;
Expand All @@ -59,7 +59,7 @@ public class AlarmFetcher extends Collector implements InitializingBean {
public AlarmFetcher(AccountProvider accountProvider,
AWSClientProvider awsClientProvider,
CollectorRegistry collectorRegistry,
RateLimiter rateLimiter,
AWSApiCallRateLimiter rateLimiter,
MetricSampleBuilder sampleBuilder,
AlarmMetricConverter alarmMetricConverter,
ScrapeConfigProvider scrapeConfigProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import ai.asserts.aws.AWSClientProvider;
import ai.asserts.aws.EnvironmentConfig;
import ai.asserts.aws.MetricNameUtil;
import ai.asserts.aws.RateLimiter;
import ai.asserts.aws.AWSApiCallRateLimiter;
import ai.asserts.aws.ScrapeConfigProvider;
import ai.asserts.aws.SimpleTenantTask;
import ai.asserts.aws.TaskExecutorUtil;
Expand Down Expand Up @@ -54,7 +54,7 @@ public class MetricQueryProvider {
private final ResourceTagHelper resourceTagHelper;
private final MetricQueryBuilder metricQueryBuilder;
private final Supplier<Map<String, Map<String, Map<Integer, List<MetricQuery>>>>> metricQueryCache;
private final RateLimiter rateLimiter;
private final AWSApiCallRateLimiter rateLimiter;
private final TaskExecutorUtil taskExecutorUtil;

public MetricQueryProvider(EnvironmentConfig environmentConfig,
Expand All @@ -65,7 +65,7 @@ public MetricQueryProvider(EnvironmentConfig environmentConfig,
AWSClientProvider awsClientProvider,
ResourceTagHelper resourceTagHelper,
MetricQueryBuilder metricQueryBuilder,
RateLimiter rateLimiter,
AWSApiCallRateLimiter rateLimiter,
TaskExecutorUtil taskExecutorUtil) {
this.environmentConfig = environmentConfig;
this.accountProvider = accountProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
public class QueryBatcher {
private final int queryCountLimit;

public QueryBatcher(@Value("${aws.metric.data.query.count.limit:500}") int queryCountLimit) {
public QueryBatcher(@Value("${aws_exporter.metric_data_query_limit:500}") int queryCountLimit) {
this.queryCountLimit = queryCountLimit;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import ai.asserts.aws.AWSClientProvider;
import ai.asserts.aws.MetricNameUtil;
import ai.asserts.aws.RateLimiter;
import ai.asserts.aws.AWSApiCallRateLimiter;
import ai.asserts.aws.CollectionBuilderTask;
import ai.asserts.aws.TaskExecutorUtil;
import ai.asserts.aws.account.AWSAccount;
Expand Down Expand Up @@ -51,7 +51,7 @@
public class ApiGatewayToLambdaBuilder extends Collector
implements MetricProvider, InitializingBean {
private final AWSClientProvider awsClientProvider;
private final RateLimiter rateLimiter;
private final AWSApiCallRateLimiter rateLimiter;
private final AccountProvider accountProvider;
private final MetricSampleBuilder metricSampleBuilder;
private final CollectorRegistry collectorRegistry;
Expand All @@ -65,7 +65,7 @@ public class ApiGatewayToLambdaBuilder extends Collector
private volatile List<MetricFamilySamples> apiResourceMetrics = new ArrayList<>();

public ApiGatewayToLambdaBuilder(AWSClientProvider awsClientProvider,
RateLimiter rateLimiter, AccountProvider accountProvider,
AWSApiCallRateLimiter rateLimiter, AccountProvider accountProvider,
MetricSampleBuilder metricSampleBuilder,
CollectorRegistry collectorRegistry, MetricNameUtil metricNameUtil,
TaskExecutorUtil taskExecutorUtil) {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/ai/asserts/aws/exporter/DynamoDBExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import ai.asserts.aws.AWSClientProvider;
import ai.asserts.aws.CollectionBuilderTask;
import ai.asserts.aws.RateLimiter;
import ai.asserts.aws.AWSApiCallRateLimiter;
import ai.asserts.aws.ScrapeConfigProvider;
import ai.asserts.aws.TagUtil;
import ai.asserts.aws.TaskExecutorUtil;
Expand Down Expand Up @@ -43,7 +43,7 @@ public class DynamoDBExporter extends Collector implements InitializingBean {
public final CollectorRegistry collectorRegistry;
private final AccountProvider accountProvider;
private final AWSClientProvider awsClientProvider;
private final RateLimiter rateLimiter;
private final AWSApiCallRateLimiter rateLimiter;
private final MetricSampleBuilder sampleBuilder;
private final ResourceTagHelper resourceTagHelper;
private final TagUtil tagUtil;
Expand All @@ -53,7 +53,7 @@ public class DynamoDBExporter extends Collector implements InitializingBean {

public DynamoDBExporter(
AccountProvider accountProvider, AWSClientProvider awsClientProvider, CollectorRegistry collectorRegistry,
RateLimiter rateLimiter, MetricSampleBuilder sampleBuilder, ResourceTagHelper resourceTagHelper,
AWSApiCallRateLimiter rateLimiter, MetricSampleBuilder sampleBuilder, ResourceTagHelper resourceTagHelper,
TagUtil tagUtil, TaskExecutorUtil taskExecutorUtil, ScrapeConfigProvider scrapeConfigProvider) {
this.accountProvider = accountProvider;
this.awsClientProvider = awsClientProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import ai.asserts.aws.AWSClientProvider;
import ai.asserts.aws.CollectionBuilderTask;
import ai.asserts.aws.RateLimiter;
import ai.asserts.aws.AWSApiCallRateLimiter;
import ai.asserts.aws.ScrapeConfigProvider;
import ai.asserts.aws.TagUtil;
import ai.asserts.aws.TaskExecutorUtil;
Expand Down Expand Up @@ -58,7 +58,7 @@ public class EC2ToEBSVolumeExporter extends Collector implements MetricProvider,
private final AWSClientProvider awsClientProvider;
private final MetricSampleBuilder metricSampleBuilder;
private final CollectorRegistry collectorRegistry;
private final RateLimiter rateLimiter;
private final AWSApiCallRateLimiter rateLimiter;
private final TagUtil tagUtil;
private final ECSServiceDiscoveryExporter ecsServiceDiscoveryExporter;
private final TaskExecutorUtil taskExecutorUtil;
Expand All @@ -69,7 +69,7 @@ public class EC2ToEBSVolumeExporter extends Collector implements MetricProvider,

public EC2ToEBSVolumeExporter(AccountProvider accountProvider,
AWSClientProvider awsClientProvider, MetricSampleBuilder metricSampleBuilder,
CollectorRegistry collectorRegistry, RateLimiter rateLimiter, TagUtil tagUtil,
CollectorRegistry collectorRegistry, AWSApiCallRateLimiter rateLimiter, TagUtil tagUtil,
ECSServiceDiscoveryExporter ecsServiceDiscoveryExporter,
TaskExecutorUtil taskExecutorUtil, ScrapeConfigProvider scrapeConfigProvider) {
this.accountProvider = accountProvider;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/ai/asserts/aws/exporter/ECSClusterProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import ai.asserts.aws.AWSClientProvider;
import ai.asserts.aws.account.AWSAccount;
import ai.asserts.aws.RateLimiter;
import ai.asserts.aws.AWSApiCallRateLimiter;
import ai.asserts.aws.resource.Resource;
import ai.asserts.aws.resource.ResourceMapper;
import com.google.common.base.Suppliers;
Expand Down Expand Up @@ -39,11 +39,11 @@
@Slf4j
public class ECSClusterProvider {
private final AWSClientProvider awsClientProvider;
private final RateLimiter rateLimiter;
private final AWSApiCallRateLimiter rateLimiter;
private final ResourceMapper resourceMapper;
private final Map<String, Map<String, Supplier<Set<Resource>>>> clusterProviders = new ConcurrentHashMap<>();

public ECSClusterProvider(AWSClientProvider awsClientProvider, RateLimiter rateLimiter,
public ECSClusterProvider(AWSClientProvider awsClientProvider, AWSApiCallRateLimiter rateLimiter,
ResourceMapper resourceMapper) {
this.awsClientProvider = awsClientProvider;
this.rateLimiter = rateLimiter;
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/ai/asserts/aws/exporter/ECSTaskProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package ai.asserts.aws.exporter;

import ai.asserts.aws.AWSClientProvider;
import ai.asserts.aws.RateLimiter;
import ai.asserts.aws.AWSApiCallRateLimiter;
import ai.asserts.aws.ScrapeConfigProvider;
import ai.asserts.aws.SimpleTenantTask;
import ai.asserts.aws.SnakeCaseUtil;
Expand Down Expand Up @@ -63,7 +63,7 @@ public class ECSTaskProvider extends Collector implements Runnable, Initializing
private final AWSClientProvider awsClientProvider;
private final ScrapeConfigProvider scrapeConfigProvider;
private final AccountProvider accountProvider;
private final RateLimiter rateLimiter;
private final AWSApiCallRateLimiter rateLimiter;
private final ResourceMapper resourceMapper;
private final ECSClusterProvider ecsClusterProvider;
private final ECSTaskUtil ecsTaskUtil;
Expand All @@ -78,11 +78,11 @@ public class ECSTaskProvider extends Collector implements Runnable, Initializing
private final Map<Resource, Map<Resource, List<StaticConfig>>> tasksByCluster = new ConcurrentHashMap<>();

public ECSTaskProvider(AWSClientProvider awsClientProvider, ScrapeConfigProvider scrapeConfigProvider,
AccountProvider accountProvider, RateLimiter rateLimiter, ResourceMapper resourceMapper,
AccountProvider accountProvider, AWSApiCallRateLimiter rateLimiter, ResourceMapper resourceMapper,
ECSClusterProvider ecsClusterProvider, ECSTaskUtil ecsTaskUtil,
MetricSampleBuilder sampleBuilder, CollectorRegistry collectorRegistry,
TaskExecutorUtil taskExecutorUtil, SnakeCaseUtil snakeCaseUtil,
@Value("${aws.ecs.describeTasks.batch_size:100}") int describeTaskBatchSize) {
@Value("${aws_exporter.ecs_describeTasks_batch_size:100}") int describeTaskBatchSize) {
this.awsClientProvider = awsClientProvider;
this.scrapeConfigProvider = scrapeConfigProvider;
this.accountProvider = accountProvider;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/ai/asserts/aws/exporter/ECSTaskUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package ai.asserts.aws.exporter;

import ai.asserts.aws.AWSClientProvider;
import ai.asserts.aws.RateLimiter;
import ai.asserts.aws.AWSApiCallRateLimiter;
import ai.asserts.aws.TagUtil;
import ai.asserts.aws.TaskExecutorUtil;
import ai.asserts.aws.account.AWSAccount;
Expand Down Expand Up @@ -57,7 +57,7 @@ public class ECSTaskUtil {
public static final String INSTALLED_ENV_NAME = "INSTALL_ENV_NAME";
private final AWSClientProvider awsClientProvider;
private final ResourceMapper resourceMapper;
private final RateLimiter rateLimiter;
private final AWSApiCallRateLimiter rateLimiter;
private final TagUtil tagUtil;
private final TaskExecutorUtil taskExecutorUtil;
private final String envName;
Expand All @@ -75,7 +75,7 @@ public class ECSTaskUtil {
public static final String PROMETHEUS_METRIC_PATH_DOCKER_LABEL = "PROMETHEUS_EXPORTER_PATH";


public ECSTaskUtil(AWSClientProvider awsClientProvider, ResourceMapper resourceMapper, RateLimiter rateLimiter,
public ECSTaskUtil(AWSClientProvider awsClientProvider, ResourceMapper resourceMapper, AWSApiCallRateLimiter rateLimiter,
TagUtil tagUtil, TaskExecutorUtil taskExecutorUtil) {
this.awsClientProvider = awsClientProvider;
this.resourceMapper = resourceMapper;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/ai/asserts/aws/exporter/EMRExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import ai.asserts.aws.AWSClientProvider;
import ai.asserts.aws.CollectionBuilderTask;
import ai.asserts.aws.RateLimiter;
import ai.asserts.aws.AWSApiCallRateLimiter;
import ai.asserts.aws.TaskExecutorUtil;
import ai.asserts.aws.account.AWSAccount;
import ai.asserts.aws.account.AccountProvider;
Expand Down Expand Up @@ -38,14 +38,14 @@ public class EMRExporter extends Collector implements InitializingBean {
public final CollectorRegistry collectorRegistry;
private final AccountProvider accountProvider;
private final AWSClientProvider awsClientProvider;
private final RateLimiter rateLimiter;
private final AWSApiCallRateLimiter rateLimiter;
private final MetricSampleBuilder sampleBuilder;
private final TaskExecutorUtil taskExecutorUtil;
private volatile List<MetricFamilySamples> metricFamilySamples = new ArrayList<>();

public EMRExporter(
AccountProvider accountProvider, AWSClientProvider awsClientProvider, CollectorRegistry collectorRegistry,
RateLimiter rateLimiter, MetricSampleBuilder sampleBuilder, TaskExecutorUtil taskExecutorUtil) {
AWSApiCallRateLimiter rateLimiter, MetricSampleBuilder sampleBuilder, TaskExecutorUtil taskExecutorUtil) {
this.accountProvider = accountProvider;
this.awsClientProvider = awsClientProvider;
this.collectorRegistry = collectorRegistry;
Expand Down
Loading

0 comments on commit be0cde7

Please sign in to comment.