Can't emit items to observer in infinite loop #592

Open
yoavweber opened this issue Jan 16, 2022 · 2 comments

@yoavweber

Hey everyone,
I am building a small app that listens to a WebSocket stream and parses the received events. I tried to mock the basic functionality in this code snippet:

import asyncio
import rx
import rx.operators as ops
from rx.scheduler.eventloop import AsyncIOScheduler
import random


TRIGGER_EVENT = 10


def mock_stream(loop):
    def on_subscribe(observer, scheduler):
        async def get_deal_pracentage():
            i = 0
            while True:
                value = random.randint(0, 10000000)
                if value == TRIGGER_EVENT:
                    print("sending data to observable with payload:", i)
                    observer.on_next(i)
                    # await asyncio.sleep(0.1)
                    i += 1

        # get_deal_pracentage takes no arguments, so call it without any
        loop.create_task(get_deal_pracentage())

    return rx.create(on_subscribe)


async def foo(i):
    await asyncio.sleep(4)
    return i+1


loop = asyncio.get_event_loop()
ticks = rx.interval(2.0)
# Pass the emitted item to foo, which requires one argument
obs = mock_stream(loop).pipe(ops.flat_map(lambda i: rx.from_future(
    loop.create_task(foo(i)))))


obs.subscribe(
    on_next=lambda item: print("observer received payload:", item),
    scheduler=AsyncIOScheduler(loop)
)

loop.run_forever()

For some reason, the observer does not receive the emitted item. When I add await asyncio.sleep() in the observable, the observer receives the event.

How can the observer receive the item without adding the await asyncio.sleep()? And why does it only work when I add the await asyncio.sleep()?

I am pretty new to rxpy and asyncio, so unrelated feedback about the code would be very welcome :)

@MainRo
Collaborator

MainRo commented Jan 19, 2022

The issue here is that the asyncio event loop is blocked forever.

The while loop in get_deal_pracentage runs on your application's asyncio event loop. Without the sleep call, the get_deal_pracentage task never exits and never yields control back to the event loop, so no other task gets a chance to run. As a consequence, the rest of the code is never scheduled or executed.
With the call to sleep, get_deal_pracentage is suspended at the await. Asyncio then schedules one of the other pending tasks, including the code in flat_map.
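
To see this scheduling behavior in isolation, here is a minimal sketch that uses plain asyncio only, without rxpy. The names producer, consumer and run are hypothetical and do not come from the snippet above. The loops are finite so that the script terminates; with a while True loop and no await, the consumer would never run at all.

```python
import asyncio


async def producer(events, yield_control):
    """Record three values; optionally yield to the event loop after each one."""
    for i in range(3):
        events.append(("produced", i))
        if yield_control:
            # sleep(0) suspends this coroutine for one loop iteration,
            # which lets other ready tasks run.
            await asyncio.sleep(0)


async def consumer(events):
    """Hypothetical stand-in for downstream work such as the flat_map callback."""
    for i in range(3):
        events.append(("consumed", i))
        await asyncio.sleep(0)


async def run(yield_control):
    events = []
    # Both coroutines are scheduled as tasks on the same event loop.
    await asyncio.gather(producer(events, yield_control), consumer(events))
    return events


# Without an await, the producer finishes before the consumer gets a turn.
blocking = asyncio.run(run(yield_control=False))
# With an await, the two tasks interleave.
cooperative = asyncio.run(run(yield_control=True))
print(blocking)
print(cooperative)
```

In the first run every "produced" entry comes before any "consumed" entry, because the producer never gives up control. In the second run the entries interleave. Using await asyncio.sleep(0) is enough to yield; a non-zero delay is only needed if you also want to throttle the loop.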

If you are new to both rxpy and asyncio, starting by using them together is not an easy learning path!

@yoavweber
Author

Thanks for the reply!

In my actual implementation, I receive 3 events one after the other. When I use asyncio.wait, I lose some data (only one event reaches the observer).

Is there any other way to wait for an event while making sure that the rest of the application keeps running?
