Skip to content

Commit

Permalink
Minor docstring grammar updates in activity.go (#1608)
Browse files Browse the repository at this point in the history
* some docstring updates

* minor fix
  • Loading branch information
yuandrew authored Aug 22, 2024
1 parent a31f86d commit 9bcc1a9
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 62 deletions.
43 changes: 21 additions & 22 deletions activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,37 +39,37 @@ type (
// Info contains information about a currently executing activity.
Info = internal.ActivityInfo

// RegisterOptions consists of options for registering an activity
// RegisterOptions consists of options for registering an activity.
RegisterOptions = internal.RegisterActivityOptions
)

// ErrResultPending is returned from activity's implementation to indicate the activity is not completed when
// ErrResultPending is returned from activity's implementation to indicate the activity is not completed when the
// activity method returns. Activity needs to be completed by Client.CompleteActivity() separately. For example, if an
// activity require human interaction (like approve an expense report), the activity could return ErrResultPending
// which indicate the activity is not done yet. Then, when the waited human action happened, it needs to trigger something
// that could report the activity completed event to temporal server via Client.CompleteActivity() API.
// activity requires human interaction (like approving an expense report), the activity could return ErrResultPending,
// which indicates the activity is not done yet. Then, when the waited human action happened, it needs to trigger something
// that could report the activity completed event to the temporal server via the Client.CompleteActivity() API.
var ErrResultPending = internal.ErrActivityResultPending

// GetInfo returns information about currently executing activity.
// GetInfo returns information about the currently executing activity.
func GetInfo(ctx context.Context) Info {
return internal.GetActivityInfo(ctx)
}

// GetLogger returns a logger that can be used in activity
// GetLogger returns a logger that can be used in the activity.
func GetLogger(ctx context.Context) log.Logger {
return internal.GetActivityLogger(ctx)
}

// GetMetricsHandler returns a metrics handler that can be used in activity
// GetMetricsHandler returns a metrics handler that can be used in the activity.
func GetMetricsHandler(ctx context.Context) metrics.Handler {
return internal.GetActivityMetricsHandler(ctx)
}

// RecordHeartbeat sends heartbeat for the currently executing activity
// If the activity is either canceled (or) workflow/activity doesn't exist then we would cancel
// RecordHeartbeat sends a heartbeat for the currently executing activity.
// If the activity is either canceled or the workflow/activity doesn't exist, then we would cancel
// the context with error context.Canceled.
//
// details - the details that you provided here can be seen in the workflow when it receives TimeoutError, you
// details - The details that you provide here can be seen in the workflow when it receives TimeoutError. You
// can check error with TimeoutType()/Details().
//
// Note: If using asynchronous activity completion,
Expand All @@ -78,34 +78,33 @@ func RecordHeartbeat(ctx context.Context, details ...interface{}) {
internal.RecordActivityHeartbeat(ctx, details...)
}

// HasHeartbeatDetails checks if there is heartbeat details from last attempt.
// HasHeartbeatDetails checks if there are heartbeat details from the last attempt.
func HasHeartbeatDetails(ctx context.Context) bool {
return internal.HasHeartbeatDetails(ctx)
}

// GetHeartbeatDetails extract heartbeat details from last failed attempt. This is used in combination with retry policy.
// An activity could be scheduled with an optional retry policy on ActivityOptions. If the activity failed then server
// would attempt to dispatch another activity task to retry according to the retry policy. If there was heartbeat
// GetHeartbeatDetails extracts heartbeat details from the last failed attempt. This is used in combination with the retry policy.
// An activity could be scheduled with an optional retry policy on ActivityOptions. If the activity failed, then server
// would attempt to dispatch another activity task to retry according to the retry policy. If there were heartbeat
// details reported by activity from the failed attempt, the details would be delivered along with the activity task for
// retry attempt. Activity could extract the details by GetHeartbeatDetails() and resume from the progress.
// the retry attempt. An activity can extract the details from GetHeartbeatDetails() and resume progress from there.
// See TestActivityEnvironment.SetHeartbeatDetails() for unit test support.
//
// Note, values should not be reused for extraction here because merging on top
// of existing values may result in unexpected behavior similar to
// json.Unmarshal.
// Note: Values should not be reused for extraction here because merging on top
// of existing values may result in unexpected behavior similar to json.Unmarshal.
func GetHeartbeatDetails(ctx context.Context, d ...interface{}) error {
return internal.GetHeartbeatDetails(ctx, d...)
}

// GetWorkerStopChannel returns a read-only channel. The closure of this channel indicates the activity worker is stopping.
// When the worker is stopping, it will close this channel and wait until the worker stop timeout finishes. After the timeout
// hit, the worker will cancel the activity context and then exit. The timeout can be defined by worker option: WorkerStopTimeout.
// Use this channel to handle activity graceful exit when the activity worker stops.
// hits, the worker will cancel the activity context and then exit. The timeout can be defined by worker option: WorkerStopTimeout.
// Use this channel to handle a graceful activity exit when the activity worker stops.
func GetWorkerStopChannel(ctx context.Context) <-chan struct{} {
return internal.GetWorkerStopChannel(ctx)
}

// IsActivity check if the context is an activity context from a normal or local activity.
// IsActivity checks if the context is an activity context from a normal or local activity.
func IsActivity(ctx context.Context) bool {
return internal.IsActivity(ctx)
}
79 changes: 39 additions & 40 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ import (
)

type (
// ActivityType identifies a activity type.
// ActivityType identifies an activity type.
ActivityType struct {
Name string
}

// ActivityInfo contains information about currently executing activity.
// ActivityInfo contains information about a currently executing activity.
ActivityInfo struct {
TaskToken []byte
WorkflowType *WorkflowType
Expand All @@ -60,7 +60,7 @@ type (
IsLocalActivity bool // true if it is a local activity
}

// RegisterActivityOptions consists of options for registering an activity
// RegisterActivityOptions consists of options for registering an activity.
RegisterActivityOptions struct {
// When an activity is a function the name is an actual activity type name.
// When an activity is part of a structure then each member of the structure becomes an activity with
Expand All @@ -82,22 +82,22 @@ type (
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
// subjected to change in the future.
ActivityOptions struct {
// TaskQueue that the activity needs to be scheduled on.
// optional: The default task queue with the same name as the workflow task queue.
// TaskQueue - Name of the task queue that the activity needs to be scheduled on.
// Optional: The default task queue with the same name as the workflow task queue.
TaskQueue string

// ScheduleToCloseTimeout - Total time that a workflow is willing to wait for Activity to complete.
// ScheduleToCloseTimeout - Total time that a workflow is willing to wait for an Activity to complete.
// ScheduleToCloseTimeout limits the total time of an Activity's execution including retries
// (use StartToCloseTimeout to limit the time of a single attempt).
// The zero value of this uses default value.
// Either this option or StartToClose is required: Defaults to unlimited.
// Either this option or StartToCloseTimeout is required: Defaults to unlimited.
ScheduleToCloseTimeout time.Duration

// ScheduleToStartTimeout - Time that the Activity Task can stay in the Task Queue before it is picked up by
// a Worker. Do not specify this timeout unless using host specific Task Queues for Activity Tasks are being
// used for routing. In almost all situations that don't involve routing activities to specific hosts it is
// used for routing. In almost all situations that don't involve routing activities to specific hosts, it is
// better to rely on the default value.
// ScheduleToStartTimeout is always non-retryable. Retrying after this timeout doesn't make sense as it would
// ScheduleToStartTimeout is always non-retryable. Retrying after this timeout doesn't make sense, as it would
// just put the Activity Task back into the same Task Queue.
// Optional: Defaults to unlimited.
ScheduleToStartTimeout time.Duration
Expand All @@ -107,7 +107,7 @@ type (
// to detect that an Activity that didn't complete on time. So this timeout should be as short as the longest
// possible execution of the Activity body. Potentially long running Activities must specify HeartbeatTimeout
// and call Activity.RecordHeartbeat(ctx, "my-heartbeat") periodically for timely failure detection.
// Either this option or ScheduleToClose is required: Defaults to the ScheduleToCloseTimeout value.
// Either this option or ScheduleToCloseTimeout is required: Defaults to the ScheduleToCloseTimeout value.
StartToCloseTimeout time.Duration

// HeartbeatTimeout - Heartbeat interval. Activity must call Activity.RecordHeartbeat(ctx, "my-heartbeat")
Expand All @@ -120,112 +120,111 @@ type (
WaitForCancellation bool

// ActivityID - Business level activity ID, this is not needed for most of the cases if you have
// to specify this then talk to temporal team. This is something will be done in future.
// to specify this then talk to the temporal team. This is something will be done in the future.
// Optional: default empty string
ActivityID string

// RetryPolicy specifies how to retry an Activity if an error occurs.
// RetryPolicy - Specifies how to retry an Activity if an error occurs.
// More details are available at docs.temporal.io.
// RetryPolicy is optional. If one is not specified a default RetryPolicy is provided by the server.
// RetryPolicy is optional. If one is not specified, a default RetryPolicy is provided by the server.
// The default RetryPolicy provided by the server specifies:
// - InitialInterval of 1 second
// - BackoffCoefficient of 2.0
// - MaximumInterval of 100 x InitialInterval
// - MaximumAttempts of 0 (unlimited)
// To disable retries set MaximumAttempts to 1.
// To disable retries, set MaximumAttempts to 1.
// The default RetryPolicy provided by the server can be overridden by the dynamic config.
RetryPolicy *RetryPolicy

// If true, will not request eager execution regardless of worker settings.
// If true, eager execution will not be requested, regardless of worker settings.
// If false, eager execution may still be disabled at the worker level or
// eager execution may not be requested due to lack of available slots.
// may not be requested due to lack of available slots.
//
// Eager activity execution means the server returns requested eager
// activities directly from the workflow task back to this worker which is
// faster than non-eager which may be dispatched to a separate worker.
// activities directly from the workflow task back to this worker. This is
// faster than non-eager, which may be dispatched to a separate worker.
DisableEagerExecution bool

// VersioningIntent specifies whether this activity should run on a worker with a compatible
// VersioningIntent - Specifies whether this activity should run on a worker with a compatible
// build ID or not. See temporal.VersioningIntent.
// WARNING: Worker versioning is currently experimental
VersioningIntent VersioningIntent
}

// LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.
LocalActivityOptions struct {
// ScheduleToCloseTimeout - The end to end timeout for the local activity including retries.
// ScheduleToCloseTimeout - The end to end timeout for the local activity, including retries.
// At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required.
// defaults to StartToCloseTimeout if not set.
// Defaults to StartToCloseTimeout if not set.
ScheduleToCloseTimeout time.Duration

// StartToCloseTimeout - The timeout for a single execution of the local activity.
// At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required.
// defaults to ScheduleToCloseTimeout if not set.
// Defaults to ScheduleToCloseTimeout if not set.
StartToCloseTimeout time.Duration

// RetryPolicy specify how to retry activity if error happens.
// RetryPolicy - Specify how to retry activity if error happens.
// Optional: default is to retry according to the default retry policy up to ScheduleToCloseTimeout
// with 1sec initial delay between retries and 2x backoff.
RetryPolicy *RetryPolicy
}
)

// GetActivityInfo returns information about currently executing activity.
// GetActivityInfo returns information about the currently executing activity.
func GetActivityInfo(ctx context.Context) ActivityInfo {
return getActivityOutboundInterceptor(ctx).GetInfo(ctx)
}

// HasHeartbeatDetails checks if there is heartbeat details from last attempt.
// HasHeartbeatDetails checks if there are heartbeat details from last attempt.
func HasHeartbeatDetails(ctx context.Context) bool {
return getActivityOutboundInterceptor(ctx).HasHeartbeatDetails(ctx)
}

// IsActivity check if the context is an activity context from a normal or local activity.
// IsActivity checks if the context is an activity context from a normal or local activity.
func IsActivity(ctx context.Context) bool {
a := ctx.Value(activityInterceptorContextKey)
return a != nil
}

// GetHeartbeatDetails extract heartbeat details from last failed attempt. This is used in combination with retry policy.
// An activity could be scheduled with an optional retry policy on ActivityOptions. If the activity failed then server
// would attempt to dispatch another activity task to retry according to the retry policy. If there was heartbeat
// GetHeartbeatDetails extracts heartbeat details from the last failed attempt. This is used in combination with the retry policy.
// An activity could be scheduled with an optional retry policy on ActivityOptions. If the activity failed, then server
// would attempt to dispatch another activity task to retry according to the retry policy. If there were heartbeat
// details reported by activity from the failed attempt, the details would be delivered along with the activity task for
// retry attempt. Activity could extract the details by GetHeartbeatDetails() and resume from the progress.
// the retry attempt. An activity can extract the details from GetHeartbeatDetails() and resume progress from there.
//
// Note, values should not be reused for extraction here because merging on top
// of existing values may result in unexpected behavior similar to
// json.Unmarshal.
// Note: Values should not be reused for extraction here because merging on top
// of existing values may result in unexpected behavior similar to json.Unmarshal.
func GetHeartbeatDetails(ctx context.Context, d ...interface{}) error {
return getActivityOutboundInterceptor(ctx).GetHeartbeatDetails(ctx, d...)
}

// GetActivityLogger returns a logger that can be used in activity
// GetActivityLogger returns a logger that can be used in the activity.
func GetActivityLogger(ctx context.Context) log.Logger {
return getActivityOutboundInterceptor(ctx).GetLogger(ctx)
}

// GetActivityMetricsHandler returns a metrics handler that can be used in activity
// GetActivityMetricsHandler returns a metrics handler that can be used in the activity.
func GetActivityMetricsHandler(ctx context.Context) metrics.Handler {
return getActivityOutboundInterceptor(ctx).GetMetricsHandler(ctx)
}

// GetWorkerStopChannel returns a read-only channel. The closure of this channel indicates the activity worker is stopping.
// When the worker is stopping, it will close this channel and wait until the worker stop timeout finishes. After the timeout
// hit, the worker will cancel the activity context and then exit. The timeout can be defined by worker option: WorkerStopTimeout.
// Use this channel to handle activity graceful exit when the activity worker stops.
// hits, the worker will cancel the activity context and then exit. The timeout can be defined by worker option: WorkerStopTimeout.
// Use this channel to handle a graceful activity exit when the activity worker stops.
func GetWorkerStopChannel(ctx context.Context) <-chan struct{} {
return getActivityOutboundInterceptor(ctx).GetWorkerStopChannel(ctx)
}

// RecordActivityHeartbeat sends heartbeat for the currently executing activity
// If the activity is either canceled (or) workflow/activity doesn't exist then we would cancel
// RecordActivityHeartbeat sends a heartbeat for the currently executing activity.
// If the activity is either canceled or workflow/activity doesn't exist, then we would cancel
// the context with error context.Canceled.
//
// TODO: we don't have a way to distinguish between the two cases when context is canceled because
// context doesn't support overriding value of ctx.Error.
// TODO: Implement automatic heartbeating with cancellation through ctx.
//
// details - the details that you provided here can be seen in the workflow when it receives TimeoutError, you
// details - The details that you provided here can be seen in the workflow when it receives TimeoutError. You
// can check error TimeoutType()/Details().
func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
getActivityOutboundInterceptor(ctx).RecordHeartbeat(ctx, details...)
Expand Down

0 comments on commit 9bcc1a9

Please sign in to comment.