diff --git a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/DefaultAggregateStore.java b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/DefaultAggregateStore.java index 15b00325..b17762f5 100644 --- a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/DefaultAggregateStore.java +++ b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/DefaultAggregateStore.java @@ -19,7 +19,7 @@ public DefaultAggregateStore(EventStore eventStore, Eve } public DefaultAggregateStore(EventStore eventStore, EventHandler eventEventHandler, Materializer materializer, TransactionManager transactionManager) { - super(eventStore, eventEventHandler, transactionManager); + super(eventStore, eventEventHandler, transactionManager, shouldLockEntityForUpdate); this.materializer = materializer; } diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/DefaultAggregateStore.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/DefaultAggregateStore.java index 4f95829a..c822f755 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/DefaultAggregateStore.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/DefaultAggregateStore.java @@ -16,8 +16,8 @@ public class DefaultAggregateStore, E extends Event, Meta, Context, TxCtx> extends AbstractDefaultAggregateStore implements AggregateStore { - public DefaultAggregateStore(EventStore eventStore, EventHandler eventEventHandler, TransactionManager transactionManager) { - super(eventStore, eventEventHandler, transactionManager); + public DefaultAggregateStore(EventStore eventStore, EventHandler eventEventHandler, TransactionManager transactionManager, Boolean shouldLockEntityForUpdate) { + super(eventStore, eventEventHandler, transactionManager, shouldLockEntityForUpdate); } @Override diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java index 0a46d39e..c4ab91ac 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java @@ -72,6 +72,7 @@ class Query { public final Long sequenceFrom; public final Long sequenceTo; public final Boolean published; + public final Boolean shouldLockEntity; private Query(Query.Builder builder) { this.dateFrom = builder.dateFrom; @@ -83,6 +84,7 @@ private Query(Query.Builder builder) { this.published = builder.published; this.sequenceFrom = builder.sequenceFrom; this.sequenceTo = builder.sequenceTo; + this.shouldLockEntity = builder.shouldLockEntity; } public static Builder builder() { @@ -152,6 +154,7 @@ public static class Builder { Boolean published; Long sequenceFrom; Long sequenceTo; + Boolean shouldLockEntity; public Builder withDateFrom(LocalDateTime dateFrom) { this.dateFrom = dateFrom; @@ -198,6 +201,12 @@ public Builder withSequenceTo(Long sequenceTo) { return this; } + + public Builder withShouldLockEntity(Boolean shouldLockEntity) { + this.shouldLockEntity = shouldLockEntity; + return this; + } + public Query build() { return new Query(this); } diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java index 3f019f1a..41d36a45 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java @@ -10,6 +10,7 @@ import io.vavr.control.Option; import org.reactivestreams.Publisher; +import java.util.Objects; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; @@ -19,10 +20,13 @@ public abstract class AbstractDefaultAggregateStore, E extend private final EventHandler eventEventHandler; private final TransactionManager transactionManager; - public AbstractDefaultAggregateStore(EventStore eventStore, EventHandler eventEventHandler, TransactionManager transactionManager) { + private final Boolean shouldLockEntityForUpdate; + + public AbstractDefaultAggregateStore(EventStore eventStore, EventHandler eventEventHandler, TransactionManager transactionManager, Boolean shouldLockEntityForUpdate) { this.eventStore = eventStore; this.eventEventHandler = eventEventHandler; this.transactionManager = transactionManager; + this.shouldLockEntityForUpdate = Objects.requireNonNullElse(shouldLockEntityForUpdate, false); } @Override @@ -38,9 +42,9 @@ public CompletionStage> getAggregate(TxCtx ctx, String entityId) { EventStore.Query query = mayBeSnapshot.fold( // No snapshot defined, we read all the events - () -> EventStore.Query.builder().withEntityId(entityId).build(), + () -> EventStore.Query.builder().withEntityId(entityId).withShouldLockEntity(this.shouldLockEntityForUpdate).build(), // If a snapshot is defined, we read events from seq num of the snapshot : - s -> EventStore.Query.builder().withSequenceFrom(s.sequenceNum()).withEntityId(entityId).build() + s -> EventStore.Query.builder().withSequenceFrom(s.sequenceNum()).withEntityId(entityId).withShouldLockEntity(this.shouldLockEntityForUpdate).build() ); return fold(this.eventStore.loadEventsByQuery(ctx, query), diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index ce038118..e744ec85 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -273,10 +273,18 @@ public Publisher> loadEventsByQuery(Tx tx, Query .from(table(this.tableNames.tableName)) .where(clauses.toJavaList()) .orderBy(SEQUENCE_NUM); + if (Objects.nonNull(query.size)) { + if (Boolean.TRUE.equals(query.shouldLockEntity)) { + return queryBuilder.limit(query.size).forUpdate().wait(120); + } return queryBuilder.limit(query.size); } - return queryBuilder; + if (Boolean.TRUE.equals(query.shouldLockEntity)) { + return queryBuilder.forUpdate().wait(120); + } else { + return queryBuilder; + } })).concatMap(this::rsToEnvelope); } diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java index 6de716cb..cf1b03c2 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java @@ -338,6 +338,10 @@ public BuilderWithAggregateStore withAggregateStore(Aggrega } public BuilderWithAggregateStore withDefaultAggregateStore() { + return this.withDefaultAggregateStore(false); + } + + public BuilderWithAggregateStore withDefaultAggregateStore(boolean shouldLockEntityForUpdate) { return new BuilderWithAggregateStore<>( pgAsyncPool, tableNames, @@ -349,7 +353,7 @@ public BuilderWithAggregateStore withDefaultAggregateStore( concurrentReplayStrategy, eventStore, eventHandler, - new DefaultAggregateStore<>(eventStore, eventHandler, transactionManager)); + new DefaultAggregateStore<>(eventStore, eventHandler, transactionManager, shouldLockEntityForUpdate)); } } diff --git a/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessor.java b/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessor.java index 1ea7496c..8e319eb8 100644 --- a/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessor.java +++ b/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessor.java @@ -81,7 +81,7 @@ public PostgresKafkaEventProcessorConfig( metaFormat, contextFormat ); - this.aggregateStore = aggregateStore == null ? new DefaultAggregateStore<>(this.eventStore, eventHandler, transactionManager) : aggregateStore; + this.aggregateStore = aggregateStore == null ? new DefaultAggregateStore<>(this.eventStore, eventHandler, transactionManager, false) : aggregateStore; this.commandHandler = commandHandler; this.eventHandler = eventHandler; this.projections = projections; @@ -154,7 +154,7 @@ public PostgresKafkaEventProcessorConfig( this.eventHandler = eventHandler; this.projections = projections; this.eventPublisher = eventPublisher; - this.aggregateStore = new DefaultAggregateStore<>(this.eventStore, eventHandler, transactionManager); + this.aggregateStore = new DefaultAggregateStore<>(this.eventStore, eventHandler, transactionManager, false); } } } diff --git a/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessorBuilder.java b/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessorBuilder.java index 5a216503..d68de32c 100644 --- a/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessorBuilder.java +++ b/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessorBuilder.java @@ -370,6 +370,10 @@ public BuilderWithAggregateStore withAggregateStore(Aggrega } public BuilderWithAggregateStore withDefaultAggregateStore() { + return this.withDefaultAggregateStore(false); + } + + public BuilderWithAggregateStore withDefaultAggregateStore(boolean shouldLockEntityForUpdate) { return new BuilderWithAggregateStore<>( dataSource, @@ -382,7 +386,7 @@ public BuilderWithAggregateStore withDefaultAggregateStore( concurrentReplayStrategy, eventStore, eventHandler, - new DefaultAggregateStore<>(eventStore, eventHandler, transactionManager)); + new DefaultAggregateStore<>(eventStore, eventHandler, transactionManager, shouldLockEntityForUpdate)); } } diff --git a/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java b/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java index 28462d10..cd8a34bf 100644 --- a/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java +++ b/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java @@ -307,9 +307,9 @@ public Publisher> loadEventsByQueryWithOptions(C var tmpJooqQuery = DSL.using(tx) .selectFrom(SELECT_CLAUSE + " FROM " + this.tableNames.tableName) .where(clauses.toJavaList()) - .orderBy(field("sequence_num").asc()) - ; - var jooqQuery = Objects.nonNull(query.size) ? tmpJooqQuery.limit(query.size) : tmpJooqQuery; + .orderBy(field("sequence_num").asc()); + var jooqQueryWithSize = Objects.nonNull(query.size) ? tmpJooqQuery.limit(query.size) : tmpJooqQuery; + var jooqQuery = Boolean.TRUE.equals(query.shouldLockEntity) ? jooqQueryWithSize.forUpdate().wait(120) : jooqQueryWithSize; LOGGER.debug("{}", jooqQuery); return Flux.fromStream(() -> jooqQuery.stream().map(r -> rsToEnvelope(r.intoResultSet())))