Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Nexus operation link support #2266

Open
wants to merge 2 commits into
base: nexus
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ext {
// Platforms
grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
jacksonVersion = '2.14.2' // [2.9.0,)
nexusVersion = '0.1.0-alpha1'
nexusVersion = '0.2.0-alpha'
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.10.5' : '1.9.9' // [1.0.0,)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

package io.temporal.internal.common;

import io.nexusrpc.Link;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;

public class NexusUtil {
Expand All @@ -42,5 +45,13 @@ public static Duration parseRequestTimeout(String timeout) {
}
}

public static Link nexusProtoLinkToLink(io.temporal.api.nexus.v1.Link nexusLink)
throws URISyntaxException {
return Link.newBuilder()
.setType(nexusLink.getType())
.setUri(new URI(nexusLink.getUrl()))
.build();
}

private NexusUtil() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import io.temporal.nexus.NexusOperationContext;

public class NexusOperationContextImpl implements NexusOperationContext {
private final String namespace;
private final String taskQueue;
private final WorkflowClient client;
private final Scope metricsScope;

public NexusOperationContextImpl(String taskQueue, WorkflowClient client, Scope metricsScope) {
public NexusOperationContextImpl(
String namespace, String taskQueue, WorkflowClient client, Scope metricsScope) {
this.namespace = namespace;
this.taskQueue = taskQueue;
this.client = client;
this.metricsScope = metricsScope;
Expand All @@ -47,4 +50,8 @@ public WorkflowClient getWorkflowClient() {
public String getTaskQueue() {
return taskQueue;
}

public String getNamespace() {
return namespace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

package io.temporal.internal.nexus;

import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.uber.m3.tally.Scope;
import io.nexusrpc.FailureInfo;
import io.nexusrpc.Header;
Expand All @@ -38,12 +39,17 @@
import io.temporal.internal.worker.NexusTaskHandler;
import io.temporal.internal.worker.ShutdownManager;
import io.temporal.worker.TypeAlreadyRegisteredException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NexusTaskHandlerImpl implements NexusTaskHandler {
private static final Logger log = LoggerFactory.getLogger(NexusTaskHandlerImpl.class);
private final DataConverter dataConverter;
private final String namespace;
private final String taskQueue;
Expand Down Expand Up @@ -112,7 +118,7 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
}

CurrentNexusOperationContext.set(
new NexusOperationContextImpl(taskQueue, client, metricsScope));
new NexusOperationContextImpl(namespace, taskQueue, client, metricsScope));

switch (request.getVariantCase()) {
case START_OPERATION:
Expand Down Expand Up @@ -208,15 +214,27 @@ private void convertKnownFailures(Throwable failure) {
}

private StartOperationResponse handleStartOperation(
OperationContext.Builder ctx, StartOperationRequest task)
throws InvalidProtocolBufferException {
OperationContext.Builder ctx, StartOperationRequest task) {
ctx.setService(task.getService()).setOperation(task.getOperation());

OperationStartDetails.Builder operationStartDetails =
OperationStartDetails.newBuilder()
.setCallbackUrl(task.getCallback())
.setRequestId(task.getRequestId());
task.getCallbackHeaderMap().forEach(operationStartDetails::putCallbackHeader);
task.getLinksList()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bergundy is it fair to say their is no way to set this right now?

.forEach(
link -> {
try {
operationStartDetails.addLink(nexusProtoLinkToLink(link));
} catch (URISyntaxException e) {
log.error("failed to parse link url: " + link.getUrl(), e);
throw new OperationHandlerException(
OperationHandlerException.ErrorType.BAD_REQUEST,
"Invalid link URL: " + link.getUrl(),
e);
}
});

HandlerInputContent.Builder input =
HandlerInputContent.newBuilder().setDataStream(task.getPayload().toByteString().newInput());
Expand All @@ -234,6 +252,15 @@ private StartOperationResponse handleStartOperation(
startResponseBuilder.setAsyncSuccess(
StartOperationResponse.Async.newBuilder()
.setOperationId(result.getAsyncOperationId())
.addAllLinks(
result.getLinks().stream()
.map(
link ->
io.temporal.api.nexus.v1.Link.newBuilder()
.setType(link.getType())
.setUrl(link.getUri().toString())
.build())
.collect(Collectors.toList()))
.build());
}
} catch (OperationUnsuccessfulException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

package io.temporal.internal.worker;

import static io.temporal.internal.common.GrpcUtils.isChannelShutdownException;

import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -134,13 +136,18 @@ public CompletableFuture<Void> waitOnWorkerShutdownRequest(
() -> {
try {
shutdownRequest.get();
} catch (StatusRuntimeException e) {
// If the server does not support shutdown, ignore the exception
if (Status.Code.UNIMPLEMENTED.equals(e.getStatus().getCode())) {
return;
}
log.warn("failed to call shutdown worker", e);
} catch (Exception e) {
if (e instanceof ExecutionException) {
e = (Exception) e.getCause();
}
if (e instanceof StatusRuntimeException) {
// If the server does not support shutdown, ignore the exception
if (Status.Code.UNIMPLEMENTED.equals(
((StatusRuntimeException) e).getStatus().getCode())
|| isChannelShutdownException((StatusRuntimeException) e)) {
return;
}
}
log.warn("failed to call shutdown worker", e);
} finally {
future.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

/**
* Context object passed to a Nexus operation implementation. Use {@link
* Nexus#getExecutionContext()} from a Nexus Operation implementation to access.
* Nexus#getOperationContext()} from a Nexus Operation implementation to access.
*/
@Experimental
public interface NexusOperationContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@

package io.temporal.nexus;

import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink;
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;

import io.nexusrpc.OperationInfo;
import io.nexusrpc.handler.*;
import io.nexusrpc.handler.OperationHandler;
import io.temporal.api.common.v1.Link;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
import io.temporal.client.WorkflowClient;
import io.temporal.internal.client.NexusStartWorkflowRequest;
import io.temporal.internal.nexus.CurrentNexusOperationContext;
import io.temporal.internal.nexus.NexusOperationContextImpl;
import java.net.URISyntaxException;

class RunWorkflowOperation<T, R> implements OperationHandler<T, R> {
private final WorkflowHandleFactory<T, R> handleFactory;
Expand All @@ -49,7 +56,32 @@ public OperationStartResult<R> start(
operationStartDetails.getCallbackUrl(),
operationStartDetails.getCallbackHeaders(),
nexusCtx.getTaskQueue());
return OperationStartResult.async(handle.getInvoker().invoke(nexusRequest).getWorkflowId());

WorkflowExecution workflowExec = handle.getInvoker().invoke(nexusRequest);

// Create the link information about the new workflow and return to the caller.
Link.WorkflowEvent workflowEventLink =
Link.WorkflowEvent.newBuilder()
.setNamespace(nexusCtx.getNamespace())
.setWorkflowId(workflowExec.getWorkflowId())
.setRunId(workflowExec.getRunId())
.setEventRef(
Link.WorkflowEvent.EventReference.newBuilder()
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
.build();
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
try {
OperationStartResult.Builder<R> result =
OperationStartResult.<R>newBuilder().setAsyncOperationId(workflowExec.getWorkflowId());
if (nexusLink != null) {
result.addLink(nexusProtoLinkToLink(nexusLink));
}
return result.build();
} catch (URISyntaxException e) {
// Not expected as the link is constructed by the SDK.
throw new OperationHandlerException(
OperationHandlerException.ErrorType.BAD_REQUEST, "failed to construct result URL", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.nexus;

import static io.temporal.internal.common.WorkflowExecutionUtils.getEventOfType;

import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.History;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.nexus.WorkflowClientOperationHandlers;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestNexusServices;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class WorkflowOperationLinkingTest extends BaseNexusTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestNexus.class, TestOperationWorkflow.class)
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Override
protected SDKTestWorkflowRule getTestWorkflowRule() {
return testWorkflowRule;
}

@Test
public void testWorkflowOperationLinks() {
TestWorkflows.TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
Assert.assertEquals("Hello from operation workflow " + testWorkflowRule.getTaskQueue(), result);
History history =
testWorkflowRule
.getWorkflowClient()
.fetchHistory(WorkflowStub.fromTyped(workflowStub).getExecution().getWorkflowId())
.getHistory();
// Assert that the operation started event has a link to the workflow execution started event
HistoryEvent nexusStartedEvent =
getEventOfType(history, EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED);
Assert.assertEquals(1, nexusStartedEvent.getLinksCount());
Assert.assertEquals(
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
nexusStartedEvent.getLinks(0).getWorkflowEvent().getEventRef().getEventType());
}

public static class TestNexus implements TestWorkflows.TestWorkflow1 {
@Override
public String execute(String input) {
NexusOperationOptions options =
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build();
NexusServiceOptions serviceOptions =
NexusServiceOptions.newBuilder()
.setEndpoint(getEndpointName())
.setOperationOptions(options)
.build();
TestNexusServices.TestNexusService1 serviceStub =
Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions);
// Start an asynchronous operation backed by a workflow
NexusOperationHandle<String> asyncOpHandle =
Workflow.startNexusOperation(serviceStub::operation, input);
NexusOperationExecution asyncExec = asyncOpHandle.getExecution().get();
// Signal the operation to unblock, this makes sure the operation doesn't complete before the
// operation
// started event is written to history
Workflow.newExternalWorkflowStub(OperationWorkflow.class, asyncExec.getOperationId().get())
.unblock();
return asyncOpHandle.getResult().get();
}
}

@WorkflowInterface
public interface OperationWorkflow {
@WorkflowMethod
String execute(String arg);

@SignalMethod
void unblock();
}

public static class TestOperationWorkflow implements OperationWorkflow {
boolean unblocked = false;

@Override
public String execute(String arg) {
Workflow.await(() -> unblocked);
return "Hello from operation workflow " + arg;
}

@Override
public void unblock() {
unblocked = true;
}
}

@ServiceImpl(service = TestNexusServices.TestNexusService1.class)
public class TestNexusServiceImpl {
@OperationImpl
public OperationHandler<String, String> operation() {
return WorkflowClientOperationHandlers.fromWorkflowMethod(
(context, details, client, input) ->
client.newWorkflowStub(
AsyncWorkflowOperationTest.OperationWorkflow.class,
WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build())
::execute);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* limitations under the License.
*/

package io.temporal.internal.testservice;
package io.temporal.internal.common;

import io.temporal.api.common.v1.Link;
import io.temporal.api.enums.v1.EventType;
Expand All @@ -32,7 +32,7 @@

public class LinkConverter {

private static final Logger log = LoggerFactory.getLogger(StateMachines.class);
private static final Logger log = LoggerFactory.getLogger(LinkConverter.class);

private static final String linkPathFormat = "temporal:///namespaces/%s/workflows/%s/%s/history";

Expand Down
Loading
Loading