diff --git a/client/python/conductor/ConductorWorker.py b/client/python/conductor/ConductorWorker.py
index 4e64ff3879..35a7202c99 100644
--- a/client/python/conductor/ConductorWorker.py
+++ b/client/python/conductor/ConductorWorker.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from __future__ import print_function
+from __future__ import print_function, absolute_import
 import sys
 import time
 import subprocess
@@ -25,7 +25,7 @@ class ConductorWorker:
     def __init__(self, server_url, thread_count, polling_interval):
-        wfcMgr = conductor.conductor.WFClientMgr(server_url)
+        wfcMgr = WFClientMgr(server_url)
         self.workflowClient = wfcMgr.workflowClient
         self.taskClient = wfcMgr.taskClient
         self.thread_count = thread_count
@@ -55,11 +55,12 @@ def poll_and_execute(self, taskType, exec_function):
     def start(self, taskType, exec_function, wait):
         print('Polling for task ' + taskType + ' at a ' + str(self.polling_interval) + ' ms interval with ' + str(self.thread_count) + ' threads for task execution, with worker id as ' + hostname)
         for x in range(0, int(self.thread_count)):
-            thread = Thread(target = self.poll_and_execute, args = (taskType, exec_function, ))
+            thread = Thread(target=self.poll_and_execute, args=(taskType, exec_function, ))
+            thread.daemon = True
             thread.start()
-        if(wait):
+        if wait:
             while 1:
-                pass
+                time.sleep(1)
 
 def exc(taskType, inputData, startTime, retryCount, status, callbackAfterSeconds, pollCount):
     print('Executing the function')
diff --git a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java
index f498fdec7b..9566abf3cb 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java
@@ -161,5 +161,9 @@ public SearchResult<WorkflowSummary> search(String query) {
         return result;
     }
 
+    public SearchResult<WorkflowSummary> search(Integer start, Integer size, String sort, String freeText, String query) {
+        Object[] params = new Object[]{"start", start, "size", size, "sort", sort, "freeText", freeText, "query", query};
+        return getForEntity("workflow/search", params, new GenericType<SearchResult<WorkflowSummary>>() {});
+    }
 }
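A quick usage sketch for the new paginated search overload above; the result element type follows the existing single-argument search, while the root URI and query values here are illustrative, not part of this change:

    // Hypothetical client-side usage: page through matching workflows, 20 at a time, newest first.
    WorkflowClient client = new WorkflowClient();
    client.setRootURI("http://localhost:8080/api/");    // assumed local server URI
    SearchResult<WorkflowSummary> page = client.search(0, 20, "startTime:DESC", "", "status IN (COMPLETED)");
    page.getResults().forEach(summary -> System.out.println(summary.getWorkflowId()));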
PropertyFactory.getString("workerB", "domain", null)); assertEquals(null, PropertyFactory.getString("workerC", "domain", null)); // Non Existent - + assertEquals("test-group-", PropertyFactory.getString("", "workerNamePrefix", "workflow-worker-")); } @Test diff --git a/client/src/test/resources/config.properties b/client/src/test/resources/config.properties index fec67a82c8..9e87f14c49 100644 --- a/client/src/test/resources/config.properties +++ b/client/src/test/resources/config.properties @@ -6,3 +6,4 @@ conductor.worker.workerB.batchSize=84 conductor.worker.workerB.domain=domainB conductor.worker.Test.paused=true conductor.worker.domainTestTask2.domain=visinghDomain +conductor.worker.workerNamePrefix=test-group- \ No newline at end of file diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java index 1c00b455fa..029d6f7456 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java @@ -59,7 +59,9 @@ public static boolean is(String name) { private String name; private String taskReferenceName; - + + private String description; + //Key: Name of the input parameter. MUST be one of the keys defined in TaskDef (e.g. fileName) //Value: mapping of the parameter from another task (e.g. task1.someOutputParameterAsFileName) private Map inputParameters = new HashMap(); @@ -124,6 +126,20 @@ public void setTaskReferenceName(String taskReferenceName) { this.taskReferenceName = taskReferenceName; } + /** + * @return the description + */ + public String getDescription() { + return description; + } + + /** + * @param description the description to set + */ + public void setDescription(String description) { + this.description = description; + } + /** * @return the inputParameters */ diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java index 22b54206de..86bfa0a640 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java @@ -549,6 +549,9 @@ private Task getDynamicTasks(WorkflowDef def, Workflow workflow, WorkflowTask ta Map input = getTaskInput(taskToSchedule.getInputParameters(), workflow, null, null); Object paramValue = input.get(paramName); DynamicForkJoinTaskList dynForkTasks0 = om.convertValue(paramValue, DynamicForkJoinTaskList.class); + if(dynForkTasks0 == null) { + throw new TerminateWorkflow("Dynamic tasks could not be created. 
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
index 22b54206de..86bfa0a640 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
@@ -549,6 +549,9 @@ private List<Task> getDynamicTasks(WorkflowDef def, Workflow workflow, WorkflowTask ta
         Map<String, Object> input = getTaskInput(taskToSchedule.getInputParameters(), workflow, null, null);
         Object paramValue = input.get(paramName);
         DynamicForkJoinTaskList dynForkTasks0 = om.convertValue(paramValue, DynamicForkJoinTaskList.class);
+        if(dynForkTasks0 == null) {
+            throw new TerminateWorkflow("Dynamic tasks could not be created. The value of " + paramName + " from task's input " + input + " has no dynamic tasks to be scheduled");
+        }
         for( DynamicForkJoinTask dt : dynForkTasks0.getDynamicTasks()) {
             WorkflowTask wft = new WorkflowTask();
             wft.setTaskReferenceName(dt.getReferenceName());
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java
index 536e4c814d..9edb23cca8 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java
@@ -18,6 +18,7 @@
  */
 package com.netflix.conductor.core.execution;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -155,78 +156,14 @@ public String startWorkflow(String name, int version, Map<String, Object> input,
     }
 
     public String rerun(RerunWorkflowRequest request) throws Exception {
-
-        Workflow reRunFromWorkflow = edao.getWorkflow(request.getReRunFromWorkflowId());
-
-        String workflowId = IDGenerator.generate();
-
-        // Persist the workflow and task First
-        Workflow wf = new Workflow();
-        wf.setWorkflowId(workflowId);
-        wf.setCorrelationId((request.getCorrelationId() == null) ? reRunFromWorkflow.getCorrelationId() : request.getCorrelationId());
-        wf.setWorkflowType(reRunFromWorkflow.getWorkflowType());
-        wf.setVersion(reRunFromWorkflow.getVersion());
-        wf.setInput((request.getWorkflowInput() == null) ? reRunFromWorkflow.getInput() : request.getWorkflowInput());
-        wf.setReRunFromWorkflowId(request.getReRunFromWorkflowId());
-        wf.setStatus(WorkflowStatus.RUNNING);
-        wf.setOwnerApp(WorkflowContext.get().getClientApp());
-        wf.setCreateTime(System.currentTimeMillis());
-        wf.setUpdatedBy(null);
-        wf.setUpdateTime(null);
-
-        // If the "reRunFromTaskId" is not given in the RerunWorkflowRequest,
-        // then the whole
-        // workflow has to rerun
-        if (request.getReRunFromTaskId() != null) {
-            // We need to go thru the workflowDef and create tasks for
-            // all tasks before request.getReRunFromTaskId() and marked them
-            // skipped
-            List<Task> newTasks = new LinkedList<>();
-            Map<String, Task> refNameToTask = new HashMap<String, Task>();
-            reRunFromWorkflow.getTasks().forEach(task -> refNameToTask.put(task.getReferenceTaskName(), task));
-            WorkflowDef wd = metadata.get(reRunFromWorkflow.getWorkflowType(), reRunFromWorkflow.getVersion());
-            Iterator<WorkflowTask> it = wd.getTasks().iterator();
-            int seq = wf.getTasks().size();
-            while (it.hasNext()) {
-                WorkflowTask wt = it.next();
-                Task previousTask = refNameToTask.get(wt.getTaskReferenceName());
-                if (previousTask.getTaskId().equals(request.getReRunFromTaskId())) {
-                    Task theTask = new Task();
-                    theTask.setTaskId(IDGenerator.generate());
-                    theTask.setReferenceTaskName(previousTask.getReferenceTaskName());
-                    theTask.setInputData((request.getTaskInput() == null) ? previousTask.getInputData() : request.getTaskInput());
-                    theTask.setWorkflowInstanceId(workflowId);
-                    theTask.setStatus(Status.READY_FOR_RERUN);
-                    theTask.setTaskType(previousTask.getTaskType());
-                    theTask.setCorrelationId(wf.getCorrelationId());
-                    theTask.setSeq(seq++);
-                    theTask.setRetryCount(previousTask.getRetryCount() + 1);
-                    newTasks.add(theTask);
-                    break;
-                } else { // Create with Skipped status
-                    Task theTask = new Task();
-                    theTask.setTaskId(IDGenerator.generate());
-                    theTask.setReferenceTaskName(previousTask.getReferenceTaskName());
-                    theTask.setWorkflowInstanceId(workflowId);
-                    theTask.setStatus(Status.SKIPPED);
-                    theTask.setTaskType(previousTask.getTaskType());
-                    theTask.setCorrelationId(wf.getCorrelationId());
-                    theTask.setInputData(previousTask.getInputData());
-                    theTask.setOutputData(previousTask.getOutputData());
-                    theTask.setRetryCount(previousTask.getRetryCount() + 1);
-                    theTask.setSeq(seq++);
-                    newTasks.add(theTask);
-                }
-            }
-
-            edao.createTasks(newTasks);
+        Preconditions.checkNotNull(request.getReRunFromWorkflowId(), "reRunFromWorkflowId is missing");
+        if(!rerunWF(request.getReRunFromWorkflowId(), request.getReRunFromTaskId(), request.getTaskInput(),
+                request.getWorkflowInput(), request.getCorrelationId())){
+            throw new ApplicationException(Code.INVALID_INPUT, "Task " + request.getReRunFromTaskId() + " not found");
         }
-
-        edao.createWorkflow(wf);
-        decide(workflowId);
-        return workflowId;
+        return request.getReRunFromWorkflowId();
     }
-
+
     public void rewind(String workflowId) throws Exception {
         Workflow workflow = edao.getWorkflow(workflowId, true);
         if (!workflow.getStatus().isTerminal()) {
@@ -253,13 +190,23 @@ public void retry(String workflowId) throws Exception {
         if (workflow.getTasks().isEmpty()) {
             throw new ApplicationException(Code.CONFLICT, "Workflow has not started yet");
         }
-        int lastIndex = workflow.getTasks().size() - 1;
-        Task last = workflow.getTasks().get(lastIndex);
-        if (!last.getStatus().isTerminal()) {
+
+        // First get the failed task and the cancelled tasks
+        Task failedTask = null;
+        List<Task> cancelledTasks = new ArrayList<Task>();
+        for(Task t: workflow.getTasks()) {
+            if(t.getStatus().equals(Status.FAILED)){
+                failedTask = t;
+            } else if(t.getStatus().equals(Status.CANCELED)){
+                cancelledTasks.add(t);
+
+            }
+        };
+        if (failedTask != null && !failedTask.getStatus().isTerminal()) {
             throw new ApplicationException(Code.CONFLICT, "The last task is still not completed!  I can only retry the last failed task.  Use restart if you want to attempt entire workflow execution again.");
         }
-        if (last.getStatus().isSuccessful()) {
+        if (failedTask != null && failedTask.getStatus().isSuccessful()) {
             throw new ApplicationException(Code.CONFLICT, "The last task has not failed!  I can only retry the last failed task.  Use restart if you want to attempt entire workflow execution again.");
         }
@@ -271,13 +218,34 @@ public void retry(String workflowId) throws Exception {
         update.forEach(task -> task.setRetried(true));
         edao.updateTasks(update);
 
-        Task retried = last.copy();
+        List<Task> rescheduledTasks = new ArrayList<Task>();
+        // Now reschedule the failed task
+        Task retried = failedTask.copy();
         retried.setTaskId(IDGenerator.generate());
-        retried.setRetriedTaskId(last.getTaskId());
+        retried.setRetriedTaskId(failedTask.getTaskId());
         retried.setStatus(Status.SCHEDULED);
-        retried.setRetryCount(last.getRetryCount() + 1);
-        scheduleTask(workflow, Arrays.asList(retried));
-
+        retried.setRetryCount(failedTask.getRetryCount() + 1);
+        rescheduledTasks.add(retried);
+
+        // Reschedule the cancelled task but if the join is cancelled set that to in progress
+        cancelledTasks.forEach(t -> {
+            if(t.getTaskType().equalsIgnoreCase(WorkflowTask.Type.JOIN.toString())){
+                t.setStatus(Status.IN_PROGRESS);
+                t.setRetried(false);
+                edao.updateTask(t);
+            } else {
+                //edao.removeTask(t.getTaskId());
+                Task copy = t.copy();
+                copy.setTaskId(IDGenerator.generate());
+                copy.setRetriedTaskId(t.getTaskId());
+                copy.setStatus(Status.SCHEDULED);
+                copy.setRetryCount(t.getRetryCount() + 1);
+                rescheduledTasks.add(copy);
+            }
+        });
+
+        scheduleTask(workflow, rescheduledTasks);
+
         workflow.setStatus(WorkflowStatus.RUNNING);
         edao.updateWorkflow(workflow);
@@ -762,10 +730,19 @@ boolean scheduleTask(Workflow workflow, List<Task> tasks) throws Exception {
         if (tasks == null || tasks.isEmpty()) {
             return false;
         }
-        int count = workflow.getTasks().size();
+        int count = 0;
+
+        // Get the highest seq number
+        for(Task t: workflow.getTasks()){
+            if(t.getSeq() > count){
+                count = t.getSeq();
+            }
+        }
         for (Task task : tasks) {
-            task.setSeq(++count);
+            if(task.getSeq() == 0){ // Set only if the seq was not set
+                task.setSeq(++count);
+            }
         }
         List<Task> created = edao.createTasks(tasks);
@@ -817,5 +794,87 @@ private void terminate(final WorkflowDef def, final Workflow workflow, Terminate
         terminateWorkflow(workflow, tw.getMessage(), failureWorkflow);
     }
 
+    private boolean rerunWF(String workflowId, String taskId, Map<String, Object> taskInput,
+            Map<String, Object> workflowInput, String correlationId) throws Exception{
+
+        // Get the workflow
+        Workflow workflow = edao.getWorkflow(workflowId);
+
+        // If the task Id is null it implies that the entire workflow has to be rerun
+        if(taskId == null){
+            // remove all tasks
+            workflow.getTasks().forEach(t -> edao.removeTask(t.getTaskId()));
+            // Set workflow as RUNNING
+            workflow.setStatus(WorkflowStatus.RUNNING);
+            if(correlationId != null){
+                workflow.setCorrelationId(correlationId);
+            }
+            if(workflowInput != null){
+                workflow.setInput(workflowInput);
+            }
+
+            edao.updateWorkflow(workflow);
+
+            decide(workflowId);
+            return true;
+        }
+
+        // Now iterate thru the tasks and find the "specific" task
+        Task theTask = null;
+        for(Task t: workflow.getTasks()){
+            if(t.getTaskId().equals(taskId)){
+                theTask = t;
+                break;
+            } else {
+                // If not found look into sub workflows
+                if(t.getTaskType().equalsIgnoreCase("SUB_WORKFLOW")){
+                    String subWorkflowId = t.getInputData().get("subWorkflowId").toString();
+                    if(rerunWF(subWorkflowId, taskId, taskInput, null, null)){
+                        theTask = t;
+                        break;
+                    }
+                }
+            }
+        }
+
+        if(theTask != null){
+            // Remove all later tasks from the "theTask"
+            for(Task t: workflow.getTasks()){
+                if(t.getSeq() > theTask.getSeq()){
+                    edao.removeTask(t.getTaskId());
+                }
+            }
+            if(theTask.getTaskType().equalsIgnoreCase("SUB_WORKFLOW")){
+                // if task is sub workflow set task as IN_PROGRESS
+                theTask.setStatus(Status.IN_PROGRESS);
+                edao.updateTask(theTask);
+            } else {
+                // Set the task to rerun
+                theTask.setStatus(Status.SCHEDULED);
+                if(taskInput != null){
+                    theTask.setInputData(taskInput);
+                }
+                theTask.setRetried(false);
+                edao.updateTask(theTask);
+                addTaskToQueue(theTask);
+            }
+            // and workflow as RUNNING
+            workflow.setStatus(WorkflowStatus.RUNNING);
+            if(correlationId != null){
+                workflow.setCorrelationId(correlationId);
+            }
+            if(workflowInput != null){
+                workflow.setInput(workflowInput);
+            }
+
+            edao.updateWorkflow(workflow);
+
+            decide(workflowId);
+            return true;
+        }
+
+        return false;
+    }
 }
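A sketch of driving the refactored rerun path from a caller; the executor handle, IDs, and inputs are placeholders. Per the code above, a null reRunFromTaskId reruns the entire workflow in place, and rerun now returns the original workflow id:

    RerunWorkflowRequest request = new RerunWorkflowRequest();
    request.setReRunFromWorkflowId(workflowId);      // required; checked via Preconditions
    request.setReRunFromTaskId(taskId);              // null would rerun the whole workflow
    request.setTaskInput(replacementTaskInput);      // optional new input for that task
    String id = workflowExecutor.rerun(request);     // same id as reRunFromWorkflowId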
diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java
index 1cd8cc8f42..cf7ecba151 100644
--- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java
+++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java
@@ -176,9 +176,9 @@ public Task getPendingTaskForWorkflow(String taskReferenceName, String workflowI
     public boolean ackTaskRecieved(String taskId, String consumerId) throws Exception {
         Task task = getTask(taskId);
-        String queueName = QueueUtils.getQueueName(task);
         if (task != null) {
+            String queueName = QueueUtils.getQueueName(task);
             if(task.getResponseTimeoutSeconds() > 0) {
                 logger.debug("Adding task " + queueName + "/" + taskId + " to be requeued if no response received " + task.getResponseTimeoutSeconds());
                 return queue.setUnackTimeout(queueName, task.getTaskId(), 1000 * task.getResponseTimeoutSeconds());    //Value is in millisecond
diff --git a/docker/server/Dockerfile b/docker/server/Dockerfile
index faff583ed6..ab6e8313c3 100644
--- a/docker/server/Dockerfile
+++ b/docker/server/Dockerfile
@@ -1,42 +1,22 @@
 #
 # conductor:server - Netflix conductor server
 #
-FROM java:8-jdk
+FROM java:8-jre-alpine
 
 MAINTAINER Netflix OSS
 
 # Make app folders
 RUN mkdir -p /app/config /app/logs /app/libs
 
-# Startup script(s)
-COPY ./bin /app
-
-# Configs
-COPY ./config /app/config
-
-# Get all the dependencies
-RUN apt-get update -y \
-  && apt-get -y install git \
-
-  # Chmod scripts
-  && chmod +x /app/startup.sh
-
-# Get and install conductor
-RUN git clone https://github.com/Netflix/conductor.git \
-  && cd conductor \
-  && ./gradlew build -x test \
-
-  # Get Server Jar
-  && mv ./server/build/libs/conductor-server-*-all.jar /app/libs/ \
-
-  # Go back to root
-  && cd / \
-
-  # Clean up
-  && rm -rf conductor
+# Copy the project directly onto the image
+COPY ./docker/server/bin /app
+COPY ./docker/server/config /app/config
+COPY ./server/build/libs/conductor-server-*-all.jar /app/libs
 
+# Copy the files for the server into the app folders
+RUN chmod +x /app/startup.sh
 
 EXPOSE 8080
 
-CMD ["/app/startup.sh"]
-ENTRYPOINT ["/bin/bash"]
+CMD [ "/app/startup.sh" ]
+ENTRYPOINT [ "/bin/sh"]
diff --git a/docker/server/Dockerfile.build b/docker/server/Dockerfile.build
new file mode 100644
index 0000000000..ea82dc149c
--- /dev/null
+++ b/docker/server/Dockerfile.build
@@ -0,0 +1,13 @@
+#
+# conductor:server - Netflix conductor server
+#
+FROM java:8-jdk
+
+MAINTAINER Netflix OSS
+
+# Copy the project directly onto the image
+COPY . /conductor
+WORKDIR /conductor
+
+# Build the server on run
+ENTRYPOINT ./gradlew build -x test
diff --git a/docker/server/README.md b/docker/server/README.md
index 92efcfa593..872b61b8e0 100644
--- a/docker/server/README.md
+++ b/docker/server/README.md
@@ -3,7 +3,12 @@ This Dockerfile create the conductor:server image
 ## Building the image
-`docker build -t conductor:server .`
+
+Run the following commands from the project root.
+
+`docker build -f docker/server/Dockerfile.build -t conductor:server-build .`
+`docker run -v $(pwd):/conductor conductor:server-build`
+`docker build -f docker/server/Dockerfile -t conductor:server .`
 
 ## Running the conductor server
 - Standalone server (interal DB): `docker run -p 8080:8080 -d -t conductor:server`
diff --git a/docker/server/bin/startup.sh b/docker/server/bin/startup.sh
index 2749b1606c..1b382d16dc 100755
--- a/docker/server/bin/startup.sh
+++ b/docker/server/bin/startup.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/bin/sh
 
 # startup.sh - startup script for the server docker image
 
 echo "Starting Conductor server"
@@ -18,4 +18,4 @@ if [ -z "$CONFIG_PROP" ];
     export config_file=/app/config/$CONFIG_PROP
   fi
 
-nohup java -jar conductor-server-*-all.jar $config_file 1>&2 > /app/logs/server.log
+java -jar conductor-server-*-all.jar $config_file
diff --git a/docker/ui/Dockerfile b/docker/ui/Dockerfile
index 7efca851ac..170f92328a 100644
--- a/docker/ui/Dockerfile
+++ b/docker/ui/Dockerfile
@@ -1,38 +1,42 @@
 #
 # conductor:ui - Netflix conductor UI
 #
-FROM node
-
+FROM node:alpine
 MAINTAINER Netflix OSS
 
-# Make app folders
-RUN mkdir -p /app/config /app/logs /app/libs
+# Install the required packages for the node build
+# to run on alpine
+RUN apk update && apk add \
+  autoconf \
+  automake \
+  libtool \
+  build-base \
+  libstdc++ \
+  gcc \
+  abuild \
+  binutils \
+  nasm \
+  libpng \
+  libpng-dev \
+  libjpeg-turbo \
+  libjpeg-turbo-dev
 
-# Startup script(s)
-COPY ./bin /app
+# Make app folders
+RUN mkdir -p /app/ui
 
-# Get all the dependencies
-RUN apt-get update -y \
-  && apt-get -y install git \
+# Copy the ui files onto the image
+COPY ./docker/ui/bin /app
+COPY ./ui /app/ui
 
-  # Chmod scripts
-  && chmod +x /app/startup.sh
+# Copy the files for the server into the app folders
+RUN chmod +x /app/startup.sh
 
 # Get and install conductor UI
-RUN git clone https://github.com/netflix/conductor.git \
-
-  # Get UI project
-  && mv /conductor/ui /app \
-
-  # Remove the conductor project
-  && rm -rf conductor \
-
-  # Install UI packages
-  && cd /app/ui \
+RUN cd /app/ui \
   && npm install \
   && npm run build --server
 
 EXPOSE 5000
 
-CMD ["/app/startup.sh"]
-ENTRYPOINT ["/bin/bash"]
+CMD [ "/app/startup.sh" ]
+ENTRYPOINT ["/bin/sh"]
diff --git a/docker/ui/README.md b/docker/ui/README.md
index 31c6306b34..52956e387c 100644
--- a/docker/ui/README.md
+++ b/docker/ui/README.md
@@ -3,7 +3,10 @@ This Dockerfile create the conductor:ui image
 ## Building the image
-`docker build -t conductor:ui .`
+
+Run the following commands from the project root.
+
+`docker build -f docker/ui/Dockerfile -t conductor:ui .`
 
 ## Running the conductor server
 - With localhost conductor server: `docker run -p 5000:5000 -d -t conductor:ui`
diff --git a/docker/ui/bin/startup.sh b/docker/ui/bin/startup.sh
index 6290e991a8..49b6beb243 100755
--- a/docker/ui/bin/startup.sh
+++ b/docker/ui/bin/startup.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/bin/sh
 
 # startup.sh - startup script for the UI docker image
 
 echo "Starting Conductor UI"
@@ -12,4 +12,4 @@ if [ -z "$WF_SERVER" ];
     echo "using Conductor API server from '$WF_SERVER'"
   fi
 
-nohup node server.js 1>&2 > /app/logs/ui.log
\ No newline at end of file
+node server.js
\ No newline at end of file
diff --git a/docs/docs/metadata/index.md b/docs/docs/metadata/index.md
index 76f6ec39dd..0211360ad0 100644
--- a/docs/docs/metadata/index.md
+++ b/docs/docs/metadata/index.md
@@ -98,6 +98,7 @@ Below are the mandatory minimum parameters required for each task:
 |name|Name of the task. MUST be registered as a task type with Conductor before starting workflow||
 |taskReferenceName|Alias used to refer the task within the workflow. MUST be unique.||
 |type|Type of task. SIMPLE for tasks executed by remote workers, or one of the system task types||
+|description|Description of the task|optional|
 |optional|true or false. When set to true - workflow continues even if the task fails. The status of the task is reflected as `COMPLETED_WITH_ERRORS`|Defaults to `false`|
 |inputParameters|JSON template that defines the input given to the task|See "wiring inputs and outputs" for details|
diff --git a/server/src/main/java/com/netflix/conductor/server/ConductorServer.java b/server/src/main/java/com/netflix/conductor/server/ConductorServer.java
index 20df3c6306..b465d6cde6 100644
--- a/server/src/main/java/com/netflix/conductor/server/ConductorServer.java
+++ b/server/src/main/java/com/netflix/conductor/server/ConductorServer.java
@@ -29,6 +29,7 @@
 import javax.servlet.DispatcherType;
 import javax.ws.rs.core.MediaType;
 
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletContextHandler;
@@ -51,6 +52,8 @@
 import com.netflix.dyno.jedis.DynoJedisClient;
 import com.sun.jersey.api.client.Client;
 
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
 import redis.clients.jedis.JedisCommands;
 
 /**
@@ -62,7 +65,7 @@ public class ConductorServer {
     private static Logger logger = LoggerFactory.getLogger(ConductorServer.class);
 
     private enum DB {
-        redis, dynomite, memory
+        redis, dynomite, memory, redis_cluster
     }
 
     private ServerModule sm;
@@ -167,6 +170,15 @@ public HostToken getTokenForHost(Host host, Set<Host> activeHosts) {
                 }
                 logger.info("Starting conductor server using in memory data store");
                 break;
+
+            case redis_cluster:
+                Host host = dynoHosts.get(0);
+                GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+                poolConfig.setMinIdle(5);
+                poolConfig.setMaxTotal(1000);
+                jedis = new JedisCluster(new HostAndPort(host.getHostName(), host.getPort()), poolConfig);
+                logger.info("Starting conductor server using redis_cluster " + dynoClusterName);
+                break;
         }
 
         this.sm = new ServerModule(jedis, hs, cc);
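A minimal server configuration sketch that would exercise the new redis_cluster branch; the property names follow Conductor's existing db/dynomite host settings and are an assumption, not part of this diff:

    # select the new backend (enum value added above); only the first host entry is used
    db=redis_cluster
    # host:port:rack entry for one cluster node
    workflow.dynomite.cluster.hosts=localhost:6379:us-east-1c
    workflow.dynomite.cluster.name=rediscluster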
diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java
index 9b65595b12..6994d4c8da 100644
--- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java
+++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -147,6 +148,20 @@ public void init() throws Exception {
         ms.registerTaskDef(Arrays.asList(task));
     }
 
+    for(int i = 0; i < 5; i++){
+
+        String name = "junit_task_0_RT_" + i;
+        if(ms.getTaskDef(name) != null){
+            continue;
+        }
+
+        TaskDef task = new TaskDef();
+        task.setName(name);
+        task.setTimeoutSeconds(120);
+        task.setRetryCount(0);
+        ms.registerTaskDef(Arrays.asList(task));
+    }
+
     TaskDef task = new TaskDef();
     task.setName("short_time_out");
     task.setTimeoutSeconds(5);
@@ -182,7 +197,7 @@ public void init() throws Exception {
         ip2.put("tp2", "${t1.output.op}");
         wft2.setInputParameters(ip2);
         wft2.setTaskReferenceName("t2");
-
+
         wftasks.add(wft1);
         wftasks.add(wft2);
         def.setTasks(wftasks);
@@ -337,6 +352,7 @@ public void testForkJoin() throws Exception {
         Map<String, Object> input = new HashMap<String, Object>();
         String wfid = provider.startWorkflow(FORK_JOIN_WF, 1, "fanouttest", input );
         System.out.println("testForkJoin.wfid=" + wfid);
+        printTaskStatuses(wfid, "initiated");
 
         Task t1 = ess.poll("junit_task_1", "test");
         assertTrue(ess.ackTaskRecieved(t1.getTaskId(), "test"));
@@ -356,6 +372,7 @@
         Workflow wf = ess.getExecutionStatus(wfid, true);
         assertNotNull(wf);
         assertEquals("Found " + wf.getTasks(), WorkflowStatus.RUNNING, wf.getStatus());
+        printTaskStatuses(wf, "T1 completed");
 
         t3 = ess.poll("junit_task_3", "test");
         assertNotNull(t3);
@@ -386,6 +403,7 @@
         wf = ess.getExecutionStatus(wfid, true);
         assertNotNull(wf);
+        printTaskStatuses(wf, "T2 T3 completed");
         assertEquals("Found " + wf.getTasks(), WorkflowStatus.RUNNING, wf.getStatus());
         if (!wf.getTasks().stream().anyMatch(t -> t.getReferenceTaskName().equals("t3"))) {
             provider.decide(wfid);
@@ -419,6 +437,7 @@
         wf = ess.getExecutionStatus(wfid, true);
         assertNotNull(wf);
         assertEquals("Found " + wf.getTasks(), WorkflowStatus.COMPLETED, wf.getStatus());
+        printTaskStatuses(wf, "All completed");
     }
 
     @Test
@@ -517,6 +536,7 @@ public void testForkJoinNested() throws Exception {
         wf = ess.getExecutionStatus(wfid, true);
         assertNotNull(wf);
         assertEquals(WorkflowStatus.COMPLETED, wf.getStatus());
+
     }
 
     @Test
@@ -745,6 +765,8 @@ public void testDynamicForkJoin() throws Exception {
         taskDef.setRetryCount(retryCount);
         taskDef.setRetryDelaySeconds(1);
         ms.updateTaskDef(taskDef);
+
+
     }
 
     private void createForkJoinWorkflow() throws Exception {
@@ -800,6 +822,59 @@ private void createForkJoinWorkflow() throws Exception {
 
     }
 
+
+    private void createForkJoinWorkflowWithZeroRetry() throws Exception {
+
+        WorkflowDef def = new WorkflowDef();
+        def.setName(FORK_JOIN_WF + "_2");
+        def.setDescription(def.getName());
+        def.setVersion(1);
+        def.setInputParameters(Arrays.asList("param1", "param2"));
+
+        WorkflowTask fanout = new WorkflowTask();
+        fanout.setType(Type.FORK_JOIN.name());
+        fanout.setTaskReferenceName("fanouttask");
+
+        WorkflowTask wft1 = new WorkflowTask();
+        wft1.setName("junit_task_0_RT_1");
+        Map<String, Object> ip1 = new HashMap<>();
+        ip1.put("p1", "workflow.input.param1");
+        ip1.put("p2", "workflow.input.param2");
+        wft1.setInputParameters(ip1);
+        wft1.setTaskReferenceName("t1");
+
+        WorkflowTask wft3 = new WorkflowTask();
+        wft3.setName("junit_task_0_RT_3");
+        wft3.setInputParameters(ip1);
+        wft3.setTaskReferenceName("t3");
+
+        WorkflowTask wft2 = new WorkflowTask();
+        wft2.setName("junit_task_0_RT_2");
+        Map<String, Object> ip2 = new HashMap<>();
+        ip2.put("tp1", "workflow.input.param1");
+        wft2.setInputParameters(ip2);
+        wft2.setTaskReferenceName("t2");
+
+        WorkflowTask wft4 = new WorkflowTask();
+        wft4.setName("junit_task_0_RT_4");
+        wft4.setInputParameters(ip2);
+        wft4.setTaskReferenceName("t4");
+
+        fanout.getForkTasks().add(Arrays.asList(wft1, wft3));
+        fanout.getForkTasks().add(Arrays.asList(wft2));
+
+        def.getTasks().add(fanout);
+
+        WorkflowTask join = new WorkflowTask();
+        join.setType(Type.JOIN.name());
+        join.setTaskReferenceName("fanouttask_join");
+        join.setJoinOn(Arrays.asList("t3","t2"));
+
+        def.getTasks().add(join);
+        def.getTasks().add(wft4);
+        ms.updateWorkflowDef(def);
+
+    }
 
     private void createForkJoinNestedWorkflow() throws Exception {
 
         WorkflowDef def = new WorkflowDef();
@@ -1123,6 +1198,104 @@ public void testSimpleWorkflow() throws Exception {
     }
 
+    @Test
+    public void testWorkflowRerunWithSubWorkflows() throws Exception {
+        // Execute a workflow
+        String wfid = this.runWorkflowWithSubworkflow();
+        // Check it completed
+        Workflow wf = ess.getExecutionStatus(wfid, true);
+        assertNotNull(wf);
+        assertEquals(WorkflowStatus.COMPLETED, wf.getStatus());
+        assertEquals(2, wf.getTasks().size());
+
+        // Now let's pick up the first task in the sub workflow and rerun it from there
+        String subWorkflowId = null;
+        for(Task t: wf.getTasks()){
+            if(t.getTaskType().equalsIgnoreCase("SUB_WORKFLOW")){
+                subWorkflowId = t.getOutputData().get("subWorkflowId").toString();
+            }
+        }
+        assertNotNull(subWorkflowId);
+        Workflow subWorkflow = ess.getExecutionStatus(subWorkflowId, true);
+        Task swT1 = null;
+        for(Task t: subWorkflow.getTasks()){
+            if(t.getTaskDefName().equalsIgnoreCase("junit_task_1")){
+                swT1 = t;
+            }
+        }
+        assertNotNull(swT1);
+
+        RerunWorkflowRequest request = new RerunWorkflowRequest();
+        request.setReRunFromTaskId(swT1.getTaskId());
+
+        Map<String, Object> newInput = new HashMap<String, Object>();
+        newInput.put("p1", "1");
+        newInput.put("p2", "2");
+        request.setTaskInput(newInput);
+
+        String correlationId = "unit_test_sw_new";
+        Map<String, Object> input = new HashMap<String, Object>();
+        input.put("param1", "New p1 value");
+        input.put("param2", "New p2 value");
+        request.setCorrelationId(correlationId);
+        request.setWorkflowInput(input);
+
+        request.setReRunFromWorkflowId(wfid);
+        request.setReRunFromTaskId(swT1.getTaskId());
+        // Rerun
+        provider.rerun(request);
+
+        // The main WF and the sub WF should be in RUNNING state
+        wf = ess.getExecutionStatus(wfid, true);
+        assertNotNull(wf);
+        assertEquals(WorkflowStatus.RUNNING, wf.getStatus());
+        assertEquals(2, wf.getTasks().size());
+        assertEquals(correlationId, wf.getCorrelationId());
+        assertEquals("New p1 value", wf.getInput().get("param1"));
+        assertEquals("New p2 value", wf.getInput().get("param2"));
+
+        subWorkflow = ess.getExecutionStatus(subWorkflowId, true);
+        assertNotNull(subWorkflow);
+        assertEquals(WorkflowStatus.RUNNING, subWorkflow.getStatus());
+        // Since we are rerunning from the sub workflow task, there
+        // should be only 1 task that is SCHEDULED
+        assertEquals(1, subWorkflow.getTasks().size());
+        assertEquals(Status.SCHEDULED, subWorkflow.getTasks().get(0).getStatus());
+
+        // Now execute the task
+        Task task = ess.poll("junit_task_1", "task1.junit.worker");
+        assertNotNull(task);
+        assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task1.junit.worker"));
+        assertEquals(task.getInputData().get("p1").toString(), "1");
+        assertEquals(task.getInputData().get("p2").toString(), "2");
+        task.getOutputData().put("op", "junit_task_1.done");
+        task.setStatus(Status.COMPLETED);
+        ess.updateTask(task);
+
+        subWorkflow = ess.getExecutionStatus(subWorkflowId, true);
+        assertNotNull(subWorkflow);
+        assertEquals(WorkflowStatus.RUNNING, subWorkflow.getStatus());
+        assertEquals(2, subWorkflow.getTasks().size());
+
+        // Poll for second task of the sub workflow and execute it
+        task = ess.poll("junit_task_2", "task2.junit.worker");
+        assertNotNull(task);
+        assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task2.junit.worker"));
+        task.getOutputData().put("op", "junit_task_2.done");
+        task.setStatus(Status.COMPLETED);
+        ess.updateTask(task);
+
+        // Now the sub workflow and the main workflow must have finished
+        subWorkflow = ess.getExecutionStatus(subWorkflowId, true);
+        assertNotNull(subWorkflow);
+        assertEquals(WorkflowStatus.COMPLETED, subWorkflow.getStatus());
+        assertEquals(2, subWorkflow.getTasks().size());
+
+        wf = ess.getExecutionStatus(wfid, true);
+        assertNotNull(wf);
+        assertEquals(WorkflowStatus.COMPLETED, wf.getStatus());
+        assertEquals(2, wf.getTasks().size());
+    }
+
     @Test
     public void testSimpleWorkflowWithTaskSpecificDomain() throws Exception {
@@ -2170,6 +2343,65 @@ public void testFailures() throws Exception {
     }
 
+    @Test
+    public void testRetryWithForkJoin() throws Exception {
+        String wfid = this.runAFailedForkJoinWF();
+
+        provider.retry(wfid);
+
+        Workflow wf = ess.getExecutionStatus(wfid, true);
+        assertNotNull(wf);
+
+        printTaskStatuses(wf, "After retry called");
+
+        Task t2 = ess.poll("junit_task_0_RT_2", "test");
+        assertTrue(ess.ackTaskRecieved(t2.getTaskId(), "test"));
+
+        Task t3 = ess.poll("junit_task_0_RT_3", "test");
+        assertNotNull(t3);
+        assertTrue(ess.ackTaskRecieved(t3.getTaskId(), "test"));
+
+        t2.setStatus(Status.COMPLETED);
+        t3.setStatus(Status.COMPLETED);
+
+        ExecutorService es = Executors.newFixedThreadPool(2);
+        Future<?> future1 = es.submit(() -> {
+            try {
+                ess.updateTask(t2);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+        });
+        final Task _t3 = t3;
+        Future<?> future2 = es.submit(() -> {
+            try {
+                ess.updateTask(_t3);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+        });
+        future1.get();
+        future2.get();
+
+        provider.decide(wfid);
+        provider.decide(wfid);
+
+        wf = ess.getExecutionStatus(wfid, true);
+        assertNotNull(wf);
+
+        printTaskStatuses(wf, "T2, T3 complete");
+        provider.decide(wfid);
+
+        Task t4 = ess.poll("junit_task_0_RT_4", "test");
+        assertNotNull(t4);
+        t4.setStatus(Status.COMPLETED);
+        ess.updateTask(t4);
+
+        printTaskStatuses(wfid, "After complete");
+    }
+
     @Test
     public void testRetry() throws Exception {
         WorkflowDef errorWorkflow = ms.getWorkflowDef(FORK_JOIN_WF, 1);
@@ -2195,6 +2427,7 @@ public void testRetry() throws Exception {
         input.put("param2", "p2 value");
         String wfid = provider.startWorkflow(LINEAR_WORKFLOW_T1_T2, 1, correlationId , input);
         assertNotNull(wfid);
+        printTaskStatuses(wfid, "initial");
 
         Task task = getTask("junit_task_1");
         assertNotNull(task);
@@ -2214,8 +2447,11 @@
         assertNotNull(es);
         assertEquals(WorkflowStatus.FAILED, es.getStatus());
 
+        printTaskStatuses(wfid, "before retry");
+
         provider.retry(wfid);
 
+        printTaskStatuses(wfid, "after retry");
         es = ess.getExecutionStatus(wfid, true);
         assertNotNull(es);
         assertEquals(WorkflowStatus.RUNNING, es.getStatus());
@@ -2245,6 +2481,8 @@
         taskDef.setRetryCount(retryCount);
         taskDef.setRetryDelaySeconds(retryDelay);
         ms.updateTaskDef(taskDef);
+
+        printTaskStatuses(wfid, "final");
     }
 
@@ -2474,7 +2712,7 @@ public void testReruns() throws Exception {
         // Check the tasks, at this time there should be 2 tasks
         // first one is skipped and the second one is scheduled
         assertEquals(esRR.getTasks().toString(), 2, esRR.getTasks().size());
-        assertEquals(Status.SKIPPED, esRR.getTasks().get(0).getStatus());
+        assertEquals(Status.COMPLETED, esRR.getTasks().get(0).getStatus());
         Task tRR = esRR.getTasks().get(1);
         assertEquals(esRR.getTasks().toString(), Status.SCHEDULED, tRR.getStatus());
         assertEquals(tRR.getTaskType(), "junit_task_2");
@@ -3140,4 +3378,185 @@ private void createWorkflowDefForDomain(){
             ms.updateWorkflowDef(defSW);
         } catch (Exception e) {}
     }
+
+
+
+    private String runWorkflowWithSubworkflow() throws Exception{
+        clearWorkflows();
+        createWorkflowDefForDomain();
+
+        WorkflowDef found = ms.getWorkflowDef(LINEAR_WORKFLOW_T1_T2_SW, 1);
+        assertNotNull(found);
+
+        String correlationId = "unit_test_sw";
+        Map<String, Object> input = new HashMap<String, Object>();
+        String inputParam1 = "p1 value";
+        input.put("param1", inputParam1);
+        input.put("param2", "p2 value");
+
+        String wfid = provider.startWorkflow(LINEAR_WORKFLOW_T1_T2_SW, 1, correlationId , input, null);
+        System.out.println("testSimpleWorkflow.wfid=" + wfid);
+        assertNotNull(wfid);
+        Workflow wf = provider.getWorkflow(wfid, false);
+        assertNotNull(wf);
+
+        Workflow es = ess.getExecutionStatus(wfid, true);
+        assertNotNull(es);
+        assertEquals(es.getReasonForIncompletion(), WorkflowStatus.RUNNING, es.getStatus());
+
+        es = ess.getExecutionStatus(wfid, true);
+        assertNotNull(es);
+        assertEquals(WorkflowStatus.RUNNING, es.getStatus());
+        assertEquals(1, es.getTasks().size());    //The very first task is the one that should be scheduled.
+
+        // Poll for first task and execute it
+        Task task = ess.poll("junit_task_3", "task3.junit.worker");
+        assertNotNull(task);
+        assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task3.junit.worker"));
+        task.getOutputData().put("op", "junit_task_3.done");
+        task.setStatus(Status.COMPLETED);
+        ess.updateTask(task);
+
+        es = ess.getExecutionStatus(wfid, true);
+        assertNotNull(es);
+        assertEquals(WorkflowStatus.RUNNING, es.getStatus());
+        assertEquals(2, es.getTasks().size());
+
+        // Get the sub workflow id
+        String subWorkflowId = null;
+        for(Task t: es.getTasks()){
+            if(t.getTaskType().equalsIgnoreCase("SUB_WORKFLOW")){
+                subWorkflowId = t.getOutputData().get("subWorkflowId").toString();
+            }
+        }
+        assertNotNull(subWorkflowId);
+
+        Workflow subWorkflow = ess.getExecutionStatus(subWorkflowId, true);
+        assertNotNull(subWorkflow);
+        assertEquals(WorkflowStatus.RUNNING, subWorkflow.getStatus());
+        assertEquals(1, subWorkflow.getTasks().size());
+
+        // Now the sub workflow is triggered
+        // Poll for first task of the sub workflow and execute it
+        task = ess.poll("junit_task_1", "task1.junit.worker");
+        assertNotNull(task);
+        assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task1.junit.worker"));
+        task.getOutputData().put("op", "junit_task_1.done");
+        task.setStatus(Status.COMPLETED);
+        ess.updateTask(task);
+
+        subWorkflow = ess.getExecutionStatus(subWorkflowId, true);
+        assertNotNull(subWorkflow);
+        assertEquals(WorkflowStatus.RUNNING, subWorkflow.getStatus());
+        assertEquals(2, subWorkflow.getTasks().size());
+
+        es = ess.getExecutionStatus(wfid, true);
+        assertNotNull(es);
+        assertEquals(WorkflowStatus.RUNNING, es.getStatus());
+        assertEquals(2, es.getTasks().size());
+
+        // Poll for second task of the sub workflow and execute it
+        task = ess.poll("junit_task_2", "task2.junit.worker");
+        assertNotNull(task);
+        assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task2.junit.worker"));
+        task.getOutputData().put("op", "junit_task_2.done");
+        task.setStatus(Status.COMPLETED);
+        ess.updateTask(task);
+
+        // Now the sub workflow and the main workflow must have finished
+        subWorkflow = ess.getExecutionStatus(subWorkflowId, true);
+        assertNotNull(subWorkflow);
+        assertEquals(WorkflowStatus.COMPLETED, subWorkflow.getStatus());
+        assertEquals(2, subWorkflow.getTasks().size());
+
+        es = ess.getExecutionStatus(wfid, true);
+        assertNotNull(es);
+        assertEquals(WorkflowStatus.COMPLETED, es.getStatus());
+        assertEquals(2, es.getTasks().size());
+
+        return wfid;
+    }
+
+    private String runAFailedForkJoinWF() throws Exception {
+        try{
+            this.createForkJoinWorkflowWithZeroRetry();
+        }catch(Exception e){}
+
+        Map<String, Object> input = new HashMap<String, Object>();
+        String wfid = provider.startWorkflow(FORK_JOIN_WF + "_2", 1, "fanouttest", input );
+        System.out.println("testForkJoin.wfid=" + wfid);
+
+        Task t1 = ess.poll("junit_task_0_RT_1", "test");
+        assertTrue(ess.ackTaskRecieved(t1.getTaskId(), "test"));
+
+        Task t2 = ess.poll("junit_task_0_RT_2", "test");
+        assertTrue(ess.ackTaskRecieved(t2.getTaskId(), "test"));
+        assertNotNull(t1);
+        assertNotNull(t2);
+
+        t1.setStatus(Status.COMPLETED);
+        ess.updateTask(t1);
+
+        Workflow wf = ess.getExecutionStatus(wfid, true);
+        assertNotNull(wf);
+        assertEquals("Found " + wf.getTasks(), WorkflowStatus.RUNNING, wf.getStatus());
+        printTaskStatuses(wf, "Initial");
+
+        t2.setStatus(Status.FAILED);
+
+        ExecutorService es = Executors.newFixedThreadPool(2);
+        Future<?> future1 = es.submit(() -> {
+            try {
+                ess.updateTask(t2);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+        });
+        future1.get();
+
+        wf = ess.getExecutionStatus(wfid, true);
+        assertNotNull(wf);
+        if (!wf.getTasks().stream().anyMatch(t -> t.getReferenceTaskName().equals("t3"))) {
+            provider.decide(wfid);
+            wf = ess.getExecutionStatus(wfid, true);
+            assertNotNull(wf);
+        } else {
+            provider.decide(wfid);
+        }
+        assertTrue("Found " + wf.getTasks().stream().map(t -> t.getTaskType()).collect(Collectors.toList()), wf.getTasks().stream().anyMatch(t -> t.getReferenceTaskName().equals("t3")));
+
+        wf = ess.getExecutionStatus(wfid, true);
+        assertNotNull(wf);
+
+        provider.decide(wfid);
+        provider.decide(wfid);
+
+        wf = ess.getExecutionStatus(wfid, true);
+        assertNotNull(wf);
+
+        wf = ess.getExecutionStatus(wfid, true);
+        assertNotNull(wf);
+        provider.decide(wfid);
+        printTaskStatuses(wfid, "After failed");
+
+        return wfid;
+    }
+
+    private void printTaskStatuses(String wfid, String message) throws Exception{
+        Workflow wf = ess.getExecutionStatus(wfid, true);
+        assertNotNull(wf);
+        printTaskStatuses(wf, message);
+    }
+
+    private boolean printWFTaskDetails = false;
+
+    private void printTaskStatuses(Workflow wf, String message) throws Exception{
+        if(printWFTaskDetails){
+            System.out.println(message + " >>> Workflow status " + wf.getStatus().name());
+            wf.getTasks().forEach(t -> {
+                System.out.println("Task " + String.format("%-15s", t.getTaskType()) + "\t" + String.format("%-15s", t.getReferenceTaskName()) + "\t" + String.format("%-15s", t.getWorkflowTask().getType()) + "\t" + t.getSeq() + "\t" + t.getStatus() + "\t" + t.getTaskId());
+            });
+            System.out.println();
+        }
+    }
 }