Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nexus Interceptors #2244

Draft
wants to merge 10 commits into
base: nexus
Choose a base branch
from
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ext {
// Platforms
grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
jacksonVersion = '2.14.2' // [2.9.0,)
nexusVersion = '0.1.0-alpha1'
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.10.5' : '1.9.9' // [1.0.0,)

Expand Down
2 changes: 2 additions & 0 deletions docker/github/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ services:
- "6934:6934"
- "6935:6935"
- "6939:6939"
- "7243:7243"
environment:
- "CASSANDRA_SEEDS=cassandra"
- "ENABLE_ES=true"
- "ES_SEEDS=elasticsearch"
- "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml"
- "FRONTEND_HTTP_PORT=7243"
depends_on:
- cassandra
- elasticsearch
Expand Down
10 changes: 9 additions & 1 deletion docker/github/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,12 @@ history.MaxBufferedQueryCount:
worker.buildIdScavengerEnabled:
- value: true
worker.removableBuildIdDurationSinceDefault:
- value: 1
- value: 1
system.enableNexus:
- value: true
component.nexusoperations.callback.endpoint.template:
- value: http://localhost:7243/namespaces/{{.NamespaceName}}/nexus/callback
component.callbacks.allowedAddresses:
- value:
- Pattern: "localhost:7243"
AllowInsecure: true
1 change: 1 addition & 0 deletions temporal-kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {

// this module shouldn't carry temporal-sdk with it, especially for situations when users may be using a shaded artifact
compileOnly project(':temporal-sdk')
implementation "io.nexusrpc:nexus-sdk:$nexusVersion"

implementation "org.jetbrains.kotlin:kotlin-reflect"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow

import io.temporal.kotlin.TemporalDsl

/**
* @see NexusOperationOptions
*/
inline fun NexusOperationOptions(
options: @TemporalDsl NexusOperationOptions.Builder.() -> Unit
): NexusOperationOptions {
return NexusOperationOptions.newBuilder().apply(options).build()
}

/**
* Create a new instance of [NexusOperationOptions], optionally overriding some of its properties.
*/
inline fun NexusOperationOptions.copy(
overrides: @TemporalDsl NexusOperationOptions.Builder.() -> Unit
): NexusOperationOptions {
return toBuilder().apply(overrides).build()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow

import io.temporal.kotlin.TemporalDsl

/**
* @see NexusServiceOptions
*/
inline fun NexusServiceOptions(
options: @TemporalDsl NexusServiceOptions.Builder.() -> Unit
): NexusServiceOptions {
return NexusServiceOptions.newBuilder().apply(options).build()
}

/**
* Create a new instance of [NexusServiceOptions], optionally overriding some of its properties.
*/
inline fun NexusServiceOptions.copy(
overrides: @TemporalDsl NexusServiceOptions.Builder.() -> Unit
): NexusServiceOptions {
return toBuilder().apply(overrides).build()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.nexus

import io.temporal.workflow.NexusOperationOptions
import org.junit.Assert.assertEquals
import org.junit.Test
import java.time.Duration

class NexusOperationOptionsExtTest {

@Test
fun `OperationOptions DSL should be equivalent to builder`() {
val dslOperationOptions = NexusOperationOptions {
setScheduleToCloseTimeout(Duration.ofMinutes(1))
}

val builderOperationOptions = NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofMinutes(1))
.build()

assertEquals(builderOperationOptions, dslOperationOptions)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.nexus

import io.temporal.workflow.NexusOperationOptions
import io.temporal.workflow.NexusServiceOptions
import org.junit.Assert.assertEquals
import org.junit.Test
import java.time.Duration

class NexusServiceOptionsExtTest {

@Test
fun `ServiceOptions DSL should be equivalent to builder`() {
val dslServiceOptions = NexusServiceOptions {
setEndpoint("TestEndpoint")
setOperationOptions(
NexusOperationOptions {
setScheduleToCloseTimeout(Duration.ofMinutes(1))
}
)
setOperationMethodOptions(
mapOf(
"test" to NexusOperationOptions {
setScheduleToCloseTimeout(Duration.ofMinutes(2))
}
)
)
}

val builderServiceOptions = NexusServiceOptions.newBuilder()
.setEndpoint("TestEndpoint")
.setOperationOptions(
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofMinutes(1))
.build()
)
.setOperationMethodOptions(
mapOf(
"test" to NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofMinutes(2))
.build()
)
)
.build()

assertEquals(builderServiceOptions, dslServiceOptions)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow

import io.nexusrpc.Operation
import io.nexusrpc.Service
import io.nexusrpc.handler.OperationContext
import io.nexusrpc.handler.OperationHandler
import io.nexusrpc.handler.OperationImpl
import io.nexusrpc.handler.OperationStartDetails
import io.nexusrpc.handler.ServiceImpl
import io.nexusrpc.handler.SynchronousOperationFunction
import io.temporal.client.WorkflowClientOptions
import io.temporal.client.WorkflowOptions
import io.temporal.common.converter.DefaultDataConverter
import io.temporal.common.converter.JacksonJsonPayloadConverter
import io.temporal.common.converter.KotlinObjectMapperFactory
import io.temporal.internal.async.FunctionWrappingUtil
import io.temporal.internal.sync.AsyncInternal
import io.temporal.testing.internal.SDKTestWorkflowRule
import org.junit.Assert.assertTrue
import org.junit.Rule
import org.junit.Test
import java.time.Duration

class KotlinAsyncNexusTest {

@Rule
@JvmField
var testWorkflowRule: SDKTestWorkflowRule = SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(WorkflowImpl::class.java)
.setNexusServiceImplementation(TestNexusServiceImpl())
.setWorkflowClientOptions(
WorkflowClientOptions.newBuilder()
.setDataConverter(DefaultDataConverter(JacksonJsonPayloadConverter(KotlinObjectMapperFactory.new())))
.build()
)
.build()

@Service
interface TestNexusService {
@Operation
fun operation(): String?
}

@ServiceImpl(service = TestNexusService::class)
class TestNexusServiceImpl {
@OperationImpl
fun operation(): OperationHandler<Void, String> {
// Implemented inline
return OperationHandler.sync<Void, String>(
SynchronousOperationFunction<Void, String> { ctx: OperationContext, details: OperationStartDetails, _: Void? -> "Hello Kotlin" }
)
}
}

@WorkflowInterface
interface TestWorkflow {
@WorkflowMethod
fun execute()
}

class WorkflowImpl : TestWorkflow {
override fun execute() {
val nexusService = Workflow.newNexusServiceStub(
TestNexusService::class.java,
NexusServiceOptions {
setOperationOptions(
NexusOperationOptions {
setScheduleToCloseTimeout(Duration.ofSeconds(10))
}
)
}
)
assertTrue(
"This has to be true to make Async.function(nexusService::operation) work correctly as expected",
AsyncInternal.isAsync(nexusService::operation)
)
assertTrue(
"This has to be true to make Async.function(nexusService::operation) work correctly as expected",
AsyncInternal.isAsync(FunctionWrappingUtil.temporalJavaFunctionalWrapper(nexusService::operation))
)
Async.function(nexusService::operation).get()
}
}

@Test
fun asyncNexusWorkflowTest() {
val client = testWorkflowRule.workflowClient
val options = WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.taskQueue).build()
val workflowStub = client.newWorkflowStub(TestWorkflow::class.java, options)
workflowStub.execute()
}
}
1 change: 1 addition & 0 deletions temporal-opentracing/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ ext {
dependencies {
// this module shouldn't carry temporal-sdk with it, especially for situations when users may be using a shaded artifact
compileOnly project(':temporal-sdk')
implementation "io.nexusrpc:nexus-sdk:$nexusVersion"
api group: 'io.opentracing', name: 'opentracing-api', version: "$opentracingVersion"

implementation "com.google.guava:guava:$guavaVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
package io.temporal.opentracing;

import io.temporal.common.interceptors.*;
import io.temporal.opentracing.internal.ContextAccessor;
import io.temporal.opentracing.internal.OpenTracingActivityInboundCallsInterceptor;
import io.temporal.opentracing.internal.OpenTracingWorkflowInboundCallsInterceptor;
import io.temporal.opentracing.internal.SpanFactory;
import io.temporal.opentracing.internal.*;

public class OpenTracingWorkerInterceptor implements WorkerInterceptor {
private final OpenTracingOptions options;
Expand Down Expand Up @@ -52,4 +49,9 @@ public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInt
return new OpenTracingActivityInboundCallsInterceptor(
next, options, spanFactory, contextAccessor);
}

@Override
public NexusOperationInboundCallsInterceptor interceptNexusOperation(NexusOperationInboundCallsInterceptor next) {
return new OpenTracingNexusOperationInboundCallsInterceptor(next, options, spanFactory, contextAccessor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ public enum SpanOperationType {
UPDATE_WORKFLOW("UpdateWorkflow"),
HANDLE_QUERY("HandleQuery"),
HANDLE_SIGNAL("HandleSignal"),
HANDLE_UPDATE("HandleUpdate");
HANDLE_UPDATE("HandleUpdate"),
EXECUTE_NEXUS_OPERATION("ExecuteNexusOperation"),
START_NEXUS_OPERATION("StartNexusOperation"),
CANCEL_NEXUS_OPERATION("CancelNexusOperation");

private final String defaultPrefix;

Expand Down
Loading
Loading