Skip to content

Commit

Permalink
Nexus: Better handling of nil and empty payloads (#6616)
Browse files Browse the repository at this point in the history
## What changed?

Improve conversion of payloads with empty metadata and nil workflow
results to Nexus Content.

NOTE: This also upgrades the Go SDK from 1.27.0 to 1.29.1 to test Nexus
via the SDK.

## Why?

Handle a couple of edge cases that weren't properly handled before.
The fix in `mutable_state_impl.go` is the more critical one and may
cause surprising behavior in the SDKs.

## How did you test it?

Added tests.
  • Loading branch information
bergundy authored Oct 7, 2024
1 parent d1a2636 commit abf9dc4
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 36 deletions.
6 changes: 4 additions & 2 deletions common/nexus/payload_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,13 @@ func (payloadSerializer) Serialize(v any) (*nexus.Content, error) {
return nil, fmt.Errorf("%w: cannot serialize %v", errSerializer, v)
}

// Use the "nil" Nexus Content representation for nil Payloads.
if payload == nil {
return &nexus.Content{}, nil
// Use same structure as the nil serializer from the Nexus Go SDK.
return &nexus.Content{Header: nexus.Header{}}, nil
}

if payload.GetMetadata() == nil {
if len(payload.GetMetadata()) == 0 {
return xTemporalPayload(payload)
}

Expand Down
18 changes: 13 additions & 5 deletions common/nexus/payload_serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ func TestNexusPayloadSerializer(t *testing.T) {
inputPayload: mustToPayload(t, nil),
header: nexus.Header{},
},
{
// Empty payload is preserved.
name: "empty",
inputPayload: &commonpb.Payload{},
expectedPayload: &commonpb.Payload{},
header: nexus.Header{"type": "application/x-temporal-payload"},
},
{
name: "json proto",
inputPayload: mustToPayload(t, commonpb.RetryPolicy{}),
Expand Down Expand Up @@ -104,15 +111,16 @@ func TestNexusPayloadSerializer(t *testing.T) {
"encoding": []byte("binary/null"),
},
},
// Yes this is the default value, but this test should have an explicit expectation.
header: nil,
header: nexus.Header{},
},
{
name: "nil metadata",
inputPayload: &commonpb.Payload{},
name: "nil metadata non-empty data",
inputPayload: &commonpb.Payload{
Data: []byte("not empty"),
},
expectedPayload: &commonpb.Payload{
Metadata: map[string][]byte{},
Data: []byte{},
Data: []byte("not empty"),
},
header: nexus.Header{"type": "application/x-temporal-payload"},
},
Expand Down
13 changes: 13 additions & 0 deletions common/testing/mocksdk/worker_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
go.temporal.io/api v1.39.1-0.20240910163028-b13574e18f3c
go.temporal.io/sdk v1.27.0
go.temporal.io/sdk v1.29.1
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.11.0
go.uber.org/automaxprocs v1.5.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IO
go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A=
go.temporal.io/api v1.39.1-0.20240910163028-b13574e18f3c h1:1mORGBzFqhOn4I+Yg/KT07UQtHV59wptGxsPRB0rGIk=
go.temporal.io/api v1.39.1-0.20240910163028-b13574e18f3c/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.27.0 h1:C5oOE/IRyLcZaFoB13kEHsjvSHEnGcwT6bNys0HFFHk=
go.temporal.io/sdk v1.27.0/go.mod h1:PnOq5f3dWuU2NAbY+yczXkIeycsIIdBtoCO62ZE0aak=
go.temporal.io/sdk v1.29.1 h1:y+sUMbUhTU9rj50mwIZAPmcXCtgUdOWS9xHDYRYSgZ0=
go.temporal.io/sdk v1.29.1/go.mod h1:kp//DRvn3CqQVBCtjL51Oicp9wrZYB2s6row1UgzcKQ=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down
4 changes: 1 addition & 3 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,14 +575,12 @@ func (ms *MutableStateImpl) GetNexusCompletion(ctx context.Context) (nexus.Opera
switch ce.GetEventType() {
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
payloads := ce.GetWorkflowExecutionCompletedEventAttributes().GetResult().GetPayloads()
var p *commonpb.Payload
var p *commonpb.Payload // default to nil, the payload serializer converts nil to Nexus nil Content.
if len(payloads) > 0 {
// All of our SDKs support returning a single value from workflows, we can safely ignore the
// rest of the payloads. Additionally, even if a workflow could return more than a single value,
// Nexus does not support it.
p = payloads[0]
} else {
p = &commonpb.Payload{}
}
completion, err := nexus.NewOperationCompletionSuccessful(p, nexus.OperationCompletionSuccesfulOptions{
Serializer: commonnexus.PayloadSerializer,
Expand Down
Loading

0 comments on commit abf9dc4

Please sign in to comment.