Skip to content

Commit

Permalink
#202 Introducing snapshotting event sourced aggregate - draft
Browse files Browse the repository at this point in the history
  • Loading branch information
idugalic committed Jun 10, 2023
1 parent 264fbb6 commit b7b7323
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2023 Fraktalio D.O.O. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.fraktalio.fmodel.application

import kotlinx.coroutines.flow.*

/**
* Extension function - Handles the command message of type [C]
*
* @param command Command message of type [C]
* @return State of type [S]
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
fun <C, S, E, I> I.handle(command: C): Flow<E> where I : StateComputation<C, S, E>,
I : EventComputation<C, S, E>,
I : StateRepository<C, S>,
I : EventSnapshottingRepository<C, S, E> =
flow {
// 1. Fetch the latest state snapshot or NULL
val latestSnapshotState = command.fetchState()
// 2. Fetch the latest events, since the latest state snapshot
val latestEvents = command.fetchEvents(latestSnapshotState).toList()
// 3. Compute the current state, based on the latest state snapshot and the latest events
val currentState = latestEvents.fold(latestSnapshotState ?: initialState) { s, e -> evolve(s, e) }
// 4. Compute the new events, based on the latest events, latest snapshot state and the command, and save it
val newEvents = latestEvents.asFlow()
.computeNewEvents(command, latestSnapshotState)
.save()
// 5. Compute the new state, based on the current state and the command and save it conditionally
with(currentState.computeNewState(command)) {
if (shouldCreateNewSnapshot(latestSnapshotState)) {
save()
}
}
emitAll(newEvents)
}

/**
* Extension function - Handles the command message of type [C] to the locking state stored aggregate, optimistically
*
* @param command Command message of type [C]
* @return State of type [Pair]<[S], [V]>, in which [V] is the type of the Version (optimistic locking)
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
suspend fun <C, S, E, V, I> I.handleOptimistically(command: C): Flow<Pair<E, V>> where I : StateComputation<C, S, E>,
I : EventComputation<C, S, E>,
I : StateLockingRepository<C, S, V>,
I : EventSnapshottingLockingRepository<C, S, E, V> =
flow {
// 1. Fetch the latest state snapshot or NULL
val (latestSnapshotState, latestSnapshotVersion) = command.fetchState()
// 2. Fetch the latest events, since the latest state snapshot
val latestEvents = command.fetchEvents(Pair(latestSnapshotState, latestSnapshotVersion)).toList()
// 3. Compute the current state, based on the latest state snapshot and the latest events
val currentState = latestEvents.fold(latestSnapshotState ?: initialState) { s, e -> evolve(s, e.first) }
// 4. Get the latest event version
val latestEventVersion = latestEvents.map { it.second }.lastOrNull()
// 5. Compute the new events, based on the latest events, latest snapshot state and the command, and save it
val newEvents = latestEvents.asFlow()
.map { it.first }
.computeNewEvents(command, latestSnapshotState)
.save(latestEventVersion)
// 6. Get the new snapshot version = the last/latest event version
val newSnapshotVersion = newEvents.map { it.second }.lastOrNull()
// 7. Compute the new state, based on the current state and the command and save it conditionally
with(currentState.computeNewState(command)) {
if (shouldCreateNewSnapshot(
latestSnapshotState,
latestSnapshotVersion,
newSnapshotVersion
)
)
save(latestSnapshotVersion, newSnapshotVersion)
}
emitAll(newEvents)
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,27 @@ interface EventRepository<C, E> {
fun Flow<E>.save(): Flow<E>
}

interface EventSnapshottingRepository<C, S, E> : EventRepository<C, E> {
/**
* Fetch events by/after the latest snapshot
*
* @receiver Command of type [C]
* @param latestSnapshotState Latest snapshot of type [S]
*
* @return [Flow] of Events of type [E]
*/
fun C.fetchEvents(latestSnapshotState: S?): Flow<E>

/**
* Checks if saving of snapshot is needed
*
* @receiver State of type [S]
* @param latestSnapshotState Latest snapshot of type [S]
* @return newly saved State of type [S]
*/
fun S.shouldCreateNewSnapshot(latestSnapshotState: S?): Boolean
}

/**
* A type alias for the version provider/function.
* It provides the Version of the last Event in the stream.
Expand Down Expand Up @@ -99,3 +120,26 @@ interface EventLockingRepository<C, E, V> {
*/
fun Flow<E>.save(latestVersion: V?): Flow<Pair<E, V>>
}

interface EventSnapshottingLockingRepository<C, S, E, V> : EventLockingRepository<C, E, V> {
/**
* Fetch events by/after the latest snapshot
*
* @receiver Command of type [C]
* @param latestSnapshotState Latest snapshot of type [V]
*
* @return [Flow] of Events of type [Pair]<[E], [V]>
*/
fun C.fetchEvents(latestSnapshotState: Pair<S?, V?>): Flow<Pair<E, V>>

/**
* Checks if saving of snapshot is needed
*
* @receiver State of type [S]
* @param latestSnapshotState Latest snapshot of type [S]
* @param latestSnapshotVersion Latest snapshot version of type [V]
* @param newSnapshotVersion New snapshot version of type [V]
* @return newly saved State of type [S]
*/
fun S.shouldCreateNewSnapshot(latestSnapshotState: S?, latestSnapshotVersion: V?, newSnapshotVersion: V?): Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import kotlinx.coroutines.flow.*
* `EventComputation` interface formalizes the `Event Computation` algorithm / event sourced system by using a `decider` of type [IDecider]<[C], [S], [E]> to handle commands based on the current events, and produce new events.
*/
interface EventComputation<C, S, E> : IDecider<C, S, E> {
fun Flow<E>.computeNewEvents(command: C): Flow<E> = flow {
val currentState = fold(initialState) { s, e -> evolve(s, e) }
fun Flow<E>.computeNewEvents(command: C): Flow<E> = computeNewEvents(command, initialState)

fun Flow<E>.computeNewEvents(command: C, latestSnapshot: S?): Flow<E> = flow {
val currentState = fold(latestSnapshot ?: initialState) { s, e -> evolve(s, e) }
val resultingEvents = decide(command, currentState)
emitAll(resultingEvents)
}
Expand Down Expand Up @@ -71,6 +73,22 @@ interface EventOrchestratingComputation<C, S, E> : ISaga<E, C>, IDecider<C, S, E
*/
interface EventSourcingAggregate<C, S, E> : EventComputation<C, S, E>, EventRepository<C, E>

/**
* Event sourcing snapshotting aggregate is using/delegating a `decider` of type [IDecider]<[C], [S], [E]>/ [EventComputation]<[C], [S], [E]> to handle commands and produce events.
* In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via [EventSnapshottingRepository.fetchEvents] function, and then delegate the command to the `decider` which can produce new event(s) as a result.
* Produced events are then stored via [EventSnapshottingRepository.save] suspending function.
*
* [EventSourcingAggregate] extends [EventComputation] and [EventSnapshottingRepository] interfaces,
* clearly communicating that it is composed out of these two behaviours.
*
* @param C Commands of type [C] that this aggregate can handle
* @param S Aggregate state of type [S]
* @param E Events of type [E] that this aggregate can publish
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
interface EventSourcingSnapshottingAggregate<C, S, E> : EventComputation<C, S, E>, EventSnapshottingRepository<C, S, E>

/**
* Locking Event sourcing aggregate is using/delegating a `decider` of type [IDecider]<[C], [S], [E]>/ [EventComputation]<[C], [S], [E]> to handle commands and produce events.
* In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via [EventLockingRepository.fetchEvents] function, and then delegate the command to the `decider` which can produce new event(s) as a result.
Expand All @@ -93,6 +111,29 @@ interface EventSourcingAggregate<C, S, E> : EventComputation<C, S, E>, EventRepo
*/
interface EventSourcingLockingAggregate<C, S, E, V> : EventComputation<C, S, E>, EventLockingRepository<C, E, V>

/**
* Locking Event sourcing aggregate is using/delegating a `decider` of type [IDecider]<[C], [S], [E]>/ [EventComputation]<[C], [S], [E]> to handle commands and produce events.
* In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via [EventSnapshottingLockingRepository.fetchEvents] function, and then delegate the command to the `decider` which can produce new event(s) as a result.
* Produced events are then stored via [EventSnapshottingLockingRepository.save] suspending function.
*
* Locking Event sourcing aggregate enables `optimistic locking` mechanism more explicitly.
* If you fetch events from a storage, the application records the `version` number of that event stream.
* You can append new events, but only if the `version` number in the storage has not changed.
* If there is a `version` mismatch, it means that someone else has added the event(s) before you did.
*
* [EventSourcingLockingAggregate] extends [EventComputation] and [EventSnapshottingLockingRepository] interfaces,
* clearly communicating that it is composed out of these two behaviours.
*
* @param C Commands of type [C] that this aggregate can handle
* @param S Aggregate state of type [S]
* @param E Events of type [E] that this aggregate can publish
* @param V Version
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
interface EventSourcingSnapshottingLockingAggregate<C, S, E, V> : EventComputation<C, S, E>,
EventSnapshottingLockingRepository<C, S, E, V>

/**
* Orchestrating Event sourcing aggregate is using/delegating a `decider` of type [IDecider]<[C], [S], [E]> to handle commands and produce events.
* In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via [EventRepository.fetchEvents] function, and then delegate the command to the `decider` which can produce new event(s) as a result.
Expand Down Expand Up @@ -185,6 +226,28 @@ fun <C, S, E> EventSourcingAggregate(
EventRepository<C, E> by eventRepository,
IDecider<C, S, E> by decider {}

/**
* Event Sourced Snapshotting aggregate constructor-like function.
*
* The Delegation pattern has proven to be a good alternative to implementation inheritance, and Kotlin supports it natively requiring zero boilerplate code.
*
* @param C Commands of type [C] that this aggregate can handle
* @param S Aggregate state of type [S]
* @param E Events of type [E] that are used internally to build/fold new state
* @param decider A decider component of type [IDecider]<[C], [S], [E]>
* @param eventRepository An aggregate event repository of type [EventSnapshottingRepository]<[C], [S], [E]>
* @return An object/instance of type [EventSourcingSnapshottingAggregate]<[C], [S], [E]>
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
fun <C, S, E> EventSourcingSnapshottingAggregate(
decider: IDecider<C, S, E>,
eventRepository: EventSnapshottingRepository<C, S, E>
): EventSourcingAggregate<C, S, E> =
object : EventSourcingAggregate<C, S, E>,
EventSnapshottingRepository<C, S, E> by eventRepository,
IDecider<C, S, E> by decider {}

/**
* Event Sourced Locking aggregate factory function.
*
Expand Down Expand Up @@ -235,6 +298,29 @@ fun <C, S, E, V> EventSourcingLockingAggregate(
EventLockingRepository<C, E, V> by eventRepository,
IDecider<C, S, E> by decider {}

/**
* Event Sourced Snapshotting and Locking aggregate constructor-like function.
*
* The Delegation pattern has proven to be a good alternative to implementation inheritance, and Kotlin supports it natively requiring zero boilerplate code.
*
* @param C Commands of type [C] that this aggregate can handle
* @param S Aggregate state of type [S]
* @param E Events of type [E] that are used internally to build/fold new state
* @param V Version
* @param decider A decider component of type [IDecider]<[C], [S], [E]>
* @param eventRepository An aggregate event repository of type [EventSnapshottingLockingRepository]<[C], [S], [E], [V]>
* @return An object/instance of type [EventSourcingSnapshottingLockingAggregate]<[C], [S], [E], [V]>
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
fun <C, S, E, V> EventSourcingSnapshottingLockingAggregate(
decider: IDecider<C, S, E>,
eventRepository: EventSnapshottingLockingRepository<C, S, E, V>
): EventSourcingSnapshottingLockingAggregate<C, S, E, V> =
object : EventSourcingSnapshottingLockingAggregate<C, S, E, V>,
EventSnapshottingLockingRepository<C, S, E, V> by eventRepository,
IDecider<C, S, E> by decider {}

/**
* Event Sourced Orchestrating aggregate factory function.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,17 @@ interface StateLockingRepository<C, S, V> {
* @return newly saved State of type [Pair]<[S], [V]>
*/
suspend fun S.save(currentStateVersion: V?): Pair<S, V>

/**
*
* Save state
* You can update/save the item/state, but only if the `version` number in the storage has not changed.
*
* @receiver State/[S]
* @param currentStateVersion The current version of the state
* @param newStateVersion The new version of the state
* @return newly saved State of type [Pair]<[S], [V]>
*/
suspend fun S.save(currentStateVersion: V?, newStateVersion: V?): Pair<S, V> =
save(currentStateVersion)
}

0 comments on commit b7b7323

Please sign in to comment.