From f0ac2ee4deb4899db00b65e81c923ed61892acd9 Mon Sep 17 00:00:00 2001 From: Rodrigo Zhou <2068124+rodrigozhou@users.noreply.github.com> Date: Tue, 1 Oct 2024 16:59:14 -0500 Subject: [PATCH] Fix Nexus test env to respect ScheduleToCloseTimeout (#1636) * Fix Nexus test env to respect ScheduleToCloseTimeout * address comments --- internal/internal_workflow_testsuite.go | 92 ++++++++++---- test/nexus_test.go | 158 ++++++++++++++++++++++++ 2 files changed, 224 insertions(+), 26 deletions(-) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index fc7f119b5..44c36ea38 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -2351,7 +2351,11 @@ func (env *testWorkflowEnvironmentImpl) newTestNexusTaskHandler() *nexusTaskHand ) } -func (env *testWorkflowEnvironmentImpl) ExecuteNexusOperation(params executeNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(opID string, e error)) int64 { +func (env *testWorkflowEnvironmentImpl) ExecuteNexusOperation( + params executeNexusOperationParams, + callback func(*commonpb.Payload, error), + startedHandler func(opID string, e error), +) int64 { seq := env.nextID() taskHandler := env.newTestNexusTaskHandler() handle := &testNexusOperationHandle{ @@ -2363,6 +2367,37 @@ func (env *testWorkflowEnvironmentImpl) ExecuteNexusOperation(params executeNexu } env.runningNexusOperations[seq] = handle + var opID string + if params.options.ScheduleToCloseTimeout > 0 { + // Timer to fail the nexus operation due to schedule to close timeout. + env.NewTimer( + params.options.ScheduleToCloseTimeout, + TimerOptions{}, + func(result *commonpb.Payloads, err error) { + timeoutErr := env.failureConverter.FailureToError(nexusOperationFailure( + params, + opID, + &failurepb.Failure{ + Message: "operation timed out", + FailureInfo: &failurepb.Failure_TimeoutFailureInfo{ + TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ + TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, + }, + }, + }, + )) + env.postCallback(func() { + // For async operation, there are two scenarios: + // 1. operation already started: the callback has already been called with the operation id, + // and calling again is no-op; + // 2. operation didn't start yet: there's no operation id to set. + handle.startedCallback("", timeoutErr) + handle.completedCallback(nil, timeoutErr) + }, true) + }, + ) + } + task := handle.newStartTask() env.runningCount++ go func() { @@ -2385,31 +2420,32 @@ func (env *testWorkflowEnvironmentImpl) ExecuteNexusOperation(params executeNexu handle.completedCallback(nil, err) }, true) return - } else { - switch v := response.GetResponse().GetStartOperation().GetVariant().(type) { - case *nexuspb.StartOperationResponse_SyncSuccess: - env.postCallback(func() { - handle.startedCallback("", nil) - handle.completedCallback(v.SyncSuccess.GetPayload(), nil) - }, true) - case *nexuspb.StartOperationResponse_AsyncSuccess: - env.postCallback(func() { - handle.startedCallback(v.AsyncSuccess.GetOperationId(), nil) - if handle.cancelRequested { - handle.cancel() - } - }, true) - case *nexuspb.StartOperationResponse_OperationError: - err := env.failureConverter.FailureToError( - nexusOperationFailure(params, "", unsuccessfulOperationErrorToTemporalFailure(v.OperationError)), - ) - env.postCallback(func() { - handle.startedCallback("", err) - handle.completedCallback(nil, err) - }, true) - default: - panic(fmt.Errorf("unknown response variant: %v", v)) - } + } + + switch v := response.GetResponse().GetStartOperation().GetVariant().(type) { + case *nexuspb.StartOperationResponse_SyncSuccess: + env.postCallback(func() { + handle.startedCallback("", nil) + handle.completedCallback(v.SyncSuccess.GetPayload(), nil) + }, true) + case *nexuspb.StartOperationResponse_AsyncSuccess: + env.postCallback(func() { + opID = v.AsyncSuccess.GetOperationId() + handle.startedCallback(v.AsyncSuccess.GetOperationId(), nil) + if handle.cancelRequested { + handle.cancel() + } + }, true) + case *nexuspb.StartOperationResponse_OperationError: + err := env.failureConverter.FailureToError( + nexusOperationFailure(params, "", unsuccessfulOperationErrorToTemporalFailure(v.OperationError)), + ) + env.postCallback(func() { + handle.startedCallback("", err) + handle.completedCallback(nil, err) + }, true) + default: + panic(fmt.Errorf("unknown response variant: %v", v)) } }() return seq @@ -2887,6 +2923,10 @@ func (h *testNexusOperationHandle) completedCallback(result *commonpb.Payload, e // startedCallback is a callback registered to handle operation start. // Must be called in a postCallback block. func (h *testNexusOperationHandle) startedCallback(opID string, e error) { + if h.started { + // Ignore duplciate starts. + return + } h.operationID = opID h.started = true h.onStarted(opID, e) diff --git a/test/nexus_test.go b/test/nexus_test.go index 4b84f4c19..83cd1a0f9 100644 --- a/test/nexus_test.go +++ b/test/nexus_test.go @@ -897,6 +897,93 @@ func TestWorkflowTestSuite_WorkflowRunOperation(t *testing.T) { }) } +func TestWorkflowTestSuite_WorkflowRunOperation_ScheduleToCloseTimeout(t *testing.T) { + handlerSleepDuration := 500 * time.Millisecond + handlerWF := func(ctx workflow.Context, _ nexus.NoValue) (nexus.NoValue, error) { + return nil, workflow.Sleep(ctx, handlerSleepDuration) + } + + opSleepDuration := 250 * time.Millisecond + op := temporalnexus.NewWorkflowRunOperation( + "op", + handlerWF, + func(ctx context.Context, _ nexus.NoValue, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { + time.Sleep(opSleepDuration) + return client.StartWorkflowOptions{ID: opts.RequestID}, nil + }) + + callerWF := func(ctx workflow.Context, scheduleToCloseTimeout time.Duration) error { + client := workflow.NewNexusClient("endpoint", "test") + fut := client.ExecuteOperation(ctx, op, nil, workflow.NexusOperationOptions{ + ScheduleToCloseTimeout: scheduleToCloseTimeout, + }) + var exec workflow.NexusOperationExecution + if err := fut.GetNexusOperationExecution().Get(ctx, &exec); err != nil { + return err + } + if exec.OperationID == "" { + return errors.New("got empty operation ID") + } + return fut.Get(ctx, nil) + } + + service := nexus.NewService("test") + service.Register(op) + + testCases := []struct { + name string + scheduleToCloseTimeout time.Duration + }{ + { + name: "success", + scheduleToCloseTimeout: opSleepDuration + handlerSleepDuration + 100*time.Millisecond, + }, + { + name: "timeout before operation start", + scheduleToCloseTimeout: opSleepDuration - 100*time.Millisecond, + }, + { + name: "timeout after operation start", + scheduleToCloseTimeout: opSleepDuration + 100*time.Millisecond, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + suite := testsuite.WorkflowTestSuite{} + env := suite.NewTestWorkflowEnvironment() + env.RegisterWorkflow(handlerWF) + env.RegisterNexusService(service) + env.ExecuteWorkflow(callerWF, tc.scheduleToCloseTimeout) + require.True(t, env.IsWorkflowCompleted()) + if tc.scheduleToCloseTimeout >= opSleepDuration+handlerSleepDuration { + require.NoError(t, env.GetWorkflowError()) + } else { + var execErr *temporal.WorkflowExecutionError + err := env.GetWorkflowError() + require.ErrorAs(t, err, &execErr) + var opErr *temporal.NexusOperationError + err = execErr.Unwrap() + require.ErrorAs(t, err, &opErr) + require.Equal(t, "endpoint", opErr.Endpoint) + require.Equal(t, "test", opErr.Service) + require.Equal(t, op.Name(), opErr.Operation) + if tc.scheduleToCloseTimeout < opSleepDuration { + require.Empty(t, opErr.OperationID) + } else { + require.NotEmpty(t, opErr.OperationID) + } + require.Equal(t, "nexus operation completed unsuccessfully", opErr.Message) + err = opErr.Unwrap() + var timeoutErr *temporal.TimeoutError + require.ErrorAs(t, err, &timeoutErr) + require.Equal(t, "operation timed out", timeoutErr.Message()) + } + }) + } +} + func TestWorkflowTestSuite_WorkflowRunOperation_WithCancel(t *testing.T) { wf := func(ctx workflow.Context, cancelBeforeStarted bool) error { childCtx, cancel := workflow.WithCancel(ctx) @@ -960,6 +1047,77 @@ func TestWorkflowTestSuite_WorkflowRunOperation_WithCancel(t *testing.T) { } } +func TestWorkflowTestSuite_NexusSyncOperation_ScheduleToCloseTimeout(t *testing.T) { + sleepDuration := 500 * time.Millisecond + op := temporalnexus.NewSyncOperation( + "sync-op", + func( + ctx context.Context, + c client.Client, + _ nexus.NoValue, + opts nexus.StartOperationOptions, + ) (nexus.NoValue, error) { + time.Sleep(sleepDuration) + return nil, nil + }, + ) + wf := func(ctx workflow.Context, scheduleToCloseTimeout time.Duration) error { + client := workflow.NewNexusClient("endpoint", "test") + fut := client.ExecuteOperation(ctx, op, nil, workflow.NexusOperationOptions{ + ScheduleToCloseTimeout: scheduleToCloseTimeout, + }) + return fut.Get(ctx, nil) + } + + service := nexus.NewService("test") + service.Register(op) + + testCases := []struct { + name string + scheduleToCloseTimeout time.Duration + }{ + { + name: "success", + scheduleToCloseTimeout: sleepDuration * 2, + }, + { + name: "timeout", + scheduleToCloseTimeout: sleepDuration / 2, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + suite := testsuite.WorkflowTestSuite{} + env := suite.NewTestWorkflowEnvironment() + env.RegisterWorkflow(waitForCancelWorkflow) + env.RegisterNexusService(service) + env.ExecuteWorkflow(wf, tc.scheduleToCloseTimeout) + require.True(t, env.IsWorkflowCompleted()) + if tc.scheduleToCloseTimeout >= sleepDuration { + require.NoError(t, env.GetWorkflowError()) + } else { + var execErr *temporal.WorkflowExecutionError + err := env.GetWorkflowError() + require.ErrorAs(t, err, &execErr) + var opErr *temporal.NexusOperationError + err = execErr.Unwrap() + require.ErrorAs(t, err, &opErr) + require.Equal(t, "endpoint", opErr.Endpoint) + require.Equal(t, "test", opErr.Service) + require.Equal(t, op.Name(), opErr.Operation) + require.Empty(t, opErr.OperationID) + require.Equal(t, "nexus operation completed unsuccessfully", opErr.Message) + err = opErr.Unwrap() + var timeoutErr *temporal.TimeoutError + require.ErrorAs(t, err, &timeoutErr) + require.Equal(t, "operation timed out", timeoutErr.Message()) + } + }) + } +} + func TestWorkflowTestSuite_NexusSyncOperation_ClientMethods_Panic(t *testing.T) { var panicReason any op := temporalnexus.NewSyncOperation("signal-op", func(ctx context.Context, c client.Client, _ nexus.NoValue, opts nexus.StartOperationOptions) (nexus.NoValue, error) {