Skip to content

Commit

Permalink
tck consitent projection
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthieuMAIF committed Jun 1, 2021
1 parent ce4d24e commit 44f89aa
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.Executors;

import fr.maif.eventsourcing.Projection;
import fr.maif.eventsourcing.datastore.TestConsistentProjection;
import fr.maif.eventsourcing.datastore.TestProjection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -101,6 +102,7 @@ public void initClass() {
postgres.start();
kafka = new KafkaContainer();
kafka.start();
consistentProjection = new TestConsistentProjection(actorSystem,kafka.getBootstrapServers(),eventFormat,dataSource);
}


Expand Down Expand Up @@ -238,6 +240,11 @@ public Integer readProjection() {
return ((TestProjection)this.testProjection).getCount();
}

@Override
public Integer readConsistentProjection() {
return consistentProjection.getCount();
}

private static Optional<Long> getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public abstract class DataStoreVerification<TxCtx> implements DataStoreVerificat
public abstract EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor(String topic);
public abstract String kafkaBootstrapUrl();

protected TestConsistentProjection consistentProjection;
@Override
@Test
public void required_submitValidSingleEventCommandMustWriteEventInDataStore() {
Expand Down Expand Up @@ -236,6 +237,33 @@ public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken
restartDatabase();
}

@Override
public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() {

String topic = randomKafkaTopic();
consistentProjection.init(topic);
EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(topic);
submitValidCommand(eventProcessor, "1");
sleep();

cleanup(eventProcessor);
assertThat(readConsistentProjection()).isEqualTo(1);
}

@Override
public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() {
String topic = randomKafkaTopic();
consistentProjection.init(topic);
EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(topic);
shutdownBroker();
submitValidCommand(eventProcessor, "1");
sleep();
restartBroker();
sleep();
cleanup(eventProcessor);
assertThat(readConsistentProjection()).isEqualTo(1);
}

private void sleep() {
try {
Thread.sleep(10000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public interface DataStoreVerificationRules<Ste extends State, Evt extends Event
void submitDeleteCommand(EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor, String id);
List<EventEnvelope<TestEvent, Tuple0, Tuple0>> readPublishedEvents(String kafkaBootstrapUrl, String topic);
Integer readProjection();
Integer readConsistentProjection();
void shutdownBroker();
void restartBroker();
void shutdownDatabase();
Expand All @@ -43,7 +44,8 @@ public interface DataStoreVerificationRules<Ste extends State, Evt extends Event
void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst();
void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken();


void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright();
void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst();
List<EventEnvelope<Evt, Meta, Context>> readFromDataStore(EventStore<TxCtx, TestEvent, Tuple0, Tuple0> eventStore);

default void cleanup(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package fr.maif.eventsourcing.datastore;

import akka.actor.ActorSystem;
import fr.maif.projections.EventuallyConsistentProjection;
import io.vavr.Tuple;
import io.vavr.concurrent.Future;

import javax.sql.DataSource;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class TestConsistentProjection {

private int counter = 0;
private final ActorSystem actorSystem;
private final String bootstrapServer;
private final TestEventFormat eventFormat;
private final DataSource dataSource;

public TestConsistentProjection(
ActorSystem actorSystem,
String bootstrapServer,
TestEventFormat eventFormat,
DataSource dataSource) {
this.actorSystem = actorSystem;
this.eventFormat = eventFormat;
this.dataSource = dataSource;
this.bootstrapServer =bootstrapServer;
}


public void init(String topic) {
this.counter = 0;
EventuallyConsistentProjection.create(
ActorSystem.create(),
"TestConsistentProjection",
EventuallyConsistentProjection.Config.create(topic, "TestConsistentProjection", bootstrapServer),
eventFormat,
envelope ->
Future.of(() -> {
if (envelope.event instanceof TestEvent.SimpleEvent){
counter++;
}
return Tuple.empty();
})

).start();
}

public int getCount() {
return counter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

import fr.maif.eventsourcing.Projection;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;

import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventProcessor;
import fr.maif.eventsourcing.EventStore;
import fr.maif.eventsourcing.TransactionManager;
import fr.maif.eventsourcing.impl.InMemoryEventStore;
import io.vavr.Tuple;
Expand Down Expand Up @@ -54,6 +51,12 @@ public Integer readProjection() {
return null;
}

@Override
public Integer readConsistentProjection() {
// Not implemented for in memory
return null;
}

@Override
public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright(){
// Not implemented for in memory
Expand All @@ -66,17 +69,25 @@ public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst(
public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken(){
// Not implemented for in memory
}

@Override
public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() {
// Not implemented for in memory
}


@Override
public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() {
// Not implemented for in memory
}

@Override
public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() {
// Not implemented for in memory
}
@Override
public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() {
// Not implemented for in memory
}

@Override
public void shutdownBroker() {
throw new RuntimeException("Not implemented for in memory");
Expand Down

0 comments on commit 44f89aa

Please sign in to comment.