Skip to content

Commit

Permalink
Fix Nexus test env to respect ScheduleToCloseTimeout (#1636)
Browse files Browse the repository at this point in the history
* Fix Nexus test env to respect ScheduleToCloseTimeout

* address comments
  • Loading branch information
rodrigozhou authored Oct 1, 2024
1 parent cf3153e commit f0ac2ee
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 26 deletions.
92 changes: 66 additions & 26 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -2351,7 +2351,11 @@ func (env *testWorkflowEnvironmentImpl) newTestNexusTaskHandler() *nexusTaskHand
)
}

func (env *testWorkflowEnvironmentImpl) ExecuteNexusOperation(params executeNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(opID string, e error)) int64 {
func (env *testWorkflowEnvironmentImpl) ExecuteNexusOperation(
params executeNexusOperationParams,
callback func(*commonpb.Payload, error),
startedHandler func(opID string, e error),
) int64 {
seq := env.nextID()
taskHandler := env.newTestNexusTaskHandler()
handle := &testNexusOperationHandle{
Expand All @@ -2363,6 +2367,37 @@ func (env *testWorkflowEnvironmentImpl) ExecuteNexusOperation(params executeNexu
}
env.runningNexusOperations[seq] = handle

var opID string
if params.options.ScheduleToCloseTimeout > 0 {
// Timer to fail the nexus operation due to schedule to close timeout.
env.NewTimer(
params.options.ScheduleToCloseTimeout,
TimerOptions{},
func(result *commonpb.Payloads, err error) {
timeoutErr := env.failureConverter.FailureToError(nexusOperationFailure(
params,
opID,
&failurepb.Failure{
Message: "operation timed out",
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{
TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
},
},
},
))
env.postCallback(func() {
// For async operation, there are two scenarios:
// 1. operation already started: the callback has already been called with the operation id,
// and calling again is no-op;
// 2. operation didn't start yet: there's no operation id to set.
handle.startedCallback("", timeoutErr)
handle.completedCallback(nil, timeoutErr)
}, true)
},
)
}

task := handle.newStartTask()
env.runningCount++
go func() {
Expand All @@ -2385,31 +2420,32 @@ func (env *testWorkflowEnvironmentImpl) ExecuteNexusOperation(params executeNexu
handle.completedCallback(nil, err)
}, true)
return
} else {
switch v := response.GetResponse().GetStartOperation().GetVariant().(type) {
case *nexuspb.StartOperationResponse_SyncSuccess:
env.postCallback(func() {
handle.startedCallback("", nil)
handle.completedCallback(v.SyncSuccess.GetPayload(), nil)
}, true)
case *nexuspb.StartOperationResponse_AsyncSuccess:
env.postCallback(func() {
handle.startedCallback(v.AsyncSuccess.GetOperationId(), nil)
if handle.cancelRequested {
handle.cancel()
}
}, true)
case *nexuspb.StartOperationResponse_OperationError:
err := env.failureConverter.FailureToError(
nexusOperationFailure(params, "", unsuccessfulOperationErrorToTemporalFailure(v.OperationError)),
)
env.postCallback(func() {
handle.startedCallback("", err)
handle.completedCallback(nil, err)
}, true)
default:
panic(fmt.Errorf("unknown response variant: %v", v))
}
}

switch v := response.GetResponse().GetStartOperation().GetVariant().(type) {
case *nexuspb.StartOperationResponse_SyncSuccess:
env.postCallback(func() {
handle.startedCallback("", nil)
handle.completedCallback(v.SyncSuccess.GetPayload(), nil)
}, true)
case *nexuspb.StartOperationResponse_AsyncSuccess:
env.postCallback(func() {
opID = v.AsyncSuccess.GetOperationId()
handle.startedCallback(v.AsyncSuccess.GetOperationId(), nil)
if handle.cancelRequested {
handle.cancel()
}
}, true)
case *nexuspb.StartOperationResponse_OperationError:
err := env.failureConverter.FailureToError(
nexusOperationFailure(params, "", unsuccessfulOperationErrorToTemporalFailure(v.OperationError)),
)
env.postCallback(func() {
handle.startedCallback("", err)
handle.completedCallback(nil, err)
}, true)
default:
panic(fmt.Errorf("unknown response variant: %v", v))
}
}()
return seq
Expand Down Expand Up @@ -2887,6 +2923,10 @@ func (h *testNexusOperationHandle) completedCallback(result *commonpb.Payload, e
// startedCallback is a callback registered to handle operation start.
// Must be called in a postCallback block.
func (h *testNexusOperationHandle) startedCallback(opID string, e error) {
if h.started {
// Ignore duplciate starts.
return
}
h.operationID = opID
h.started = true
h.onStarted(opID, e)
Expand Down
158 changes: 158 additions & 0 deletions test/nexus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,93 @@ func TestWorkflowTestSuite_WorkflowRunOperation(t *testing.T) {
})
}

func TestWorkflowTestSuite_WorkflowRunOperation_ScheduleToCloseTimeout(t *testing.T) {
handlerSleepDuration := 500 * time.Millisecond
handlerWF := func(ctx workflow.Context, _ nexus.NoValue) (nexus.NoValue, error) {
return nil, workflow.Sleep(ctx, handlerSleepDuration)
}

opSleepDuration := 250 * time.Millisecond
op := temporalnexus.NewWorkflowRunOperation(
"op",
handlerWF,
func(ctx context.Context, _ nexus.NoValue, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
time.Sleep(opSleepDuration)
return client.StartWorkflowOptions{ID: opts.RequestID}, nil
})

callerWF := func(ctx workflow.Context, scheduleToCloseTimeout time.Duration) error {
client := workflow.NewNexusClient("endpoint", "test")
fut := client.ExecuteOperation(ctx, op, nil, workflow.NexusOperationOptions{
ScheduleToCloseTimeout: scheduleToCloseTimeout,
})
var exec workflow.NexusOperationExecution
if err := fut.GetNexusOperationExecution().Get(ctx, &exec); err != nil {
return err
}
if exec.OperationID == "" {
return errors.New("got empty operation ID")
}
return fut.Get(ctx, nil)
}

service := nexus.NewService("test")
service.Register(op)

testCases := []struct {
name string
scheduleToCloseTimeout time.Duration
}{
{
name: "success",
scheduleToCloseTimeout: opSleepDuration + handlerSleepDuration + 100*time.Millisecond,
},
{
name: "timeout before operation start",
scheduleToCloseTimeout: opSleepDuration - 100*time.Millisecond,
},
{
name: "timeout after operation start",
scheduleToCloseTimeout: opSleepDuration + 100*time.Millisecond,
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
suite := testsuite.WorkflowTestSuite{}
env := suite.NewTestWorkflowEnvironment()
env.RegisterWorkflow(handlerWF)
env.RegisterNexusService(service)
env.ExecuteWorkflow(callerWF, tc.scheduleToCloseTimeout)
require.True(t, env.IsWorkflowCompleted())
if tc.scheduleToCloseTimeout >= opSleepDuration+handlerSleepDuration {
require.NoError(t, env.GetWorkflowError())
} else {
var execErr *temporal.WorkflowExecutionError
err := env.GetWorkflowError()
require.ErrorAs(t, err, &execErr)
var opErr *temporal.NexusOperationError
err = execErr.Unwrap()
require.ErrorAs(t, err, &opErr)
require.Equal(t, "endpoint", opErr.Endpoint)
require.Equal(t, "test", opErr.Service)
require.Equal(t, op.Name(), opErr.Operation)
if tc.scheduleToCloseTimeout < opSleepDuration {
require.Empty(t, opErr.OperationID)
} else {
require.NotEmpty(t, opErr.OperationID)
}
require.Equal(t, "nexus operation completed unsuccessfully", opErr.Message)
err = opErr.Unwrap()
var timeoutErr *temporal.TimeoutError
require.ErrorAs(t, err, &timeoutErr)
require.Equal(t, "operation timed out", timeoutErr.Message())
}
})
}
}

func TestWorkflowTestSuite_WorkflowRunOperation_WithCancel(t *testing.T) {
wf := func(ctx workflow.Context, cancelBeforeStarted bool) error {
childCtx, cancel := workflow.WithCancel(ctx)
Expand Down Expand Up @@ -960,6 +1047,77 @@ func TestWorkflowTestSuite_WorkflowRunOperation_WithCancel(t *testing.T) {
}
}

func TestWorkflowTestSuite_NexusSyncOperation_ScheduleToCloseTimeout(t *testing.T) {
sleepDuration := 500 * time.Millisecond
op := temporalnexus.NewSyncOperation(
"sync-op",
func(
ctx context.Context,
c client.Client,
_ nexus.NoValue,
opts nexus.StartOperationOptions,
) (nexus.NoValue, error) {
time.Sleep(sleepDuration)
return nil, nil
},
)
wf := func(ctx workflow.Context, scheduleToCloseTimeout time.Duration) error {
client := workflow.NewNexusClient("endpoint", "test")
fut := client.ExecuteOperation(ctx, op, nil, workflow.NexusOperationOptions{
ScheduleToCloseTimeout: scheduleToCloseTimeout,
})
return fut.Get(ctx, nil)
}

service := nexus.NewService("test")
service.Register(op)

testCases := []struct {
name string
scheduleToCloseTimeout time.Duration
}{
{
name: "success",
scheduleToCloseTimeout: sleepDuration * 2,
},
{
name: "timeout",
scheduleToCloseTimeout: sleepDuration / 2,
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
suite := testsuite.WorkflowTestSuite{}
env := suite.NewTestWorkflowEnvironment()
env.RegisterWorkflow(waitForCancelWorkflow)
env.RegisterNexusService(service)
env.ExecuteWorkflow(wf, tc.scheduleToCloseTimeout)
require.True(t, env.IsWorkflowCompleted())
if tc.scheduleToCloseTimeout >= sleepDuration {
require.NoError(t, env.GetWorkflowError())
} else {
var execErr *temporal.WorkflowExecutionError
err := env.GetWorkflowError()
require.ErrorAs(t, err, &execErr)
var opErr *temporal.NexusOperationError
err = execErr.Unwrap()
require.ErrorAs(t, err, &opErr)
require.Equal(t, "endpoint", opErr.Endpoint)
require.Equal(t, "test", opErr.Service)
require.Equal(t, op.Name(), opErr.Operation)
require.Empty(t, opErr.OperationID)
require.Equal(t, "nexus operation completed unsuccessfully", opErr.Message)
err = opErr.Unwrap()
var timeoutErr *temporal.TimeoutError
require.ErrorAs(t, err, &timeoutErr)
require.Equal(t, "operation timed out", timeoutErr.Message())
}
})
}
}

func TestWorkflowTestSuite_NexusSyncOperation_ClientMethods_Panic(t *testing.T) {
var panicReason any
op := temporalnexus.NewSyncOperation("signal-op", func(ctx context.Context, c client.Client, _ nexus.NoValue, opts nexus.StartOperationOptions) (nexus.NoValue, error) {
Expand Down

0 comments on commit f0ac2ee

Please sign in to comment.