Skip to content

Commit

Permalink
#202 Introducing snapshotting event sourced aggregate - draft 3
Browse files Browse the repository at this point in the history
  • Loading branch information
idugalic committed Jun 11, 2023
1 parent 7fb2444 commit c82e393
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

package com.fraktalio.fmodel.application

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*

/**
* Extension function - Handles the command message of type [C]
* Extension function - Handles the command message of type [C] by the snapshotting, event sourced aggregate.
*
* @param command Command message of type [C]
* @return State of type [S]
Expand Down Expand Up @@ -51,7 +52,7 @@ fun <C, S, E, I> I.handle(command: C): Flow<E> where I : StateComputation<C, S,
}

/**
* Extension function - Handles the command message of type [C] to the snapshotting, locking event sourced aggregate, optimistically
* Extension function - Handles the command message of type [C] by the snapshotting, locking event sourced aggregate, optimistically
*
* @param command Command message of type [C]
* @return State of type [Pair]<[S], [V]>, in which [V] is the type of the Version (optimistic locking)
Expand Down Expand Up @@ -90,3 +91,87 @@ suspend fun <C, S, E, V, I> I.handleOptimistically(command: C): Flow<Pair<E, V>>
}
emitAll(newEvents)
}


/**
* Extension function - Handles the command message of type [C] by the snapshotting, orchestrating event sourced aggregate
*
*
* @param command Command message of type [C]
* @return State of type [S]
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ExperimentalCoroutinesApi
fun <C, S, E, I> I.handle(command: C): Flow<E> where I : StateOrchestratingComputation<C, S, E>,
I : EventOrchestratingComputation<C, S, E>,
I : StateRepository<C, S>,
I : EventSnapshottingRepository<C, S, E> =
flow {
// 1. Fetch the latest state snapshot or NULL
val latestSnapshotState = command.fetchState()
// 2. Fetch the latest events, since the latest state snapshot
val latestEvents = command.fetchEvents(latestSnapshotState).toList()
// 3. Compute the current state, based on the latest state snapshot and the latest events
val currentState = latestEvents.fold(latestSnapshotState ?: initialState) { s, e -> evolve(s, e) }
// 4. Compute the new events, based on the latest events, latest snapshot state and the command, and save it
val newEvents = latestEvents.asFlow()
.computeNewEventsByOrchestrating(command, latestSnapshotState) { it.fetchEvents(latestSnapshotState) }
.save()
// 5. Compute the new state, based on the current state and the command and save it conditionally
with(currentState.computeNewState(command)) {
if (shouldCreateNewSnapshot(latestSnapshotState)) {
save()
}
}
emitAll(newEvents)
}

/**
* Extension function - Handles the command message of type [C] by the snapshotting, locking, orchestrating event sourced aggregate, optimistically
*
* @param command Command message of type [C]
* @return State of type [Pair]<[S], [V]>, in which [V] is the type of the Version (optimistic locking)
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@ExperimentalCoroutinesApi
suspend fun <C, S, E, V, I> I.handleOptimistically(command: C): Flow<Pair<E, V>> where I : StateOrchestratingComputation<C, S, E>,
I : EventOrchestratingComputation<C, S, E>,
I : StateLockingRepository<C, S, V>,
I : EventSnapshottingLockingRepository<C, S, E, V> =
flow {
// 1. Fetch the latest state snapshot or NULL
val (latestSnapshotState, latestSnapshotVersion) = command.fetchState()
// 2. Fetch the latest events, since the latest state snapshot
val latestEvents = command.fetchEvents(Pair(latestSnapshotState, latestSnapshotVersion)).toList()
// 3. Compute the current state, based on the latest state snapshot and the latest events
val currentState = latestEvents.fold(latestSnapshotState ?: initialState) { s, e -> evolve(s, e.first) }
// 4. Get the latest event version
val latestEventVersion = latestEvents.map { it.second }.lastOrNull()
// 5. Compute the new events, based on the latest events, latest snapshot state and the command, and save it
val newEvents = latestEvents.asFlow()
.map { it.first }
.computeNewEventsByOrchestrating(command, latestSnapshotState) {
it.fetchEvents(
Pair(
latestSnapshotState,
latestSnapshotVersion
)
).map { pair -> pair.first }
}
.save(latestEventVersion)
// 6. Get the new snapshot version = the last/latest event version
val newSnapshotVersion = newEvents.map { it.second }.lastOrNull()
// 7. Compute the new state, based on the current state and the command and save it conditionally
with(currentState.computeNewState(command)) {
if (shouldCreateNewSnapshot(
latestSnapshotState,
latestSnapshotVersion,
newSnapshotVersion
)
)
save(latestSnapshotVersion, newSnapshotVersion)
}
emitAll(newEvents)
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,26 @@ interface EventComputation<C, S, E> : IDecider<C, S, E> {
*/
interface EventOrchestratingComputation<C, S, E> : ISaga<E, C>, IDecider<C, S, E> {
@ExperimentalCoroutinesApi
fun Flow<E>.computeNewEventsByOrchestrating(command: C, fetchEvents: (C) -> Flow<E>): Flow<E> = flow {
val currentState = fold(initialState) { s, e -> evolve(s, e) }
var resultingEvents = decide(command, currentState)
fun Flow<E>.computeNewEventsByOrchestrating(command: C, fetchEvents: (C) -> Flow<E>): Flow<E> =
computeNewEventsByOrchestrating(command, initialState, fetchEvents)

resultingEvents
.flatMapConcat { react(it) }
.onEach { c ->
val newEvents = flowOf(fetchEvents(c), resultingEvents)
.flattenConcat()
.computeNewEventsByOrchestrating(c, fetchEvents)
resultingEvents = flowOf(resultingEvents, newEvents).flattenConcat()
}.collect()
@ExperimentalCoroutinesApi
fun Flow<E>.computeNewEventsByOrchestrating(command: C, latestSnapshot: S?, fetchEvents: (C) -> Flow<E>): Flow<E> =
flow {
val currentState = fold(latestSnapshot ?: initialState) { s, e -> evolve(s, e) }
var resultingEvents = decide(command, currentState)

emitAll(resultingEvents)
}
resultingEvents
.flatMapConcat { react(it) }
.onEach { c ->
val newEvents = flowOf(fetchEvents(c), resultingEvents)
.flattenConcat()
.computeNewEventsByOrchestrating(c, latestSnapshot, fetchEvents)
resultingEvents = flowOf(resultingEvents, newEvents).flattenConcat()
}.collect()

emitAll(resultingEvents)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.fraktalio.fmodel.application

import com.fraktalio.fmodel.domain.IDecider
import com.fraktalio.fmodel.domain.ISaga


/**
Expand Down Expand Up @@ -69,6 +70,59 @@ interface EventSourcingSnapshottingLockingAggregate<C, S, E, V> :
EventSnapshottingLockingRepository<C, S, E, V>,
StateLockingRepository<C, S, V>

/**
* Event sourced, snapshotting, orchestrating aggregate is using/delegating a `decider` of type [IDecider]<[C], [S], [E]> / [EventOrchestratingComputation]<[C], [S], [E]> to handle commands and produce events.
* In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via [EventSnapshottingRepository.fetchEvents] function, and then delegate the command to the `decider` which can produce new event(s) as a result.
* If the `decider` is combined out of many deciders via `combine` function, an optional `saga` of type [ISaga] could be used to react on new events and send new commands to the 'decider` recursively, in single transaction.
* This behaviour is formalized in [EventOrchestratingComputation].
* Produced events are then stored via [EventSnapshottingRepository.save] suspending function.
*
* Additionally, Event sourcing aggregate enables `snapshotting` mechanism by using [StateOrchestratingComputation] and [StateRepository] interfaces to store and fetch the current state of the aggregate from time to time, removing the need to always fetch the full list of events.
*
* [EventSourcingSnapshottingOrchestratingAggregate] extends [EventOrchestratingComputation], [StateOrchestratingComputation], [StateRepository] and [EventSnapshottingRepository] interfaces,
* clearly communicating that it is composed out of these behaviours.
*
* @param C Commands of type [C] that this aggregate can handle
* @param S Aggregate state of type [S]
* @param E Events of type [E] that this aggregate can publish
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
interface EventSourcingSnapshottingOrchestratingAggregate<C, S, E> :
EventOrchestratingComputation<C, S, E>,
StateOrchestratingComputation<C, S, E>,
EventSnapshottingRepository<C, S, E>,
StateRepository<C, S>

/**
* Event sourced, snapshotting, orchestrating and locking aggregate is using/delegating a `decider` of type [IDecider]<[C], [S], [E]> / [EventOrchestratingComputation]<[C], [S], [E]> to handle commands and produce events.
* In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via [EventSnapshottingLockingRepository.fetchEvents] function, and then delegate the command to the `decider` which can produce new event(s) as a result.
* If the `decider` is combined out of many deciders via `combine` function, an optional `saga` of type [ISaga]/[EventOrchestratingComputation] could be used to react on new events and send new commands to the 'decider` recursively, in single transaction.
* This behaviour is formalized in [EventOrchestratingComputation].
* Produced events are then stored via [EventSnapshottingLockingRepository.save] suspending function.
*
* Locking Orchestrating Event sourcing aggregate enables `optimistic locking` mechanism more explicitly.
* If you fetch events from a storage, the application records the `version` number of that event stream.
* You can append new events, but only if the `version` number in the storage has not changed.
* If there is a `version` mismatch, it means that someone else has added the event(s) before you did.
*
* [EventSourcingSnapshottingLockingOrchestratingAggregate] extends [EventOrchestratingComputation], [StateOrchestratingComputation], [StateLockingRepository] and [EventLockingRepository] interfaces,
* clearly communicating that it is composed out of these behaviours.
*
* @param C Commands of type [C] that this aggregate can handle
* @param S Aggregate state of type [S]
* @param E Events of type [E] that this aggregate can publish
* @param V Version
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
interface EventSourcingSnapshottingLockingOrchestratingAggregate<C, S, E, V> :
EventOrchestratingComputation<C, S, E>,
StateOrchestratingComputation<C, S, E>,
EventSnapshottingLockingRepository<C, S, E, V>,
StateLockingRepository<C, S, V>


/**
* Event Sourced Snapshotting aggregate constructor-like function.
*
Expand Down Expand Up @@ -118,3 +172,58 @@ fun <C, S, E, V> EventSourcingSnapshottingLockingAggregate(
EventSnapshottingLockingRepository<C, S, E, V> by eventRepository,
StateLockingRepository<C, S, V> by stateRepository,
IDecider<C, S, E> by decider {}

/**
* Event Sourced Snapshotting Orchestrating aggregate constructor-like function.
*
* The Delegation pattern has proven to be a good alternative to implementation inheritance, and Kotlin supports it natively requiring zero boilerplate code.
*
* @param C Commands of type [C] that this aggregate can handle
* @param S Aggregate state of type [S]
* @param E Events of type [E] that are used internally to build/fold new state
* @param decider A decider component of type [IDecider]<[C], [S], [E]>
* @param saga A saga component of type [ISaga]<[E], [C]>
* @param eventRepository An aggregate event repository of type [EventSnapshottingRepository]<[C], [S], [E]>
* @return An object/instance of type [EventSourcingSnapshottingOrchestratingAggregate]<[C], [S], [E]>
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
fun <C, S, E> EventSourcingSnapshottingOrchestratingAggregate(
decider: IDecider<C, S, E>,
eventRepository: EventSnapshottingRepository<C, S, E>,
stateRepository: StateRepository<C, S>,
saga: ISaga<E, C>
): EventSourcingSnapshottingOrchestratingAggregate<C, S, E> =
object : EventSourcingSnapshottingOrchestratingAggregate<C, S, E>,
EventSnapshottingRepository<C, S, E> by eventRepository,
StateRepository<C, S> by stateRepository,
IDecider<C, S, E> by decider,
ISaga<E, C> by saga {}

/**
* Event Sourced Snapshotting Locking Orchestrating aggregate constructor-like function.
*
* The Delegation pattern has proven to be a good alternative to implementation inheritance, and Kotlin supports it natively requiring zero boilerplate code.
*
* @param C Commands of type [C] that this aggregate can handle
* @param S Aggregate state of type [S]
* @param E Events of type [E] that are used internally to build/fold new state
* @param V Version
* @param decider A decider component of type [IDecider]<[C], [S], [E]>
* @param saga A saga component of type [ISaga]<[E], [C]>
* @param eventRepository An aggregate event repository of type [EventSnapshottingLockingRepository]<[C], [S], [E], [V]>
* @return An object/instance of type [EventSourcingSnapshottingLockingOrchestratingAggregate]<[C], [S], [E], [V]>
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
fun <C, S, E, V> EventSourcingSnapshottingLockingOrchestratingAggregate(
decider: IDecider<C, S, E>,
eventRepository: EventSnapshottingLockingRepository<C, S, E, V>,
stateRepository: StateLockingRepository<C, S, V>,
saga: ISaga<E, C>
): EventSourcingSnapshottingLockingOrchestratingAggregate<C, S, E, V> =
object : EventSourcingSnapshottingLockingOrchestratingAggregate<C, S, E, V>,
EventSnapshottingLockingRepository<C, S, E, V> by eventRepository,
StateLockingRepository<C, S, V> by stateRepository,
IDecider<C, S, E> by decider,
ISaga<E, C> by saga {}

0 comments on commit c82e393

Please sign in to comment.