Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] remove useless panics #324

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions statefun-sdk-go/v3/pkg/statefun/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,23 @@ type Context interface {
Caller() *Address

// Send forwards out a MessageBuilder to another function.
Send(message MessageBuilder)
Send(message MessageBuilder) error

// SendAfter forwards out a MessageBuilder to another function, after a specified time.Duration delay.
SendAfter(delay time.Duration, message MessageBuilder)
SendAfter(delay time.Duration, message MessageBuilder) error

// SendAfterWithCancellationToken forwards out a MessageBuilder to another function,
// after a specified time.Duration delay. The message is tagged with a non-empty,
//unique token to attach to this message, to be used for message cancellation
SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder)
SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder) error

// CancelDelayedMessage cancels a delayed message (a message that was send via SendAfterWithCancellationToken).
// NOTE: this is a best-effort operation, since the message might have been already delivered.
// If the message was delivered, this is a no-op operation.
CancelDelayedMessage(token CancellationToken)

// SendEgress forwards out an EgressBuilder to an egress.
SendEgress(egress EgressBuilder)
SendEgress(egress EgressBuilder) error

// Storage returns the AddressScopedStorage, providing access to stored values scoped to the
// current invoked function instance's Address (which is obtainable using Self()).
Expand Down Expand Up @@ -86,11 +86,10 @@ func (s *statefunContext) Caller() *Address {
return s.caller
}

func (s *statefunContext) Send(message MessageBuilder) {
func (s *statefunContext) Send(message MessageBuilder) error {
msg, err := message.ToMessage()

if err != nil {
panic(err)
return err
}

invocation := &protocol.FromFunction_Invocation{
Expand All @@ -101,13 +100,14 @@ func (s *statefunContext) Send(message MessageBuilder) {
s.Lock()
s.response.OutgoingMessages = append(s.response.OutgoingMessages, invocation)
s.Unlock()

return nil
}

func (s *statefunContext) SendAfter(delay time.Duration, message MessageBuilder) {
func (s *statefunContext) SendAfter(delay time.Duration, message MessageBuilder) error {
msg, err := message.ToMessage()

if err != nil {
panic(err)
return err
}

invocation := &protocol.FromFunction_DelayedInvocation{
Expand All @@ -119,13 +119,14 @@ func (s *statefunContext) SendAfter(delay time.Duration, message MessageBuilder)
s.Lock()
s.response.DelayedInvocations = append(s.response.DelayedInvocations, invocation)
s.Unlock()

return nil
}

func (s *statefunContext) SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder) {
func (s *statefunContext) SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder) error {
msg, err := message.ToMessage()

if err != nil {
panic(err)
return err
}

invocation := &protocol.FromFunction_DelayedInvocation{
Expand All @@ -138,6 +139,8 @@ func (s *statefunContext) SendAfterWithCancellationToken(delay time.Duration, to
s.Lock()
s.response.DelayedInvocations = append(s.response.DelayedInvocations, invocation)
s.Unlock()

return nil
}

func (s *statefunContext) CancelDelayedMessage(token CancellationToken) {
Expand All @@ -151,16 +154,17 @@ func (s *statefunContext) CancelDelayedMessage(token CancellationToken) {
s.Unlock()
}

func (s *statefunContext) SendEgress(egress EgressBuilder) {
func (s *statefunContext) SendEgress(egress EgressBuilder) error {
msg, err := egress.toEgressMessage()

if err != nil {
panic(err)
return err
}

s.Lock()
s.response.OutgoingEgresses = append(s.response.OutgoingEgresses, msg)
s.Unlock()

return nil
}

// DeriveContext derives a new statefun.Context from an existing one, replacing
Expand Down
14 changes: 7 additions & 7 deletions statefun-sdk-go/v3/pkg/statefun/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestStatefunContext_Send(t *testing.T) {

msg := MessageBuilder{
Target: Address{
FunctionType: TypeNameFrom("example/func"),
FunctionType: MustParseTypeName("example/func"),
Id: "0",
},
Value: "hello",
Expand All @@ -63,7 +63,7 @@ func TestStatefunContext_SendAfter(t *testing.T) {

msg := MessageBuilder{
Target: Address{
FunctionType: TypeNameFrom("example/func"),
FunctionType: MustParseTypeName("example/func"),
Id: "0",
},
Value: "hello",
Expand All @@ -88,7 +88,7 @@ func TestStatefunContext_SendAfterWithCancellationTokenMessage(t *testing.T) {

msg := MessageBuilder{
Target: Address{
FunctionType: TypeNameFrom("example/func"),
FunctionType: MustParseTypeName("example/func"),
Id: "0",
},
Value: "hello",
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestStatefunContext_SendEgress_Kafka(t *testing.T) {
context := createContext()

kafka := &KafkaEgressBuilder{
Target: TypeNameFrom("example/kafka"),
Target: MustParseTypeName("example/kafka"),
Topic: "topic",
Key: "key",
Value: "value",
Expand All @@ -151,7 +151,7 @@ func TestStatefunContext_SendEgress_Kinesis(t *testing.T) {
context := createContext()

kafka := &KinesisEgressBuilder{
Target: TypeNameFrom("example/kinesis"),
Target: MustParseTypeName("example/kinesis"),
Stream: "stream",
PartitionKey: "key",
Value: "value",
Expand Down Expand Up @@ -203,8 +203,8 @@ func createContext() *statefunContext {
return &statefunContext{
Context: context.WithValue(context.Background(), testContextKey1, testContextValue1),
Mutex: new(sync.Mutex),
caller: &Address{FunctionType: TypeNameFrom("namespace/function1"), Id: "1"},
self: Address{FunctionType: TypeNameFrom("namespace/function2"), Id: "2"},
caller: &Address{FunctionType: MustParseTypeName("namespace/function1"), Id: "1"},
self: Address{FunctionType: MustParseTypeName("namespace/function2"), Id: "2"},
storage: new(storage),
response: &protocol.FromFunction_InvocationResponse{},
}
Expand Down
11 changes: 6 additions & 5 deletions statefun-sdk-go/v3/pkg/statefun/egress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
package statefun

import (
"testing"

"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
"testing"
)

func TestKafkaEgressBuilder(t *testing.T) {
k := KafkaEgressBuilder{
Target: TypeNameFrom("example/target"),
Target: MustParseTypeName("example/target"),
Topic: "topic",
Key: "key",
Value: "value",
Expand All @@ -45,7 +46,7 @@ func TestKafkaEgressBuilder(t *testing.T) {

func TestKafkaEgressBuilderInvalidString(t *testing.T) {
k := KafkaEgressBuilder{
Target: TypeNameFrom("example/target"),
Target: MustParseTypeName("example/target"),
Topic: "topic",
Key: "key",
Value: string([]byte{0xff, 0xfe, 0xfd}),
Expand All @@ -57,7 +58,7 @@ func TestKafkaEgressBuilderInvalidString(t *testing.T) {

func TestKinesisEgressBuilder(t *testing.T) {
k := KinesisEgressBuilder{
Target: TypeNameFrom("example/target"),
Target: MustParseTypeName("example/target"),
Stream: "stream",
PartitionKey: "key",
Value: "value",
Expand All @@ -77,7 +78,7 @@ func TestKinesisEgressBuilder(t *testing.T) {

func TestKinesisEgressBuilderInvalidString(t *testing.T) {
k := KinesisEgressBuilder{
Target: TypeNameFrom("example/target"),
Target: MustParseTypeName("example/target"),
Stream: "stream",
PartitionKey: "key",
Value: string([]byte{0xff, 0xfe, 0xfd}),
Expand Down
6 changes: 3 additions & 3 deletions statefun-sdk-go/v3/pkg/statefun/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func invokeStatefulFunction(ctx context.Context, target *Address, caller *Addres

func TestStatefunHandler_WithNoCaller_ContextCallerIsNil(t *testing.T) {

target := Address{FunctionType: TypeNameFrom("namespace/function1"), Id: "1"}
target := Address{FunctionType: MustParseTypeName("namespace/function1"), Id: "1"}

statefulFunction := func(ctx Context, message Message) error {
assert.Nil(t, ctx.Caller())
Expand All @@ -76,8 +76,8 @@ func TestStatefunHandler_WithNoCaller_ContextCallerIsNil(t *testing.T) {

func TestStatefunHandler_WithCaller_ContextCallerIsCorrect(t *testing.T) {

target := Address{FunctionType: TypeNameFrom("namespace/function1"), Id: "1"}
caller := Address{FunctionType: TypeNameFrom("namespace/function2"), Id: "2"}
target := Address{FunctionType: MustParseTypeName("namespace/function1"), Id: "1"}
caller := Address{FunctionType: MustParseTypeName("namespace/function2"), Id: "2"}

statefulFunction := func(ctx Context, message Message) error {
assert.Equal(t, caller.String(), ctx.Caller().String())
Expand Down
40 changes: 21 additions & 19 deletions statefun-sdk-go/v3/pkg/statefun/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"errors"
"fmt"

"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
)

Expand Down Expand Up @@ -88,73 +89,73 @@ func (m *Message) IsBool() bool {
return m.Is(BoolType)
}

func (m *Message) AsBool() bool {
func (m *Message) AsBool() (bool, error) {
var receiver bool
if err := BoolType.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
panic(fmt.Errorf("failed to deserialize message: %w", err))
return false, fmt.Errorf("failed to deserialize message: %w", err)
}
return receiver
return receiver, nil
}

func (m *Message) IsInt32() bool {
return m.Is(Int32Type)
}

func (m *Message) AsInt32() int32 {
func (m *Message) AsInt32() (int32, error) {
var receiver int32
if err := Int32Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
panic(fmt.Errorf("failed to deserialize message: %w", err))
return 0, fmt.Errorf("failed to deserialize message: %w", err)
}
return receiver
return receiver, nil
}

func (m *Message) IsInt64() bool {
return m.Is(Int64Type)
}

func (m *Message) AsInt64() int64 {
func (m *Message) AsInt64() (int64, error) {
var receiver int64
if err := Int64Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
panic(fmt.Errorf("failed to deserialize message: %w", err))
return 0, fmt.Errorf("failed to deserialize message: %w", err)
}
return receiver
return receiver, nil
}

func (m *Message) IsFloat32() bool {
return m.Is(Float32Type)
}

func (m *Message) AsFloat32() float32 {
func (m *Message) AsFloat32() (float32, error) {
var receiver float32
if err := Float32Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
panic(fmt.Errorf("failed to deserialize message: %w", err))
return 0, fmt.Errorf("failed to deserialize message: %w", err)
}
return receiver
return receiver, nil
}

func (m *Message) IsFloat64() bool {
return m.Is(Float64Type)
}

func (m *Message) AsFloat64() float64 {
func (m *Message) AsFloat64() (float64, error) {
var receiver float64
if err := Float64Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
panic(fmt.Errorf("failed to deserialize message: %w", err))
return 0, fmt.Errorf("failed to deserialize message: %w", err)
}
return receiver
return receiver, nil
}

func (m *Message) IsString() bool {
return m.Is(StringType)
}

func (m *Message) AsString() string {
func (m *Message) AsString() (string, error) {
var receiver string
if err := StringType.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
panic(fmt.Errorf("failed to deserialize message: %w", err))
return "", fmt.Errorf("failed to deserialize message: %w", err)
}

return receiver
return receiver, nil
}

func (m *Message) Is(t SimpleType) bool {
Expand All @@ -166,7 +167,8 @@ func (m *Message) As(t SimpleType, receiver interface{}) error {
}

func (m *Message) ValueTypeName() TypeName {
return TypeNameFrom(m.typedValue.Typename)
typename, _ := ParseTypeName(m.typedValue.Typename)
return typename
}

func (m *Message) RawValue() []byte {
Expand Down
7 changes: 4 additions & 3 deletions statefun-sdk-go/v3/pkg/statefun/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
package statefun

import (
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"
)

func TestBasicIntMessage(t *testing.T) {
Expand All @@ -36,7 +37,7 @@ func TestBasicIntMessage(t *testing.T) {
assert.NoError(t, err)
assert.True(t, message.IsInt32())

value := message.AsInt32()
value, _ := message.AsInt32()
assert.Equal(t, value, int32(1))
}

Expand All @@ -56,6 +57,6 @@ func TestMessageWithType(t *testing.T) {
assert.NoError(t, err)
assert.True(t, message.IsFloat32())

value := message.AsFloat32()
value, _ := message.AsFloat32()
assert.Equal(t, value, float32(5.0))
}
Loading