Skip to content

Commit

Permalink
Fail WFT if Local Activity execution experienced an Error (#1591)
Browse files Browse the repository at this point in the history
  • Loading branch information
Spikhalskiy authored Jan 9, 2023
1 parent f1c8454 commit 8b14ee9
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 35 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ plugins {
id 'org.cadixdev.licenser' version '0.6.1'
id 'com.palantir.git-version' version "${palantirGitVersionVersion}" apply false
id 'io.github.gradle-nexus.publish-plugin' version '1.1.0'
id 'com.diffplug.spotless' version '6.12.0' apply false
id 'com.diffplug.spotless' version '6.12.1' apply false
id 'com.github.nbaztec.coveralls-jacoco' version "1.2.15" apply false

// id 'org.jetbrains.kotlin.jvm' version '1.4.32'
Expand Down Expand Up @@ -39,7 +39,7 @@ ext {
guavaVersion = '31.1-jre' // [10.0,)
tallyVersion = '0.11.1' // [0.4.0,)

gsonVersion = '2.10' // [2.0,)
gsonVersion = '2.10.1' // [2.0,)

jsonPathVersion = '2.7.0' // compileOnly

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
@Override
public WorkflowTaskResult handleWorkflowTask(
PollWorkflowTaskQueueResponseOrBuilder workflowTask, WorkflowHistoryIterator historyIterator)
throws InterruptedException {
throws InterruptedException, Throwable {
lock.lock();
try {
Deadline wftHearbeatDeadline =
Expand Down Expand Up @@ -273,7 +273,7 @@ public void close() {
}

private void processLocalActivityRequests(Deadline wftHeartbeatDeadline)
throws InterruptedException {
throws InterruptedException, Throwable {

while (true) {
List<ExecuteLocalActivityParameters> laRequests =
Expand Down Expand Up @@ -307,6 +307,11 @@ private void processLocalActivityRequests(Deadline wftHeartbeatDeadline)
}

localActivityTaskCount--;

if (laCompletion.getProcessingError() != null) {
throw laCompletion.getProcessingError().getThrowable();
}

workflowStateMachines.handleLocalActivityCompletion(laCompletion);
// handleLocalActivityCompletion triggers eventLoop.
// After this call, there may be new local activity requests available in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.internal.replay;

import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
import io.temporal.worker.NonDeterministicException;

/**
* Task handler that encapsulates a cached workflow and can handle multiple calls to
Expand All @@ -35,10 +36,12 @@ public interface WorkflowRunTaskHandler {
*
* @param workflowTask task to handle
* @return an object that can be used to build workflow task completion or failure response
* @throws Throwable if processing experienced issues that are considered unrecoverable inside the
* current workflow task. {@link NonDeterministicException} or {@link Error} are such cases.
*/
WorkflowTaskResult handleWorkflowTask(
PollWorkflowTaskQueueResponseOrBuilder workflowTask, WorkflowHistoryIterator historyIterator)
throws InterruptedException;
throws Throwable;

/**
* Handles a Direct Query (or Legacy Query) scenario. In this case, it's not a real workflow task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,15 @@ public final class LocalActivityResult {
private final @Nullable RespondActivityTaskCompletedRequest executionCompleted;
private final @Nullable ExecutionFailedResult executionFailed;
private final @Nullable RespondActivityTaskCanceledRequest executionCanceled;
/**
* If present, it will cause an immediate WFT failure instead of providing LA result to the
* workflow code.
*/
private final @Nullable ProcessingErrorResult processingError;

static LocalActivityResult completed(ActivityTaskHandler.Result ahResult, int attempt) {
return new LocalActivityResult(
ahResult.getActivityId(), attempt, ahResult.getTaskCompleted(), null, null);
ahResult.getActivityId(), attempt, ahResult.getTaskCompleted(), null, null, null);
}

static LocalActivityResult failed(
Expand All @@ -48,12 +53,18 @@ static LocalActivityResult failed(
@Nullable Duration backoff) {
ExecutionFailedResult failedResult =
new ExecutionFailedResult(retryState, timeoutFailure, backoff);
return new LocalActivityResult(activityId, attempt, null, failedResult, null);
return new LocalActivityResult(activityId, attempt, null, failedResult, null, null);
}

static LocalActivityResult cancelled(ActivityTaskHandler.Result ahResult, int attempt) {
return new LocalActivityResult(
ahResult.getActivityId(), attempt, null, null, ahResult.getTaskCanceled());
ahResult.getActivityId(), attempt, null, null, ahResult.getTaskCanceled(), null);
}

/** result created by this factory method will lead to as immediate WFT failure as possible. */
static LocalActivityResult processingFailed(String activityId, int attempt, Throwable ex) {
return new LocalActivityResult(
activityId, attempt, null, null, null, new ProcessingErrorResult(ex));
}

/**
Expand All @@ -66,12 +77,14 @@ public LocalActivityResult(
int lastAttempt,
@Nullable RespondActivityTaskCompletedRequest executionCompleted,
@Nullable ExecutionFailedResult executionFailed,
@Nullable RespondActivityTaskCanceledRequest executionCanceled) {
@Nullable RespondActivityTaskCanceledRequest executionCanceled,
@Nullable ProcessingErrorResult processingError) {
this.activityId = activityId;
this.lastAttempt = lastAttempt;
this.executionCompleted = executionCompleted;
this.executionFailed = executionFailed;
this.executionCanceled = executionCanceled;
this.processingError = processingError;
}

@Nonnull
Expand All @@ -98,6 +111,11 @@ public RespondActivityTaskCanceledRequest getExecutionCanceled() {
return executionCanceled;
}

@Nullable
public ProcessingErrorResult getProcessingError() {
return processingError;
}

@Override
public String toString() {
return "LocalActivityResult{"
Expand All @@ -112,6 +130,8 @@ public String toString() {
+ executionFailed
+ ", executionCanceled="
+ executionCanceled
+ ", processingError="
+ processingError
+ '}';
}

Expand Down Expand Up @@ -158,4 +178,22 @@ public String toString() {
+ '}';
}
}

public static class ProcessingErrorResult {
@Nonnull private final Throwable throwable;

public ProcessingErrorResult(@Nonnull Throwable throwable) {
this.throwable = throwable;
}

@Nonnull
public Throwable getThrowable() {
return throwable;
}

@Override
public String toString() {
return "ProcessingErrorResult{" + "throwable=" + throwable + '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.internal.worker;

import static io.temporal.internal.worker.LocalActivityResult.failed;
import static io.temporal.internal.worker.LocalActivityResult.processingFailed;

import com.google.common.base.Preconditions;
import com.uber.m3.tally.Scope;
Expand All @@ -35,7 +36,6 @@
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;
import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.FailureConverter;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.common.RetryOptionsUtils;
import io.temporal.internal.logging.LoggerTag;
Expand Down Expand Up @@ -481,14 +481,8 @@ public void handle(LocalActivityAttemptTask attemptTask) throws Exception {
// handleLocalActivity is expected to never throw an exception and return a result
// that can be used for a workflow callback if this method throws, it's a bug.
log.error("[BUG] Code that expected to never throw an exception threw an exception", ex);
Failure failure = FailureConverter.exceptionToFailure(ex);
executionContext.callback(
failed(
activityTask.getActivityId(),
activityTask.getAttempt(),
RetryState.RETRY_STATE_INTERNAL_SERVER_ERROR,
failure,
null));
processingFailed(activityTask.getActivityId(), activityTask.getAttempt(), ex));
throw ex;
} finally {
MDC.remove(LoggerTag.ACTIVITY_ID);
Expand Down Expand Up @@ -539,6 +533,7 @@ private void handleResult(

Failure executionFailure =
activityHandlerResult.getTaskFailed().getTaskFailedRequest().getFailure();
Throwable executionThrowable = activityHandlerResult.getTaskFailed().getFailure();

RetryDecision retryDecision =
shouldRetry(
Expand All @@ -550,6 +545,9 @@ private void handleResult(
Objects.requireNonNull(
retryDecision.nextAttemptBackoff, "nextAttemptBackoff is expected to not be null"),
executionFailure);
} else if (retryDecision.failWorkflowTask()) {
executionContext.callback(
processingFailed(executionContext.getActivityId(), currentAttempt, executionThrowable));
} else {
executionContext.callback(
failed(
Expand Down Expand Up @@ -775,5 +773,9 @@ public RetryDecision(@Nonnull Duration nextAttemptBackoff) {
public boolean doNextAttempt() {
return retryState == null;
}

public boolean failWorkflowTask() {
return RetryState.RETRY_STATE_INTERNAL_SERVER_ERROR.equals(retryState);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
1,
RespondActivityTaskCompletedRequest.newBuilder().setResult(result2).build(),
null,
null,
null);
stateMachines.handleLocalActivityCompletion(completionActivity2);
requests = stateMachines.takeLocalActivityRequests();
Expand All @@ -210,6 +211,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
1,
RespondActivityTaskCompletedRequest.newBuilder().setResult(result3).build(),
null,
null,
null);
stateMachines.handleLocalActivityCompletion(completionActivity3);
requests = stateMachines.takeLocalActivityRequests();
Expand Down Expand Up @@ -248,6 +250,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
1,
RespondActivityTaskCompletedRequest.newBuilder().setResult(result).build(),
null,
null,
null);
stateMachines.handleLocalActivityCompletion(completionActivity1);
requests = stateMachines.takeLocalActivityRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.junit.Test;
import org.junit.rules.TestName;

public class LocalActivityThrowingpplicationFailureTest {
public class LocalActivityThrowingApplicationFailureTest {

private static WorkflowOptions options;

Expand All @@ -69,7 +69,7 @@ public void setUp() {
}

@Test
public void localActivityThrowsError() {
public void retryable() {
String name = testName.getMethodName();
WorkflowClient client = testWorkflowRule.getWorkflowClient();
TestWorkflow4 workflow = client.newWorkflowStub(TestWorkflow4.class, options);
Expand All @@ -84,7 +84,7 @@ public void localActivityThrowsError() {
}

@Test
public void localActivityNonRetryableThrowsError() {
public void nonRetryable() {
String name = testName.getMethodName();
WorkflowClient client = testWorkflowRule.getWorkflowClient();
TestWorkflow4 workflow = client.newWorkflowStub(TestWorkflow4.class, options);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.activityTests;

import static org.junit.Assert.*;

import io.temporal.activity.LocalActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.client.WorkflowStub;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows.NoArgsWorkflow;
import java.time.Duration;
import java.util.List;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/**
* If Local Activity throws an {@link Error}, it should immediately fail Workflow Task. Java Error
* signals a problem with the Worker and shouldn't lead to a failure of a Local Activity execution
* or a Workflow.
*/
public class LocalActivityThrowingErrorTest {

@Rule public TestName testName = new TestName();

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(LocalActivityThrowsErrorWorkflow.class)
.setActivityImplementations(new ApplicationFailureActivity())
.build();

@Test
public void throwsError() {
NoArgsWorkflow workflow = testWorkflowRule.newWorkflowStub(NoArgsWorkflow.class);
WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow);
workflowStub.start();
WorkflowExecution execution = workflowStub.getExecution();
testWorkflowRule.waitForTheEndOfWFT(execution);
List<HistoryEvent> historyEvents =
testWorkflowRule.getHistoryEvents(execution, EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED);
assertTrue(historyEvents.size() > 0);
}

public static class LocalActivityThrowsErrorWorkflow implements NoArgsWorkflow {

private final TestActivities.NoArgsActivity activity1 =
Workflow.newLocalActivityStub(
TestActivities.NoArgsActivity.class,
LocalActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofMinutes(2))
.build());

@Override
public void execute() {
activity1.execute();
}
}

public static class ApplicationFailureActivity implements TestActivities.NoArgsActivity {
@Override
public void execute() {
throw new Error("test");
}
}
}
2 changes: 1 addition & 1 deletion temporal-spring-boot-autoconfigure-alpha/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
description = '''Spring Boot AutoConfigure for Temporal Java SDK'''

ext {
otelVersion = '1.21.0'
otelVersion = '1.22.0'
otShimVersion = "${otelVersion}-alpha"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.Closeable;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

/**
* TestWorkflowEnvironment provides workflow unit testing capabilities.
Expand Down Expand Up @@ -196,7 +197,7 @@ static TestWorkflowEnvironment newInstance(TestEnvironmentOptions options) {
* @param execution identifies the workflowId and runId (optionally) to reach the history for
* @return history of the execution
*/
WorkflowExecutionHistory getWorkflowExecutionHistory(WorkflowExecution execution);
WorkflowExecutionHistory getWorkflowExecutionHistory(@Nonnull WorkflowExecution execution);

/** Calls {@link #shutdownNow()} and {@link #awaitTermination(long, TimeUnit)}. */
@Override
Expand Down
Loading

0 comments on commit 8b14ee9

Please sign in to comment.