Skip to content

Commit

Permalink
Update-with-Start operation (#1579)
Browse files Browse the repository at this point in the history
Adds support for Update-with-Start, using the MultiOperation API (temporalio/api#367).
  • Loading branch information
stephanos authored Aug 29, 2024
1 parent 5364a47 commit e85a098
Show file tree
Hide file tree
Showing 8 changed files with 779 additions and 115 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/docker/dynamic-config-custom.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ frontend.enableUpdateWorkflowExecution:
- value: true
frontend.enableUpdateWorkflowExecutionAsyncAccepted:
- value: true
frontend.enableExecuteMultiOperation:
- value: true
system.enableEagerWorkflowStart:
- value: true
frontend.workerVersioningRuleAPIs:
Expand Down
18 changes: 18 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ type (
// StartWorkflowOptions configuration parameters for starting a workflow execution.
StartWorkflowOptions = internal.StartWorkflowOptions

// WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start.
// For example, use NewUpdateWithStartWorkflowOperation to perform Update-with-Start.
// NOTE: Experimental
WithStartWorkflowOperation = internal.WithStartWorkflowOperation

// UpdateWithStartWorkflowOperation is used to perform Update-with-Start.
// See NewUpdateWithStartWorkflowOperation for details.
// NOTE: Experimental
UpdateWithStartWorkflowOperation = internal.UpdateWithStartWorkflowOperation

// HistoryEventIterator is a iterator which can return history events.
HistoryEventIterator = internal.HistoryEventIterator

Expand Down Expand Up @@ -921,6 +931,14 @@ type MetricsTimer = metrics.Timer
// MetricsNopHandler is a noop handler that does nothing with the metrics.
var MetricsNopHandler = metrics.NopHandler

// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation to perform Update-with-Start.
// After executing Client.ExecuteWorkflow with the UpdateWithStartWorkflow in the start options,
// the update result can be obtained.
// NOTE: Experimental
func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation {
return internal.NewUpdateWithStartWorkflowOperation(options)
}

// Dial creates an instance of a workflow client. This will attempt to connect
// to the server eagerly and will return an error if the server is not
// available.
Expand Down
81 changes: 79 additions & 2 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package internal
import (
"context"
"crypto/tls"
"errors"
"fmt"
"sync/atomic"
"time"
Expand Down Expand Up @@ -643,9 +644,23 @@ type (
// Optional: defaulted to Fail.
WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy

// WithStartOperation - Operation to execute with Workflow Start.
// For example, see NewUpdateWithStartWorkflowOperation to perform Update-with-Start. Note that if the workflow is
// already running and WorkflowIDConflictPolicy is set to UseExisting, the start is skipped and only the
// operation is executed. If instead the policy is set to Fail (the default), nothing is executed and
// an error will be returned (i.e. the option WorkflowExecutionErrorWhenAlreadyStarted is ignored).
// This option will be ignored when used with Client.SignalWithStartWorkflow.
//
// Optional: defaults to nil.
//
// NOTE: Experimental
WithStartOperation WithStartWorkflowOperation

// When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the
// workflow id has already been used and WorkflowIDReusePolicy would disallow a re-run. If it is set to false,
// rather than erroring a WorkflowRun instance representing the current or last run will be returned.
// workflow id has already been used and WorkflowIDReusePolicy or WorkflowIDConflictPolicy would
// disallow a re-run. If it is set to false, rather than erroring a WorkflowRun instance representing
// the current or last run will be returned. However, when WithStartOperation is set, this field is ignored and
// the WorkflowIDConflictPolicy UseExisting must be used instead to prevent erroring.
//
// Optional: defaults to false
WorkflowExecutionErrorWhenAlreadyStarted bool
Expand Down Expand Up @@ -714,6 +729,24 @@ type (
links []*commonpb.Link
}

// WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start.
WithStartWorkflowOperation interface {
isWithStartWorkflowOperation()
}

// UpdateWithStartWorkflowOperation is used to perform Update-with-Start.
// See NewUpdateWithStartWorkflowOperation for details.
UpdateWithStartWorkflowOperation struct {
input *ClientUpdateWorkflowInput
// flag to ensure the operation is only executed once
executed atomic.Bool
// channel to indicate that handle or err is available
doneCh chan struct{}
// handle and err cannot be accessed before doneCh is closed
handle WorkflowUpdateHandle
err error
}

// RetryPolicy defines the retry policy.
// Note that the history of activity with retry policy will be different: the started event will be written down into
// history only when the activity completes or "finally" timeouts/fails. And the started event only records the last
Expand Down Expand Up @@ -1004,6 +1037,50 @@ func DialCloudOperationsClient(ctx context.Context, options CloudOperationsClien
}, nil
}

// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation that can be used to perform Update-with-Start.
func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation {
res := &UpdateWithStartWorkflowOperation{doneCh: make(chan struct{})}

input, err := createUpdateWorkflowInput(options)
if err != nil {
res.set(nil, err)
} else if options.RunID != "" {
res.set(nil, errors.New("RunID cannot be set because the workflow might not be running"))
}
if options.FirstExecutionRunID != "" {
res.set(nil, errors.New("FirstExecutionRunID cannot be set because the workflow might not be running"))
} else {
res.input = input
}

return res
}

// Get blocks until a server response has been received; or the context deadline is exceeded.
func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUpdateHandle, error) {
select {
case <-op.doneCh:
return op.handle, op.err
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (op *UpdateWithStartWorkflowOperation) markExecuted() error {
if op.executed.Swap(true) {
return fmt.Errorf("was already executed")
}
return nil
}

func (op *UpdateWithStartWorkflowOperation) set(handle WorkflowUpdateHandle, err error) {
op.handle = handle
op.err = err
close(op.doneCh)
}

func (op *UpdateWithStartWorkflowOperation) isWithStartWorkflowOperation() {}

// NewNamespaceClient creates an instance of a namespace client, to manager lifecycle of namespaces.
func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) {
// Initialize root tags
Expand Down
4 changes: 3 additions & 1 deletion internal/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ import (

_ "github.com/BurntSushi/toml"
_ "github.com/kisielk/errcheck/errcheck"
_ "honnef.co/go/tools/staticcheck"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/testsuite"
_ "honnef.co/go/tools/staticcheck"
)

func main() {
Expand Down Expand Up @@ -145,6 +146,7 @@ func (b *builder) integrationTest() error {
},
LogLevel: "warn",
ExtraArgs: []string{
"--dynamic-config-value", "frontend.enableExecuteMultiOperation=true",
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecution=true",
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecutionAsyncAccepted=true",
"--dynamic-config-value", "frontend.workerVersioningRuleAPIs=true",
Expand Down
Loading

0 comments on commit e85a098

Please sign in to comment.