Skip to content

Commit

Permalink
Exposing an ability to fail WorkflowTask for any callback in executor…
Browse files Browse the repository at this point in the history
… code (#1589)

Refactor a clean separate WorkflowMutableState
  • Loading branch information
Spikhalskiy authored Jan 10, 2023
1 parent 8b14ee9 commit e97ceb8
Show file tree
Hide file tree
Showing 14 changed files with 296 additions and 165 deletions.
3 changes: 2 additions & 1 deletion gradle/licensing.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ subprojects {
apply plugin: 'org.cadixdev.licenser'
license {
header rootProject.file('LICENSE.header')
exclude '**/*.puml'
exclude '**/*.puml', 'io/temporal/api', 'gogoproto/Gogo.java'

}
tasks.check.dependsOn('checkLicenseMain')
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.google.common.base.Preconditions;
import com.google.protobuf.util.Timestamps;
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.*;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
Expand All @@ -47,11 +46,6 @@ final class BasicWorkflowContext {

@Nullable private final Failure previousRunFailure;

// Mutable, accumulated during execution:
private SearchAttributes.Builder searchAttributes;
private boolean cancelRequested;
private ContinueAsNewWorkflowExecutionCommandAttributes continueAsNewOnCompletion;

BasicWorkflowContext(
String namespace,
@Nonnull WorkflowExecution workflowExecution,
Expand All @@ -60,9 +54,6 @@ final class BasicWorkflowContext {
this.namespace = namespace;
this.workflowExecution = Preconditions.checkNotNull(workflowExecution);
this.startedAttributes = startedAttributes;
if (startedAttributes.hasSearchAttributes()) {
this.searchAttributes = startedAttributes.getSearchAttributes().toBuilder();
}
this.runStartedTimestampMillis = runStartedTimestampMillis;
this.lastCompletionResult =
startedAttributes.hasLastCompletionResult()
Expand All @@ -81,22 +72,6 @@ WorkflowType getWorkflowType() {
return startedAttributes.getWorkflowType();
}

boolean isCancelRequested() {
return cancelRequested;
}

void setCancelRequested(boolean flag) {
cancelRequested = flag;
}

ContinueAsNewWorkflowExecutionCommandAttributes getContinueAsNewOnCompletion() {
return continueAsNewOnCompletion;
}

void setContinueAsNewOnCompletion(ContinueAsNewWorkflowExecutionCommandAttributes parameters) {
this.continueAsNewOnCompletion = parameters;
}

Optional<String> getContinuedExecutionRunId() {
WorkflowExecutionStartedEventAttributes attributes = getWorkflowStartedEventAttributes();
String runId = attributes.getContinuedExecutionRunId();
Expand Down Expand Up @@ -152,27 +127,10 @@ public Payload getMemo(String key) {
return startedAttributes.getMemo().getFieldsMap().get(key);
}

@Nullable
SearchAttributes getSearchAttributes() {
return searchAttributes == null || searchAttributes.getIndexedFieldsCount() == 0
? null
: searchAttributes.build();
}

int getAttempt() {
return startedAttributes.getAttempt();
}

void mergeSearchAttributes(SearchAttributes searchAttributes) {
if (searchAttributes == null || searchAttributes.getIndexedFieldsCount() == 0) {
return;
}
if (this.searchAttributes == null) {
this.searchAttributes = SearchAttributes.newBuilder();
}
this.searchAttributes.putAllIndexedFields(searchAttributes.getIndexedFieldsMap());
}

public String getCronSchedule() {
return startedAttributes.getCronSchedule();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@
import com.uber.m3.tally.Scope;
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.common.v1.WorkflowType;
import io.temporal.api.common.v1.*;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.internal.statemachines.ExecuteActivityParameters;
Expand Down Expand Up @@ -61,17 +57,6 @@ public interface ReplayWorkflowContext extends ReplayAware {

WorkflowType getWorkflowType();

/**
* @return true if cancellation of the workflow is requested.
*/
boolean isCancelRequested();

/**
* When these attributes are present upon completion of the workflow code the ContinueAsNew
* command is emitted instead of the workflow completion.
*/
ContinueAsNewWorkflowExecutionCommandAttributes getContinueAsNewOnCompletion();

/**
* RunId of the first run in the continue-as-new chain. Empty if this workflow never called
* continue as new.
Expand Down Expand Up @@ -99,12 +84,6 @@ public interface ReplayWorkflowContext extends ReplayAware {

Payload getMemo(String key);

/**
* @return search attributes as a non-deserialized protobuf, null if empty
*/
@Nullable
SearchAttributes getSearchAttributes();

/**
* Requests an activity execution.
*
Expand Down Expand Up @@ -155,8 +134,6 @@ Functions.Proc1<Exception> signalExternalWorkflowExecution(
void requestCancelExternalWorkflowExecution(
WorkflowExecution execution, Functions.Proc2<Void, RuntimeException> callback);

void continueAsNewOnCompletion(ContinueAsNewWorkflowExecutionCommandAttributes attributes);

/**
* @return time of the {@link PollWorkflowTaskQueueResponse} start event of the workflow task
* being processed or replayed.
Expand Down Expand Up @@ -261,9 +238,6 @@ void getVersion(
*/
UUID randomUUID();

/** Updates or inserts search attributes used to index workflows. */
void upsertSearchAttributes(SearchAttributes searchAttributes);

/**
* @return workflow retry attempt. default is 1
*/
Expand Down Expand Up @@ -303,4 +277,53 @@ void getVersion(
* @return eventId of the last / currently active workflow task of this workflow
*/
long getCurrentWorkflowTaskStartedEventId();

/**
* @return true if cancellation of the workflow is requested.
*/
boolean isCancelRequested();

void setCancelRequested();

/**
* @return true if the worker's execution or a or replay of the workflow method finished or failed
*/
boolean isWorkflowMethodCompleted();

void setWorkflowMethodCompleted();

/**
* When these attributes are present upon completion of the workflow code the ContinueAsNew
* command is emitted instead of the workflow completion.
*/
ContinueAsNewWorkflowExecutionCommandAttributes getContinueAsNewOnCompletion();

void continueAsNewOnCompletion(ContinueAsNewWorkflowExecutionCommandAttributes attributes);

Throwable getWorkflowTaskFailure();

/**
* Can be used by any code (both control and executing in workflow threads) to communicate that
* something is off, correct handling of Workflow Task is no possible and the worker should fail
* the Workflow Task.
*
* <p>Note that this method is created to be from callback and other places where it may be tricky
* to propagate an exception. If you usecase is in the main synchronous code of WFT processing
* worker executor control thread - prefer direct exception throwing or return over using this
* indirect way.
*
* @param failure cause of the workflow task failure, this exception will be propagated by
* rethrowing in Workflow Executor thread
*/
void failWorkflowTask(Throwable failure);

/**
* @return search attributes collected during the workflow execution up to the current moment as a
* non-deserialized protobuf, null if empty
*/
@Nullable
SearchAttributes getSearchAttributes();

/** Updates or inserts search attributes used to index workflows. */
void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
* Switch both to nullable.
*/
final class ReplayWorkflowContextImpl implements ReplayWorkflowContext {

private final WorkflowStateMachines workflowStateMachines;
private final BasicWorkflowContext basicWorkflowContext;
private final WorkflowStateMachines workflowStateMachines;
private final WorkflowMutableState mutableState;
private final @Nullable String fullReplayDirectQueryName;
private final Scope replayAwareWorkflowMetricsScope;
private final SingleWorkerOptions workerOptions;
Expand All @@ -71,6 +71,7 @@ final class ReplayWorkflowContextImpl implements ReplayWorkflowContext {
this.basicWorkflowContext =
new BasicWorkflowContext(
namespace, workflowExecution, startedAttributes, runStartedTimestampMillis);
this.mutableState = new WorkflowMutableState(startedAttributes);
this.fullReplayDirectQueryName = fullReplayDirectQueryName;
this.replayAwareWorkflowMetricsScope =
new ReplayAwareScope(workflowMetricsScope, this, workflowStateMachines::currentTimeMillis);
Expand Down Expand Up @@ -118,24 +119,6 @@ public WorkflowType getWorkflowType() {
return basicWorkflowContext.getWorkflowType();
}

/**
* TODO methods and state like this (that tracks effectively an execution result) either doesn't
* belong here or should be abstracted into a separate interface with setter methods.
*/
@Override
public boolean isCancelRequested() {
return basicWorkflowContext.isCancelRequested();
}

void setCancelRequested(boolean flag) {
basicWorkflowContext.setCancelRequested(flag);
}

@Override
public ContinueAsNewWorkflowExecutionCommandAttributes getContinueAsNewOnCompletion() {
return basicWorkflowContext.getContinueAsNewOnCompletion();
}

@Nonnull
@Override
public Duration getWorkflowTaskTimeout() {
Expand Down Expand Up @@ -189,7 +172,7 @@ public Payload getMemo(String key) {
@Override
@Nullable
public SearchAttributes getSearchAttributes() {
return basicWorkflowContext.getSearchAttributes();
return mutableState.getSearchAttributes();
}

@Override
Expand Down Expand Up @@ -240,12 +223,6 @@ public void requestCancelExternalWorkflowExecution(
workflowStateMachines.requestCancelExternalWorkflowExecution(attributes, callback);
}

@Override
public void continueAsNewOnCompletion(
ContinueAsNewWorkflowExecutionCommandAttributes attributes) {
basicWorkflowContext.setContinueAsNewOnCompletion(attributes);
}

@Override
public boolean isReplaying() {
return workflowStateMachines.isReplaying();
Expand Down Expand Up @@ -315,9 +292,9 @@ public long currentTimeMillis() {
}

@Override
public void upsertSearchAttributes(SearchAttributes searchAttributes) {
public void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes) {
workflowStateMachines.upsertSearchAttributes(searchAttributes);
basicWorkflowContext.mergeSearchAttributes(searchAttributes);
mutableState.upsertSearchAttributes(searchAttributes);
}

@Override
Expand Down Expand Up @@ -357,4 +334,48 @@ public Map<String, Payload> getHeader() {
public long getCurrentWorkflowTaskStartedEventId() {
return workflowStateMachines.getCurrentStartedEventId();
}

/*
* MUTABLE STATE OPERATIONS
*/

@Override
public boolean isCancelRequested() {
return mutableState.isCancelRequested();
}

@Override
public void setCancelRequested() {
mutableState.setCancelRequested();
}

public boolean isWorkflowMethodCompleted() {
return mutableState.isWorkflowMethodCompleted();
}

@Override
public void setWorkflowMethodCompleted() {
this.mutableState.setWorkflowMethodCompleted();
}

@Override
public ContinueAsNewWorkflowExecutionCommandAttributes getContinueAsNewOnCompletion() {
return mutableState.getContinueAsNewOnCompletion();
}

@Override
public void continueAsNewOnCompletion(
ContinueAsNewWorkflowExecutionCommandAttributes attributes) {
mutableState.continueAsNewOnCompletion(attributes);
}

@Override
public Throwable getWorkflowTaskFailure() {
return mutableState.getWorkflowTaskFailure();
}

@Override
public void failWorkflowTask(Throwable failure) {
mutableState.failWorkflowTask(failure);
}
}
Loading

0 comments on commit e97ceb8

Please sign in to comment.