From 0475f9bab192514e83d5d41b0eced3427e213795 Mon Sep 17 00:00:00 2001 From: Poorna Reddy Date: Mon, 20 Aug 2018 09:44:14 -0700 Subject: [PATCH] Rate limiting Implementation using Redis constructs (#724) * Rate limitng of Tasks using Redis constructs * Javadoc build fix * UI changes to display the rate limitng * Java docs and loggers * Metrics and updated loggers to info --- .../conductor/common/metadata/tasks/Task.java | 25 +++++-- .../common/metadata/tasks/TaskDef.java | 69 ++++++++++++------- .../conductor/core/config/CoreModule.java | 7 +- .../core/execution/WorkflowExecutor.java | 9 +-- .../mapper/UserDefinedTaskMapper.java | 3 +- .../netflix/conductor/dao/ExecutionDAO.java | 8 +++ .../netflix/conductor/metrics/Monitors.java | 4 ++ .../service/DummyRateLimitingService.java | 20 ------ .../conductor/service/MetadataService.java | 11 +-- .../service/RateLimitingService.java | 30 -------- .../core/execution/TestWorkflowExecutor.java | 5 +- .../dao/mysql/MySQLExecutionDAO.java | 14 +++- .../conductor/dao/dynomite/DynoProxy.java | 4 ++ .../dao/dynomite/RedisExecutionDAO.java | 57 ++++++++++++++- .../dao/dynomite/RedisExecutionDAOTest.java | 28 ++++++++ .../workflow/tasks/TasksMetaList.js | 21 ++++++ 16 files changed, 205 insertions(+), 110 deletions(-) delete mode 100644 core/src/main/java/com/netflix/conductor/service/DummyRateLimitingService.java delete mode 100644 core/src/main/java/com/netflix/conductor/service/RateLimitingService.java diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java index fa61f81510..20b64bf381 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java @@ -128,7 +128,9 @@ public boolean isRetriable() { private String domain; - private int rateLimitPerSecond; + private int rateLimitPerFrequency; + + private int rateLimitFrequencyInSeconds; public Task() { @@ -541,12 +543,20 @@ public void setDomain(String domain) { this.domain = domain; } - public int getRateLimitPerSecond() { - return rateLimitPerSecond; + public int getRateLimitPerFrequency() { + return rateLimitPerFrequency; + } + + public void setRateLimitPerFrequency(int rateLimitPerFrequency) { + this.rateLimitPerFrequency = rateLimitPerFrequency; + } + + public int getRateLimitFrequencyInSeconds() { + return rateLimitFrequencyInSeconds; } - public void setRateLimitPerSecond(int rateLimitPerSecond) { - this.rateLimitPerSecond = rateLimitPerSecond; + public void setRateLimitFrequencyInSeconds(int rateLimitFrequencyInSeconds) { + this.rateLimitFrequencyInSeconds = rateLimitFrequencyInSeconds; } public Task copy() { @@ -571,7 +581,8 @@ public Task copy() { copy.setWorkerId(workerId); copy.setWorkflowTask(workflowTask); copy.setDomain(domain); - copy.setRateLimitPerSecond(rateLimitPerSecond); + copy.setRateLimitPerFrequency(rateLimitPerFrequency); + copy.setRateLimitFrequencyInSeconds(rateLimitFrequencyInSeconds); return copy; } @@ -596,6 +607,8 @@ public String toString() { ", retriedTaskId='" + retriedTaskId + '\'' + ", retried=" + retried + ", callbackFromWorker=" + callbackFromWorker + + ", rateLimitFrequencyInSeconds=" + rateLimitFrequencyInSeconds + + ", rateLimitPerFrequency=" + rateLimitPerFrequency + ", responseTimeoutSeconds=" + responseTimeoutSeconds + ", workflowInstanceId='" + workflowInstanceId + '\'' + ", taskId='" + taskId + '\'' + diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java index fabe9b36df..30e2eea075 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java @@ -31,39 +31,43 @@ * Defines a workflow task definition */ public class TaskDef extends Auditable { - - public enum TimeoutPolicy {RETRY, TIME_OUT_WF, ALERT_ONLY} - - public enum RetryLogic {FIXED, EXPONENTIAL_BACKOFF} - + + + + public enum TimeoutPolicy {RETRY, TIME_OUT_WF, ALERT_ONLY;} + + + public enum RetryLogic {FIXED, EXPONENTIAL_BACKOFF;} private static final int ONE_HOUR = 60 * 60; - + /** - * Unique name identifying the task. The name is unique across + * Unique name identifying the task. The name is unique across */ private String name; - + private String description; - + private int retryCount = 3; // Default private long timeoutSeconds; private List inputKeys = new ArrayList(); - + private List outputKeys = new ArrayList(); - + private TimeoutPolicy timeoutPolicy = TimeoutPolicy.TIME_OUT_WF; - + private RetryLogic retryLogic = RetryLogic.FIXED; - + private int retryDelaySeconds = 60; - + private int responseTimeoutSeconds = ONE_HOUR; - + private Integer concurrentExecLimit; - private Integer rateLimitPerSecond; + private Integer rateLimitPerFrequency; + + private Integer rateLimitFrequencyInSeconds; private Map inputTemplate = new HashMap<>(); @@ -240,18 +244,35 @@ public Map getInputTemplate() { /** * - * @return rateLimitPerSecond The max number of tasks that will be allowed to be executed per second at any given point of time. + * @return rateLimitPerFrequency The max number of tasks that will be allowed to be executed per rateLimitFrequencyInSeconds. + */ + public Integer getRateLimitPerFrequency() { + return rateLimitPerFrequency == null ? 0 : rateLimitPerFrequency; + } + + /** + * + * @param rateLimitPerFrequency The max number of tasks that will be allowed to be executed per rateLimitFrequencyInSeconds. + * Setting the value to 0 removes the rate limit + */ + public void setRateLimitPerFrequency(Integer rateLimitPerFrequency) { + this.rateLimitPerFrequency = rateLimitPerFrequency; + } + + /** + * @return rateLimitFrequencyInSeconds: The time bucket that is used to rate limit tasks based on {@link #getRateLimitPerFrequency()} + * If null or not set, then defaults to 1 second */ - public Integer getRateLimitPerSecond() { - return rateLimitPerSecond == null ? 0 : rateLimitPerSecond; + public Integer getRateLimitFrequencyInSeconds() { + return rateLimitFrequencyInSeconds == null ? 1 : rateLimitFrequencyInSeconds; } /** * - * @param rateLimitPerSecond The max number of tasks that will be allowed to be executed per second at any given point of time. Setting the value to 0 removes the rate limit + * @param rateLimitFrequencyInSeconds: The time window/bucket for which the rate limit needs to be applied. This will only have affect if {@link #getRateLimitPerFrequency()} is greater than zero */ - public void setRateLimitPerSecond(Integer rateLimitPerSecond) { - this.rateLimitPerSecond = rateLimitPerSecond; + public void setRateLimitFrequencyInSeconds(Integer rateLimitFrequencyInSeconds) { + this.rateLimitFrequencyInSeconds = rateLimitFrequencyInSeconds; } /** @@ -306,7 +327,7 @@ public boolean equals(Object o) { getTimeoutPolicy() == taskDef.getTimeoutPolicy() && getRetryLogic() == taskDef.getRetryLogic() && Objects.equals(getConcurrentExecLimit(), taskDef.getConcurrentExecLimit()) && - Objects.equals(getRateLimitPerSecond(), taskDef.getRateLimitPerSecond()) && + Objects.equals(getRateLimitPerFrequency(), taskDef.getRateLimitPerFrequency()) && Objects.equals(getInputTemplate(), taskDef.getInputTemplate()); } @@ -315,6 +336,6 @@ public int hashCode() { return Objects.hash(getName(), getDescription(), getRetryCount(), getTimeoutSeconds(), getInputKeys(), getOutputKeys(), getTimeoutPolicy(), getRetryLogic(), getRetryDelaySeconds(), - getResponseTimeoutSeconds(), getConcurrentExecLimit(), getRateLimitPerSecond(), getInputTemplate()); + getResponseTimeoutSeconds(), getConcurrentExecLimit(), getRateLimitPerFrequency(), getInputTemplate()); } } diff --git a/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java b/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java index 17f3816a6f..7c504e1450 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java +++ b/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java @@ -33,14 +33,14 @@ import com.netflix.conductor.core.events.queue.dyno.DynoEventQueueProvider; import com.netflix.conductor.core.execution.ParametersUtils; import com.netflix.conductor.core.execution.mapper.DecisionTaskMapper; -import com.netflix.conductor.core.execution.mapper.ForkJoinDynamicTaskMapper; import com.netflix.conductor.core.execution.mapper.DynamicTaskMapper; import com.netflix.conductor.core.execution.mapper.EventTaskMapper; +import com.netflix.conductor.core.execution.mapper.ForkJoinDynamicTaskMapper; import com.netflix.conductor.core.execution.mapper.ForkJoinTaskMapper; import com.netflix.conductor.core.execution.mapper.JoinTaskMapper; import com.netflix.conductor.core.execution.mapper.SimpleTaskMapper; -import com.netflix.conductor.core.execution.mapper.TaskMapper; import com.netflix.conductor.core.execution.mapper.SubWorkflowTaskMapper; +import com.netflix.conductor.core.execution.mapper.TaskMapper; import com.netflix.conductor.core.execution.mapper.UserDefinedTaskMapper; import com.netflix.conductor.core.execution.mapper.WaitTaskMapper; import com.netflix.conductor.core.execution.tasks.Event; @@ -49,8 +49,6 @@ import com.netflix.conductor.core.execution.tasks.Wait; import com.netflix.conductor.dao.MetadataDAO; import com.netflix.conductor.dao.QueueDAO; -import com.netflix.conductor.service.DummyRateLimitingService; -import com.netflix.conductor.service.RateLimitingService; /** @@ -68,7 +66,6 @@ protected void configure() { bind(SubWorkflow.class).asEagerSingleton(); bind(Wait.class).asEagerSingleton(); bind(Event.class).asEagerSingleton(); - bind(RateLimitingService.class).to(DummyRateLimitingService.class); } @Provides diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 8df7ef19fc..630b4c2504 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -41,7 +41,6 @@ import com.netflix.conductor.dao.MetadataDAO; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; -import com.netflix.conductor.service.RateLimitingService; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,7 +87,6 @@ public class WorkflowExecutor { private Configuration config; - private RateLimitingService rateLimitingService; private ParametersUtils parametersUtils = new ParametersUtils(); @@ -98,13 +96,12 @@ public class WorkflowExecutor { @Inject public WorkflowExecutor(DeciderService deciderService, MetadataDAO metadataDAO, ExecutionDAO executionDAO, - QueueDAO queueDAO, Configuration config, RateLimitingService rateLimitingService) { + QueueDAO queueDAO, Configuration config) { this.deciderService = deciderService; this.metadataDAO = metadataDAO; this.executionDAO = executionDAO; this.queueDAO = queueDAO; this.config = config; - this.rateLimitingService = rateLimitingService; activeWorkerLastPollnSecs = config.getIntProperty("tasks.active.worker.lastpoll", 10); } @@ -737,8 +734,8 @@ public void executeSystemTask(WorkflowSystemTask systemTask, String taskId, int logger.warn("Concurrent Execution limited for {}:{}", taskId, task.getTaskDefName()); return; } - if (task.getRateLimitPerSecond() > 0 && !rateLimitingService.evaluateRateLimitBoundary(task)) { - logger.warn("RateLimit Execution limited for {}:{}, limit:{}", taskId, task.getTaskDefName(), task.getRateLimitPerSecond()); + if (task.getRateLimitPerFrequency() > 0 && executionDAO.exceedsRateLimitPerFrequency(task)) { + logger.warn("RateLimit Execution limited for {}:{}, limit:{}", taskId, task.getTaskDefName(), task.getRateLimitPerFrequency()); return; } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/UserDefinedTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/UserDefinedTaskMapper.java index 779823fdc3..1890bfee88 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/UserDefinedTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/UserDefinedTaskMapper.java @@ -88,7 +88,8 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) throws Ter userDefinedTask.setRetryCount(retryCount); userDefinedTask.setCallbackAfterSeconds(taskToSchedule.getStartDelay()); userDefinedTask.setWorkflowTask(taskToSchedule); - userDefinedTask.setRateLimitPerSecond(taskDefinition.getRateLimitPerSecond()); + userDefinedTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency()); + userDefinedTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds()); return Collections.singletonList(userDefinedTask); } diff --git a/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java b/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java index c79481c86e..411ef5dc8e 100644 --- a/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java +++ b/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java @@ -80,6 +80,14 @@ public interface ExecutionDAO { * @see TaskDef#concurrencyLimit() */ boolean exceedsInProgressLimit(Task task); + + /** + * Checks if the Task is rate limited or not based on the {@link Task#getRateLimitPerFrequency()} and {@link Task#getRateLimitFrequencyInSeconds()} + * @param task: which needs to be evaluated whether it is rateLimited or not + * @return true: If the {@link Task} is rateLimited + * false: If the {@link Task} is not rateLimited + */ + boolean exceedsRateLimitPerFrequency(Task task); /** * diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java index 2c4736c0e8..34f8fc6327 100644 --- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java +++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java @@ -220,6 +220,10 @@ public static void recordTaskRateLimited(String taskDefName, int limit) { gauge(classQualifier, "task_rate_limited", limit, "taskType", taskDefName); } + public static void recordTaskConcurrentExecutionLimited(String taskDefName, int limit) { + gauge(classQualifier, "task_concurrent_execution_limited", limit, "taskType", taskDefName); + } + public static void recordEventQueueMessagesProcessed(String queueType, String queueName, int count) { getCounter(classQualifier, "event_queue_messages_processed", "queueType", queueType, "queueName", queueName).increment(count); } diff --git a/core/src/main/java/com/netflix/conductor/service/DummyRateLimitingService.java b/core/src/main/java/com/netflix/conductor/service/DummyRateLimitingService.java deleted file mode 100644 index d472dead92..0000000000 --- a/core/src/main/java/com/netflix/conductor/service/DummyRateLimitingService.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.netflix.conductor.service; - -import com.netflix.conductor.common.metadata.tasks.Task; -import com.netflix.conductor.common.metadata.tasks.TaskDef; - -/** - * A Dummy/No-op implementation of the {@link RateLimitingService} - */ -public class DummyRateLimitingService implements RateLimitingService{ - - @Override - public boolean evaluateRateLimitBoundary(Task task) { - return true; - } - - @Override - public void updateRateLimitRules(TaskDef taskDef) { - - } -} diff --git a/core/src/main/java/com/netflix/conductor/service/MetadataService.java b/core/src/main/java/com/netflix/conductor/service/MetadataService.java index f532266dd0..6faae594ff 100644 --- a/core/src/main/java/com/netflix/conductor/service/MetadataService.java +++ b/core/src/main/java/com/netflix/conductor/service/MetadataService.java @@ -18,7 +18,6 @@ */ package com.netflix.conductor.service; -import com.google.common.base.Preconditions; import com.netflix.conductor.annotations.Trace; import com.netflix.conductor.common.metadata.events.EventHandler; import com.netflix.conductor.common.metadata.tasks.TaskDef; @@ -44,12 +43,10 @@ public class MetadataService { private MetadataDAO metadataDAO; - private RateLimitingService rateLimitingService; @Inject - public MetadataService(MetadataDAO metadataDAO, RateLimitingService rateLimitingService) { + public MetadataService(MetadataDAO metadataDAO) { this.metadataDAO = metadataDAO; - this.rateLimitingService = rateLimitingService; } /** @@ -63,9 +60,6 @@ public void registerTaskDef(List taskDefinitions) { taskDefinition.setUpdatedBy(null); taskDefinition.setUpdateTime(null); metadataDAO.createTaskDef(taskDefinition); - if(taskDefinition.getRateLimitPerSecond() != 0) { - rateLimitingService.updateRateLimitRules(taskDefinition); - } } } @@ -81,9 +75,6 @@ public void updateTaskDef(TaskDef taskDefinition) { taskDefinition.setUpdatedBy(WorkflowContext.get().getClientApp()); taskDefinition.setUpdateTime(System.currentTimeMillis()); metadataDAO.updateTaskDef(taskDefinition); - if(taskDefinition.getRateLimitPerSecond() != 0) { - rateLimitingService.updateRateLimitRules(taskDefinition); - } } /** diff --git a/core/src/main/java/com/netflix/conductor/service/RateLimitingService.java b/core/src/main/java/com/netflix/conductor/service/RateLimitingService.java deleted file mode 100644 index c7e182c3f2..0000000000 --- a/core/src/main/java/com/netflix/conductor/service/RateLimitingService.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.netflix.conductor.service; - -import com.netflix.conductor.common.metadata.tasks.Task; -import com.netflix.conductor.common.metadata.tasks.TaskDef; - -/** - * This interface is intended provide rate limiting ability to all the System Tasks - * Provides the abstraction to register rate limiting at a {@link TaskDef} level and - * the ability to evaluate if the {@link Task} has breached the configured rate limit - */ -public interface RateLimitingService { - - /** - * This method needs to answer if a particular task is eligible for execution or not. - * Intended to be evoked before execution of the task - * - * @param task an instance of {@link Task} which is evaluated to see if the rate limit boundary has been breached - * @return true: if the task execution rate limit boundary has not been breached and is ok to continue with the task execution - * false: if the task execution rate limit boundary has been breached and is not ok to continue with the task execution - */ - boolean evaluateRateLimitBoundary(Task task); - - /** - * This method is an abstraction to save the {@link TaskDef#rateLimitPerSecond} in case if it is greater than 0 - * Once the rateLimit configuration is saved, the configuration will be applied across all the Workflows that use this TaskDef - * - * @param taskDef: An instance of {@link TaskDef} which has the {@link TaskDef#rateLimitPerSecond} which will be used to configure the rate limit rules. - */ - void updateRateLimitRules(TaskDef taskDef); -} diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index 258f4d802c..9013dc509a 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -41,8 +41,6 @@ import com.netflix.conductor.dao.ExecutionDAO; import com.netflix.conductor.dao.MetadataDAO; import com.netflix.conductor.dao.QueueDAO; -import com.netflix.conductor.service.DummyRateLimitingService; -import com.netflix.conductor.service.RateLimitingService; import org.junit.Before; import org.junit.Test; @@ -94,8 +92,7 @@ public void init() { taskMappers.put("EVENT", new EventTaskMapper(parametersUtils)); taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils)); DeciderService deciderService = new DeciderService(metadataDAO, taskMappers); - RateLimitingService rateLimitingService = new DummyRateLimitingService(); - workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, executionDAO, queueDAO, config, rateLimitingService); + workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, executionDAO, queueDAO, config); } @Test diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAO.java index af7c819857..fa8cc56e8c 100644 --- a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAO.java +++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAO.java @@ -127,6 +127,16 @@ public void updateTask(Task task) { withTransaction(connection -> updateTask(connection, task)); } + /** + * This is a dummy implementation and this feature is not for Mysql backed Conductor + * @param task: which needs to be evaluated whether it is rateLimited or not + * @return + */ + @Override + public boolean exceedsRateLimitPerFrequency(Task task) { + return false; + } + @Override public boolean exceedsInProgressLimit(Task task) { TaskDef taskDef = metadata.getTaskDef(task.getTaskDefName()); @@ -142,7 +152,7 @@ public boolean exceedsInProgressLimit(Task task) { long current = getInProgressTaskCount(task.getTaskDefName()); if (current >= limit) { - Monitors.recordTaskRateLimited(task.getTaskDefName(), limit); + Monitors.recordTaskConcurrentExecutionLimited(task.getTaskDefName(), limit); return true; } @@ -158,7 +168,7 @@ public boolean exceedsInProgressLimit(Task task) { if (rateLimited) { logger.info("Task execution count limited. {}, limit {}, current {}", task.getTaskDefName(), limit, getInProgressTaskCount(task.getTaskDefName())); - Monitors.recordTaskRateLimited(task.getTaskDefName(), limit); + Monitors.recordTaskConcurrentExecutionLimited(task.getTaskDefName(), limit); } return rateLimited; diff --git a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/DynoProxy.java b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/DynoProxy.java index 11c90e2759..2899d10b79 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/DynoProxy.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/DynoProxy.java @@ -106,6 +106,10 @@ public Long zrem(String key, String member) { return dynoClient.zrem(key, member); } + public long zremrangeByScore(String key, String start, String end) { return dynoClient.zremrangeByScore(key, start, end);} + + public long zcount(String key, double min, double max) { return dynoClient.zcount(key, min, max);} + public String set(String key, String value) { String retVal = dynoClient.set(key, value); return retVal; diff --git a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java index 6e5518a6be..aa3aa65cf6 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java @@ -67,6 +67,7 @@ public class RedisExecutionDAO extends BaseDynoDAO implements ExecutionDAO { private static final String RAW_JSON_FIELD = "rawJSON"; // Keys Families private static final String TASK_LIMIT_BUCKET = "TASK_LIMIT_BUCKET"; + private static final String TASK_RATE_LIMIT_BUCKET = "TASK_RATE_LIMIT_BUCKET"; private final static String IN_PROGRESS_TASKS = "IN_PROGRESS_TASKS"; private final static String TASKS_IN_PROGRESS_STATUS = "TASKS_IN_PROGRESS_STATUS"; //Tasks which are in IN_PROGRESS status. private final static String WORKFLOW_TO_TASKS = "WORKFLOW_TO_TASKS"; @@ -244,6 +245,58 @@ public void updateTask(Task task) { indexDAO.indexTask(task); } + /** + * This method evaluates if the {@link Task} is rate limited or not based on {@link Task#getRateLimitPerFrequency()} + * and {@link Task#getRateLimitFrequencyInSeconds()} + * + * The rate limiting is implemented using the Redis constructs of sorted set and TTL of each element in the rate limited bucket. + *
    + *
  • All the entries that are in the not in the frequency bucket are cleaned up by leveraging {@link DynoProxy#zremrangeByScore(String, String, String)}, + * this is done to make the next step of evaluation efficient
  • + *
  • A current count(tasks executed within the frequency) is calculated based on the current time and the beginning of the rate limit frequency time(which is current time - {@link Task#getRateLimitFrequencyInSeconds()} in millis), + * this is achieved by using {@link DynoProxy#zcount(String, double, double)}
  • + *
  • Once the count is calculated then a evaluation is made to determine if it is within the bounds of {@link Task#getRateLimitPerFrequency()}, if so the count is increased and an expiry TTL is added to the entry
  • + *
+ * + * @param task: which needs to be evaluated whether it is rateLimited or not + * @return true: If the {@link Task} is rateLimited + * false: If the {@link Task} is not rateLimited + */ + @Override + public boolean exceedsRateLimitPerFrequency(Task task) { + int rateLimitPerFrequency = task.getRateLimitPerFrequency(); + int rateLimitFrequencyInSeconds = task.getRateLimitFrequencyInSeconds(); + if (rateLimitPerFrequency <= 0 && rateLimitFrequencyInSeconds <=0) { + logger.debug("Rate limit not applied to the Task: {} either rateLimitPerFrequency: {} or rateLimitFrequencyInSeconds: {} is 0 or less", + task, rateLimitPerFrequency, rateLimitFrequencyInSeconds); + return false; + } else { + logger.debug("Evaluating rate limiting for Task: {} with rateLimitPerFrequency: {} and rateLimitFrequencyInSeconds: {}", + task, rateLimitPerFrequency, rateLimitFrequencyInSeconds); + long currentTimeEpochMillis = System.currentTimeMillis(); + long currentTimeEpochMinusRateLimitBucket = currentTimeEpochMillis - (rateLimitFrequencyInSeconds * 1000); + String key = nsKey(TASK_RATE_LIMIT_BUCKET, task.getTaskDefName()); + dynoClient.zremrangeByScore(key, "-inf", String.valueOf(currentTimeEpochMinusRateLimitBucket)); + int currentBucketCount = Math.toIntExact( + dynoClient.zcount(key, + currentTimeEpochMinusRateLimitBucket, + currentTimeEpochMillis)); + + if (currentBucketCount < rateLimitPerFrequency) { + dynoClient.zadd(key, currentTimeEpochMillis, String.valueOf(currentTimeEpochMillis)); + dynoClient.expire(key, rateLimitFrequencyInSeconds); + logger.info("Task: {} with rateLimitPerFrequency: {} and rateLimitFrequencyInSeconds: {} within the rate limit with current count {}", + task, rateLimitPerFrequency, rateLimitFrequencyInSeconds, ++currentBucketCount); + Monitors.recordTaskRateLimited(task.getTaskDefName(), rateLimitPerFrequency); + return false; + } else { + logger.info("Task: {} with rateLimitPerFrequency: {} and rateLimitFrequencyInSeconds: {} is out of bounds of rate limit with current count {}", + task, rateLimitPerFrequency, rateLimitFrequencyInSeconds, currentBucketCount); + return true; + } + } + } + @Override public boolean exceedsInProgressLimit(Task task) { TaskDef taskDef = metadataDA0.getTaskDef(task.getTaskDefName()); @@ -258,7 +311,7 @@ public boolean exceedsInProgressLimit(Task task) { long current = getInProgressTaskCount(task.getTaskDefName()); if(current >= limit) { logger.info("Task execution count limited. task - {}:{}, limit: {}, current: {}", task.getTaskId(), task.getTaskDefName(), limit, current); - Monitors.recordTaskRateLimited(task.getTaskDefName(), limit); + Monitors.recordTaskConcurrentExecutionLimited(task.getTaskDefName(), limit); return true; } @@ -275,7 +328,7 @@ public boolean exceedsInProgressLimit(Task task) { String inProgressKey = nsKey(TASKS_IN_PROGRESS_STATUS, task.getTaskDefName()); //Cleanup any items that are still present in the rate limit bucket but not in progress anymore! ids.stream().filter(id -> !dynoClient.sismember(inProgressKey, id)).forEach(id2 -> dynoClient.zrem(rateLimitKey, id2)); - Monitors.recordTaskRateLimited(task.getTaskDefName(), limit); + Monitors.recordTaskConcurrentExecutionLimited(task.getTaskDefName(), limit); } return rateLimited; } diff --git a/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAOTest.java b/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAOTest.java index 4620805bfb..e83dd2138b 100644 --- a/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAOTest.java +++ b/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAOTest.java @@ -57,6 +57,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; /** * @author Viren @@ -487,4 +488,31 @@ public void testCorrelateTaskToWorkflowInDS() throws Exception { assertEquals(taskId, tasks.get(0).getTaskId()); } + @Test + public void testExceedsRateLimitWhenNoRateLimitSet() { + Task task =new Task(); + assertFalse(executionDAO.exceedsRateLimitPerFrequency(task)); + } + + @Test + public void testExceedsRateLimitWithinLimit() { + Task task =new Task(); + task.setRateLimitFrequencyInSeconds(60); + task.setRateLimitPerFrequency(20); + + assertFalse(executionDAO.exceedsRateLimitPerFrequency(task)); + + } + + @Test + public void testExceedsRateLimitOutOfLimit() { + Task task =new Task(); + task.setRateLimitFrequencyInSeconds(60); + task.setRateLimitPerFrequency(1); + + assertFalse(executionDAO.exceedsRateLimitPerFrequency(task)); + assertTrue(executionDAO.exceedsRateLimitPerFrequency(task)); + + } + } diff --git a/ui/src/components/workflow/tasks/TasksMetaList.js b/ui/src/components/workflow/tasks/TasksMetaList.js index cda83ba9ad..0c68fd46c2 100644 --- a/ui/src/components/workflow/tasks/TasksMetaList.js +++ b/ui/src/components/workflow/tasks/TasksMetaList.js @@ -82,6 +82,21 @@ class TaskMetaList extends React.Component { value={row.concurrentExecLimit} addonBefore="Concurrent Exec Limit" /> +
+ +
+

@@ -120,6 +135,12 @@ class TaskMetaList extends React.Component { Concurrent Exec Limit + + Rate Limit Amount + + + Rate Limit Frequency Seconds + Retry Logic