Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into non-fatal-tracing-e…
Browse files Browse the repository at this point in the history
…rrors
  • Loading branch information
cdavis-joy committed Aug 2, 2023
2 parents 5878d47 + 1ec43ad commit 9d3a161
Show file tree
Hide file tree
Showing 31 changed files with 1,809 additions and 250 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/docker/dynamic-config-custom.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ system.enableActivityEagerExecution:
frontend.enableUpdateWorkflowExecution:
- value: true
frontend.enableUpdateWorkflowExecutionAsyncAccepted:
- value: true
system.enableEagerWorkflowStart:
- value: true
54 changes: 36 additions & 18 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ type (
// better to rely on the default value.
// ScheduleToStartTimeout is always non-retryable. Retrying after this timeout doesn't make sense as it would
// just put the Activity Task back into the same Task Queue.
// If ScheduleToClose is not provided then this timeout is required.
// Optional: Defaults to unlimited.
ScheduleToStartTimeout time.Duration

Expand All @@ -109,7 +108,7 @@ type (
// to detect that an Activity that didn't complete on time. So this timeout should be as short as the longest
// possible execution of the Activity body. Potentially long running Activities must specify HeartbeatTimeout
// and call Activity.RecordHeartbeat(ctx, "my-heartbeat") periodically for timely failure detection.
// If ScheduleToClose is not provided then this timeout is required: Defaults to the ScheduleToCloseTimeout value.
// Either this option or ScheduleToClose is required: Defaults to the ScheduleToCloseTimeout value.
StartToCloseTimeout time.Duration

// HeartbeatTimeout - Heartbeat interval. Activity must call Activity.RecordHeartbeat(ctx, "my-heartbeat")
Expand Down Expand Up @@ -156,11 +155,13 @@ type (
// LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.
LocalActivityOptions struct {
// ScheduleToCloseTimeout - The end to end timeout for the local activity including retries.
// This field is required.
// At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required.
// defaults to StartToCloseTimeout if not set.
ScheduleToCloseTimeout time.Duration

// StartToCloseTimeout - The timeout for a single execution of the local activity.
// Optional: defaults to ScheduleToClose
// At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required.
// defaults to ScheduleToCloseTimeout if not set.
StartToCloseTimeout time.Duration

// RetryPolicy specify how to retry activity if error happens.
Expand Down Expand Up @@ -254,25 +255,12 @@ func WithActivityTask(
contextPropagators []ContextPropagator,
interceptors []WorkerInterceptor,
) (context.Context, error) {
var deadline time.Time
scheduled := common.TimeValue(task.GetScheduledTime())
started := common.TimeValue(task.GetStartedTime())
scheduleToCloseTimeout := common.DurationValue(task.GetScheduleToCloseTimeout())
startToCloseTimeout := common.DurationValue(task.GetStartToCloseTimeout())
heartbeatTimeout := common.DurationValue(task.GetHeartbeatTimeout())

startToCloseDeadline := started.Add(startToCloseTimeout)
if scheduleToCloseTimeout > 0 {
scheduleToCloseDeadline := scheduled.Add(scheduleToCloseTimeout)
// Minimum of the two deadlines.
if scheduleToCloseDeadline.Before(startToCloseDeadline) {
deadline = scheduleToCloseDeadline
} else {
deadline = startToCloseDeadline
}
} else {
deadline = startToCloseDeadline
}
deadline := calculateActivityDeadline(scheduled, started, scheduleToCloseTimeout, startToCloseTimeout)

logger = log.With(logger,
tagActivityID, task.ActivityId,
Expand Down Expand Up @@ -333,6 +321,21 @@ func WithLocalActivityTask(
tagWorkflowID, task.params.WorkflowInfo.WorkflowExecution.ID,
tagRunID, task.params.WorkflowInfo.WorkflowExecution.RunID,
)
startedTime := time.Now()
scheduleToCloseTimeout := task.params.ScheduleToCloseTimeout
startToCloseTimeout := task.params.StartToCloseTimeout

if startToCloseTimeout == 0 {
startToCloseTimeout = scheduleToCloseTimeout
}
if scheduleToCloseTimeout == 0 {
scheduleToCloseTimeout = startToCloseTimeout
}
deadline := calculateActivityDeadline(task.scheduledTime, startedTime, scheduleToCloseTimeout, startToCloseTimeout)
if task.attempt > 1 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) {
// this is a retry attempt (attempt > 1) and the expire time is earlier than the computed deadline
deadline = task.expireTime
}
return newActivityContext(ctx, interceptors, &activityEnvironment{
workflowType: &workflowTypeLocal,
workflowNamespace: task.params.WorkflowInfo.Namespace,
Expand All @@ -343,6 +346,9 @@ func WithLocalActivityTask(
logger: logger,
metricsHandler: metricsHandler,
isLocalActivity: true,
deadline: deadline,
scheduledTime: task.scheduledTime,
startedTime: startedTime,
dataConverter: dataConverter,
attempt: task.attempt,
})
Expand Down Expand Up @@ -375,3 +381,15 @@ func newActivityContext(

return ctx, nil
}

// calculateActivityDeadline returns the instant by which an activity attempt
// must finish.
//
// The start-to-close deadline (started + startToCloseTimeout) always applies.
// When scheduleToCloseTimeout is positive, the schedule-to-close deadline
// (scheduled + scheduleToCloseTimeout) applies as well and the earlier of the
// two is returned. A non-positive scheduleToCloseTimeout is ignored.
func calculateActivityDeadline(scheduled, started time.Time, scheduleToCloseTimeout, startToCloseTimeout time.Duration) time.Time {
	deadline := started.Add(startToCloseTimeout)
	if scheduleToCloseTimeout <= 0 {
		// No end-to-end limit was supplied; only the per-attempt limit counts.
		return deadline
	}
	// Tighten the deadline only when the end-to-end limit expires strictly earlier.
	if overall := scheduled.Add(scheduleToCloseTimeout); overall.Before(deadline) {
		deadline = overall
	}
	return deadline
}
8 changes: 8 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,11 @@ type (
// supported when Temporal server is using ElasticSearch). The key and value type must be registered on Temporal server side.
// Use GetSearchAttributes API to get valid key and corresponding value type.
SearchAttributes map[string]interface{}

// EnableEagerStart - request eager execution for this workflow, if a local worker is available.
//
// NOTE: Experimental
EnableEagerStart bool
}

// RetryPolicy defines the retry policy.
Expand Down Expand Up @@ -813,6 +818,9 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
contextPropagators: options.ContextPropagators,
workerInterceptors: workerInterceptors,
excludeInternalFromRetry: options.ConnectionOptions.excludeInternalFromRetry,
eagerDispatcher: &eagerWorkflowDispatcher{
workersByTaskQueue: make(map[string][]eagerWorker),
},
}

// Create outbound interceptor by wrapping backwards through chain
Expand Down
3 changes: 2 additions & 1 deletion internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ func getValidatedLocalActivityOptions(ctx Context) (*ExecuteLocalActivityOptions
}
if p.ScheduleToCloseTimeout == 0 {
p.ScheduleToCloseTimeout = p.StartToCloseTimeout
} else {
}
if p.StartToCloseTimeout == 0 {
p.StartToCloseTimeout = p.ScheduleToCloseTimeout
}
return p, nil
Expand Down
87 changes: 23 additions & 64 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ type (
cancelActivityStateMachine struct {
*commandStateMachineBase
attributes *commandpb.RequestCancelActivityTaskCommandAttributes

// The commandsHelper.nextCommandEventIDResetCounter when this command
// incremented commandsHelper.commandsCancelledDuringWFCancellation.
cancelledOnEventIDResetCounter uint64
}

timerCommandStateMachine struct {
Expand All @@ -99,10 +95,6 @@ type (
cancelTimerCommandStateMachine struct {
*commandStateMachineBase
attributes *commandpb.CancelTimerCommandAttributes

// The commandsHelper.nextCommandEventIDResetCounter when this command
// incremented commandsHelper.commandsCancelledDuringWFCancellation.
cancelledOnEventIDResetCounter uint64
}

childWorkflowCommandStateMachine struct {
Expand Down Expand Up @@ -150,18 +142,10 @@ type (
orderedCommands *list.List
commands map[commandID]*list.Element

scheduledEventIDToActivityID map[int64]string
scheduledEventIDToCancellationID map[int64]string
scheduledEventIDToSignalID map[int64]string
versionMarkerLookup map[int64]versionMarker
commandsCancelledDuringWFCancellation int64
workflowExecutionIsCancelling bool

// Incremented everytime nextCommandEventID and
// commandsCancelledDuringWFCancellation is reset (i.e. on new workflow
// task). Won't ever happen, but technically the way this value is compared
// is safe for overflow wrap around.
nextCommandEventIDResetCounter uint64
scheduledEventIDToActivityID map[int64]string
scheduledEventIDToCancellationID map[int64]string
scheduledEventIDToSignalID map[int64]string
versionMarkerLookup map[int64]versionMarker
}

// panic when command state machine is in illegal state
Expand Down Expand Up @@ -477,9 +461,6 @@ func (d *commandStateMachineBase) cancel() {
case commandStateCommandSent:
d.moveState(commandStateCancellationCommandSent, eventCancel)
case commandStateInitiated:
if d.helper.workflowExecutionIsCancelling {
d.helper.commandsCancelledDuringWFCancellation++
}
d.moveState(commandStateCanceledAfterInitiated, eventCancel)
default:
d.failStateTransition(eventCancel)
Expand Down Expand Up @@ -589,10 +570,6 @@ func (d *activityCommandStateMachine) cancel() {
}
cancelCmd := d.helper.newCancelActivityStateMachine(attribs)
d.helper.addCommand(cancelCmd)
// We must mark the event ID reset counter for when we performed this
// increment so a potential decrement can only decrement if it wasn't
// reset
cancelCmd.cancelledOnEventIDResetCounter = d.helper.nextCommandEventIDResetCounter
// We also mark the schedule command as not eager if we haven't sent it yet.
// Server behavior differs on eager vs non-eager when scheduling and
// cancelling during the same task completion. If it has not been sent this
Expand All @@ -614,10 +591,6 @@ func (d *timerCommandStateMachine) cancel() {
}
cancelCmd := d.helper.newCancelTimerCommandStateMachine(attribs)
d.helper.addCommand(cancelCmd)
// We must mark the event ID reset counter for when we performed this
// increment so a potential decrement can only decrement if it wasn't
// reset
cancelCmd.cancelledOnEventIDResetCounter = d.helper.nextCommandEventIDResetCounter
}

d.commandStateMachineBase.cancel()
Expand Down Expand Up @@ -729,9 +702,6 @@ func (d *childWorkflowCommandStateMachine) handleCancelFailedEvent() {
func (d *childWorkflowCommandStateMachine) cancel() {
switch d.state {
case commandStateStarted:
if d.helper.workflowExecutionIsCancelling {
d.helper.commandsCancelledDuringWFCancellation++
}
d.moveState(commandStateCanceledAfterStarted, eventCancel)
// A child workflow may be canceled _after_ something like an activity start
// happens inside a simulated goroutine. However, since the state of the
Expand Down Expand Up @@ -888,11 +858,10 @@ func newCommandsHelper() *commandsHelper {
orderedCommands: list.New(),
commands: make(map[commandID]*list.Element),

scheduledEventIDToActivityID: make(map[int64]string),
scheduledEventIDToCancellationID: make(map[int64]string),
scheduledEventIDToSignalID: make(map[int64]string),
versionMarkerLookup: make(map[int64]versionMarker),
commandsCancelledDuringWFCancellation: 0,
scheduledEventIDToActivityID: make(map[int64]string),
scheduledEventIDToCancellationID: make(map[int64]string),
scheduledEventIDToSignalID: make(map[int64]string),
versionMarkerLookup: make(map[int64]versionMarker),
}
}

Expand All @@ -905,13 +874,20 @@ func (h *commandsHelper) setCurrentWorkflowTaskStartedEventID(workflowTaskStarte
// corresponding history event after processing. So we can use workflow task started event id + 2 as the offset as
// workflow task completed event is always the first event in the workflow task followed by events generated from
// commands. This allows client sdk to deterministically predict history event ids generated by processing of the
// command. We must also add the number of cancel commands that were spawned during cancellation of the workflow
// execution as those canceled command events will show up *after* the workflow task completed event.
h.nextCommandEventID = workflowTaskStartedEventID + 2 + h.commandsCancelledDuringWFCancellation
h.commandsCancelledDuringWFCancellation = 0
// We must change the counter here so that others who mutate
// commandsCancelledDuringWFCancellation know it has since been reset
h.nextCommandEventIDResetCounter++
// command. It is possible, notably during workflow cancellation, that commands are generated before the workflow
// task started event is processed. In this case we need to adjust the nextCommandEventID to account for these unsent
// commands.
var uncountedCommands int64
for curr := h.orderedCommands.Front(); curr != nil; {
d := curr.Value.(commandStateMachine)
command := d.getCommand()
if command != nil {
uncountedCommands += 1
}
curr = curr.Next()
}

h.nextCommandEventID = workflowTaskStartedEventID + 2 + uncountedCommands
}

func (h *commandsHelper) getNextID() int64 {
Expand Down Expand Up @@ -974,24 +950,7 @@ func (h *commandsHelper) removeCancelOfResolvedCommand(commandID commandID) {
orderedCmdEl, ok := h.commands[commandID]
if ok {
delete(h.commands, commandID)
command := h.orderedCommands.Remove(orderedCmdEl)
// Sometimes commandsCancelledDuringWFCancellation was incremented before
// it was reset and sometimes not. We make sure the workflow execution is
// actually cancelling since that's the only time we increment the counter
// in the first place. Also, we use the reset counter to see if we're still
// on the same iteration where we may have incremented it before.
if h.workflowExecutionIsCancelling {
switch command := command.(type) {
case *cancelActivityStateMachine:
if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter {
h.commandsCancelledDuringWFCancellation--
}
case *cancelTimerCommandStateMachine:
if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter {
h.commandsCancelledDuringWFCancellation--
}
}
}
_ = h.orderedCommands.Remove(orderedCmdEl)
}
}

Expand Down
35 changes: 35 additions & 0 deletions internal/internal_eager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// The MIT License
//
// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

// eagerWorker is the minimal worker interface needed for eager activities and
// workflows.
type eagerWorker interface {
// tryReserveSlot tries to reserve a task slot on the worker without blocking.
// The caller is expected to release the slot with releaseSlot.
// NOTE(review): the bool result presumably reports whether a slot was
// reserved — confirm against the implementations.
tryReserveSlot() bool
// releaseSlot releases a task slot acquired by tryReserveSlot.
releaseSlot()
// processTaskAsync processes a new task on the worker asynchronously and
// calls callback once complete.
processTaskAsync(task interface{}, callback func())
}
Loading

0 comments on commit 9d3a161

Please sign in to comment.