From e85a098eaee2105afbd91d093846fa2a95c34037 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Thu, 29 Aug 2024 09:31:25 -0700 Subject: [PATCH] Update-with-Start operation (#1579) Adds support for Update-with-Start, using the MultiOperation API (temporalio/api#367). --- .../docker/dynamic-config-custom.yaml | 2 + client/client.go | 18 + internal/client.go | 81 +++- internal/cmd/build/main.go | 4 +- internal/internal_workflow_client.go | 381 +++++++++++++----- internal/internal_workflow_client_test.go | 146 ++++++- test/integration_test.go | 237 ++++++++++- test/workflow_test.go | 25 ++ 8 files changed, 779 insertions(+), 115 deletions(-) diff --git a/.github/workflows/docker/dynamic-config-custom.yaml b/.github/workflows/docker/dynamic-config-custom.yaml index c44b94104..7a9fe5d2e 100644 --- a/.github/workflows/docker/dynamic-config-custom.yaml +++ b/.github/workflows/docker/dynamic-config-custom.yaml @@ -7,6 +7,8 @@ frontend.enableUpdateWorkflowExecution: - value: true frontend.enableUpdateWorkflowExecutionAsyncAccepted: - value: true +frontend.enableExecuteMultiOperation: + - value: true system.enableEagerWorkflowStart: - value: true frontend.workerVersioningRuleAPIs: diff --git a/client/client.go b/client/client.go index 15e2f47b5..83d583fe3 100644 --- a/client/client.go +++ b/client/client.go @@ -162,6 +162,16 @@ type ( // StartWorkflowOptions configuration parameters for starting a workflow execution. StartWorkflowOptions = internal.StartWorkflowOptions + // WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start. + // For example, use NewUpdateWithStartWorkflowOperation to perform Update-with-Start. + // NOTE: Experimental + WithStartWorkflowOperation = internal.WithStartWorkflowOperation + + // UpdateWithStartWorkflowOperation is used to perform Update-with-Start. + // See NewUpdateWithStartWorkflowOperation for details. + // NOTE: Experimental + UpdateWithStartWorkflowOperation = internal.UpdateWithStartWorkflowOperation + // HistoryEventIterator is a iterator which can return history events. HistoryEventIterator = internal.HistoryEventIterator @@ -921,6 +931,14 @@ type MetricsTimer = metrics.Timer // MetricsNopHandler is a noop handler that does nothing with the metrics. var MetricsNopHandler = metrics.NopHandler +// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation to perform Update-with-Start. +// After executing Client.ExecuteWorkflow with the UpdateWithStartWorkflow in the start options, +// the update result can be obtained. +// NOTE: Experimental +func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation { + return internal.NewUpdateWithStartWorkflowOperation(options) +} + // Dial creates an instance of a workflow client. This will attempt to connect // to the server eagerly and will return an error if the server is not // available. diff --git a/internal/client.go b/internal/client.go index 6e00c9507..0a57216a4 100644 --- a/internal/client.go +++ b/internal/client.go @@ -27,6 +27,7 @@ package internal import ( "context" "crypto/tls" + "errors" "fmt" "sync/atomic" "time" @@ -643,9 +644,23 @@ type ( // Optional: defaulted to Fail. WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy + // WithStartOperation - Operation to execute with Workflow Start. + // For example, see NewUpdateWithStartWorkflowOperation to perform Update-with-Start. Note that if the workflow is + // already running and WorkflowIDConflictPolicy is set to UseExisting, the start is skipped and only the + // operation is executed. If instead the policy is set to Fail (the default), nothing is executed and + // an error will be returned (i.e. the option WorkflowExecutionErrorWhenAlreadyStarted is ignored). + // This option will be ignored when used with Client.SignalWithStartWorkflow. + // + // Optional: defaults to nil. + // + // NOTE: Experimental + WithStartOperation WithStartWorkflowOperation + // When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the - // workflow id has already been used and WorkflowIDReusePolicy would disallow a re-run. If it is set to false, - // rather than erroring a WorkflowRun instance representing the current or last run will be returned. + // workflow id has already been used and WorkflowIDReusePolicy or WorkflowIDConflictPolicy would + // disallow a re-run. If it is set to false, rather than erroring a WorkflowRun instance representing + // the current or last run will be returned. However, when WithStartOperation is set, this field is ignored and + // the WorkflowIDConflictPolicy UseExisting must be used instead to prevent erroring. // // Optional: defaults to false WorkflowExecutionErrorWhenAlreadyStarted bool @@ -714,6 +729,24 @@ type ( links []*commonpb.Link } + // WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start. + WithStartWorkflowOperation interface { + isWithStartWorkflowOperation() + } + + // UpdateWithStartWorkflowOperation is used to perform Update-with-Start. + // See NewUpdateWithStartWorkflowOperation for details. + UpdateWithStartWorkflowOperation struct { + input *ClientUpdateWorkflowInput + // flag to ensure the operation is only executed once + executed atomic.Bool + // channel to indicate that handle or err is available + doneCh chan struct{} + // handle and err cannot be accessed before doneCh is closed + handle WorkflowUpdateHandle + err error + } + // RetryPolicy defines the retry policy. // Note that the history of activity with retry policy will be different: the started event will be written down into // history only when the activity completes or "finally" timeouts/fails. And the started event only records the last @@ -1004,6 +1037,50 @@ func DialCloudOperationsClient(ctx context.Context, options CloudOperationsClien }, nil } +// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation that can be used to perform Update-with-Start. +func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation { + res := &UpdateWithStartWorkflowOperation{doneCh: make(chan struct{})} + + input, err := createUpdateWorkflowInput(options) + if err != nil { + res.set(nil, err) + } else if options.RunID != "" { + res.set(nil, errors.New("RunID cannot be set because the workflow might not be running")) + } + if options.FirstExecutionRunID != "" { + res.set(nil, errors.New("FirstExecutionRunID cannot be set because the workflow might not be running")) + } else { + res.input = input + } + + return res +} + +// Get blocks until a server response has been received; or the context deadline is exceeded. +func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUpdateHandle, error) { + select { + case <-op.doneCh: + return op.handle, op.err + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (op *UpdateWithStartWorkflowOperation) markExecuted() error { + if op.executed.Swap(true) { + return fmt.Errorf("was already executed") + } + return nil +} + +func (op *UpdateWithStartWorkflowOperation) set(handle WorkflowUpdateHandle, err error) { + op.handle = handle + op.err = err + close(op.doneCh) +} + +func (op *UpdateWithStartWorkflowOperation) isWithStartWorkflowOperation() {} + // NewNamespaceClient creates an instance of a namespace client, to manager lifecycle of namespaces. func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) { // Initialize root tags diff --git a/internal/cmd/build/main.go b/internal/cmd/build/main.go index bc1f651ed..0a1a07fa9 100644 --- a/internal/cmd/build/main.go +++ b/internal/cmd/build/main.go @@ -41,9 +41,10 @@ import ( _ "github.com/BurntSushi/toml" _ "github.com/kisielk/errcheck/errcheck" + _ "honnef.co/go/tools/staticcheck" + "go.temporal.io/sdk/client" "go.temporal.io/sdk/testsuite" - _ "honnef.co/go/tools/staticcheck" ) func main() { @@ -145,6 +146,7 @@ func (b *builder) integrationTest() error { }, LogLevel: "warn", ExtraArgs: []string{ + "--dynamic-config-value", "frontend.enableExecuteMultiOperation=true", "--dynamic-config-value", "frontend.enableUpdateWorkflowExecution=true", "--dynamic-config-value", "frontend.enableUpdateWorkflowExecutionAsyncAccepted=true", "--dynamic-config-value", "frontend.workerVersioningRuleAPIs=true", diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 30f956b8a..9020c1c91 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -67,15 +67,18 @@ var ( _ NamespaceClient = (*namespaceClient)(nil) ) -const ( - defaultGetHistoryTimeout = 65 * time.Second - - defaultGetSystemInfoTimeout = 5 * time.Second - - pollUpdateTimeout = 60 * time.Second +var ( + errUnsupportedOperation = fmt.Errorf("unsupported operation") + errInvalidServerResponse = fmt.Errorf("invalid server response") + errInvalidWorkflowOperation = fmt.Errorf("invalid WithStartOperation") ) -var maxListArchivedWorkflowTimeout = time.Minute * 3 +const ( + defaultGetHistoryTimeout = 65 * time.Second + defaultGetSystemInfoTimeout = 5 * time.Second + pollUpdateTimeout = 60 * time.Second + maxListArchivedWorkflowTimeout = 3 * time.Minute +) type ( // WorkflowClient is the client for starting a workflow execution. @@ -334,6 +337,9 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI if options.ID != "" && options.ID != workflowID { return nil, fmt.Errorf("workflow ID from options not used, must be unset or match workflow ID parameter") } + if options.WithStartOperation != nil { + return nil, fmt.Errorf("option WithStartOperation is not allowed") + } // Default workflow ID to UUID options.ID = workflowID @@ -769,11 +775,12 @@ type UpdateWorkflowOptions struct { UpdateID string // WorkflowID is a required field indicating the workflow which should be - // updated. + // updated. However, it is optional when using UpdateWithStartWorkflowOperation. WorkflowID string // RunID is an optional field used to identify a specific run of the target // workflow. If RunID is not provided the latest run will be used. + // Note that it is incompatible with UpdateWithStartWorkflowOperation. RunID string // UpdateName is a required field which specifies the update you want to run. @@ -793,6 +800,7 @@ type UpdateWorkflowOptions struct { // FirstExecutionRunID specifies the RunID expected to identify the first // run in the workflow execution chain. If this expectation does not match // then the server will reject the update request with an error. + // Note that it is incompatible with UpdateWithStartWorkflowOperation. FirstExecutionRunID string } @@ -1154,36 +1162,20 @@ func (wc *WorkflowClient) PollWorkflowUpdate( func (wc *WorkflowClient) UpdateWorkflow( ctx context.Context, - opt UpdateWorkflowOptions, + options UpdateWorkflowOptions, ) (WorkflowUpdateHandle, error) { if err := wc.ensureInitialized(ctx); err != nil { return nil, err } - // Default update ID - updateID := opt.UpdateID - if updateID == "" { - updateID = uuid.New() - } - if opt.WaitForStage == WorkflowUpdateStageUnspecified { - return nil, errors.New("WaitForStage must be specified") - } - - if opt.WaitForStage == WorkflowUpdateStageAdmitted { - return nil, errors.New("WaitForStage WorkflowUpdateStageAdmitted is not supported") + in, err := createUpdateWorkflowInput(options) + if err != nil { + return nil, err } ctx = contextWithNewHeader(ctx) - return wc.interceptor.UpdateWorkflow(ctx, &ClientUpdateWorkflowInput{ - UpdateID: updateID, - WorkflowID: opt.WorkflowID, - UpdateName: opt.UpdateName, - Args: opt.Args, - RunID: opt.RunID, - FirstExecutionRunID: opt.FirstExecutionRunID, - WaitForStage: opt.WaitForStage, - }) + return wc.interceptor.UpdateWorkflow(ctx, in) } // CheckHealthRequest is a request for Client.CheckHealth. @@ -1618,27 +1610,35 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( startRequest.WorkflowStartDelay = durationpb.New(in.Options.StartDelay) } - var response *workflowservice.StartWorkflowExecutionResponse - grpcCtx, cancel := newGRPCContext(ctx, grpcMetricsHandler( w.client.metricsHandler.WithTags(metrics.RPCTags(in.WorkflowType, metrics.NoneTagValue, in.Options.TaskQueue))), defaultGrpcRetryParameters(ctx)) defer cancel() - response, err = w.client.workflowService.StartWorkflowExecution(grpcCtx, startRequest) - eagerWorkflowTask := response.GetEagerWorkflowTask() - if eagerWorkflowTask != nil && eagerExecutor != nil { - eagerExecutor.handleResponse(eagerWorkflowTask) - } else if eagerExecutor != nil { - eagerExecutor.releaseUnused() - } - // Allow already-started error var runID string - if e, ok := err.(*serviceerror.WorkflowExecutionAlreadyStarted); ok && !in.Options.WorkflowExecutionErrorWhenAlreadyStarted { - runID = e.RunId - } else if err != nil { - return nil, err + if in.Options.WithStartOperation == nil { + response, err := w.client.workflowService.StartWorkflowExecution(grpcCtx, startRequest) + + eagerWorkflowTask := response.GetEagerWorkflowTask() + if eagerWorkflowTask != nil && eagerExecutor != nil { + eagerExecutor.handleResponse(eagerWorkflowTask) + } else if eagerExecutor != nil { + eagerExecutor.releaseUnused() + } + + // Allow already-started error + if e, ok := err.(*serviceerror.WorkflowExecutionAlreadyStarted); ok && !in.Options.WorkflowExecutionErrorWhenAlreadyStarted { + runID = e.RunId + } else if err != nil { + return nil, err + } else { + runID = response.RunId + } } else { + response, err := w.executeWorkflowWithOperation(grpcCtx, startRequest, in.Options.WithStartOperation) + if err != nil { + return nil, err + } runID = response.RunId } @@ -1662,6 +1662,124 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( }, nil } +func (w *workflowClientInterceptor) executeWorkflowWithOperation( + ctx context.Context, + startRequest *workflowservice.StartWorkflowExecutionRequest, + operation WithStartWorkflowOperation, +) (*workflowservice.StartWorkflowExecutionResponse, error) { + startOp := &workflowservice.ExecuteMultiOperationRequest_Operation{ + Operation: &workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow{ + StartWorkflow: startRequest, + }, + } + + var withStartOp *workflowservice.ExecuteMultiOperationRequest_Operation + switch t := operation.(type) { + case *UpdateWithStartWorkflowOperation: + if err := t.markExecuted(); err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err) + } + + if t.err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, t.err) + } + + updateReq, err := w.createUpdateWorkflowRequest(ctx, t.input) + if err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err) + } + if updateReq.WorkflowExecution.WorkflowId == "" { + updateReq.WorkflowExecution.WorkflowId = startRequest.WorkflowId + } + + withStartOp = &workflowservice.ExecuteMultiOperationRequest_Operation{ + Operation: &workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow{ + UpdateWorkflow: updateReq, + }, + } + default: + return nil, fmt.Errorf("%w: %T", errUnsupportedOperation, t) + } + + multiRequest := workflowservice.ExecuteMultiOperationRequest{ + Namespace: w.client.namespace, + Operations: []*workflowservice.ExecuteMultiOperationRequest_Operation{ + startOp, + withStartOp, + }, + } + multiResp, err := w.client.workflowService.ExecuteMultiOperation(ctx, &multiRequest) + + var multiErr *serviceerror.MultiOperationExecution + if errors.As(err, &multiErr) { + if len(multiErr.OperationErrors()) != len(multiRequest.Operations) { + return nil, fmt.Errorf("%w: %v instead of %v operation errors", + errInvalidServerResponse, len(multiErr.OperationErrors()), len(multiRequest.Operations)) + } + + var startErr error + var abortedErr *serviceerror.MultiOperationAborted + for i, opReq := range multiRequest.Operations { + // if an operation error is of type MultiOperationAborted, it means it was only aborted because + // of another operation's error and is therefore not interesting or helpful + opErr := multiErr.OperationErrors()[i] + + switch t := opReq.Operation.(type) { + case *workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow: + if !errors.As(opErr, &abortedErr) { + startErr = opErr + } + case *workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow: + if !errors.As(opErr, &abortedErr) { + startErr = fmt.Errorf("%w: %w", errInvalidWorkflowOperation, opErr) + } + default: + // this would only happen if a case statement for a newly added operation is missing above + return nil, fmt.Errorf("%w: %T", errUnsupportedOperation, t) + } + } + return nil, startErr + } else if err != nil { + return nil, err + } + + if len(multiResp.Responses) != len(multiRequest.Operations) { + return nil, fmt.Errorf("%w: %v instead of %v operation results", + errInvalidServerResponse, len(multiResp.Responses), len(multiRequest.Operations)) + } + + var startResp *workflowservice.StartWorkflowExecutionResponse + for i, opReq := range multiRequest.Operations { + resp := multiResp.Responses[i].Response + + switch t := opReq.Operation.(type) { + case *workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow: + if opResp, ok := resp.(*workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow); ok { + startResp = opResp.StartWorkflow + } else { + return nil, fmt.Errorf("%w: StartWorkflow response has the wrong type %T", errInvalidServerResponse, resp) + } + case *workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow: + if opResp, ok := resp.(*workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow); ok { + handle, err := w.updateHandleFromResponse( + ctx, + enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED, + opResp.UpdateWorkflow) + operation.(*UpdateWithStartWorkflowOperation).set(handle, err) + if err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err) + } + } else { + return nil, fmt.Errorf("%w: UpdateWorkflow response has the wrong type %T", errInvalidServerResponse, resp) + } + default: + // this would only happen if a case statement for a newly added operation is missing above + return nil, fmt.Errorf("%w: %T", errUnsupportedOperation, t) + } + } + return startResp, nil +} + func (w *workflowClientInterceptor) SignalWorkflow(ctx context.Context, in *ClientSignalWorkflowInput) error { dataConverter := WithContext(ctx, w.client.dataConverter) input, err := encodeArg(dataConverter, in.Arg) @@ -1876,42 +1994,18 @@ func (w *workflowClientInterceptor) UpdateWorkflow( ctx context.Context, in *ClientUpdateWorkflowInput, ) (WorkflowUpdateHandle, error) { - argPayloads, err := w.client.dataConverter.ToPayloads(in.Args...) - if err != nil { - return nil, err - } - header, err := headerPropagated(ctx, w.client.contextPropagators) + var resp *workflowservice.UpdateWorkflowExecutionResponse + req, err := w.createUpdateWorkflowRequest(ctx, in) if err != nil { return nil, err } - desiredLifecycleStage := updateLifeCycleStageToProto(in.WaitForStage) - var resp *workflowservice.UpdateWorkflowExecutionResponse + for { var err error resp, err = func() (*workflowservice.UpdateWorkflowExecutionResponse, error) { grpcCtx, cancel := newGRPCContext(ctx, grpcTimeout(pollUpdateTimeout), grpcLongPoll(true), defaultGrpcRetryParameters(ctx)) defer cancel() - wfexec := &commonpb.WorkflowExecution{ - WorkflowId: in.WorkflowID, - RunId: in.RunID, - } - return w.client.workflowService.UpdateWorkflowExecution(grpcCtx, &workflowservice.UpdateWorkflowExecutionRequest{ - WaitPolicy: &updatepb.WaitPolicy{LifecycleStage: desiredLifecycleStage}, - Namespace: w.client.namespace, - WorkflowExecution: wfexec, - FirstExecutionRunId: in.FirstExecutionRunID, - Request: &updatepb.Request{ - Meta: &updatepb.Meta{ - UpdateId: in.UpdateID, - Identity: w.client.identity, - }, - Input: &updatepb.Input{ - Header: header, - Name: in.UpdateName, - Args: argPayloads, - }, - }, - }) + return w.client.workflowService.UpdateWorkflowExecution(grpcCtx, req) }() if err != nil { if ctx.Err() != nil { @@ -1930,44 +2024,74 @@ func (w *workflowClientInterceptor) UpdateWorkflow( break } } + // Here we know the update is at least accepted - if desiredLifecycleStage == enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED && - resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED { - // TODO(https://github.com/temporalio/features/issues/428) replace with handle wait for stage once implemented - pollResp, err := w.client.PollWorkflowUpdate(ctx, resp.GetUpdateRef()) - if err != nil { - return nil, err - } - if pollResp.Error != nil { - return &completedUpdateHandle{ - err: pollResp.Error, - baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()}, - }, nil - } else { - return &completedUpdateHandle{ - value: pollResp.Result, - baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()}, - }, nil - } + desiredLifecycleStage := updateLifeCycleStageToProto(in.WaitForStage) + return w.updateHandleFromResponse(ctx, desiredLifecycleStage, resp) +} + +func createUpdateWorkflowInput( + options UpdateWorkflowOptions, +) (*ClientUpdateWorkflowInput, error) { + // Default update ID + updateID := options.UpdateID + if updateID == "" { + updateID = uuid.New() } - switch v := resp.GetOutcome().GetValue().(type) { - case nil: - return &lazyUpdateHandle{ - client: w.client, - baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()}, - }, nil - case *updatepb.Outcome_Failure: - return &completedUpdateHandle{ - err: w.client.failureConverter.FailureToError(v.Failure), - baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()}, - }, nil - case *updatepb.Outcome_Success: - return &completedUpdateHandle{ - value: newEncodedValue(v.Success, w.client.dataConverter), - baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()}, - }, nil + + if options.WaitForStage == WorkflowUpdateStageUnspecified { + return nil, errors.New("WaitForStage must be specified") } - return nil, fmt.Errorf("unsupported outcome type %T", resp.GetOutcome().GetValue()) + + if options.WaitForStage == WorkflowUpdateStageAdmitted { + return nil, errors.New("WaitForStage WorkflowUpdateStageAdmitted is not supported") + } + + return &ClientUpdateWorkflowInput{ + UpdateID: updateID, + WorkflowID: options.WorkflowID, + UpdateName: options.UpdateName, + Args: options.Args, + RunID: options.RunID, + FirstExecutionRunID: options.FirstExecutionRunID, + WaitForStage: options.WaitForStage, + }, nil +} + +func (w *workflowClientInterceptor) createUpdateWorkflowRequest( + ctx context.Context, + in *ClientUpdateWorkflowInput, +) (*workflowservice.UpdateWorkflowExecutionRequest, error) { + argPayloads, err := w.client.dataConverter.ToPayloads(in.Args...) + if err != nil { + return nil, err + } + + header, err := headerPropagated(ctx, w.client.contextPropagators) + if err != nil { + return nil, err + } + + return &workflowservice.UpdateWorkflowExecutionRequest{ + WaitPolicy: &updatepb.WaitPolicy{LifecycleStage: updateLifeCycleStageToProto(in.WaitForStage)}, + Namespace: w.client.namespace, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: in.WorkflowID, + RunId: in.RunID, + }, + FirstExecutionRunId: in.FirstExecutionRunID, + Request: &updatepb.Request{ + Meta: &updatepb.Meta{ + UpdateId: in.UpdateID, + Identity: w.client.identity, + }, + Input: &updatepb.Input{ + Header: header, + Name: in.UpdateName, + Args: argPayloads, + }, + }, + }, nil } func (w *workflowClientInterceptor) PollWorkflowUpdate( @@ -2028,6 +2152,51 @@ func (w *workflowClientInterceptor) PollWorkflowUpdate( // Required to implement ClientOutboundInterceptor func (*workflowClientInterceptor) mustEmbedClientOutboundInterceptorBase() {} +func (w *workflowClientInterceptor) updateHandleFromResponse( + ctx context.Context, + desiredLifecycleStage enumspb.UpdateWorkflowExecutionLifecycleStage, + resp *workflowservice.UpdateWorkflowExecutionResponse, +) (WorkflowUpdateHandle, error) { + if desiredLifecycleStage == enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED && + resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED { + // TODO(https://github.com/temporalio/features/issues/428) replace with handle wait for stage once implemented + pollResp, err := w.client.PollWorkflowUpdate(ctx, resp.GetUpdateRef()) + if err != nil { + return nil, err + } + if pollResp.Error != nil { + return &completedUpdateHandle{ + err: pollResp.Error, + baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()}, + }, nil + } else { + return &completedUpdateHandle{ + value: pollResp.Result, + baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()}, + }, nil + } + } + + switch v := resp.GetOutcome().GetValue().(type) { + case nil: + return &lazyUpdateHandle{ + client: w.client, + baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()}, + }, nil + case *updatepb.Outcome_Failure: + return &completedUpdateHandle{ + err: w.client.failureConverter.FailureToError(v.Failure), + baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()}, + }, nil + case *updatepb.Outcome_Success: + return &completedUpdateHandle{ + value: newEncodedValue(v.Success, w.client.dataConverter), + baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()}, + }, nil + } + return nil, fmt.Errorf("unsupported outcome type %T", resp.GetOutcome().GetValue()) +} + func (uh *baseUpdateHandle) WorkflowID() string { return uh.ref.GetWorkflowExecution().GetWorkflowId() } diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index a21c9385b..d242edda1 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -976,6 +976,134 @@ func (s *workflowRunSuite) TestGetWorkflowNoExtantWorkflowAndNoRunId() { s.Equal("", workflowRunNoRunID.GetRunID()) } +func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_NonMultiOperationError() { + s.workflowServiceClient.EXPECT(). + ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, serviceerror.NewInternal("internal error")).Times(1) + + updOp := NewUpdateWithStartWorkflowOperation( + UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }) + + _, err := s.workflowClient.ExecuteWorkflow( + context.Background(), + StartWorkflowOptions{ + ID: workflowID, + TaskQueue: taskqueue, + WithStartOperation: updOp, + }, workflowType, + ) + s.ErrorContains(err, "internal error") +} + +func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerResponseCountMismatch() { + s.workflowServiceClient.EXPECT(). + ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&workflowservice.ExecuteMultiOperationResponse{ + Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{}, + }, nil).Times(1) + + updOp := NewUpdateWithStartWorkflowOperation( + UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }) + + _, err := s.workflowClient.ExecuteWorkflow( + context.Background(), + StartWorkflowOptions{ + ID: workflowID, + TaskQueue: taskqueue, + WithStartOperation: updOp, + }, workflowType, + ) + s.ErrorContains(err, "invalid server response: 0 instead of 2 operation results") +} + +func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerErrorResponseCountMismatch() { + s.workflowServiceClient.EXPECT(). + ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, serviceerror.NewMultiOperationExecution("Error", []error{})).Times(1) + + updOp := NewUpdateWithStartWorkflowOperation( + UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }) + + _, err := s.workflowClient.ExecuteWorkflow( + context.Background(), + StartWorkflowOptions{ + ID: workflowID, + TaskQueue: taskqueue, + WithStartOperation: updOp, + }, workflowType, + ) + s.ErrorContains(err, "invalid server response: 0 instead of 2 operation errors") +} + +func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerStartResponseTypeMismatch() { + s.workflowServiceClient.EXPECT(). + ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&workflowservice.ExecuteMultiOperationResponse{ + Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{ + { + Response: &workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow{}, // wrong! + }, + nil, + }, + }, nil).Times(1) + + updOp := NewUpdateWithStartWorkflowOperation( + UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }) + + _, err := s.workflowClient.ExecuteWorkflow( + context.Background(), + StartWorkflowOptions{ + ID: workflowID, + TaskQueue: taskqueue, + WithStartOperation: updOp, + }, workflowType, + ) + s.ErrorContains(err, "invalid server response: StartWorkflow response has the wrong type *workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow") +} + +func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerUpdateResponseTypeMismatch() { + s.workflowServiceClient.EXPECT(). + ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&workflowservice.ExecuteMultiOperationResponse{ + Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{ + { + Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{}, + }, + { + Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{}, // wrong! + }, + }, + }, nil).Times(1) + + updOp := NewUpdateWithStartWorkflowOperation( + UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }) + + _, err := s.workflowClient.ExecuteWorkflow( + context.Background(), + StartWorkflowOptions{ + ID: workflowID, + TaskQueue: taskqueue, + WithStartOperation: updOp, + }, workflowType, + ) + s.ErrorContains(err, "invalid server response: UpdateWorkflow response has the wrong type *workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow") +} + func getGetWorkflowExecutionHistoryRequest(filterType enumspb.HistoryEventFilterType) *workflowservice.GetWorkflowExecutionHistoryRequest { request := &workflowservice.GetWorkflowExecutionHistoryRequest{ Namespace: DefaultNamespace, @@ -1083,11 +1211,21 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowWithContextAwareDat s.Equal(startResponse.GetRunId(), resp.GetRunID()) } -func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAmbiguousID() { - _, err := s.client.SignalWithStartWorkflow(context.Background(), "workflow-id-1", "my-signal", "my-signal-value", +func (s *workflowClientTestSuite) TestSignalWithStartWorkflowValidation() { + // ambiguous WorkflowID + _, err := s.client.SignalWithStartWorkflow( + context.Background(), "workflow-id-1", "my-signal", "my-signal-value", StartWorkflowOptions{ID: "workflow-id-2"}, workflowType) - s.Error(err) - s.Contains(err.Error(), "workflow ID from options not used") + s.ErrorContains(err, "workflow ID from options not used") + + // unsupported WithStartOperation + _, err = s.client.SignalWithStartWorkflow( + context.Background(), "workflow-id", "my-signal", "my-signal-value", + StartWorkflowOptions{ + ID: "workflow-id", + WithStartOperation: &UpdateWithStartWorkflowOperation{}, + }, workflowType) + s.ErrorContains(err, "option WithStartOperation is not allowed") } func (s *workflowClientTestSuite) TestStartWorkflow() { diff --git a/test/integration_test.go b/test/integration_test.go index 609a85742..df119164c 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -39,14 +39,13 @@ import ( "testing" "time" - "go.opentelemetry.io/otel/baggage" - "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber-go/tally/v4" + "go.opentelemetry.io/otel/baggage" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.opentelemetry.io/otel/trace" @@ -3956,6 +3955,240 @@ func (ts *IntegrationTestSuite) TestUpdateSettingHandlerInHandler() { ts.NoError(run.Get(ctx, nil)) } +func (ts *IntegrationTestSuite) TestExecuteWorkflowWithUpdate() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + startOptionsWithOperation := func(op client.WithStartWorkflowOperation) client.StartWorkflowOptions { + startOptions := ts.startWorkflowOptions("test-update-with-start-" + uuid.New()) + startOptions.EnableEagerStart = false // not allowed to use with update-with-start + startOptions.WithStartOperation = op + return startOptions + } + + ts.Run("sends update-with-start (no running workflow)", func() { + updateOp := client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageAccepted, + }) + + startOptions := startOptionsWithOperation(updateOp) + run, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) + + var updateResult int + updHandle, err := updateOp.Get(ctx) + ts.NoError(err) + ts.NoError(updHandle.Get(ctx, &updateResult)) + ts.Equal(1, updateResult) + + var workflowResult int + ts.NoError(run.Get(ctx, &workflowResult)) + ts.Equal(1, workflowResult) + }) + + ts.Run("sends update-with-start (already running workflow)", func() { + startOptions := startOptionsWithOperation(nil) + run1, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) + + updateOp := client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + + startOptions.WithStartOperation = updateOp + startOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING + run2, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) + ts.Equal(run1.GetRunID(), run2.GetRunID()) + + var updateResult int + updHandle, err := updateOp.Get(ctx) + ts.NoError(err) + ts.NoError(updHandle.Get(ctx, &updateResult)) + ts.Equal(1, updateResult) + }) + + ts.Run("sends update-with-start but update is rejected", func() { + updateOp := client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{-1}, // rejected update payload + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + + startOptions := startOptionsWithOperation(updateOp) + run, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) + ts.NotNil(run) + + var updateResult int + updHandle, err := updateOp.Get(ctx) + ts.NoError(err) + err = updHandle.Get(ctx, &updateResult) + ts.ErrorContains(err, "addend must be non-negative") + }) + + ts.Run("receives update result in separate goroutines", func() { + updateOp := client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageAccepted, + }) + + done := make(chan struct{}) + defer func() { <-done }() + go func() { + var updateResult int + updHandle, err := updateOp.Get(ctx) + ts.NoError(err) + ts.NoError(updHandle.Get(ctx, &updateResult)) + ts.Equal(1, updateResult) + done <- struct{}{} + }() + + startOptions := startOptionsWithOperation(updateOp) + _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) + + var updateResult int + updHandle, err := updateOp.Get(ctx) + ts.NoError(err) + ts.NoError(updHandle.Get(ctx, &updateResult)) + ts.Equal(1, updateResult) + }) + + ts.Run("fails when start request is invalid", func() { + updateOp := client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + + startOptions := startOptionsWithOperation(updateOp) + startOptions.CronSchedule = "invalid!" + _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.Error(err) + }) + + ts.Run("fails when update operation is invalid", func() { + updateOp := client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + // invalid + }) + + startOptions := startOptionsWithOperation(updateOp) + _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.ErrorContains(err, "invalid WithStartOperation: WaitForStage must be specified") + + updateOp = client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + RunID: "invalid", + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + + startOptions = startOptionsWithOperation(updateOp) + _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.ErrorContains(err, "invalid WithStartOperation: RunID cannot be set because the workflow might not be running") + + updateOp = client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + FirstExecutionRunID: "invalid", + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + + startOptions = startOptionsWithOperation(updateOp) + _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.ErrorContains(err, "invalid WithStartOperation: FirstExecutionRunID cannot be set because the workflow might not be running") + + updateOp = client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + UpdateName: "", // invalid + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + + startOptions = startOptionsWithOperation(updateOp) + _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.ErrorContains(err, "invalid WithStartOperation: ") // omitting server message intentionally + + updateOp = client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + WorkflowID: "different", // does not match Start's + UpdateName: "update", + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + + startOptions = startOptionsWithOperation(updateOp) + _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.ErrorContains(err, "invalid WithStartOperation: ") // omitting server message intentionally + }) + + ts.Run("fails when workflow is already running", func() { + startOptions := startOptionsWithOperation(nil) + _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) + + updateOp := client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + + startOptions.WithStartOperation = updateOp + // NOTE that WorkflowExecutionErrorWhenAlreadyStarted (defaults to false) has no impact + _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.ErrorContains(err, "Workflow execution is already running") + }) + + ts.Run("fails when executed twice", func() { + updateOp := client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + + startOptions := startOptionsWithOperation(updateOp) + _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.NoError(err) + + _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + ts.ErrorContains(err, "invalid WithStartOperation: was already executed") + }) + + ts.Run("propagates context", func() { + updateOp := client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + + var propagatedValues []string + ctx := context.Background() + // Propagate values using different context propagators. + ctx = context.WithValue(ctx, contextKey(testContextKey1), "propagatedValue1") + ctx = context.WithValue(ctx, contextKey(testContextKey2), "propagatedValue2") + ctx = context.WithValue(ctx, contextKey(testContextKey3), "non-propagatedValue") + startOptions := startOptionsWithOperation(updateOp) + err := ts.executeWorkflowWithContextAndOption(ctx, startOptions, ts.workflows.ContextPropagator, &propagatedValues, true) + ts.NoError(err) + + // One copy from workflow and one copy from activity * 2 for child workflow + ts.EqualValues([]string{ + "propagatedValue1", "propagatedValue2", "activity_propagatedValue1", "activity_propagatedValue2", + "child_propagatedValue1", "child_propagatedValue2", "child_activity_propagatedValue1", "child_activity_propagatedValue2", + }, propagatedValues) + }) +} + func (ts *IntegrationTestSuite) TestSessionOnWorkerFailure() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() diff --git a/test/workflow_test.go b/test/workflow_test.go index 3249fd32d..1b7e191b1 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -373,6 +373,30 @@ func (w *Workflows) UpdateInfoWorkflow(ctx workflow.Context) error { return nil } +func (w *Workflows) UpdateEntityWorkflow(ctx workflow.Context) (int, error) { + counter := 0 + + err := workflow.SetUpdateHandlerWithOptions(ctx, "update", func(ctx workflow.Context, add int) (int, error) { + workflow.Sleep(ctx, 1*time.Second) // force separate WFT for accept and complete + counter += add + return counter, nil + }, workflow.UpdateHandlerOptions{ + Validator: func(ctx workflow.Context, i int) error { + if i < 0 { + return fmt.Errorf("addend must be non-negative (%v)", i) + } + return nil + }, + }) + if err != nil { + return 0, err + } + + workflow.Await(ctx, func() bool { return counter >= 1 }) + + return counter, nil +} + func (w *Workflows) UpdateWithValidatorWorkflow(ctx workflow.Context) error { workflow.Go(ctx, func(ctx workflow.Context) { _ = workflow.Sleep(ctx, time.Minute) @@ -3161,6 +3185,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.WorkflowWithLocalActivityStartToCloseTimeout) worker.RegisterWorkflow(w.LocalActivityStaleCache) worker.RegisterWorkflow(w.UpdateInfoWorkflow) + worker.RegisterWorkflow(w.UpdateEntityWorkflow) worker.RegisterWorkflow(w.SignalWorkflow) worker.RegisterWorkflow(w.CronWorkflow) worker.RegisterWorkflow(w.ActivityTimeoutsWorkflow)