diff --git a/.github/workflows/docker/dynamic-config-custom.yaml b/.github/workflows/docker/dynamic-config-custom.yaml index da349b212..98059f4a7 100644 --- a/.github/workflows/docker/dynamic-config-custom.yaml +++ b/.github/workflows/docker/dynamic-config-custom.yaml @@ -6,4 +6,6 @@ system.enableActivityEagerExecution: frontend.enableUpdateWorkflowExecution: - value: true frontend.enableUpdateWorkflowExecutionAsyncAccepted: + - value: true +system.enableEagerWorkflowStart: - value: true \ No newline at end of file diff --git a/internal/activity.go b/internal/activity.go index 5a7fe749a..3de596709 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -100,7 +100,6 @@ type ( // better to rely on the default value. // ScheduleToStartTimeout is always non-retryable. Retrying after this timeout doesn't make sense as it would // just put the Activity Task back into the same Task Queue. - // If ScheduleToClose is not provided then this timeout is required. // Optional: Defaults to unlimited. ScheduleToStartTimeout time.Duration @@ -109,7 +108,7 @@ type ( // to detect that an Activity that didn't complete on time. So this timeout should be as short as the longest // possible execution of the Activity body. Potentially long running Activities must specify HeartbeatTimeout // and call Activity.RecordHeartbeat(ctx, "my-heartbeat") periodically for timely failure detection. - // If ScheduleToClose is not provided then this timeout is required: Defaults to the ScheduleToCloseTimeout value. + // Either this option or ScheduleToClose is required: Defaults to the ScheduleToCloseTimeout value. StartToCloseTimeout time.Duration // HeartbeatTimeout - Heartbeat interval. Activity must call Activity.RecordHeartbeat(ctx, "my-heartbeat") @@ -156,11 +155,13 @@ type ( // LocalActivityOptions stores local activity specific parameters that will be stored inside of a context. 
LocalActivityOptions struct { // ScheduleToCloseTimeout - The end to end timeout for the local activity including retries. - // This field is required. + // At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required. + // defaults to StartToCloseTimeout if not set. ScheduleToCloseTimeout time.Duration // StartToCloseTimeout - The timeout for a single execution of the local activity. - // Optional: defaults to ScheduleToClose + // At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required. + // defaults to ScheduleToCloseTimeout if not set. StartToCloseTimeout time.Duration // RetryPolicy specify how to retry activity if error happens. @@ -254,25 +255,12 @@ func WithActivityTask( contextPropagators []ContextPropagator, interceptors []WorkerInterceptor, ) (context.Context, error) { - var deadline time.Time scheduled := common.TimeValue(task.GetScheduledTime()) started := common.TimeValue(task.GetStartedTime()) scheduleToCloseTimeout := common.DurationValue(task.GetScheduleToCloseTimeout()) startToCloseTimeout := common.DurationValue(task.GetStartToCloseTimeout()) heartbeatTimeout := common.DurationValue(task.GetHeartbeatTimeout()) - - startToCloseDeadline := started.Add(startToCloseTimeout) - if scheduleToCloseTimeout > 0 { - scheduleToCloseDeadline := scheduled.Add(scheduleToCloseTimeout) - // Minimum of the two deadlines. 
- if scheduleToCloseDeadline.Before(startToCloseDeadline) { - deadline = scheduleToCloseDeadline - } else { - deadline = startToCloseDeadline - } - } else { - deadline = startToCloseDeadline - } + deadline := calculateActivityDeadline(scheduled, started, scheduleToCloseTimeout, startToCloseTimeout) logger = log.With(logger, tagActivityID, task.ActivityId, @@ -333,6 +321,21 @@ func WithLocalActivityTask( tagWorkflowID, task.params.WorkflowInfo.WorkflowExecution.ID, tagRunID, task.params.WorkflowInfo.WorkflowExecution.RunID, ) + startedTime := time.Now() + scheduleToCloseTimeout := task.params.ScheduleToCloseTimeout + startToCloseTimeout := task.params.StartToCloseTimeout + + if startToCloseTimeout == 0 { + startToCloseTimeout = scheduleToCloseTimeout + } + if scheduleToCloseTimeout == 0 { + scheduleToCloseTimeout = startToCloseTimeout + } + deadline := calculateActivityDeadline(task.scheduledTime, startedTime, scheduleToCloseTimeout, startToCloseTimeout) + if task.attempt > 1 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) { + // this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout + deadline = task.expireTime + } return newActivityContext(ctx, interceptors, &activityEnvironment{ workflowType: &workflowTypeLocal, workflowNamespace: task.params.WorkflowInfo.Namespace, @@ -343,6 +346,9 @@ func WithLocalActivityTask( logger: logger, metricsHandler: metricsHandler, isLocalActivity: true, + deadline: deadline, + scheduledTime: task.scheduledTime, + startedTime: startedTime, dataConverter: dataConverter, attempt: task.attempt, }) @@ -375,3 +381,15 @@ func newActivityContext( return ctx, nil } + +func calculateActivityDeadline(scheduled, started time.Time, scheduleToCloseTimeout, startToCloseTimeout time.Duration) time.Time { + startToCloseDeadline := started.Add(startToCloseTimeout) + if scheduleToCloseTimeout > 0 { + scheduleToCloseDeadline := scheduled.Add(scheduleToCloseTimeout) + // Minimum of the two deadlines. 
+ if scheduleToCloseDeadline.Before(startToCloseDeadline) { + return scheduleToCloseDeadline + } + } + return startToCloseDeadline +} diff --git a/internal/client.go b/internal/client.go index 9092e48f4..de4006de4 100644 --- a/internal/client.go +++ b/internal/client.go @@ -596,6 +596,11 @@ type ( // supported when Temporal server is using ElasticSearch). The key and value type must be registered on Temporal server side. // Use GetSearchAttributes API to get valid key and corresponding value type. SearchAttributes map[string]interface{} + + // EnableEagerStart - request eager execution for this workflow, if a local worker is available. + // + // NOTE: Experimental + EnableEagerStart bool } // RetryPolicy defines the retry policy. @@ -813,6 +818,9 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien contextPropagators: options.ContextPropagators, workerInterceptors: workerInterceptors, excludeInternalFromRetry: options.ConnectionOptions.excludeInternalFromRetry, + eagerDispatcher: &eagerWorkflowDispatcher{ + workersByTaskQueue: make(map[string][]eagerWorker), + }, } // Create outbound interceptor by wrapping backwards through chain diff --git a/internal/internal_activity.go b/internal/internal_activity.go index 0dba0ae9e..ab2413822 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -217,7 +217,8 @@ func getValidatedLocalActivityOptions(ctx Context) (*ExecuteLocalActivityOptions } if p.ScheduleToCloseTimeout == 0 { p.ScheduleToCloseTimeout = p.StartToCloseTimeout - } else { + } + if p.StartToCloseTimeout == 0 { p.StartToCloseTimeout = p.ScheduleToCloseTimeout } return p, nil diff --git a/internal/internal_command_state_machine.go b/internal/internal_command_state_machine.go index d758d8ce3..e1b7cc2a9 100644 --- a/internal/internal_command_state_machine.go +++ b/internal/internal_command_state_machine.go @@ -85,10 +85,6 @@ type ( cancelActivityStateMachine struct { *commandStateMachineBase attributes 
*commandpb.RequestCancelActivityTaskCommandAttributes - - // The commandsHelper.nextCommandEventIDResetCounter when this command - // incremented commandsHelper.commandsCancelledDuringWFCancellation. - cancelledOnEventIDResetCounter uint64 } timerCommandStateMachine struct { @@ -99,10 +95,6 @@ type ( cancelTimerCommandStateMachine struct { *commandStateMachineBase attributes *commandpb.CancelTimerCommandAttributes - - // The commandsHelper.nextCommandEventIDResetCounter when this command - // incremented commandsHelper.commandsCancelledDuringWFCancellation. - cancelledOnEventIDResetCounter uint64 } childWorkflowCommandStateMachine struct { @@ -150,18 +142,10 @@ type ( orderedCommands *list.List commands map[commandID]*list.Element - scheduledEventIDToActivityID map[int64]string - scheduledEventIDToCancellationID map[int64]string - scheduledEventIDToSignalID map[int64]string - versionMarkerLookup map[int64]versionMarker - commandsCancelledDuringWFCancellation int64 - workflowExecutionIsCancelling bool - - // Incremented everytime nextCommandEventID and - // commandsCancelledDuringWFCancellation is reset (i.e. on new workflow - // task). Won't ever happen, but technically the way this value is compared - // is safe for overflow wrap around. 
- nextCommandEventIDResetCounter uint64 + scheduledEventIDToActivityID map[int64]string + scheduledEventIDToCancellationID map[int64]string + scheduledEventIDToSignalID map[int64]string + versionMarkerLookup map[int64]versionMarker } // panic when command state machine is in illegal state @@ -477,9 +461,6 @@ func (d *commandStateMachineBase) cancel() { case commandStateCommandSent: d.moveState(commandStateCancellationCommandSent, eventCancel) case commandStateInitiated: - if d.helper.workflowExecutionIsCancelling { - d.helper.commandsCancelledDuringWFCancellation++ - } d.moveState(commandStateCanceledAfterInitiated, eventCancel) default: d.failStateTransition(eventCancel) @@ -589,10 +570,6 @@ func (d *activityCommandStateMachine) cancel() { } cancelCmd := d.helper.newCancelActivityStateMachine(attribs) d.helper.addCommand(cancelCmd) - // We must mark the event ID reset counter for when we performed this - // increment so a potential decrement can only decrement if it wasn't - // reset - cancelCmd.cancelledOnEventIDResetCounter = d.helper.nextCommandEventIDResetCounter // We also mark the schedule command as not eager if we haven't sent it yet. // Server behavior differs on eager vs non-eager when scheduling and // cancelling during the same task completion. 
If it has not been sent this @@ -614,10 +591,6 @@ func (d *timerCommandStateMachine) cancel() { } cancelCmd := d.helper.newCancelTimerCommandStateMachine(attribs) d.helper.addCommand(cancelCmd) - // We must mark the event ID reset counter for when we performed this - // increment so a potential decrement can only decrement if it wasn't - // reset - cancelCmd.cancelledOnEventIDResetCounter = d.helper.nextCommandEventIDResetCounter } d.commandStateMachineBase.cancel() @@ -729,9 +702,6 @@ func (d *childWorkflowCommandStateMachine) handleCancelFailedEvent() { func (d *childWorkflowCommandStateMachine) cancel() { switch d.state { case commandStateStarted: - if d.helper.workflowExecutionIsCancelling { - d.helper.commandsCancelledDuringWFCancellation++ - } d.moveState(commandStateCanceledAfterStarted, eventCancel) // A child workflow may be canceled _after_ something like an activity start // happens inside a simulated goroutine. However, since the state of the @@ -888,11 +858,10 @@ func newCommandsHelper() *commandsHelper { orderedCommands: list.New(), commands: make(map[commandID]*list.Element), - scheduledEventIDToActivityID: make(map[int64]string), - scheduledEventIDToCancellationID: make(map[int64]string), - scheduledEventIDToSignalID: make(map[int64]string), - versionMarkerLookup: make(map[int64]versionMarker), - commandsCancelledDuringWFCancellation: 0, + scheduledEventIDToActivityID: make(map[int64]string), + scheduledEventIDToCancellationID: make(map[int64]string), + scheduledEventIDToSignalID: make(map[int64]string), + versionMarkerLookup: make(map[int64]versionMarker), } } @@ -905,13 +874,20 @@ func (h *commandsHelper) setCurrentWorkflowTaskStartedEventID(workflowTaskStarte // corresponding history event after processing. So we can use workflow task started event id + 2 as the offset as // workflow task completed event is always the first event in the workflow task followed by events generated from // commands. 
This allows client sdk to deterministically predict history event ids generated by processing of the - // command. We must also add the number of cancel commands that were spawned during cancellation of the workflow - // execution as those canceled command events will show up *after* the workflow task completed event. - h.nextCommandEventID = workflowTaskStartedEventID + 2 + h.commandsCancelledDuringWFCancellation - h.commandsCancelledDuringWFCancellation = 0 - // We must change the counter here so that others who mutate - // commandsCancelledDuringWFCancellation know it has since been reset - h.nextCommandEventIDResetCounter++ + // command. It is possible, notably during workflow cancellation, that commands are generated before the workflow + // task started event is processed. In this case we need to adjust the nextCommandEventID to account for these unsent + // commands.git + var uncountedCommands int64 + for curr := h.orderedCommands.Front(); curr != nil; { + d := curr.Value.(commandStateMachine) + command := d.getCommand() + if command != nil { + uncountedCommands += 1 + } + curr = curr.Next() + } + + h.nextCommandEventID = workflowTaskStartedEventID + 2 + uncountedCommands } func (h *commandsHelper) getNextID() int64 { @@ -974,24 +950,7 @@ func (h *commandsHelper) removeCancelOfResolvedCommand(commandID commandID) { orderedCmdEl, ok := h.commands[commandID] if ok { delete(h.commands, commandID) - command := h.orderedCommands.Remove(orderedCmdEl) - // Sometimes commandsCancelledDuringWFCancellation was incremented before - // it was reset and sometimes not. We make sure the workflow execution is - // actually cancelling since that's the only time we increment the counter - // in the first place. Also, we use the reset counter to see if we're still - // on the same iteration where we may have incremented it before. 
- if h.workflowExecutionIsCancelling { - switch command := command.(type) { - case *cancelActivityStateMachine: - if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter { - h.commandsCancelledDuringWFCancellation-- - } - case *cancelTimerCommandStateMachine: - if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter { - h.commandsCancelledDuringWFCancellation-- - } - } - } + _ = h.orderedCommands.Remove(orderedCmdEl) } } diff --git a/internal/internal_eager.go b/internal/internal_eager.go new file mode 100644 index 000000000..eff55c72a --- /dev/null +++ b/internal/internal_eager.go @@ -0,0 +1,35 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package internal + +// eagerWorker is the minimal worker interface needed for eager activities and workflows +type eagerWorker interface { + // tryReserveSlot tries to reserver a task slot on the worker without blocking + // caller is expected to release the slot with releaseSlot + tryReserveSlot() bool + // releaseSlot release a task slot acquired by tryReserveSlot + releaseSlot() + // processTaskAsync process a new task on the worker asynchronously and + // call callback once complete + processTaskAsync(task interface{}, callback func()) +} diff --git a/internal/internal_eager_activity.go b/internal/internal_eager_activity.go index 79b0a5857..c0a7536f8 100644 --- a/internal/internal_eager_activity.go +++ b/internal/internal_eager_activity.go @@ -34,7 +34,7 @@ import ( type eagerActivityExecutor struct { eagerActivityExecutorOptions - activityWorker *activityWorker + activityWorker eagerWorker heldSlotCount int countLock sync.Mutex } @@ -97,11 +97,8 @@ func (e *eagerActivityExecutor) reserveOnePendingSlot() bool { // No more room return false } - // Reserve a spot for our request via a non-blocking attempt to take a poller - // request entry which essentially reserves a spot - select { - case <-e.activityWorker.worker.pollerRequestCh: - default: + // Reserve a spot for our request via a non-blocking attempt + if !e.activityWorker.tryReserveSlot() { return false } @@ -131,35 +128,20 @@ func (e *eagerActivityExecutor) handleResponse( // Put every unfulfilled slot back on the poller channel for i := 0; i < unfulfilledSlots; i++ { - // Like other parts that push onto this channel, we assume there is room - // because we took it, so we do a blocking send - e.activityWorker.worker.pollerRequestCh <- struct{}{} + e.activityWorker.releaseSlot() } // Start each activity asynchronously for _, activity := range resp.GetActivityTasks() { - // Before starting the goroutine we have to increase the wait group counter - // that the poller would have otherwise increased - 
e.activityWorker.worker.stopWG.Add(1) // Asynchronously execute task := &activityTask{activity} - go func() { - // Mark completed when complete - defer func() { - // Like other sends to this channel, we assume there is room because we - // reserved it, so we make a blocking send. The processTask does not do - // this itself because our task is *activityTask, not *polledTask. - e.activityWorker.worker.pollerRequestCh <- struct{}{} - // Decrement executing count - e.countLock.Lock() - e.heldSlotCount-- - e.countLock.Unlock() - }() - - // Process the task synchronously. We call the processor on the base - // worker instead of a higher level so we can get the benefits of metrics, - // stop wait group update, etc. - e.activityWorker.worker.processTask(task) - }() + e.activityWorker.processTaskAsync(task, func() { + // The processTaskAsync does not do this itself because our task is *activityTask, not *polledTask. + e.activityWorker.releaseSlot() + // Decrement executing count + e.countLock.Lock() + e.heldSlotCount-- + e.countLock.Unlock() + }) } } diff --git a/internal/internal_eager_activity_test.go b/internal/internal_eager_activity_test.go index aad9720c9..dd3ad1393 100644 --- a/internal/internal_eager_activity_test.go +++ b/internal/internal_eager_activity_test.go @@ -38,7 +38,7 @@ import ( func TestEagerActivityDisabled(t *testing.T) { exec := newEagerActivityExecutor(eagerActivityExecutorOptions{disabled: true, taskQueue: "task-queue1"}) exec.activityWorker = newActivityWorker(nil, - workerExecutionParameters{TaskQueue: "task-queue1"}, nil, newRegistry(), nil) + workerExecutionParameters{TaskQueue: "task-queue1"}, nil, newRegistry(), nil).worker // Turns requests to false when disabled var req workflowservice.RespondWorkflowTaskCompletedRequest @@ -59,11 +59,13 @@ func TestEagerActivityNoActivityWorker(t *testing.T) { func TestEagerActivityWrongTaskQueue(t *testing.T) { exec := newEagerActivityExecutor(eagerActivityExecutorOptions{taskQueue: "task-queue1"}) - 
exec.activityWorker = newActivityWorker(nil, - workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 10}, nil, newRegistry(), nil) + activityWorker := newActivityWorker(nil, workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 10}, nil, newRegistry(), nil) + activityWorker.worker.isWorkerStarted = true + + exec.activityWorker = activityWorker.worker // Fill up the poller request channel for i := 0; i < 10; i++ { - exec.activityWorker.worker.pollerRequestCh <- struct{}{} + activityWorker.worker.pollerRequestCh <- struct{}{} } // Turns requests to false when wrong task queue @@ -77,11 +79,14 @@ func TestEagerActivityWrongTaskQueue(t *testing.T) { func TestEagerActivityMaxPerTask(t *testing.T) { exec := newEagerActivityExecutor(eagerActivityExecutorOptions{taskQueue: "task-queue1"}) - exec.activityWorker = newActivityWorker(nil, + activityWorker := newActivityWorker(nil, workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 10}, nil, newRegistry(), nil) + activityWorker.worker.isWorkerStarted = true + + exec.activityWorker = activityWorker.worker // Fill up the poller request channel for i := 0; i < 10; i++ { - exec.activityWorker.worker.pollerRequestCh <- struct{}{} + activityWorker.worker.pollerRequestCh <- struct{}{} } // Add 8, but it limits to only the first 3 @@ -99,16 +104,19 @@ func TestEagerActivityCounts(t *testing.T) { // We'll create an eager activity executor with 3 max eager concurrent and 5 // max concurrent exec := newEagerActivityExecutor(eagerActivityExecutorOptions{taskQueue: "task-queue1", maxConcurrent: 3}) - exec.activityWorker = newActivityWorker(nil, + activityWorker := newActivityWorker(nil, workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 5}, nil, newRegistry(), nil) + activityWorker.worker.isWorkerStarted = true + + exec.activityWorker = activityWorker.worker // Fill up the poller request channel - slotsCh := 
exec.activityWorker.worker.pollerRequestCh + slotsCh := activityWorker.worker.pollerRequestCh for i := 0; i < 5; i++ { slotsCh <- struct{}{} } // Replace task processor taskProcessor := newWaitingTaskProcessor() - exec.activityWorker.worker.options.taskWorker = taskProcessor + activityWorker.worker.options.taskWorker = taskProcessor // Request 2 commands on wrong task queue then 5 commands on proper task queue // but have 2nd request disabled diff --git a/internal/internal_eager_workflow.go b/internal/internal_eager_workflow.go new file mode 100644 index 000000000..2a05563b8 --- /dev/null +++ b/internal/internal_eager_workflow.go @@ -0,0 +1,99 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package internal + +import ( + "math/rand" + "sync" + "sync/atomic" + + "go.temporal.io/api/workflowservice/v1" +) + +// eagerWorkflowDispatcher is responsible for finding an available worker for an eager workflow task. +type eagerWorkflowDispatcher struct { + lock sync.RWMutex + workersByTaskQueue map[string][]eagerWorker +} + +// registerWorker registers a worker that can be used for eager workflow dispatch +func (e *eagerWorkflowDispatcher) registerWorker(worker *workflowWorker) { + e.lock.Lock() + defer e.lock.Unlock() + e.workersByTaskQueue[worker.executionParameters.TaskQueue] = append(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker.worker) +} + +// applyToRequest updates request if eager workflow dispatch is possible and returns the eagerWorkflowExecutor to use +func (e *eagerWorkflowDispatcher) applyToRequest(request *workflowservice.StartWorkflowExecutionRequest) *eagerWorkflowExecutor { + // Try every worker that is assigned to the desired task queue. + e.lock.RLock() + workers := e.workersByTaskQueue[request.GetTaskQueue().Name] + randWorkers := make([]eagerWorker, len(workers)) + // Copy the slice so we can release the lock. + copy(randWorkers, workers) + e.lock.RUnlock() + rand.Shuffle(len(randWorkers), func(i, j int) { randWorkers[i], randWorkers[j] = randWorkers[j], randWorkers[i] }) + for _, worker := range randWorkers { + if worker.tryReserveSlot() { + request.RequestEagerExecution = true + return &eagerWorkflowExecutor{ + worker: worker, + } + } + } + return nil +} + +// eagerWorkflowExecutor is a worker-scoped executor for an eager workflow task. +type eagerWorkflowExecutor struct { + handledResponse atomic.Bool + worker eagerWorker +} + +// handleResponse of an eager workflow task from a StartWorkflowExecution request. 
+func (e *eagerWorkflowExecutor) handleResponse(response *workflowservice.PollWorkflowTaskQueueResponse) { + if !e.handledResponse.CompareAndSwap(false, true) { + panic("eagerWorkflowExecutor trying to handle multiple responses") + } + // Asynchronously execute the task + task := &eagerWorkflowTask{ + task: response, + } + e.worker.processTaskAsync(task, func() { + // The processTaskAsync does not do this itself because our task is *eagerWorkflowTask, not *polledTask. + e.worker.releaseSlot() + }) +} + +// release the executor task slot this eagerWorkflowExecutor was holding. +// If it is currently handling a responses or has already released the task slot +// then do nothing. +func (e *eagerWorkflowExecutor) release() { + if e.handledResponse.CompareAndSwap(false, true) { + // Assume there is room because it is reserved on creation, so we make a blocking send. + // The processTask does not do this itself because our task is not *polledTask. + e.worker.releaseSlot() + } else { + panic("trying to release an eagerWorkflowExecutor that has already been released") + } +} diff --git a/internal/internal_eager_workflow_test.go b/internal/internal_eager_workflow_test.go new file mode 100644 index 000000000..0b8d98d9c --- /dev/null +++ b/internal/internal_eager_workflow_test.go @@ -0,0 +1,116 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package internal + +import ( + "testing" + + "github.com/stretchr/testify/require" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" +) + +type eagerWorkerMock struct { + tryReserveSlotCallback func() bool + releaseSlotCallback func() + processTaskAsyncCallback func(interface{}, func()) +} + +func (e *eagerWorkerMock) tryReserveSlot() bool { + return e.tryReserveSlotCallback() +} + +func (e *eagerWorkerMock) releaseSlot() { + e.releaseSlotCallback() +} + +func (e *eagerWorkerMock) processTaskAsync(task interface{}, callback func()) { + e.processTaskAsyncCallback(task, callback) +} + +func TestEagerWorkflowDispatchNoWorkerOnTaskQueue(t *testing.T) { + dispatcher := &eagerWorkflowDispatcher{ + workersByTaskQueue: make(map[string][]eagerWorker), + } + dispatcher.registerWorker(&workflowWorker{ + executionParameters: workerExecutionParameters{TaskQueue: "bad-task-queue"}, + }) + + request := &workflowservice.StartWorkflowExecutionRequest{ + TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"}, + } + exec := dispatcher.applyToRequest(request) + require.Nil(t, exec) + require.False(t, request.GetRequestEagerExecution()) +} + +func TestEagerWorkflowDispatchAvailableWorker(t *testing.T) { + dispatcher := &eagerWorkflowDispatcher{ + workersByTaskQueue: make(map[string][]eagerWorker), + } + + availableWorker := &eagerWorkerMock{ + tryReserveSlotCallback: func() bool { return true }, + } + dispatcher.workersByTaskQueue["task-queue"] = []eagerWorker{ + &eagerWorkerMock{ + tryReserveSlotCallback: func() bool { return false }, + }, + &eagerWorkerMock{ + tryReserveSlotCallback: func() bool { return false }, + }, + availableWorker, + } + + request := &workflowservice.StartWorkflowExecutionRequest{ + TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"}, + } + exec := dispatcher.applyToRequest(request) + require.Equal(t, exec.worker, availableWorker) + require.True(t, request.GetRequestEagerExecution()) +} + +func 
TestEagerWorkflowExecutor(t *testing.T) { + slotReleased := false + worker := &eagerWorkerMock{ + tryReserveSlotCallback: func() bool { return true }, + releaseSlotCallback: func() { + slotReleased = true + }, + processTaskAsyncCallback: func(task interface{}, callback func()) { + callback() + }, + } + + exec := &eagerWorkflowExecutor{ + worker: worker, + } + exec.handleResponse(&workflowservice.PollWorkflowTaskQueueResponse{}) + require.True(t, slotReleased) + require.Panics(t, func() { + exec.release() + }) + require.Panics(t, func() { + exec.handleResponse(&workflowservice.PollWorkflowTaskQueueResponse{}) + }) +} diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index fc9c92fc4..9cb8fa9e8 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -184,6 +184,7 @@ type ( pastFirstWFT bool // Set true once this LA has lived for more than one workflow task retryPolicy *RetryPolicy expireTime time.Time + scheduledTime time.Time // Time the activity was scheduled initially. 
header *commonpb.Header } @@ -494,7 +495,6 @@ func validateAndSerializeMemo(memoMap map[string]interface{}, dc converter.DataC func (wc *workflowEnvironmentImpl) RegisterCancelHandler(handler func()) { wrappedHandler := func() { - wc.commandsHelper.workflowExecutionIsCancelling = true handler() } wc.cancelHandler = wrappedHandler @@ -683,12 +683,13 @@ func (wc *workflowEnvironmentImpl) ExecuteLocalActivity(params ExecuteLocalActiv func newLocalActivityTask(params ExecuteLocalActivityParams, callback LocalActivityResultHandler, activityID string) *localActivityTask { task := &localActivityTask{ - activityID: activityID, - params: ¶ms, - callback: callback, - retryPolicy: params.RetryPolicy, - attempt: params.Attempt, - header: params.Header, + activityID: activityID, + params: ¶ms, + callback: callback, + retryPolicy: params.RetryPolicy, + attempt: params.Attempt, + header: params.Header, + scheduledTime: time.Now(), } if params.ScheduleToCloseTimeout > 0 { diff --git a/internal/internal_public.go b/internal/internal_public.go index a990d5dd1..baf982d24 100644 --- a/internal/internal_public.go +++ b/internal/internal_public.go @@ -78,6 +78,8 @@ type ( // WorkflowTaskHandler represents workflow task handlers. WorkflowTaskHandler interface { + WorkflowContextManager + // Processes the workflow task // The response could be: // - RespondWorkflowTaskCompletedRequest @@ -85,8 +87,21 @@ type ( // - RespondQueryTaskCompletedRequest ProcessWorkflowTask( task *workflowTask, + ctx *workflowExecutionContextImpl, f workflowTaskHeartbeatFunc, - ) (response interface{}, resetter EventLevelResetter, err error) + ) (response interface{}, err error) + } + + WorkflowContextManager interface { + // GetOrCreateWorkflowContext finds an existing cached context object + // for the provided task's run ID or creates a new object, adds it to + // cache, and returns it. In all non-error cases the returned context + // object is in a locked state (i.e. 
+ // workflowExecutionContextImpl.Lock() has been called). + GetOrCreateWorkflowContext( + task *workflowservice.PollWorkflowTaskQueueResponse, + historyIterator HistoryIterator, + ) (*workflowExecutionContextImpl, error) } // ActivityTaskHandler represents activity task handlers. diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 7b42ef22b..56ba636cb 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -94,6 +94,11 @@ type ( laRetryCh chan *localActivityTask } + // eagerWorkflowTask represents a workflow task sent from an eager workflow executor + eagerWorkflowTask struct { + task *workflowservice.PollWorkflowTaskQueueResponse + } + // activityTask wraps a activity task. activityTask struct { task *workflowservice.PollActivityTaskQueueResponse @@ -469,11 +474,19 @@ func newWorkflowExecutionContext( return workflowContext } +// Lock acquires the lock on this context object; use Unlock(error) to release +// the lock. func (w *workflowExecutionContextImpl) Lock() { w.mutex.Lock() } +// Unlock cleans up after the provided error and its own internal view of the +// workflow error state by clearing itself and removing itself from cache as +// needed. It is an error to call this function without having called the Lock +// function first and the behavior is undefined. Regardless of the error +// handling involved, the context will be unlocked when this call returns. func (w *workflowExecutionContextImpl) Unlock(err error) { + defer w.mutex.Unlock() if err != nil || w.err != nil || w.isWorkflowCompleted || (w.wth.cache.MaxWorkflowCacheSize() <= 0 && !w.hasPendingLocalActivityWork()) { // TODO: in case of closed, it asumes the close command always succeed. 
need server side change to return @@ -491,8 +504,6 @@ func (w *workflowExecutionContextImpl) Unlock(err error) { // exited w.clearState() } - - w.mutex.Unlock() } func (w *workflowExecutionContextImpl) getEventHandler() *workflowExecutionEventHandlerImpl { @@ -626,7 +637,7 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice. return newWorkflowExecutionContext(workflowInfo, wth), nil } -func (wth *workflowTaskHandlerImpl) getOrCreateWorkflowContext( +func (wth *workflowTaskHandlerImpl) GetOrCreateWorkflowContext( task *workflowservice.PollWorkflowTaskQueueResponse, historyIterator HistoryIterator, ) (workflowContext *workflowExecutionContextImpl, err error) { @@ -751,10 +762,11 @@ func (w *workflowExecutionContextImpl) resetStateIfDestroyed(task *workflowservi // ProcessWorkflowTask processes all the events of the workflow task. func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask( workflowTask *workflowTask, + workflowContext *workflowExecutionContextImpl, heartbeatFunc workflowTaskHeartbeatFunc, -) (completeRequest interface{}, resetter EventLevelResetter, errRet error) { +) (completeRequest interface{}, errRet error) { if workflowTask == nil || workflowTask.task == nil { - return nil, nil, errors.New("nil workflow task provided") + return nil, errors.New("nil workflow task provided") } task := workflowTask.task if task.History == nil || len(task.History.Events) == 0 { @@ -763,11 +775,11 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask( } } if task.Query == nil && len(task.History.Events) == 0 { - return nil, nil, errors.New("nil or empty history") + return nil, errors.New("nil or empty history") } if task.Query != nil && len(task.Queries) != 0 { - return nil, nil, errors.New("invalid query workflow task") + return nil, errors.New("invalid query workflow task") } runID := task.WorkflowExecution.GetRunId() @@ -781,18 +793,12 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask( tagPreviousStartedEventID, 
task.GetPreviousStartedEventId()) }) - workflowContext, err := wth.getOrCreateWorkflowContext(task, workflowTask.historyIterator) - if err != nil { - return nil, nil, err - } - - defer func() { - workflowContext.Unlock(errRet) - }() - - var response interface{} + var ( + response interface{} + err error + heartbeatTimer *time.Timer + ) - var heartbeatTimer *time.Timer defer func() { if heartbeatTimer != nil { heartbeatTimer.Stop() @@ -877,7 +883,6 @@ processWorkflowLoop: } errRet = err completeRequest = response - resetter = workflowContext.SetPreviousStartedEventID return } @@ -1189,6 +1194,9 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl task := eventHandler.pendingLaTasks[activityID] task.wc = w task.workflowTask = workflowTask + + task.scheduledTime = time.Now() + if !w.laTunnel.sendTask(task) { unstartedLaTasks[activityID] = struct{}{} task.wc = nil @@ -1245,8 +1253,6 @@ func (w *workflowExecutionContextImpl) SetCurrentTask(task *workflowservice.Poll } func (w *workflowExecutionContextImpl) SetPreviousStartedEventID(eventID int64) { - w.mutex.Lock() // This call can race against the cache eviction thread - see clearState - defer w.mutex.Unlock() w.previousStartedEventID = eventID } diff --git a/internal/internal_task_handlers_interfaces_test.go b/internal/internal_task_handlers_interfaces_test.go index e43578d9a..be9d9c723 100644 --- a/internal/internal_task_handlers_interfaces_test.go +++ b/internal/internal_task_handlers_interfaces_test.go @@ -53,11 +53,19 @@ type sampleWorkflowTaskHandler struct{} func (wth sampleWorkflowTaskHandler) ProcessWorkflowTask( workflowTask *workflowTask, + _ *workflowExecutionContextImpl, _ workflowTaskHeartbeatFunc, -) (interface{}, EventLevelResetter, error) { +) (interface{}, error) { return &workflowservice.RespondWorkflowTaskCompletedRequest{ TaskToken: workflowTask.task.TaskToken, - }, nil, nil + }, nil +} + +func (wth sampleWorkflowTaskHandler) GetOrCreateWorkflowContext( + task 
*workflowservice.PollWorkflowTaskQueueResponse, + historyIterator HistoryIterator, +) (*workflowExecutionContextImpl, error) { + return nil, nil } func newSampleWorkflowTaskHandler() *sampleWorkflowTaskHandler { @@ -115,7 +123,7 @@ func (s *PollLayerInterfacesTestSuite) TestProcessWorkflowTaskInterface() { // Process task and respond to the service. taskHandler := newSampleWorkflowTaskHandler() - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: response}, nil) + request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: response}, nil, nil) completionRequest := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) s.NoError(err) diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index 9436ec06d..c633abbab 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -143,6 +143,12 @@ func (t *TaskHandlersTestSuite) SetupSuite() { t.namespace = "default" } +func (t *TaskHandlersTestSuite) TearDownTest() { + if cache := *sharedWorkerCachePtr.workflowCache; cache != nil { + cache.Clear() + } +} + func TestTaskHandlersTestSuite(t *testing.T) { suite.Run(t, &TaskHandlersTestSuite{ registry: newRegistry(), @@ -514,13 +520,25 @@ func (t *TaskHandlersTestSuite) getTestWorkerExecutionParams() workerExecutionPa } } +func (t *TaskHandlersTestSuite) mustWorkflowContextImpl( + task *workflowTask, + cm WorkflowContextManager, +) *workflowExecutionContextImpl { + wfctx, err := cm.GetOrCreateWorkflowContext(task.task, task.historyIterator) + t.Require().NoError(err) + return wfctx +} + func (t *TaskHandlersTestSuite) testWorkflowTaskWorkflowExecutionStartedHelper(params workerExecutionParameters) { testEvents := []*historypb.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: testWorkflowTaskTaskqueue}}), } task := createWorkflowTask(testEvents, 0, 
"HelloWorld_Workflow") taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) t.NoError(err) t.NotNil(response) @@ -561,7 +579,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_BinaryChecksum() { task := createWorkflowTask(testEvents, 8, "BinaryChecksumWorkflow") params := t.getTestWorkerExecutionParams() taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) t.NoError(err) @@ -589,7 +610,10 @@ func (t *TaskHandlersTestSuite) TestRespondsToWFTWithWorkerBinaryID() { params := t.getTestWorkerExecutionParams() params.WorkerBuildID = workerBuildID taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) t.NoError(err) t.NotNil(response) @@ -618,7 +642,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() { task := createWorkflowTask(testEvents[0:3], 0, "HelloWorld_Workflow") params := t.getTestWorkerExecutionParams() taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - request, _, err 
:= taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) t.NoError(err) @@ -629,7 +656,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() { // Schedule an activity and see if we complete workflow, Having only one last command. task = createWorkflowTask(testEvents, 3, "HelloWorld_Workflow") - request, _, err = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask = workflowTask{task: task} + wfctx = t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err = taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) response = request.(*workflowservice.RespondWorkflowTaskCompletedRequest) t.NoError(err) t.NotNil(response) @@ -665,7 +695,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_Sticky() { task := createWorkflowTask(testEvents[0:1], 0, "HelloWorld_Workflow") task.StartedEventId = 1 task.WorkflowExecution = execution - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) t.NoError(err) t.NotNil(response) @@ -676,7 +709,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_Sticky() { // then check the current state using query task task = createQueryTask([]*historypb.HistoryEvent{}, 6, "HelloWorld_Workflow", queryType) task.WorkflowExecution = execution - queryResp, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask = workflowTask{task: task} + wfctx = 
t.mustWorkflowContextImpl(&wftask, taskHandler) + queryResp, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.NoError(err) t.verifyQueryResult(queryResp, "waiting-activity-result") } @@ -704,30 +740,45 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_NonSticky() { // query after first workflow task (notice the previousStartEventID is always the last eventID for query task) task := createQueryTask(testEvents[0:3], 3, "HelloWorld_Workflow", queryType) taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - response, _, _ := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + response, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.verifyQueryResult(response, "waiting-activity-result") // query after activity task complete but before second workflow task started task = createQueryTask(testEvents[0:7], 7, "HelloWorld_Workflow", queryType) taskHandler = newWorkflowTaskHandler(params, nil, t.registry) - response, _, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask = workflowTask{task: task} + wfctx = t.mustWorkflowContextImpl(&wftask, taskHandler) + response, _ = taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.verifyQueryResult(response, "waiting-activity-result") // query after second workflow task task = createQueryTask(testEvents[0:8], 8, "HelloWorld_Workflow", queryType) taskHandler = newWorkflowTaskHandler(params, nil, t.registry) - response, _, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask = workflowTask{task: task} + wfctx = t.mustWorkflowContextImpl(&wftask, taskHandler) + response, _ = taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.verifyQueryResult(response, "done") // query after second workflow task with extra events task = 
createQueryTask(testEvents[0:9], 9, "HelloWorld_Workflow", queryType) taskHandler = newWorkflowTaskHandler(params, nil, t.registry) - response, _, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask = workflowTask{task: task} + wfctx = t.mustWorkflowContextImpl(&wftask, taskHandler) + response, _ = taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.verifyQueryResult(response, "done") task = createQueryTask(testEvents[0:9], 9, "HelloWorld_Workflow", "invalid-query-type") taskHandler = newWorkflowTaskHandler(params, nil, t.registry) - response, _, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask = workflowTask{task: task} + wfctx = t.mustWorkflowContextImpl(&wftask, taskHandler) + response, _ = taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.NotNil(response) queryResp, ok := response.(*workflowservice.RespondQueryTaskCompletedRequest) t.True(ok) @@ -769,8 +820,11 @@ func (t *TaskHandlersTestSuite) TestCacheEvictionWhenErrorOccurs() { task := createWorkflowTask(testEvents, 3, "HelloWorld_Workflow") // newWorkflowTaskWorkerInternal will set the laTunnel in taskHandler, without it, ProcessWorkflowTask() // will fail as it can't find laTunnel in newWorkerCache(). 
- newWorkflowTaskWorkerInternal(taskHandler, t.service, params, make(chan struct{}), nil) - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + newWorkflowTaskWorkerInternal(taskHandler, taskHandler, t.service, params, make(chan struct{}), nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.Error(err) t.Nil(request) @@ -791,21 +845,27 @@ func (t *TaskHandlersTestSuite) TestWithMissingHistoryEvents() { } params := t.getTestWorkerExecutionParams() params.WorkflowPanicPolicy = BlockWorkflow + t.Require().Equal(0, params.cache.getWorkflowCache().Size(), + "Suite teardown should have reset cache state") for _, startEventID := range []int64{0, 3} { - taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - task := createWorkflowTask(testEvents, startEventID, "HelloWorld_Workflow") - // newWorkflowTaskWorkerInternal will set the laTunnel in taskHandler, without it, ProcessWorkflowTask() - // will fail as it can't find laTunnel in newWorkerCache(). - newWorkflowTaskWorkerInternal(taskHandler, t.service, params, make(chan struct{}), nil) - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) - - t.Error(err) - t.Nil(request) - t.Contains(err.Error(), "missing history events") + t.Run(fmt.Sprintf("startEventID=%v", startEventID), func() { + taskHandler := newWorkflowTaskHandler(params, nil, t.registry) + task := createWorkflowTask(testEvents, startEventID, "HelloWorld_Workflow") + // newWorkflowTaskWorkerInternal will set the laTunnel in taskHandler, without it, ProcessWorkflowTask() + // will fail as it can't find laTunnel in newWorkerCache(). 
+ newWorkflowTaskWorkerInternal(taskHandler, taskHandler, t.service, params, make(chan struct{}), nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) + + t.Error(err) + t.Nil(request) + t.Contains(err.Error(), "missing history events") - // There should be nothing in the cache. - t.EqualValues(params.cache.getWorkflowCache().Size(), 0) + t.Equal(0, params.cache.getWorkflowCache().Size(), "cache should be empty") + }) } } @@ -848,8 +908,11 @@ func (t *TaskHandlersTestSuite) TestWithTruncatedHistory() { task.StartedEventId = tc.startedEventID // newWorkflowTaskWorkerInternal will set the laTunnel in taskHandler, without it, ProcessWorkflowTask() // will fail as it can't find laTunnel in newWorkerCache(). - newWorkflowTaskWorkerInternal(taskHandler, t.service, params, make(chan struct{}), nil) - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + newWorkflowTaskWorkerInternal(taskHandler, taskHandler, t.service, params, make(chan struct{}), nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) if tc.isResultErr { t.Error(err, "testcase %v failed", i) @@ -908,7 +971,10 @@ func (t *TaskHandlersTestSuite) testSideEffectDeferHelper(cacheSize int) { taskHandler := newWorkflowTaskHandler(params, nil, t.registry) task := createWorkflowTask(testEvents, 0, workflowName) - _, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + _, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.Nil(err) // Make sure the workflow coroutine has exited. 
@@ -940,7 +1006,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() { params.WorkerStopChannel = stopC taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) // there should be no error as the history events matched the commands. t.NoError(err) @@ -951,8 +1020,11 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() { task = createWorkflowTask(testEvents, 3, "HelloWorld_Workflow") // newWorkflowTaskWorkerInternal will set the laTunnel in taskHandler, without it, ProcessWorkflowTask() // will fail as it can't find laTunnel in newWorkerCache(). - newWorkflowTaskWorkerInternal(taskHandler, t.service, params, stopC, nil) - request, _, err = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + newWorkflowTaskWorkerInternal(taskHandler, taskHandler, t.service, params, stopC, nil) + wftask = workflowTask{task: task} + wfctx = t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err = taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.Error(err) t.Nil(request) t.Contains(err.Error(), "nondeterministic") @@ -962,7 +1034,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() { params.WorkflowPanicPolicy = FailWorkflow failOnNondeterminismTaskHandler := newWorkflowTaskHandler(params, nil, t.registry) task = createWorkflowTask(testEvents, 3, "HelloWorld_Workflow") - request, _, err = failOnNondeterminismTaskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask = workflowTask{task: task} + wfctx = t.mustWorkflowContextImpl(&wftask, failOnNondeterminismTaskHandler) + request, err = 
failOnNondeterminismTaskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) // When FailWorkflow policy is set, task handler does not return an error, // because it will indicate non determinism in the request. t.NoError(err) @@ -980,7 +1055,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() { // now with different package name to activity type testEvents[4].GetActivityTaskScheduledEventAttributes().ActivityType.Name = "new-package.Greeter_Activity" task = createWorkflowTask(testEvents, 3, "HelloWorld_Workflow") - request, _, err = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask = workflowTask{task: task} + wfctx = t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err = taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.NoError(err) t.NotNil(request) } @@ -997,7 +1075,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowReturnsPanicError() { params.WorkflowPanicPolicy = BlockWorkflow taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.NoError(err) t.NotNil(request) r, ok := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) @@ -1019,7 +1100,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowPanics() { params.WorkflowPanicPolicy = BlockWorkflow taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - _, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + _, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.Error(err) _, ok := err.(*workflowPanicError) t.True(ok) @@ -1063,7 +1147,10 @@ func 
(t *TaskHandlersTestSuite) TestGetWorkflowInfo() { params.WorkflowPanicPolicy = BlockWorkflow taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.NoError(err) t.NotNil(request) r, ok := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) @@ -1097,9 +1184,12 @@ func (t *TaskHandlersTestSuite) TestConsistentQuery_InvalidQueryTask() { task := createWorkflowTask(testEvents, 3, "HelloWorld_Workflow") task.Query = &querypb.WorkflowQuery{} task.Queries = map[string]*querypb.WorkflowQuery{"query_id": {}} - newWorkflowTaskWorkerInternal(taskHandler, t.service, params, make(chan struct{}), nil) + newWorkflowTaskWorkerInternal(taskHandler, taskHandler, t.service, params, make(chan struct{}), nil) // query and queries are both specified so this is an invalid task - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) t.Error(err) t.Nil(request) @@ -1139,7 +1229,10 @@ func (t *TaskHandlersTestSuite) TestConsistentQuery_Success() { params := t.getTestWorkerExecutionParams() taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) t.NoError(err) t.NotNil(response) @@ -1159,7 +1252,10 @@ func (t *TaskHandlersTestSuite) 
TestConsistentQuery_Success() { secondTask := createWorkflowTaskWithQueries(testEvents, 3, "QuerySignalWorkflow", queries, false) secondTask.WorkflowExecution.RunId = task.WorkflowExecution.RunId - request, _, err = taskHandler.ProcessWorkflowTask(&workflowTask{task: secondTask}, nil) + wftask = workflowTask{task: secondTask} + wfctx = t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err = taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) response = request.(*workflowservice.RespondWorkflowTaskCompletedRequest) t.NoError(err) t.NotNil(response) @@ -1182,10 +1278,12 @@ func (t *TaskHandlersTestSuite) TestConsistentQuery_Success() { } func (t *TaskHandlersTestSuite) assertQueryResultsEqual(expected map[string]*querypb.WorkflowQueryResult, actual map[string]*querypb.WorkflowQueryResult) { + t.T().Helper() t.Equal(len(expected), len(actual)) for expectedID, expectedResult := range expected { t.Contains(actual, expectedID) - t.True(proto.Equal(expectedResult, actual[expectedID])) + t.True(proto.Equal(expectedResult, actual[expectedID]), + "expected %v = %v", expectedResult, actual[expectedID]) } } @@ -1200,7 +1298,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_CancelActivityBeforeSent() { params := t.getTestWorkerExecutionParams() taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wftask := workflowTask{task: task} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) t.NoError(err) t.NotNil(response) @@ -1232,7 +1333,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_PageToken() { }, } taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task, historyIterator: historyIterator}, 
nil) + wftask := workflowTask{task: task, historyIterator: historyIterator} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) t.NoError(err) t.NotNil(response) @@ -1346,9 +1450,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_Messages() { }, } taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{ - task: task, historyIterator: historyIterator, - }, nil) + wftask := workflowTask{task: task, historyIterator: historyIterator} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) + wfctx.Unlock(err) response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) t.NoError(err) t.NotNil(response) @@ -1426,13 +1531,9 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_Workflow() { laResultCh := make(chan *localActivityResult) laRetryCh := make(chan *localActivityTask) - response, _, err := taskHandler.ProcessWorkflowTask( - &workflowTask{ - task: task, - laResultCh: laResultCh, - laRetryCh: laRetryCh, - }, - nil) + wftask := workflowTask{task: task, laResultCh: laResultCh, laRetryCh: laRetryCh} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + response, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil) t.NotNil(response) t.NoError(err) asWFTComplete := response.(*workflowservice.RespondWorkflowTaskCompletedRequest) @@ -1511,14 +1612,15 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_WorkflowTaskHeartbeatFail }() laResultCh := make(chan *localActivityResult) - response, _, err := taskHandler.ProcessWorkflowTask( - &workflowTask{ - task: task, - laResultCh: laResultCh, - }, + wftask := workflowTask{task: task, laResultCh: laResultCh} + wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler) + 
response, err := taskHandler.ProcessWorkflowTask( + &wftask, + wfctx, func(response interface{}, startTime time.Time) (*workflowTask, error) { return nil, serviceerror.NewNotFound("Intentional wft heartbeat error") }) + wfctx.Unlock(err) t.Nil(response) t.Error(err) @@ -2242,7 +2344,7 @@ func TestResetIfDestroyedTaskPrep(t *testing.T) { require.EqualValues(t, 0, cache.Size()) // cache is empty so this should miss and build a new context with a // full history - _, err := weci.wth.getOrCreateWorkflowContext(task, histIter) + _, err := weci.wth.GetOrCreateWorkflowContext(task, histIter) require.NoError(t, err) require.Len(t, task.History.Events, len(fullHist.Events), diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 0f67eae84..2e17ceea1 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -94,6 +94,7 @@ type ( identity string service workflowservice.WorkflowServiceClient taskHandler WorkflowTaskHandler + contextManager WorkflowContextManager logger log.Logger dataConverter converter.DataConverter failureConverter converter.FailureConverter @@ -265,6 +266,7 @@ func (bp *basePoller) getCapabilities() *workflowservice.GetSystemInfoResponse_C // newWorkflowTaskPoller creates a new workflow task poller which must have a one to one relationship to workflow worker func newWorkflowTaskPoller( taskHandler WorkflowTaskHandler, + contextManager WorkflowContextManager, service workflowservice.WorkflowServiceClient, params workerExecutionParameters, ) *workflowTaskPoller { @@ -281,6 +283,7 @@ func newWorkflowTaskPoller( taskQueueName: params.TaskQueue, identity: params.Identity, taskHandler: taskHandler, + contextManager: contextManager, logger: params.Logger, dataConverter: params.DataConverter, failureConverter: params.FailureConverter, @@ -313,6 +316,8 @@ func (wtp *workflowTaskPoller) ProcessTask(task interface{}) error { switch task := task.(type) { case *workflowTask: return 
wtp.processWorkflowTask(task) + case *eagerWorkflowTask: + return wtp.processWorkflowTask(wtp.toWorkflowTask(task.task)) default: panic("unknown task type.") } @@ -333,14 +338,22 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error { // close doneCh so local activity worker won't get blocked forever when trying to send back result to laResultCh. defer close(doneCh) + wfctx, err := wtp.contextManager.GetOrCreateWorkflowContext(task.task, task.historyIterator) + if err != nil { + return err + } + var taskErr error + defer func() { wfctx.Unlock(taskErr) }() + for { - var response *workflowservice.RespondWorkflowTaskCompletedResponse startTime := time.Now() task.doneCh = doneCh task.laResultCh = laResultCh task.laRetryCh = laRetryCh - completedRequest, resetter, err := wtp.taskHandler.ProcessWorkflowTask( + var completedRequest interface{} + completedRequest, taskErr = wtp.taskHandler.ProcessWorkflowTask( task, + wfctx, func(response interface{}, startTime time.Time) (*workflowTask, error) { wtp.logger.Debug("Force RespondWorkflowTaskCompleted.", "TaskStartedEventID", task.task.GetStartedEventId()) heartbeatResponse, err := wtp.RespondTaskCompletedWithMetrics(response, nil, task.task, startTime) @@ -357,22 +370,22 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error { return task, nil }, ) - if completedRequest == nil && err == nil { + if completedRequest == nil && taskErr == nil { return nil } - if _, ok := err.(workflowTaskHeartbeatError); ok { - return err + if _, ok := taskErr.(workflowTaskHeartbeatError); ok { + return taskErr } - response, err = wtp.RespondTaskCompletedWithMetrics(completedRequest, err, task.task, startTime) + response, err := wtp.RespondTaskCompletedWithMetrics(completedRequest, taskErr, task.task, startTime) if err != nil { return err } if eventLevel := response.GetResetHistoryEventId(); eventLevel != 0 { - resetter(eventLevel) + wfctx.SetPreviousStartedEventID(eventLevel) } - if response == 
nil || response.WorkflowTask == nil { + if response == nil || response.WorkflowTask == nil || taskErr != nil { return nil } @@ -405,7 +418,10 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics( return } -func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, task *workflowservice.PollWorkflowTaskQueueResponse) (response *workflowservice.RespondWorkflowTaskCompletedResponse, err error) { +func (wtp *workflowTaskPoller) RespondTaskCompleted( + completedRequest interface{}, + task *workflowservice.PollWorkflowTaskQueueResponse, +) (response *workflowservice.RespondWorkflowTaskCompletedResponse, err error) { ctx := context.Background() // Respond task completion. grpcCtx, cancel := newGRPCContext(ctx, grpcMetricsHandler( @@ -562,18 +578,8 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi return &localActivityResult{task: task, err: err} } - timeout := task.params.ScheduleToCloseTimeout - if task.params.StartToCloseTimeout != 0 && task.params.StartToCloseTimeout < timeout { - timeout = task.params.StartToCloseTimeout - } - timeoutDuration := timeout - deadline := time.Now().Add(timeoutDuration) - if task.attempt > 1 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) { - // this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout - deadline = task.expireTime - } - - ctx, cancel := context.WithDeadline(ctx, deadline) + info := getActivityEnv(ctx) + ctx, cancel := context.WithDeadline(ctx, info.deadline) defer cancel() task.Lock() @@ -615,13 +621,14 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi laResult, err = ae.ExecuteWithActualArgs(ctx, task.params.InputArgs) executionLatency := time.Since(laStartTime) metricsHandler.Timer(metrics.LocalActivityExecutionLatency).Record(executionLatency) - if executionLatency > timeoutDuration { + if time.Now().After(info.deadline) { // If local activity takes longer than expected timeout, the context 
would already be DeadlineExceeded and // the result would be discarded. Print a warning in this case. lath.logger.Warn("LocalActivity takes too long to complete.", "LocalActivityID", task.activityID, "LocalActivityType", activityType, "ScheduleToCloseTimeout", task.params.ScheduleToCloseTimeout, + "StartToCloseTimeout", task.params.StartToCloseTimeout, "ActualExecutionDuration", executionLatency) } }(doneCh) @@ -642,7 +649,11 @@ WaitResult: metricsHandler.Counter(metrics.LocalActivityExecutionCanceledCounter).Inc(1) return &localActivityResult{err: ErrCanceled, task: task} } else if ctx.Err() == context.DeadlineExceeded { - return &localActivityResult{err: ErrDeadlineExceeded, task: task} + if task.params.ScheduleToCloseTimeout != 0 && time.Now().After(info.scheduledTime.Add(task.params.ScheduleToCloseTimeout)) { + return &localActivityResult{err: ErrDeadlineExceeded, task: task} + } else { + return &localActivityResult{err: NewTimeoutError("deadline exceeded", enumspb.TIMEOUT_TYPE_START_TO_CLOSE, nil), task: task} + } } else { // should not happen return &localActivityResult{err: NewApplicationError("unexpected context done", "", true, nil), task: task} diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go new file mode 100644 index 000000000..21cca98ab --- /dev/null +++ b/internal/internal_task_pollers_test.go @@ -0,0 +1,163 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package internal + +import ( + "context" + "encoding/binary" + "sync/atomic" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + historypb "go.temporal.io/api/history/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/api/workflowservicemock/v1" + "google.golang.org/grpc" +) + +type countingTaskHandler struct { + WorkflowTaskHandler + ProcessWorkflowTaskInvocationCount atomic.Uint32 +} + +func (wth *countingTaskHandler) ProcessWorkflowTask( + task *workflowTask, + wfctx *workflowExecutionContextImpl, + hb workflowTaskHeartbeatFunc, +) (interface{}, error) { + wth.ProcessWorkflowTaskInvocationCount.Add(1) + return wth.WorkflowTaskHandler.ProcessWorkflowTask(task, wfctx, hb) +} + +func TestWFTRacePrevention(t *testing.T) { + params := workerExecutionParameters{cache: NewWorkerCache()} + ensureRequiredParams(¶ms) + var ( + taskQueue = taskqueuepb.TaskQueue{Name: t.Name() + "task-queue"} + startedAttrs = historypb.WorkflowExecutionStartedEventAttributes{ + TaskQueue: &taskQueue, + } + startedEvent = createTestEventWorkflowExecutionStarted(1, &startedAttrs) + history = historypb.History{Events: []*historypb.HistoryEvent{startedEvent}} + runID = t.Name() + "-run-id" + wfID = t.Name() + "-workflow-id" + wfe = commonpb.WorkflowExecution{RunId: runID, WorkflowId: wfID} + wfType = commonpb.WorkflowType{Name: t.Name() + "-workflow-type"} + ctrl = gomock.NewController(t) + client = workflowservicemock.NewMockWorkflowServiceClient(ctrl) + resultsChan = make(chan error, 2) + innerTaskHandler = newWorkflowTaskHandler(params, nil, newRegistry()) + taskHandler = &countingTaskHandler{WorkflowTaskHandler: innerTaskHandler} + contextManager = taskHandler + codec = binary.LittleEndian + completionChans = []chan struct{}{make(chan struct{}), make(chan struct{})} + pollResp0 = workflowservice.PollWorkflowTaskQueueResponse{ + Attempt: 1, 
+ WorkflowExecution: &wfe, + WorkflowType: &wfType, + History: &history, + // encode the task pseudo-ID into the token; 0 here and 1 for + // pollResp1 below. The mock will use this as an index into + // `completionChans` (above) to get a task-specific control channel. + TaskToken: codec.AppendUint32(nil, 0), + } + pollResp1 = workflowservice.PollWorkflowTaskQueueResponse{ + Attempt: 1, + WorkflowExecution: &wfe, + WorkflowType: &wfType, + History: &history, + TaskToken: codec.AppendUint32(nil, 1), + } + task0 = workflowTask{task: &pollResp0} + task1 = workflowTask{task: &pollResp1} + + // used as a testify condition + tryWrite = func(ch chan struct{}) func() bool { + return func() bool { + select { + case ch <- struct{}{}: + return true + default: + return false + } + } + } + ) + + t.Log("Didn't register any workflows so expect both future WFTs to " + + "end up calling RespondWorkflowTaskFailed") + client.EXPECT().RespondWorkflowTaskFailed(gomock.Any(), gomock.Any()). + Times(2). + DoAndReturn(func( + _ context.Context, + req *workflowservice.RespondWorkflowTaskFailedRequest, + _ ...grpc.CallOption, + ) (*workflowservice.RespondWorkflowTaskFailedResponse, error) { + // find the appropriate channel for this task - the index is encoded + // into the TaskToken + ch := completionChans[int(codec.Uint32(req.TaskToken))] + <-ch + // these two reads ^v allow the test code to capture a task processing + // goroutine exactly here + <-ch + return &workflowservice.RespondWorkflowTaskFailedResponse{}, nil + }) + + poller := newWorkflowTaskPoller(taskHandler, contextManager, client, params) + + t.Log("Issue task0") + go func() { resultsChan <- poller.processWorkflowTask(&task0) }() + + completionChans[0] <- struct{}{} + require.EqualValues(t, 1, taskHandler.ProcessWorkflowTaskInvocationCount.Load(), + "TaskHandler.ProcessWorkflowTask should have been called once") + t.Log("task0 has called TaskHandler.ProcessWorkflowTask and is blocked " + + "in the mock 
RespondWorkflowTaskFailed") + + t.Log("Issue task1") + go func() { resultsChan <- poller.processWorkflowTask(&task1) }() + + require.Never(t, tryWrite(completionChans[1]), 2*time.Second, 100*time.Millisecond, + "Should be no reader on the task1's completion channel as task0 holds the ctx lock") + require.EqualValues(t, 1, taskHandler.ProcessWorkflowTaskInvocationCount.Load(), + "TaskHandler.ProcessWorkflowTask should only have been called once") + + t.Log("Unblock task0 allowing poller.processWorkflowTask to return") + close(completionChans[0]) + require.NoError(t, <-resultsChan) + + t.Log("task1 should now proceed and block in the mock RespondWorkflowTaskFailed") + completionChans[1] <- struct{}{} + require.EqualValues(t, 2, taskHandler.ProcessWorkflowTaskInvocationCount.Load(), + "TaskHandler.ProcessWorkflowTask should have been called twice") + + t.Log("Unblock task1 allowing poller.processWorkflowTask to return") + close(completionChans[1]) + require.NoError(t, <-resultsChan) +} diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 7b7f29e91..6a5431afb 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -290,18 +290,19 @@ func newWorkflowWorkerInternal(service workflowservice.WorkflowServiceClient, pa } else { taskHandler = newWorkflowTaskHandler(params, ppMgr, registry) } - return newWorkflowTaskWorkerInternal(taskHandler, service, params, workerStopChannel, registry.interceptors) + return newWorkflowTaskWorkerInternal(taskHandler, taskHandler, service, params, workerStopChannel, registry.interceptors) } func newWorkflowTaskWorkerInternal( taskHandler WorkflowTaskHandler, + contextManager WorkflowContextManager, service workflowservice.WorkflowServiceClient, params workerExecutionParameters, stopC chan struct{}, interceptors []WorkerInterceptor, ) *workflowWorker { ensureRequiredParams(¶ms) - poller := newWorkflowTaskPoller(taskHandler, service, params) + poller := newWorkflowTaskPoller(taskHandler, 
contextManager, service, params) worker := newBaseWorker(baseWorkerOptions{ pollerCount: params.MaxConcurrentWorkflowTaskQueuePollers, pollerRate: defaultPollerRate, @@ -1367,7 +1368,12 @@ func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service wor }, } taskHandler := newWorkflowTaskHandler(params, nil, aw.registry) - resp, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task, historyIterator: iterator}, nil) + wfctx, err := taskHandler.GetOrCreateWorkflowContext(task, iterator) + defer wfctx.Unlock(err) + if err != nil { + return err + } + resp, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task, historyIterator: iterator}, wfctx, nil) if err != nil { return err } @@ -1562,6 +1568,9 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke } else { workflowWorker = newWorkflowWorker(client.workflowService, workerParams, nil, registry) } + if client.eagerDispatcher != nil { + client.eagerDispatcher.registerWorker(workflowWorker) + } } // activity types. @@ -1569,7 +1578,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke if !options.LocalActivityWorkerOnly { activityWorker = newActivityWorker(client.workflowService, workerParams, nil, registry, nil) // Set the activity worker on the eager executor - workerParams.eagerActivityExecutor.activityWorker = activityWorker + workerParams.eagerActivityExecutor.activityWorker = activityWorker.worker } var sessionWorker *sessionWorker diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 1b7dfbbb0..276c416da 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -194,7 +194,7 @@ type ( } ) -// SetRetryLongPollGracePeriod sets the amount of time a long poller retrys on +// SetRetryLongPollGracePeriod sets the amount of time a long poller retries on // fatal errors before it actually fails. For test use only, // not safe to call with a running worker. 
func SetRetryLongPollGracePeriod(period time.Duration) { @@ -310,6 +310,36 @@ func (bw *baseWorker) runPoller() { } } +func (bw *baseWorker) tryReserveSlot() bool { + if !bw.isWorkerStarted || bw.isStop() { + return false + } + // Reserve a executor slot via a non-blocking attempt to take a poller + // request entry which essentially reserves a slot + select { + case <-bw.pollerRequestCh: + return true + default: + return false + } +} + +func (bw *baseWorker) releaseSlot() { + // Like other sends to this channel, we assume there is room because we + // reserved it, so we make a blocking send. + bw.pollerRequestCh <- struct{}{} +} + +func (bw *baseWorker) processTaskAsync(task interface{}, callback func()) { + bw.stopWG.Add(1) + go func() { + if callback != nil { + defer callback() + } + bw.processTask(task) + }() +} + func (bw *baseWorker) runTaskDispatcher() { defer bw.stopWG.Done() @@ -323,15 +353,14 @@ func (bw *baseWorker) runTaskDispatcher() { case <-bw.stopCh: return case task := <-bw.taskQueueCh: - // for non-polled-task (local activity result as task), we don't need to rate limit + // for non-polled-task (local activity result as task or eager task), we don't need to rate limit _, isPolledTask := task.(*polledTask) if isPolledTask && bw.taskLimiter.Wait(bw.limiterContext) != nil { if bw.isStop() { return } } - bw.stopWG.Add(1) - go bw.processTask(task) + bw.processTaskAsync(task, nil) } } } diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 215d3d709..9058891b6 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -1623,7 +1623,10 @@ func (s *internalWorkerTestSuite) testWorkflowTaskHandlerHelper(params workerExe } r := newWorkflowTaskHandler(params, nil, s.registry) - _, _, err := r.ProcessWorkflowTask(&workflowTask{task: task}, nil) + wfctx, err := r.GetOrCreateWorkflowContext(task, nil) + s.NoError(err) + _, err = r.ProcessWorkflowTask(&workflowTask{task: task}, wfctx, nil) + 
wfctx.Unlock(err) s.NoError(err) } diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 776d0b23a..3c41c7cbf 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -92,6 +92,7 @@ type ( excludeInternalFromRetry *uberatomic.Bool capabilities *workflowservice.GetSystemInfoResponse_Capabilities capabilitiesLock sync.RWMutex + eagerDispatcher *eagerWorkflowDispatcher // The pointer value is shared across multiple clients. If non-nil, only // access/mutate atomically. @@ -1443,7 +1444,9 @@ func serializeSearchAttributes(input map[string]interface{}) (*commonpb.SearchAt return &commonpb.SearchAttributes{IndexedFields: attr}, nil } -type workflowClientInterceptor struct{ client *WorkflowClient } +type workflowClientInterceptor struct { + client *WorkflowClient +} func (w *workflowClientInterceptor) ExecuteWorkflow( ctx context.Context, @@ -1506,6 +1509,11 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( Header: header, } + var eagerExecutor *eagerWorkflowExecutor + if in.Options.EnableEagerStart && w.client.capabilities.GetEagerWorkflowStart() && w.client.eagerDispatcher != nil { + eagerExecutor = w.client.eagerDispatcher.applyToRequest(startRequest) + } + var response *workflowservice.StartWorkflowExecutionResponse grpcCtx, cancel := newGRPCContext(ctx, grpcMetricsHandler( @@ -1514,7 +1522,12 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( defer cancel() response, err = w.client.workflowService.StartWorkflowExecution(grpcCtx, startRequest) - + eagerWorkflowTask := response.GetEagerWorkflowTask() + if eagerWorkflowTask != nil && eagerExecutor != nil { + eagerExecutor.handleResponse(eagerWorkflowTask) + } else if eagerExecutor != nil { + eagerExecutor.release() + } // Allow already-started error var runID string if e, ok := err.(*serviceerror.WorkflowExecutionAlreadyStarted); ok && !in.Options.WorkflowExecutionErrorWhenAlreadyStarted { diff --git 
a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index c1f2027fb..ec02ee56a 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1115,6 +1115,201 @@ func (s *workflowClientTestSuite) TestStartWorkflow() { s.Equal(createResponse.GetRunId(), resp.GetRunID()) } +func (s *workflowClientTestSuite) TestEagerStartWorkflowNotSupported() { + client, ok := s.client.(*WorkflowClient) + client.capabilities = &workflowservice.GetSystemInfoResponse_Capabilities{ + EagerWorkflowStart: false, + } + + var processTask bool + var releaseSlot bool + client.eagerDispatcher = &eagerWorkflowDispatcher{ + workersByTaskQueue: map[string][]eagerWorker{ + taskqueue: { + &eagerWorkerMock{ + tryReserveSlotCallback: func() bool { return true }, + releaseSlotCallback: func() { + releaseSlot = true + }, + processTaskAsyncCallback: func(task interface{}, callback func()) { + processTask = true + callback() + }, + }, + }, + }, + } + s.True(ok) + options := StartWorkflowOptions{ + ID: workflowID, + TaskQueue: taskqueue, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, + EnableEagerStart: true, + } + f1 := func(ctx Context, r []byte) string { + panic("this is just a stub") + } + + createResponse := &workflowservice.StartWorkflowExecutionResponse{ + RunId: runID, + EagerWorkflowTask: &workflowservice.PollWorkflowTaskQueueResponse{}, + } + s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil) + + resp, err := client.ExecuteWorkflow(context.Background(), options, f1, []byte("test")) + s.Equal(converter.GetDefaultDataConverter(), client.dataConverter) + s.Nil(err) + s.Equal(createResponse.GetRunId(), resp.GetRunID()) + s.False(processTask) + s.False(releaseSlot) +} + +func (s *workflowClientTestSuite) TestEagerStartWorkflowNoWorker() { + client, ok := s.client.(*WorkflowClient) + client.capabilities = 
&workflowservice.GetSystemInfoResponse_Capabilities{ + EagerWorkflowStart: true, + } + + var processTask bool + var releaseSlot bool + client.eagerDispatcher = &eagerWorkflowDispatcher{ + workersByTaskQueue: map[string][]eagerWorker{ + taskqueue: { + &eagerWorkerMock{ + tryReserveSlotCallback: func() bool { return false }, + releaseSlotCallback: func() { + releaseSlot = true + }, + processTaskAsyncCallback: func(task interface{}, callback func()) { + processTask = true + callback() + }, + }, + }, + }, + } + s.True(ok) + options := StartWorkflowOptions{ + ID: workflowID, + TaskQueue: taskqueue, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, + EnableEagerStart: true, + } + f1 := func(ctx Context, r []byte) string { + panic("this is just a stub") + } + + createResponse := &workflowservice.StartWorkflowExecutionResponse{ + RunId: runID, + EagerWorkflowTask: &workflowservice.PollWorkflowTaskQueueResponse{}, + } + s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil) + + resp, err := client.ExecuteWorkflow(context.Background(), options, f1, []byte("test")) + s.Equal(converter.GetDefaultDataConverter(), client.dataConverter) + s.Nil(err) + s.Equal(createResponse.GetRunId(), resp.GetRunID()) + s.False(processTask) + s.False(releaseSlot) +} + +func (s *workflowClientTestSuite) TestEagerStartWorkflow() { + client, ok := s.client.(*WorkflowClient) + client.capabilities = &workflowservice.GetSystemInfoResponse_Capabilities{ + EagerWorkflowStart: true, + } + + var processTask bool + var releaseSlot bool + client.eagerDispatcher = &eagerWorkflowDispatcher{ + workersByTaskQueue: map[string][]eagerWorker{ + taskqueue: { + &eagerWorkerMock{ + tryReserveSlotCallback: func() bool { return true }, + releaseSlotCallback: func() { + releaseSlot = true + }, + processTaskAsyncCallback: func(task interface{}, callback func()) { + processTask = true + callback() + }, + }, + }, + }, + } + 
s.True(ok) + options := StartWorkflowOptions{ + ID: workflowID, + TaskQueue: taskqueue, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, + EnableEagerStart: true, + } + f1 := func(ctx Context, r []byte) string { + panic("this is just a stub") + } + + createResponse := &workflowservice.StartWorkflowExecutionResponse{ + RunId: runID, + EagerWorkflowTask: &workflowservice.PollWorkflowTaskQueueResponse{}, + } + s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil) + + resp, err := client.ExecuteWorkflow(context.Background(), options, f1, []byte("test")) + s.Equal(converter.GetDefaultDataConverter(), client.dataConverter) + s.Nil(err) + s.Equal(createResponse.GetRunId(), resp.GetRunID()) + s.True(processTask) + s.True(releaseSlot) +} + +func (s *workflowClientTestSuite) TestEagerStartWorkflowStartRequestFail() { + client, ok := s.client.(*WorkflowClient) + client.capabilities = &workflowservice.GetSystemInfoResponse_Capabilities{ + EagerWorkflowStart: true, + } + + var processTask bool + var releaseSlot bool + client.eagerDispatcher = &eagerWorkflowDispatcher{ + workersByTaskQueue: map[string][]eagerWorker{ + taskqueue: { + &eagerWorkerMock{ + tryReserveSlotCallback: func() bool { return true }, + releaseSlotCallback: func() { + releaseSlot = true + }, + processTaskAsyncCallback: func(task interface{}, callback func()) { + processTask = true + callback() + }, + }, + }, + }, + } + s.True(ok) + options := StartWorkflowOptions{ + ID: workflowID, + TaskQueue: taskqueue, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, + EnableEagerStart: true, + } + f1 := func(ctx Context, r []byte) string { + panic("this is just a stub") + } + + s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed request")) + + resp, err := client.ExecuteWorkflow(context.Background(), options, f1, []byte("test")) 
+ s.Nil(resp) + s.Error(err) + s.False(processTask) + s.True(releaseSlot) +} + func (s *workflowClientTestSuite) TestExecuteWorkflowWithDataConverter() { dc := iconverter.NewTestDataConverter() s.client = NewServiceClient(s.service, nil, ClientOptions{DataConverter: dc}) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 56102597b..e5104581f 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -634,7 +634,8 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity( params: ¶ms, callback: func(lar *LocalActivityResultWrapper) { }, - attempt: 1, + attempt: 1, + scheduledTime: time.Now(), } taskHandler := localActivityTaskHandler{ userContext: env.workerOptions.BackgroundActivityContext, diff --git a/internal/version.go b/internal/version.go index e03e53b0d..471c86e03 100644 --- a/internal/version.go +++ b/internal/version.go @@ -30,7 +30,7 @@ package internal const ( // SDKVersion is a semver (https://semver.org/) that represents the version of this Temporal GoSDK. // Server validates if SDKVersion fits its supported range and rejects request if it doesn't. - SDKVersion = "1.23.1" + SDKVersion = "1.24.0" // SupportedServerVersions is a semver rages (https://github.com/blang/semver#ranges) of server versions that // are supported by this Temporal SDK. 
diff --git a/test/activity_test.go b/test/activity_test.go index 5f902c736..eba9ef25b 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -71,11 +71,37 @@ func (a *Activities) Sleep(_ context.Context, delay time.Duration) error { return nil } +func (a *Activities) SleepN(ctx context.Context, delay time.Duration, times int) (int32, error) { + a.append("sleepN") + if activity.GetInfo(ctx).Attempt >= int32(times) { + return activity.GetInfo(ctx).Attempt, nil + } + time.Sleep(delay) + return activity.GetInfo(ctx).Attempt, nil +} + func LocalSleep(_ context.Context, delay time.Duration) error { time.Sleep(delay) return nil } +func (a *Activities) ActivityToBeCanceled(ctx context.Context) (string, error) { + a.append("ActivityToBeCanceled") + for { + select { + case <-time.After(1 * time.Second): + activity.RecordHeartbeat(ctx, "") + case <-ctx.Done(): + return "I am canceled by Done", nil + } + } +} + +func (a *Activities) EmptyActivity(ctx context.Context) error { + a.append("EmptyActivity") + return nil +} + func (a *Activities) HeartbeatAndSleep(ctx context.Context, seq int, delay time.Duration) (int, error) { a.append("heartbeatAndSleep") activity.GetLogger(ctx).Info("Running HeartbeatAndSleep activity") @@ -169,6 +195,15 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQue if info.TaskQueue != taskQueue { return fmt.Errorf("expected taskQueue %v but got %v", taskQueue, info.TaskQueue) } + if info.Deadline.IsZero() { + return errors.New("expected non zero deadline") + } + if info.StartedTime.IsZero() { + return errors.New("expected non zero started time") + } + if info.ScheduledTime.IsZero() { + return errors.New("expected non zero scheduled time") + } if info.IsLocalActivity != isLocalActivity { return fmt.Errorf("expected IsLocalActivity %v but got %v", isLocalActivity, info.IsLocalActivity) } diff --git a/test/integration_test.go b/test/integration_test.go index e171231a3..0f79d0770 100644 --- 
a/test/integration_test.go +++ b/test/integration_test.go @@ -1142,6 +1142,26 @@ func (ts *IntegrationTestSuite) TestWorkflowWithParallelSideEffects() { ts.NoError(ts.executeWorkflow("test-wf-parallel-side-effects", ts.workflows.WorkflowWithParallelSideEffects, nil)) } +func (ts *IntegrationTestSuite) TestWorkflowWithLocalActivityStartToClose() { + ts.NoError(ts.executeWorkflow("test-wf-la-start-to-close", ts.workflows.WorkflowWithLocalActivityStartToCloseTimeout, nil)) +} + +func (ts *IntegrationTestSuite) TestActivityTimeoutsWorkflow() { + ts.NoError(ts.executeWorkflow("test-activity-timeout-workflow", ts.workflows.ActivityTimeoutsWorkflow, nil, workflow.ActivityOptions{ + ScheduleToCloseTimeout: 5 * time.Second, + })) + + ts.NoError(ts.executeWorkflow("test-activity-timeout-workflow", ts.workflows.ActivityTimeoutsWorkflow, nil, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Second, + })) + + ts.Error(ts.executeWorkflow("test-activity-timeout-workflow", ts.workflows.ActivityTimeoutsWorkflow, nil, workflow.ActivityOptions{})) + ts.Error(ts.executeWorkflow("test-activity-timeout-workflow", ts.workflows.ActivityTimeoutsWorkflow, nil, workflow.ActivityOptions{ + ScheduleToStartTimeout: 5 * time.Second, + })) + +} + func (ts *IntegrationTestSuite) TestWorkflowWithParallelSideEffectsUsingReplay() { replayer := worker.NewWorkflowReplayer() replayer.RegisterWorkflowWithOptions(ts.workflows.WorkflowWithParallelSideEffects, workflow.RegisterOptions{DisableAlreadyRegisteredCheck: true}) @@ -1189,6 +1209,18 @@ func (ts *IntegrationTestSuite) TestMutatingUpdateValidator() { ts.Nil(ts.client.CancelWorkflow(ctx, "test-mutating-update-validator", "")) } +func (ts *IntegrationTestSuite) TestWaitForCancelWithDisconnectedContext() { + ctx := context.Background() + run, err := ts.client.ExecuteWorkflow(ctx, + ts.startWorkflowOptions("test-wait-for-cancel-with-disconnected-contex"), ts.workflows.WaitForCancelWithDisconnectedContextWorkflow) + ts.Nil(err) + + 
ts.waitForQueryTrue(run, "timer-created", 1) + + ts.Nil(ts.client.CancelWorkflow(ctx, run.GetID(), run.GetRunID())) + ts.Nil(run.Get(ctx, nil)) +} + func (ts *IntegrationTestSuite) TestMutatingSideEffect() { ctx := context.Background() err := ts.executeWorkflowWithContextAndOption(ctx, ts.startWorkflowOptions("test-mutating-side-effect"), ts.workflows.MutatingSideEffectWorkflow, nil) @@ -3989,6 +4021,7 @@ func (ts *IntegrationTestSuite) startWorkflowOptions(wfID string) client.StartWo WorkflowExecutionTimeout: 15 * time.Second, WorkflowTaskTimeout: time.Second, WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + EnableEagerStart: true, } if wfID == CronWorkflowID { wfOptions.CronSchedule = "@every 1s" diff --git a/test/replaytests/replay-tests-cancel-order-timer-resolved.json b/test/replaytests/replay-tests-cancel-order-timer-resolved.json new file mode 100644 index 000000000..1f407e370 --- /dev/null +++ b/test/replaytests/replay-tests-cancel-order-timer-resolved.json @@ -0,0 +1,281 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2023-07-27T13:17:07.829982170Z", + "eventType": "WorkflowExecutionStarted", + "taskId": "1160915", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "CancelOrderSelectWorkflow" + }, + "taskQueue": { + "name": "replay-test", + "kind": "Normal" + }, + "workflowExecutionTimeout": "0s", + "workflowRunTimeout": "0s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "981183a2-7b46-44de-8f70-189b8c7aa8c1", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "firstExecutionRunId": "981183a2-7b46-44de-8f70-189b8c7aa8c1", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "header": { + + } + } + }, + { + "eventId": "2", + "eventTime": "2023-07-27T13:17:07.830006003Z", + "eventType": "WorkflowTaskScheduled", + "taskId": "1160916", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "replay-test", + "kind": "Normal" + }, + "startToCloseTimeout": "10s", 
+ "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2023-07-27T13:17:07.844309045Z", + "eventType": "WorkflowTaskStarted", + "taskId": "1160923", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "1bcfea99-c161-40b5-8a3c-2d698594994a", + "historySizeBytes": "572" + } + }, + { + "eventId": "4", + "eventTime": "2023-07-27T13:17:07.855022837Z", + "eventType": "WorkflowTaskCompleted", + "taskId": "1160927", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "workerVersioningId": { + "workerBuildId": "a6293ecc93d55fa71eb0d9687e852156" + }, + "sdkMetadata": { + "langUsedFlags": [ + 3 + ] + }, + "meteringMetadata": { + + } + } + }, + { + "eventId": "5", + "eventTime": "2023-07-27T13:17:07.855038212Z", + "eventType": "TimerStarted", + "taskId": "1160928", + "timerStartedEventAttributes": { + "timerId": "5", + "startToFireTimeout": "300s", + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "6", + "eventTime": "2023-07-27T13:17:09.854134879Z", + "eventType": "WorkflowExecutionCancelRequested", + "taskId": "1160932", + "workflowExecutionCancelRequestedEventAttributes": { + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "7", + "eventTime": "2023-03-09T07:03:19.936924001Z", + "eventType": "TimerFired", + "taskId": "1054628", + "timerFiredEventAttributes": { + "timerId": "5", + "startedEventId": "5" + } + }, + { + "eventId": "8", + "eventTime": "2023-07-27T13:17:09.854143713Z", + "eventType": "WorkflowTaskScheduled", + "taskId": "1160933", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "Quinn-Klassens-MacBook-Pro.local:c3129203-345c-45c1-ae50-ae9efd5f4c66", + "kind": "Sticky" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "9", + "eventTime": "2023-07-27T13:17:09.867791379Z", + 
"eventType": "WorkflowTaskStarted", + "taskId": "1160937", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "8", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "9d569699-7b95-4074-bf5f-0574e85f84a3", + "historySizeBytes": "1021" + } + }, + { + "eventId": "10", + "eventTime": "2023-07-27T13:17:09.879198421Z", + "eventType": "WorkflowTaskCompleted", + "taskId": "1160942", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "8", + "startedEventId": "9", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "workerVersioningId": { + "workerBuildId": "a6293ecc93d55fa71eb0d9687e852156" + }, + "sdkMetadata": { + + }, + "meteringMetadata": { + + } + } + }, + { + "eventId": "11", + "eventTime": "2023-07-27T13:17:09.879409629Z", + "eventType": "ActivityTaskScheduled", + "taskId": "1160944", + "activityTaskScheduledEventAttributes": { + "activityId": "11", + "activityType": { + "name": "helloworldActivity" + }, + "taskQueue": { + "name": "replay-test", + "kind": "Normal" + }, + "header": { + + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmxkIg==" + } + ] + }, + "scheduleToCloseTimeout": "0s", + "scheduleToStartTimeout": "60s", + "startToCloseTimeout": "60s", + "heartbeatTimeout": "20s", + "workflowTaskCompletedEventId": "9", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s" + } + } + }, + { + "eventId": "12", + "eventTime": "2023-07-27T13:17:09.879424171Z", + "eventType": "ActivityTaskStarted", + "taskId": "1160948", + "activityTaskStartedEventAttributes": { + "scheduledEventId": "11", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "358dbaf6-0b82-47bc-86e8-d46ad928db73", + "attempt": 1 + } + }, + { + "eventId": "13", + "eventTime": "2023-07-27T13:17:09.886071254Z", + "eventType": "ActivityTaskCompleted", + "taskId": "1160949", + "activityTaskCompletedEventAttributes": { + "result": { 
+ "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvIHdvcmxkISI=" + } + ] + }, + "scheduledEventId": "11", + "startedEventId": "12", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "14", + "eventTime": "2023-07-27T13:17:09.886075379Z", + "eventType": "WorkflowTaskScheduled", + "taskId": "1160950", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "Quinn-Klassens-MacBook-Pro.local:c3129203-345c-45c1-ae50-ae9efd5f4c66", + "kind": "Sticky" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "15", + "eventTime": "2023-07-27T13:17:09.894641588Z", + "eventType": "WorkflowTaskStarted", + "taskId": "1160954", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "14", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "f3805eb0-f98e-47b8-b1b1-128868619b4f", + "historySizeBytes": "1809" + } + }, + { + "eventId": "16", + "eventTime": "2023-07-27T13:17:09.905497754Z", + "eventType": "WorkflowTaskCompleted", + "taskId": "1160958", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "14", + "startedEventId": "15", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "workerVersioningId": { + "workerBuildId": "a6293ecc93d55fa71eb0d9687e852156" + }, + "sdkMetadata": { + + }, + "meteringMetadata": { + + } + } + }, + { + "eventId": "17", + "eventTime": "2023-07-27T13:17:09.905521629Z", + "eventType": "WorkflowExecutionCompleted", + "taskId": "1160959", + "workflowExecutionCompletedEventAttributes": { + "workflowTaskCompletedEventId": "16" + } + } + ] +} \ No newline at end of file diff --git a/test/replaytests/replay-tests-cancel-order.json b/test/replaytests/replay-tests-cancel-order.json new file mode 100644 index 000000000..1b5f174f3 --- /dev/null +++ b/test/replaytests/replay-tests-cancel-order.json @@ -0,0 +1,283 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2023-07-27T13:17:07.829982170Z", + 
"eventType": "WorkflowExecutionStarted", + "taskId": "1160915", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "CancelOrderSelectWorkflow" + }, + "taskQueue": { + "name": "replay-test", + "kind": "Normal" + }, + "workflowExecutionTimeout": "0s", + "workflowRunTimeout": "0s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "981183a2-7b46-44de-8f70-189b8c7aa8c1", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "firstExecutionRunId": "981183a2-7b46-44de-8f70-189b8c7aa8c1", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "header": { + + } + } + }, + { + "eventId": "2", + "eventTime": "2023-07-27T13:17:07.830006003Z", + "eventType": "WorkflowTaskScheduled", + "taskId": "1160916", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "replay-test", + "kind": "Normal" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2023-07-27T13:17:07.844309045Z", + "eventType": "WorkflowTaskStarted", + "taskId": "1160923", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "1bcfea99-c161-40b5-8a3c-2d698594994a", + "historySizeBytes": "572" + } + }, + { + "eventId": "4", + "eventTime": "2023-07-27T13:17:07.855022837Z", + "eventType": "WorkflowTaskCompleted", + "taskId": "1160927", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "workerVersioningId": { + "workerBuildId": "a6293ecc93d55fa71eb0d9687e852156" + }, + "sdkMetadata": { + "langUsedFlags": [ + 3 + ] + }, + "meteringMetadata": { + + } + } + }, + { + "eventId": "5", + "eventTime": "2023-07-27T13:17:07.855038212Z", + "eventType": "TimerStarted", + "taskId": "1160928", + "timerStartedEventAttributes": { + "timerId": "5", + "startToFireTimeout": "300s", + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": 
"6", + "eventTime": "2023-07-27T13:17:09.854134879Z", + "eventType": "WorkflowExecutionCancelRequested", + "taskId": "1160932", + "workflowExecutionCancelRequestedEventAttributes": { + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "7", + "eventTime": "2023-07-27T13:17:09.854143713Z", + "eventType": "WorkflowTaskScheduled", + "taskId": "1160933", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "Quinn-Klassens-MacBook-Pro.local:c3129203-345c-45c1-ae50-ae9efd5f4c66", + "kind": "Sticky" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "8", + "eventTime": "2023-07-27T13:17:09.867791379Z", + "eventType": "WorkflowTaskStarted", + "taskId": "1160937", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "7", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "9d569699-7b95-4074-bf5f-0574e85f84a3", + "historySizeBytes": "1021" + } + }, + { + "eventId": "9", + "eventTime": "2023-07-27T13:17:09.879198421Z", + "eventType": "WorkflowTaskCompleted", + "taskId": "1160942", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "7", + "startedEventId": "8", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "workerVersioningId": { + "workerBuildId": "a6293ecc93d55fa71eb0d9687e852156" + }, + "sdkMetadata": { + + }, + "meteringMetadata": { + + } + } + }, + { + "eventId": "10", + "eventTime": "2023-07-27T13:17:09.879390796Z", + "eventType": "TimerCanceled", + "taskId": "1160943", + "timerCanceledEventAttributes": { + "timerId": "5", + "startedEventId": "5", + "workflowTaskCompletedEventId": "9", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "11", + "eventTime": "2023-07-27T13:17:09.879409629Z", + "eventType": "ActivityTaskScheduled", + "taskId": "1160944", + "activityTaskScheduledEventAttributes": { + "activityId": "11", + "activityType": { + "name": "helloworldActivity" + }, + "taskQueue": { + "name": 
"replay-test", + "kind": "Normal" + }, + "header": { + + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmxkIg==" + } + ] + }, + "scheduleToCloseTimeout": "0s", + "scheduleToStartTimeout": "60s", + "startToCloseTimeout": "60s", + "heartbeatTimeout": "20s", + "workflowTaskCompletedEventId": "9", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s" + } + } + }, + { + "eventId": "12", + "eventTime": "2023-07-27T13:17:09.879424171Z", + "eventType": "ActivityTaskStarted", + "taskId": "1160948", + "activityTaskStartedEventAttributes": { + "scheduledEventId": "11", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "358dbaf6-0b82-47bc-86e8-d46ad928db73", + "attempt": 1 + } + }, + { + "eventId": "13", + "eventTime": "2023-07-27T13:17:09.886071254Z", + "eventType": "ActivityTaskCompleted", + "taskId": "1160949", + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvIHdvcmxkISI=" + } + ] + }, + "scheduledEventId": "11", + "startedEventId": "12", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "14", + "eventTime": "2023-07-27T13:17:09.886075379Z", + "eventType": "WorkflowTaskScheduled", + "taskId": "1160950", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "Quinn-Klassens-MacBook-Pro.local:c3129203-345c-45c1-ae50-ae9efd5f4c66", + "kind": "Sticky" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "15", + "eventTime": "2023-07-27T13:17:09.894641588Z", + "eventType": "WorkflowTaskStarted", + "taskId": "1160954", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "14", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "f3805eb0-f98e-47b8-b1b1-128868619b4f", + "historySizeBytes": "1809" + } + }, + { + "eventId": "16", + "eventTime": 
"2023-07-27T13:17:09.905497754Z", + "eventType": "WorkflowTaskCompleted", + "taskId": "1160958", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "14", + "startedEventId": "15", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "workerVersioningId": { + "workerBuildId": "a6293ecc93d55fa71eb0d9687e852156" + }, + "sdkMetadata": { + + }, + "meteringMetadata": { + + } + } + }, + { + "eventId": "17", + "eventTime": "2023-07-27T13:17:09.905521629Z", + "eventType": "WorkflowExecutionCompleted", + "taskId": "1160959", + "workflowExecutionCompletedEventAttributes": { + "workflowTaskCompletedEventId": "16" + } + } + ] +} \ No newline at end of file diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index d090dd41e..fb61dec3f 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -367,6 +367,17 @@ func (s *replayTestSuite) TestVersionAndMutableSideEffect() { s.NoError(err) } +func (s *replayTestSuite) TestCancelOrder() { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflow(CancelOrderSelectWorkflow) + + err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "replay-tests-cancel-order.json") + s.NoError(err) + + err = replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "replay-tests-cancel-order-timer-resolved.json") + s.NoError(err) +} + type captureConverter struct { converter.DataConverter toPayloads []interface{} diff --git a/test/replaytests/workflows.go b/test/replaytests/workflows.go index 5516c9aec..89834c30a 100644 --- a/test/replaytests/workflows.go +++ b/test/replaytests/workflows.go @@ -437,3 +437,36 @@ func generateUUID(ctx workflow.Context) (string, error) { return generatedUUID, nil } + +func CancelOrderSelectWorkflow(ctx workflow.Context) error { + timerf := workflow.NewTimer(ctx, 5*time.Minute) + + var err error + disCtx, _ := workflow.NewDisconnectedContext(ctx) + selector := workflow.NewSelector(ctx) + + 
selector.AddFuture(timerf, func(f workflow.Future) { + err = timerf.Get(ctx, nil) + // do something different on cancel error + if !temporal.IsCanceledError(err) { + _ = workflow.UpsertSearchAttributes(ctx, map[string]interface{}{"CustomKeywordField": "testkey"}) + } else { + var result string + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + } + disCtx = workflow.WithActivityOptions(disCtx, ao) + err = workflow.ExecuteActivity(disCtx, helloworldActivity, "world").Get(ctx, &result) + } + + }) + selector.AddReceive(ctx.Done(), func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, nil) + err = workflow.Sleep(disCtx, 1*time.Second) + + }) + selector.Select(ctx) + return err +} diff --git a/test/workflow_test.go b/test/workflow_test.go index 89f303492..993d1af98 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -948,6 +948,10 @@ func (w *Workflows) ConsistentQueryWorkflow(ctx workflow.Context, delay time.Dur return nil } +func (w *Workflows) ActivityTimeoutsWorkflow(ctx workflow.Context, activityOptions workflow.ActivityOptions) error { + activityCtx := workflow.WithActivityOptions(ctx, activityOptions) + return workflow.ExecuteActivity(activityCtx, "Sleep", time.Second).Get(ctx, nil) +} func (w *Workflows) SignalWorkflow(ctx workflow.Context) (*commonpb.WorkflowType, error) { s := workflow.NewSelector(ctx) @@ -1155,6 +1159,46 @@ func (w *Workflows) WorkflowWithParallelLongLocalActivityAndHeartbeat(ctx workfl return nil } +func (w *Workflows) WorkflowWithLocalActivityStartToCloseTimeout(ctx workflow.Context) error { + // Validate that local activities respect StartToCloseTimeout and retry correctly + ao := w.defaultLocalActivityOptions() + ao.ScheduleToCloseTimeout = 10 * time.Second + ao.StartToCloseTimeout = 1 * time.Second + ao.RetryPolicy = &temporal.RetryPolicy{ + MaximumInterval: time.Second, + MaximumAttempts: 5, + } + ctx = 
workflow.WithLocalActivityOptions(ctx, ao) + + var activities *Activities + future := workflow.ExecuteLocalActivity(ctx, activities.SleepN, 3*time.Second, 3) + var count int32 + err := future.Get(ctx, &count) + if err != nil { + return err + } + if count != 3 { + return fmt.Errorf("expected 3, got %v", count) + } + // Validate the correct timeout error is returned + ao.StartToCloseTimeout = 1 * time.Second + ao.RetryPolicy = &temporal.RetryPolicy{ + MaximumInterval: time.Second, + MaximumAttempts: 1, + } + ctx = workflow.WithLocalActivityOptions(ctx, ao) + future = workflow.ExecuteLocalActivity(ctx, activities.SleepN, 3*time.Second, 3) + err = future.Get(ctx, nil) + var timeoutErr *temporal.TimeoutError + if errors.As(err, &timeoutErr) { + if timeoutErr.TimeoutType() != enumspb.TIMEOUT_TYPE_START_TO_CLOSE { + return fmt.Errorf("expected start to close timeout, got %v", timeoutErr.TimeoutType()) + } + return nil + } + return errors.New("expected timeout error") +} + func (w *Workflows) WorkflowWithLocalActivityRetries(ctx workflow.Context) error { laOpts := w.defaultLocalActivityOptions() laOpts.RetryPolicy = &internal.RetryPolicy{ @@ -1571,6 +1615,58 @@ func (w *Workflows) CronWorkflow(ctx workflow.Context) (int, error) { return retme, nil } +func (w *Workflows) WaitForCancelWithDisconnectedContextWorkflow(ctx workflow.Context) (err error) { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Minute, + HeartbeatTimeout: 5 * time.Second, + WaitForCancellation: true, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + var activities *Activities + defer func() { + if !errors.Is(ctx.Err(), workflow.ErrCanceled) { + return + } + // When the Workflow is canceled, it has to get a new disconnected context to execute any Activities + newCtx, _ := workflow.NewDisconnectedContext(ctx) + err = workflow.ExecuteActivity(newCtx, activities.EmptyActivity).Get(newCtx, nil) + }() + + s := workflow.NewSelector(ctx) + + newCtx, _ := workflow.NewDisconnectedContext(ctx) 
+ newCtx, cancel := workflow.WithCancel(newCtx) + + timer1 := workflow.NewTimer(newCtx, 5*time.Minute) + + err = workflow.SetQueryHandler(newCtx, "timer-created", func() (bool, error) { + return true, nil + }) + if err != nil { + return err + } + + s.AddFuture(timer1, func(f workflow.Future) { + err = f.Get(newCtx, nil) + if !errors.Is(ctx.Err(), workflow.ErrCanceled) { + panic("error is not canceled error") + } + }) + + s.AddReceive(ctx.Done(), func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, nil) + cancel() + s.Select(ctx) + }) + + s.Select(ctx) + + var result string + err = workflow.ExecuteActivity(ctx, activities.EmptyActivity).Get(ctx, &result) + return +} + func (w *Workflows) CancelTimerConcurrentWithOtherCommandWorkflow(ctx workflow.Context) (int, error) { ao := workflow.ActivityOptions{ ScheduleToStartTimeout: time.Minute, @@ -2247,6 +2343,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.CancelTimerAfterActivity) worker.RegisterWorkflow(w.CancelTimerViaDeferAfterWFTFailure) worker.RegisterWorkflow(w.CascadingCancellation) + worker.RegisterWorkflow(w.WaitForCancelWithDisconnectedContextWorkflow) worker.RegisterWorkflow(w.ChildWorkflowRetryOnError) worker.RegisterWorkflow(w.ChildWorkflowRetryOnTimeout) worker.RegisterWorkflow(w.ChildWorkflowSuccess) @@ -2285,10 +2382,12 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.WorkflowWithLocalActivityStartWhenTimerCancel) worker.RegisterWorkflow(w.WorkflowWithParallelSideEffects) worker.RegisterWorkflow(w.WorkflowWithParallelMutableSideEffects) + worker.RegisterWorkflow(w.WorkflowWithLocalActivityStartToCloseTimeout) worker.RegisterWorkflow(w.LocalActivityStaleCache) worker.RegisterWorkflow(w.UpdateInfoWorkflow) worker.RegisterWorkflow(w.SignalWorkflow) worker.RegisterWorkflow(w.CronWorkflow) + worker.RegisterWorkflow(w.ActivityTimeoutsWorkflow) worker.RegisterWorkflow(w.CancelTimerConcurrentWithOtherCommandWorkflow) 
worker.RegisterWorkflow(w.CancelMultipleCommandsOverMultipleTasks) worker.RegisterWorkflow(w.CancelChildAndExecuteActivityRace)