diff --git a/activity/activity.go b/activity/activity.go index 812fd39d4..8661356e4 100644 --- a/activity/activity.go +++ b/activity/activity.go @@ -41,6 +41,10 @@ type ( // RegisterOptions consists of options for registering an activity RegisterOptions = internal.RegisterActivityOptions + + // ExtraRequests is an additional data which should be sent to the server to influence its decision + // about the next retry + ExtraRequests = internal.ActivityExtraRequests ) // ErrResultPending is returned from activity's implementation to indicate the activity is not completed when diff --git a/contrib/datadog/go.mod b/contrib/datadog/go.mod index fda27a309..f6df65cb8 100644 --- a/contrib/datadog/go.mod +++ b/contrib/datadog/go.mod @@ -37,7 +37,7 @@ require ( github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect github.com/stretchr/objx v0.5.1 // indirect github.com/tinylib/msgp v1.1.8 // indirect - go.temporal.io/api v1.26.1 // indirect + go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b // indirect go.uber.org/atomic v1.11.0 // indirect go4.org/intern v0.0.0-20230525184215-6c62f75575cb // indirect go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect diff --git a/contrib/datadog/go.sum b/contrib/datadog/go.sum index d8549347e..1cdf2a8a6 100644 --- a/contrib/datadog/go.sum +++ b/contrib/datadog/go.sum @@ -124,8 +124,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.temporal.io/api v1.26.1 h1:YqGQsOr/Tx4nVdA8wCv74AxesaIzCRHWb3KkHrYqI8k= -go.temporal.io/api v1.26.1/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4= +go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b h1:Fi5NWG08z7pfxBolgjchVp4PnmWrGIHjqboDlXve5Sg= +go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b/go.mod h1:mix7Bpl8mFEfYud66rjYInxNpP43sqXvr1UiHv+mur0= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= @@ -250,10 +250,10 @@ google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -gopkg.in/DataDog/dd-trace-go.v1 v1.58.1 h1:zhVNyN5V9G7LVuDh44q3wkcbQwtjIsmmUCieayojNYo= -gopkg.in/DataDog/dd-trace-go.v1 v1.58.1/go.mod h1:SmnEjjV9ZQr4MWRSUYEpoPyNtmtRK5J6UuJdAma+Yxw= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/DataDog/dd-trace-go.v1 v1.58.1 h1:zhVNyN5V9G7LVuDh44q3wkcbQwtjIsmmUCieayojNYo= +gopkg.in/DataDog/dd-trace-go.v1 v1.58.1/go.mod h1:SmnEjjV9ZQr4MWRSUYEpoPyNtmtRK5J6UuJdAma+Yxw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/contrib/opentelemetry/go.mod b/contrib/opentelemetry/go.mod index e7dcb34db..9d7a64643 100644 --- a/contrib/opentelemetry/go.mod +++ b/contrib/opentelemetry/go.mod @@ -11,10 +11,13 @@ require ( ) require ( - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect github.com/go-logr/logr v1.3.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.3 // indirect @@ -27,7 +30,7 @@ require ( github.com/stretchr/objx v0.5.0 // indirect go.opentelemetry.io/otel/metric v1.21.0 go.opentelemetry.io/otel/sdk/metric v1.21.0 - go.temporal.io/api v1.26.1 // indirect + go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/net v0.19.0 // indirect diff --git a/contrib/opentelemetry/go.sum b/contrib/opentelemetry/go.sum index b69cf4ffb..6b366b3b8 100644 --- a/contrib/opentelemetry/go.sum +++ b/contrib/opentelemetry/go.sum @@ -84,8 +84,8 @@ go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6 go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q= go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= -go.temporal.io/api v1.26.1 h1:YqGQsOr/Tx4nVdA8wCv74AxesaIzCRHWb3KkHrYqI8k= -go.temporal.io/api v1.26.1/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4= +go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b h1:Fi5NWG08z7pfxBolgjchVp4PnmWrGIHjqboDlXve5Sg= +go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b/go.mod h1:mix7Bpl8mFEfYud66rjYInxNpP43sqXvr1UiHv+mur0= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/contrib/opentracing/go.mod b/contrib/opentracing/go.mod index 7765750a5..7cfa498a4 100644 --- a/contrib/opentracing/go.mod +++ b/contrib/opentracing/go.mod @@ -21,7 +21,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.0 // indirect - go.temporal.io/api v1.26.1 // indirect + go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/net v0.19.0 // indirect diff --git a/contrib/opentracing/go.sum b/contrib/opentracing/go.sum index 58f08e6ee..9b7c20225 100644 --- a/contrib/opentracing/go.sum +++ b/contrib/opentracing/go.sum @@ -71,8 +71,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.26.1 h1:YqGQsOr/Tx4nVdA8wCv74AxesaIzCRHWb3KkHrYqI8k= -go.temporal.io/api v1.26.1/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4= +go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b h1:Fi5NWG08z7pfxBolgjchVp4PnmWrGIHjqboDlXve5Sg= +go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b/go.mod h1:mix7Bpl8mFEfYud66rjYInxNpP43sqXvr1UiHv+mur0= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/contrib/tally/go.mod b/contrib/tally/go.mod index 91450655b..b4ab6f912 100644 --- a/contrib/tally/go.mod +++ b/contrib/tally/go.mod @@ -22,7 +22,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.0 // indirect github.com/twmb/murmur3 v1.1.5 // indirect - go.temporal.io/api v1.26.1 // indirect + go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/net v0.19.0 // indirect diff --git a/contrib/tally/go.sum b/contrib/tally/go.sum index cbb725734..6b9135322 100644 --- a/contrib/tally/go.sum +++ b/contrib/tally/go.sum @@ -137,8 +137,8 @@ github.com/uber-go/tally/v4 v4.1.1/go.mod h1:aXeSTDMl4tNosyf6rdU8jlgScHyjEGGtfJ/ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.26.1 h1:YqGQsOr/Tx4nVdA8wCv74AxesaIzCRHWb3KkHrYqI8k= -go.temporal.io/api v1.26.1/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4= +go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b h1:Fi5NWG08z7pfxBolgjchVp4PnmWrGIHjqboDlXve5Sg= +go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b/go.mod h1:mix7Bpl8mFEfYud66rjYInxNpP43sqXvr1UiHv+mur0= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= diff --git a/internal/activity.go b/internal/activity.go index d5d52a0c3..7ffc19863 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -60,6 +60,11 @@ type ( IsLocalActivity bool // true if it is a local activity } + // ActivityExtraRequests contains additional data to be sent to the server on activity failure + ActivityExtraRequests struct { + RetryDelay time.Duration + } + // RegisterActivityOptions consists of options for registering an activity RegisterActivityOptions struct { // When an activity is a function the name is an actual activity type name. diff --git a/internal/cmd/build/go.mod b/internal/cmd/build/go.mod index 935a8de21..c83544e93 100644 --- a/internal/cmd/build/go.mod +++ b/internal/cmd/build/go.mod @@ -23,7 +23,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.0 // indirect github.com/stretchr/testify v1.8.4 // indirect - go.temporal.io/api v1.26.1 // indirect + go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect diff --git a/internal/cmd/build/go.sum b/internal/cmd/build/go.sum index 5594affc6..db5927074 100644 --- a/internal/cmd/build/go.sum +++ b/internal/cmd/build/go.sum @@ -74,8 +74,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.26.1 h1:YqGQsOr/Tx4nVdA8wCv74AxesaIzCRHWb3KkHrYqI8k= -go.temporal.io/api v1.26.1/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4= +go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b h1:Fi5NWG08z7pfxBolgjchVp4PnmWrGIHjqboDlXve5Sg= +go.temporal.io/api v1.26.1-0.20240103185939-608bdd111e4b/go.mod h1:mix7Bpl8mFEfYud66rjYInxNpP43sqXvr1UiHv+mur0= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/internal/error.go b/internal/error.go index 0f4f485d8..500a16730 100644 --- a/internal/error.go +++ b/internal/error.go @@ -119,6 +119,12 @@ Workflow consumers will get an instance of *WorkflowExecutionError. This error w */ type ( + ApplicationErrorAttributes struct { + NonRetryable bool + Cause error + Details []interface{} + } + // ApplicationError returned from activity implementations with message and optional details. ApplicationError struct { temporalError @@ -127,6 +133,7 @@ type ( nonRetryable bool cause error details converter.EncodedValues + extra ExtraRequests } // TimeoutError returned when activity or child workflow timed out. @@ -303,6 +310,33 @@ func NewApplicationError(msg string, errType string, nonRetryable bool, cause er return applicationErr } +func NewApplicationErrorWithExtraRequests( + msg string, + errType string, + attributes ApplicationErrorAttributes, + requests ExtraRequests, +) error { + applicationErr := &ApplicationError{ + msg: msg, + errType: errType, + cause: attributes.Cause, + nonRetryable: attributes.NonRetryable, + extra: requests, + } + // When return error to user, use EncodedValues as details and data is ready to be decoded by calling Get + details := attributes.Details + if len(details) == 1 { + if d, ok := details[0].(*EncodedValues); ok { + applicationErr.details = d + return applicationErr + } + } + + // When create error for server, use ErrorDetailsValues as details to hold values and encode later + applicationErr.details = ErrorDetailsValues(details) + return applicationErr +} + // NewTimeoutError creates TimeoutError instance. // Use NewHeartbeatTimeoutError to create heartbeat TimeoutError. func NewTimeoutError(msg string, timeoutType enumspb.TimeoutType, cause error, lastHeartbeatDetails ...interface{}) error { diff --git a/internal/error_test.go b/internal/error_test.go index 7a815be35..fd5e0b466 100644 --- a/internal/error_test.go +++ b/internal/error_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -637,6 +638,40 @@ func Test_convertErrorToFailure_ApplicationError(t *testing.T) { require.Equal("cause error", applicationErr.Error()) } +func Test_convertErrorToFailure_ApplicationErrorWithExtraRequests(t *testing.T) { + require := require.New(t) + fc := GetDefaultFailureConverter() + + err := NewApplicationErrorWithExtraRequests( + "message", + "customType", + ApplicationErrorAttributes{ + NonRetryable: true, + Cause: errors.New("cause error"), + Details: []interface{}{"details", 2208}, + }, + ActivityExtraRequests{}, + ) + f := fc.ErrorToFailure(err) + require.Equal("message", f.GetMessage()) + require.Equal("customType", f.GetApplicationFailureInfo().GetType()) + require.True(f.GetApplicationFailureInfo().GetNonRetryable()) + require.Equal([]byte(`"details"`), f.GetApplicationFailureInfo().GetDetails().GetPayloads()[0].GetData()) + require.Equal([]byte(`2208`), f.GetApplicationFailureInfo().GetDetails().GetPayloads()[1].GetData()) + require.Equal("cause error", f.GetCause().GetMessage()) + require.Equal("", f.GetCause().GetApplicationFailureInfo().GetType()) + require.Nil(f.GetCause().GetCause()) + + err2 := fc.FailureToError(f) + var applicationErr *ApplicationError + require.True(errors.As(err2, &applicationErr)) + require.Equal("message (type: customType, retryable: false): cause error", applicationErr.Error()) + + err2 = errors.Unwrap(err2) + require.True(errors.As(err2, &applicationErr)) + require.Equal("cause error", applicationErr.Error()) +} + func Test_convertErrorToFailure_EncodeMessage(t *testing.T) { require := require.New(t) diff --git a/internal/extra_requests.go b/internal/extra_requests.go new file mode 100644 index 000000000..0fcf67d1d --- /dev/null +++ b/internal/extra_requests.go @@ -0,0 +1,4 @@ +package internal + +type ExtraRequests interface { +} diff --git a/internal/failure_converter.go b/internal/failure_converter.go index bbcb130c2..41311a764 100644 --- a/internal/failure_converter.go +++ b/internal/failure_converter.go @@ -26,6 +26,7 @@ import ( "errors" "google.golang.org/protobuf/proto" + commonpb "go.temporal.io/api/common/v1" failurepb "go.temporal.io/api/failure/v1" "go.temporal.io/sdk/converter" @@ -92,7 +93,7 @@ func (dfc *DefaultFailureConverter) ErrorToFailure(err error) *failurepb.Failure case *ApplicationError: failureInfo := &failurepb.ApplicationFailureInfo{ Type: err.errType, - NonRetryable: err.nonRetryable, + NonRetryable: err.NonRetryable(), Details: convertErrDetailsToPayloads(err.details, dfc.dataConverter), } failure.FailureInfo = &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: failureInfo} diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 2cea8f117..7c2dbe0de 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -37,11 +37,13 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" "github.com/pborman/uuid" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/converter" "go.temporal.io/sdk/internal/common/metrics" "go.temporal.io/sdk/internal/common/serializer" @@ -1035,23 +1037,23 @@ func reportActivityComplete( } var reportErr error - switch request := request.(type) { + switch rqst := request.(type) { case *workflowservice.RespondActivityTaskCanceledRequest: grpcCtx, cancel := newGRPCContext(ctx, grpcMetricsHandler(rpcMetricsHandler), defaultGrpcRetryParameters(ctx)) defer cancel() - _, err := service.RespondActivityTaskCanceled(grpcCtx, request) + _, err := service.RespondActivityTaskCanceled(grpcCtx, rqst) reportErr = err case *workflowservice.RespondActivityTaskFailedRequest: grpcCtx, cancel := newGRPCContext(ctx, grpcMetricsHandler(rpcMetricsHandler), defaultGrpcRetryParameters(ctx)) defer cancel() - _, err := service.RespondActivityTaskFailed(grpcCtx, request) + _, err := service.RespondActivityTaskFailed(grpcCtx, rqst) reportErr = err case *workflowservice.RespondActivityTaskCompletedRequest: grpcCtx, cancel := newGRPCContext(ctx, grpcMetricsHandler(rpcMetricsHandler), defaultGrpcRetryParameters(ctx)) defer cancel() - _, err := service.RespondActivityTaskCompleted(grpcCtx, request) + _, err := service.RespondActivityTaskCompleted(grpcCtx, rqst) reportErr = err } return reportErr @@ -1148,11 +1150,12 @@ func convertActivityResultToRespondRequest( } return &workflowservice.RespondActivityTaskFailedRequest{ - TaskToken: taskToken, - Failure: failureConverter.ErrorToFailure(err), - Identity: identity, - Namespace: namespace, - WorkerVersion: versionStamp, + TaskToken: taskToken, + Failure: failureConverter.ErrorToFailure(err), + Identity: identity, + Namespace: namespace, + WorkerVersion: versionStamp, + NextRetryDelay: extractActivityRequestsFrom(err), } } @@ -1224,3 +1227,15 @@ func convertActivityResultToRespondRequestByID( Identity: identity, } } + +func extractActivityRequestsFrom(err error) *durationpb.Duration { + applicationError, ok := err.(*ApplicationError) + if !ok { + return nil + } + activityExtra, ok := applicationError.extra.(*ActivityExtraRequests) + if !ok { + return nil + } + return durationpb.New(activityExtra.RetryDelay) +} diff --git a/temporal/error.go b/temporal/error.go index 27f210d6f..2757d60ec 100644 --- a/temporal/error.go +++ b/temporal/error.go @@ -151,6 +151,8 @@ type ( // PanicError contains information about panicked workflow/activity. PanicError = internal.PanicError + ExtraRequests = internal.ExtraRequests + // UnknownExternalWorkflowExecutionError can be returned when external workflow doesn't exist UnknownExternalWorkflowExecutionError = internal.UnknownExternalWorkflowExecutionError ) @@ -166,6 +168,17 @@ var ( ErrSkipScheduleUpdate = internal.ErrSkipScheduleUpdate ) +// ApplicationErrorAttributes should be used to set all the desired attributes of a new ApplicationError +// To get a new instance use ErrorAttributes function +type ApplicationErrorAttributes = internal.ApplicationErrorAttributes + +// NewApplicationErrorWithExtraRequests creates new instance of *ApplicationError type, all the attributes of the +// newly created error could be controlled through instance of ApplicationErrorAttributes. +// This function also receives some extra requests. See activity.ActivityExtraRequests for details. +func NewApplicationErrorWithExtraRequests(msg, errType string, attributes ApplicationErrorAttributes, extras ExtraRequests) error { + return internal.NewApplicationErrorWithExtraRequests(msg, errType, attributes, extras) +} + // NewApplicationError creates new instance of retryable *ApplicationError with message, type, and optional details. // Use ApplicationError for any use case specific errors that cross activity and child workflow boundaries. // errType can be used to control if error is retryable or not. Add the same type in to RetryPolicy.NonRetryableErrorTypes