diff --git a/src/main/kotlin/interop/flowable/FlowableInterop.kt b/src/main/kotlin/interop/flowable/FlowableInterop.kt new file mode 100644 index 0000000..392a328 --- /dev/null +++ b/src/main/kotlin/interop/flowable/FlowableInterop.kt @@ -0,0 +1,17 @@ +package interop.flowable + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.rx2.rxFlowable +import kotlin.coroutines.CoroutineContext + +class FlowableInterop( + override val coroutineContext: CoroutineContext = Dispatchers.Default +): CoroutineScope { + fun getFlowable() = CoroutineScope(coroutineContext).rxFlowable { + (1..10).map { + send(it) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/interop/flowable/TestFlowableInterop.kt b/src/main/kotlin/interop/flowable/TestFlowableInterop.kt new file mode 100644 index 0000000..affd8a4 --- /dev/null +++ b/src/main/kotlin/interop/flowable/TestFlowableInterop.kt @@ -0,0 +1,13 @@ +package interop.flowable + +fun main() { + println("Creating FlowableInterop") + val flowableInterop = FlowableInterop() + flowableInterop.getFlowable() + .subscribe({ + println(it) + Thread.sleep(1000) // notice that the producing coroutine's backpressure is handled + },::println) + + Thread.sleep(11000) // allow background work to occur before main thread exits +} \ No newline at end of file diff --git a/src/main/kotlin/interop/observable/TestObservableInterop.kt b/src/main/kotlin/interop/observable/TestObservableInterop.kt index 2e0d10f..8d0ba12 100644 --- a/src/main/kotlin/interop/observable/TestObservableInterop.kt +++ b/src/main/kotlin/interop/observable/TestObservableInterop.kt @@ -5,5 +5,5 @@ fun main() { val observableInterop = ObservableInterop() observableInterop.getStream().subscribe(::println, ::println) - Thread.sleep(2000) + Thread.sleep(2000) // Sleep main thread to allow coroutine to complete } \ No newline at end of file