diff --git a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java index 7b46d49f..10552c46 100644 --- a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java +++ b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java @@ -16,6 +16,7 @@ import java.util.concurrent.Executors; import fr.maif.eventsourcing.Projection; +import fr.maif.eventsourcing.datastore.TestConsistentProjection; import fr.maif.eventsourcing.datastore.TestProjection; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -101,6 +102,7 @@ public void initClass() { postgres.start(); kafka = new KafkaContainer(); kafka.start(); + consistentProjection = new TestConsistentProjection(actorSystem,kafka.getBootstrapServers(),eventFormat,dataSource); } @@ -238,6 +240,11 @@ public Integer readProjection() { return ((TestProjection)this.testProjection).getCount(); } + @Override + public Integer readConsistentProjection() { + return consistentProjection.getCount(); + } + private static Optional getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java index f3075250..8c313be8 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java @@ -45,6 +45,7 @@ public abstract class DataStoreVerification implements DataStoreVerificat public abstract EventProcessor eventProcessor(String topic); public abstract String kafkaBootstrapUrl(); + protected TestConsistentProjection consistentProjection; @Override @Test public void required_submitValidSingleEventCommandMustWriteEventInDataStore() { @@ -236,6 +237,33 @@ public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken restartDatabase(); } + @Override + public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() { + + String topic = randomKafkaTopic(); + consistentProjection.init(topic); + EventProcessor eventProcessor = eventProcessor(topic); + submitValidCommand(eventProcessor, "1"); + sleep(); + + cleanup(eventProcessor); + assertThat(readConsistentProjection()).isEqualTo(1); + } + + @Override + public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() { + String topic = randomKafkaTopic(); + consistentProjection.init(topic); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownBroker(); + submitValidCommand(eventProcessor, "1"); + sleep(); + restartBroker(); + sleep(); + cleanup(eventProcessor); + assertThat(readConsistentProjection()).isEqualTo(1); + } + private void sleep() { try { Thread.sleep(10000); diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java index 937fc183..5a3722e6 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java @@ -20,6 +20,7 @@ public interface DataStoreVerificationRules eventProcessor, String id); List> readPublishedEvents(String kafkaBootstrapUrl, String topic); Integer readProjection(); + Integer readConsistentProjection(); void shutdownBroker(); void restartBroker(); void shutdownDatabase(); @@ -43,7 +44,8 @@ public interface DataStoreVerificationRules> readFromDataStore(EventStore eventStore); default void cleanup( diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java new file mode 100644 index 00000000..772189ee --- /dev/null +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java @@ -0,0 +1,53 @@ +package fr.maif.eventsourcing.datastore; + +import akka.actor.ActorSystem; +import fr.maif.projections.EventuallyConsistentProjection; +import io.vavr.Tuple; +import io.vavr.concurrent.Future; + +import javax.sql.DataSource; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class TestConsistentProjection { + + private int counter = 0; + private final ActorSystem actorSystem; + private final String bootstrapServer; + private final TestEventFormat eventFormat; + private final DataSource dataSource; + + public TestConsistentProjection( + ActorSystem actorSystem, + String bootstrapServer, + TestEventFormat eventFormat, + DataSource dataSource) { + this.actorSystem = actorSystem; + this.eventFormat = eventFormat; + this.dataSource = dataSource; + this.bootstrapServer =bootstrapServer; + } + + + public void init(String topic) { + this.counter = 0; + EventuallyConsistentProjection.create( + ActorSystem.create(), + "TestConsistentProjection", + EventuallyConsistentProjection.Config.create(topic, "TestConsistentProjection", bootstrapServer), + eventFormat, + envelope -> + Future.of(() -> { + if (envelope.event instanceof TestEvent.SimpleEvent){ + counter++; + } + return Tuple.empty(); + }) + + ).start(); + } + + public int getCount() { + return counter; + } +} diff --git a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java index 097ad5aa..f079c9db 100644 --- a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java +++ b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java @@ -4,15 +4,12 @@ import java.util.concurrent.ExecutionException; import java.util.function.Function; -import fr.maif.eventsourcing.Projection; import org.mockito.Mockito; import org.testng.annotations.BeforeMethod; -import akka.actor.ActorSystem; import akka.stream.javadsl.Sink; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventProcessor; -import fr.maif.eventsourcing.EventStore; import fr.maif.eventsourcing.TransactionManager; import fr.maif.eventsourcing.impl.InMemoryEventStore; import io.vavr.Tuple; @@ -54,6 +51,12 @@ public Integer readProjection() { return null; } + @Override + public Integer readConsistentProjection() { + // Not implemented for in memory + return null; + } + @Override public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright(){ // Not implemented for in memory @@ -66,17 +69,25 @@ public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst( public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken(){ // Not implemented for in memory } + @Override public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { // Not implemented for in memory } - - @Override public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() { // Not implemented for in memory } + @Override + public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() { + // Not implemented for in memory + } + @Override + public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() { + // Not implemented for in memory + } + @Override public void shutdownBroker() { throw new RuntimeException("Not implemented for in memory");