Skip to content

Commit

Permalink
Allow client to suggest delay until next retry
Browse files Browse the repository at this point in the history
  • Loading branch information
ast2023 committed Jan 8, 2024
1 parent 6244097 commit d5e4f14
Show file tree
Hide file tree
Showing 18 changed files with 143 additions and 29 deletions.
4 changes: 4 additions & 0 deletions activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type (

// RegisterOptions consists of options for registering an activity
RegisterOptions = internal.RegisterActivityOptions

// ExtraRequests is an additional data which should be sent to the server to influence its decision
// about the next retry
ExtraRequests = internal.ActivityExtraRequests
)

// ErrResultPending is returned from activity's implementation to indicate the activity is not completed when
Expand Down
2 changes: 1 addition & 1 deletion contrib/datadog/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect
github.com/stretchr/objx v0.5.1 // indirect
github.com/tinylib/msgp v1.1.8 // indirect
go.temporal.io/api v1.26.1 // indirect
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b // indirect
go.uber.org/atomic v1.11.0 // indirect
go4.org/intern v0.0.0-20230525184215-6c62f75575cb // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect
Expand Down
8 changes: 4 additions & 4 deletions contrib/datadog/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.temporal.io/api v1.26.1 h1:YqGQsOr/Tx4nVdA8wCv74AxesaIzCRHWb3KkHrYqI8k=
go.temporal.io/api v1.26.1/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4=
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b h1:Fi5NWG08z7pfxBolgjchVp4PnmWrGIHjqboDlXve5Sg=
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b/go.mod h1:mix7Bpl8mFEfYud66rjYInxNpP43sqXvr1UiHv+mur0=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
Expand Down Expand Up @@ -250,10 +250,10 @@ google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/DataDog/dd-trace-go.v1 v1.58.1 h1:zhVNyN5V9G7LVuDh44q3wkcbQwtjIsmmUCieayojNYo=
gopkg.in/DataDog/dd-trace-go.v1 v1.58.1/go.mod h1:SmnEjjV9ZQr4MWRSUYEpoPyNtmtRK5J6UuJdAma+Yxw=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/DataDog/dd-trace-go.v1 v1.58.1 h1:zhVNyN5V9G7LVuDh44q3wkcbQwtjIsmmUCieayojNYo=
gopkg.in/DataDog/dd-trace-go.v1 v1.58.1/go.mod h1:SmnEjjV9ZQr4MWRSUYEpoPyNtmtRK5J6UuJdAma+Yxw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
9 changes: 6 additions & 3 deletions contrib/opentelemetry/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ require (
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
Expand All @@ -27,7 +30,7 @@ require (
github.com/stretchr/objx v0.5.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0
go.opentelemetry.io/otel/sdk/metric v1.21.0
go.temporal.io/api v1.26.1 // indirect
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/net v0.19.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions contrib/opentelemetry/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6
go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
go.temporal.io/api v1.26.1 h1:YqGQsOr/Tx4nVdA8wCv74AxesaIzCRHWb3KkHrYqI8k=
go.temporal.io/api v1.26.1/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4=
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b h1:Fi5NWG08z7pfxBolgjchVp4PnmWrGIHjqboDlXve5Sg=
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b/go.mod h1:mix7Bpl8mFEfYud66rjYInxNpP43sqXvr1UiHv+mur0=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down
2 changes: 1 addition & 1 deletion contrib/opentracing/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
go.temporal.io/api v1.26.1 // indirect
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/net v0.19.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions contrib/opentracing/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.temporal.io/api v1.26.1 h1:YqGQsOr/Tx4nVdA8wCv74AxesaIzCRHWb3KkHrYqI8k=
go.temporal.io/api v1.26.1/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4=
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b h1:Fi5NWG08z7pfxBolgjchVp4PnmWrGIHjqboDlXve5Sg=
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b/go.mod h1:mix7Bpl8mFEfYud66rjYInxNpP43sqXvr1UiHv+mur0=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down
2 changes: 1 addition & 1 deletion contrib/tally/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/robfig/cron v1.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/twmb/murmur3 v1.1.5 // indirect
go.temporal.io/api v1.26.1 // indirect
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/net v0.19.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions contrib/tally/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ github.com/uber-go/tally/v4 v4.1.1/go.mod h1:aXeSTDMl4tNosyf6rdU8jlgScHyjEGGtfJ/
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.temporal.io/api v1.26.1 h1:YqGQsOr/Tx4nVdA8wCv74AxesaIzCRHWb3KkHrYqI8k=
go.temporal.io/api v1.26.1/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4=
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b h1:Fi5NWG08z7pfxBolgjchVp4PnmWrGIHjqboDlXve5Sg=
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b/go.mod h1:mix7Bpl8mFEfYud66rjYInxNpP43sqXvr1UiHv+mur0=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
Expand Down
5 changes: 5 additions & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type (
IsLocalActivity bool // true if it is a local activity
}

// ActivityExtraRequests contains additional data to be sent to the server on activity failure
ActivityExtraRequests struct {
RetryDelay time.Duration
}

// RegisterActivityOptions consists of options for registering an activity
RegisterActivityOptions struct {
// When an activity is a function the name is an actual activity type name.
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/build/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/robfig/cron v1.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
go.temporal.io/api v1.26.1 // indirect
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect
Expand Down
4 changes: 2 additions & 2 deletions internal/cmd/build/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.temporal.io/api v1.26.1 h1:YqGQsOr/Tx4nVdA8wCv74AxesaIzCRHWb3KkHrYqI8k=
go.temporal.io/api v1.26.1/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4=
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b h1:Fi5NWG08z7pfxBolgjchVp4PnmWrGIHjqboDlXve5Sg=
go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b/go.mod h1:mix7Bpl8mFEfYud66rjYInxNpP43sqXvr1UiHv+mur0=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down
34 changes: 34 additions & 0 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ Workflow consumers will get an instance of *WorkflowExecutionError. This error w
*/

type (
ApplicationErrorAttributes struct {
NonRetryable bool
Cause error
Details []interface{}
}

// ApplicationError returned from activity implementations with message and optional details.
ApplicationError struct {
temporalError
Expand All @@ -127,6 +133,7 @@ type (
nonRetryable bool
cause error
details converter.EncodedValues
extra ExtraRequests
}

// TimeoutError returned when activity or child workflow timed out.
Expand Down Expand Up @@ -303,6 +310,33 @@ func NewApplicationError(msg string, errType string, nonRetryable bool, cause er
return applicationErr
}

func NewApplicationErrorWithExtraRequests(
msg string,
errType string,
attributes ApplicationErrorAttributes,
requests ExtraRequests,
) error {
applicationErr := &ApplicationError{
msg: msg,
errType: errType,
cause: attributes.Cause,
nonRetryable: attributes.NonRetryable,
extra: requests,
}
// When return error to user, use EncodedValues as details and data is ready to be decoded by calling Get
details := attributes.Details
if len(details) == 1 {
if d, ok := details[0].(*EncodedValues); ok {
applicationErr.details = d
return applicationErr
}
}

// When create error for server, use ErrorDetailsValues as details to hold values and encode later
applicationErr.details = ErrorDetailsValues(details)
return applicationErr
}

// NewTimeoutError creates TimeoutError instance.
// Use NewHeartbeatTimeoutError to create heartbeat TimeoutError.
func NewTimeoutError(msg string, timeoutType enumspb.TimeoutType, cause error, lastHeartbeatDetails ...interface{}) error {
Expand Down
35 changes: 35 additions & 0 deletions internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -637,6 +638,40 @@ func Test_convertErrorToFailure_ApplicationError(t *testing.T) {
require.Equal("cause error", applicationErr.Error())
}

func Test_convertErrorToFailure_ApplicationErrorWithExtraRequests(t *testing.T) {
require := require.New(t)
fc := GetDefaultFailureConverter()

err := NewApplicationErrorWithExtraRequests(
"message",
"customType",
ApplicationErrorAttributes{
NonRetryable: true,
Cause: errors.New("cause error"),
Details: []interface{}{"details", 2208},
},
ActivityExtraRequests{},
)
f := fc.ErrorToFailure(err)
require.Equal("message", f.GetMessage())
require.Equal("customType", f.GetApplicationFailureInfo().GetType())
require.True(f.GetApplicationFailureInfo().GetNonRetryable())
require.Equal([]byte(`"details"`), f.GetApplicationFailureInfo().GetDetails().GetPayloads()[0].GetData())
require.Equal([]byte(`2208`), f.GetApplicationFailureInfo().GetDetails().GetPayloads()[1].GetData())
require.Equal("cause error", f.GetCause().GetMessage())
require.Equal("", f.GetCause().GetApplicationFailureInfo().GetType())
require.Nil(f.GetCause().GetCause())

err2 := fc.FailureToError(f)
var applicationErr *ApplicationError
require.True(errors.As(err2, &applicationErr))
require.Equal("message (type: customType, retryable: false): cause error", applicationErr.Error())

err2 = errors.Unwrap(err2)
require.True(errors.As(err2, &applicationErr))
require.Equal("cause error", applicationErr.Error())
}

func Test_convertErrorToFailure_EncodeMessage(t *testing.T) {
require := require.New(t)

Expand Down
4 changes: 4 additions & 0 deletions internal/extra_requests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package internal

type ExtraRequests interface {
}
3 changes: 2 additions & 1 deletion internal/failure_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"errors"

"google.golang.org/protobuf/proto"

commonpb "go.temporal.io/api/common/v1"
failurepb "go.temporal.io/api/failure/v1"
"go.temporal.io/sdk/converter"
Expand Down Expand Up @@ -92,7 +93,7 @@ func (dfc *DefaultFailureConverter) ErrorToFailure(err error) *failurepb.Failure
case *ApplicationError:
failureInfo := &failurepb.ApplicationFailureInfo{
Type: err.errType,
NonRetryable: err.nonRetryable,
NonRetryable: err.NonRetryable(),
Details: convertErrDetailsToPayloads(err.details, dfc.dataConverter),
}
failure.FailureInfo = &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: failureInfo}
Expand Down
33 changes: 24 additions & 9 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/pborman/uuid"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/internal/common/serializer"
Expand Down Expand Up @@ -1035,23 +1037,23 @@ func reportActivityComplete(
}

var reportErr error
switch request := request.(type) {
switch rqst := request.(type) {
case *workflowservice.RespondActivityTaskCanceledRequest:
grpcCtx, cancel := newGRPCContext(ctx, grpcMetricsHandler(rpcMetricsHandler),
defaultGrpcRetryParameters(ctx))
defer cancel()
_, err := service.RespondActivityTaskCanceled(grpcCtx, request)
_, err := service.RespondActivityTaskCanceled(grpcCtx, rqst)
reportErr = err
case *workflowservice.RespondActivityTaskFailedRequest:
grpcCtx, cancel := newGRPCContext(ctx, grpcMetricsHandler(rpcMetricsHandler), defaultGrpcRetryParameters(ctx))
defer cancel()
_, err := service.RespondActivityTaskFailed(grpcCtx, request)
_, err := service.RespondActivityTaskFailed(grpcCtx, rqst)
reportErr = err
case *workflowservice.RespondActivityTaskCompletedRequest:
grpcCtx, cancel := newGRPCContext(ctx, grpcMetricsHandler(rpcMetricsHandler),
defaultGrpcRetryParameters(ctx))
defer cancel()
_, err := service.RespondActivityTaskCompleted(grpcCtx, request)
_, err := service.RespondActivityTaskCompleted(grpcCtx, rqst)
reportErr = err
}
return reportErr
Expand Down Expand Up @@ -1148,11 +1150,12 @@ func convertActivityResultToRespondRequest(
}

return &workflowservice.RespondActivityTaskFailedRequest{
TaskToken: taskToken,
Failure: failureConverter.ErrorToFailure(err),
Identity: identity,
Namespace: namespace,
WorkerVersion: versionStamp,
TaskToken: taskToken,
Failure: failureConverter.ErrorToFailure(err),
Identity: identity,
Namespace: namespace,
WorkerVersion: versionStamp,
NextRetryDelay: extractActivityRequestsFrom(err),
}
}

Expand Down Expand Up @@ -1224,3 +1227,15 @@ func convertActivityResultToRespondRequestByID(
Identity: identity,
}
}

func extractActivityRequestsFrom(err error) *durationpb.Duration {
applicationError, ok := err.(*ApplicationError)
if !ok {
return nil
}
activityExtra, ok := applicationError.extra.(*ActivityExtraRequests)
if !ok {
return nil
}
return durationpb.New(activityExtra.RetryDelay)
}
13 changes: 13 additions & 0 deletions temporal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ type (
// PanicError contains information about panicked workflow/activity.
PanicError = internal.PanicError

ExtraRequests = internal.ExtraRequests

// UnknownExternalWorkflowExecutionError can be returned when external workflow doesn't exist
UnknownExternalWorkflowExecutionError = internal.UnknownExternalWorkflowExecutionError
)
Expand All @@ -166,6 +168,17 @@ var (
ErrSkipScheduleUpdate = internal.ErrSkipScheduleUpdate
)

// ApplicationErrorAttributes should be used to set all the desired attributes of a new ApplicationError
// To get a new instance use ErrorAttributes function
type ApplicationErrorAttributes = internal.ApplicationErrorAttributes

// NewApplicationErrorWithExtraRequests creates new instance of *ApplicationError type, all the attributes of the
// newly created error could be controlled through instance of ApplicationErrorAttributes.
// This function also receives some extra requests. See activity.ActivityExtraRequests for details.
func NewApplicationErrorWithExtraRequests(msg, errType string, attributes ApplicationErrorAttributes, extras ExtraRequests) error {
return internal.NewApplicationErrorWithExtraRequests(msg, errType, attributes, extras)
}

// NewApplicationError creates new instance of retryable *ApplicationError with message, type, and optional details.
// Use ApplicationError for any use case specific errors that cross activity and child workflow boundaries.
// errType can be used to control if error is retryable or not. Add the same type in to RetryPolicy.NonRetryableErrorTypes
Expand Down

0 comments on commit d5e4f14

Please sign in to comment.