Skip to content

Commit

Permalink
Split sync and async examples
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Kolevska <[email protected]>
  • Loading branch information
elena-kolevska committed Oct 10, 2024
1 parent 0ada5ea commit edadffe
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 98 deletions.
122 changes: 122 additions & 0 deletions examples/pubsub-streaming-async/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Example - Publish and subscribe to messages

This example utilizes a publisher and a subscriber to show the bidirectional pubsub pattern.
It creates a publisher and calls the `publish_event` method in the `DaprClient`.
In the s`subscriber.py` file it creates a subscriber object that can call the `next_message` method to get new messages from the stream. After processing the new message, it returns a status to the stream.


> **Note:** Make sure to use the latest proto bindings
## Pre-requisites

- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started)
- [Install Python 3.8+](https://www.python.org/downloads/)

## Install Dapr python-SDK

<!-- Our CI/CD pipeline automatically installs the correct version, so we can skip this step in the automation -->

```bash
pip3 install dapr
```

## Run async example where users control reading messages off the stream

Run the following command in a terminal/command prompt:

<!-- STEP
name: Run subscriber
expected_stdout_lines:
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Closing subscription..."
output_match_mode: substring
background: true
match_order: none
sleep: 3
-->

```bash
# 1. Start Subscriber
dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber.py
```

<!-- END_STEP -->

In another terminal/command prompt run:

<!-- STEP
name: Run publisher
expected_stdout_lines:
- "== APP == {'id': 1, 'message': 'hello world'}"
- "== APP == {'id': 2, 'message': 'hello world'}"
- "== APP == {'id': 3, 'message': 'hello world'}"
- "== APP == {'id': 4, 'message': 'hello world'}"
- "== APP == {'id': 5, 'message': 'hello world'}"
background: true
output_match_mode: substring
sleep: 15
-->

```bash
# 2. Start Publisher
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
```

<!-- END_STEP -->

## Run async example with a handler function

Run the following command in a terminal/command prompt:

<!-- STEP
name: Run subscriber
expected_stdout_lines:
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Closing subscription..."
output_match_mode: substring
background: true
match_order: none
sleep: 3
-->

```bash
# 1. Start Subscriber
dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber-handler.py
```

<!-- END_STEP -->

In another terminal/command prompt run:

<!-- STEP
name: Run publisher
expected_stdout_lines:
- "== APP == {'id': 1, 'message': 'hello world'}"
- "== APP == {'id': 2, 'message': 'hello world'}"
- "== APP == {'id': 3, 'message': 'hello world'}"
- "== APP == {'id': 4, 'message': 'hello world'}"
- "== APP == {'id': 5, 'message': 'hello world'}"
background: true
output_match_mode: substring
sleep: 15
-->

```bash
# 2. Start Publisher
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
```

<!-- END_STEP -->


## Cleanup


43 changes: 43 additions & 0 deletions examples/pubsub-streaming-async/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# ------------------------------------------------------------
# Copyright 2022 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ------------------------------------------------------------
import asyncio
import json

from dapr.aio.clients import DaprClient

async def publish_events():
"""
Publishes events to a pubsub topic asynchronously
"""

async with DaprClient() as d:
id = 0
while id < 5:
id += 1
req_data = {'id': id, 'message': 'hello world'}

# Create a typed message with content type and body
await d.publish_event(
pubsub_name='pubsub',
topic_name='TOPIC_A',
data=json.dumps(req_data),
data_content_type='application/json',
publish_metadata={'ttlInSeconds': '100', 'rawPayload': 'false'},
)

# Print the request
print(req_data, flush=True)

await asyncio.sleep(1)

asyncio.run(publish_events())
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ async def main():
# Wait until 5 messages are processed
global counter
while counter < 5:
print('Counter: ', counter)
await asyncio.sleep(1)

print('Closing subscription...')
Expand Down
97 changes: 0 additions & 97 deletions examples/pubsub-streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,103 +116,6 @@ dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --e

<!-- END_STEP -->

## Run async example where users control reading messages off the stream

Run the following command in a terminal/command prompt:

<!-- STEP
name: Run subscriber
expected_stdout_lines:
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Closing subscription..."
output_match_mode: substring
background: true
match_order: none
sleep: 3
-->

```bash
# 1. Start Subscriber
dapr run --app-id python-subscriber --app-protocol grpc python3 async-subscriber.py
```

<!-- END_STEP -->

In another terminal/command prompt run:

<!-- STEP
name: Run publisher
expected_stdout_lines:
- "== APP == {'id': 1, 'message': 'hello world'}"
- "== APP == {'id': 2, 'message': 'hello world'}"
- "== APP == {'id': 3, 'message': 'hello world'}"
- "== APP == {'id': 4, 'message': 'hello world'}"
- "== APP == {'id': 5, 'message': 'hello world'}"
background: true
output_match_mode: substring
sleep: 15
-->

```bash
# 2. Start Publisher
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
```

<!-- END_STEP -->

## Run async example with a handler function

Run the following command in a terminal/command prompt:

<!-- STEP
name: Run subscriber
expected_stdout_lines:
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Closing subscription..."
output_match_mode: substring
background: true
match_order: none
sleep: 3
-->

```bash
# 1. Start Subscriber
dapr run --app-id python-subscriber --app-protocol grpc python3 async-subscriber-handler.py
```

<!-- END_STEP -->

In another terminal/command prompt run:

<!-- STEP
name: Run publisher
expected_stdout_lines:
- "== APP == {'id': 1, 'message': 'hello world'}"
- "== APP == {'id': 2, 'message': 'hello world'}"
- "== APP == {'id': 3, 'message': 'hello world'}"
- "== APP == {'id': 4, 'message': 'hello world'}"
- "== APP == {'id': 5, 'message': 'hello world'}"
background: true
output_match_mode: substring
sleep: 15
-->

```bash
# 2. Start Publisher
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
```

<!-- END_STEP -->


## Cleanup


1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ commands =
./validate.sh error_handling
./validate.sh pubsub-simple
./validate.sh pubsub-streaming
./validate.sh pubsub-streaming-async
./validate.sh state_store
./validate.sh state_store_query
./validate.sh secret_store
Expand Down

0 comments on commit edadffe

Please sign in to comment.