tck async jooq
MatthieuMAIF committed Jul 27, 2021
1 parent c65f1a1 commit f6c6ed9
Showing 13 changed files with 502 additions and 78 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -92,7 +92,7 @@ lazy val `thoth-kafka-goodies` = project
)

lazy val `thoth-jooq-async` = project
.dependsOn(`thoth-core`)
.dependsOn(`thoth-core`, `thoth-tck`)
.settings(
sonatypeRepository := "https://s01.oss.sonatype.org/service/local",
sonatypeCredentialHost := "s01.oss.sonatype.org",
9 changes: 8 additions & 1 deletion thoth-jooq-async/build.sbt
@@ -17,9 +17,16 @@ libraryDependencies ++= Seq(
"org.junit.platform" % "junit-platform-commons" % "1.4.2" % Test,
"org.junit.jupiter" % "junit-jupiter-engine" % "5.4.2" % Test,
"org.junit.vintage" % "junit-vintage-engine" % "5.4.2" % Test,
"net.aichler" % "jupiter-interface" % "0.9.1" % Test
"net.aichler" % "jupiter-interface" % "0.9.1" % Test,
"org.mockito" % "mockito-core" % "2.22.0" % Test,
"org.testng" % "testng" % "6.3" % Test,
"org.testcontainers" % "postgresql" % "1.15.0" % Test,
"org.testcontainers" % "kafka" % "1.15.0" % Test
)


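// Point the TestNG runner at the suite definition in src/test/resources/testng.xml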
testNGSuites := Seq(((resourceDirectory in Test).value / "testng.xml").absolutePath)

javacOptions in Compile ++= Seq(
"-source",
"8",
@@ -0,0 +1,347 @@
package fr.maif.eventsourcing;

import akka.actor.ActorSystem;
import akka.kafka.ProducerSettings;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import fr.maif.eventsourcing.datastore.DataStoreVerification;
import fr.maif.eventsourcing.datastore.TestCommand;
import fr.maif.eventsourcing.datastore.TestCommandHandler;
import fr.maif.eventsourcing.datastore.TestConsistentProjection;
import fr.maif.eventsourcing.datastore.TestEvent;
import fr.maif.eventsourcing.datastore.TestEventFormat;
import fr.maif.eventsourcing.datastore.TestEventHandler;
import fr.maif.eventsourcing.datastore.TestInstransactionProjection;
import fr.maif.eventsourcing.datastore.TestInstransactionProjectionAsync;
import fr.maif.eventsourcing.datastore.TestState;
import fr.maif.eventsourcing.format.JacksonEventFormat;
import fr.maif.eventsourcing.format.JacksonSimpleFormat;
import fr.maif.jooq.PgAsyncPool;
import fr.maif.jooq.reactive.ReactivePgAsyncPool;
import fr.maif.json.EventEnvelopeJson;
import fr.maif.kafka.JsonSerializer;
import fr.maif.kafka.KafkaSettings;
import io.vavr.Tuple0;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.PoolOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.jooq.SQLDialect;
import org.jooq.impl.DefaultConfiguration;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class JooqAsyncKafkaTCKImplementation extends DataStoreVerification<Connection> {
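// Runs the thoth-tck DataStoreVerification suite against the reactive jOOQ/Vert.x Postgres pool and a Kafka test container.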


public static final String DEFAULT_POSTGRE_TAG = "9.6.12";
private static final DockerImageName DEFAULT_KAFKA_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
private static final String DEFAULT_KAFKA_TAG = "5.4.3";
private static final DockerImageName DEFAULT_POSTGRE_IMAGE_NAME = DockerImageName.parse("postgres");
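// DDL for the journal and projection tables, plus the sequence used by the verification suite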
private final String SCHEMA = "CREATE TABLE IF NOT EXISTS test_journal (\n" +
" id UUID primary key,\n" +
" entity_id varchar(100) not null,\n" +
" sequence_num bigint not null,\n" +
" event_type varchar(100) not null,\n" +
" version int not null,\n" +
" transaction_id varchar(100) not null,\n" +
" event jsonb not null,\n" +
" metadata jsonb,\n" +
" context jsonb,\n" +
" total_message_in_transaction int default 1,\n" +
" num_message_in_transaction int default 1,\n" +
" emission_date timestamp not null default now(),\n" +
" user_id varchar(100),\n" +
" system_id varchar(100),\n" +
" published boolean default false,\n" +
" UNIQUE (entity_id, sequence_num)\n" +
" );\n" +
"CREATE TABLE IF NOT EXISTS test_projection (\n" +
" counter int not null\n" +
" );\n" +
" CREATE SEQUENCE if not exists test_sequence_num;";
private final String INIT_TABLE_QUERY = "TRUNCATE TABLE test_journal;\n" +
" TRUNCATE TABLE test_projection;\n" +
" INSERT INTO test_projection VALUES(0);\n" ;

private PGSimpleDataSource dataSource;
private TableNames tableNames;
private TestEventFormat eventFormat;
private PostgreSQLContainer postgres;
private KafkaContainer kafka;
private Projection testProjection;
private PgAsyncPool pgAsyncPool;
private Vertx vertx;
private PgPool pgPool;

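// Returns the last offset to read on the topic, or empty when the consumer group has already caught up (or the topic is empty).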
private static Optional<Long> getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(properties);
PartitionInfo partitionInfo = consumer.partitionsFor(topic).get(0);
TopicPartition topicPartition = new TopicPartition(topic, partitionInfo.partition());
consumer.assign(Collections.singletonList(topicPartition));

long position = consumer.position(topicPartition);
consumer.seekToEnd(Collections.singletonList(topicPartition));
final long endOffset = consumer.position(topicPartition);

Optional<Long> result = Optional.empty();
if (endOffset > 0 && endOffset > position) {
result = Optional.of(consumer.position(topicPartition) - 1);
}

consumer.close();
return result;
}

@AfterClass(alwaysRun = true)
public void tearDown() throws InterruptedException {
Thread.sleep(10000);
postgres.stop();
kafka.stop();
this.vertx.close();
this.pgPool.close();
}

@BeforeClass(alwaysRun = true)
public void initClass() {

this.tableNames = new TableNames("test_journal", "test_sequence_num");
this.eventFormat = new TestEventFormat();

postgres = new PostgreSQLContainer(DEFAULT_POSTGRE_IMAGE_NAME.withTag(DEFAULT_POSTGRE_TAG));
postgres.start();

kafka = new KafkaContainer(DEFAULT_KAFKA_IMAGE_NAME.withTag(DEFAULT_KAFKA_TAG));
kafka.start();
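// consistentProjection and actorSystem are not declared in this class; they presumably come from the DataStoreVerification base class.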
consistentProjection = new TestConsistentProjection(actorSystem, kafka.getBootstrapServers(), eventFormat, dataSource);
this.pgAsyncPool = pgAsyncPool(postgres);
}

@BeforeMethod(alwaysRun = true)
public void init() throws SQLException {
this.testProjection = new TestInstransactionProjectionAsync();
this.dataSource = new PGSimpleDataSource();
dataSource.setUrl(postgres.getJdbcUrl());
dataSource.setUser(postgres.getUsername());
dataSource.setPassword(postgres.getPassword());
// Override the default setting, which waits indefinitely if the database is down
dataSource.setLoginTimeout(5);

try (Connection connection = dataSource.getConnection()) {
connection.prepareStatement(SCHEMA).execute();
connection.prepareStatement(INIT_TABLE_QUERY).execute();
}
}

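// Builds a ReactivePgAsyncPool on top of a Vert.x PgPool pointed at the Testcontainers Postgres instance.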
private PgAsyncPool pgAsyncPool(PostgreSQLContainer server) {
this.vertx = Vertx.vertx();
DefaultConfiguration jooqConfig = new DefaultConfiguration();
jooqConfig.setSQLDialect(SQLDialect.POSTGRES);

final PgConnectOptions options = new PgConnectOptions()
.setPort(server.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT))
.setHost(server.getContainerIpAddress())
.setDatabase(server.getDatabaseName())
.setUser(server.getUsername())
.setPassword(server.getPassword());
PoolOptions poolOptions = new PoolOptions().setMaxSize(50);
this.pgPool = PgPool.pool(vertx, options, poolOptions);

return new ReactivePgAsyncPool(pgPool, jooqConfig);
}

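// Wires the reactive Postgres + Kafka event processor for the TCK: journal in test_journal, events published to the given topic, and the in-transaction test projection attached.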
@Override
public EventProcessor<String, TestState, TestCommand, TestEvent, Connection, Tuple0, Tuple0, Tuple0> eventProcessor(String topic) {
return ReactivePostgresKafkaEventProcessor
.withSystem(ActorSystem.create())
.withPgAsyncPool(this.pgAsyncPool)
.withTables(tableNames)
.withTransactionManager()
.withEventFormater(eventFormat)
.withNoMetaFormater()
.withNoContextFormater()
.withKafkaSettings(topic, producerSettings(settings(), new TestEventFormat()))
.withEventHandler(new TestEventHandler())
.withDefaultAggregateStore()
.withCommandHandler(new TestCommandHandler<>())
.withProjections(this.testProjection)
.build();
}

@Override
public String kafkaBootstrapUrl() {
return kafka.getBootstrapServers();
}

@Override
public void shutdownBroker() {
pauseContainer(kafka);
}

@Override
public void restartBroker() {
unPauseContainer(kafka);
}

@Override
public void shutdownDatabase() {
pauseContainer(postgres);
}

@Override
public void restartDatabase() {
unPauseContainer(postgres);
}

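// Docker pause/unpause is used to simulate broker and database outages without destroying container state.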
private void pauseContainer(GenericContainer container) {
container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec();
}

private void unPauseContainer(GenericContainer container) {
container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec();
}

private KafkaSettings settings() {
return KafkaSettings.newBuilder(kafka.getBootstrapServers()).build();
}

private ProducerSettings<String, EventEnvelope<TestEvent, Tuple0, Tuple0>> producerSettings(
KafkaSettings kafkaSettings,
JacksonEventFormat<String, TestEvent> eventFormat) {
return kafkaSettings.producerSettings(actorSystem, JsonSerializer.of(
eventFormat,
JacksonSimpleFormat.empty(),
JacksonSimpleFormat.empty()
)
);
}

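// Reads everything published on the topic so far: captures the end offset up front, then polls until that offset is reached, deserializing each record into an EventEnvelope.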
@Override
public List<EventEnvelope<TestEvent, Tuple0, Tuple0>> readPublishedEvents(String kafkaBootstrapUrl, String topic) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String groupId = "reader-" + UUID.randomUUID();
Optional<Long> maybeLastOffset = getEndOffsetIfNotReached(topic, kafkaBootstrapUrl, groupId);
if (!maybeLastOffset.isPresent()) {
return Collections.emptyList();
}
long lastOffset = maybeLastOffset.get();

Properties props = new Properties();
props.put("bootstrap.servers", kafkaBootstrapUrl);
props.put("group.id", groupId);
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList(topic));

boolean running = true;
List<EventEnvelope<TestEvent, Tuple0, Tuple0>> envelopes = new ArrayList<>();
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
final long offset = record.offset();
if (offset >= lastOffset) {
running = false;
}
envelopes.add(parsEnvelope(record.value()));
}
consumer.commitSync();
}
consumer.close();
return envelopes;
}

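// Reads the current counter value maintained by the in-transaction projection.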
@Override
public Integer readProjection() {
try(final ResultSet resultSet = this.dataSource.getConnection()
.prepareStatement("SELECT counter::numeric FROM test_projection LIMIT 1").executeQuery()) {
resultSet.next();
return resultSet.getInt(1);
} catch (SQLException throwables) {
throwables.printStackTrace();
}
return null;
}

@Override
public Integer readConsistentProjection() {
return consistentProjection.getCount();
}

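// Deserializes a Kafka record value into an EventEnvelope, bridging the callback-style EventEnvelopeJson API with a CompletableFuture.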
public EventEnvelope<TestEvent, Tuple0, Tuple0> parsEnvelope(String value) {
try {
ObjectMapper mapper = new ObjectMapper();
ObjectNode node = (ObjectNode) mapper.readTree(value);
CompletableFuture<EventEnvelope<TestEvent, Tuple0, Tuple0>> future = new CompletableFuture<>();
EventEnvelopeJson.deserialize(
node,
eventFormat,
JacksonSimpleFormat.empty(),
JacksonSimpleFormat.empty(),
(event, err) -> {
future.completeExceptionally(new RuntimeException(err.toString()));
},
future::complete
);
return future.get();
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}


