From 4a548b30177a39967442c8276173b9947d47d5ca Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Sat, 15 Jul 2023 14:03:27 -0700 Subject: [PATCH 1/3] Add support for update info --- internal/interceptor.go | 5 ++++ internal/interceptor_base.go | 5 ++++ internal/internal_event_handlers.go | 8 +++--- internal/internal_event_handlers_test.go | 5 +++- internal/internal_update.go | 13 +++++++--- internal/internal_update_test.go | 22 ++++++++-------- internal/internal_worker_base.go | 2 +- internal/internal_workflow.go | 5 ++-- internal/internal_workflow_testsuite.go | 8 +++--- internal/workflow.go | 19 ++++++++++++++ internal/workflow_testsuite.go | 4 +-- test/integration_test.go | 32 ++++++++++++++++++++++++ test/workflow_test.go | 19 ++++++++++++++ workflow/workflow.go | 9 +++++++ 14 files changed, 127 insertions(+), 29 deletions(-) diff --git a/internal/interceptor.go b/internal/interceptor.go index b0e30a3d0..f239c3d54 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -186,6 +186,11 @@ type WorkflowOutboundInterceptor interface { // GetInfo intercepts workflow.GetInfo. GetInfo(ctx Context) *WorkflowInfo + // GetUpdateInfo intercepts workflow.GetUpdateInfo. + // + // NOTE: Experimental + GetUpdateInfo(ctx Context) *UpdateInfo + // GetLogger intercepts workflow.GetLogger. GetLogger(ctx Context) log.Logger diff --git a/internal/interceptor_base.go b/internal/interceptor_base.go index 519913d05..458a9f4c6 100644 --- a/internal/interceptor_base.go +++ b/internal/interceptor_base.go @@ -218,6 +218,11 @@ func (w *WorkflowOutboundInterceptorBase) GetInfo(ctx Context) *WorkflowInfo { return w.Next.GetInfo(ctx) } +// GetUpdateInfo implements WorkflowOutboundInterceptor.GetUpdateInfo. +func (w *WorkflowOutboundInterceptorBase) GetUpdateInfo(ctx Context) *UpdateInfo { + return w.Next.GetUpdateInfo(ctx) +} + // GetLogger implements WorkflowOutboundInterceptor.GetLogger. func (w *WorkflowOutboundInterceptorBase) GetLogger(ctx Context) log.Logger { return w.Next.GetLogger(ctx) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 4c8b8c2c3..fcd34fd77 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -153,7 +153,7 @@ type ( cancelHandler func() // A cancel handler to be invoked on a cancel notification signalHandler func(name string, input *commonpb.Payloads, header *commonpb.Header) error // A signal handler to be invoked on a signal event queryHandler func(queryType string, queryArgs *commonpb.Payloads, header *commonpb.Header) (*commonpb.Payloads, error) - updateHandler func(name string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) + updateHandler func(name string, ID string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) logger log.Logger isReplay bool // flag to indicate if workflow is in replay mode @@ -323,8 +323,8 @@ func (wc *workflowEnvironmentImpl) takeOutgoingMessages() []*protocolpb.Message return retval } -func (wc *workflowEnvironmentImpl) ScheduleUpdate(name string, args *commonpb.Payloads, hdr *commonpb.Header, callbacks UpdateCallbacks) { - wc.updateHandler(name, args, hdr, callbacks) +func (wc *workflowEnvironmentImpl) ScheduleUpdate(name string, ID string, args *commonpb.Payloads, hdr *commonpb.Header, callbacks UpdateCallbacks) { + wc.updateHandler(name, ID, args, hdr, callbacks) } func withExpectedEventPredicate(pred func(*historypb.HistoryEvent) bool) msgSendOpt { @@ -577,7 +577,7 @@ func (wc *workflowEnvironmentImpl) RegisterQueryHandler( } func (wc *workflowEnvironmentImpl) RegisterUpdateHandler( - handler func(string, *commonpb.Payloads, *commonpb.Header, UpdateCallbacks), + handler func(string, string, *commonpb.Payloads, *commonpb.Header, UpdateCallbacks), ) { wc.updateHandler = handler } diff --git a/internal/internal_event_handlers_test.go b/internal/internal_event_handlers_test.go index a08b2b965..78ad10bcc 100644 --- a/internal/internal_event_handlers_test.go +++ b/internal/internal_event_handlers_test.go @@ -430,14 +430,16 @@ func TestUpdateEvents(t *testing.T) { var ( gotName string + gotID string gotArgs *commonpb.Payloads gotHeader *commonpb.Header ) weh := &workflowExecutionEventHandlerImpl{ workflowEnvironmentImpl: &workflowEnvironmentImpl{ - updateHandler: func(name string, args *commonpb.Payloads, header *commonpb.Header, cb UpdateCallbacks) { + updateHandler: func(name string, ID string, args *commonpb.Payloads, header *commonpb.Header, cb UpdateCallbacks) { gotName = name + gotID = ID gotArgs = args gotHeader = header }, @@ -468,6 +470,7 @@ func TestUpdateEvents(t *testing.T) { require.NoError(t, err) require.Equal(t, input.Name, gotName) + require.Equal(t, t.Name()+"-id", gotID) require.True(t, proto.Equal(input.Header, gotHeader)) require.True(t, proto.Equal(input.Args, gotArgs)) diff --git a/internal/internal_update.go b/internal/internal_update.go index 3feb0456e..14ed69e5f 100644 --- a/internal/internal_update.go +++ b/internal/internal_update.go @@ -88,13 +88,13 @@ type ( // updateProtocol wraps an updateEnv and some protocol metadata to // implement the UpdateCallbacks abstraction. It handles callbacks by - // sending protocol lmessages. + // sending protocol messages. updateProtocol struct { protoInstanceID string clientIdentity string requestMsgID string requestSeqID int64 - scheduleUpdate func(name string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) + scheduleUpdate func(name string, ID string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) env updateEnv state updateState } @@ -114,7 +114,7 @@ type ( // update callbacks. func newUpdateProtocol( protoInstanceID string, - scheduleUpdate func(name string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks), + scheduleUpdate func(name string, ID string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks), env updateEnv, ) *updateProtocol { return &updateProtocol{ @@ -143,7 +143,7 @@ func (up *updateProtocol) HandleMessage(msg *protocolpb.Message) error { up.requestMsgID = msg.GetId() up.requestSeqID = msg.GetEventId() input := req.GetInput() - up.scheduleUpdate(input.GetName(), input.GetArgs(), input.GetHeader(), up) + up.scheduleUpdate(input.GetName(), req.GetMeta().GetUpdateId(), input.GetArgs(), input.GetHeader(), up) up.state = updateStateRequestInitiated return nil } @@ -241,6 +241,7 @@ func (up *updateProtocol) checkAcceptedEvent(e *historypb.HistoryEvent) bool { func defaultUpdateHandler( rootCtx Context, name string, + ID string, serializedArgs *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks, @@ -253,6 +254,10 @@ func defaultUpdateHandler( return } scheduler.Spawn(ctx, name, func(ctx Context) { + ctx = WithValue(ctx, updateInfoContextKey, &UpdateInfo{ + ID: ID, + }) + eo := getWorkflowEnvOptions(ctx) // If we suspect that handler registration has not occurred (e.g. diff --git a/internal/internal_update_test.go b/internal/internal_update_test.go index 5ae2ac951..3287841ac 100644 --- a/internal/internal_update_test.go +++ b/internal/internal_update_test.go @@ -203,7 +203,7 @@ func TestDefaultUpdateHandler(t *testing.T) { UpdateHandlerOptions{}, ) var rejectErr error - defaultUpdateHandler(ctx, "will_not_be_found", args, hdr, &testUpdateCallbacks{ + defaultUpdateHandler(ctx, "will_not_be_found", "testID", args, hdr, &testUpdateCallbacks{ RejectImpl: func(err error) { rejectErr = err }, }, runOnCallingThread) require.ErrorContains(t, rejectErr, "unknown update") @@ -221,7 +221,7 @@ func TestDefaultUpdateHandler(t *testing.T) { ) junkArgs := &commonpb.Payloads{Payloads: []*commonpb.Payload{&commonpb.Payload{}}} var rejectErr error - defaultUpdateHandler(ctx, t.Name(), junkArgs, hdr, &testUpdateCallbacks{ + defaultUpdateHandler(ctx, t.Name(), "testID", junkArgs, hdr, &testUpdateCallbacks{ RejectImpl: func(err error) { rejectErr = err }, }, runOnCallingThread) require.ErrorContains(t, rejectErr, "unable to decode") @@ -238,7 +238,7 @@ func TestDefaultUpdateHandler(t *testing.T) { UpdateHandlerOptions{Validator: validatorFunc}, ) var rejectErr error - defaultUpdateHandler(ctx, t.Name(), args, hdr, &testUpdateCallbacks{ + defaultUpdateHandler(ctx, t.Name(), "testID", args, hdr, &testUpdateCallbacks{ RejectImpl: func(err error) { rejectErr = err }, }, runOnCallingThread) require.Equal(t, validatorFunc(ctx, argStr), rejectErr) @@ -252,7 +252,7 @@ func TestDefaultUpdateHandler(t *testing.T) { accepted bool result interface{} ) - defaultUpdateHandler(ctx, t.Name(), args, hdr, &testUpdateCallbacks{ + defaultUpdateHandler(ctx, t.Name(), "testID", args, hdr, &testUpdateCallbacks{ AcceptImpl: func() { accepted = true }, CompleteImpl: func(success interface{}, err error) { resultErr = err @@ -272,7 +272,7 @@ func TestDefaultUpdateHandler(t *testing.T) { accepted bool result interface{} ) - defaultUpdateHandler(ctx, t.Name(), args, hdr, &testUpdateCallbacks{ + defaultUpdateHandler(ctx, t.Name(), "testID", args, hdr, &testUpdateCallbacks{ AcceptImpl: func() { accepted = true }, CompleteImpl: func(success interface{}, err error) { resultErr = err @@ -323,7 +323,7 @@ func TestDefaultUpdateHandler(t *testing.T) { mustSetUpdateHandler(t, ctx, t.Name(), updateFunc, UpdateHandlerOptions{}) }, } - defaultUpdateHandler(ctx, t.Name(), args, hdr, &testUpdateCallbacks{ + defaultUpdateHandler(ctx, t.Name(), "testID", args, hdr, &testUpdateCallbacks{ RejectImpl: func(err error) { rejectErr = err }, AcceptImpl: func() { accepted = true }, CompleteImpl: func(success interface{}, err error) { @@ -344,7 +344,7 @@ func TestDefaultUpdateHandler(t *testing.T) { func TestInvalidUpdateStateTransitions(t *testing.T) { // these would all reflect programming errors so we expect panics - stubUpdateHandler := func(string, *commonpb.Payloads, *commonpb.Header, UpdateCallbacks) {} + stubUpdateHandler := func(string, string, *commonpb.Payloads, *commonpb.Header, UpdateCallbacks) {} requestMsg := protocolpb.Message{ Id: t.Name() + "-id", ProtocolInstanceId: t.Name() + "-proto-id", @@ -412,8 +412,8 @@ func TestInvalidUpdateStateTransitions(t *testing.T) { } func TestCompletedEventPredicate(t *testing.T) { - updateID := t.Name() + "-updaet-id" - stubUpdateHandler := func(string, *commonpb.Payloads, *commonpb.Header, UpdateCallbacks) {} + updateID := t.Name() + "-update-id" + stubUpdateHandler := func(string, string, *commonpb.Payloads, *commonpb.Header, UpdateCallbacks) {} requestMsg := protocolpb.Message{ Id: t.Name() + "-id", ProtocolInstanceId: updateID, @@ -450,10 +450,10 @@ func TestCompletedEventPredicate(t *testing.T) { } func TestAcceptedEventPredicate(t *testing.T) { - updateID := t.Name() + "-updaet-id" + updateID := t.Name() + "-update-id" requestMsgID := t.Name() + "request-msg-id" requestSeqID := int64(1234) - stubUpdateHandler := func(string, *commonpb.Payloads, *commonpb.Header, UpdateCallbacks) {} + stubUpdateHandler := func(string, string, *commonpb.Payloads, *commonpb.Header, UpdateCallbacks) {} request := updatepb.Request{ Meta: &updatepb.Meta{UpdateId: updateID}, } diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 00f6ec7f3..1b7dfbbb0 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -112,7 +112,7 @@ type ( handler func(queryType string, queryArgs *commonpb.Payloads, header *commonpb.Header) (*commonpb.Payloads, error), ) RegisterUpdateHandler( - handler func(string, *commonpb.Payloads, *commonpb.Header, UpdateCallbacks), + handler func(string, string, *commonpb.Payloads, *commonpb.Header, UpdateCallbacks), ) IsReplaying() bool MutableSideEffect(id string, f func() interface{}, equals func(a, b interface{}) bool) converter.EncodedValue diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index c2e226766..b87fda30d 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -260,6 +260,7 @@ const ( workflowResultContextKey = "workflowResult" coroutinesContextKey = "coroutines" workflowEnvOptionsContextKey = "wfEnvOptions" + updateInfoContextKey = "updateInfo" ) // Assert that structs do indeed implement the interfaces @@ -541,8 +542,8 @@ func (d *syncWorkflowDefinition) Execute(env WorkflowEnvironment, header *common ) getWorkflowEnvironment(d.rootCtx).RegisterUpdateHandler( - func(name string, serializedArgs *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) { - defaultUpdateHandler(d.rootCtx, name, serializedArgs, header, callbacks, coroScheduler{d.dispatcher}) + func(name string, ID string, serializedArgs *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) { + defaultUpdateHandler(d.rootCtx, name, ID, serializedArgs, header, callbacks, coroScheduler{d.dispatcher}) }) getWorkflowEnvironment(d.rootCtx).RegisterQueryHandler( diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 09d221d09..cd2e027ce 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -185,7 +185,7 @@ type ( workflowCancelHandler func() signalHandler func(name string, input *commonpb.Payloads, header *commonpb.Header) error queryHandler func(string, *commonpb.Payloads, *commonpb.Header) (*commonpb.Payloads, error) - updateHandler func(name string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks) + updateHandler func(name string, ID string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks) startedHandler func(r WorkflowExecution, e error) isWorkflowCompleted bool @@ -2021,7 +2021,7 @@ func (env *testWorkflowEnvironmentImpl) RegisterSignalHandler( } func (env *testWorkflowEnvironmentImpl) RegisterUpdateHandler( - handler func(name string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks), + handler func(name string, ID string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks), ) { env.updateHandler = handler } @@ -2361,12 +2361,12 @@ func (env *testWorkflowEnvironmentImpl) queryWorkflow(queryType string, args ... return newEncodedValue(blob, env.GetDataConverter()), nil } -func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, uc UpdateCallbacks, args ...interface{}) { +func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, ID string, uc UpdateCallbacks, args ...interface{}) { data, err := encodeArgs(env.GetDataConverter(), args) if err != nil { panic(err) } - env.updateHandler(name, data, nil, uc) + env.updateHandler(name, ID, data, nil, uc) } func (env *testWorkflowEnvironmentImpl) queryWorkflowByID(workflowID, queryType string, args ...interface{}) (converter.EncodedValue, error) { diff --git a/internal/workflow.go b/internal/workflow.go index 70f873ec8..b2c9e00e6 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -993,6 +993,11 @@ type WorkflowInfo struct { currentHistoryLength int } +// UpdateInfo information about a currently running update +type UpdateInfo struct { + ID string +} + // GetBinaryChecksum return binary checksum. func (wInfo *WorkflowInfo) GetBinaryChecksum() string { if wInfo.BinaryChecksum == "" { @@ -1017,6 +1022,20 @@ func (wc *workflowEnvironmentInterceptor) GetInfo(ctx Context) *WorkflowInfo { return wc.env.WorkflowInfo() } +// GetUpdateInfo extracts info of a currently running update from a context. +func GetUpdateInfo(ctx Context) *UpdateInfo { + i := getWorkflowOutboundInterceptor(ctx) + return i.GetUpdateInfo(ctx) +} + +func (wc *workflowEnvironmentInterceptor) GetUpdateInfo(ctx Context) *UpdateInfo { + uc := ctx.Value(updateInfoContextKey) + if uc == nil { + panic("getWorkflowOutboundInterceptor: No update associated with this context") + } + return uc.(*UpdateInfo) +} + // GetLogger returns a logger to be used in workflow's context func GetLogger(ctx Context) log.Logger { i := getWorkflowOutboundInterceptor(ctx) diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 89ec5f586..a4bb8fb09 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -828,8 +828,8 @@ func (e *TestWorkflowEnvironment) QueryWorkflow(queryType string, args ...interf return e.impl.queryWorkflow(queryType, args...) } -func (e *TestWorkflowEnvironment) UpdateWorkflow(name string, uc UpdateCallbacks, args ...interface{}) { - e.impl.updateWorkflow(name, uc, args...) +func (e *TestWorkflowEnvironment) UpdateWorkflow(name string, ID string, uc UpdateCallbacks, args ...interface{}) { + e.impl.updateWorkflow(name, ID, uc, args...) } // QueryWorkflowByID queries a child workflow by its ID and returns the result synchronously diff --git a/test/integration_test.go b/test/integration_test.go index 39efc967e..e7ce822ab 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -1221,6 +1221,38 @@ func (ts *IntegrationTestSuite) TestInspectLocalActivityInfoLocalActivityWorkerO ts.Nil(err) } +func (ts *IntegrationTestSuite) TestUpdateInfo() { + ctx := context.Background() + run, err := ts.client.ExecuteWorkflow(ctx, + ts.startWorkflowOptions("test-update-info"), ts.workflows.UpdateInfoWorkflow) + ts.Nil(err) + // Send an update request with a know update ID + handler, err := ts.client.UpdateWorkflowWithOptions(ctx, &client.UpdateWorkflowWithOptionsRequest{ + UpdateID: "testID", + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + UpdateName: "update", + }) + ts.NoError(err) + // Verify the upate handler can access the update info and return the updateID + var result string + ts.NoError(handler.Get(ctx, &result)) + ts.Equal("testID", result) + // Test the update validator can also use the update info + handler, err = ts.client.UpdateWorkflowWithOptions(ctx, &client.UpdateWorkflowWithOptionsRequest{ + UpdateID: "notTestID", + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + UpdateName: "update", + }) + ts.NoError(err) + err = handler.Get(ctx, nil) + ts.Error(err) + // complete workflow + ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "finish", "finished")) + ts.NoError(run.Get(ctx, nil)) +} + func (ts *IntegrationTestSuite) TestBasicSession() { var expected []string err := ts.executeWorkflow("test-basic-session", ts.workflows.BasicSession, &expected) diff --git a/test/workflow_test.go b/test/workflow_test.go index 06ce466a6..cb3524ece 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -305,6 +305,24 @@ func (w *Workflows) ActivityRetryOnHBTimeout(ctx workflow.Context) ([]string, er return []string{"heartbeatAndSleep", "heartbeatAndSleep", "heartbeatAndSleep"}, nil } +func (w *Workflows) UpdateInfoWorkflow(ctx workflow.Context) error { + err := workflow.SetUpdateHandlerWithOptions(ctx, "update", func(ctx workflow.Context) (string, error) { + return workflow.GetUpdateInfo(ctx).ID, nil + }, workflow.UpdateHandlerOptions{ + Validator: func(ctx workflow.Context) error { + if workflow.GetUpdateInfo(ctx).ID != "testID" { + return errors.New("invalid update ID") + } + return nil + }, + }) + if err != nil { + return errors.New("failed to register update handler") + } + workflow.GetSignalChannel(ctx, "finish").Receive(ctx, nil) + return nil +} + func (w *Workflows) ActivityHeartbeatWithRetry(ctx workflow.Context) (heartbeatCounts int, err error) { // Make retries fast opts := w.defaultActivityOptions() @@ -2268,6 +2286,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.WorkflowWithParallelSideEffects) worker.RegisterWorkflow(w.WorkflowWithParallelMutableSideEffects) worker.RegisterWorkflow(w.LocalActivityStaleCache) + worker.RegisterWorkflow(w.UpdateInfoWorkflow) worker.RegisterWorkflow(w.SignalWorkflow) worker.RegisterWorkflow(w.CronWorkflow) worker.RegisterWorkflow(w.CancelTimerConcurrentWithOtherCommandWorkflow) diff --git a/workflow/workflow.go b/workflow/workflow.go index d78d1d729..a808c6779 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -56,6 +56,11 @@ type ( // Info information about currently executing workflow Info = internal.WorkflowInfo + // UpdateInfo information about a currently running update + // + // NOTE: Experimental + UpdateInfo = internal.UpdateInfo + // ContinueAsNewError can be returned by a workflow implementation function and indicates that // the workflow should continue as new with the same WorkflowID, but new RunID and new history. ContinueAsNewError = internal.ContinueAsNewError @@ -192,6 +197,10 @@ func GetInfo(ctx Context) *Info { return internal.GetWorkflowInfo(ctx) } +func GetUpdateInfo(ctx Context) *UpdateInfo { + return internal.GetUpdateInfo(ctx) +} + // GetLogger returns a logger to be used in workflow's context func GetLogger(ctx Context) log.Logger { return internal.GetLogger(ctx) From ff6b20a09c63b45a2cf2e64ffcf8ad25f0a0b5ed Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Mon, 17 Jul 2023 06:56:19 -0700 Subject: [PATCH 2/3] fix ID naming --- internal/internal_event_handlers.go | 6 +++--- internal/internal_event_handlers_test.go | 4 ++-- internal/internal_update.go | 8 ++++---- internal/internal_workflow.go | 4 ++-- internal/internal_workflow_testsuite.go | 8 ++++---- internal/workflow_testsuite.go | 4 ++-- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index fcd34fd77..fc9c92fc4 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -153,7 +153,7 @@ type ( cancelHandler func() // A cancel handler to be invoked on a cancel notification signalHandler func(name string, input *commonpb.Payloads, header *commonpb.Header) error // A signal handler to be invoked on a signal event queryHandler func(queryType string, queryArgs *commonpb.Payloads, header *commonpb.Header) (*commonpb.Payloads, error) - updateHandler func(name string, ID string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) + updateHandler func(name string, id string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) logger log.Logger isReplay bool // flag to indicate if workflow is in replay mode @@ -323,8 +323,8 @@ func (wc *workflowEnvironmentImpl) takeOutgoingMessages() []*protocolpb.Message return retval } -func (wc *workflowEnvironmentImpl) ScheduleUpdate(name string, ID string, args *commonpb.Payloads, hdr *commonpb.Header, callbacks UpdateCallbacks) { - wc.updateHandler(name, ID, args, hdr, callbacks) +func (wc *workflowEnvironmentImpl) ScheduleUpdate(name string, id string, args *commonpb.Payloads, hdr *commonpb.Header, callbacks UpdateCallbacks) { + wc.updateHandler(name, id, args, hdr, callbacks) } func withExpectedEventPredicate(pred func(*historypb.HistoryEvent) bool) msgSendOpt { diff --git a/internal/internal_event_handlers_test.go b/internal/internal_event_handlers_test.go index 78ad10bcc..0f9bb21a0 100644 --- a/internal/internal_event_handlers_test.go +++ b/internal/internal_event_handlers_test.go @@ -437,9 +437,9 @@ func TestUpdateEvents(t *testing.T) { weh := &workflowExecutionEventHandlerImpl{ workflowEnvironmentImpl: &workflowEnvironmentImpl{ - updateHandler: func(name string, ID string, args *commonpb.Payloads, header *commonpb.Header, cb UpdateCallbacks) { + updateHandler: func(name string, id string, args *commonpb.Payloads, header *commonpb.Header, cb UpdateCallbacks) { gotName = name - gotID = ID + gotID = id gotArgs = args gotHeader = header }, diff --git a/internal/internal_update.go b/internal/internal_update.go index 14ed69e5f..12e9f2470 100644 --- a/internal/internal_update.go +++ b/internal/internal_update.go @@ -94,7 +94,7 @@ type ( clientIdentity string requestMsgID string requestSeqID int64 - scheduleUpdate func(name string, ID string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) + scheduleUpdate func(name string, id string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) env updateEnv state updateState } @@ -114,7 +114,7 @@ type ( // update callbacks. func newUpdateProtocol( protoInstanceID string, - scheduleUpdate func(name string, ID string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks), + scheduleUpdate func(name string, id string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks), env updateEnv, ) *updateProtocol { return &updateProtocol{ @@ -241,7 +241,7 @@ func (up *updateProtocol) checkAcceptedEvent(e *historypb.HistoryEvent) bool { func defaultUpdateHandler( rootCtx Context, name string, - ID string, + id string, serializedArgs *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks, @@ -255,7 +255,7 @@ func defaultUpdateHandler( } scheduler.Spawn(ctx, name, func(ctx Context) { ctx = WithValue(ctx, updateInfoContextKey, &UpdateInfo{ - ID: ID, + ID: id, }) eo := getWorkflowEnvOptions(ctx) diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index b87fda30d..30ba7aad8 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -542,8 +542,8 @@ func (d *syncWorkflowDefinition) Execute(env WorkflowEnvironment, header *common ) getWorkflowEnvironment(d.rootCtx).RegisterUpdateHandler( - func(name string, ID string, serializedArgs *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) { - defaultUpdateHandler(d.rootCtx, name, ID, serializedArgs, header, callbacks, coroScheduler{d.dispatcher}) + func(name string, id string, serializedArgs *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) { + defaultUpdateHandler(d.rootCtx, name, id, serializedArgs, header, callbacks, coroScheduler{d.dispatcher}) }) getWorkflowEnvironment(d.rootCtx).RegisterQueryHandler( diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index cd2e027ce..4d7b368ec 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -185,7 +185,7 @@ type ( workflowCancelHandler func() signalHandler func(name string, input *commonpb.Payloads, header *commonpb.Header) error queryHandler func(string, *commonpb.Payloads, *commonpb.Header) (*commonpb.Payloads, error) - updateHandler func(name string, ID string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks) + updateHandler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks) startedHandler func(r WorkflowExecution, e error) isWorkflowCompleted bool @@ -2021,7 +2021,7 @@ func (env *testWorkflowEnvironmentImpl) RegisterSignalHandler( } func (env *testWorkflowEnvironmentImpl) RegisterUpdateHandler( - handler func(name string, ID string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks), + handler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks), ) { env.updateHandler = handler } @@ -2361,12 +2361,12 @@ func (env *testWorkflowEnvironmentImpl) queryWorkflow(queryType string, args ... return newEncodedValue(blob, env.GetDataConverter()), nil } -func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, ID string, uc UpdateCallbacks, args ...interface{}) { +func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, uc UpdateCallbacks, args ...interface{}) { data, err := encodeArgs(env.GetDataConverter(), args) if err != nil { panic(err) } - env.updateHandler(name, ID, data, nil, uc) + env.updateHandler(name, id, data, nil, uc) } func (env *testWorkflowEnvironmentImpl) queryWorkflowByID(workflowID, queryType string, args ...interface{}) (converter.EncodedValue, error) { diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index a4bb8fb09..2b633f3f2 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -828,8 +828,8 @@ func (e *TestWorkflowEnvironment) QueryWorkflow(queryType string, args ...interf return e.impl.queryWorkflow(queryType, args...) } -func (e *TestWorkflowEnvironment) UpdateWorkflow(name string, ID string, uc UpdateCallbacks, args ...interface{}) { - e.impl.updateWorkflow(name, ID, uc, args...) +func (e *TestWorkflowEnvironment) UpdateWorkflow(name string, id string, uc UpdateCallbacks, args ...interface{}) { + e.impl.updateWorkflow(name, id, uc, args...) } // QueryWorkflowByID queries a child workflow by its ID and returns the result synchronously From 49e0a7192462a5f9a2696bc661fca7d64701594b Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Mon, 17 Jul 2023 09:36:29 -0700 Subject: [PATCH 3/3] Update unit test --- internal/internal_workflow_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index 0c64bf136..400ce1749 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -1438,7 +1438,7 @@ func (s *WorkflowUnitTest) Test_MutatingFunctionsInUpdateValidator() { } env.RegisterWorkflow(wf) env.RegisterDelayedCallback(func() { - env.UpdateWorkflow(updateType, &updateCallback{ + env.UpdateWorkflow(updateType, "testID", &updateCallback{ reject: func(err error) { s.Error(err) },