Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
jackdawm authored Jul 24, 2023
2 parents f8ab872 + 899f6d0 commit c3d3854
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 3 deletions.
5 changes: 5 additions & 0 deletions activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,8 @@ func GetHeartbeatDetails(ctx context.Context, d ...interface{}) error {
func GetWorkerStopChannel(ctx context.Context) <-chan struct{} {
return internal.GetWorkerStopChannel(ctx)
}

// IsActivity check if the context is an activity context from a normal or local activity.
func IsActivity(ctx context.Context) bool {
return internal.IsActivity(ctx)
}
7 changes: 7 additions & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type (
StartedTime time.Time // Time of activity start
Deadline time.Time // Time of activity timeout
Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified.
IsLocalActivity bool // true if it is a local activity
}

// RegisterActivityOptions consists of options for registering an activity
Expand Down Expand Up @@ -179,6 +180,12 @@ func HasHeartbeatDetails(ctx context.Context) bool {
return getActivityOutboundInterceptor(ctx).HasHeartbeatDetails(ctx)
}

// IsActivity check if the context is an activity context from a normal or local activity.
func IsActivity(ctx context.Context) bool {
a := ctx.Value(activityInterceptorContextKey)
return a != nil
}

// GetHeartbeatDetails extract heartbeat details from last failed attempt. This is used in combination with retry policy.
// An activity could be scheduled with an optional retry policy on ActivityOptions. If the activity failed then server
// would attempt to dispatch another activity task to retry according to the retry policy. If there was heartbeat
Expand Down
8 changes: 8 additions & 0 deletions internal/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,11 @@ func (s *activityTestSuite) TestGetWorkerStopChannel() {
channel := GetWorkerStopChannel(ctx)
s.NotNil(channel)
}

func (s *activityTestSuite) TestIsActivity() {
ctx := context.Background()
s.False(IsActivity(ctx))
ch := make(chan struct{}, 1)
ctx, _ = newActivityContext(context.Background(), nil, &activityEnvironment{workerStopChannel: ch})
s.True(IsActivity(ctx))
}
1 change: 1 addition & 0 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ func (a *activityEnvironmentInterceptor) GetInfo(ctx context.Context) ActivityIn
Attempt: a.env.attempt,
WorkflowType: a.env.workflowType,
WorkflowNamespace: a.env.workflowNamespace,
IsLocalActivity: a.env.isLocalActivity,
}
}

Expand Down
3 changes: 3 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,9 @@ func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service wor
FailureConverter: aw.failureConverter,
ContextPropagators: aw.contextPropagators,
EnableLoggingInReplay: aw.enableLoggingInReplay,
// Hardcoding NopHandler avoids "No metrics handler configured for temporal worker"
// logs during replay.
MetricsHandler: metrics.NopHandler,
capabilities: &workflowservice.GetSystemInfoResponse_Capabilities{
SignalAndQueryHeader: true,
InternalErrorDifferentiation: true,
Expand Down
9 changes: 8 additions & 1 deletion test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,12 @@ func (a *Activities) failNTimes(_ context.Context, times int, id int) error {
return errFailOnPurpose
}

func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string) error {
func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string, isLocalActivity bool) error {
a.append("inspectActivityInfo")
if !activity.IsActivity(ctx) {
return fmt.Errorf("expected InActivity to return %v but got %v", true, activity.IsActivity(ctx))
}

info := activity.GetInfo(ctx)
if info.WorkflowNamespace != namespace {
return fmt.Errorf("expected namespace %v but got %v", namespace, info.WorkflowNamespace)
Expand All @@ -165,6 +169,9 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQue
if info.TaskQueue != taskQueue {
return fmt.Errorf("expected taskQueue %v but got %v", taskQueue, info.TaskQueue)
}
if info.IsLocalActivity != isLocalActivity {
return fmt.Errorf("expected IsLocalActivity %v but got %v", isLocalActivity, info.IsLocalActivity)
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ func (w *Workflows) InspectActivityInfo(ctx workflow.Context) error {
wfType := info.WorkflowType.Name
taskQueue := info.TaskQueueName
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
return workflow.ExecuteActivity(ctx, "inspectActivityInfo", namespace, taskQueue, wfType).Get(ctx, nil)
return workflow.ExecuteActivity(ctx, "inspectActivityInfo", namespace, taskQueue, wfType, false).Get(ctx, nil)
}

func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error {
Expand All @@ -1069,7 +1069,7 @@ func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error {
ctx = workflow.WithLocalActivityOptions(ctx, w.defaultLocalActivityOptions())
var activities *Activities
return workflow.ExecuteLocalActivity(
ctx, activities.InspectActivityInfo, namespace, taskQueue, wfType).Get(ctx, nil)
ctx, activities.InspectActivityInfo, namespace, taskQueue, wfType, true).Get(ctx, nil)
}

func (w *Workflows) WorkflowWithLocalActivityCtxPropagation(ctx workflow.Context) (string, error) {
Expand Down

0 comments on commit c3d3854

Please sign in to comment.