Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc: projection initialisation #42

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/manual/standard/projections.html
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,36 @@ <h3><a href="#usage" name="usage" class="anchor"><span class="anchor-link"></spa
}
}
</code></pre>
<h3><a href="#catch-up-past-events" name="catch-up-past-events" class="anchor"><span class="anchor-link"></span></a>Catch up past events</h3>
<p>In some cases, the projection will be created while events already exist in the journal. Sometimes these pre-existing events should be added to the projection.</p>
<p>To &ldquo;catch up&rdquo; with past events, you need to stream journal content:</p>
<pre class="prettyprint"><code class="language-java">eventStore.loadAllEvents()
.map(enveloppe -&gt; enveloppe.event)
.filter(event -&gt; event instanceof BankEvent.MoneyDeposited || event instanceof BankEvent.MoneyWithdrawn)
.mapAsync(1, event -&gt;
CompletableFuture.supplyAsync(() -&gt; {
try {
if(event instanceof BankEvent.MoneyDeposited deposit) {
String statement = &quot;UPDATE global_balance SET balance=balance+?::money&quot;;
try(PreparedStatement preparedStatement = connection.prepareStatement(statement)) {
preparedStatement.setBigDecimal(1, deposit.amount);
preparedStatement.execute();
}
} else if(event instanceof BankEvent.MoneyWithdrawn withdraw) {
String statement = &quot;UPDATE global_balance SET balance=balance-?::money&quot;;
try(PreparedStatement preparedStatement = connection.prepareStatement(statement)) {
preparedStatement.setBigDecimal(1, withdraw.amount);
preparedStatement.execute();
}
}
return Tuple.empty();
} catch(SQLException ex) {
throw new RuntimeException(ex);
}
})
).run(actorSystem)
</code></pre>
<p>⚠️ This code should be run while the system is not receiving events, otherwise there is a risk of double consumption. To fix this one solution would be to store consumed event id or sequence num and compare them with incoming events.</p>
<h2><a href="#eventually-consistent-projections" name="eventually-consistent-projections" class="anchor"><span class="anchor-link"></span></a>Eventually consistent projections</h2>
<p>Sometimes projections are too costly to be updated in transaction, sometimes we don&rsquo;t need real time update.</p>
<p>In these case we could build &ldquo;eventually consistent&rdquo; projections, by connecting to our &ldquo;bank&rdquo; topic in Kafka, and consuming events from there.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,19 @@ public ActorSystem actorSystem() {
return ActorSystem.create();
}

@Bean
public GlobalBalanceProjection dailyTotalWithdrawProjection() {
@Bean GlobalBalanceProjection globalBalanceProjection() {
return new GlobalBalanceProjection();
}

@Bean
public EventProcessor<String, Account, BankCommand, BankEvent, Connection, Tuple0, Tuple0, Tuple0> eventProcessor(
ActorSystem actorSystem,
GlobalBalanceProjection globalBalanceProjection,
GlobalBalanceProjection balanceProjection,
DataSource dataSource,
JacksonEventFormat<String, BankEvent> eventFormat,
TableNames tableNames,
ProducerSettings<String, EventEnvelope<BankEvent, Tuple0, Tuple0>> producerSettings) {
return PostgresKafkaEventProcessor.withActorSystem(actorSystem)
ProducerSettings<String, EventEnvelope<BankEvent, Tuple0, Tuple0>> producerSettings) throws SQLException {
var processor = PostgresKafkaEventProcessor.withActorSystem(actorSystem)
.withDataSource(dataSource)
.withTables(tableNames)
.withTransactionManager(Executors.newFixedThreadPool(5))
Expand All @@ -157,7 +156,11 @@ public EventProcessor<String, Account, BankCommand, BankEvent, Connection, Tuple
.withEventHandler(new BankEventHandler())
.withDefaultAggregateStore()
.withCommandHandler(new BankCommandHandler())
.withProjections(globalBalanceProjection)
.withProjections(balanceProjection)
.build();

balanceProjection.initialize(dataSource.getConnection(), processor.eventStore(), actorSystem);

return processor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;

import akka.Done;
import akka.actor.ActorSystem;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventStore;
import fr.maif.eventsourcing.Projection;
import fr.maif.thoth.sample.events.BankEvent;
import io.vavr.Tuple;
Expand All @@ -14,6 +18,34 @@

public class GlobalBalanceProjection implements Projection<Connection, BankEvent, Tuple0, Tuple0> {

public CompletableFuture<Done> initialize(Connection connection, EventStore<Connection, BankEvent, Tuple0, Tuple0> eventStore, ActorSystem actorSystem) {
return eventStore.loadAllEvents()
.map(enveloppe -> enveloppe.event)
.filter(event -> event instanceof BankEvent.MoneyDeposited || event instanceof BankEvent.MoneyWithdrawn)
.mapAsync(1, event ->
CompletableFuture.supplyAsync(() -> {
try {
if(event instanceof BankEvent.MoneyDeposited deposit) {
String statement = "UPDATE global_balance SET balance=balance+?::money";
try(PreparedStatement preparedStatement = connection.prepareStatement(statement)) {
preparedStatement.setBigDecimal(1, deposit.amount);
preparedStatement.execute();
}
} else if(event instanceof BankEvent.MoneyWithdrawn withdraw) {
String statement = "UPDATE global_balance SET balance=balance-?::money";
try(PreparedStatement preparedStatement = connection.prepareStatement(statement)) {
preparedStatement.setBigDecimal(1, withdraw.amount);
preparedStatement.execute();
}
}
return Tuple.empty();
} catch(SQLException ex) {
throw new RuntimeException(ex);
}
})
).run(actorSystem).toCompletableFuture();
}

@Override
public Future<Tuple0> storeProjection(Connection connection, List<EventEnvelope<BankEvent, Tuple0, Tuple0>> events) {
return Future.of(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLType;
import java.time.Duration;
import java.time.LocalDate;
import java.util.ArrayList;
Expand Down Expand Up @@ -56,15 +61,20 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import akka.actor.ActorSystem;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventProcessor;
import fr.maif.eventsourcing.format.JacksonSimpleFormat;
import fr.maif.json.EventEnvelopeJson;
import fr.maif.thoth.sample.api.AccountDTO;
import fr.maif.thoth.sample.api.BalanceDTO;
import fr.maif.thoth.sample.api.TransferDTO;
import fr.maif.thoth.sample.api.TransferResultDTO;
import fr.maif.thoth.sample.commands.BankCommand;
import fr.maif.thoth.sample.events.BankEvent;
import fr.maif.thoth.sample.events.BankEventFormat;
import fr.maif.thoth.sample.projections.transactional.GlobalBalanceProjection;
import fr.maif.thoth.sample.state.Account;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.Tuple2;
Expand All @@ -82,6 +92,12 @@ class ThothSampleApplicationTests {
private ObjectMapper mapper;
@Autowired
private BankEventFormat eventFormat;
@Autowired
private GlobalBalanceProjection projection;
@Autowired
private EventProcessor<String, Account, BankCommand, BankEvent, Connection, Tuple0, Tuple0, Tuple0> eventProcessor;
@Autowired
private ActorSystem actorSystem;

@Container
private static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>(DockerImageName.parse("postgres").withTag("11.2"));
Expand Down Expand Up @@ -336,6 +352,67 @@ void transferShouldFailIfTargetAccountDoesNotExist() {
assertThat(result.error).isEqualTo("Account does not exist");
}

@Test
void projectionShouldCatchUpExistingEvents() throws SQLException {
String entityId = "test";

registerInJournal(new BankEvent.AccountOpened(entityId));
registerInJournal(new BankEvent.MoneyDeposited(entityId, new BigDecimal("100")));
registerInJournal(new BankEvent.MoneyWithdrawn(entityId, new BigDecimal("30")));

assertThat(globalBalance().amount).isEqualByComparingTo(new BigDecimal("0"));

projection.initialize(pgDataSource.getConnection(), eventProcessor.eventStore(), actorSystem).join();

assertThat(globalBalance().amount).isEqualByComparingTo(new BigDecimal("70"));

}

private void registerInJournal(BankEvent event) throws SQLException {
String eventType;
String json;
if(event instanceof BankEvent.AccountOpened) {
eventType = "AccountOpened";
json = "{\"accountId\": \"" + event.accountId + "\"}";
} else if(event instanceof BankEvent.MoneyWithdrawn withdraw) {
eventType = "MoneyWithdrawn";
json = "{\"amount\": " + withdraw.amount + ", \"accountId\": \"" + event.accountId + "\"}";
} else if(event instanceof BankEvent.MoneyDeposited deposit) {
eventType = "MoneyDeposited";
json = "{\"amount\": " + deposit.amount + ", \"accountId\": \"" + event.accountId + "\"}";
} else if(event instanceof BankEvent.AccountClosed) {
eventType = "AccountClosed";
json = "{\"accountId\": \"" + event.accountId + "\"}";
} else {
throw new RuntimeException("Unknown event type " + event.getClass().getSimpleName());
}


try(Connection connection = pgDataSource.getConnection()) {
final ResultSet sequenceResult = connection.prepareStatement("select nextval('bank_sequence_num')").executeQuery();
sequenceResult.next();
final long sequenceNum = sequenceResult.getLong("nextval");
final PreparedStatement statement = connection.prepareStatement("""
INSERT INTO bank_journal (id, entity_id, sequence_num, event_type, version, transaction_id, event, total_message_in_transaction, num_message_in_transaction, emission_date, published)
VALUES(?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?, ?, ?);
""");

statement.setObject(1, UUID.randomUUID());
statement.setString(2, event.entityId());
statement.setLong(3, sequenceNum);
statement.setString(4, eventType);
statement.setInt(5, 1);
statement.setObject(6, UUID.randomUUID());
statement.setObject(7, json);
statement.setInt(8, 1);
statement.setInt(9, 1);
statement.setDate(10, new Date(System.currentTimeMillis()));
statement.setBoolean(11, false);

statement.execute();
}
}



private AccountDTO readAccount(String id) {
Expand Down
39 changes: 39 additions & 0 deletions thoth-documentation/src/main/paradox/standard/projections.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,45 @@ public class DemoApplication {
}
```


### Catch up past events

In some cases, the projection will be created while events already exist in the journal.
Sometimes these pre-existing events should be added to the projection.

To "catch up" with past events, you need to stream journal content:

```java
eventStore.loadAllEvents()
.map(enveloppe -> enveloppe.event)
.filter(event -> event instanceof BankEvent.MoneyDeposited || event instanceof BankEvent.MoneyWithdrawn)
.mapAsync(1, event ->
CompletableFuture.supplyAsync(() -> {
try {
if(event instanceof BankEvent.MoneyDeposited deposit) {
String statement = "UPDATE global_balance SET balance=balance+?::money";
try(PreparedStatement preparedStatement = connection.prepareStatement(statement)) {
preparedStatement.setBigDecimal(1, deposit.amount);
preparedStatement.execute();
}
} else if(event instanceof BankEvent.MoneyWithdrawn withdraw) {
String statement = "UPDATE global_balance SET balance=balance-?::money";
try(PreparedStatement preparedStatement = connection.prepareStatement(statement)) {
preparedStatement.setBigDecimal(1, withdraw.amount);
preparedStatement.execute();
}
}
return Tuple.empty();
} catch(SQLException ex) {
throw new RuntimeException(ex);
}
})
).run(actorSystem)
```

⚠️ This code should be run while the system is not receiving events, otherwise there is a risk of double consumption.
To fix this one solution would be to store consumed event id or sequence num and compare them with incoming events.

## Eventually consistent projections

Sometimes projections are too costly to be updated in transaction, sometimes we don't need real time update.
Expand Down