Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Rate limiting Implementation using Redis constructs (#724)
Browse files Browse the repository at this point in the history
* Rate limitng of Tasks using Redis constructs

* Javadoc build fix

* UI changes to display the rate limitng

* Java docs and loggers

* Metrics and updated loggers to info
  • Loading branch information
pctreddy authored Aug 20, 2018
1 parent 6e73912 commit 0475f9b
Show file tree
Hide file tree
Showing 16 changed files with 205 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ public boolean isRetriable() {

private String domain;

private int rateLimitPerSecond;
private int rateLimitPerFrequency;

private int rateLimitFrequencyInSeconds;

public Task() {

Expand Down Expand Up @@ -541,12 +543,20 @@ public void setDomain(String domain) {
this.domain = domain;
}

public int getRateLimitPerSecond() {
return rateLimitPerSecond;
public int getRateLimitPerFrequency() {
return rateLimitPerFrequency;
}

public void setRateLimitPerFrequency(int rateLimitPerFrequency) {
this.rateLimitPerFrequency = rateLimitPerFrequency;
}

public int getRateLimitFrequencyInSeconds() {
return rateLimitFrequencyInSeconds;
}

public void setRateLimitPerSecond(int rateLimitPerSecond) {
this.rateLimitPerSecond = rateLimitPerSecond;
public void setRateLimitFrequencyInSeconds(int rateLimitFrequencyInSeconds) {
this.rateLimitFrequencyInSeconds = rateLimitFrequencyInSeconds;
}

public Task copy() {
Expand All @@ -571,7 +581,8 @@ public Task copy() {
copy.setWorkerId(workerId);
copy.setWorkflowTask(workflowTask);
copy.setDomain(domain);
copy.setRateLimitPerSecond(rateLimitPerSecond);
copy.setRateLimitPerFrequency(rateLimitPerFrequency);
copy.setRateLimitFrequencyInSeconds(rateLimitFrequencyInSeconds);
return copy;
}

Expand All @@ -596,6 +607,8 @@ public String toString() {
", retriedTaskId='" + retriedTaskId + '\'' +
", retried=" + retried +
", callbackFromWorker=" + callbackFromWorker +
", rateLimitFrequencyInSeconds=" + rateLimitFrequencyInSeconds +
", rateLimitPerFrequency=" + rateLimitPerFrequency +
", responseTimeoutSeconds=" + responseTimeoutSeconds +
", workflowInstanceId='" + workflowInstanceId + '\'' +
", taskId='" + taskId + '\'' +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,39 +31,43 @@
* Defines a workflow task definition
*/
public class TaskDef extends Auditable {

public enum TimeoutPolicy {RETRY, TIME_OUT_WF, ALERT_ONLY}

public enum RetryLogic {FIXED, EXPONENTIAL_BACKOFF}




public enum TimeoutPolicy {RETRY, TIME_OUT_WF, ALERT_ONLY;}


public enum RetryLogic {FIXED, EXPONENTIAL_BACKOFF;}
private static final int ONE_HOUR = 60 * 60;

/**
* Unique name identifying the task. The name is unique across
* Unique name identifying the task. The name is unique across
*/
private String name;

private String description;

private int retryCount = 3; // Default

private long timeoutSeconds;

private List<String> inputKeys = new ArrayList<String>();

private List<String> outputKeys = new ArrayList<String>();

private TimeoutPolicy timeoutPolicy = TimeoutPolicy.TIME_OUT_WF;

private RetryLogic retryLogic = RetryLogic.FIXED;

private int retryDelaySeconds = 60;

private int responseTimeoutSeconds = ONE_HOUR;

private Integer concurrentExecLimit;

private Integer rateLimitPerSecond;
private Integer rateLimitPerFrequency;

private Integer rateLimitFrequencyInSeconds;

private Map<String, Object> inputTemplate = new HashMap<>();

Expand Down Expand Up @@ -240,18 +244,35 @@ public Map<String, Object> getInputTemplate() {

/**
*
* @return rateLimitPerSecond The max number of tasks that will be allowed to be executed per second at any given point of time.
* @return rateLimitPerFrequency The max number of tasks that will be allowed to be executed per rateLimitFrequencyInSeconds.
*/
public Integer getRateLimitPerFrequency() {
return rateLimitPerFrequency == null ? 0 : rateLimitPerFrequency;
}

/**
*
* @param rateLimitPerFrequency The max number of tasks that will be allowed to be executed per rateLimitFrequencyInSeconds.
* Setting the value to 0 removes the rate limit
*/
public void setRateLimitPerFrequency(Integer rateLimitPerFrequency) {
this.rateLimitPerFrequency = rateLimitPerFrequency;
}

/**
* @return rateLimitFrequencyInSeconds: The time bucket that is used to rate limit tasks based on {@link #getRateLimitPerFrequency()}
* If null or not set, then defaults to 1 second
*/
public Integer getRateLimitPerSecond() {
return rateLimitPerSecond == null ? 0 : rateLimitPerSecond;
public Integer getRateLimitFrequencyInSeconds() {
return rateLimitFrequencyInSeconds == null ? 1 : rateLimitFrequencyInSeconds;
}

/**
*
* @param rateLimitPerSecond The max number of tasks that will be allowed to be executed per second at any given point of time. Setting the value to 0 removes the rate limit
* @param rateLimitFrequencyInSeconds: The time window/bucket for which the rate limit needs to be applied. This will only have affect if {@link #getRateLimitPerFrequency()} is greater than zero
*/
public void setRateLimitPerSecond(Integer rateLimitPerSecond) {
this.rateLimitPerSecond = rateLimitPerSecond;
public void setRateLimitFrequencyInSeconds(Integer rateLimitFrequencyInSeconds) {
this.rateLimitFrequencyInSeconds = rateLimitFrequencyInSeconds;
}

/**
Expand Down Expand Up @@ -306,7 +327,7 @@ public boolean equals(Object o) {
getTimeoutPolicy() == taskDef.getTimeoutPolicy() &&
getRetryLogic() == taskDef.getRetryLogic() &&
Objects.equals(getConcurrentExecLimit(), taskDef.getConcurrentExecLimit()) &&
Objects.equals(getRateLimitPerSecond(), taskDef.getRateLimitPerSecond()) &&
Objects.equals(getRateLimitPerFrequency(), taskDef.getRateLimitPerFrequency()) &&
Objects.equals(getInputTemplate(), taskDef.getInputTemplate());
}

Expand All @@ -315,6 +336,6 @@ public int hashCode() {

return Objects.hash(getName(), getDescription(), getRetryCount(), getTimeoutSeconds(), getInputKeys(),
getOutputKeys(), getTimeoutPolicy(), getRetryLogic(), getRetryDelaySeconds(),
getResponseTimeoutSeconds(), getConcurrentExecLimit(), getRateLimitPerSecond(), getInputTemplate());
getResponseTimeoutSeconds(), getConcurrentExecLimit(), getRateLimitPerFrequency(), getInputTemplate());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@
import com.netflix.conductor.core.events.queue.dyno.DynoEventQueueProvider;
import com.netflix.conductor.core.execution.ParametersUtils;
import com.netflix.conductor.core.execution.mapper.DecisionTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinDynamicTaskMapper;
import com.netflix.conductor.core.execution.mapper.DynamicTaskMapper;
import com.netflix.conductor.core.execution.mapper.EventTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinDynamicTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.JoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.SimpleTaskMapper;
import com.netflix.conductor.core.execution.mapper.TaskMapper;
import com.netflix.conductor.core.execution.mapper.SubWorkflowTaskMapper;
import com.netflix.conductor.core.execution.mapper.TaskMapper;
import com.netflix.conductor.core.execution.mapper.UserDefinedTaskMapper;
import com.netflix.conductor.core.execution.mapper.WaitTaskMapper;
import com.netflix.conductor.core.execution.tasks.Event;
Expand All @@ -49,8 +49,6 @@
import com.netflix.conductor.core.execution.tasks.Wait;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.service.DummyRateLimitingService;
import com.netflix.conductor.service.RateLimitingService;


/**
Expand All @@ -68,7 +66,6 @@ protected void configure() {
bind(SubWorkflow.class).asEagerSingleton();
bind(Wait.class).asEagerSingleton();
bind(Event.class).asEagerSingleton();
bind(RateLimitingService.class).to(DummyRateLimitingService.class);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.service.RateLimitingService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,7 +87,6 @@ public class WorkflowExecutor {

private Configuration config;

private RateLimitingService rateLimitingService;

private ParametersUtils parametersUtils = new ParametersUtils();

Expand All @@ -98,13 +96,12 @@ public class WorkflowExecutor {

@Inject
public WorkflowExecutor(DeciderService deciderService, MetadataDAO metadataDAO, ExecutionDAO executionDAO,
QueueDAO queueDAO, Configuration config, RateLimitingService rateLimitingService) {
QueueDAO queueDAO, Configuration config) {
this.deciderService = deciderService;
this.metadataDAO = metadataDAO;
this.executionDAO = executionDAO;
this.queueDAO = queueDAO;
this.config = config;
this.rateLimitingService = rateLimitingService;

activeWorkerLastPollnSecs = config.getIntProperty("tasks.active.worker.lastpoll", 10);
}
Expand Down Expand Up @@ -737,8 +734,8 @@ public void executeSystemTask(WorkflowSystemTask systemTask, String taskId, int
logger.warn("Concurrent Execution limited for {}:{}", taskId, task.getTaskDefName());
return;
}
if (task.getRateLimitPerSecond() > 0 && !rateLimitingService.evaluateRateLimitBoundary(task)) {
logger.warn("RateLimit Execution limited for {}:{}, limit:{}", taskId, task.getTaskDefName(), task.getRateLimitPerSecond());
if (task.getRateLimitPerFrequency() > 0 && executionDAO.exceedsRateLimitPerFrequency(task)) {
logger.warn("RateLimit Execution limited for {}:{}, limit:{}", taskId, task.getTaskDefName(), task.getRateLimitPerFrequency());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) throws Ter
userDefinedTask.setRetryCount(retryCount);
userDefinedTask.setCallbackAfterSeconds(taskToSchedule.getStartDelay());
userDefinedTask.setWorkflowTask(taskToSchedule);
userDefinedTask.setRateLimitPerSecond(taskDefinition.getRateLimitPerSecond());
userDefinedTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
userDefinedTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds());
return Collections.singletonList(userDefinedTask);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ public interface ExecutionDAO {
* @see TaskDef#concurrencyLimit()
*/
boolean exceedsInProgressLimit(Task task);

/**
* Checks if the Task is rate limited or not based on the {@link Task#getRateLimitPerFrequency()} and {@link Task#getRateLimitFrequencyInSeconds()}
* @param task: which needs to be evaluated whether it is rateLimited or not
* @return true: If the {@link Task} is rateLimited
* false: If the {@link Task} is not rateLimited
*/
boolean exceedsRateLimitPerFrequency(Task task);

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ public static void recordTaskRateLimited(String taskDefName, int limit) {
gauge(classQualifier, "task_rate_limited", limit, "taskType", taskDefName);
}

public static void recordTaskConcurrentExecutionLimited(String taskDefName, int limit) {
gauge(classQualifier, "task_concurrent_execution_limited", limit, "taskType", taskDefName);
}

public static void recordEventQueueMessagesProcessed(String queueType, String queueName, int count) {
getCounter(classQualifier, "event_queue_messages_processed", "queueType", queueType, "queueName", queueName).increment(count);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package com.netflix.conductor.service;

import com.google.common.base.Preconditions;
import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
Expand All @@ -44,12 +43,10 @@ public class MetadataService {

private MetadataDAO metadataDAO;

private RateLimitingService rateLimitingService;

@Inject
public MetadataService(MetadataDAO metadataDAO, RateLimitingService rateLimitingService) {
public MetadataService(MetadataDAO metadataDAO) {
this.metadataDAO = metadataDAO;
this.rateLimitingService = rateLimitingService;
}

/**
Expand All @@ -63,9 +60,6 @@ public void registerTaskDef(List<TaskDef> taskDefinitions) {
taskDefinition.setUpdatedBy(null);
taskDefinition.setUpdateTime(null);
metadataDAO.createTaskDef(taskDefinition);
if(taskDefinition.getRateLimitPerSecond() != 0) {
rateLimitingService.updateRateLimitRules(taskDefinition);
}
}
}

Expand All @@ -81,9 +75,6 @@ public void updateTaskDef(TaskDef taskDefinition) {
taskDefinition.setUpdatedBy(WorkflowContext.get().getClientApp());
taskDefinition.setUpdateTime(System.currentTimeMillis());
metadataDAO.updateTaskDef(taskDefinition);
if(taskDefinition.getRateLimitPerSecond() != 0) {
rateLimitingService.updateRateLimitRules(taskDefinition);
}
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.service.DummyRateLimitingService;
import com.netflix.conductor.service.RateLimitingService;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -94,8 +92,7 @@ public void init() {
taskMappers.put("EVENT", new EventTaskMapper(parametersUtils));
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
DeciderService deciderService = new DeciderService(metadataDAO, taskMappers);
RateLimitingService rateLimitingService = new DummyRateLimitingService();
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, executionDAO, queueDAO, config, rateLimitingService);
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, executionDAO, queueDAO, config);
}

@Test
Expand Down
Loading

0 comments on commit 0475f9b

Please sign in to comment.