Skip to content

Commit

Permalink
Update streaming subscription to understand new initial response (#601)
Browse files Browse the repository at this point in the history
* Update streaming subscription to understand new initial response

Signed-off-by: joshvanl <[email protected]>

* Update dapr CLI to 1.14.0-rc.6

Signed-off-by: joshvanl <[email protected]>

* Update streamsub name in validate examples

Signed-off-by: joshvanl <[email protected]>

* Apply suggestions from code review

Co-authored-by: Mike Nguyen <[email protected]>
Signed-off-by: Josh van Leeuwen <[email protected]>

---------

Signed-off-by: joshvanl <[email protected]>
Signed-off-by: Josh van Leeuwen <[email protected]>
Co-authored-by: Mike Nguyen <[email protected]>
  • Loading branch information
JoshVanL and mikeee authored Jul 23, 2024
1 parent 7c03c7c commit 9bc7d82
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 23 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/validate_examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/master/install/install.sh
DAPR_CLI_REF: 19b9de05611ade540b06d2c061f32f6c37093a17
DAPR_CLI_REF: ${{ github.event.inputs.daprcli_commit }}
DAPR_REF: ${{ github.event.inputs.daprdapr_commit }}
CHECKOUT_REPO: ${{ github.repository }}
CHECKOUT_REF: ${{ github.ref }}
outputs:
DAPR_INSTALL_URL: ${{ env.DAPR_INSTALL_URL }}
DAPR_CLI_VER: ${{ steps.outputs.outputs.DAPR_CLI_VER }}
DAPR_CLI_VER: 1.14.0-rc.6
DAPR_CLI_REF: ${{ steps.outputs.outputs.DAPR_CLI_REF }}
DAPR_RUNTIME_VER: 1.14.0-rc.2
DAPR_RUNTIME_VER: 1.14.0-rc.4
CHECKOUT_REPO: ${{ steps.outputs.outputs.CHECKOUT_REPO }}
CHECKOUT_REF: ${{ steps.outputs.outputs.CHECKOUT_REF }}
DAPR_REF: ${{ steps.outputs.outputs.DAPR_REF }}
Expand Down Expand Up @@ -150,7 +150,7 @@ jobs:
GOPROXY: https://proxy.golang.org
DAPR_INSTALL_URL: ${{ needs.setup.outputs.DAPR_INSTALL_URL }}
DAPR_CLI_VER: ${{ needs.setup.outputs.DAPR_CLI_VER }}
DAPR_RUNTIME_VER: 1.14.0-rc.2
DAPR_RUNTIME_VER: ${{ needs.setup.outputs.DAPR_RUNTIME_VER }}
DAPR_CLI_REF: ${{ needs.setup.outputs.DAPR_CLI_REF }}
DAPR_REF: ${{ needs.setup.outputs.DAPR_REF }}
CHECKOUT_REPO: ${{ needs.setup.outputs.CHECKOUT_REPO }}
Expand All @@ -169,7 +169,7 @@ jobs:
"grpc-service",
"hello-world",
"pubsub",
"bidipubsub",
"streamsub",
"service",
"socket",
"workflow",
Expand Down
26 changes: 20 additions & 6 deletions client/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ func (c *GRPCClient) Subscribe(ctx context.Context, opts SubscriptionOptions) (*
return nil, err
}

return &Subscription{
s := &Subscription{
stream: stream,
}, nil
}

return s, nil
}

func (c *GRPCClient) SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error) {
Expand Down Expand Up @@ -99,10 +101,11 @@ func (s *Subscription) Close() error {
}

func (s *Subscription) Receive() (*SubscriptionMessage, error) {
event, err := s.stream.Recv()
resp, err := s.stream.Recv()
if err != nil {
return nil, err
}
event := resp.GetEventMessage()

data := any(event.GetData())
if len(event.GetData()) > 0 {
Expand Down Expand Up @@ -181,8 +184,8 @@ func (s *SubscriptionMessage) respond(status pb.TopicEventResponse_TopicEventRes
defer s.sub.lock.Unlock()

return s.sub.stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventResponse{
EventResponse: &pb.SubscribeTopicEventsResponseAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventProcessed{
EventProcessed: &pb.SubscribeTopicEventsRequestProcessedAlpha1{
Id: s.ID,
Status: &pb.TopicEventResponse{Status: status},
},
Expand All @@ -206,7 +209,7 @@ func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts Subscript

err = stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_InitialRequest{
InitialRequest: &pb.SubscribeTopicEventsInitialRequestAlpha1{
InitialRequest: &pb.SubscribeTopicEventsRequestInitialAlpha1{
PubsubName: opts.PubsubName, Topic: opts.Topic,
Metadata: opts.Metadata, DeadLetterTopic: opts.DeadLetterTopic,
},
Expand All @@ -216,5 +219,16 @@ func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts Subscript
return nil, errors.Join(err, stream.CloseSend())
}

resp, err := stream.Recv()
if err != nil {
return nil, errors.Join(err, stream.CloseSend())
}

switch resp.GetSubscribeTopicEventsResponseType().(type) {
case *pb.SubscribeTopicEventsResponseAlpha1_InitialResponse:
default:
return nil, fmt.Errorf("unexpected initial response from server : %v", resp)
}

return stream, nil
}
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dapr/dapr v1.14.0-rc.2 // indirect
github.com/dapr/dapr v1.14.0-rc.5 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-chi/chi/v5 v5.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8=
github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8=
github.com/dapr/dapr v1.14.0-rc.5 h1:oTZPcT5fwda6bCMxrfenem6tOyeqW1nastxTwWInBCY=
github.com/dapr/dapr v1.14.0-rc.5/go.mod h1:IQWNthXF/I+qqlW4I0T+F4hCu74eKon4vjhpNvoBl8A=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dapr run --app-id sub \
--dapr-http-port 3500 \
--log-level debug \
--resources-path ./config \
go run bidisub/bidisub.go
go run sub/sub.go
```

<!-- END_STEP -->
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,27 @@ func main() {
eventHandler,
)
if err != nil {
log.Fatal(err)
log.Fatalf("failed to subscribe to topic: %v", err)
}
fmt.Printf(">>Created subscription messages/sendorder\n")

// Another method of streaming subscriptions, this time for the topic "neworder".
// The returned `sub` object is used to receive messages.
// `sub` must be closed once it's no longer needed.

sub, err := client.Subscribe(context.Background(), daprd.SubscriptionOptions{
PubsubName: "messages",
Topic: "neworder",
DeadLetterTopic: &deadLetterTopic,
})
if err != nil {
log.Fatal(err)
log.Fatalf("failed to subscribe to topic: %v", err)
}
fmt.Printf(">>Created subscription\n")
fmt.Printf(">>Created subscription messages/neworder\n")

for i := 0; i < 3; i++ {
msg, err := sub.Receive()
if err != nil {
log.Fatalf("error receiving message: %v", err)
log.Fatalf("Error receiving message: %v", err)
}
log.Printf(">>Received message\n")
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", msg.PubsubName, msg.Topic, msg.ID, msg.RawData)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dapr/go-sdk
go 1.22.5

require (
github.com/dapr/dapr v1.14.0-rc.2
github.com/dapr/dapr v1.14.0-rc.5
github.com/go-chi/chi/v5 v5.1.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8=
github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8=
github.com/dapr/dapr v1.14.0-rc.5 h1:oTZPcT5fwda6bCMxrfenem6tOyeqW1nastxTwWInBCY=
github.com/dapr/dapr v1.14.0-rc.5/go.mod h1:IQWNthXF/I+qqlW4I0T+F4hCu74eKon4vjhpNvoBl8A=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
Expand Down

0 comments on commit 9bc7d82

Please sign in to comment.