Skip to content

Commit

Permalink
Bidirectional Subscriptions (#578)
Browse files Browse the repository at this point in the history
* Bidirectional Subscriptions

Adds support for bidirectional subscriptions to PubSubs. Adds two
methods for subscribing- one using a callback and one using an
imperative approach. Both giving support to different programming styles
or use cases.

Adds example with tests.

Signed-off-by: joshvanl <[email protected]>

* Linting: Remove unused `closeCh`

Signed-off-by: joshvanl <[email protected]>

* Fixes comment order in bidisub.go

Signed-off-by: joshvanl <[email protected]>

* Add comment about processing message

Signed-off-by: joshvanl <[email protected]>

* Adds dead letter topic example

Signed-off-by: joshvanl <[email protected]>

* chore: remove go.mod

Signed-off-by: mikeee <[email protected]>

* Updates go mod to v1.14.0-rc.1

Signed-off-by: joshvanl <[email protected]>

---------

Signed-off-by: joshvanl <[email protected]>
Signed-off-by: mikeee <[email protected]>
Signed-off-by: Mike Nguyen <[email protected]>
Co-authored-by: mikeee <[email protected]>
  • Loading branch information
JoshVanL and mikeee authored Jul 11, 2024
1 parent a1e723b commit b7b90e3
Show file tree
Hide file tree
Showing 12 changed files with 508 additions and 39 deletions.
1 change: 1 addition & 0 deletions .github/workflows/validate_examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ jobs:
"grpc-service",
"hello-world",
"pubsub",
"bidipubsub",
"service",
"socket",
"workflow",
Expand Down
8 changes: 8 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ type Client interface {
// UnsubscribeConfigurationItems can stop the subscription with target store's and id
UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error

// Subscribe subscribes to a pubsub topic and streams messages to the returned Subscription.
// Subscription must be closed after finishing with subscribing.
Subscribe(ctx context.Context, opts SubscriptionOptions) (*Subscription, error)

// SubscribeWithHandler subscribes to a pubsub topic and calls the given handler on topic events.
// The returned cancel function must be called after finishing with subscribing.
SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error)

// DeleteBulkState deletes content for multiple keys from store.
DeleteBulkState(ctx context.Context, storeName string, keys []string, meta map[string]string) error

Expand Down
3 changes: 2 additions & 1 deletion client/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ func (c *GRPCClient) GetBulkState(ctx context.Context, storeName string, keys []

// GetState retrieves state from specific store using default consistency option.
func (c *GRPCClient) GetState(ctx context.Context, storeName, key string, meta map[string]string) (item *StateItem, err error) {
return c.GetStateWithConsistency(ctx, storeName, key, meta, StateConsistencyStrong)
i, err := c.GetStateWithConsistency(ctx, storeName, key, meta, StateConsistencyStrong)
return i, err
}

// GetStateWithConsistency retrieves state from specific store using provided state consistency.
Expand Down
220 changes: 220 additions & 0 deletions client/subscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"context"
"encoding/json"
"errors"
"fmt"
"mime"
"strings"
"sync"
"sync/atomic"

pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/service/common"
)

type SubscriptionHandleFunction func(event *common.TopicEvent) common.SubscriptionResponseStatus

type SubscriptionOptions struct {
PubsubName string
Topic string
DeadLetterTopic *string
Metadata map[string]string
}

type Subscription struct {
stream pb.Dapr_SubscribeTopicEventsAlpha1Client
// lock locks concurrent writes to subscription stream.
lock sync.Mutex
closed atomic.Bool
}

type SubscriptionMessage struct {
*common.TopicEvent
sub *Subscription
}

func (c *GRPCClient) Subscribe(ctx context.Context, opts SubscriptionOptions) (*Subscription, error) {
stream, err := c.subscribeInitialRequest(ctx, opts)
if err != nil {
return nil, err
}

return &Subscription{
stream: stream,
}, nil
}

func (c *GRPCClient) SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error) {
s, err := c.Subscribe(ctx, opts)
if err != nil {
return nil, err
}

go func() {
defer s.Close()

for {
msg, err := s.Receive()
if err != nil {
if !s.closed.Load() {
logger.Printf("Error receiving messages from subscription pubsub=%s topic=%s, closing subscription: %s",
opts.PubsubName, opts.Topic, err)
}
return
}

go func() {
if err := msg.respondStatus(handler(msg.TopicEvent)); err != nil {
logger.Printf("Error responding to topic with event status pubsub=%s topic=%s message_id=%s: %s",
opts.PubsubName, opts.Topic, msg.ID, err)
}
}()
}
}()

return s.Close, nil
}

func (s *Subscription) Close() error {
if !s.closed.CompareAndSwap(false, true) {
return errors.New("subscription already closed")
}

return s.stream.CloseSend()
}

func (s *Subscription) Receive() (*SubscriptionMessage, error) {
event, err := s.stream.Recv()
if err != nil {
return nil, err
}

data := any(event.GetData())
if len(event.GetData()) > 0 {
mediaType, _, err := mime.ParseMediaType(event.GetDataContentType())
if err == nil {
var v interface{}
switch mediaType {
case "application/json":
if err := json.Unmarshal(event.GetData(), &v); err == nil {
data = v
}
case "text/plain":
// Assume UTF-8 encoded string.
data = string(event.GetData())
default:
if strings.HasPrefix(mediaType, "application/") &&
strings.HasSuffix(mediaType, "+json") {
if err := json.Unmarshal(event.GetData(), &v); err == nil {
data = v
}
}
}
}
}

topicEvent := &common.TopicEvent{
ID: event.GetId(),
Source: event.GetSource(),
Type: event.GetType(),
SpecVersion: event.GetSpecVersion(),
DataContentType: event.GetDataContentType(),
Data: data,
RawData: event.GetData(),
Topic: event.GetTopic(),
PubsubName: event.GetPubsubName(),
}

return &SubscriptionMessage{
sub: s,
TopicEvent: topicEvent,
}, nil
}

func (s *SubscriptionMessage) Success() error {
return s.respond(pb.TopicEventResponse_SUCCESS)
}

func (s *SubscriptionMessage) Retry() error {
return s.respond(pb.TopicEventResponse_RETRY)
}

func (s *SubscriptionMessage) Drop() error {
return s.respond(pb.TopicEventResponse_DROP)
}

func (s *SubscriptionMessage) respondStatus(status common.SubscriptionResponseStatus) error {
var statuspb pb.TopicEventResponse_TopicEventResponseStatus
switch status {
case common.SubscriptionResponseStatusSuccess:
statuspb = pb.TopicEventResponse_SUCCESS
case common.SubscriptionResponseStatusRetry:
statuspb = pb.TopicEventResponse_RETRY
case common.SubscriptionResponseStatusDrop:
statuspb = pb.TopicEventResponse_DROP
default:
return fmt.Errorf("unknown status, expected one of %s, %s, %s: %s",
common.SubscriptionResponseStatusSuccess, common.SubscriptionResponseStatusRetry,
common.SubscriptionResponseStatusDrop, status)
}

return s.respond(statuspb)
}

func (s *SubscriptionMessage) respond(status pb.TopicEventResponse_TopicEventResponseStatus) error {
s.sub.lock.Lock()
defer s.sub.lock.Unlock()

return s.sub.stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventResponse{
EventResponse: &pb.SubscribeTopicEventsResponseAlpha1{
Id: s.ID,
Status: &pb.TopicEventResponse{Status: status},
},
},
})
}

func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts SubscriptionOptions) (pb.Dapr_SubscribeTopicEventsAlpha1Client, error) {
if len(opts.PubsubName) == 0 {
return nil, errors.New("pubsub name required")
}

if len(opts.Topic) == 0 {
return nil, errors.New("topic required")
}

stream, err := c.protoClient.SubscribeTopicEventsAlpha1(ctx)
if err != nil {
return nil, err
}

err = stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_InitialRequest{
InitialRequest: &pb.SubscribeTopicEventsInitialRequestAlpha1{
PubsubName: opts.PubsubName, Topic: opts.Topic,
Metadata: opts.Metadata, DeadLetterTopic: opts.DeadLetterTopic,
},
},
})
if err != nil {
return nil, errors.Join(err, stream.CloseSend())
}

return stream, nil
}
71 changes: 71 additions & 0 deletions examples/bidipubsub/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Dapr PubSub Example with go-sdk

This folder contains two Go files that use the Go SDK to invoke the Dapr Pub/Sub API.

## Diagram

![](https://i.loli.net/2020/08/23/5MBYgwqCZcXNUf2.jpg)

## Step

### Prepare

- Dapr installed

### Run Subscriber Server

<!-- STEP
name: Run Subscriber Server
output_match_mode: substring
match_order: none
expected_stdout_lines:
- 'event - PubsubName: messages, Topic: neworder'
- 'event - PubsubName: messages, Topic: neworder'
- 'event - PubsubName: messages, Topic: neworder'
- 'event - PubsubName: messages, Topic: sendorder'
- 'event - PubsubName: messages, Topic: sendorder'
- 'event - PubsubName: messages, Topic: sendorder'
expected_stderr_lines:
background: true
sleep: 15
-->

```bash
dapr run --app-id sub \
--dapr-http-port 3500 \
--log-level debug \
--resources-path ./config \
go run bidisub/bidisub.go
```

<!-- END_STEP -->

### Run Publisher

<!-- STEP
name: Run publisher
output_match_mode: substring
expected_stdout_lines:
- 'sending message'
- 'message published'
- 'sending multiple messages'
- 'multiple messages published'
expected_stderr_lines:
background: true
sleep: 15
-->

```bash
dapr run --app-id pub \
--log-level debug \
--resources-path ./config \
go run pub/pub.go
```

<!-- END_STEP -->

## Result

```shell
== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: neworder, ID: 82427280-1c18-4fab-b901-c7e68d295d31, Data: ping123
```
Loading

0 comments on commit b7b90e3

Please sign in to comment.