From c9d3c206d6272699716f64bd2c6e5da98f4cf675 Mon Sep 17 00:00:00 2001 From: Elijah Verdoorn Date: Wed, 18 Sep 2019 12:01:30 -0700 Subject: [PATCH 1/6] Add dependency on rx2 interop library, add dependency on RxJava2 --- build.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/build.gradle b/build.gradle index f84e871..36a7627 100644 --- a/build.gradle +++ b/build.gradle @@ -14,6 +14,8 @@ repositories { dependencies { implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8" implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0' + implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.0' + implementation "io.reactivex.rxjava2:rxjava:2.2.7" implementation "io.reactivex.rxjava3:rxjava:3.0.0-RC2" testCompile group: 'junit', name: 'junit', version: '4.12' } From 2d95c22fa970b9f11803a194fee77f783f24bc46 Mon Sep 17 00:00:00 2001 From: Elijah Verdoorn Date: Wed, 18 Sep 2019 12:02:11 -0700 Subject: [PATCH 2/6] Add sample for interoperability with RxJava Single --- gradle/wrapper/gradle-wrapper.properties | 5 +++-- .../interop/completable/CompletableInterop.kt | 2 ++ src/main/kotlin/interop/single/SingleInterop.kt | 15 +++++++++++++++ .../kotlin/interop/single/TestSingleInterop.kt | 9 +++++++++ 4 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 src/main/kotlin/interop/completable/CompletableInterop.kt create mode 100644 src/main/kotlin/interop/single/SingleInterop.kt create mode 100644 src/main/kotlin/interop/single/TestSingleInterop.kt diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 290541c..f80ef9c 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,6 @@ +#Wed Sep 18 11:29:24 PDT 2019 +distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.3-all.zip distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.3-bin.zip -zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME diff --git a/src/main/kotlin/interop/completable/CompletableInterop.kt b/src/main/kotlin/interop/completable/CompletableInterop.kt new file mode 100644 index 0000000..8ab5f29 --- /dev/null +++ b/src/main/kotlin/interop/completable/CompletableInterop.kt @@ -0,0 +1,2 @@ +package interop.completable + diff --git a/src/main/kotlin/interop/single/SingleInterop.kt b/src/main/kotlin/interop/single/SingleInterop.kt new file mode 100644 index 0000000..2c78cc4 --- /dev/null +++ b/src/main/kotlin/interop/single/SingleInterop.kt @@ -0,0 +1,15 @@ +package interop.single + +import kotlinx.coroutines.* +import kotlinx.coroutines.rx2.rxSingle +import kotlin.coroutines.CoroutineContext + +class SingleInterop( + override val coroutineContext: CoroutineContext = Dispatchers.Default +): CoroutineScope { + fun doWork() = CoroutineScope(coroutineContext).rxSingle { + delay(1000) + return@rxSingle "Hello!" + } +} + diff --git a/src/main/kotlin/interop/single/TestSingleInterop.kt b/src/main/kotlin/interop/single/TestSingleInterop.kt new file mode 100644 index 0000000..dd3b58e --- /dev/null +++ b/src/main/kotlin/interop/single/TestSingleInterop.kt @@ -0,0 +1,9 @@ +package interop.single + +fun main() { + println("Creating an instance of SingleInterop") + val singleInterop = SingleInterop() + singleInterop.doWork().subscribe(::println, ::println) + + Thread.sleep(2000) // Sleep the main thread to await the completion of the background thread +} From f441eb1b30e221894ae46684f7ceca4bd630a360 Mon Sep 17 00:00:00 2001 From: Elijah Verdoorn Date: Wed, 18 Sep 2019 12:05:01 -0700 Subject: [PATCH 3/6] Add sample for converting coroutine to RxJava 2 Completable --- .../interop/completable/CompletableInterop.kt | 15 +++++++++++++++ .../interop/completable/TestCompletableInterop.kt | 9 +++++++++ 2 files changed, 24 insertions(+) create mode 100644 src/main/kotlin/interop/completable/TestCompletableInterop.kt diff --git a/src/main/kotlin/interop/completable/CompletableInterop.kt b/src/main/kotlin/interop/completable/CompletableInterop.kt index 8ab5f29..01b51f5 100644 --- a/src/main/kotlin/interop/completable/CompletableInterop.kt +++ b/src/main/kotlin/interop/completable/CompletableInterop.kt @@ -1,2 +1,17 @@ package interop.completable +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.rx2.rxCompletable +import kotlin.coroutines.CoroutineContext + +class CompletableInterop( + override val coroutineContext: CoroutineContext = Dispatchers.Default +): CoroutineScope { + fun doWork() = CoroutineScope(coroutineContext).rxCompletable { + delay(1000) + println("Hello from the coroutine!") + } +} + diff --git a/src/main/kotlin/interop/completable/TestCompletableInterop.kt b/src/main/kotlin/interop/completable/TestCompletableInterop.kt new file mode 100644 index 0000000..7513982 --- /dev/null +++ b/src/main/kotlin/interop/completable/TestCompletableInterop.kt @@ -0,0 +1,9 @@ +package interop.completable + +fun main() { + println("Creating an instance of CompletableInterop") + val completableInterop = CompletableInterop() + completableInterop.doWork().subscribe() + + Thread.sleep(2000) // Sleep the main thread to await the completion of the background thread +} \ No newline at end of file From 2bf26f6b528ed5cc7d84cc414d164b0dd67c86d5 Mon Sep 17 00:00:00 2001 From: Elijah Verdoorn Date: Wed, 18 Sep 2019 12:09:29 -0700 Subject: [PATCH 4/6] Add sample of rxMaybe --- src/main/kotlin/interop/maybe/MaybeInterop.kt | 15 +++++++++++++++ src/main/kotlin/interop/maybe/TestMaybeInterop.kt | 9 +++++++++ 2 files changed, 24 insertions(+) create mode 100644 src/main/kotlin/interop/maybe/MaybeInterop.kt create mode 100644 src/main/kotlin/interop/maybe/TestMaybeInterop.kt diff --git a/src/main/kotlin/interop/maybe/MaybeInterop.kt b/src/main/kotlin/interop/maybe/MaybeInterop.kt new file mode 100644 index 0000000..f7483b0 --- /dev/null +++ b/src/main/kotlin/interop/maybe/MaybeInterop.kt @@ -0,0 +1,15 @@ +package interop.maybe + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.rx2.rxMaybe +import kotlin.coroutines.CoroutineContext +import kotlin.random.Random + +class MaybeInterop( + override val coroutineContext: CoroutineContext = Dispatchers.Default +): CoroutineScope { + fun doWorkMaybe() = CoroutineScope(coroutineContext).rxMaybe { + return@rxMaybe if (Random.Default.nextBoolean()) "A value from the function" else null + } +} \ No newline at end of file diff --git a/src/main/kotlin/interop/maybe/TestMaybeInterop.kt b/src/main/kotlin/interop/maybe/TestMaybeInterop.kt new file mode 100644 index 0000000..330f7cc --- /dev/null +++ b/src/main/kotlin/interop/maybe/TestMaybeInterop.kt @@ -0,0 +1,9 @@ +package interop.maybe + +fun main() { + println("Creating an instance of MaybeInterop") + val maybeInterop = MaybeInterop() + maybeInterop.doWorkMaybe().subscribe(::println, ::println) + + Thread.sleep(2000) // Sleep the main thread to await the completion of the background thread +} \ No newline at end of file From c5d6dfc5c6e811ba8b03702339417dedf630f0dd Mon Sep 17 00:00:00 2001 From: Elijah Verdoorn Date: Wed, 18 Sep 2019 12:16:53 -0700 Subject: [PATCH 5/6] Add sample of Observable interoperability --- .../interop/observable/ObservableInterop.kt | 18 ++++++++++++++++++ .../observable/TestObservableInterop.kt | 9 +++++++++ 2 files changed, 27 insertions(+) create mode 100644 src/main/kotlin/interop/observable/ObservableInterop.kt create mode 100644 src/main/kotlin/interop/observable/TestObservableInterop.kt diff --git a/src/main/kotlin/interop/observable/ObservableInterop.kt b/src/main/kotlin/interop/observable/ObservableInterop.kt new file mode 100644 index 0000000..7370100 --- /dev/null +++ b/src/main/kotlin/interop/observable/ObservableInterop.kt @@ -0,0 +1,18 @@ +package interop.observable + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.rx2.rxObservable +import kotlin.coroutines.CoroutineContext + +class ObservableInterop( + override val coroutineContext: CoroutineContext = Dispatchers.Default +): CoroutineScope { + fun getStream() = CoroutineScope(coroutineContext).rxObservable { + (1..10).map { + delay(100) + send(it) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/interop/observable/TestObservableInterop.kt b/src/main/kotlin/interop/observable/TestObservableInterop.kt new file mode 100644 index 0000000..2e0d10f --- /dev/null +++ b/src/main/kotlin/interop/observable/TestObservableInterop.kt @@ -0,0 +1,9 @@ +package interop.observable + +fun main() { + println("Creating ObservableInterop") + val observableInterop = ObservableInterop() + observableInterop.getStream().subscribe(::println, ::println) + + Thread.sleep(2000) +} \ No newline at end of file From 82a0c3ac4b7345d1d030c44e3174847a5777d337 Mon Sep 17 00:00:00 2001 From: Elijah Verdoorn Date: Wed, 18 Sep 2019 13:50:44 -0700 Subject: [PATCH 6/6] Add sample of backpressure-enabled stream conversion --- .../kotlin/interop/flowable/FlowableInterop.kt | 17 +++++++++++++++++ .../interop/flowable/TestFlowableInterop.kt | 13 +++++++++++++ .../interop/observable/TestObservableInterop.kt | 2 +- 3 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 src/main/kotlin/interop/flowable/FlowableInterop.kt create mode 100644 src/main/kotlin/interop/flowable/TestFlowableInterop.kt diff --git a/src/main/kotlin/interop/flowable/FlowableInterop.kt b/src/main/kotlin/interop/flowable/FlowableInterop.kt new file mode 100644 index 0000000..392a328 --- /dev/null +++ b/src/main/kotlin/interop/flowable/FlowableInterop.kt @@ -0,0 +1,17 @@ +package interop.flowable + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.rx2.rxFlowable +import kotlin.coroutines.CoroutineContext + +class FlowableInterop( + override val coroutineContext: CoroutineContext = Dispatchers.Default +): CoroutineScope { + fun getFlowable() = CoroutineScope(coroutineContext).rxFlowable { + (1..10).map { + send(it) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/interop/flowable/TestFlowableInterop.kt b/src/main/kotlin/interop/flowable/TestFlowableInterop.kt new file mode 100644 index 0000000..affd8a4 --- /dev/null +++ b/src/main/kotlin/interop/flowable/TestFlowableInterop.kt @@ -0,0 +1,13 @@ +package interop.flowable + +fun main() { + println("Creating FlowableInterop") + val flowableInterop = FlowableInterop() + flowableInterop.getFlowable() + .subscribe({ + println(it) + Thread.sleep(1000) // notice that the producing coroutine's backpressure is handled + },::println) + + Thread.sleep(11000) // allow background work to occur before main thread exits +} \ No newline at end of file diff --git a/src/main/kotlin/interop/observable/TestObservableInterop.kt b/src/main/kotlin/interop/observable/TestObservableInterop.kt index 2e0d10f..8d0ba12 100644 --- a/src/main/kotlin/interop/observable/TestObservableInterop.kt +++ b/src/main/kotlin/interop/observable/TestObservableInterop.kt @@ -5,5 +5,5 @@ fun main() { val observableInterop = ObservableInterop() observableInterop.getStream().subscribe(::println, ::println) - Thread.sleep(2000) + Thread.sleep(2000) // Sleep main thread to allow coroutine to complete } \ No newline at end of file