Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #299 from Netflix/dev
Browse files Browse the repository at this point in the history
Retry and Rerun bug fix and others
  • Loading branch information
v1r3n authored Aug 11, 2017
2 parents 370b296 + 22b6299 commit 0dfbe72
Show file tree
Hide file tree
Showing 19 changed files with 672 additions and 151 deletions.
11 changes: 6 additions & 5 deletions client/python/conductor/ConductorWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import print_function, absolute_import
import sys
import time
import subprocess
Expand All @@ -25,7 +25,7 @@

class ConductorWorker:
def __init__(self, server_url, thread_count, polling_interval):
wfcMgr = conductor.conductor.WFClientMgr(server_url)
wfcMgr = WFClientMgr(server_url)
self.workflowClient = wfcMgr.workflowClient
self.taskClient = wfcMgr.taskClient
self.thread_count = thread_count
Expand Down Expand Up @@ -55,11 +55,12 @@ def poll_and_execute(self, taskType, exec_function):
def start(self, taskType, exec_function, wait):
    """Launch the polling threads for one task type.

    Prints a startup banner, then starts ``self.thread_count`` daemon
    threads, each running ``self.poll_and_execute(taskType, exec_function)``.

    Args:
        taskType: Name of the task type to poll for; must be a ``str``
            because it is concatenated into the banner.
        exec_function: Callable forwarded unchanged to ``poll_and_execute``.
        wait: When truthy, this call never returns; it keeps the calling
            thread alive so the daemon pollers are not torn down with it.
    """
    # NOTE(review): ``hostname`` is not defined in this block; it is expected
    # to be a module-level name -- confirm against the full file.
    print('Polling for task ' + taskType + ' at a ' + str(self.polling_interval) + ' ms interval with ' + str(self.thread_count) + ' threads for task execution, with worker id as ' + hostname)
    for _ in range(int(self.thread_count)):
        # Build exactly one Thread per slot (a second, discarded construction
        # was redundant work).
        worker = Thread(target=self.poll_and_execute, args=(taskType, exec_function))
        worker.daemon = True
        worker.start()
    if wait:
        # Sleep between wake-ups instead of spinning, so the keep-alive loop
        # does not burn a CPU core.
        while True:
            time.sleep(1)

def exc(taskType, inputData, startTime, retryCount, status, callbackAfterSeconds, pollCount):
print('Executing the function')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,9 @@ public SearchResult<WorkflowSummary> search(String query) {
return result;
}

/**
 * Searches workflows through the {@code workflow/search} endpoint.
 *
 * The arguments are handed to {@code getForEntity} as a flat array of
 * alternating parameter names and values.
 * NOTE(review): presumably these become URL query parameters and null
 * values are tolerated by {@code getForEntity} -- confirm in that helper.
 *
 * @param start    value sent as {@code start}
 * @param size     value sent as {@code size}
 * @param sort     value sent as {@code sort}
 * @param freeText value sent as {@code freeText}
 * @param query    value sent as {@code query}
 * @return the search result deserialized as {@code SearchResult<WorkflowSummary>}
 */
public SearchResult<WorkflowSummary> search(Integer start, Integer size, String sort, String freeText, String query) {
    Object[] nameValuePairs = {
        "start", start,
        "size", size,
        "sort", sort,
        "freeText", freeText,
        "query", query
    };
    // The anonymous subclass captures the generic result type for the client.
    GenericType<SearchResult<WorkflowSummary>> resultType = new GenericType<SearchResult<WorkflowSummary>>() {};
    return getForEntity("workflow/search", nameValuePairs, resultType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public synchronized void init() {
/**
 * Creates a worker thread whose name is a configurable prefix followed by
 * a running counter.
 *
 * The prefix is read from the {@code workerNamePrefix} property and falls
 * back to {@code "workflow-worker-"}. The thread is named exactly once, so
 * the counter advances by one per created thread; naming it twice would
 * discard the first name and skip a counter value.
 *
 * @param r the runnable the new thread will execute
 * @return the newly created, named thread (not started here)
 */
@Override
public Thread newThread(Runnable r) {
    Thread t = new Thread(r);
    t.setName(PropertyFactory.getString("", "workerNamePrefix", "workflow-worker-") + count.getAndIncrement());
    return t;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void test() {
assertEquals("domainB", PropertyFactory.getString("workerB", "domain", null));
assertEquals(null, PropertyFactory.getString("workerC", "domain", null)); // Non Existent


assertEquals("test-group-", PropertyFactory.getString("", "workerNamePrefix", "workflow-worker-"));
}

@Test
Expand Down
1 change: 1 addition & 0 deletions client/src/test/resources/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ conductor.worker.workerB.batchSize=84
conductor.worker.workerB.domain=domainB
conductor.worker.Test.paused=true
conductor.worker.domainTestTask2.domain=visinghDomain
conductor.worker.workerNamePrefix=test-group-
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public static boolean is(String name) {
private String name;

private String taskReferenceName;


private String description;

//Key: Name of the input parameter. MUST be one of the keys defined in TaskDef (e.g. fileName)
//Value: mapping of the parameter from another task (e.g. task1.someOutputParameterAsFileName)
private Map<String, Object> inputParameters = new HashMap<String, Object>();
Expand Down Expand Up @@ -124,6 +126,20 @@ public void setTaskReferenceName(String taskReferenceName) {
this.taskReferenceName = taskReferenceName;
}

/**
 * Returns the description stored on this object.
 *
 * @return the description, or {@code null} when none has been set
 */
public String getDescription() {
    return this.description;
}

/**
 * Stores the given description on this object, replacing any previous value.
 * No validation is performed; {@code null} is stored as-is.
 *
 * @param description the description to set
 */
public void setDescription(String description) {
this.description = description;
}

/**
* @return the inputParameters
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,9 @@ private Task getDynamicTasks(WorkflowDef def, Workflow workflow, WorkflowTask ta
Map<String, Object> input = getTaskInput(taskToSchedule.getInputParameters(), workflow, null, null);
Object paramValue = input.get(paramName);
DynamicForkJoinTaskList dynForkTasks0 = om.convertValue(paramValue, DynamicForkJoinTaskList.class);
if(dynForkTasks0 == null) {
throw new TerminateWorkflow("Dynamic tasks could not be created. The value of " + paramName + " from task's input " + input + " has no dynamic tasks to be scheduled");
}
for( DynamicForkJoinTask dt : dynForkTasks0.getDynamicTasks()) {
WorkflowTask wft = new WorkflowTask();
wft.setTaskReferenceName(dt.getReferenceName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package com.netflix.conductor.core.execution;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -155,78 +156,14 @@ public String startWorkflow(String name, int version, Map<String, Object> input,
}

public String rerun(RerunWorkflowRequest request) throws Exception {

Workflow reRunFromWorkflow = edao.getWorkflow(request.getReRunFromWorkflowId());

String workflowId = IDGenerator.generate();

// Persist the workflow and task First
Workflow wf = new Workflow();
wf.setWorkflowId(workflowId);
wf.setCorrelationId((request.getCorrelationId() == null) ? reRunFromWorkflow.getCorrelationId() : request.getCorrelationId());
wf.setWorkflowType(reRunFromWorkflow.getWorkflowType());
wf.setVersion(reRunFromWorkflow.getVersion());
wf.setInput((request.getWorkflowInput() == null) ? reRunFromWorkflow.getInput() : request.getWorkflowInput());
wf.setReRunFromWorkflowId(request.getReRunFromWorkflowId());
wf.setStatus(WorkflowStatus.RUNNING);
wf.setOwnerApp(WorkflowContext.get().getClientApp());
wf.setCreateTime(System.currentTimeMillis());
wf.setUpdatedBy(null);
wf.setUpdateTime(null);

// If the "reRunFromTaskId" is not given in the RerunWorkflowRequest,
// then the whole
// workflow has to rerun
if (request.getReRunFromTaskId() != null) {
// We need to go thru the workflowDef and create tasks for
// all tasks before request.getReRunFromTaskId() and marked them
// skipped
List<Task> newTasks = new LinkedList<>();
Map<String, Task> refNameToTask = new HashMap<String, Task>();
reRunFromWorkflow.getTasks().forEach(task -> refNameToTask.put(task.getReferenceTaskName(), task));
WorkflowDef wd = metadata.get(reRunFromWorkflow.getWorkflowType(), reRunFromWorkflow.getVersion());
Iterator<WorkflowTask> it = wd.getTasks().iterator();
int seq = wf.getTasks().size();
while (it.hasNext()) {
WorkflowTask wt = it.next();
Task previousTask = refNameToTask.get(wt.getTaskReferenceName());
if (previousTask.getTaskId().equals(request.getReRunFromTaskId())) {
Task theTask = new Task();
theTask.setTaskId(IDGenerator.generate());
theTask.setReferenceTaskName(previousTask.getReferenceTaskName());
theTask.setInputData((request.getTaskInput() == null) ? previousTask.getInputData() : request.getTaskInput());
theTask.setWorkflowInstanceId(workflowId);
theTask.setStatus(Status.READY_FOR_RERUN);
theTask.setTaskType(previousTask.getTaskType());
theTask.setCorrelationId(wf.getCorrelationId());
theTask.setSeq(seq++);
theTask.setRetryCount(previousTask.getRetryCount() + 1);
newTasks.add(theTask);
break;
} else { // Create with Skipped status
Task theTask = new Task();
theTask.setTaskId(IDGenerator.generate());
theTask.setReferenceTaskName(previousTask.getReferenceTaskName());
theTask.setWorkflowInstanceId(workflowId);
theTask.setStatus(Status.SKIPPED);
theTask.setTaskType(previousTask.getTaskType());
theTask.setCorrelationId(wf.getCorrelationId());
theTask.setInputData(previousTask.getInputData());
theTask.setOutputData(previousTask.getOutputData());
theTask.setRetryCount(previousTask.getRetryCount() + 1);
theTask.setSeq(seq++);
newTasks.add(theTask);
}
}

edao.createTasks(newTasks);
Preconditions.checkNotNull(request.getReRunFromWorkflowId(), "reRunFromWorkflowId is missing");
if(!rerunWF(request.getReRunFromWorkflowId(), request.getReRunFromTaskId(), request.getTaskInput(),
request.getWorkflowInput(), request.getCorrelationId())){
throw new ApplicationException(Code.INVALID_INPUT, "Task " + request.getReRunFromTaskId() + " not found");
}

edao.createWorkflow(wf);
decide(workflowId);
return workflowId;
return request.getReRunFromWorkflowId();
}

public void rewind(String workflowId) throws Exception {
Workflow workflow = edao.getWorkflow(workflowId, true);
if (!workflow.getStatus().isTerminal()) {
Expand All @@ -253,13 +190,23 @@ public void retry(String workflowId) throws Exception {
if (workflow.getTasks().isEmpty()) {
throw new ApplicationException(Code.CONFLICT, "Workflow has not started yet");
}
int lastIndex = workflow.getTasks().size() - 1;
Task last = workflow.getTasks().get(lastIndex);
if (!last.getStatus().isTerminal()) {

// First get the failed task and the cancelled task
Task failedTask = null;
List<Task> cancelledTasks = new ArrayList<Task>();
for(Task t: workflow.getTasks()) {
if(t.getStatus().equals(Status.FAILED)){
failedTask = t;
} else if(t.getStatus().equals(Status.CANCELED)){
cancelledTasks.add(t);

}
};
if (failedTask != null && !failedTask.getStatus().isTerminal()) {
throw new ApplicationException(Code.CONFLICT,
"The last task is still not completed! I can only retry the last failed task. Use restart if you want to attempt entire workflow execution again.");
}
if (last.getStatus().isSuccessful()) {
if (failedTask != null && failedTask.getStatus().isSuccessful()) {
throw new ApplicationException(Code.CONFLICT,
"The last task has not failed! I can only retry the last failed task. Use restart if you want to attempt entire workflow execution again.");
}
Expand All @@ -271,13 +218,34 @@ public void retry(String workflowId) throws Exception {
update.forEach(task -> task.setRetried(true));
edao.updateTasks(update);

Task retried = last.copy();
List<Task> rescheduledTasks = new ArrayList<Task>();
// Now reschedule the failed task
Task retried = failedTask.copy();
retried.setTaskId(IDGenerator.generate());
retried.setRetriedTaskId(last.getTaskId());
retried.setRetriedTaskId(failedTask.getTaskId());
retried.setStatus(Status.SCHEDULED);
retried.setRetryCount(last.getRetryCount() + 1);
scheduleTask(workflow, Arrays.asList(retried));

retried.setRetryCount(failedTask.getRetryCount() + 1);
rescheduledTasks.add(retried);

// Reschedule the cancelled task but if the join is cancelled set that to in progress
cancelledTasks.forEach(t -> {
if(t.getTaskType().equalsIgnoreCase(WorkflowTask.Type.JOIN.toString())){
t.setStatus(Status.IN_PROGRESS);
t.setRetried(false);
edao.updateTask(t);
} else {
//edao.removeTask(t.getTaskId());
Task copy = t.copy();
copy.setTaskId(IDGenerator.generate());
copy.setRetriedTaskId(t.getTaskId());
copy.setStatus(Status.SCHEDULED);
copy.setRetryCount(t.getRetryCount() + 1);
rescheduledTasks.add(copy);
}
});

scheduleTask(workflow, rescheduledTasks);

workflow.setStatus(WorkflowStatus.RUNNING);
edao.updateWorkflow(workflow);

Expand Down Expand Up @@ -762,10 +730,19 @@ boolean scheduleTask(Workflow workflow, List<Task> tasks) throws Exception {
if (tasks == null || tasks.isEmpty()) {
return false;
}
int count = workflow.getTasks().size();
int count = 0;

// Get the highest seq number
for(Task t: workflow.getTasks()){
if(t.getSeq() > count){
count = t.getSeq();
}
}

for (Task task : tasks) {
task.setSeq(++count);
if(task.getSeq() == 0){ // Set only if the seq was not set
task.setSeq(++count);
}
}

List<Task> created = edao.createTasks(tasks);
Expand Down Expand Up @@ -817,5 +794,87 @@ private void terminate(final WorkflowDef def, final Workflow workflow, Terminate
terminateWorkflow(workflow, tw.getMessage(), failureWorkflow);
}

/**
 * Re-runs an existing workflow in place, either from the start or from one
 * specific task.
 *
 * When {@code taskId} is {@code null}, every task of the workflow is removed
 * and the workflow is set back to RUNNING so it is decided from scratch.
 * Otherwise the task with that id is looked up in this workflow and,
 * recursively, in the sub workflows referenced by its SUB_WORKFLOW tasks.
 * If it is found, all tasks with a higher sequence number are removed, the
 * task is re-armed and the workflow is set back to RUNNING.
 *
 * @param workflowId    id of the workflow to rerun
 * @param taskId        id of the task to rerun from, or {@code null} to rerun the whole workflow
 * @param taskInput     replacement input for the rerun task; {@code null} keeps the existing input
 * @param workflowInput replacement workflow input; {@code null} keeps the existing input
 * @param correlationId replacement correlation id; {@code null} keeps the existing one
 * @return {@code true} if the rerun was set up, {@code false} if {@code taskId}
 *         was found neither in this workflow nor in its sub workflows
 * @throws Exception propagated from the DAO, queue and decide calls
 */
private boolean rerunWF(String workflowId, String taskId, Map<String, Object> taskInput,
        Map<String, Object> workflowInput, String correlationId) throws Exception {

    // NOTE(review): assumes edao.getWorkflow never returns null for an unknown id -- confirm in the DAO.
    Workflow workflow = edao.getWorkflow(workflowId);

    // A null task id implies that the entire workflow has to be rerun.
    if (taskId == null) {
        workflow.getTasks().forEach(t -> edao.removeTask(t.getTaskId()));
        markRunningAndDecide(workflow, workflowId, workflowInput, correlationId);
        return true;
    }

    Task theTask = locateTaskForRerun(workflow, taskId, taskInput);
    if (theTask == null) {
        return false;
    }

    // Everything scheduled after the rerun point is discarded.
    for (Task t : workflow.getTasks()) {
        if (t.getSeq() > theTask.getSeq()) {
            edao.removeTask(t.getTaskId());
        }
    }

    if (theTask.getTaskType().equalsIgnoreCase("SUB_WORKFLOW")) {
        // The sub workflow task is only flipped back to IN_PROGRESS; it is not re-queued here.
        theTask.setStatus(Status.IN_PROGRESS);
        edao.updateTask(theTask);
    } else {
        // Re-arm the task itself and put it back on its queue.
        theTask.setStatus(Status.SCHEDULED);
        if (taskInput != null) {
            theTask.setInputData(taskInput);
        }
        theTask.setRetried(false);
        edao.updateTask(theTask);
        addTaskToQueue(theTask);
    }

    markRunningAndDecide(workflow, workflowId, workflowInput, correlationId);
    return true;
}

/**
 * Finds the task to rerun from: either the task whose id equals {@code taskId},
 * or the SUB_WORKFLOW task whose sub workflow accepted the rerun request.
 * Returns {@code null} when neither exists.
 */
private Task locateTaskForRerun(Workflow workflow, String taskId, Map<String, Object> taskInput) throws Exception {
    for (Task t : workflow.getTasks()) {
        if (t.getTaskId().equals(taskId)) {
            return t;
        }
        if (!t.getTaskType().equalsIgnoreCase("SUB_WORKFLOW")) {
            continue;
        }
        // A sub workflow task without a recorded sub workflow id has nothing to
        // search; skip it instead of failing with a NullPointerException.
        Object subWorkflowId = t.getInputData().get("subWorkflowId");
        if (subWorkflowId == null) {
            continue;
        }
        if (rerunWF(subWorkflowId.toString(), taskId, taskInput, null, null)) {
            return t;
        }
    }
    return null;
}

/**
 * Sets the workflow to RUNNING, applies the optional correlation id and input
 * overrides, persists the workflow and triggers a decide for it.
 */
private void markRunningAndDecide(Workflow workflow, String workflowId,
        Map<String, Object> workflowInput, String correlationId) throws Exception {
    workflow.setStatus(WorkflowStatus.RUNNING);
    if (correlationId != null) {
        workflow.setCorrelationId(correlationId);
    }
    if (workflowInput != null) {
        workflow.setInput(workflowInput);
    }
    edao.updateWorkflow(workflow);
    decide(workflowId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ public Task getPendingTaskForWorkflow(String taskReferenceName, String workflowI

public boolean ackTaskRecieved(String taskId, String consumerId) throws Exception {
Task task = getTask(taskId);
String queueName = QueueUtils.getQueueName(task);

if (task != null) {
String queueName = QueueUtils.getQueueName(task);
if(task.getResponseTimeoutSeconds() > 0) {
logger.debug("Adding task " + queueName + "/" + taskId + " to be requeued if no response received " + task.getResponseTimeoutSeconds());
return queue.setUnackTimeout(queueName, task.getTaskId(), 1000 * task.getResponseTimeoutSeconds()); //Value is in millisecond
Expand Down
Loading

0 comments on commit 0dfbe72

Please sign in to comment.