From 9fe18baf532ec1448ed755ddac23db52f2a38b0e Mon Sep 17 00:00:00 2001 From: Vikram Singh Date: Fri, 6 Oct 2017 13:35:50 -0700 Subject: [PATCH] Added API to reset task callback timeouts to 0 (Part 2) --- .../conductor/client/http/WorkflowClient.java | 4 ++++ .../core/execution/WorkflowExecutor.java | 20 +++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index 9566abf3cb..a8efa16160 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -140,6 +140,10 @@ public void retryLastFailedTask(String workflowId) { postForEntity1("workflow/{workflowId}/retry", workflowId); } + public void resetCallbacksForInProgressTasks(String workflowId) { + postForEntity1("workflow/{workflowId}//{workflowId}/resetcallbacks", workflowId); + } + public void terminateWorkflow(String workflowId, String reason) { delete(new Object[]{"reason", reason}, "workflow/{workflowId}", workflowId); } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 9edb23cca8..94e4356087 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -155,6 +155,26 @@ public String startWorkflow(String name, int version, Map input, } } + public String resetCallbacksForInProgressTasks(String workflowId) throws Exception { + Workflow workflow = edao.getWorkflow(workflowId, true); + if (workflow.getStatus().isTerminal()) { + throw new ApplicationException(Code.CONFLICT, "Workflow is completed. status=" + workflow.getStatus()); + } + + // Get tasks that are in progress and have callbackAfterSeconds > 0 + // and set the callbackAfterSeconds to 0; + for(Task t: workflow.getTasks()) { + if(t.getStatus().equals(Status.IN_PROGRESS) && + t.getCallbackAfterSeconds() > 0){ + if(queue.setOffsetTime(QueueUtils.getQueueName(t), t.getTaskId(), 0)){ + t.setCallbackAfterSeconds(0); + edao.updateTask(t); + } + } + }; + return workflowId; + } + public String rerun(RerunWorkflowRequest request) throws Exception { Preconditions.checkNotNull(request.getReRunFromWorkflowId(), "reRunFromWorkflowId is missing"); if(!rerunWF(request.getReRunFromWorkflowId(), request.getReRunFromTaskId(), request.getTaskInput(),