Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
pctreddy committed Dec 18, 2017
2 parents 95b8a1b + 514c0f9 commit 615eb8f
Show file tree
Hide file tree
Showing 15 changed files with 198 additions and 20 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ ui/.settings
.settings
dump.rdb
.idea
out/
*.iml
out/
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ buildscript {
}
}
plugins {
id 'nebula.netflixoss' version '3.6.0'
id 'nebula.netflixoss' version '5.0.0'
}

// Establish version and status
Expand Down
2 changes: 0 additions & 2 deletions client/go/httpclient/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func (c *HttpClient) logResponse(statusCode string, response string) {
}

func genParamString(paramMap map[string]string) string {

if paramMap == nil || len(paramMap) == 0 {
return ""
}
Expand Down Expand Up @@ -78,7 +77,6 @@ func (c *HttpClient) httpRequest(url string, requestType string, headers map[str
if err != nil {
return "", err
}

// Default Headers
for key, value := range c.Headers {
req.Header.Set(key, value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
package com.netflix.conductor.common.run;

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.netflix.conductor.common.metadata.Auditable;
import com.netflix.conductor.common.metadata.tasks.Task;
Expand Down Expand Up @@ -50,13 +52,13 @@ public boolean isSuccessful(){
private WorkflowStatus status = WorkflowStatus.RUNNING;

private long endTime;

private String workflowId;

private String parentWorkflowId;

private String parentWorkflowTaskId;

private List<Task> tasks = new LinkedList<>();

private Map<String, Object> input = new HashMap<>();
Expand All @@ -79,6 +81,8 @@ public boolean isSuccessful(){

private Map<String, String> taskToDomain = new HashMap<>();

private Set<String> failedReferenceTaskNames = new HashSet<>();

public Workflow(){

}
Expand Down Expand Up @@ -301,7 +305,15 @@ public String getEvent() {
public void setEvent(String event) {
this.event = event;
}


public Set<String> getFailedReferenceTaskNames() {
return failedReferenceTaskNames;
}

public void setFailedReferenceTaskNames(Set<String> failedReferenceTaskNames) {
this.failedReferenceTaskNames = failedReferenceTaskNames;
}

@Override
public String toString() {
return workflowType + "." + version + "/" + workflowId + "." + status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.stream.Collectors;

import com.netflix.conductor.common.run.Workflow.WorkflowStatus;

Expand Down Expand Up @@ -60,6 +61,8 @@ public class WorkflowSummary {
private long executionTime;

private String event;

private String failedReferenceTaskNames = "";

public WorkflowSummary() {

Expand Down Expand Up @@ -90,6 +93,7 @@ public WorkflowSummary(Workflow workflow) {
this.executionTime = workflow.getEndTime() - workflow.getStartTime();
}
this.event = workflow.getEvent();
this.failedReferenceTaskNames = workflow.getFailedReferenceTaskNames().stream().collect(Collectors.joining(","));
}

/**
Expand Down Expand Up @@ -193,4 +197,12 @@ public String getEvent() {
public void setEvent(String event) {
this.event = event;
}

public String getFailedReferenceTaskNames() {
return failedReferenceTaskNames;
}

public void setFailedReferenceTaskNames(String failedReferenceTaskNames) {
this.failedReferenceTaskNames = failedReferenceTaskNames;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ private DeciderOutcome decide(final WorkflowDef def, final Workflow workflow, Li
TaskDef taskDef = metadata.getTaskDef(task.getTaskDefName());
if(taskDef != null) {
checkForTimeout(taskDef, task);
// If the task has not been updated for "responseTimeout" then rescheduled it.
if(checkForResponseTimeout(taskDef, task)){
outcome.tasksToBeRequeued.add(task);
}
}

if (!task.getStatus().isSuccessful()) {
Expand Down Expand Up @@ -374,6 +378,30 @@ void checkForTimeout(TaskDef taskType, Task task) {
return;
}

@VisibleForTesting
boolean checkForResponseTimeout(TaskDef taskType, Task task) {

if(taskType == null){
logger.warn("missing task type " + task.getTaskDefName() + ", workflowId=" + task.getWorkflowInstanceId());
return false;
}
if (task.getStatus().isTerminal() || taskType.getTimeoutSeconds() <= 0 ||
!task.getStatus().equals(Status.IN_PROGRESS) || taskType.getResponseTimeoutSeconds() == 0) {
return false;
}

long responseTimeout = 1000 * taskType.getResponseTimeoutSeconds();
long now = System.currentTimeMillis();
long noResponseTime = now - task.getUpdateTime();

if (noResponseTime < responseTimeout) {
return false;
}
Monitors.recordTaskResponseTimeout(task.getTaskDefName());

return true;
}

private List<Task> getTasksToBeScheduled(WorkflowDef def, Workflow workflow, WorkflowTask taskToSchedule, int retryCount) {
return getTasksToBeScheduled(def, workflow, taskToSchedule, retryCount, null);
}
Expand Down Expand Up @@ -672,6 +700,8 @@ public static class DeciderOutcome {
List<Task> tasksToBeScheduled = new LinkedList<>();

List<Task> tasksToBeUpdated = new LinkedList<>();

List<Task> tasksToBeRequeued = new LinkedList<>();

boolean isComplete;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,16 @@ public void updateTask(TaskResult result) throws Exception {
if (task.getStatus().isTerminal()) {
task.setEndTime(System.currentTimeMillis());
}

edao.updateTask(task);

//If the task has failed update the failed task reference name in the workflow.
//This gives the ability to look at workflow and see what tasks have failed at a high level.
if(Status.FAILED.equals(task.getStatus())) {
wf.getFailedReferenceTaskNames().add(task.getReferenceTaskName());
edao.updateWorkflow(wf);
}

result.getLogs().forEach(tl -> tl.setTaskId(task.getTaskId()));
edao.addTaskExecLog(result.getLogs());

Expand Down Expand Up @@ -511,8 +519,12 @@ public boolean decide(String workflowId) throws Exception {
List<Task> tasksToBeScheduled = outcome.tasksToBeScheduled;
setTaskDomains(tasksToBeScheduled, workflow);
List<Task> tasksToBeUpdated = outcome.tasksToBeUpdated;
List<Task> tasksToBeRequeued = outcome.tasksToBeRequeued;
boolean stateChanged = false;

if(!tasksToBeRequeued.isEmpty()){
addTaskToQueue(tasksToBeRequeued);
}
workflow.getTasks().addAll(tasksToBeScheduled);
for(Task task : tasksToBeScheduled) {
if (SystemTaskType.is(task.getTaskType()) && !task.getStatus().isTerminal()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ public static void recordTaskTimeout(String taskType) {
counter(classQualifier, "task_timeout", "taskType", taskType);
}

public static void recordTaskResponseTimeout(String taskType) {
counter(classQualifier, "task_response_timeout", "taskType", taskType);
}

public static void recordWorkflowTermination(String workflowType, WorkflowStatus status, String ownerApp) {
counter(classQualifier, "workflow_failure", "workflowName", workflowType, "status", status.name(), "ownerApp", ""+ownerApp);
}
Expand Down
2 changes: 1 addition & 1 deletion es5-persistence/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## Usage

1. In `server/build.gradle` file, add `compile project(':conductor-es5-persistence')` dependencie
1. In `server/build.gradle` file, add `compile project(':conductor-es5-persistence')` in dependencies
1. In `server/ConductorServer.java` file , replace `import com.netflix.conductor.dao.es.EmbeddedElasticSearch` with `import com.netflix.conductor.dao.es5.es.EmbeddedElasticSearch`
1. In `server/ServerModule.java` file, replace `import com.netflix.conductor.dao.index.ElasticSearchDAO; import com.netflix.conductor.dao.index.ElasticsearchModule` with `import com.netflix.conductor.dao.es5.index.ElasticSearchDAO; import com.netflix.conductor.dao.es5.index.ElasticsearchModule;`
1. Config property 'workflow.elasticsearch.cluster.name' , value with your elasticsearch cluster name
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-3.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.4-bin.zip
2 changes: 1 addition & 1 deletion mysql-persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ dependencies {
compile 'commons-io:commons-io:2.4+'
compile 'mysql:mysql-connector-java:5.1.43'
compile 'com.zaxxer:HikariCP:2.6.3'
compile 'org.flywaydb:flyway-core:4.2.0'
compile 'org.flywaydb:flyway-core:4.0.3'

testCompile 'ch.vorburger.mariaDB4j:mariaDB4j:2.2.3'
testCompile 'ch.qos.logback:logback-core:1.2.3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public Map<String, Long> queuesDetail() {
Number queueSize = (Number)row.get("size");
detail.put(queueName, queueSize.longValue());
}));

return detail;
}

Expand All @@ -146,7 +146,7 @@ public Map<String, Map<String, Map<String, Long>>> queuesDetailVerbose() {
" (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size,\n" +
" (SELECT count(*) FROM queue_message WHERE popped = true AND queue_name = q.queue_name) AS uacked \n" +
"FROM queue q";

withTransaction(tx -> tx.createQuery(GET_QUEUES_DETAIL_VERBOSE).executeAndFetchTable().asList().forEach(row -> {
String queueName = (String)row.get("queue_name");
Number queueSize = (Number)row.get("size");
Expand Down Expand Up @@ -222,7 +222,7 @@ private void removeMessage(Connection connection, String queueName, String messa

private List<String> peekMessages(String queueName, int count) {
if (count < 1) return Collections.emptyList();
String PEEK_MESSAGES = "SELECT message_id FROM queue_message WHERE queue_name = :queueName LIMIT :count";
String PEEK_MESSAGES = "SELECT message_id FROM queue_message WHERE queue_name = :queueName AND popped = false LIMIT :count";
return getWithTransaction(tx -> tx.createQuery(PEEK_MESSAGES)
.addParameter("queueName", queueName)
.addParameter("count", count)
Expand All @@ -232,7 +232,7 @@ private List<String> peekMessages(String queueName, int count) {
private List<String> popMessages(Connection connection, String queueName, List<String> messageIds) {
if (messageIds.isEmpty()) return messageIds;

String POP_MESSAGES = "UPDATE queue_message SET popped = true WHERE queue_name = :queueName AND message_id IN (%s)";
String POP_MESSAGES = "UPDATE queue_message SET popped = true WHERE queue_name = :queueName AND message_id IN (%s) AND popped = false";
String query = generateQueryWithParametersListPlaceholders(POP_MESSAGES, messageIds.size());

int result = connection.createQuery(query).addParameter("queueName", queueName).withParams(messageIds.toArray()).executeUpdate().getResult();
Expand All @@ -248,7 +248,7 @@ private List<String> popMessages(Connection connection, String queueName, List<S
private List<Message> readMessages(String queueName, List<String> messageIds) {
if (messageIds.isEmpty()) return Collections.emptyList();

String READ_MESSAGES = "SELECT message_id, payload FROM queue_message WHERE queue_name = :queueName AND message_id IN (%s)";
String READ_MESSAGES = "SELECT message_id, payload FROM queue_message WHERE queue_name = :queueName AND message_id IN (%s) AND popped = false";
String query = generateQueryWithParametersListPlaceholders(READ_MESSAGES, messageIds.size());

List<Message> messages = getWithTransaction(tx -> tx.createQuery(query)
Expand Down
Loading

0 comments on commit 615eb8f

Please sign in to comment.