Skip to content

Commit

Permalink
Add Nexus operation link support
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Oct 11, 2024
1 parent 111f38f commit f626cb6
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 21 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ plugins {

allprojects {
repositories {
mavenLocal()
mavenCentral()
}
}
Expand All @@ -32,7 +33,7 @@ ext {
// Platforms
grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
jacksonVersion = '2.14.2' // [2.9.0,)
nexusVersion = '0.1.0-alpha1'
nexusVersion = '0.2.0'
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.10.5' : '1.9.9' // [1.0.0,)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

package io.temporal.internal.common;

import io.nexusrpc.Link;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;

public class NexusUtil {
Expand All @@ -42,5 +45,13 @@ public static Duration parseRequestTimeout(String timeout) {
}
}

public static Link nexusProtoLinkToLink(io.temporal.api.nexus.v1.Link nexusLink)
throws URISyntaxException {
return Link.newBuilder()
.setType(nexusLink.getType())
.setUri(new URI(nexusLink.getUrl()))
.build();
}

private NexusUtil() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import io.temporal.nexus.NexusOperationContext;

public class NexusOperationContextImpl implements NexusOperationContext {
private final String namespace;
private final String taskQueue;
private final WorkflowClient client;
private final Scope metricsScope;

public NexusOperationContextImpl(String taskQueue, WorkflowClient client, Scope metricsScope) {
public NexusOperationContextImpl(
String namespace, String taskQueue, WorkflowClient client, Scope metricsScope) {
this.namespace = namespace;
this.taskQueue = taskQueue;
this.client = client;
this.metricsScope = metricsScope;
Expand All @@ -47,4 +50,8 @@ public WorkflowClient getWorkflowClient() {
public String getTaskQueue() {
return taskQueue;
}

public String getNamespace() {
return namespace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

package io.temporal.internal.nexus;

import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.uber.m3.tally.Scope;
import io.nexusrpc.FailureInfo;
import io.nexusrpc.Header;
Expand All @@ -38,12 +39,17 @@
import io.temporal.internal.worker.NexusTaskHandler;
import io.temporal.internal.worker.ShutdownManager;
import io.temporal.worker.TypeAlreadyRegisteredException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NexusTaskHandlerImpl implements NexusTaskHandler {
private static final Logger log = LoggerFactory.getLogger(NexusTaskHandlerImpl.class);
private final DataConverter dataConverter;
private final String namespace;
private final String taskQueue;
Expand Down Expand Up @@ -112,7 +118,7 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
}

CurrentNexusOperationContext.set(
new NexusOperationContextImpl(taskQueue, client, metricsScope));
new NexusOperationContextImpl(namespace, taskQueue, client, metricsScope));

switch (request.getVariantCase()) {
case START_OPERATION:
Expand Down Expand Up @@ -208,15 +214,27 @@ private void convertKnownFailures(Throwable failure) {
}

private StartOperationResponse handleStartOperation(
OperationContext.Builder ctx, StartOperationRequest task)
throws InvalidProtocolBufferException {
OperationContext.Builder ctx, StartOperationRequest task) {
ctx.setService(task.getService()).setOperation(task.getOperation());

OperationStartDetails.Builder operationStartDetails =
OperationStartDetails.newBuilder()
.setCallbackUrl(task.getCallback())
.setRequestId(task.getRequestId());
task.getCallbackHeaderMap().forEach(operationStartDetails::putCallbackHeader);
task.getLinksList()
.forEach(
link -> {
try {
operationStartDetails.addLink(nexusProtoLinkToLink(link));
} catch (URISyntaxException e) {
log.error("failed to parse link url: " + link.getUrl(), e);
throw new OperationHandlerException(
OperationHandlerException.ErrorType.BAD_REQUEST,
"Invalid link URL: " + link.getUrl(),
e);
}
});

HandlerInputContent.Builder input =
HandlerInputContent.newBuilder().setDataStream(task.getPayload().toByteString().newInput());
Expand All @@ -234,6 +252,15 @@ private StartOperationResponse handleStartOperation(
startResponseBuilder.setAsyncSuccess(
StartOperationResponse.Async.newBuilder()
.setOperationId(result.getAsyncOperationId())
.addAllLinks(
result.getLinks().stream()
.map(
link ->
io.temporal.api.nexus.v1.Link.newBuilder()
.setType(link.getType())
.setUrl(link.getUri().toString())
.build())
.collect(Collectors.toList()))
.build());
}
} catch (OperationUnsuccessfulException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

package io.temporal.internal.worker;

import static io.temporal.internal.common.GrpcUtils.isChannelShutdownException;

import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -134,13 +136,18 @@ public CompletableFuture<Void> waitOnWorkerShutdownRequest(
() -> {
try {
shutdownRequest.get();
} catch (StatusRuntimeException e) {
// If the server does not support shutdown, ignore the exception
if (Status.Code.UNIMPLEMENTED.equals(e.getStatus().getCode())) {
return;
}
log.warn("failed to call shutdown worker", e);
} catch (Exception e) {
if (e instanceof ExecutionException) {
e = (Exception) e.getCause();
}
if (e instanceof StatusRuntimeException) {
// If the server does not support shutdown, ignore the exception
if (Status.Code.UNIMPLEMENTED.equals(
((StatusRuntimeException) e).getStatus().getCode())
|| isChannelShutdownException((StatusRuntimeException) e)) {
return;
}
}
log.warn("failed to call shutdown worker", e);
} finally {
future.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

/**
* Context object passed to a Nexus operation implementation. Use {@link
* Nexus#getExecutionContext()} from a Nexus Operation implementation to access.
* Nexus#getOperationContext()} from a Nexus Operation implementation to access.
*/
@Experimental
public interface NexusOperationContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@

package io.temporal.nexus;

import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink;
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;

import io.nexusrpc.OperationInfo;
import io.nexusrpc.handler.*;
import io.nexusrpc.handler.OperationHandler;
import io.temporal.api.common.v1.Link;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
import io.temporal.client.WorkflowClient;
import io.temporal.internal.client.NexusStartWorkflowRequest;
import io.temporal.internal.nexus.CurrentNexusOperationContext;
import io.temporal.internal.nexus.NexusOperationContextImpl;
import java.net.URISyntaxException;

class RunWorkflowOperation<T, R> implements OperationHandler<T, R> {
private final WorkflowHandleFactory<T, R> handleFactory;
Expand All @@ -49,7 +56,32 @@ public OperationStartResult<R> start(
operationStartDetails.getCallbackUrl(),
operationStartDetails.getCallbackHeaders(),
nexusCtx.getTaskQueue());
return OperationStartResult.async(handle.getInvoker().invoke(nexusRequest).getWorkflowId());

WorkflowExecution workflowExec = handle.getInvoker().invoke(nexusRequest);

// Create the link information about the new workflow and return to the caller.
Link.WorkflowEvent workflowEventLink =
Link.WorkflowEvent.newBuilder()
.setNamespace(nexusCtx.getNamespace())
.setWorkflowId(workflowExec.getWorkflowId())
.setRunId(workflowExec.getRunId())
.setEventRef(
Link.WorkflowEvent.EventReference.newBuilder()
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
.build();
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
try {
OperationStartResult.Builder<R> result =
OperationStartResult.<R>newBuilder().setAsyncOperationId(workflowExec.getWorkflowId());
if (nexusLink != null) {
result.addLink(nexusProtoLinkToLink(nexusLink));
}
return result.build();
} catch (URISyntaxException e) {
// Not expected as the link is constructed by the SDK.
throw new OperationHandlerException(
OperationHandlerException.ErrorType.BAD_REQUEST, "failed to construct result URL", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.nexus;

import static io.temporal.internal.common.WorkflowExecutionUtils.getEventOfType;

import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.History;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.nexus.WorkflowClientOperationHandlers;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestNexusServices;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class WorkflowOperationLinkingTest extends BaseNexusTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestNexus.class, TestOperationWorkflow.class)
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Override
protected SDKTestWorkflowRule getTestWorkflowRule() {
return testWorkflowRule;
}

@Test
public void testWorkflowOperationLinks() {
TestWorkflows.TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
Assert.assertEquals("Hello from operation workflow " + testWorkflowRule.getTaskQueue(), result);
History history =
testWorkflowRule
.getWorkflowClient()
.fetchHistory(WorkflowStub.fromTyped(workflowStub).getExecution().getWorkflowId())
.getHistory();
// Assert that the operation started event has a link to the workflow execution started event
HistoryEvent nexusStartedEvent =
getEventOfType(history, EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED);
Assert.assertEquals(1, nexusStartedEvent.getLinksCount());
Assert.assertEquals(
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
nexusStartedEvent.getLinks(0).getWorkflowEvent().getEventRef().getEventType());
}

public static class TestNexus implements TestWorkflows.TestWorkflow1 {
@Override
public String execute(String input) {
NexusOperationOptions options =
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build();
NexusServiceOptions serviceOptions =
NexusServiceOptions.newBuilder()
.setEndpoint(getEndpointName())
.setOperationOptions(options)
.build();
TestNexusServices.TestNexusService1 serviceStub =
Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions);
// Start an asynchronous operation backed by a workflow
NexusOperationHandle<String> asyncOpHandle =
Workflow.startNexusOperation(serviceStub::operation, input);
NexusOperationExecution asyncExec = asyncOpHandle.getExecution().get();
// Signal the operation to unblock, this makes sure the operation doesn't complete before the
// operation
// started event is written to history
Workflow.newExternalWorkflowStub(OperationWorkflow.class, asyncExec.getOperationId().get())
.unblock();
return asyncOpHandle.getResult().get();
}
}

@WorkflowInterface
public interface OperationWorkflow {
@WorkflowMethod
String execute(String arg);

@SignalMethod
void unblock();
}

public static class TestOperationWorkflow implements OperationWorkflow {
boolean unblocked = false;

@Override
public String execute(String arg) {
Workflow.await(() -> unblocked);
return "Hello from operation workflow " + arg;
}

@Override
public void unblock() {
unblocked = true;
}
}

@ServiceImpl(service = TestNexusServices.TestNexusService1.class)
public class TestNexusServiceImpl {
@OperationImpl
public OperationHandler<String, String> operation() {
return WorkflowClientOperationHandlers.fromWorkflowMethod(
(context, details, client, input) ->
client.newWorkflowStub(
AsyncWorkflowOperationTest.OperationWorkflow.class,
WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build())
::execute);
}
}
}
Loading

0 comments on commit f626cb6

Please sign in to comment.