diff --git a/build.gradle b/build.gradle index 2b01ab57c..04f98031b 100644 --- a/build.gradle +++ b/build.gradle @@ -24,6 +24,7 @@ plugins { allprojects { repositories { + mavenLocal() mavenCentral() } } @@ -32,7 +33,7 @@ ext { // Platforms grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager jacksonVersion = '2.14.2' // [2.9.0,) - nexusVersion = '0.1.0-alpha1' + nexusVersion = '0.2.0' // we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though. micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.10.5' : '1.9.9' // [1.0.0,) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java b/temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java index 884ad899f..dc6ee53f5 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java @@ -20,6 +20,9 @@ package io.temporal.internal.common; +import io.nexusrpc.Link; +import java.net.URI; +import java.net.URISyntaxException; import java.time.Duration; public class NexusUtil { @@ -42,5 +45,13 @@ public static Duration parseRequestTimeout(String timeout) { } } + public static Link nexusProtoLinkToLink(io.temporal.api.nexus.v1.Link nexusLink) + throws URISyntaxException { + return Link.newBuilder() + .setType(nexusLink.getType()) + .setUri(new URI(nexusLink.getUrl())) + .build(); + } + private NexusUtil() {} } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java index 2d887711e..b54c0867d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java @@ -25,11 +25,14 @@ import io.temporal.nexus.NexusOperationContext; public class NexusOperationContextImpl implements NexusOperationContext { + private final String namespace; private final String taskQueue; private final WorkflowClient client; private final Scope metricsScope; - public NexusOperationContextImpl(String taskQueue, WorkflowClient client, Scope metricsScope) { + public NexusOperationContextImpl( + String namespace, String taskQueue, WorkflowClient client, Scope metricsScope) { + this.namespace = namespace; this.taskQueue = taskQueue; this.client = client; this.metricsScope = metricsScope; @@ -47,4 +50,8 @@ public WorkflowClient getWorkflowClient() { public String getTaskQueue() { return taskQueue; } + + public String getNamespace() { + return namespace; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index c0b793d76..0e0d1a3eb 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -20,8 +20,9 @@ package io.temporal.internal.nexus; +import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink; + import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; import com.uber.m3.tally.Scope; import io.nexusrpc.FailureInfo; import io.nexusrpc.Header; @@ -38,12 +39,17 @@ import io.temporal.internal.worker.NexusTaskHandler; import io.temporal.internal.worker.ShutdownManager; import io.temporal.worker.TypeAlreadyRegisteredException; +import java.net.URISyntaxException; import java.time.Duration; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class NexusTaskHandlerImpl implements NexusTaskHandler { + private static final Logger log = LoggerFactory.getLogger(NexusTaskHandlerImpl.class); private final DataConverter dataConverter; private final String namespace; private final String taskQueue; @@ -112,7 +118,7 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException } CurrentNexusOperationContext.set( - new NexusOperationContextImpl(taskQueue, client, metricsScope)); + new NexusOperationContextImpl(namespace, taskQueue, client, metricsScope)); switch (request.getVariantCase()) { case START_OPERATION: @@ -208,8 +214,7 @@ private void convertKnownFailures(Throwable failure) { } private StartOperationResponse handleStartOperation( - OperationContext.Builder ctx, StartOperationRequest task) - throws InvalidProtocolBufferException { + OperationContext.Builder ctx, StartOperationRequest task) { ctx.setService(task.getService()).setOperation(task.getOperation()); OperationStartDetails.Builder operationStartDetails = @@ -217,6 +222,19 @@ private StartOperationResponse handleStartOperation( .setCallbackUrl(task.getCallback()) .setRequestId(task.getRequestId()); task.getCallbackHeaderMap().forEach(operationStartDetails::putCallbackHeader); + task.getLinksList() + .forEach( + link -> { + try { + operationStartDetails.addLink(nexusProtoLinkToLink(link)); + } catch (URISyntaxException e) { + log.error("failed to parse link url: " + link.getUrl(), e); + throw new OperationHandlerException( + OperationHandlerException.ErrorType.BAD_REQUEST, + "Invalid link URL: " + link.getUrl(), + e); + } + }); HandlerInputContent.Builder input = HandlerInputContent.newBuilder().setDataStream(task.getPayload().toByteString().newInput()); @@ -234,6 +252,15 @@ private StartOperationResponse handleStartOperation( startResponseBuilder.setAsyncSuccess( StartOperationResponse.Async.newBuilder() .setOperationId(result.getAsyncOperationId()) + .addAllLinks( + result.getLinks().stream() + .map( + link -> + io.temporal.api.nexus.v1.Link.newBuilder() + .setType(link.getType()) + .setUrl(link.getUri().toString()) + .build()) + .collect(Collectors.toList())) .build()); } } catch (OperationUnsuccessfulException e) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java index a98eae1d9..db60a25bd 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java @@ -20,6 +20,8 @@ package io.temporal.internal.worker; +import static io.temporal.internal.common.GrpcUtils.isChannelShutdownException; + import com.google.common.util.concurrent.ListenableFuture; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -134,13 +136,18 @@ public CompletableFuture waitOnWorkerShutdownRequest( () -> { try { shutdownRequest.get(); - } catch (StatusRuntimeException e) { - // If the server does not support shutdown, ignore the exception - if (Status.Code.UNIMPLEMENTED.equals(e.getStatus().getCode())) { - return; - } - log.warn("failed to call shutdown worker", e); } catch (Exception e) { + if (e instanceof ExecutionException) { + e = (Exception) e.getCause(); + } + if (e instanceof StatusRuntimeException) { + // If the server does not support shutdown, ignore the exception + if (Status.Code.UNIMPLEMENTED.equals( + ((StatusRuntimeException) e).getStatus().getCode()) + || isChannelShutdownException((StatusRuntimeException) e)) { + return; + } + } log.warn("failed to call shutdown worker", e); } finally { future.complete(null); diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationContext.java b/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationContext.java index f40a251fb..6642c3548 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationContext.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationContext.java @@ -26,7 +26,7 @@ /** * Context object passed to a Nexus operation implementation. Use {@link - * Nexus#getExecutionContext()} from a Nexus Operation implementation to access. + * Nexus#getOperationContext()} from a Nexus Operation implementation to access. */ @Experimental public interface NexusOperationContext { diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java index e1cbc702f..bc8955060 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java @@ -20,13 +20,20 @@ package io.temporal.nexus; +import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink; +import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink; + import io.nexusrpc.OperationInfo; import io.nexusrpc.handler.*; import io.nexusrpc.handler.OperationHandler; +import io.temporal.api.common.v1.Link; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; import io.temporal.client.WorkflowClient; import io.temporal.internal.client.NexusStartWorkflowRequest; import io.temporal.internal.nexus.CurrentNexusOperationContext; import io.temporal.internal.nexus.NexusOperationContextImpl; +import java.net.URISyntaxException; class RunWorkflowOperation implements OperationHandler { private final WorkflowHandleFactory handleFactory; @@ -49,7 +56,32 @@ public OperationStartResult start( operationStartDetails.getCallbackUrl(), operationStartDetails.getCallbackHeaders(), nexusCtx.getTaskQueue()); - return OperationStartResult.async(handle.getInvoker().invoke(nexusRequest).getWorkflowId()); + + WorkflowExecution workflowExec = handle.getInvoker().invoke(nexusRequest); + + // Create the link information about the new workflow and return to the caller. + Link.WorkflowEvent workflowEventLink = + Link.WorkflowEvent.newBuilder() + .setNamespace(nexusCtx.getNamespace()) + .setWorkflowId(workflowExec.getWorkflowId()) + .setRunId(workflowExec.getRunId()) + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)) + .build(); + io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink); + try { + OperationStartResult.Builder result = + OperationStartResult.newBuilder().setAsyncOperationId(workflowExec.getWorkflowId()); + if (nexusLink != null) { + result.addLink(nexusProtoLinkToLink(nexusLink)); + } + return result.build(); + } catch (URISyntaxException e) { + // Not expected as the link is constructed by the SDK. + throw new OperationHandlerException( + OperationHandlerException.ErrorType.BAD_REQUEST, "failed to construct result URL", e); + } } @Override diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowOperationLinkingTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowOperationLinkingTest.java new file mode 100644 index 000000000..793a5f33a --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowOperationLinkingTest.java @@ -0,0 +1,139 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.nexus; + +import static io.temporal.internal.common.WorkflowExecutionUtils.getEventOfType; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.History; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.nexus.WorkflowClientOperationHandlers; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestNexusServices; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class WorkflowOperationLinkingTest extends BaseNexusTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestNexus.class, TestOperationWorkflow.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Override + protected SDKTestWorkflowRule getTestWorkflowRule() { + return testWorkflowRule; + } + + @Test + public void testWorkflowOperationLinks() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("Hello from operation workflow " + testWorkflowRule.getTaskQueue(), result); + History history = + testWorkflowRule + .getWorkflowClient() + .fetchHistory(WorkflowStub.fromTyped(workflowStub).getExecution().getWorkflowId()) + .getHistory(); + // Assert that the operation started event has a link to the workflow execution started event + HistoryEvent nexusStartedEvent = + getEventOfType(history, EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED); + Assert.assertEquals(1, nexusStartedEvent.getLinksCount()); + Assert.assertEquals( + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + nexusStartedEvent.getLinks(0).getWorkflowEvent().getEventRef().getEventType()); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions(options) + .build(); + TestNexusServices.TestNexusService1 serviceStub = + Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions); + // Start an asynchronous operation backed by a workflow + NexusOperationHandle asyncOpHandle = + Workflow.startNexusOperation(serviceStub::operation, input); + NexusOperationExecution asyncExec = asyncOpHandle.getExecution().get(); + // Signal the operation to unblock, this makes sure the operation doesn't complete before the + // operation + // started event is written to history + Workflow.newExternalWorkflowStub(OperationWorkflow.class, asyncExec.getOperationId().get()) + .unblock(); + return asyncOpHandle.getResult().get(); + } + } + + @WorkflowInterface + public interface OperationWorkflow { + @WorkflowMethod + String execute(String arg); + + @SignalMethod + void unblock(); + } + + public static class TestOperationWorkflow implements OperationWorkflow { + boolean unblocked = false; + + @Override + public String execute(String arg) { + Workflow.await(() -> unblocked); + return "Hello from operation workflow " + arg; + } + + @Override + public void unblock() { + unblocked = true; + } + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return WorkflowClientOperationHandlers.fromWorkflowMethod( + (context, details, client, input) -> + client.newWorkflowStub( + AsyncWorkflowOperationTest.OperationWorkflow.class, + WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build()) + ::execute); + } + } +} diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/LinkConverter.java b/temporal-serviceclient/src/main/java/io/temporal/internal/common/LinkConverter.java similarity index 97% rename from temporal-test-server/src/main/java/io/temporal/internal/testservice/LinkConverter.java rename to temporal-serviceclient/src/main/java/io/temporal/internal/common/LinkConverter.java index fac2fe174..e9a75a5b5 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/LinkConverter.java +++ b/temporal-serviceclient/src/main/java/io/temporal/internal/common/LinkConverter.java @@ -18,7 +18,7 @@ * limitations under the License. */ -package io.temporal.internal.testservice; +package io.temporal.internal.common; import io.temporal.api.common.v1.Link; import io.temporal.api.enums.v1.EventType; @@ -32,7 +32,7 @@ public class LinkConverter { - private static final Logger log = LoggerFactory.getLogger(StateMachines.class); + private static final Logger log = LoggerFactory.getLogger(LinkConverter.class); private static final String linkPathFormat = "temporal:///namespaces/%s/workflows/%s/%s/history"; diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index 6a36716fc..d878cd588 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -20,7 +20,7 @@ package io.temporal.internal.testservice; -import static io.temporal.internal.testservice.LinkConverter.*; +import static io.temporal.internal.common.LinkConverter.*; import static io.temporal.internal.testservice.StateMachines.Action.CANCEL; import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE; import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW; diff --git a/temporal-test-server/src/test/java/io/temporal/internal/testservice/LinkConverterTest.java b/temporal-test-server/src/test/java/io/temporal/internal/common/LinkConverterTest.java similarity index 98% rename from temporal-test-server/src/test/java/io/temporal/internal/testservice/LinkConverterTest.java rename to temporal-test-server/src/test/java/io/temporal/internal/common/LinkConverterTest.java index 597b34ee9..46c99d244 100644 --- a/temporal-test-server/src/test/java/io/temporal/internal/testservice/LinkConverterTest.java +++ b/temporal-test-server/src/test/java/io/temporal/internal/common/LinkConverterTest.java @@ -18,10 +18,10 @@ * limitations under the License. */ -package io.temporal.internal.testservice; +package io.temporal.internal.common; -import static io.temporal.internal.testservice.LinkConverter.nexusLinkToWorkflowEvent; -import static io.temporal.internal.testservice.LinkConverter.workflowEventToNexusLink; +import static io.temporal.internal.common.LinkConverter.nexusLinkToWorkflowEvent; +import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink; import static org.junit.Assert.*; import io.temporal.api.common.v1.Link; diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java index a98c8c76e..9b209c02b 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java @@ -38,7 +38,7 @@ import io.temporal.api.workflowservice.v1.*; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; -import io.temporal.internal.testservice.LinkConverter; +import io.temporal.internal.common.LinkConverter; import io.temporal.internal.testservice.NexusTaskToken; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.testserver.functional.common.TestWorkflows;