Skip to content

Commit

Permalink
#72 handling the messages concurrently and in parallel - actors
Browse files Browse the repository at this point in the history
  • Loading branch information
idugalic committed Jul 9, 2023
1 parent ee6fae2 commit add4f22
Show file tree
Hide file tree
Showing 4 changed files with 349 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package com.fraktalio.fmodel.application

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.onCompletion
import kotlin.contracts.ExperimentalContracts
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.math.absoluteValue
Expand All @@ -28,7 +28,6 @@ import kotlin.math.absoluteValue
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ObsoleteCoroutinesApi
@ExperimentalContracts
fun <C, S, E> EventSourcingAggregate<C, S, E>.handleConcurrently(
commands: Flow<C>,
numberOfActors: Int = 100,
Expand Down Expand Up @@ -68,7 +67,6 @@ fun <C, S, E> EventSourcingAggregate<C, S, E>.handleConcurrently(
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ObsoleteCoroutinesApi
@ExperimentalContracts
fun <C, E> Flow<C>.publishConcurrentlyTo(
aggregate: EventSourcingAggregate<C, *, E>,
numberOfActors: Int = 100,
Expand All @@ -79,6 +77,206 @@ fun <C, E> Flow<C>.publishConcurrentlyTo(
): Flow<E> =
aggregate.handleConcurrently(this, numberOfActors, actorsCapacity, actorsStart, actorsContext) { partitionKey(it) }

/**
* Extension function - Handles the flow of command messages of type [C] by concurrently distributing the load across finite number of actors/handlers
*
* @param commands [Flow] of Command messages of type [C]
* @param numberOfActors total number of actors/workers available for distributing the load. Minimum one.
* @param actorsCapacity capacity of the actors channel's buffer
* @param actorsStart actors coroutine start option
* @param actorsContext additional to [CoroutineScope.coroutineContext] context of the actor coroutines.
* @param partitionKey a function that calculates the partition key/routing key of command - commands with the same partition key will be handled with the same 'actor' to keep the ordering
* @return [Flow] of stored Events of type [E]
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ExperimentalCoroutinesApi
@ObsoleteCoroutinesApi
fun <C, S, E> EventSourcingOrchestratingAggregate<C, S, E>.handleConcurrently(
commands: Flow<C>,
numberOfActors: Int = 100,
actorsCapacity: Int = Channel.BUFFERED,
actorsStart: CoroutineStart = CoroutineStart.LAZY,
actorsContext: CoroutineContext = EmptyCoroutineContext,
partitionKey: (C) -> Int
): Flow<E> = channelFlow {
val actors: List<SendChannel<C>> = (1..numberOfActors).map {
commandActor(channel, actorsCapacity, actorsStart, actorsContext) { handle(it) }

}
commands
.onCompletion {
actors.forEach {
it.close()
}
}
.collect {
val partition = partitionKey(it).absoluteValue % numberOfActors.coerceAtLeast(1)
actors[partition].send(it)
}
}

/**
* Extension function - Publishes [Flow] of commands of type [C] to the event sourcing aggregate of type [EventSourcingAggregate]<[C], *, [E]> by concurrently distributing the load across finite number of actors
*
* @receiver [Flow] of commands of type [C]
* @param aggregate of type [EventSourcingAggregate]<[C], *, [E]>
* @param numberOfActors total number of actors/workers available for distributing the load. Minimum one.
* @param actorsCapacity capacity of the actors channel's buffer
* @param actorsStart actors coroutine start option
* @param actorsContext additional to [CoroutineScope.coroutineContext] context of the actor coroutines.
* @param partitionKey a function that calculates the partition key/routing key of command - commands with the same partition key will be handled with the same 'actor' to keep the ordering
* @return the [Flow] of stored Events of type [E]
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ExperimentalCoroutinesApi
@ObsoleteCoroutinesApi
fun <C, E> Flow<C>.publishConcurrentlyTo(
aggregate: EventSourcingOrchestratingAggregate<C, *, E>,
numberOfActors: Int = 100,
actorsCapacity: Int = Channel.BUFFERED,
actorsStart: CoroutineStart = CoroutineStart.LAZY,
actorsContext: CoroutineContext = EmptyCoroutineContext,
partitionKey: (C) -> Int
): Flow<E> =
aggregate.handleConcurrently(this, numberOfActors, actorsCapacity, actorsStart, actorsContext) { partitionKey(it) }

/**
* Extension function - Handles the flow of command messages of type [C] by concurrently distributing the load across finite number of actors/handlers
*
* @param commands [Flow] of Command messages of type [C]
* @param numberOfActors total number of actors/workers available for distributing the load. Minimum one.
* @param actorsCapacity capacity of the actors channel's buffer
* @param actorsStart actors coroutine start option
* @param actorsContext additional to [CoroutineScope.coroutineContext] context of the actor coroutines.
* @param partitionKey a function that calculates the partition key/routing key of command - commands with the same partition key will be handled with the same 'actor' to keep the ordering
* @return [Flow] of stored Events of type [E] with version [V]
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ObsoleteCoroutinesApi
fun <C, S, E, V> EventSourcingLockingAggregate<C, S, E, V>.handleConcurrentlyAndOptimistically(
commands: Flow<C>,
numberOfActors: Int = 100,
actorsCapacity: Int = Channel.BUFFERED,
actorsStart: CoroutineStart = CoroutineStart.LAZY,
actorsContext: CoroutineContext = EmptyCoroutineContext,
partitionKey: (C) -> Int
): Flow<Pair<E, V>> = channelFlow {
val actors: List<SendChannel<C>> = (1..numberOfActors).map {
commandActor(channel, actorsCapacity, actorsStart, actorsContext) { handleOptimistically(it) }
}
commands
.onCompletion {
actors.forEach {
it.close()
}
}
.collect {
val partition = partitionKey(it).absoluteValue % numberOfActors.coerceAtLeast(1)
actors[partition].send(it)
}
}

/**
* Extension function - Publishes [Flow] of commands of type [C] to the event sourcing aggregate of type [EventSourcingAggregate]<[C], *, [E]> by concurrently distributing the load across finite number of actors
*
* @receiver [Flow] of commands of type [C]
* @param aggregate of type [EventSourcingAggregate]<[C], *, [E]>
* @param numberOfActors total number of actors/workers available for distributing the load. Minimum one.
* @param actorsCapacity capacity of the actors channel's buffer
* @param actorsStart actors coroutine start option
* @param actorsContext additional to [CoroutineScope.coroutineContext] context of the actor coroutines.
* @param partitionKey a function that calculates the partition key/routing key of command - commands with the same partition key will be handled with the same 'actor' to keep the ordering
* @return the [Flow] of stored Events of type [E] and version [V]
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ObsoleteCoroutinesApi
fun <C, E, V> Flow<C>.publishConcurrentlyAndOptimisticallyTo(
aggregate: EventSourcingLockingAggregate<C, *, E, V>,
numberOfActors: Int = 100,
actorsCapacity: Int = Channel.BUFFERED,
actorsStart: CoroutineStart = CoroutineStart.LAZY,
actorsContext: CoroutineContext = EmptyCoroutineContext,
partitionKey: (C) -> Int
): Flow<Pair<E, V>> = aggregate.handleConcurrentlyAndOptimistically(
this,
numberOfActors,
actorsCapacity,
actorsStart,
actorsContext
) { partitionKey(it) }

/**
* Extension function - Handles the flow of command messages of type [C] by concurrently distributing the load across finite number of actors/handlers
*
* @param commands [Flow] of Command messages of type [C]
* @param numberOfActors total number of actors/workers available for distributing the load. Minimum one.
* @param actorsCapacity capacity of the actors channel's buffer
* @param actorsStart actors coroutine start option
* @param actorsContext additional to [CoroutineScope.coroutineContext] context of the actor coroutines.
* @param partitionKey a function that calculates the partition key/routing key of command - commands with the same partition key will be handled with the same 'actor' to keep the ordering
* @return [Flow] of stored Events of type [E] with version [V]
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ExperimentalCoroutinesApi
@ObsoleteCoroutinesApi
fun <C, S, E, V> EventSourcingLockingOrchestratingAggregate<C, S, E, V>.handleConcurrentlyAndOptimistically(
commands: Flow<C>,
numberOfActors: Int = 100,
actorsCapacity: Int = Channel.BUFFERED,
actorsStart: CoroutineStart = CoroutineStart.LAZY,
actorsContext: CoroutineContext = EmptyCoroutineContext,
partitionKey: (C) -> Int
): Flow<Pair<E, V>> = channelFlow {
val actors: List<SendChannel<C>> = (1..numberOfActors).map {
commandActor(channel, actorsCapacity, actorsStart, actorsContext) { handleOptimistically(it) }
}
commands
.onCompletion {
actors.forEach {
it.close()
}
}
.collect {
val partition = partitionKey(it).absoluteValue % numberOfActors.coerceAtLeast(1)
actors[partition].send(it)
}
}

/**
* Extension function - Publishes [Flow] of commands of type [C] to the event sourcing aggregate of type [EventSourcingAggregate]<[C], *, [E]> by concurrently distributing the load across finite number of actors
*
* @receiver [Flow] of commands of type [C]
* @param aggregate of type [EventSourcingAggregate]<[C], *, [E]>
* @param numberOfActors total number of actors/workers available for distributing the load. Minimum one.
* @param actorsCapacity capacity of the actors channel's buffer
* @param actorsStart actors coroutine start option
* @param actorsContext additional to [CoroutineScope.coroutineContext] context of the actor coroutines.
* @param partitionKey a function that calculates the partition key/routing key of command - commands with the same partition key will be handled with the same 'actor' to keep the ordering
* @return the [Flow] of stored Events of type [E] and version [V]
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ExperimentalCoroutinesApi
@ObsoleteCoroutinesApi
fun <C, E, V> Flow<C>.publishConcurrentlyAndOptimisticallyTo(
aggregate: EventSourcingLockingOrchestratingAggregate<C, *, E, V>,
numberOfActors: Int = 100,
actorsCapacity: Int = Channel.BUFFERED,
actorsStart: CoroutineStart = CoroutineStart.LAZY,
actorsContext: CoroutineContext = EmptyCoroutineContext,
partitionKey: (C) -> Int
): Flow<Pair<E, V>> = aggregate.handleConcurrentlyAndOptimistically(
this,
numberOfActors,
actorsCapacity,
actorsStart,
actorsContext
) { partitionKey(it) }

/**
* Command Actor - Event Sourced Aggregate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.onCompletion
import kotlin.contracts.ExperimentalContracts
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.math.absoluteValue
Expand All @@ -45,15 +44,14 @@ import kotlin.math.absoluteValue
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ObsoleteCoroutinesApi
@ExperimentalContracts
fun <S, E> MaterializedView<S, E>.handleConcurrently(
fun <S, E, I> I.handleConcurrently(
events: Flow<E>,
numberOfActors: Int = 100,
actorsCapacity: Int = Channel.BUFFERED,
actorsStart: CoroutineStart = CoroutineStart.LAZY,
actorsContext: CoroutineContext = EmptyCoroutineContext,
partitionKey: (E) -> Int
): Flow<S> = channelFlow {
): Flow<S> where I : ViewStateComputation<S, E>, I : ViewStateRepository<E, S> = channelFlow {
val actors: List<SendChannel<E>> = (1..numberOfActors).map {
eventActor(channel, actorsCapacity, actorsStart, actorsContext) { handle(it) }
}
Expand All @@ -69,6 +67,44 @@ fun <S, E> MaterializedView<S, E>.handleConcurrently(
}
}

/**
* Extension function - Handles the flow of events of type [E] by concurrently distributing the load across finite number of actors/handlers
*
* @param events Flow of Events of type [E] to be handled
* @param numberOfActors total number of actors/workers available for distributing the load. Minimum one.
* @param actorsCapacity capacity of the actors channel's buffer
* @param actorsStart actors coroutine start option
* @param actorsContext additional to [CoroutineScope.coroutineContext] context of the actor coroutines.
* @param partitionKey a function that calculates the partition key/routing key of event - events with the same partition key will be handled with the same 'actor' to keep the ordering
* @return [Flow] of State of type `Pair<S, V>`
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ObsoleteCoroutinesApi
fun <S, E, V, I> I.handleConcurrentlyAndOptimistically(
events: Flow<E>,
numberOfActors: Int = 100,
actorsCapacity: Int = Channel.BUFFERED,
actorsStart: CoroutineStart = CoroutineStart.LAZY,
actorsContext: CoroutineContext = EmptyCoroutineContext,
partitionKey: (E) -> Int
): Flow<Pair<S, V>> where I : ViewStateComputation<S, E>, I : ViewStateLockingRepository<E, S, V> = channelFlow {
val actors: List<SendChannel<E>> = (1..numberOfActors).map {
eventActor(channel, actorsCapacity, actorsStart, actorsContext) { handleOptimistically(it) }
}
events
.onCompletion {
actors.forEach {
it.close()
}
}
.collect {
val partition = partitionKey(it).absoluteValue % numberOfActors.coerceAtLeast(1)
actors[partition].send(it)
}
}


/**
* Extension function - Publishes the event of type [E] to the materialized view of type [MaterializedView]<[S], [E]> by concurrently distributing the load across finite number of actors/handlers
*
Expand All @@ -84,24 +120,52 @@ fun <S, E> MaterializedView<S, E>.handleConcurrently(
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ObsoleteCoroutinesApi
@ExperimentalContracts
fun <S, E> Flow<E>.publishConcurrentlyTo(
materializedView: MaterializedView<S, E>,
fun <S, E, M> Flow<E>.publishConcurrentlyTo(
materializedView: M,
numberOfActors: Int = 100,
actorsCapacity: Int = Channel.BUFFERED,
actorsStart: CoroutineStart = CoroutineStart.LAZY,
actorsContext: CoroutineContext = EmptyCoroutineContext,
partitionKey: (E) -> Int
): Flow<S> =
materializedView.handleConcurrently(
): Flow<S> where M : ViewStateComputation<S, E>, M : ViewStateRepository<E, S> = materializedView.handleConcurrently(
this,
numberOfActors,
actorsCapacity,
actorsStart,
actorsContext
) { partitionKey(it) }

/**
* Extension function - Publishes the event of type [E] to the materialized view of type [MaterializedView]<[S], [E]> by concurrently distributing the load across finite number of actors/handlers
*
* @receiver [Flow] of events of type [E]
* @param materializedView of type [MaterializedView]<[S], [E]>
* @param numberOfActors total number of actors/workers available for distributing the load. Minimum one.
* @param actorsCapacity capacity of the actors channel's buffer
* @param actorsStart actors coroutine start option
* @param actorsContext additional to [CoroutineScope.coroutineContext] context of the actor coroutines.
* @param partitionKey a function that calculates the partition key/routing key of event - events with the same partition key will be handled with the same 'actor' to keep the ordering
* @return [Flow] of State of type `Pair<S, V>`
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ObsoleteCoroutinesApi
fun <S, E, V, M> Flow<E>.publishConcurrentlyAndOptimisticallyTo(
materializedView: M,
numberOfActors: Int = 100,
actorsCapacity: Int = Channel.BUFFERED,
actorsStart: CoroutineStart = CoroutineStart.LAZY,
actorsContext: CoroutineContext = EmptyCoroutineContext,
partitionKey: (E) -> Int
): Flow<Pair<S, V>> where M : ViewStateComputation<S, E>, M : ViewStateLockingRepository<E, S, V> =
materializedView.handleConcurrentlyAndOptimistically(
this,
numberOfActors,
actorsCapacity,
actorsStart,
actorsContext
) { partitionKey(it) }


/**
* Event Actor - Materialized View
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.onCompletion
import kotlin.contracts.ExperimentalContracts
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.math.absoluteValue

/**
* Extension function - Handles the the [Flow] of action results of type [AR] by concurrently distributing the load across finite number of actors/handlers
* Extension function - Handles the [Flow] of action results of type [AR] by concurrently distributing the load across finite number of actors/handlers
*
* @param actionResults Action Results represent the outcome of some action you want to handle in some way
* @param numberOfActors total number of actors/workers available for distributing the load. Minimum one.
Expand All @@ -44,7 +43,6 @@ import kotlin.math.absoluteValue
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ObsoleteCoroutinesApi
@ExperimentalContracts
fun <AR, A> SagaManager<AR, A>.handleConcurrently(
actionResults: Flow<AR>,
numberOfActors: Int = 100,
Expand Down Expand Up @@ -82,7 +80,6 @@ fun <AR, A> SagaManager<AR, A>.handleConcurrently(
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ObsoleteCoroutinesApi
@ExperimentalContracts
fun <AR, A> Flow<AR>.publishConcurrentlyTo(
sagaManager: SagaManager<AR, A>,
numberOfActors: Int = 100,
Expand Down
Loading

0 comments on commit add4f22

Please sign in to comment.