Skip to content

Commit

Permalink
Add sample of backpressure-enabled stream conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
Elijah Verdoorn committed Sep 18, 2019
1 parent c5d6dfc commit 82a0c3a
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 1 deletion.
17 changes: 17 additions & 0 deletions src/main/kotlin/interop/flowable/FlowableInterop.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package interop.flowable

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.rx2.rxFlowable
import kotlin.coroutines.CoroutineContext

class FlowableInterop(
override val coroutineContext: CoroutineContext = Dispatchers.Default
): CoroutineScope {
fun getFlowable() = CoroutineScope(coroutineContext).rxFlowable {
(1..10).map {
send(it)
}
}
}
13 changes: 13 additions & 0 deletions src/main/kotlin/interop/flowable/TestFlowableInterop.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package interop.flowable

fun main() {
println("Creating FlowableInterop")
val flowableInterop = FlowableInterop()
flowableInterop.getFlowable()
.subscribe({
println(it)
Thread.sleep(1000) // notice that the producing coroutine's backpressure is handled
},::println)

Thread.sleep(11000) // allow background work to occur before main thread exits
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ fun main() {
val observableInterop = ObservableInterop()
observableInterop.getStream().subscribe(::println, ::println)

Thread.sleep(2000)
Thread.sleep(2000) // Sleep main thread to allow coroutine to complete
}

0 comments on commit 82a0c3a

Please sign in to comment.