From 9eba26faf74a31864817a001a34b2f6c4e523dea Mon Sep 17 00:00:00 2001 From: Benjamin CAVY Date: Thu, 29 Jul 2021 14:41:14 +0200 Subject: [PATCH] doc: projection initialisation --- docs/manual/standard/projections.html | 30 ++++++++ .../configuration/BankConfiguration.java | 15 ++-- .../GlobalBalanceProjection.java | 32 ++++++++ .../sample/ThothSampleApplicationTests.java | 77 +++++++++++++++++++ .../src/main/paradox/standard/projections.md | 39 ++++++++++ 5 files changed, 187 insertions(+), 6 deletions(-) diff --git a/docs/manual/standard/projections.html b/docs/manual/standard/projections.html index 3be40306..66a75400 100644 --- a/docs/manual/standard/projections.html +++ b/docs/manual/standard/projections.html @@ -217,6 +217,36 @@

+

Catch up past events

+

In some cases, the projection will be created while events already exist in the journal. Sometimes these pre-existing events should be added to the projection.

+

To “catch up” with past events, you need to stream journal content:

+
eventStore.loadAllEvents()
+    .map(enveloppe -> enveloppe.event)
+    .filter(event -> event instanceof BankEvent.MoneyDeposited || event instanceof BankEvent.MoneyWithdrawn)
+    .mapAsync(1, event ->
+        CompletableFuture.supplyAsync(() -> {
+            try {
+                if(event instanceof BankEvent.MoneyDeposited deposit) {
+                    String statement = "UPDATE global_balance SET balance=balance+?::money";
+                    try(PreparedStatement preparedStatement = connection.prepareStatement(statement)) {
+                        preparedStatement.setBigDecimal(1, deposit.amount);
+                        preparedStatement.execute();
+                    }
+                } else if(event instanceof BankEvent.MoneyWithdrawn withdraw) {
+                    String statement = "UPDATE global_balance SET balance=balance-?::money";
+                    try(PreparedStatement preparedStatement = connection.prepareStatement(statement)) {
+                        preparedStatement.setBigDecimal(1, withdraw.amount);
+                        preparedStatement.execute();
+                    }
+                }
+                return Tuple.empty();
+            } catch(SQLException ex) {
+                throw new RuntimeException(ex);
+            }
+        })
+    ).run(actorSystem)
+
+

⚠️ This code should be run while the system is not receiving events, otherwise there is a risk of double consumption. To fix this one solution would be to store consumed event id or sequence num and compare them with incoming events.

Eventually consistent projections

Sometimes projections are too costly to be updated in transaction, sometimes we don’t need real time update.

In these case we could build “eventually consistent” projections, by connecting to our “bank” topic in Kafka, and consuming events from there.

diff --git a/sample/src/main/java/fr/maif/thoth/sample/configuration/BankConfiguration.java b/sample/src/main/java/fr/maif/thoth/sample/configuration/BankConfiguration.java index 64ba0d2f..fea9b79b 100644 --- a/sample/src/main/java/fr/maif/thoth/sample/configuration/BankConfiguration.java +++ b/sample/src/main/java/fr/maif/thoth/sample/configuration/BankConfiguration.java @@ -133,20 +133,19 @@ public ActorSystem actorSystem() { return ActorSystem.create(); } - @Bean - public GlobalBalanceProjection dailyTotalWithdrawProjection() { + @Bean GlobalBalanceProjection globalBalanceProjection() { return new GlobalBalanceProjection(); } @Bean public EventProcessor eventProcessor( ActorSystem actorSystem, - GlobalBalanceProjection globalBalanceProjection, + GlobalBalanceProjection balanceProjection, DataSource dataSource, JacksonEventFormat eventFormat, TableNames tableNames, - ProducerSettings> producerSettings) { - return PostgresKafkaEventProcessor.withActorSystem(actorSystem) + ProducerSettings> producerSettings) throws SQLException { + var processor = PostgresKafkaEventProcessor.withActorSystem(actorSystem) .withDataSource(dataSource) .withTables(tableNames) .withTransactionManager(Executors.newFixedThreadPool(5)) @@ -157,7 +156,11 @@ public EventProcessor { + public CompletableFuture initialize(Connection connection, EventStore eventStore, ActorSystem actorSystem) { + return eventStore.loadAllEvents() + .map(enveloppe -> enveloppe.event) + .filter(event -> event instanceof BankEvent.MoneyDeposited || event instanceof BankEvent.MoneyWithdrawn) + .mapAsync(1, event -> + CompletableFuture.supplyAsync(() -> { + try { + if(event instanceof BankEvent.MoneyDeposited deposit) { + String statement = "UPDATE global_balance SET balance=balance+?::money"; + try(PreparedStatement preparedStatement = connection.prepareStatement(statement)) { + preparedStatement.setBigDecimal(1, deposit.amount); + preparedStatement.execute(); + } + } else if(event instanceof BankEvent.MoneyWithdrawn withdraw) { + String statement = "UPDATE global_balance SET balance=balance-?::money"; + try(PreparedStatement preparedStatement = connection.prepareStatement(statement)) { + preparedStatement.setBigDecimal(1, withdraw.amount); + preparedStatement.execute(); + } + } + return Tuple.empty(); + } catch(SQLException ex) { + throw new RuntimeException(ex); + } + }) + ).run(actorSystem).toCompletableFuture(); + } + @Override public Future storeProjection(Connection connection, List> events) { return Future.of(() -> { diff --git a/sample/src/test/java/fr/maif/thoth/sample/ThothSampleApplicationTests.java b/sample/src/test/java/fr/maif/thoth/sample/ThothSampleApplicationTests.java index 2ca783ad..08f6854a 100644 --- a/sample/src/test/java/fr/maif/thoth/sample/ThothSampleApplicationTests.java +++ b/sample/src/test/java/fr/maif/thoth/sample/ThothSampleApplicationTests.java @@ -5,7 +5,12 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLType; import java.time.Duration; import java.time.LocalDate; import java.util.ArrayList; @@ -56,15 +61,20 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import akka.actor.ActorSystem; import fr.maif.eventsourcing.EventEnvelope; +import fr.maif.eventsourcing.EventProcessor; import fr.maif.eventsourcing.format.JacksonSimpleFormat; import fr.maif.json.EventEnvelopeJson; import fr.maif.thoth.sample.api.AccountDTO; import fr.maif.thoth.sample.api.BalanceDTO; import fr.maif.thoth.sample.api.TransferDTO; import fr.maif.thoth.sample.api.TransferResultDTO; +import fr.maif.thoth.sample.commands.BankCommand; import fr.maif.thoth.sample.events.BankEvent; import fr.maif.thoth.sample.events.BankEventFormat; +import fr.maif.thoth.sample.projections.transactional.GlobalBalanceProjection; +import fr.maif.thoth.sample.state.Account; import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.Tuple2; @@ -82,6 +92,12 @@ class ThothSampleApplicationTests { private ObjectMapper mapper; @Autowired private BankEventFormat eventFormat; + @Autowired + private GlobalBalanceProjection projection; + @Autowired + private EventProcessor eventProcessor; + @Autowired + private ActorSystem actorSystem; @Container private static PostgreSQLContainer postgres = new PostgreSQLContainer<>(DockerImageName.parse("postgres").withTag("11.2")); @@ -336,6 +352,67 @@ void transferShouldFailIfTargetAccountDoesNotExist() { assertThat(result.error).isEqualTo("Account does not exist"); } + @Test + void projectionShouldCatchUpExistingEvents() throws SQLException { + String entityId = "test"; + + registerInJournal(new BankEvent.AccountOpened(entityId)); + registerInJournal(new BankEvent.MoneyDeposited(entityId, new BigDecimal("100"))); + registerInJournal(new BankEvent.MoneyWithdrawn(entityId, new BigDecimal("30"))); + + assertThat(globalBalance().amount).isEqualByComparingTo(new BigDecimal("0")); + + projection.initialize(pgDataSource.getConnection(), eventProcessor.eventStore(), actorSystem).join(); + + assertThat(globalBalance().amount).isEqualByComparingTo(new BigDecimal("70")); + + } + + private void registerInJournal(BankEvent event) throws SQLException { + String eventType; + String json; + if(event instanceof BankEvent.AccountOpened) { + eventType = "AccountOpened"; + json = "{\"accountId\": \"" + event.accountId + "\"}"; + } else if(event instanceof BankEvent.MoneyWithdrawn withdraw) { + eventType = "MoneyWithdrawn"; + json = "{\"amount\": " + withdraw.amount + ", \"accountId\": \"" + event.accountId + "\"}"; + } else if(event instanceof BankEvent.MoneyDeposited deposit) { + eventType = "MoneyDeposited"; + json = "{\"amount\": " + deposit.amount + ", \"accountId\": \"" + event.accountId + "\"}"; + } else if(event instanceof BankEvent.AccountClosed) { + eventType = "AccountClosed"; + json = "{\"accountId\": \"" + event.accountId + "\"}"; + } else { + throw new RuntimeException("Unknown event type " + event.getClass().getSimpleName()); + } + + + try(Connection connection = pgDataSource.getConnection()) { + final ResultSet sequenceResult = connection.prepareStatement("select nextval('bank_sequence_num')").executeQuery(); + sequenceResult.next(); + final long sequenceNum = sequenceResult.getLong("nextval"); + final PreparedStatement statement = connection.prepareStatement(""" + INSERT INTO bank_journal (id, entity_id, sequence_num, event_type, version, transaction_id, event, total_message_in_transaction, num_message_in_transaction, emission_date, published) + VALUES(?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?, ?, ?); + """); + + statement.setObject(1, UUID.randomUUID()); + statement.setString(2, event.entityId()); + statement.setLong(3, sequenceNum); + statement.setString(4, eventType); + statement.setInt(5, 1); + statement.setObject(6, UUID.randomUUID()); + statement.setObject(7, json); + statement.setInt(8, 1); + statement.setInt(9, 1); + statement.setDate(10, new Date(System.currentTimeMillis())); + statement.setBoolean(11, false); + + statement.execute(); + } + } + private AccountDTO readAccount(String id) { diff --git a/thoth-documentation/src/main/paradox/standard/projections.md b/thoth-documentation/src/main/paradox/standard/projections.md index 5bd83bf2..e869a332 100644 --- a/thoth-documentation/src/main/paradox/standard/projections.md +++ b/thoth-documentation/src/main/paradox/standard/projections.md @@ -86,6 +86,45 @@ public class DemoApplication { } ``` + +### Catch up past events + +In some cases, the projection will be created while events already exist in the journal. +Sometimes these pre-existing events should be added to the projection. + +To "catch up" with past events, you need to stream journal content: + +```java +eventStore.loadAllEvents() + .map(enveloppe -> enveloppe.event) + .filter(event -> event instanceof BankEvent.MoneyDeposited || event instanceof BankEvent.MoneyWithdrawn) + .mapAsync(1, event -> + CompletableFuture.supplyAsync(() -> { + try { + if(event instanceof BankEvent.MoneyDeposited deposit) { + String statement = "UPDATE global_balance SET balance=balance+?::money"; + try(PreparedStatement preparedStatement = connection.prepareStatement(statement)) { + preparedStatement.setBigDecimal(1, deposit.amount); + preparedStatement.execute(); + } + } else if(event instanceof BankEvent.MoneyWithdrawn withdraw) { + String statement = "UPDATE global_balance SET balance=balance-?::money"; + try(PreparedStatement preparedStatement = connection.prepareStatement(statement)) { + preparedStatement.setBigDecimal(1, withdraw.amount); + preparedStatement.execute(); + } + } + return Tuple.empty(); + } catch(SQLException ex) { + throw new RuntimeException(ex); + } + }) + ).run(actorSystem) +``` + +⚠️ This code should be run while the system is not receiving events, otherwise there is a risk of double consumption. +To fix this one solution would be to store consumed event id or sequence num and compare them with incoming events. + ## Eventually consistent projections Sometimes projections are too costly to be updated in transaction, sometimes we don't need real time update.