diff --git a/chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt b/chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt new file mode 100644 index 0000000..28e06b5 --- /dev/null +++ b/chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt @@ -0,0 +1,77 @@ +package com.ably.chat + +import java.util.PriorityQueue +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch + +/** + * AtomicCoroutineScope makes sure all operations are atomic and run with given priority. + * Each ChatRoomLifecycleManager is supposed to have it's own AtomicCoroutineScope. + * Uses limitedParallelism set to 1 to make sure coroutines under given scope do not run in parallel. + * See [Kotlin Dispatchers](https://kt.academy/article/cc-dispatchers) for more information. + */ +class AtomicCoroutineScope { + + private val sequentialScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1)) + + private class Job( + private val priority: Int, + val coroutineBlock: suspend CoroutineScope.() -> Any, + val deferredResult: CompletableDeferred, + val queuedPriority: Int, + ) : + Comparable { + override fun compareTo(other: Job): Int { + if (this.priority == other.priority) { + return this.queuedPriority.compareTo(other.queuedPriority) + } + return this.priority.compareTo(other.priority) + } + } + + private var isRunning = false + private var queueCounter = 0 + private val jobs: PriorityQueue = PriorityQueue() + + /** + * @param priority Defines priority for the operation execution. + * @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope. + */ + suspend fun async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred { + val deferredResult = CompletableDeferred() + sequentialScope.launch { + jobs.add(Job(priority, coroutineBlock, deferredResult, queueCounter++)) + if (!isRunning) { + isRunning = true + while (jobs.isNotEmpty()) { + val job = jobs.poll() + job?.let { + try { + val result = sequentialScope.async(block = it.coroutineBlock).await() + it.deferredResult.complete(result) + } catch (t: Throwable) { + it.deferredResult.completeExceptionally(t) + } + } + } + isRunning = false + } + } + + @Suppress("UNCHECKED_CAST") + return deferredResult as CompletableDeferred + } + + /** + * Cancels all jobs along with it's children. + * This includes cancelling queued jobs and current retry timers. + */ + fun cancel(message: String, cause: Throwable? = null) { + jobs.clear() + sequentialScope.cancel(message, cause) + } +} diff --git a/chat-android/src/main/java/com/ably/chat/RoomLifecycleManager.kt b/chat-android/src/main/java/com/ably/chat/RoomLifecycleManager.kt index f35a277..a3f722c 100644 --- a/chat-android/src/main/java/com/ably/chat/RoomLifecycleManager.kt +++ b/chat-android/src/main/java/com/ably/chat/RoomLifecycleManager.kt @@ -1,5 +1,6 @@ package com.ably.chat +import io.ably.lib.types.AblyException import io.ably.lib.types.ErrorInfo import io.ably.lib.util.Log.LogHandler import io.ably.lib.realtime.Channel as AblyRealtimeChannel @@ -43,7 +44,7 @@ interface ResolvedContributor { * The order of precedence for lifecycle operations, passed to PriorityQueueExecutor which allows * us to ensure that internal operations take precedence over user-driven operations. */ -enum class LifecycleOperationPrecedence(val operationPriority: Int) { +enum class LifecycleOperationPrecedence(val priority: Int) { Internal(1), Release(2), AttachOrDetach(3), @@ -83,6 +84,13 @@ class RoomLifecycleManager */ private val _logger: LogHandler? = logger + /** + * AtomicCoroutineScope makes sure all operations are atomic and run with given priority. + * See [Kotlin Dispatchers](https://kt.academy/article/cc-dispatchers) for more information. + * Spec: CHA-RL7 + */ + private val atomicCoroutineScope = AtomicCoroutineScope() + /** * This flag indicates whether some sort of controlled operation is in progress (e.g. attaching, detaching, releasing). * @@ -105,7 +113,40 @@ class RoomLifecycleManager // TODO - [CHA-RL4] set up room monitoring here } - suspend fun attach() { - TODO("Not yet implemented") + internal suspend fun attach() { + val deferredAttach = atomicCoroutineScope.async(LifecycleOperationPrecedence.AttachOrDetach.priority) { + when (_status.current) { + RoomLifecycle.Attached -> return@async + RoomLifecycle.Releasing -> + throw AblyException.fromErrorInfo( + ErrorInfo( + "Can't ATTACH since room is in RELEASING state", + ErrorCodes.RoomIsReleasing.errorCode, + ), + ) + RoomLifecycle.Released -> + throw AblyException.fromErrorInfo( + ErrorInfo( + "Can't ATTACH since room is in RELEASED state", + ErrorCodes.RoomIsReleased.errorCode, + ), + ) + else -> {} + } + doAttach() + } + deferredAttach.await() + } + + /** + * + * Attaches each feature channel with rollback on channel attach failure. + * This method is re-usable and can be called as a part of internal room operations. + * + */ + private suspend fun doAttach() { + for (feature in _contributors) { + feature.channel.attachCoroutine() + } } } diff --git a/chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt b/chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt new file mode 100644 index 0000000..8a1f7e1 --- /dev/null +++ b/chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt @@ -0,0 +1,105 @@ +package com.ably.chat + +import io.ably.lib.types.AblyException +import io.ably.lib.types.ErrorInfo +import kotlin.time.DurationUnit +import kotlin.time.toDuration +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import org.junit.Assert +import org.junit.Test + +class AtomicCoroutineScopeTest { + + @Test + fun `should perform given operation`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope() + val deferredResult = atomicCoroutineScope.async { + delay(3000) + return@async "Operation Success!" + } + val result = deferredResult.await() + Assert.assertEquals("Operation Success!", result) + } + + @Test + fun `should capture failure of the given operation`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope() + val deferredResult = atomicCoroutineScope.async { + delay(2000) + throw AblyException.fromErrorInfo(ErrorInfo("Error performing operation", 400)) + } + val ex = Assert.assertThrows(AblyException::class.java) { + runBlocking { + deferredResult.await() + } + } + Assert.assertEquals("Error performing operation", ex.errorInfo.message) + } + + @Test + fun `should perform mutually exclusive operations`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope() + val deferredResults = mutableListOf>() + var operationInProgress = false + var counter = 0 + val threadIds = mutableSetOf() + + repeat(10) { + val result = atomicCoroutineScope.async { + threadIds.add(Thread.currentThread().id) + if (operationInProgress) { + error("Can't perform operation when other operation is going on") + } + operationInProgress = true + delay((200..800).random().toDuration(DurationUnit.MILLISECONDS)) + operationInProgress = false + val returnValue = counter++ + return@async returnValue + } + deferredResults.add(result) + } + + val results = deferredResults.awaitAll() + repeat(10) { + Assert.assertEquals(it, results[it]) + } + // Scheduler should run all async operations under single thread + Assert.assertEquals(1, threadIds.size) + } + + @Test + fun `should perform mutually exclusive operations with given priority`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope() + val deferredResults = mutableListOf>() + var operationInProgress = false + var counter = 0 + val threadIds = mutableSetOf() + + repeat(10) { + val result = atomicCoroutineScope.async(10 - it) { + threadIds.add(Thread.currentThread().id) + if (operationInProgress) { + error("Can't perform operation when other operation is going on") + } + operationInProgress = true + delay((200..800).random().toDuration(DurationUnit.MILLISECONDS)) + operationInProgress = false + val returnValue = counter++ + return@async returnValue + } + deferredResults.add(result) + } + + val results = deferredResults.awaitAll() + val expectedResults = listOf(0, 9, 8, 7, 6, 5, 4, 3, 2, 1) + repeat(10) { + Assert.assertEquals(expectedResults[it], results[it]) + } + // Scheduler should run all async operations under single thread + Assert.assertEquals(1, threadIds.size) + } +}