diff --git a/.gitignore b/.gitignore index 212f41bf..309a5ba6 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ target .bsp/ .DS_Store -.bloop \ No newline at end of file +.bloop +*.iml +.java-version \ No newline at end of file diff --git a/build.sbt b/build.sbt index efcc48dc..c46ba263 100644 --- a/build.sbt +++ b/build.sbt @@ -92,7 +92,7 @@ lazy val `thoth-kafka-goodies` = project ) lazy val `thoth-jooq-async` = project - .dependsOn(`thoth-core`) + .dependsOn(`thoth-core`, `thoth-tck`) .settings( sonatypeRepository := "https://s01.oss.sonatype.org/service/local", sonatypeCredentialHost := "s01.oss.sonatype.org", diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f9c10f4e..0c11490e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -4,6 +4,6 @@ object Dependencies { val akkaVersion = "2.6.14" val vavrVersion = "0.10.3" val jooqVersion = "3.14.3" - val jooqAsyncVersion = "1.1.0" + val jooqAsyncVersion = "1.1.2" val functionalJsonVersion = "1.0.3" } diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/InMemoryEventStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/InMemoryEventStore.java index da043799..39ba2202 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/InMemoryEventStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/InMemoryEventStore.java @@ -118,9 +118,7 @@ public Source, NotUsed> loadEventsByQuery(Tuple0 @Override public Source, NotUsed> loadEventsByQuery(Query query) { return Source.from(eventStore) - .filter(e -> { - return Option.of(query.entityId).map(id -> id.equals(e.entityId)).getOrElse(true); - }); + .filter(e -> Option.of(query.entityId).map(id -> id.equals(e.entityId)).getOrElse(true)); } @Override diff --git a/thoth-jooq-async/build.sbt b/thoth-jooq-async/build.sbt index 80166bb3..f72274d7 100644 --- a/thoth-jooq-async/build.sbt +++ b/thoth-jooq-async/build.sbt @@ -17,9 +17,16 @@ libraryDependencies ++= Seq( "org.junit.platform" % "junit-platform-commons" % "1.4.2" % Test, "org.junit.jupiter" % "junit-jupiter-engine" % "5.4.2" % Test, "org.junit.vintage" % "junit-vintage-engine" % "5.4.2" % Test, - "net.aichler" % "jupiter-interface" % "0.9.1" % Test + "net.aichler" % "jupiter-interface" % "0.9.1" % Test, + "org.mockito" % "mockito-core" % "2.22.0" % Test, + "org.testng" % "testng" % "6.3" % Test, + "org.testcontainers" % "postgresql" % "1.15.0" % Test, + "org.testcontainers" % "kafka" % "1.15.0" % Test ) + +testNGSuites := Seq(((resourceDirectory in Test).value / "testng.xml").absolutePath) + javacOptions in Compile ++= Seq( "-source", "8", diff --git a/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/JooqAsyncKafkaTCKImplementation.java b/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/JooqAsyncKafkaTCKImplementation.java new file mode 100644 index 00000000..e747e456 --- /dev/null +++ b/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/JooqAsyncKafkaTCKImplementation.java @@ -0,0 +1,347 @@ +package fr.maif.eventsourcing; + +import akka.actor.ActorSystem; +import akka.kafka.ProducerSettings; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import fr.maif.eventsourcing.datastore.DataStoreVerification; +import fr.maif.eventsourcing.datastore.TestCommand; +import fr.maif.eventsourcing.datastore.TestCommandHandler; +import fr.maif.eventsourcing.datastore.TestConsistentProjection; +import fr.maif.eventsourcing.datastore.TestEvent; +import fr.maif.eventsourcing.datastore.TestEventFormat; 
+import fr.maif.eventsourcing.datastore.TestEventHandler; +import fr.maif.eventsourcing.datastore.TestInstransactionProjection; +import fr.maif.eventsourcing.datastore.TestInstransactionProjectionAsync; +import fr.maif.eventsourcing.datastore.TestState; +import fr.maif.eventsourcing.format.JacksonEventFormat; +import fr.maif.eventsourcing.format.JacksonSimpleFormat; +import fr.maif.jooq.PgAsyncPool; +import fr.maif.jooq.reactive.ReactivePgAsyncPool; +import fr.maif.json.EventEnvelopeJson; +import fr.maif.kafka.JsonSerializer; +import fr.maif.kafka.KafkaSettings; +import io.vavr.Tuple0; +import io.vertx.core.Vertx; +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.PgPool; +import io.vertx.sqlclient.PoolOptions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.jooq.SQLDialect; +import org.jooq.impl.DefaultConfiguration; +import org.postgresql.ds.PGSimpleDataSource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.DockerImageName; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class JooqAsyncKafkaTCKImplementation extends DataStoreVerification { + + + public static final String DEFAULT_POSTGRE_TAG = "9.6.12"; + private static final DockerImageName DEFAULT_KAFKA_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka"); + private static final String DEFAULT_KAFKA_TAG = "5.4.3"; + private static final DockerImageName DEFAULT_POSTGRE_IMAGE_NAME = DockerImageName.parse("postgres"); + private final String SCHEMA = "CREATE TABLE IF NOT EXISTS test_journal (\n" + + " id UUID primary key,\n" + + " entity_id varchar(100) not null,\n" + + " sequence_num bigint not null,\n" + + " event_type varchar(100) not null,\n" + + " version int not null,\n" + + " transaction_id varchar(100) not null,\n" + + " event jsonb not null,\n" + + " metadata jsonb,\n" + + " context jsonb,\n" + + " total_message_in_transaction int default 1,\n" + + " num_message_in_transaction int default 1,\n" + + " emission_date timestamp not null default now(),\n" + + " user_id varchar(100),\n" + + " system_id varchar(100),\n" + + " published boolean default false,\n" + + " UNIQUE (entity_id, sequence_num)\n" + + " );\n" + + "CREATE TABLE IF NOT EXISTS test_projection (\n" + + " counter int not null\n" + + " );\n" + + " CREATE SEQUENCE if not exists test_sequence_num;"; + private final String INIT_TABLE_QUERY = "TRUNCATE TABLE test_journal;\n" + + " TRUNCATE TABLE test_projection;\n" + + " INSERT INTO test_projection VALUES(0);\n" ; + + 
private PGSimpleDataSource dataSource; + private TableNames tableNames; + private TestEventFormat eventFormat; + private PostgreSQLContainer postgres; + private KafkaContainer kafka; + private Projection testProjection; + private PgAsyncPool pgAsyncPool; + private Vertx vertx; + private PgPool pgPool; + + private static Optional getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) { + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + KafkaConsumer consumer = new KafkaConsumer<>(properties); + PartitionInfo partitionInfo = consumer.partitionsFor("foo").get(0); + TopicPartition topicPartition = new TopicPartition(topic, partitionInfo.partition()); + consumer.assign(Collections.singletonList(topicPartition)); + + long position = consumer.position(topicPartition); + consumer.seekToEnd(Collections.singletonList(topicPartition)); + final long endOffset = consumer.position(topicPartition); + + Optional result = Optional.empty(); + if (endOffset > 0 && endOffset > position) { + result = Optional.of(consumer.position(topicPartition) - 1); + } + + consumer.close(); + return result; + } + + @AfterClass(alwaysRun = true) + public void tearDown() throws InterruptedException { + Thread.sleep(10000); + postgres.stop(); + kafka.stop(); + this.vertx.close(); + this.pgPool.close(); + } + + @BeforeClass(alwaysRun = true) + public void initClass() { + + this.tableNames = new TableNames("test_journal", "test_sequence_num"); + this.eventFormat = new TestEventFormat(); + + postgres = new PostgreSQLContainer(DEFAULT_POSTGRE_IMAGE_NAME.withTag(DEFAULT_POSTGRE_TAG)); + postgres.start(); + + kafka = new KafkaContainer(DEFAULT_KAFKA_IMAGE_NAME.withTag(DEFAULT_KAFKA_TAG)); + kafka.start(); + } + + @BeforeMethod(alwaysRun = true) + public void init() throws SQLException { + this.testProjection = new TestInstransactionProjectionAsync(); + this.dataSource = new PGSimpleDataSource(); + dataSource.setUrl(postgres.getJdbcUrl()); + dataSource.setUser(postgres.getUsername()); + dataSource.setPassword(postgres.getPassword()); + // Override default setting, which wait indefinitely if database is down + dataSource.setLoginTimeout(5); + + dataSource.getConnection().prepareStatement(SCHEMA).execute(); + dataSource.getConnection().prepareStatement(INIT_TABLE_QUERY).execute(); + + this.pgAsyncPool = pgAsyncPool(postgres); + consistentProjection = new TestConsistentProjection(actorSystem, kafka.getBootstrapServers(), eventFormat, dataSource); + } + + private PgAsyncPool pgAsyncPool(PostgreSQLContainer server) { + this.vertx = Vertx.vertx(); + DefaultConfiguration jooqConfig = new DefaultConfiguration(); + jooqConfig.setSQLDialect(SQLDialect.POSTGRES); + + final PgConnectOptions options = new PgConnectOptions() + .setConnectTimeout(10) + .setPort(server.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)) + .setHost(server.getContainerIpAddress()) + .setDatabase(server.getDatabaseName()) + .setUser(server.getUsername()) + .setPassword(server.getPassword()); + PoolOptions poolOptions = new PoolOptions() + .setMaxSize(50); + this.pgPool = PgPool.pool(vertx, options, 
poolOptions); + + return new ReactivePgAsyncPool(pgPool, jooqConfig); + } + + @Override + public EventProcessor eventProcessor(String topic) { + + + return ReactivePostgresKafkaEventProcessor + .withSystem(ActorSystem.create()) + .withPgAsyncPool(this.pgAsyncPool) + .withTables(tableNames) + .withTransactionManager() + .withEventFormater(eventFormat) + .withNoMetaFormater() + .withNoContextFormater() + .withKafkaSettings(topic, producerSettings(settings(), new TestEventFormat())) + .withEventHandler(new TestEventHandler()) + .withDefaultAggregateStore() + .withCommandHandler(new TestCommandHandler<>()) + .withProjections(this.testProjection) + .build(); + } + + @Override + public String kafkaBootstrapUrl() { + return kafka.getBootstrapServers(); + } + + @Override + public void shutdownBroker() { + pauseContainer(kafka); + } + + @Override + public void restartBroker() { + unPauseContainer(kafka); + } + + @Override + public void shutdownDatabase() { + pauseContainer(postgres); + } + + @Override + public void restartDatabase() { + unPauseContainer(postgres); + } + + private void pauseContainer(GenericContainer container) { + container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec(); + } + + private void unPauseContainer(GenericContainer container) { + container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec(); + } + + private KafkaSettings settings() { + return KafkaSettings.newBuilder(kafka.getBootstrapServers()).build(); + } + + private ProducerSettings> producerSettings( + KafkaSettings kafkaSettings, + JacksonEventFormat eventFormat) { + return kafkaSettings.producerSettings(actorSystem, JsonSerializer.of( + eventFormat, + JacksonSimpleFormat.empty(), + JacksonSimpleFormat.empty() + ) + ); + } + + @Override + public List> readPublishedEvents(String kafkaBootstrapUrl, String topic) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + String groupId = "reader-" + UUID.randomUUID(); + Optional maybeLastOffset = getEndOffsetIfNotReached(topic, kafkaBootstrapUrl, groupId); + if (!maybeLastOffset.isPresent()) { + return Collections.emptyList(); + } + long lastOffset = maybeLastOffset.get(); + + Properties props = new Properties(); + props.put("bootstrap.servers", kafkaBootstrapUrl); + props.put("group.id", groupId); + props.put("key.deserializer", + "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put("value.deserializer", + "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KafkaConsumer consumer = new KafkaConsumer<>(props); + + consumer.subscribe(Collections.singletonList(topic)); + + boolean running = true; + List> envelopes = new ArrayList<>(); + while (running) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); + for (ConsumerRecord record : records) { + final long offset = record.offset(); + if (offset >= lastOffset) { + running = false; + } + envelopes.add(parsEnvelope(record.value())); + } + consumer.commitSync(); + } + consumer.close(); + return envelopes; + } + + @Override + public Integer readProjection() { + try(final ResultSet resultSet = this.dataSource.getConnection() + .prepareStatement("SELECT counter::numeric FROM test_projection LIMIT 1").executeQuery()) { + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException throwables) { + throwables.printStackTrace(); + } + return null; + } 
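+    // readProjection() above backs the TCK's in-transaction projection assertions by reading the
+    // counter row maintained in test_projection; readConsistentProjection() below reports the
+    // Kafka-fed counter of TestConsistentProjection. DataStoreVerification asserts on both.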
+ + @Override + public Integer readConsistentProjection() { + return consistentProjection.getCount(); + } + + public EventEnvelope parsEnvelope(String value) { + try { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode node = (ObjectNode) mapper.readTree(value); + CompletableFuture> future = new CompletableFuture<>(); + EventEnvelopeJson.deserialize( + node, + eventFormat, + JacksonSimpleFormat.empty(), + JacksonSimpleFormat.empty(), + (event, err) -> { + future.completeExceptionally(new RuntimeException(err.toString())); + }, + future::complete + ); + return future.get(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } +} + + + diff --git a/thoth-jooq-async/src/test/resources/base.sql b/thoth-jooq-async/src/test/resources/base.sql new file mode 100644 index 00000000..935d1649 --- /dev/null +++ b/thoth-jooq-async/src/test/resources/base.sql @@ -0,0 +1,26 @@ + +CREATE TABLE IF NOT EXISTS vikings_journal ( + id UUID primary key, + entity_id varchar(100) not null, + sequence_num bigint not null, + event_type varchar(100) not null, + version int not null, + transaction_id varchar(100) not null, + event jsonb not null, + metadata jsonb, + context jsonb, + total_message_in_transaction int default 1, + num_message_in_transaction int default 1, + emission_date timestamp not null default now(), + user_id varchar(100), + system_id varchar(100), + published boolean default false, + UNIQUE (entity_id, sequence_num) +); +CREATE INDEX IF NOT EXISTS vikings_sequence_num_idx ON vikings_journal (sequence_num); +CREATE INDEX IF NOT EXISTS vikings_entity_id_idx ON vikings_journal (entity_id); +CREATE INDEX IF NOT EXISTS vikings_user_id_idx ON vikings_journal (user_id); +CREATE INDEX IF NOT EXISTS vikings_system_id_idx ON vikings_journal (system_id); +CREATE INDEX IF NOT EXISTS vikings_emission_date_idx ON vikings_journal (emission_date); +CREATE SEQUENCE if not exists vikings_sequence_num; +CREATE SEQUENCE if not exists vikings_id; diff --git a/thoth-jooq-async/src/test/resources/testng.xml b/thoth-jooq-async/src/test/resources/testng.xml new file mode 100644 index 00000000..c8b266ec --- /dev/null +++ b/thoth-jooq-async/src/test/resources/testng.xml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java index e9061be9..39432086 100644 --- a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java +++ b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java @@ -1,49 +1,21 @@ package fr.maif.eventsourcing.impl; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.sql.Connection; -import java.sql.SQLException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import 
org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.postgresql.ds.PGSimpleDataSource; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.containers.PostgreSQLContainer; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - import akka.actor.ActorSystem; import akka.kafka.ProducerSettings; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventProcessor; import fr.maif.eventsourcing.PostgresKafkaEventProcessor; +import fr.maif.eventsourcing.Projection; import fr.maif.eventsourcing.datastore.DataStoreVerification; import fr.maif.eventsourcing.datastore.TestCommand; import fr.maif.eventsourcing.datastore.TestCommandHandler; +import fr.maif.eventsourcing.datastore.TestConsistentProjection; import fr.maif.eventsourcing.datastore.TestEvent; import fr.maif.eventsourcing.datastore.TestEventFormat; import fr.maif.eventsourcing.datastore.TestEventHandler; +import fr.maif.eventsourcing.datastore.TestInstransactionProjection; import fr.maif.eventsourcing.datastore.TestState; import fr.maif.eventsourcing.format.JacksonEventFormat; import fr.maif.eventsourcing.format.JacksonSimpleFormat; @@ -51,6 +23,36 @@ import fr.maif.kafka.JsonSerializer; import fr.maif.kafka.KafkaSettings; import io.vavr.Tuple0; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.postgresql.ds.PGSimpleDataSource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; public class JooqKafkaTckImplementation extends DataStoreVerification { private PGSimpleDataSource dataSource; @@ -58,6 +60,7 @@ public class JooqKafkaTckImplementation extends DataStoreVerification()) - .withNoProjections() + .withProjections(this.testProjection) .build(); @@ -228,6 +238,23 @@ public List> readPublishedEvents(String return envelopes; } + @Override + public Integer readProjection() { + try(final ResultSet resultSet = this.dataSource.getConnection() + .prepareStatement("SELECT counter::numeric FROM test_projection LIMIT 1").executeQuery()) { + resultSet.next(); + return 
resultSet.getInt(1); + } catch (SQLException throwables) { + throwables.printStackTrace(); + } + return null; + } + + @Override + public Integer readConsistentProjection() { + return consistentProjection.getCount(); + } + private static Optional getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); diff --git a/thoth-tck/build.sbt b/thoth-tck/build.sbt index b4bc9134..189daaa9 100644 --- a/thoth-tck/build.sbt +++ b/thoth-tck/build.sbt @@ -5,10 +5,11 @@ organization := "fr.maif" name := "thoth-tck" libraryDependencies ++= Seq( - "org.assertj" % "assertj-core" % "3.10.0", - "org.testng" % "testng" % "6.3", - "com.typesafe.akka" %% "akka-stream" % akkaVersion % Test, - "org.mockito" % "mockito-core" % "3.6.28" % Test + "org.assertj" % "assertj-core" % "3.10.0", + "org.testng" % "testng" % "6.3", + "com.typesafe.akka" %% "akka-stream" % akkaVersion % Test, + "org.mockito" % "mockito-core" % "3.6.28" % Test, + "fr.maif" %% "jooq-async-api" % jooqAsyncVersion ) testNGSuites := Seq(((resourceDirectory in Test).value / "testng.xml").absolutePath) diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java index 4bc7c181..262adc35 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java @@ -6,42 +6,24 @@ import fr.maif.eventsourcing.EventProcessor; import fr.maif.eventsourcing.EventStore; import fr.maif.eventsourcing.ProcessingSuccess; -import fr.maif.eventsourcing.format.JacksonSimpleFormat; -import fr.maif.json.EventEnvelopeJson; import io.vavr.Tuple0; import io.vavr.control.Either; import io.vavr.control.Option; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; import org.testng.annotations.Test; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.Optional; -import java.util.Properties; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - -public abstract class DataStoreVerification implements DataStoreVerificationRules{ +public abstract class DataStoreVerification implements DataStoreVerificationRules { public ActorSystem actorSystem = ActorSystem.create(); + protected TestConsistentProjection consistentProjection; + public abstract EventProcessor eventProcessor(String topic); + public abstract String kafkaBootstrapUrl(); @Override @@ -168,11 +150,7 @@ public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() { submitValidCommand(eventProcessor, "1"); restartBroker(); - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - throw new 
RuntimeException(e); - } + sleep(); List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); cleanup(eventProcessor); @@ -192,13 +170,79 @@ public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { cleanup(eventProcessor); assertThat(result.isLeft()).isTrue(); - } catch(Throwable ex) { + } catch (Throwable ex) { // implementation should either return an embedded error in either, either throw an exception - }finally { + } finally { restartDatabase(); } } + + @Override + @Test + public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + submitValidCommand(eventProcessor, "1"); + sleep(); + + cleanup(eventProcessor); + assertThat(readProjection()).isEqualTo(1); + } + + @Override + @Test + public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownBroker(); + submitValidCommand(eventProcessor, "1"); + sleep(); + restartBroker(); + sleep(); + cleanup(eventProcessor); + assertThat(readProjection()).isEqualTo(1); + } + + + + + @Override + public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() { + + String topic = randomKafkaTopic(); + consistentProjection.init(topic); + EventProcessor eventProcessor = eventProcessor(topic); + submitValidCommand(eventProcessor, "1"); + sleep(); + + cleanup(eventProcessor); + assertThat(readConsistentProjection()).isEqualTo(1); + } + + @Override + public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() { + String topic = randomKafkaTopic(); + consistentProjection.init(topic); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownBroker(); + submitValidCommand(eventProcessor, "1"); + sleep(); + restartBroker(); + sleep(); + cleanup(eventProcessor); + assertThat(readConsistentProjection()).isEqualTo(1); + } + + private void sleep() { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override public Either> submitValidCommand( EventProcessor eventProcessor, diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java index a6ee48bc..20a932f1 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java @@ -1,5 +1,7 @@ package fr.maif.eventsourcing.datastore; +import java.util.List; + import fr.maif.eventsourcing.Event; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventProcessor; @@ -10,8 +12,6 @@ import io.vavr.control.Either; import io.vavr.control.Option; -import java.util.List; - public interface DataStoreVerificationRules { Either> submitValidCommand(EventProcessor eventProcessor, String id); void submitInvalidCommand(EventProcessor eventProcessor, String id); @@ -19,6 +19,8 @@ public interface DataStoreVerificationRules readState(EventProcessor eventProcessor, String id); void submitDeleteCommand(EventProcessor eventProcessor, String id); List> readPublishedEvents(String kafkaBootstrapUrl, String topic); + Integer readProjection(); + Integer readConsistentProjection(); void shutdownBroker(); void restartBroker(); void shutdownDatabase(); @@ 
-38,6 +40,11 @@ public interface DataStoreVerificationRules> readFromDataStore(EventStore eventStore); default void cleanup( diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java new file mode 100644 index 00000000..9f904acb --- /dev/null +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java @@ -0,0 +1,51 @@ +package fr.maif.eventsourcing.datastore; + +import akka.actor.ActorSystem; +import fr.maif.projections.EventuallyConsistentProjection; +import io.vavr.Tuple; +import io.vavr.concurrent.Future; + +import javax.sql.DataSource; + +public class TestConsistentProjection { + + private final ActorSystem actorSystem; + private final String bootstrapServer; + private final TestEventFormat eventFormat; + private final DataSource dataSource; + private int counter = 0; + + public TestConsistentProjection( + ActorSystem actorSystem, + String bootstrapServer, + TestEventFormat eventFormat, + DataSource dataSource) { + this.actorSystem = actorSystem; + this.eventFormat = eventFormat; + this.dataSource = dataSource; + this.bootstrapServer = bootstrapServer; + } + + + public void init(String topic) { + this.counter = 0; + EventuallyConsistentProjection.create( + ActorSystem.create(), + "TestConsistentProjection", + EventuallyConsistentProjection.Config.create(topic, "TestConsistentProjection", bootstrapServer), + eventFormat, + envelope -> + Future.of(() -> { + if (envelope.event instanceof TestEvent.SimpleEvent) { + counter++; + } + return Tuple.empty(); + }) + + ).start(); + } + + public int getCount() { + return counter; + } +} diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEvent.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEvent.java index a872c293..e46e8989 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEvent.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEvent.java @@ -4,13 +4,16 @@ import com.fasterxml.jackson.annotation.JsonProperty; import fr.maif.eventsourcing.Event; import fr.maif.eventsourcing.Type; +import io.vavr.API; public abstract class TestEvent implements Event { public final String id; public static Type SimpleEventV1 = Type.create(SimpleEvent.class, 1L); public static Type DeleteEventV1 = Type.create(DeleteEvent.class, 1L); - + static API.Match.Pattern0 $SimpleEvent() { + return API.Match.Pattern0.of(SimpleEvent.class); + } @Override public String entityId() { return id; diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjection.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjection.java index 4efd35b9..27b30c5f 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjection.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjection.java @@ -1,18 +1,38 @@ package fr.maif.eventsourcing.datastore; -import java.sql.Connection; - -import akka.NotUsed; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.Projection; +import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.collection.List; import io.vavr.concurrent.Future; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; + public class TestInstransactionProjection implements Projection { + + @Override - public Future storeProjection(Connection connection, 
List> events) { - // TODO écrire des trucs en base - return null; + public Future storeProjection(Connection connection, List> envelopes) { + return Future.of(() -> { + try (PreparedStatement incrementStatement = connection.prepareStatement("UPDATE test_projection SET counter=counter+1")) { + for (EventEnvelope envelope : envelopes) { + if (envelope.event instanceof TestEvent.SimpleEvent) { + incrementStatement.addBatch(); + incrementStatement.executeBatch(); + } + } + return Tuple.empty(); + } catch (SQLException ex) { + throw new RuntimeException("Failed to update projection", ex); + } + + }); + + } + + } diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjectionAsync.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjectionAsync.java new file mode 100644 index 00000000..2c5b6f55 --- /dev/null +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjectionAsync.java @@ -0,0 +1,32 @@ +package fr.maif.eventsourcing.datastore; +import fr.maif.jooq.PgAsyncTransaction; +import fr.maif.eventsourcing.EventEnvelope; +import fr.maif.eventsourcing.Projection; +import io.vavr.API; +import io.vavr.Tuple; +import io.vavr.Tuple0; +import io.vavr.collection.List; +import io.vavr.concurrent.Future; + +import static io.vavr.API.Case; +import static io.vavr.API.Match; +import static io.vavr.PartialFunction.unlift; + + +public class TestInstransactionProjectionAsync implements Projection { + + @Override + public Future storeProjection(PgAsyncTransaction connection, List> envelopes) { + return connection.executeBatch(dsl -> + envelopes.collect(unlift(eventEnvelope -> + Match(eventEnvelope.event).option( + Case(TestEvent.$SimpleEvent(), e -> API.Tuple(eventEnvelope, e)) + ))) + .map(t -> dsl.query("UPDATE test_projection SET counter=counter+1" )) + ).map(__ -> Tuple.empty()); + } + + + + +} diff --git a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java index d648ae70..4c94781e 100644 --- a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java +++ b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java @@ -1,27 +1,25 @@ package fr.maif.eventsourcing.datastore; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.function.Function; - -import org.mockito.Mockito; -import org.testng.annotations.BeforeMethod; - -import akka.actor.ActorSystem; import akka.stream.javadsl.Sink; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventProcessor; -import fr.maif.eventsourcing.EventStore; import fr.maif.eventsourcing.TransactionManager; import fr.maif.eventsourcing.impl.InMemoryEventStore; import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.concurrent.Future; +import org.mockito.Mockito; +import org.testng.annotations.BeforeMethod; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; public class InMemoryDataStoreTest extends DataStoreVerification { public InMemoryEventStore eventStore; public EventProcessor eventProcessor; + @BeforeMethod(alwaysRun = true) public void init() { this.eventStore = Mockito.spy(InMemoryEventStore.create(actorSystem)); @@ -47,16 +45,47 @@ public List> readPublishedEvents(String } @Override - public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { + public Integer 
readProjection() {
         // Not implemented for in memory
+        return null;
     }
 
+    @Override
+    public Integer readConsistentProjection() {
+        // Not implemented for in memory
+        return null;
+    }
+
+    @Override
+    public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright() {
+        // Not implemented for in memory
+    }
+
+    @Override
+    public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst() {
+        // Not implemented for in memory
+    }
+
+    @Override
+    public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() {
+        // Not implemented for in memory
+    }
 
     @Override
     public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() {
         // Not implemented for in memory
     }
 
+    @Override
+    public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() {
+        // Not implemented for in memory
+    }
+
+    @Override
+    public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() {
+        // Not implemented for in memory
+    }
+
     @Override
     public void shutdownBroker() {
         throw new RuntimeException("Not implemented for in memory");
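
Both TCK implementations' readPublishedEvents() rely on the same pattern: compute the end offset of the topic's single partition for a fresh consumer group, then poll until that offset has been reached. The sketch below restates that helper as a standalone class with explanatory comments; it is only a sketch, it assumes nothing beyond kafka-clients on the classpath, the class name is illustrative, and unlike the JooqAsyncKafkaTCKImplementation version above it looks up partitions for the topic argument rather than the hard-coded "foo".

import java.util.Collections;
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Standalone sketch of the "end offset if not reached" check used by readPublishedEvents.
// Only the single-partition case exercised by the TCK tests is covered.
public class EndOffsetCheck {

    static Optional<Long> endOffsetIfNotReached(String topic, String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Pin the consumer to the topic's (single) partition.
            PartitionInfo partitionInfo = consumer.partitionsFor(topic).get(0);
            TopicPartition partition = new TopicPartition(topic, partitionInfo.partition());
            consumer.assign(Collections.singletonList(partition));

            long position = consumer.position(partition);   // next offset this group would read
            consumer.seekToEnd(Collections.singletonList(partition));
            long endOffset = consumer.position(partition);   // offset just past the last record

            // Only report an offset when there is something left to read for this group.
            return (endOffset > 0 && endOffset > position)
                    ? Optional.of(endOffset - 1)             // offset of the last published record
                    : Optional.empty();
        }
    }
}

Using a throwaway group id per read (the implementations above use "reader-" plus a random UUID) keeps this check independent of any offsets committed by the event publisher itself.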