Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4953][CHA-RL7] Executor for mutually exclusive room lifecycle operations #29

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.ably.chat

import java.util.PriorityQueue
import java.util.concurrent.Executors
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch

/**
* Each ChatRoomLifecycleManager is supposed to have it's own AtomicCoroutineScope.
* AtomicCoroutineScope makes sure all operations are atomic and run with given priority.
* Uses single threaded dispatcher to avoid thread synchronization issues.
*/
class AtomicCoroutineScope private constructor(private val scope: CoroutineScope) {

private class Job(
private val priority: Int,
val coroutineBlock: suspend CoroutineScope.() -> Any,
val deferredResult: CompletableDeferred<Any>,
) :
Comparable<Job> {
override fun compareTo(other: Job): Int = this.priority.compareTo(other.priority)
}

private var isRunning = false
private val jobs: PriorityQueue<Job> = PriorityQueue()

/**
* @param priority Defines priority for the operation execution.
* @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope.
*/
suspend fun <T : Any>async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred<T> {
val deferredResult = CompletableDeferred<Any>()
scope.launch {
jobs.add(Job(priority, coroutineBlock, deferredResult))
if (!isRunning) {
isRunning = true
while (jobs.isNotEmpty()) {
val job = jobs.poll()
job?.let {
try {
it.deferredResult.complete(scope.async(block = it.coroutineBlock).await())
} catch (t: Throwable) {
it.deferredResult.completeExceptionally(t)
}
}
}
isRunning = false
}
}

@Suppress("UNCHECKED_CAST")
return deferredResult as CompletableDeferred<T>
}

/**
* Cancels all jobs along along with it's children.
* This includes cancelling queued jobs and current retry timers.
*/
fun cancel(message: String, cause: Throwable? = null) {
jobs.clear()
scope.cancel(message, cause)
}

companion object {
private var _singleThreadedDispatcher : ExecutorCoroutineDispatcher? = null

fun create(): AtomicCoroutineScope {
if (_singleThreadedDispatcher == null) {
_singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher();
}
return AtomicCoroutineScope(CoroutineScope(singleThreadedDispatcher))
}

val singleThreadedDispatcher: ExecutorCoroutineDispatcher
get() {
return _singleThreadedDispatcher?: error("Call SingleThreadedExecutor.create() method to initialize SingleThreadedDispatcher")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.ably.chat

import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import kotlin.time.DurationUnit
import kotlin.time.toDuration
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.junit.Assert
import org.junit.Test

class AtomicCoroutineScopeTest {

@Test
fun `should perform given operation`() = runTest {
val atomicCoroutineScope = AtomicCoroutineScope.create()
val deferredResult = atomicCoroutineScope.async {
delay(3000)
return@async "Operation Success!"
}
val result = deferredResult.await()
Assert.assertEquals("Operation Success!", result)
}

@Test
fun `should capture failure of the given operation`() = runTest {
val atomicCoroutineScope = AtomicCoroutineScope.create()
val deferredResult = atomicCoroutineScope.async {
delay(2000)
throw AblyException.fromErrorInfo(ErrorInfo("Error performing operation", 400))
}
Assert.assertThrows("Error performing operation", AblyException::class.java) {
runBlocking {
deferredResult.await()
}
}
}

@Test
fun `should perform mutually exclusive operations with given priority`() = runTest {
val atomicCoroutineScope = AtomicCoroutineScope.create()
val deferredResults = mutableListOf<Deferred<Int>>()
var operationInProgress = false
var counter = 0
val threadIds = mutableSetOf<Long>()

repeat(20) {
val result = atomicCoroutineScope.async(it) {
threadIds.add(Thread.currentThread().id)
if (operationInProgress) {
error("Can't perform operation when other operation is going on")
}
operationInProgress = true
delay((200..800).random().toDuration(DurationUnit.MILLISECONDS))
operationInProgress = false
val returnValue = counter++
return@async returnValue
}
deferredResults.add(result)
}

val results = deferredResults.awaitAll()
repeat(20) {
Assert.assertEquals(it, results[it])
}
// Scheduler should run all async operations under single thread
Assert.assertEquals(1, threadIds.size)
}
}
Loading