Skip to content

Commit

Permalink
Workflow Update: add more test for different workflow completion comm…
Browse files Browse the repository at this point in the history
…ands (#6396)

## What changed?
<!-- Describe what has changed in this PR -->
Add more test for different workflow completion commands.

## Why?
<!-- Tell your future self why have you made these changes -->
Only `COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION` case was covered before.
This PR adds `COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION` and
`COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION` cases which should behave exactly
the same (in term of dealing with workflow updates). It also proves that
besides very specific edge case described in #6375 workflow updates and
"continue as new" work together as expected.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
Run locally.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
N/A

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
No.

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No.
  • Loading branch information
alexshtin authored Aug 12, 2024
1 parent 59d3bcc commit b11df91
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 105 deletions.
12 changes: 12 additions & 0 deletions common/testing/testvars/any.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ package testvars

import (
commonpb "go.temporal.io/api/common/v1"
failurepb "go.temporal.io/api/failure/v1"

"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/payloads"
)
Expand Down Expand Up @@ -63,3 +65,13 @@ func (a Any) EventID() int64 {
// This produces EventID in XX0YY format, where XX is unique for every test and YY is a random number.
return int64(randInt(a.testHash, 2, 1, 2))
}

func (a Any) ApplicationFailure() *failurepb.Failure {
return &failurepb.Failure{
Message: a.String(),
FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
Type: a.String(),
NonRetryable: false,
}},
}
}
250 changes: 145 additions & 105 deletions tests/update_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3262,142 +3262,182 @@ func (s *FunctionalSuite) TestUpdateWorkflow_ScheduledSpeculativeWorkflowTask_Te

func (s *FunctionalSuite) TestUpdateWorkflow_CompleteWorkflow_AbortUpdates() {
type testCase struct {
Name string
Description string
UpdateErr string
UpdateFailure string
Commands func(tv *testvars.TestVars) []*commandpb.Command
Messages func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message
name string
description string
updateErr string
updateFailure string
commands func(tv *testvars.TestVars) []*commandpb.Command
messages func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message
}
type completionCommand struct {
name string
finalStatus enumspb.WorkflowExecutionStatus
command func(_ *testvars.TestVars) *commandpb.Command
}
testCases := []testCase{
{
Name: "admitted",
Description: "update in stateAdmitted must get an error",
UpdateErr: "workflow execution already completed",
UpdateFailure: "",
Commands: func(_ *testvars.TestVars) []*commandpb.Command { return nil },
Messages: func(_ *testvars.TestVars, _ *protocolpb.Message) []*protocolpb.Message { return nil },
name: "update admitted",
description: "update in stateAdmitted must get an error",
updateErr: "workflow execution already completed",
updateFailure: "",
commands: func(_ *testvars.TestVars) []*commandpb.Command { return nil },
messages: func(_ *testvars.TestVars, _ *protocolpb.Message) []*protocolpb.Message { return nil },
},
{
Name: "accepted",
Description: "update in stateAccepted must get an error",
UpdateErr: "workflow execution already completed",
UpdateFailure: "",
Commands: func(tv *testvars.TestVars) []*commandpb.Command { return s.UpdateAcceptCommands(tv, "1") },
Messages: func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message {
name: "update accepted",
description: "update in stateAccepted must get an error",
updateErr: "workflow execution already completed",
updateFailure: "",
commands: func(tv *testvars.TestVars) []*commandpb.Command { return s.UpdateAcceptCommands(tv, "1") },
messages: func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message {
return s.UpdateAcceptMessages(tv, updRequestMsg, "1")
},
},
{
Name: "completed",
Description: "completed update must not be affected by workflow completion",
UpdateErr: "",
UpdateFailure: "",
Commands: func(tv *testvars.TestVars) []*commandpb.Command { return s.UpdateAcceptCompleteCommands(tv, "1") },
Messages: func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message {
name: "update completed",
description: "completed update must not be affected by workflow completion",
updateErr: "",
updateFailure: "",
commands: func(tv *testvars.TestVars) []*commandpb.Command { return s.UpdateAcceptCompleteCommands(tv, "1") },
messages: func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message {
return s.UpdateAcceptCompleteMessages(tv, updRequestMsg, "1")
},
},
{
Name: "rejected",
Description: "rejected update must be rejected with rejection from workflow",
UpdateErr: "",
UpdateFailure: "rejection-of-", // Rejection from workflow.
Commands: func(tv *testvars.TestVars) []*commandpb.Command { return nil },
Messages: func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message {
name: "update rejected",
description: "rejected update must be rejected with rejection from workflow",
updateErr: "",
updateFailure: "rejection-of-", // Rejection from workflow.
commands: func(tv *testvars.TestVars) []*commandpb.Command { return nil },
messages: func(tv *testvars.TestVars, updRequestMsg *protocolpb.Message) []*protocolpb.Message {
return s.UpdateRejectMessages(tv, updRequestMsg, "1")
},
},
}

for _, tc := range testCases {
s.Run(tc.Name, func() {
tv := testvars.New(s.T())
workflowCompletionCommands := []completionCommand{
{
name: "workflow completed",
finalStatus: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
command: func(_ *testvars.TestVars) *commandpb.Command {
return &commandpb.Command{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}},
}
},
},
{
name: "workflow continued as new",
finalStatus: enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
command: func(tv *testvars.TestVars) *commandpb.Command {
return &commandpb.Command{
CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{
WorkflowType: tv.WorkflowType(),
TaskQueue: tv.TaskQueue(),
}},
}
},
},
{
name: "workflow failed",
finalStatus: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
command: func(tv *testvars.TestVars) *commandpb.Command {
return &commandpb.Command{
CommandType: enumspb.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_FailWorkflowExecutionCommandAttributes{FailWorkflowExecutionCommandAttributes: &commandpb.FailWorkflowExecutionCommandAttributes{
Failure: tv.Any().ApplicationFailure(),
}},
}
},
},
}

tv = s.startWorkflow(tv)
for _, tc := range testCases {
for _, wfCC := range workflowCompletionCommands {
s.Run(tc.name+" "+wfCC.name, func() {
tv := testvars.New(s.T())

tv = s.startWorkflow(tv)

wtHandlerCalls := 0
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
wtHandlerCalls++
switch wtHandlerCalls {
case 1:
// Completes first WT with empty command list.
return nil, nil
case 2:
return append(tc.commands(tv), wfCC.command(tv)), nil
default:
s.Failf("wtHandler called too many times", "wtHandler shouldn't be called %d times", wtHandlerCalls)
return nil, nil
}
}

wtHandlerCalls := 0
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
wtHandlerCalls++
switch wtHandlerCalls {
case 1:
// Completes first WT with empty command list.
return nil, nil
case 2:
return append(tc.Commands(tv), &commandpb.Command{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}},
}), nil
default:
s.Failf("wtHandler called too many times", "wtHandler shouldn't be called %d times", wtHandlerCalls)
return nil, nil
msgHandlerCalls := 0
msgHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error) {
msgHandlerCalls++
switch msgHandlerCalls {
case 1:
return nil, nil
case 2:
updRequestMsg := task.Messages[0]
return tc.messages(tv, updRequestMsg), nil
default:
s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls)
return nil, nil
}
}
}

msgHandlerCalls := 0
msgHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error) {
msgHandlerCalls++
switch msgHandlerCalls {
case 1:
return nil, nil
case 2:
updRequestMsg := task.Messages[0]
return tc.Messages(tv, updRequestMsg), nil
default:
s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls)
return nil, nil
poller := &TaskPoller{
Client: s.client,
Namespace: s.namespace,
TaskQueue: tv.TaskQueue(),
Identity: tv.WorkerIdentity(),
WorkflowTaskHandler: wtHandler,
MessageHandler: msgHandler,
Logger: s.Logger,
T: s.T(),
}
}

poller := &TaskPoller{
Client: s.client,
Namespace: s.namespace,
TaskQueue: tv.TaskQueue(),
Identity: tv.WorkerIdentity(),
WorkflowTaskHandler: wtHandler,
MessageHandler: msgHandler,
Logger: s.Logger,
T: s.T(),
}
// Drain first WT.
_, err := poller.PollAndProcessWorkflowTask()
s.NoError(err)

// Drain first WT.
_, err := poller.PollAndProcessWorkflowTask()
s.NoError(err)
updateResultCh := s.sendUpdate(NewContext(), tv, "1")

halfSecondTimeoutCtx, cancel := context.WithTimeout(NewContext(), 500*time.Millisecond)
defer cancel()
updateResultCh := s.sendUpdate(halfSecondTimeoutCtx, tv, "1")
// Complete workflow.
_, err = poller.PollAndProcessWorkflowTask()
s.NoError(err)

// Complete workflow.
_, err = poller.PollAndProcessWorkflowTask()
s.NoError(err)
updateResult := <-updateResultCh
if tc.updateErr != "" {
s.Error(updateResult.err, tc.description)
s.Contains(updateResult.err.Error(), tc.updateErr, tc.description)
} else {
s.NoError(updateResult.err, tc.description)
}

updateResult := <-updateResultCh
if tc.UpdateErr != "" {
s.Error(updateResult.err, tc.Description)
s.Contains(updateResult.err.Error(), tc.UpdateErr, tc.Description)
} else {
s.NoError(updateResult.err, tc.Description)
}
if tc.updateFailure != "" {
s.NotNil(updateResult.response.GetOutcome().GetFailure(), tc.description)
s.Contains(updateResult.response.GetOutcome().GetFailure().GetMessage(), tc.updateFailure, tc.description)
} else {
s.Nil(updateResult.response.GetOutcome().GetFailure(), tc.description)
}

if tc.UpdateFailure != "" {
s.NotNil(updateResult.response.GetOutcome().GetFailure(), tc.Description)
s.Contains(updateResult.response.GetOutcome().GetFailure().GetMessage(), tc.UpdateFailure, tc.Description)
} else {
s.Nil(updateResult.response.GetOutcome().GetFailure(), tc.Description)
}
// Check that update didn't block workflow completion.
descResp, err := s.client.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: s.namespace,
Execution: tv.WorkflowExecution(),
})
s.NoError(err)
s.Equal(wfCC.finalStatus, descResp.WorkflowExecutionInfo.Status)

// Check that update didn't block workflow completion.
descResp, err := s.client.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: s.namespace,
Execution: tv.WorkflowExecution(),
s.Equal(2, wtHandlerCalls)
s.Equal(2, msgHandlerCalls)
})
s.NoError(err)
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, descResp.WorkflowExecutionInfo.Status)

s.Equal(2, wtHandlerCalls)
s.Equal(2, msgHandlerCalls)
})
}
}
}

Expand Down

0 comments on commit b11df91

Please sign in to comment.