Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non concurrent commands #48

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions sample/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ Compile / compileOrder := CompileOrder.JavaThenScala
libraryDependencies ++= Seq(
"org.springframework.boot" % "spring-boot-starter" % "2.4.3",
"org.springframework.boot" % "spring-boot-starter-web" % "2.4.3",
"org.testcontainers" % "postgresql" % "1.15.2" % Test,
"org.testcontainers" % "kafka" % "1.15.2" % Test,
"org.testcontainers" % "junit-jupiter" % "1.15.2" % Test,
"org.testcontainers" % "postgresql" % "1.16.3" % Test,
"org.testcontainers" % "kafka" % "1.16.3" % Test,
"org.testcontainers" % "junit-jupiter" % "1.16.3" % Test,
"org.springframework.boot" % "spring-boot-starter-test" % "2.4.3" % Test,
"org.assertj" % "assertj-core" % "3.19.0" % Test,
"net.aichler" % "jupiter-interface" % "0.9.1" % Test,
"org.junit.platform" % "junit-platform-launcher" % "1.4.2" % Test,
"org.junit.platform" % "junit-platform-commons" % "1.4.2" % Test,
"org.junit.jupiter" % "junit-jupiter-engine" % "5.4.2" % Test,
"org.junit.vintage" % "junit-vintage-engine" % "5.4.2" % Test
"org.junit.jupiter" % "junit-jupiter-engine" % "5.8.2" % Test,
"org.junit.vintage" % "junit-vintage-engine" % "5.8.2" % Test
)

Compile / javacOptions ++= Seq(
Expand Down
6 changes: 3 additions & 3 deletions thoth-core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ libraryDependencies ++= Seq(
"org.mockito" % "mockito-core" % "2.22.0" % Test,
"org.junit.platform" % "junit-platform-launcher" % "1.4.2" % Test,
"org.junit.platform" % "junit-platform-commons" % "1.4.2" % Test,
"org.junit.jupiter" % "junit-jupiter-engine" % "5.4.2" % Test,
"org.junit.vintage" % "junit-vintage-engine" % "5.4.2" % Test,
"org.junit.jupiter" % "junit-jupiter-engine" % "5.8.2" % Test,
"org.junit.vintage" % "junit-vintage-engine" % "5.8.2" % Test,
"net.aichler" % "jupiter-interface" % "0.9.1" % Test,
"org.scalatest" %% "scalatest" % "3.0.8" % Test,
"org.testcontainers" % "kafka" % "1.15.3" % Test
"org.testcontainers" % "kafka" % "1.16.3" % Test
)

Compile / javacOptions ++= Seq("-source", "8", "-target", "8", "-Xlint:unchecked", "-Xlint:deprecation")
Expand Down
4 changes: 4 additions & 0 deletions thoth-core/src/main/java/fr/maif/eventsourcing/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,8 @@ default Option<String> systemId() {
default Option<String> userId() {
return Option.none();
}

default boolean concurrent() {
return true;
}
}
195 changes: 110 additions & 85 deletions thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessor.java

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions thoth-core/src/main/java/fr/maif/eventsourcing/LockManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package fr.maif.eventsourcing;

import io.vavr.Tuple0;
import io.vavr.collection.Set;
import io.vavr.concurrent.Future;

public interface LockManager<TxCtx> {
Future<Tuple0> lock(TxCtx transactionContext, Set<String> entityIds);
default boolean isNoOp() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package fr.maif.eventsourcing;

import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.Set;
import io.vavr.concurrent.Future;

public class NoOpLockManager<TxCtx> implements LockManager<TxCtx>{
@Override
public Future<Tuple0> lock(TxCtx transactionManager, Set<String> entityIds) {
return Future.successful(Tuple.empty());
}

@Override
public boolean isNoOp() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ private EventProcessor<String, Viking, VikingCommand, VikingEvent, Tuple0, Strin
new FakeTransactionManager(),
vikingSnapshot,
new VikingCommandHandler(),
new VikingEventHandler()
new VikingEventHandler(),
new NoOpLockManager<>()
);
}

Expand All @@ -247,12 +248,16 @@ private EventProcessor<String, Viking, VikingCommand, VikingEvent, Tuple0, Strin
new FakeTransactionManager(),
new VikingCommandHandler(),
new VikingEventHandler(),
projections
projections,
new NoOpLockManager<>()
);
}


private EventProcessor<String, Viking, VikingCommand, VikingEvent, Tuple0, String, Tuple0, Tuple0> vikingEventProcessorWithSnapshot(InMemoryEventStore<VikingEvent, Tuple0, Tuple0> inMemoryEventStore, AggregateStore<Viking, String, Tuple0> aggregateStore, List<Projection<Tuple0, VikingEvent, Tuple0, Tuple0>> projections) {
private EventProcessor<String, Viking, VikingCommand, VikingEvent, Tuple0, String, Tuple0, Tuple0> vikingEventProcessorWithSnapshot(
InMemoryEventStore<VikingEvent, Tuple0, Tuple0> inMemoryEventStore,
AggregateStore<Viking, String, Tuple0> aggregateStore,
List<Projection<Tuple0, VikingEvent, Tuple0, Tuple0>> projections) {
return new EventProcessor<>(
inMemoryEventStore,
new FakeTransactionManager(),
Expand Down
4 changes: 2 additions & 2 deletions thoth-jooq-async/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ libraryDependencies ++= Seq(
"org.postgresql" % "postgresql" % "42.2.5" % Test,
"org.junit.platform" % "junit-platform-launcher" % "1.4.2" % Test,
"org.junit.platform" % "junit-platform-commons" % "1.4.2" % Test,
"org.junit.jupiter" % "junit-jupiter-engine" % "5.4.2" % Test,
"org.junit.vintage" % "junit-vintage-engine" % "5.4.2" % Test,
"org.junit.jupiter" % "junit-jupiter-engine" % "5.8.2" % Test,
"org.junit.vintage" % "junit-vintage-engine" % "5.8.2" % Test,
"net.aichler" % "jupiter-interface" % "0.9.1" % Test
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package fr.maif.eventsourcing;

import static org.jooq.impl.DSL.field;

import fr.maif.jooq.PgAsyncTransaction;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.Set;
import io.vavr.concurrent.Future;
import org.jooq.Field;

public class AsyncLockManager implements LockManager<PgAsyncTransaction> {
private final static Field<String> ENTITY_ID = field("entity_id", String.class);

private final TableNames tableNames;

public AsyncLockManager(TableNames tableNames) {
this.tableNames = tableNames;
}

@Override
public Future<Tuple0> lock(PgAsyncTransaction transactionContext, Set<String> entityIds) {
if(entityIds.isEmpty()) {
return Future.successful(Tuple.empty());
}
return transactionContext.execute(dslContext ->
dslContext.select().from(tableNames.lockTableName)
.where(ENTITY_ID.in(entityIds.toJavaSet()))
.forUpdate()
).map(__ -> Tuple.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public ReactivePostgresKafkaEventProcessor(PostgresKafkaEventProcessorConfig<Err
config.aggregateStore,
config.commandHandler,
config.eventHandler,
config.projections
config.projections,
config.lockManager
);
this.config = config;
config.eventPublisher.start(config.eventStore, config.concurrentReplayStrategy);
Expand All @@ -53,6 +54,42 @@ public static class PostgresKafkaEventProcessorConfig<Error, S extends State<S>,
public final EventHandler<S, E> eventHandler;
public final List<Projection<PgAsyncTransaction, E, Meta, Context>> projections;
public final KafkaEventPublisher<E, Meta, Context> eventPublisher;
public final LockManager<PgAsyncTransaction> lockManager;


public PostgresKafkaEventProcessorConfig(
ActorSystem system,
TableNames tableNames,
PgAsyncPool pgAsyncPool,
String topic,
ProducerSettings<String, EventEnvelope<E, Meta, Context>> producerSettings,
EventStore.ConcurrentReplayStrategy concurrentReplayStrategy, TransactionManager<PgAsyncTransaction> transactionManager,
AggregateStore<S, String, PgAsyncTransaction> aggregateStore,
CommandHandler<Error, S, C, E, Message, PgAsyncTransaction> commandHandler,
EventHandler<S, E> eventHandler,
List<Projection<PgAsyncTransaction, E, Meta, Context>> projections,
JacksonEventFormat<?, E> eventFormat, JacksonSimpleFormat<Meta> metaFormat,
JacksonSimpleFormat<Context> contextFormat,
Integer eventsBufferSize,
LockManager<PgAsyncTransaction> lockManager) {
this.concurrentReplayStrategy = concurrentReplayStrategy;
this.transactionManager = transactionManager;
this.eventPublisher = new KafkaEventPublisher<>(system, producerSettings, topic, eventsBufferSize);
this.eventStore = new ReactivePostgresEventStore<>(
system,
eventPublisher,
pgAsyncPool,
tableNames,
eventFormat,
metaFormat,
contextFormat
);
this.aggregateStore = aggregateStore == null ? new DefaultAggregateStore<>(this.eventStore, eventHandler, system, transactionManager) : aggregateStore;
this.commandHandler = commandHandler;
this.eventHandler = eventHandler;
this.projections = projections;
this.lockManager = lockManager;
}

public PostgresKafkaEventProcessorConfig(
ActorSystem system,
Expand All @@ -68,23 +105,22 @@ public PostgresKafkaEventProcessorConfig(
JacksonEventFormat<?, E> eventFormat, JacksonSimpleFormat<Meta> metaFormat,
JacksonSimpleFormat<Context> contextFormat,
Integer eventsBufferSize) {
this.concurrentReplayStrategy = concurrentReplayStrategy;
this.transactionManager = transactionManager;
this.eventPublisher = new KafkaEventPublisher<>(system, producerSettings, topic, eventsBufferSize);
this.eventStore = new ReactivePostgresEventStore<>(
system,
eventPublisher,
pgAsyncPool,
tableNames,
eventFormat,
metaFormat,
contextFormat
);
this.aggregateStore = aggregateStore == null ? new DefaultAggregateStore<>(this.eventStore, eventHandler, system, transactionManager) : aggregateStore;
this.commandHandler = commandHandler;
this.eventHandler = eventHandler;
this.projections = projections;

this(system,
tableNames,
pgAsyncPool,
topic,
producerSettings,
concurrentReplayStrategy,
transactionManager,
aggregateStore,
commandHandler,
eventHandler,
projections,
eventFormat,
metaFormat,
contextFormat,
eventsBufferSize,
new NoOpLockManager<>());
}

public PostgresKafkaEventProcessorConfig(
Expand All @@ -100,7 +136,8 @@ public PostgresKafkaEventProcessorConfig(
List<Projection<PgAsyncTransaction, E, Meta, Context>> projections,
JacksonEventFormat<?, E> eventFormat,
JacksonSimpleFormat<Meta> metaFormat,
JacksonSimpleFormat<Context> contextFormat, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
JacksonSimpleFormat<Context> contextFormat,
EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
this(system, tableNames, pgAsyncPool, topic, producerSettings, concurrentReplayStrategy, transactionManager, aggregateStore, commandHandler, eventHandler, projections, eventFormat, metaFormat, contextFormat, null);
}

Expand All @@ -117,7 +154,8 @@ public PostgresKafkaEventProcessorConfig(
JacksonEventFormat<?, E> eventFormat,
JacksonSimpleFormat<Meta> metaFormat,
JacksonSimpleFormat<Context> contextFormat,
Integer eventsBufferSize, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
Integer eventsBufferSize,
EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
this(system, tableNames, pgAsyncPool, topic, producerSettings, concurrentReplayStrategy, transactionManager, null, commandHandler, eventHandler, projections, eventFormat, metaFormat, contextFormat, eventsBufferSize);
}

Expand All @@ -129,6 +167,26 @@ public PostgresKafkaEventProcessorConfig(
EventHandler<S, E> eventHandler,
List<Projection<PgAsyncTransaction, E, Meta, Context>> projections,
KafkaEventPublisher<E, Meta, Context> eventPublisher) {
this(concurrentReplayStrategy,
eventStore,
transactionManager,
aggregateStore,
commandHandler,
eventHandler,
projections,
eventPublisher,
new NoOpLockManager<>());
}

public PostgresKafkaEventProcessorConfig(
EventStore.ConcurrentReplayStrategy concurrentReplayStrategy, ReactivePostgresEventStore<E, Meta, Context> eventStore,
TransactionManager<PgAsyncTransaction> transactionManager,
AggregateStore<S, String, PgAsyncTransaction> aggregateStore,
CommandHandler<Error, S, C, E, Message, PgAsyncTransaction> commandHandler,
EventHandler<S, E> eventHandler,
List<Projection<PgAsyncTransaction, E, Meta, Context>> projections,
KafkaEventPublisher<E, Meta, Context> eventPublisher,
LockManager<PgAsyncTransaction> lockManager) {
this.concurrentReplayStrategy = concurrentReplayStrategy;
this.eventStore = eventStore;
this.transactionManager = transactionManager;
Expand All @@ -137,6 +195,7 @@ public PostgresKafkaEventProcessorConfig(
this.eventHandler = eventHandler;
this.projections = projections;
this.eventPublisher = eventPublisher;
this.lockManager = lockManager;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,17 @@ public class TableNames {

public final String tableName;
public final String sequenceNumName;
public final String lockTableName;

public TableNames(String tableName, String sequenceNumName) {
this.tableName = tableName;
this.sequenceNumName = sequenceNumName;
this.lockTableName = null;
}

public TableNames(String tableName, String sequenceNumName, String lockTableName) {
this.tableName = tableName;
this.sequenceNumName = sequenceNumName;
this.lockTableName = lockTableName;
}
}
8 changes: 4 additions & 4 deletions thoth-jooq/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ libraryDependencies ++= Seq(
"com.h2database" % "h2" % "1.4.197" % Test,
"org.junit.platform" % "junit-platform-launcher" % "1.4.2" % Test,
"org.junit.platform" % "junit-platform-commons" % "1.4.2" % Test,
"org.junit.jupiter" % "junit-jupiter-engine" % "5.4.2" % Test,
"org.junit.vintage" % "junit-vintage-engine" % "5.4.2" % Test,
"org.junit.jupiter" % "junit-jupiter-engine" % "5.8.2" % Test,
"org.junit.vintage" % "junit-vintage-engine" % "5.8.2" % Test,
"net.aichler" % "jupiter-interface" % "0.9.1" % Test,
"org.mockito" % "mockito-core" % "2.22.0" % Test,
"org.testng" % "testng" % "6.3" % Test,
"org.testcontainers" % "postgresql" % "1.15.0" % Test,
"org.testcontainers" % "kafka" % "1.15.0" % Test,
"org.testcontainers" % "postgresql" % "1.16.3" % Test,
"org.testcontainers" % "kafka" % "1.16.3" % Test,
"org.slf4j" % "slf4j-api" % "1.7.30" % Test,
"org.slf4j" % "slf4j-simple" % "1.7.30" % Test
)
Expand Down
Loading