Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4953][CHA-RL7] RoomLifecycle : Atomic Coroutinescope #32

Draft
wants to merge 2 commits into
base: feature/room-ATTACH
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.ably.chat

import java.util.PriorityQueue
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch

/**
* AtomicCoroutineScope makes sure all operations are atomic and run with given priority.
* Each ChatRoomLifecycleManager is supposed to have it's own AtomicCoroutineScope.
* Uses limitedParallelism set to 1 to make sure coroutines under given scope do not run in parallel.
* See [Kotlin Dispatchers](https://kt.academy/article/cc-dispatchers) for more information.
*/
class AtomicCoroutineScope {

private val sequentialScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))

private class Job(
private val priority: Int,
val coroutineBlock: suspend CoroutineScope.() -> Any,
val deferredResult: CompletableDeferred<Any>,
val queuedPriority: Int,
) :
Comparable<Job> {
override fun compareTo(other: Job): Int {
if (this.priority == other.priority) {
return this.queuedPriority.compareTo(other.queuedPriority)
}
return this.priority.compareTo(other.priority)
}
}

private var isRunning = false
private var queueCounter = 0
private val jobs: PriorityQueue<Job> = PriorityQueue()

/**
* @param priority Defines priority for the operation execution.
* @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope.
*/
suspend fun <T : Any>async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred<T> {
val deferredResult = CompletableDeferred<Any>()
sequentialScope.launch {
jobs.add(Job(priority, coroutineBlock, deferredResult, queueCounter++))
if (!isRunning) {
isRunning = true
while (jobs.isNotEmpty()) {
val job = jobs.poll()
job?.let {
try {
val result = sequentialScope.async(block = it.coroutineBlock).await()
it.deferredResult.complete(result)
} catch (t: Throwable) {
it.deferredResult.completeExceptionally(t)
}
}
}
isRunning = false
}
}

@Suppress("UNCHECKED_CAST")
return deferredResult as CompletableDeferred<T>
}

/**
* Cancels all jobs along with it's children.
* This includes cancelling queued jobs and current retry timers.
*/
fun cancel(message: String, cause: Throwable? = null) {
jobs.clear()
sequentialScope.cancel(message, cause)
}
}
47 changes: 44 additions & 3 deletions chat-android/src/main/java/com/ably/chat/RoomLifecycleManager.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.ably.chat

import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import io.ably.lib.util.Log.LogHandler
import io.ably.lib.realtime.Channel as AblyRealtimeChannel
Expand Down Expand Up @@ -43,7 +44,7 @@ interface ResolvedContributor {
* The order of precedence for lifecycle operations, passed to PriorityQueueExecutor which allows
* us to ensure that internal operations take precedence over user-driven operations.
*/
enum class LifecycleOperationPrecedence(val operationPriority: Int) {
enum class LifecycleOperationPrecedence(val priority: Int) {
Internal(1),
Release(2),
AttachOrDetach(3),
Expand Down Expand Up @@ -83,6 +84,13 @@ class RoomLifecycleManager
*/
private val _logger: LogHandler? = logger

/**
* AtomicCoroutineScope makes sure all operations are atomic and run with given priority.
* See [Kotlin Dispatchers](https://kt.academy/article/cc-dispatchers) for more information.
* Spec: CHA-RL7
*/
private val atomicCoroutineScope = AtomicCoroutineScope()

/**
* This flag indicates whether some sort of controlled operation is in progress (e.g. attaching, detaching, releasing).
*
Expand All @@ -105,7 +113,40 @@ class RoomLifecycleManager
// TODO - [CHA-RL4] set up room monitoring here
}

suspend fun attach() {
TODO("Not yet implemented")
internal suspend fun attach() {
val deferredAttach = atomicCoroutineScope.async(LifecycleOperationPrecedence.AttachOrDetach.priority) {
when (_status.current) {
RoomLifecycle.Attached -> return@async
RoomLifecycle.Releasing ->
throw AblyException.fromErrorInfo(
ErrorInfo(
"Can't ATTACH since room is in RELEASING state",
ErrorCodes.RoomIsReleasing.errorCode,
),
)
RoomLifecycle.Released ->
throw AblyException.fromErrorInfo(
ErrorInfo(
"Can't ATTACH since room is in RELEASED state",
ErrorCodes.RoomIsReleased.errorCode,
),
)
else -> {}
}
doAttach()
}
deferredAttach.await()
}

/**
*
* Attaches each feature channel with rollback on channel attach failure.
* This method is re-usable and can be called as a part of internal room operations.
*
*/
private suspend fun doAttach() {
for (feature in _contributors) {
feature.channel.attachCoroutine()
}
}
}
105 changes: 105 additions & 0 deletions chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.ably.chat

import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import kotlin.time.DurationUnit
import kotlin.time.toDuration
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.junit.Assert
import org.junit.Test

class AtomicCoroutineScopeTest {

@Test
fun `should perform given operation`() = runTest {
val atomicCoroutineScope = AtomicCoroutineScope()
val deferredResult = atomicCoroutineScope.async {
delay(3000)
return@async "Operation Success!"
}
val result = deferredResult.await()
Assert.assertEquals("Operation Success!", result)
}

@Test
fun `should capture failure of the given operation`() = runTest {
val atomicCoroutineScope = AtomicCoroutineScope()
val deferredResult = atomicCoroutineScope.async {
delay(2000)
throw AblyException.fromErrorInfo(ErrorInfo("Error performing operation", 400))
}
val ex = Assert.assertThrows(AblyException::class.java) {
runBlocking {
deferredResult.await()
}
}
Assert.assertEquals("Error performing operation", ex.errorInfo.message)
}

@Test
fun `should perform mutually exclusive operations`() = runTest {
val atomicCoroutineScope = AtomicCoroutineScope()
val deferredResults = mutableListOf<Deferred<Int>>()
var operationInProgress = false
var counter = 0
val threadIds = mutableSetOf<Long>()

repeat(10) {
val result = atomicCoroutineScope.async {
threadIds.add(Thread.currentThread().id)
if (operationInProgress) {
error("Can't perform operation when other operation is going on")
}
operationInProgress = true
delay((200..800).random().toDuration(DurationUnit.MILLISECONDS))
operationInProgress = false
val returnValue = counter++
return@async returnValue
}
deferredResults.add(result)
}

val results = deferredResults.awaitAll()
repeat(10) {
Assert.assertEquals(it, results[it])
}
// Scheduler should run all async operations under single thread
Assert.assertEquals(1, threadIds.size)
}

@Test
fun `should perform mutually exclusive operations with given priority`() = runTest {
val atomicCoroutineScope = AtomicCoroutineScope()
val deferredResults = mutableListOf<Deferred<Int>>()
var operationInProgress = false
var counter = 0
val threadIds = mutableSetOf<Long>()

repeat(10) {
val result = atomicCoroutineScope.async(10 - it) {
threadIds.add(Thread.currentThread().id)
if (operationInProgress) {
error("Can't perform operation when other operation is going on")
}
operationInProgress = true
delay((200..800).random().toDuration(DurationUnit.MILLISECONDS))
operationInProgress = false
val returnValue = counter++
return@async returnValue
}
deferredResults.add(result)
}

val results = deferredResults.awaitAll()
val expectedResults = listOf(0, 9, 8, 7, 6, 5, 4, 3, 2, 1)
repeat(10) {
Assert.assertEquals(expectedResults[it], results[it])
}
// Scheduler should run all async operations under single thread
Assert.assertEquals(1, threadIds.size)
}
}
Loading