Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Added API to reset task callback timeouts to 0
Browse files Browse the repository at this point in the history
  • Loading branch information
Vikram Singh committed Oct 6, 2017
1 parent cce8164 commit 357f8f4
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 1 deletion.
9 changes: 9 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/QueueDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,14 @@ public default void processUnacks(String queueName) {

}

/**
* Sets the offset time without pulling out the message from the queue
* @param queueName name of the queue
* @param id message id
* @param offsetTimeInSecond time in seconds, after which the message should be marked visible. (for timed queues)
* @return true if the message is in queue and the change was successful else returns false
*/
public boolean setOffsetTime(String queueName, String id, long offsetTimeInSecond);


}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,14 @@ public void restart(@PathParam("workflowId") String workflowId) throws Exception
public void retry(@PathParam("workflowId") String workflowId) throws Exception {
executor.retry(workflowId);
}

@POST
@Path("/{workflowId}/resetcallbacks")
@ApiOperation("Resets callback times of all in_progress tasks to 0")
@Consumes(MediaType.WILDCARD)
public void reset(@PathParam("workflowId") String workflowId) throws Exception {
executor.resetCallbacksForInProgressTasks(workflowId);
}

@DELETE
@Path("/{workflowId}")
Expand Down
2 changes: 1 addition & 1 deletion redis-persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ dependencies {
compile 'com.google.inject:guice:3.0'
compile 'com.netflix.dyno:dyno-core:1.5.9'
compile 'com.netflix.dyno:dyno-jedis:1.5.9'
compile 'com.netflix.dyno-queues:dyno-queues-redis:1.0.7'
compile 'com.netflix.dyno-queues:dyno-queues-redis:1.0.8'
compile 'org.elasticsearch:elasticsearch:2.+'

//In memory redis for unit testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,11 @@ public void processUnacks(String queueName) {
((RedisDynoQueue)queues.get(queueName)).processUnacks();;
}

@Override
public boolean setOffsetTime(String queueName, String id, long offsetTimeInSecond) {
DynoQueue queue = queues.get(queueName);
return queue.setTimeout(id, offsetTimeInSecond);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1665,6 +1665,120 @@ public void testLongRunning() throws Exception {

}

@Test
public void testResetWorkflowInProgressTasks() throws Exception {

clearWorkflows();

WorkflowDef found = ms.getWorkflowDef(LONG_RUNNING, 1);
assertNotNull(found);

String correlationId = "unit_test_1";
Map<String, Object> input = new HashMap<String, Object>();
String inputParam1 = "p1 value";
input.put("param1", inputParam1);
input.put("param2", "p2 value");
String wfid = provider.startWorkflow(LONG_RUNNING, 1, correlationId , input);
System.out.println("testLongRunning.wfid=" + wfid);
assertNotNull(wfid);

Workflow es = ess.getExecutionStatus(wfid, true);
assertNotNull(es);
assertEquals(WorkflowStatus.RUNNING, es.getStatus());


es = ess.getExecutionStatus(wfid, true);
assertNotNull(es);
assertEquals(WorkflowStatus.RUNNING, es.getStatus());

// Check the queue
assertEquals(Integer.valueOf(1), ess.getTaskQueueSizes(Arrays.asList("junit_task_1")).get("junit_task_1"));
///

Task task = ess.poll("junit_task_1", "task1.junit.worker");
assertNotNull(task);
assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task1.junit.worker"));

String param1 = (String) task.getInputData().get("p1");
String param2 = (String) task.getInputData().get("p2");

assertNotNull(param1);
assertNotNull(param2);
assertEquals("p1 value", param1);
assertEquals("p2 value", param2);


String task1Op = "task1.In.Progress";
task.getOutputData().put("op", task1Op);
task.setStatus(Status.IN_PROGRESS);
task.setCallbackAfterSeconds(3600);
ess.updateTask(task);
String taskId = task.getTaskId();

// Check the queue
assertEquals(Integer.valueOf(1), ess.getTaskQueueSizes(Arrays.asList("junit_task_1")).get("junit_task_1"));
///


es = ess.getExecutionStatus(wfid, true);
assertNotNull(es);
assertEquals(WorkflowStatus.RUNNING, es.getStatus());

// Polling for next task should not return anything
Task task2 = ess.poll("junit_task_2", "task2.junit.worker");
assertNull(task2);

task = ess.poll("junit_task_1", "task1.junit.worker");
assertNull(task);

//Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
// Reset
provider.resetCallbacksForInProgressTasks(wfid);


// Now Polling for the first task should return the same task as before
task = ess.poll("junit_task_1", "task1.junit.worker");
assertNotNull(task);
assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task1.junit.worker"));
assertEquals(task.getTaskId(), taskId);
assertEquals(task.getCallbackAfterSeconds(), 0);

task1Op = "task1.Done";
List<Task> tasks = ess.getTasks(task.getTaskType(), null, 1);
assertNotNull(tasks);
assertEquals(1, tasks.size());
assertEquals(wfid, task.getWorkflowInstanceId());
task = tasks.get(0);
task.getOutputData().put("op", task1Op);
task.setStatus(Status.COMPLETED);
ess.updateTask(task);

task = ess.poll("junit_task_2", "task2.junit.worker");
assertNotNull(task);
assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task2.junit.worker"));
String task2Input = (String) task.getInputData().get("tp2");
assertNotNull(task2Input);
assertEquals(task1Op, task2Input);

task2Input = (String) task.getInputData().get("tp1");
assertNotNull(task2Input);
assertEquals(inputParam1, task2Input);

task.setStatus(Status.COMPLETED);
ess.updateTask(task);


es = ess.getExecutionStatus(wfid, true);
assertNotNull(es);
assertEquals(WorkflowStatus.COMPLETED, es.getStatus());
tasks = es.getTasks();
assertNotNull(tasks);
assertEquals(2, tasks.size());


}


@Test
public void testConcurrentWorkflowExecutions() throws Exception {

Expand Down

0 comments on commit 357f8f4

Please sign in to comment.