diff --git a/examples/pubsub-streaming-async/README.md b/examples/pubsub-streaming-async/README.md new file mode 100644 index 00000000..dfa7d27d --- /dev/null +++ b/examples/pubsub-streaming-async/README.md @@ -0,0 +1,122 @@ +# Example - Publish and subscribe to messages + +This example utilizes a publisher and a subscriber to show the bidirectional pubsub pattern. +It creates a publisher and calls the `publish_event` method in the `DaprClient`. +In the s`subscriber.py` file it creates a subscriber object that can call the `next_message` method to get new messages from the stream. After processing the new message, it returns a status to the stream. + + +> **Note:** Make sure to use the latest proto bindings + +## Pre-requisites + +- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started) +- [Install Python 3.8+](https://www.python.org/downloads/) + +## Install Dapr python-SDK + + + +```bash +pip3 install dapr +``` + +## Run async example where users control reading messages off the stream + +Run the following command in a terminal/command prompt: + + + +```bash +# 1. Start Subscriber +dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber.py +``` + + + +In another terminal/command prompt run: + + + +```bash +# 2. Start Publisher +dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py +``` + + + +## Run async example with a handler function + +Run the following command in a terminal/command prompt: + + + +```bash +# 1. Start Subscriber +dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber-handler.py +``` + + + +In another terminal/command prompt run: + + + +```bash +# 2. Start Publisher +dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py +``` + + + + +## Cleanup + + diff --git a/examples/pubsub-streaming-async/publisher.py b/examples/pubsub-streaming-async/publisher.py new file mode 100644 index 00000000..7268f16a --- /dev/null +++ b/examples/pubsub-streaming-async/publisher.py @@ -0,0 +1,43 @@ +# ------------------------------------------------------------ +# Copyright 2022 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ------------------------------------------------------------ +import asyncio +import json + +from dapr.aio.clients import DaprClient + +async def publish_events(): + """ + Publishes events to a pubsub topic asynchronously + """ + + async with DaprClient() as d: + id = 0 + while id < 5: + id += 1 + req_data = {'id': id, 'message': 'hello world'} + + # Create a typed message with content type and body + await d.publish_event( + pubsub_name='pubsub', + topic_name='TOPIC_A', + data=json.dumps(req_data), + data_content_type='application/json', + publish_metadata={'ttlInSeconds': '100', 'rawPayload': 'false'}, + ) + + # Print the request + print(req_data, flush=True) + + await asyncio.sleep(1) + +asyncio.run(publish_events()) \ No newline at end of file diff --git a/examples/pubsub-streaming/async-subscriber-handler.py b/examples/pubsub-streaming-async/subscriber-handler.py similarity index 96% rename from examples/pubsub-streaming/async-subscriber-handler.py rename to examples/pubsub-streaming-async/subscriber-handler.py index e5f68953..f9503f06 100644 --- a/examples/pubsub-streaming/async-subscriber-handler.py +++ b/examples/pubsub-streaming-async/subscriber-handler.py @@ -32,7 +32,6 @@ async def main(): # Wait until 5 messages are processed global counter while counter < 5: - print('Counter: ', counter) await asyncio.sleep(1) print('Closing subscription...') diff --git a/examples/pubsub-streaming/async-subscriber.py b/examples/pubsub-streaming-async/subscriber.py similarity index 100% rename from examples/pubsub-streaming/async-subscriber.py rename to examples/pubsub-streaming-async/subscriber.py diff --git a/examples/pubsub-streaming/README.md b/examples/pubsub-streaming/README.md index 4bad7f3c..4849e791 100644 --- a/examples/pubsub-streaming/README.md +++ b/examples/pubsub-streaming/README.md @@ -116,103 +116,6 @@ dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --e -## Run async example where users control reading messages off the stream - -Run the following command in a terminal/command prompt: - - - -```bash -# 1. Start Subscriber -dapr run --app-id python-subscriber --app-protocol grpc python3 async-subscriber.py -``` - - - -In another terminal/command prompt run: - - - -```bash -# 2. Start Publisher -dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py -``` - - - -## Run async example with a handler function - -Run the following command in a terminal/command prompt: - - - -```bash -# 1. Start Subscriber -dapr run --app-id python-subscriber --app-protocol grpc python3 async-subscriber-handler.py -``` - - - -In another terminal/command prompt run: - - - -```bash -# 2. Start Publisher -dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py -``` - - - - ## Cleanup diff --git a/tox.ini b/tox.ini index 6400e329..78f23086 100644 --- a/tox.ini +++ b/tox.ini @@ -51,6 +51,7 @@ commands = ./validate.sh error_handling ./validate.sh pubsub-simple ./validate.sh pubsub-streaming + ./validate.sh pubsub-streaming-async ./validate.sh state_store ./validate.sh state_store_query ./validate.sh secret_store