-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flow based Event Emitter #35
base: feature/roomlifecycle-attach-with-retry
Are you sure you want to change the base?
Flow based Event Emitter #35
Conversation
added comments accordingly
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
fun offAll() | ||
} | ||
|
||
open class FlowEmitter<V>(scope: CoroutineScope = CoroutineScope(Dispatchers.Default)) : Emitter<V> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flow is async in nature. It doesn't take a deterministic approach to canceled collectors.
Also, SDK methods are a with a mix of suspending
and nonsuspending
functions.
So, designing an emitter that works for both using flow
kinda feels antipattern : (
Also, while writing few tests, I found, it doesn't work as expected. Especially when you cancel the subscription, because collect
is async in nature and can collect values later in time.
In the case of ably-java
based emitter, values emitted are triggered by emit
itself, so it's deterministic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flow
doesn't provide any deterministic methods to cancel
the collector, rather they ask to cancel the parent coroutine
that launched it. This can also cancel
active job running inside collector
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can push some failing/flaky tests. So, will keep this on hold.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, collectors can cause memory leaks if not properly handled/ closed : (
import org.junit.Assert | ||
import org.junit.Test | ||
|
||
class FlowEmitterTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This simple emitter test doesn't work : (
I think Flows
are meant for specific requirements for broadcasting events and don't fit right into the ably specific emitter pattern.
Kotlin
doesn't provide flexible API around collectors, getting active collectors, canceling them, etc. Dangling collectors can also cause memory leaks.
If we want to implement (coroutine/scope-based) emitters in Kotlin, It's better to do it using simple DS like ConcurrentList instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2f12adb have fixed the test, but had to make on
method blocking and add extra workaround (emit null
as end of stream) to make flows work as expected
60905b8
to
f091c71
Compare
Superseded by #38 |
No description provided.