Skip to content

Commit

Permalink
small refactoring of registerMaterializedViewAndStartPooling function
Browse files Browse the repository at this point in the history
  • Loading branch information
idugalic committed Jul 8, 2023
1 parent 29012b4 commit 7a1b31c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 26 deletions.
3 changes: 1 addition & 2 deletions src/main/kotlin/com/fraktalio/Application.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import io.ktor.server.cio.*
import io.ktor.util.logging.*
import io.r2dbc.spi.ConnectionFactory
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.launch

/**
* Simple logger
Expand Down Expand Up @@ -58,7 +57,7 @@ fun main(): Unit = SuspendApp {
restaurantView(),
orderView(),
materializedViewStateRepository
).also { launch { eventStream.registerMaterializedViewAndStartPooling("view", it) } }
).also { eventStream.registerMaterializedViewAndStartPooling("view", it, this@SuspendApp) }

server(CIO, host = httpEnv.host, port = httpEnv.port) {
configureSerialization()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json
import java.time.LocalDateTime
import java.time.Month
Expand Down Expand Up @@ -627,36 +626,36 @@ internal class EventStream(private val connectionFactory: ConnectionFactory) {
* Register a materialized view and start pooling events
* @param view the view name
* @param materializedView the materialized view to register - event handler
* @param scope the coroutine scope
*
* Uses [Dispatchers.IO] dispatcher with a limited parallelism
*/
suspend fun registerMaterializedViewAndStartPooling(
fun registerMaterializedViewAndStartPooling(
view: String,
materializedView: MaterializedView<MaterializedViewState, Event?>
materializedView: MaterializedView<MaterializedViewState, Event?>,
scope: CoroutineScope
) {
withContext(dbDispatcher) {
coroutineScope {
val actions = Channel<Action>()
streamEvents(view, actions.receiveAsFlow())
.onStart {
launch {
actions.send(Ack(-1, "start"))
}
scope.launch(dbDispatcher) {
val actions = Channel<Action>()
streamEvents(view, actions.receiveAsFlow())
.onStart {
launch {
actions.send(Ack(-1, "start"))
}
.onEach {
try {
materializedView.handle(it.first)
actions.send(Ack(it.second, it.first.deciderId()))
} catch (e: Exception) {
LOGGER.error("Error while handling event, retrying in 10 seconds ${it.first}", e)
actions.send(ScheduleNack(10000, it.first.deciderId()))
}
}
.retry(5) { cause ->
cause !is CancellationException
}
.onEach {
try {
materializedView.handle(it.first)
actions.send(Ack(it.second, it.first.deciderId()))
} catch (e: Exception) {
LOGGER.error("Error while handling event, retrying in 10 seconds ${it.first}", e)
actions.send(ScheduleNack(10000, it.first.deciderId()))
}
.collect()
}
}
.retry(5) { cause ->
cause !is CancellationException
}
.collect()
}
}
}

0 comments on commit 7a1b31c

Please sign in to comment.