diff --git a/common/testing/testvars/any.go b/common/testing/testvars/any.go index dcc91d563cd..296343d61ee 100644 --- a/common/testing/testvars/any.go +++ b/common/testing/testvars/any.go @@ -26,6 +26,8 @@ package testvars import ( commonpb "go.temporal.io/api/common/v1" + failurepb "go.temporal.io/api/failure/v1" + "go.temporal.io/server/common/payload" "go.temporal.io/server/common/payloads" ) @@ -63,3 +65,13 @@ func (a Any) EventID() int64 { // This produces EventID in XX0YY format, where XX is unique for every test and YY is a random number. return int64(randInt(a.testHash, 2, 1, 2)) } + +func (a Any) ApplicationFailure() *failurepb.Failure { + return &failurepb.Failure{ + Message: a.String(), + FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ + Type: a.String(), + NonRetryable: false, + }}, + } +} diff --git a/tests/update_workflow.go b/tests/update_workflow.go index 7853011684f..f28b78f335c 100644 --- a/tests/update_workflow.go +++ b/tests/update_workflow.go @@ -3262,142 +3262,182 @@ func (s *FunctionalSuite) TestUpdateWorkflow_ScheduledSpeculativeWorkflowTask_Te func (s *FunctionalSuite) TestUpdateWorkflow_CompleteWorkflow_AbortUpdates() { type testCase struct { - Name string - Description string - UpdateErr string - UpdateFailure string - Commands func(tv *testvars.TestVars) []*commandpb.Command - Messages func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message + name string + description string + updateErr string + updateFailure string + commands func(tv *testvars.TestVars) []*commandpb.Command + messages func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message + } + type completionCommand struct { + name string + finalStatus enumspb.WorkflowExecutionStatus + command func(_ *testvars.TestVars) *commandpb.Command } testCases := []testCase{ { - Name: "admitted", - Description: "update in stateAdmitted must get an error", - UpdateErr: "workflow execution already completed", - UpdateFailure: "", - Commands: func(_ *testvars.TestVars) []*commandpb.Command { return nil }, - Messages: func(_ *testvars.TestVars, _ *protocolpb.Message) []*protocolpb.Message { return nil }, + name: "update admitted", + description: "update in stateAdmitted must get an error", + updateErr: "workflow execution already completed", + updateFailure: "", + commands: func(_ *testvars.TestVars) []*commandpb.Command { return nil }, + messages: func(_ *testvars.TestVars, _ *protocolpb.Message) []*protocolpb.Message { return nil }, }, { - Name: "accepted", - Description: "update in stateAccepted must get an error", - UpdateErr: "workflow execution already completed", - UpdateFailure: "", - Commands: func(tv *testvars.TestVars) []*commandpb.Command { return s.UpdateAcceptCommands(tv, "1") }, - Messages: func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message { + name: "update accepted", + description: "update in stateAccepted must get an error", + updateErr: "workflow execution already completed", + updateFailure: "", + commands: func(tv *testvars.TestVars) []*commandpb.Command { return s.UpdateAcceptCommands(tv, "1") }, + messages: func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message { return s.UpdateAcceptMessages(tv, updRequestMsg, "1") }, }, { - Name: "completed", - Description: "completed update must not be affected by workflow completion", - UpdateErr: "", - UpdateFailure: "", - Commands: func(tv *testvars.TestVars) []*commandpb.Command { return s.UpdateAcceptCompleteCommands(tv, "1") }, - Messages: func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message { + name: "update completed", + description: "completed update must not be affected by workflow completion", + updateErr: "", + updateFailure: "", + commands: func(tv *testvars.TestVars) []*commandpb.Command { return s.UpdateAcceptCompleteCommands(tv, "1") }, + messages: func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message { return s.UpdateAcceptCompleteMessages(tv, updRequestMsg, "1") }, }, { - Name: "rejected", - Description: "rejected update must be rejected with rejection from workflow", - UpdateErr: "", - UpdateFailure: "rejection-of-", // Rejection from workflow. - Commands: func(tv *testvars.TestVars) []*commandpb.Command { return nil }, - Messages: func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message { + name: "update rejected", + description: "rejected update must be rejected with rejection from workflow", + updateErr: "", + updateFailure: "rejection-of-", // Rejection from workflow. + commands: func(tv *testvars.TestVars) []*commandpb.Command { return nil }, + messages: func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message { return s.UpdateRejectMessages(tv, updRequestMsg, "1") }, }, } - for _, tc := range testCases { - s.Run(tc.Name, func() { - tv := testvars.New(s.T()) + workflowCompletionCommands := []completionCommand{ + { + name: "workflow completed", + finalStatus: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + command: func(_ *testvars.TestVars) *commandpb.Command { + return &commandpb.Command{ + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}}, + } + }, + }, + { + name: "workflow continued as new", + finalStatus: enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW, + command: func(tv *testvars.TestVars) *commandpb.Command { + return &commandpb.Command{ + CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{ + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + }}, + } + }, + }, + { + name: "workflow failed", + finalStatus: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED, + command: func(tv *testvars.TestVars) *commandpb.Command { + return &commandpb.Command{ + CommandType: enumspb.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_FailWorkflowExecutionCommandAttributes{FailWorkflowExecutionCommandAttributes: &commandpb.FailWorkflowExecutionCommandAttributes{ + Failure: tv.Any().ApplicationFailure(), + }}, + } + }, + }, + } - tv = s.startWorkflow(tv) + for _, tc := range testCases { + for _, wfCC := range workflowCompletionCommands { + s.Run(tc.name+" "+wfCC.name, func() { + tv := testvars.New(s.T()) + + tv = s.startWorkflow(tv) + + wtHandlerCalls := 0 + wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { + wtHandlerCalls++ + switch wtHandlerCalls { + case 1: + // Completes first WT with empty command list. + return nil, nil + case 2: + return append(tc.commands(tv), wfCC.command(tv)), nil + default: + s.Failf("wtHandler called too many times", "wtHandler shouldn't be called %d times", wtHandlerCalls) + return nil, nil + } + } - wtHandlerCalls := 0 - wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { - wtHandlerCalls++ - switch wtHandlerCalls { - case 1: - // Completes first WT with empty command list. - return nil, nil - case 2: - return append(tc.Commands(tv), &commandpb.Command{ - CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, - Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}}, - }), nil - default: - s.Failf("wtHandler called too many times", "wtHandler shouldn't be called %d times", wtHandlerCalls) - return nil, nil + msgHandlerCalls := 0 + msgHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error) { + msgHandlerCalls++ + switch msgHandlerCalls { + case 1: + return nil, nil + case 2: + updRequestMsg := task.Messages[0] + return tc.messages(tv, updRequestMsg), nil + default: + s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls) + return nil, nil + } } - } - msgHandlerCalls := 0 - msgHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error) { - msgHandlerCalls++ - switch msgHandlerCalls { - case 1: - return nil, nil - case 2: - updRequestMsg := task.Messages[0] - return tc.Messages(tv, updRequestMsg), nil - default: - s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls) - return nil, nil + poller := &TaskPoller{ + Client: s.client, + Namespace: s.namespace, + TaskQueue: tv.TaskQueue(), + Identity: tv.WorkerIdentity(), + WorkflowTaskHandler: wtHandler, + MessageHandler: msgHandler, + Logger: s.Logger, + T: s.T(), } - } - poller := &TaskPoller{ - Client: s.client, - Namespace: s.namespace, - TaskQueue: tv.TaskQueue(), - Identity: tv.WorkerIdentity(), - WorkflowTaskHandler: wtHandler, - MessageHandler: msgHandler, - Logger: s.Logger, - T: s.T(), - } + // Drain first WT. + _, err := poller.PollAndProcessWorkflowTask() + s.NoError(err) - // Drain first WT. - _, err := poller.PollAndProcessWorkflowTask() - s.NoError(err) + updateResultCh := s.sendUpdate(NewContext(), tv, "1") - halfSecondTimeoutCtx, cancel := context.WithTimeout(NewContext(), 500*time.Millisecond) - defer cancel() - updateResultCh := s.sendUpdate(halfSecondTimeoutCtx, tv, "1") + // Complete workflow. + _, err = poller.PollAndProcessWorkflowTask() + s.NoError(err) - // Complete workflow. - _, err = poller.PollAndProcessWorkflowTask() - s.NoError(err) + updateResult := <-updateResultCh + if tc.updateErr != "" { + s.Error(updateResult.err, tc.description) + s.Contains(updateResult.err.Error(), tc.updateErr, tc.description) + } else { + s.NoError(updateResult.err, tc.description) + } - updateResult := <-updateResultCh - if tc.UpdateErr != "" { - s.Error(updateResult.err, tc.Description) - s.Contains(updateResult.err.Error(), tc.UpdateErr, tc.Description) - } else { - s.NoError(updateResult.err, tc.Description) - } + if tc.updateFailure != "" { + s.NotNil(updateResult.response.GetOutcome().GetFailure(), tc.description) + s.Contains(updateResult.response.GetOutcome().GetFailure().GetMessage(), tc.updateFailure, tc.description) + } else { + s.Nil(updateResult.response.GetOutcome().GetFailure(), tc.description) + } - if tc.UpdateFailure != "" { - s.NotNil(updateResult.response.GetOutcome().GetFailure(), tc.Description) - s.Contains(updateResult.response.GetOutcome().GetFailure().GetMessage(), tc.UpdateFailure, tc.Description) - } else { - s.Nil(updateResult.response.GetOutcome().GetFailure(), tc.Description) - } + // Check that update didn't block workflow completion. + descResp, err := s.client.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: s.namespace, + Execution: tv.WorkflowExecution(), + }) + s.NoError(err) + s.Equal(wfCC.finalStatus, descResp.WorkflowExecutionInfo.Status) - // Check that update didn't block workflow completion. - descResp, err := s.client.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{ - Namespace: s.namespace, - Execution: tv.WorkflowExecution(), + s.Equal(2, wtHandlerCalls) + s.Equal(2, msgHandlerCalls) }) - s.NoError(err) - s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, descResp.WorkflowExecutionInfo.Status) - - s.Equal(2, wtHandlerCalls) - s.Equal(2, msgHandlerCalls) - }) + } } }