Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow Update: abort updates with failure if update was accepted but workflow completes #6630

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 80 additions & 7 deletions service/history/workflow/update/abort_reason.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,102 @@
package update

import (
"fmt"

failurepb "go.temporal.io/api/failure/v1"
"go.temporal.io/server/service/history/consts"
)

type (
AbortReason uint32

reasonState struct {
r AbortReason
st state
}

failureError struct {
f *failurepb.Failure
err error
}
)

const (
AbortReasonRegistryCleared AbortReason = iota + 1
AbortReasonWorkflowCompleted
AbortReasonWorkflowContinuing
lastAbortReason
)

// Error returns an error which will be set to Update futures while aborting Update.
func (r AbortReason) Error() error {
// Matrix of "abort reason/Update state" to "failure/error" pair. Only one value (failure or error) is allowed per pair.
var reasonStateMatrix = map[reasonState]failureError{
// If the registry is cleared, then all Updates (no matter what state they are)
// are aborted with retryable registryClearedErr error.
reasonState{r: AbortReasonRegistryCleared, st: stateCreated}: {f: nil, err: registryClearedErr},
reasonState{r: AbortReasonRegistryCleared, st: stateProvisionallyAdmitted}: {f: nil, err: registryClearedErr},
reasonState{r: AbortReasonRegistryCleared, st: stateAdmitted}: {f: nil, err: registryClearedErr},
reasonState{r: AbortReasonRegistryCleared, st: stateSent}: {f: nil, err: registryClearedErr},
reasonState{r: AbortReasonRegistryCleared, st: stateProvisionallyAccepted}: {f: nil, err: registryClearedErr},
reasonState{r: AbortReasonRegistryCleared, st: stateAccepted}: {f: nil, err: registryClearedErr},
reasonState{r: AbortReasonRegistryCleared, st: stateProvisionallyCompleted}: {f: nil, err: registryClearedErr},
reasonState{r: AbortReasonRegistryCleared, st: stateCompleted}: {f: nil, err: registryClearedErr},
reasonState{r: AbortReasonRegistryCleared, st: stateAborted}: {f: nil, err: registryClearedErr},
reasonState{r: AbortReasonRegistryCleared, st: stateProvisionallyCompletedAfterAccepted}: {f: nil, err: registryClearedErr},

// If the Workflow is completed, then pre-accepted Updates are aborted with non-retryable ErrWorkflowCompleted error.
reasonState{r: AbortReasonWorkflowCompleted, st: stateCreated}: {f: nil, err: consts.ErrWorkflowCompleted},
reasonState{r: AbortReasonWorkflowCompleted, st: stateProvisionallyAdmitted}: {f: nil, err: consts.ErrWorkflowCompleted},
reasonState{r: AbortReasonWorkflowCompleted, st: stateAdmitted}: {f: nil, err: consts.ErrWorkflowCompleted},
reasonState{r: AbortReasonWorkflowCompleted, st: stateSent}: {f: nil, err: consts.ErrWorkflowCompleted},
// Accepted Updates are failed with special server failure because if a client knows that Update has been accepted,
// it expects any following requests to return an Update result (or failure) but not an error.
// There can be different types of Update failures coming from worker and a client must handle them anyway.
// It is easier and less error-prone for a client to handle only Update failures instead of both failures and
// not obvious NotFound errors in case if the Workflow completes before the Update completes.
reasonState{r: AbortReasonWorkflowCompleted, st: stateProvisionallyAccepted}: {f: acceptedUpdateCompletedWorkflowFailure, err: nil},
reasonState{r: AbortReasonWorkflowCompleted, st: stateAccepted}: {f: acceptedUpdateCompletedWorkflowFailure, err: nil},
// Completed Updates can't be aborted. These cases are just to fill the matrix.
reasonState{r: AbortReasonWorkflowCompleted, st: stateProvisionallyCompleted}: {f: nil, err: consts.ErrWorkflowCompleted},
reasonState{r: AbortReasonWorkflowCompleted, st: stateCompleted}: {f: nil, err: consts.ErrWorkflowCompleted},
reasonState{r: AbortReasonWorkflowCompleted, st: stateAborted}: {f: nil, err: consts.ErrWorkflowCompleted},
reasonState{r: AbortReasonWorkflowCompleted, st: stateProvisionallyCompletedAfterAccepted}: {f: nil, err: consts.ErrWorkflowCompleted},

// If Workflow is starting new run, then all Updates are aborted with retryable ErrWorkflowClosing error.
// Internal retries will send them to the new run.
reasonState{r: AbortReasonWorkflowContinuing, st: stateCreated}: {f: nil, err: consts.ErrWorkflowClosing},
reasonState{r: AbortReasonWorkflowContinuing, st: stateProvisionallyAdmitted}: {f: nil, err: consts.ErrWorkflowClosing},
reasonState{r: AbortReasonWorkflowContinuing, st: stateAdmitted}: {f: nil, err: consts.ErrWorkflowClosing},
reasonState{r: AbortReasonWorkflowContinuing, st: stateSent}: {f: nil, err: consts.ErrWorkflowClosing},
// Accepted Update can't be applied to the new run, and must be failed same way as if Workflow is completed.
reasonState{r: AbortReasonWorkflowContinuing, st: stateProvisionallyAccepted}: {f: acceptedUpdateCompletedWorkflowFailure, err: nil},
reasonState{r: AbortReasonWorkflowContinuing, st: stateAccepted}: {f: acceptedUpdateCompletedWorkflowFailure, err: nil},
// Completed Updates can't be aborted. These cases are just to fill the matrix.
reasonState{r: AbortReasonWorkflowContinuing, st: stateProvisionallyCompleted}: {f: nil, err: consts.ErrWorkflowCompleted},
reasonState{r: AbortReasonWorkflowContinuing, st: stateCompleted}: {f: nil, err: consts.ErrWorkflowCompleted},
reasonState{r: AbortReasonWorkflowContinuing, st: stateAborted}: {f: nil, err: consts.ErrWorkflowCompleted},
reasonState{r: AbortReasonWorkflowContinuing, st: stateProvisionallyCompletedAfterAccepted}: {f: nil, err: consts.ErrWorkflowCompleted},
}

// FailureError returns failure or error which will be set to Update futures while aborting Update.
// Only one of the return values will be non-nil.
func (r AbortReason) FailureError(st state) (*failurepb.Failure, error) {
fe, ok := reasonStateMatrix[reasonState{r: r, st: st}]
if !ok {
panic("unknown workflow update abort reason or update state")
}
return fe.f, fe.err
}

func (r AbortReason) String() string {
switch r {
case AbortReasonRegistryCleared:
return registryClearedErr
return "RegistryCleared"
case AbortReasonWorkflowCompleted:
return consts.ErrWorkflowCompleted
return "WorkflowCompleted"
case AbortReasonWorkflowContinuing:
return consts.ErrWorkflowClosing
default:
panic("unknown workflow update abort reason")
return "WorkflowContinuing"
case lastAbortReason:
return fmt.Sprintf("invalid reason %d", r)
}
return fmt.Sprintf("unrecognized reason %d", r)
}
47 changes: 47 additions & 0 deletions service/history/workflow/update/abort_reason_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package update

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestAbortReasonUpdateStateMatrix(t *testing.T) {
for r := AbortReasonRegistryCleared; r < lastAbortReason; r++ {
for st := stateCreated; st < lastState; st <<= 1 {
fe, ok := reasonStateMatrix[reasonState{r: r, st: st}]
require.True(t, ok, fmt.Sprintf("missing combination: %v, %v", r, st))
if fe.f != nil {
require.Nil(t, fe.err)
}
if fe.err != nil {
require.Nil(t, fe.f)
}
}
}
}
9 changes: 9 additions & 0 deletions service/history/workflow/update/errors_failures.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,13 @@ var (
NonRetryable: true,
}},
}

acceptedUpdateCompletedWorkflowFailure = &failurepb.Failure{
Message: "Workflow Update is failed because it was accepted by Workflow but then Workflow completed.",
Source: "Server",
FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
Type: "AcceptedUpdateCompletedWorkflow",
NonRetryable: true,
}},
}
)
25 changes: 16 additions & 9 deletions service/history/workflow/update/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package update_test

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -132,8 +131,10 @@ func TestNewRegistry(t *testing.T) {
upd := reg.Find(context.Background(), tv.UpdateID())
require.NotNil(t, upd)

_, err := upd.WaitLifecycleStage(context.Background(), 0, 100*time.Millisecond)
require.Equal(t, err, consts.ErrWorkflowCompleted)
status, err := upd.WaitLifecycleStage(context.Background(), 0, 100*time.Millisecond)
require.NoError(t, err)
require.NotNil(t, status)
require.Equal(t, "Workflow Update is failed because it was accepted by Workflow but then Workflow completed.", status.Outcome.GetFailure().Message)
})

t.Run("registry created from store with update in stateCompleted has no updates but increased completed count", func(t *testing.T) {
Expand Down Expand Up @@ -580,13 +581,19 @@ func TestAbort(t *testing.T) {
// abort both updates
reg.Abort(update.AbortReasonWorkflowCompleted)

for i := 1; i <= 2; i++ {
upd := reg.Find(context.Background(), tv.UpdateID(fmt.Sprintf("%d", i)))
require.NotNil(t, upd)
upd1 := reg.Find(context.Background(), tv.UpdateID("1"))
require.NotNil(t, upd1)
status1, err := upd1.WaitLifecycleStage(context.Background(), 0, 2*time.Second)
require.Equal(t, consts.ErrWorkflowCompleted, err)
require.Nil(t, status1)

upd2 := reg.Find(context.Background(), tv.UpdateID("2"))
require.NotNil(t, upd2)
status2, err := upd2.WaitLifecycleStage(context.Background(), 0, 2*time.Second)
require.NoError(t, err)
require.NotNil(t, status2)
require.Equal(t, "Workflow Update is failed because it was accepted by Workflow but then Workflow completed.", status2.Outcome.GetFailure().Message)

_, err := upd.WaitLifecycleStage(context.Background(), 0, 2*time.Second)
require.Equal(t, consts.ErrWorkflowCompleted, err)
}
require.Equal(t, 2, reg.Len(), "registry should still contain both updates")
}

Expand Down
9 changes: 8 additions & 1 deletion service/history/workflow/update/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

package update

import (
"fmt"
)

type (
state uint32
stateSet uint32
Expand All @@ -40,6 +44,7 @@ const (
stateCompleted
stateAborted
stateProvisionallyCompletedAfterAccepted
lastState
)

func (s state) String() string {
Expand All @@ -64,8 +69,10 @@ func (s state) String() string {
return "Aborted"
case stateProvisionallyCompletedAfterAccepted:
return "ProvisionallyCompletedAfterAccepted"
case lastState:
return fmt.Sprintf("invalid state %d", s)
}
return "unrecognized state"
return fmt.Sprintf("unrecognized state %d", s)
}

func (s state) Matches(mask stateSet) bool {
Expand Down
22 changes: 12 additions & 10 deletions service/history/workflow/update/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,21 +263,21 @@ func (u *Update) WaitLifecycleStage(
func (u *Update) abort(reason AbortReason) {
u.instrumentation.countAborted()

abortFailure, abortErr := reason.FailureError(u.state)
var abortOutcome *updatepb.Outcome
if abortFailure != nil {
abortOutcome = &updatepb.Outcome{Value: &updatepb.Outcome_Failure{Failure: abortFailure}}
}

const preAcceptedStates = stateSet(stateCreated | stateProvisionallyAdmitted | stateAdmitted | stateSent | stateProvisionallyAccepted)
if u.state.Matches(preAcceptedStates | stateSet(stateProvisionallyCompletedAfterAccepted)) {
u.accepted.(*future.FutureImpl[*failurepb.Failure]).Set(nil, reason.Error())
u.outcome.(*future.FutureImpl[*updatepb.Outcome]).Set(nil, reason.Error())
u.accepted.(*future.FutureImpl[*failurepb.Failure]).Set(abortFailure, abortErr)
u.outcome.(*future.FutureImpl[*updatepb.Outcome]).Set(abortOutcome, abortErr)
}

const preCompletedStates = stateSet(stateAccepted | stateProvisionallyCompleted)
if u.state.Matches(preCompletedStates) {
abortErr := reason.Error()
if reason == AbortReasonWorkflowContinuing {
// Accepted Update can't be applied to the new run, and must be aborted
// same way as if Workflow is completed.
abortErr = AbortReasonWorkflowCompleted.Error()
}
Comment on lines -274 to -279
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am happy to remove this hack.

u.outcome.(*future.FutureImpl[*updatepb.Outcome]).Set(nil, abortErr)
u.outcome.(*future.FutureImpl[*updatepb.Outcome]).Set(abortOutcome, abortErr)
}

u.setState(stateAborted)
Expand All @@ -302,7 +302,9 @@ func (u *Update) Admit(
// There shouldn't be any waiters before Update is admitted (this func returns).
// Call abort to seal the Update.
u.abort(AbortReasonWorkflowCompleted)
return AbortReasonWorkflowCompleted.Error()
// This error must be not nil.
_, abortErr := AbortReasonWorkflowCompleted.FailureError(stateCreated)
return abortErr
}

u.instrumentation.countRequestMsg()
Expand Down
5 changes: 3 additions & 2 deletions service/history/workflow/update/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,8 +707,9 @@ func TestUpdateState(t *testing.T) {
require.NoError(t, err)

status, err := upd.WaitLifecycleStage(context.Background(), UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED, 100*time.Millisecond)
require.ErrorIs(t, err, consts.ErrWorkflowCompleted)
require.Nil(t, status)
require.NoError(t, err)
require.NotNil(t, status)
require.Equal(t, "Workflow Update is failed because it was accepted by Workflow but then Workflow completed.", status.Outcome.GetFailure().Message)
},
}, {
title: "fail to transition to stateCompleted on store write failure",
Expand Down
24 changes: 14 additions & 10 deletions tests/update_workflow_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"time"

"github.com/stretchr/testify/suite"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
updatepb "go.temporal.io/api/update/v1"
Expand Down Expand Up @@ -135,11 +134,14 @@ func (s *UpdateWorkflowSdkSuite) TestUpdateWorkflow_TimeoutWorkflowAfterUpdateAc
updateHandle, err := s.updateWorkflowWaitAccepted(ctx, tv, "my-update-arg")
s.NoError(err)

var notFound *serviceerror.NotFound
s.ErrorAs(updateHandle.Get(ctx, nil), &notFound)
err = updateHandle.Get(ctx, nil)
var appErr *temporal.ApplicationError
s.ErrorAs(err, &appErr)
s.Contains("Workflow Update is failed because it was accepted by Workflow but then Workflow completed.", appErr.Message())

_, pollErr := s.pollUpdate(ctx, tv, &updatepb.WaitPolicy{LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED})
s.ErrorAs(pollErr, &notFound)
pollFailure, pollErr := s.pollUpdate(ctx, tv, &updatepb.WaitPolicy{LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED})
s.NoError(pollErr)
s.Equal("Workflow Update is failed because it was accepted by Workflow but then Workflow completed.", pollFailure.GetOutcome().GetFailure().GetMessage())

var wee *temporal.WorkflowExecutionError
s.ErrorAs(wfRun.Get(ctx, nil), &wee)
Expand Down Expand Up @@ -197,11 +199,14 @@ func (s *UpdateWorkflowSdkSuite) TestUpdateWorkflow_TerminateWorkflowAfterUpdate

s.NoError(s.SdkClient().TerminateWorkflow(ctx, tv.WorkflowID(), wfRun.GetRunID(), "reason"))

var notFound *serviceerror.NotFound
s.ErrorAs(updateHandle.Get(ctx, nil), &notFound)
err = updateHandle.Get(ctx, nil)
var appErr *temporal.ApplicationError
s.ErrorAs(err, &appErr)
s.Contains("Workflow Update is failed because it was accepted by Workflow but then Workflow completed.", appErr.Message())

_, pollErr := s.pollUpdate(ctx, tv, &updatepb.WaitPolicy{LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED})
s.ErrorAs(pollErr, &notFound)
pollFailure, pollErr := s.pollUpdate(ctx, tv, &updatepb.WaitPolicy{LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED})
s.NoError(pollErr)
s.Equal("Workflow Update is failed because it was accepted by Workflow but then Workflow completed.", pollFailure.GetOutcome().GetFailure().GetMessage())

var wee *temporal.WorkflowExecutionError
s.ErrorAs(wfRun.Get(ctx, nil), &wee)
Expand Down Expand Up @@ -327,7 +332,6 @@ func (s *UpdateWorkflowSdkSuite) TestUpdateWorkflow_TimeoutWithRetryAfterUpdateA
TaskQueue: tv.TaskQueue().Name,
WorkflowRunTimeout: 1 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Nanosecond,
MaximumAttempts: 2,
},
}, workflowFn)
Expand Down
Loading
Loading