Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Fix the ResponseTimeout issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Vikram Singh committed Oct 17, 2017
1 parent 1b1844b commit f9f9920
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ private DeciderOutcome decide(final WorkflowDef def, final Workflow workflow, Li
TaskDef taskDef = metadata.getTaskDef(task.getTaskDefName());
if(taskDef != null) {
checkForTimeout(taskDef, task);
// If the task has not been updated within its "responseTimeout" window, requeue it.
if(checkForResponseTimeout(taskDef, task)){
outcome.tasksToBeRequeued.add(task);
}
}

if (!task.getStatus().isSuccessful()) {
Expand Down Expand Up @@ -374,6 +378,30 @@ void checkForTimeout(TaskDef taskType, Task task) {
return;
}

/**
 * Decides whether an in-progress task has gone without an update for at least the response
 * timeout configured on its task definition.
 *
 * <p>When the response timeout has elapsed, a response-timeout metric is recorded against the
 * task definition name before {@code true} is returned. The task itself is not modified here;
 * the caller decides what to do with a timed-out task.
 *
 * @param taskType the task definition carrying the timeout settings; when {@code null} a warning
 *     is logged and {@code false} is returned
 * @param task the task whose status and last update time are inspected
 * @return {@code true} if the task is {@code IN_PROGRESS}, a positive response timeout is
 *     configured, and the time since the task's last update is at least that timeout;
 *     {@code false} otherwise
 */
@VisibleForTesting
boolean checkForResponseTimeout(TaskDef taskType, Task task) {

    if (taskType == null) {
        logger.warn("missing task type " + task.getTaskDefName() + ", workflowId=" + task.getWorkflowInstanceId());
        return false;
    }

    // NOTE(review): the getTimeoutSeconds() gate means a task definition with no overall timeout
    // never gets response-timeout handling, even if a response timeout is set - confirm that this
    // coupling is intended.
    // A non-positive response timeout is treated as "disabled". Checking only for == 0 would let a
    // negative value through, producing a negative threshold that every task exceeds.
    if (task.getStatus().isTerminal() || taskType.getTimeoutSeconds() <= 0 ||
            !task.getStatus().equals(Status.IN_PROGRESS) || taskType.getResponseTimeoutSeconds() <= 0) {
        return false;
    }

    // Multiply in long arithmetic so large second counts cannot overflow before widening.
    long responseTimeoutMillis = 1000L * taskType.getResponseTimeoutSeconds();
    long now = System.currentTimeMillis();
    long noResponseTime = now - task.getUpdateTime();

    if (noResponseTime < responseTimeoutMillis) {
        return false;
    }
    Monitors.recordTaskResponseTimeout(task.getTaskDefName());

    return true;
}

/**
 * Convenience overload that forwards to the five-argument {@code getTasksToBeScheduled},
 * passing {@code null} as the final argument.
 */
private List<Task> getTasksToBeScheduled(WorkflowDef def, Workflow workflow, WorkflowTask taskToSchedule, int retryCount) {
return getTasksToBeScheduled(def, workflow, taskToSchedule, retryCount, null);
}
Expand Down Expand Up @@ -672,6 +700,8 @@ public static class DeciderOutcome {
List<Task> tasksToBeScheduled = new LinkedList<>();

List<Task> tasksToBeUpdated = new LinkedList<>();

List<Task> tasksToBeRequeued = new LinkedList<>();

boolean isComplete;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,12 @@ public boolean decide(String workflowId) throws Exception {
List<Task> tasksToBeScheduled = outcome.tasksToBeScheduled;
setTaskDomains(tasksToBeScheduled, workflow);
List<Task> tasksToBeUpdated = outcome.tasksToBeUpdated;
List<Task> tasksToBeRequeued = outcome.tasksToBeRequeued;
boolean stateChanged = false;

if(!tasksToBeRequeued.isEmpty()){
addTaskToQueue(tasksToBeRequeued);
}
workflow.getTasks().addAll(tasksToBeScheduled);
for(Task task : tasksToBeScheduled) {
if (SystemTaskType.is(task.getTaskType()) && !task.getStatus().isTerminal()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ public static void recordTaskTimeout(String taskType) {
counter(classQualifier, "task_timeout", "taskType", taskType);
}

/**
 * Records a response-timeout event for a task type by calling {@code counter} with the metric
 * name {@code "task_response_timeout"} and a {@code "taskType"} tag.
 *
 * @param taskType value reported under the {@code "taskType"} tag
 */
public static void recordTaskResponseTimeout(String taskType) {
counter(classQualifier, "task_response_timeout", "taskType", taskType);
}

/**
 * Records a workflow termination by calling {@code counter} with the metric name
 * {@code "workflow_failure"}, tagged with the workflow name, the status name and the owning
 * application.
 *
 * @param workflowType value reported under the {@code "workflowName"} tag
 * @param status workflow status whose {@code name()} is reported under the {@code "status"} tag
 * @param ownerApp value reported under the {@code "ownerApp"} tag; a {@code null} owner is
 *     reported as the literal string {@code "null"}
 */
public static void recordWorkflowTermination(String workflowType, WorkflowStatus status, String ownerApp) {
    final String statusTag = status.name();
    // String.valueOf yields "null" for a null reference, the same as string concatenation does.
    final String ownerTag = String.valueOf(ownerApp);
    counter(classQualifier, "workflow_failure",
            "workflowName", workflowType,
            "status", statusTag,
            "ownerApp", ownerTag);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,8 @@ public void testSimpleWorkflow() throws Exception {
assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task1.junit.worker"));
assertEquals(wfid, task.getWorkflowInstanceId());

provider.decide(wfid);

String task1Op = "task1.Done";
List<Task> tasks = ess.getTasks(task.getTaskType(), null, 1);
assertNotNull(tasks);
Expand Down Expand Up @@ -1197,6 +1199,67 @@ public void testSimpleWorkflow() throws Exception {

}

/**
 * Exercises the response-timeout requeue path end to end: starts the "RTOWF" workflow, polls and
 * acks its first task without updating it, waits out the response timeout, runs the decider, and
 * then expects the very same task to be pollable again. The workflow is then driven to completion.
 */
@Test
public void testSimpleWorkflowWithResponseTimeout() throws Exception {

// Registers the "task_rt" task definition and the "RTOWF" workflow used below.
createWFWithResponseTimeout();

String correlationId = "unit_test_1";
Map<String, Object> input = new HashMap<String, Object>();
String inputParam1 = "p1 value";
input.put("param1", inputParam1);
input.put("param2", "p2 value");
String wfid = provider.startWorkflow("RTOWF", 1, correlationId , input);
System.out.println("testSimpleWorkflowWithResponseTimeout.wfid=" + wfid);
assertNotNull(wfid);

Workflow es = ess.getExecutionStatus(wfid, true);
assertNotNull(es);
assertEquals(WorkflowStatus.RUNNING, es.getStatus());
assertEquals(1, es.getTasks().size()); // Only the first task of the workflow should be scheduled at this point.


// Polling for task_rt should hand out the task that was just scheduled for this workflow.
Task task = ess.poll("task_rt", "task1.junit.worker");
assertNotNull(task);
assertEquals("task_rt", task.getTaskType());
assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task1.junit.worker"));
assertEquals(wfid, task.getWorkflowInstanceId());

// The task has been polled and acked, so a second poll is expected to find nothing.
Task nullTask = ess.poll("task_rt", "task1.junit.worker");
assertNull(nullTask);

// Sleep for 15 seconds without updating the task, then run the decider.
// NOTE(review): the sleep is presumably meant to match the response timeout configured by
// createWFWithResponseTimeout() - keep the two values in sync.
Thread.sleep(15000);
provider.decide(wfid);

// Polling now should return the same task again, because the decider is expected to have requeued it.
Task taskAgain = ess.poll("task_rt", "task1.junit.worker");
assertNotNull(taskAgain);
assertEquals(task.getTaskId(), taskAgain.getTaskId());

// Complete the first task using the originally polled instance.
String task1Op = "task1.Done";
task.getOutputData().put("op", task1Op);
task.setStatus(Status.COMPLETED);
ess.updateTask(task);

// The second task of the workflow should now be available.
task = ess.poll("junit_task_2", "task2.junit.worker");
assertNotNull(task);
assertEquals("junit_task_2", task.getTaskType());
assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task2.junit.worker"));

task.setStatus(Status.COMPLETED);
// NOTE(review): a reason for incompletion is set on a COMPLETED task; this looks like a
// copy-paste leftover - confirm whether it is intentional.
task.setReasonForIncompletion("unit test failure");
ess.updateTask(task);


// With both tasks completed the workflow itself should be COMPLETED.
es = ess.getExecutionStatus(wfid, true);
assertNotNull(es);
assertEquals(WorkflowStatus.COMPLETED, es.getStatus());

}

@Test
public void testWorkflowRerunWithSubWorkflows() throws Exception {
// Execute a workflow
Expand Down Expand Up @@ -3475,7 +3538,50 @@ private void createWorkflowDefForDomain(){
} catch (Exception e) {}
}


/**
 * Registers the fixtures for the response-timeout test: a task definition named "task_rt" with a
 * 120 second timeout and a 15 second response timeout, and a two-step workflow "RTOWF" (version 1)
 * that runs "task_rt" followed by "junit_task_2".
 *
 * <p>Parameter expressions that read the first task's output use that task's reference name,
 * "task_rt_t1"; the second task's reference name is "t2".
 *
 * @throws Exception if registering the task definition or the workflow definition fails
 */
private void createWFWithResponseTimeout() throws Exception{
    TaskDef task = new TaskDef();
    task.setName("task_rt");
    task.setTimeoutSeconds(120);
    task.setRetryCount(RETRY_COUNT);
    task.setResponseTimeoutSeconds(15);
    ms.registerTaskDef(Arrays.asList(task));

    WorkflowDef def = new WorkflowDef();
    def.setName("RTOWF");
    def.setDescription(def.getName());
    def.setVersion(1);
    def.setInputParameters(Arrays.asList("param1", "param2"));
    Map<String, Object> outputParameters = new HashMap<>();
    outputParameters.put("o1", "${workflow.input.param1}");
    outputParameters.put("o2", "${t2.output.uuid}");
    // The first task is referenced as "task_rt_t1" in this workflow; there is no task "t1".
    outputParameters.put("o3", "${task_rt_t1.output.op}");
    def.setOutputParameters(outputParameters);
    def.setFailureWorkflow("$workflow.input.failureWfName");
    def.setSchemaVersion(2);
    LinkedList<WorkflowTask> wftasks = new LinkedList<>();

    WorkflowTask wft1 = new WorkflowTask();
    wft1.setName("task_rt");
    Map<String, Object> ip1 = new HashMap<>();
    ip1.put("p1", "${workflow.input.param1}");
    ip1.put("p2", "${workflow.input.param2}");
    wft1.setInputParameters(ip1);
    wft1.setTaskReferenceName("task_rt_t1");

    WorkflowTask wft2 = new WorkflowTask();
    wft2.setName("junit_task_2");
    Map<String, Object> ip2 = new HashMap<>();
    ip2.put("tp1", "${workflow.input.param1}");
    // Wire the second task's input to the first task's output via its actual reference name.
    ip2.put("tp2", "${task_rt_t1.output.op}");
    wft2.setInputParameters(ip2);
    wft2.setTaskReferenceName("t2");

    wftasks.add(wft1);
    wftasks.add(wft2);
    def.setTasks(wftasks);

    ms.updateWorkflowDef(def);
}

private String runWorkflowWithSubworkflow() throws Exception{
clearWorkflows();
Expand Down

0 comments on commit f9f9920

Please sign in to comment.