
Commit

chore(kafka): Implement kafka binding with support for Domain Events (#116)

chore(kafka): Implement kafka binding with support for Domain Events
---------
Co-authored-by: AndresFelipe11
Co-authored-by: jespinosas
juancgalvis authored Aug 20, 2024
1 parent 0233d1a commit 7ced50c
Showing 41 changed files with 1,934 additions and 11 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -644,4 +644,5 @@ MigrationBackup/
# End of https://www.toptal.com/developers/gitignore/api/macos,linux,windows,gradle,java,intellij,visualstudio,eclipse
contiperf-report

-samples/async/local-example/
+samples/async/local-example/
+.kafka-env
@@ -5,7 +5,12 @@
import org.junit.jupiter.api.Test;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.api.domain.DomainEvent;
-import org.reactivecommons.async.api.handlers.*;
+import org.reactivecommons.async.api.handlers.CloudCommandHandler;
+import org.reactivecommons.async.api.handlers.CloudEventHandler;
+import org.reactivecommons.async.api.handlers.DomainCommandHandler;
+import org.reactivecommons.async.api.handlers.DomainEventHandler;
+import org.reactivecommons.async.api.handlers.QueryHandler;
+import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
@@ -152,7 +157,7 @@ void handleDomainCommand() {

    @Test
    void handleCloudEventCommand() {
-        SomeCloudEventCommandHandler cloudCommandHandler = new SomeCloudEventCommandHandler();
+        SomeCloudCommandHandler cloudCommandHandler = new SomeCloudCommandHandler();

        registry.handleCloudEventCommand(name, cloudCommandHandler);

@@ -197,7 +202,7 @@ void serveQueryWithLambda() {
    @Test
    void serveQueryWithTypeInference() {
        QueryHandler<SomeDataClass, SomeDataClass> handler = new SomeQueryHandler();
-        registry.serveQuery(name, handler,SomeDataClass.class);
+        registry.serveQuery(name, handler, SomeDataClass.class);
        assertThat(registry.getHandlers()).anySatisfy(registered -> {
            assertThat(registered).extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass)
                    .containsExactly(name, SomeDataClass.class);
@@ -252,7 +257,7 @@ public Mono<Void> handle(Command<SomeDataClass> message) {
        }
    }

-    private static class SomeCloudEventCommandHandler implements CloudCommandHandler {
+    private static class SomeCloudCommandHandler implements CloudCommandHandler {
        @Override
        public Mono<Void> handle(CloudEvent message) {
            return null;
12 changes: 12 additions & 0 deletions async/async-kafka/async-kafka.gradle
@@ -0,0 +1,12 @@
ext {
    artifactId = 'async-kafka'
    artifactDescription = 'Async Kafka'
}

dependencies {
    api project(':async-commons-api')
    api project(':domain-events-api')
    api project(':async-commons')
    api 'io.projectreactor.kafka:reactor-kafka:1.3.23'
    api 'io.cloudevents:cloudevents-json-jackson:4.0.1'
}
23 changes: 23 additions & 0 deletions async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaDomainEventBus.java
@@ -0,0 +1,23 @@
package org.reactivecommons.async.kafka;

import io.cloudevents.CloudEvent;
import lombok.AllArgsConstructor;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.kafka.communications.ReactiveMessageSender;
import org.reactivestreams.Publisher;

@AllArgsConstructor
public class KafkaDomainEventBus implements DomainEventBus {
    private final ReactiveMessageSender sender;

    @Override
    public <T> Publisher<Void> emit(DomainEvent<T> event) {
        return sender.send(event);
    }

    @Override
    public Publisher<Void> emit(CloudEvent event) {
        return sender.send(event);
    }
}
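For context, a minimal usage sketch of the new bus (not part of this commit). It assumes a configured ReactiveMessageSender named sender; the event name, event ID, and SomeUser payload are hypothetical:

// Illustrative sketch only: emit a domain event through the Kafka binding.
// "user.registered", "42", and SomeUser are hypothetical names.
DomainEventBus eventBus = new KafkaDomainEventBus(sender);
DomainEvent<SomeUser> event = new DomainEvent<>("user.registered", "42", new SomeUser("Alice"));
Mono.from(eventBus.emit(event)).subscribe(); // completes when the broker confirms the record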
52 changes: 52 additions & 0 deletions async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaMessage.java
@@ -0,0 +1,52 @@
package org.reactivecommons.async.kafka;

import lombok.Data;
import org.apache.kafka.common.header.Headers;
import org.reactivecommons.async.commons.communications.Message;
import reactor.kafka.receiver.ReceiverRecord;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import static org.reactivecommons.async.kafka.converters.json.KafkaJacksonMessageConverter.CONTENT_TYPE;

@Data
public class KafkaMessage implements Message {
    private final byte[] body;
    private final Properties properties;

    @Data
    public static class KafkaMessageProperties implements Properties {
        private long contentLength;
        private String key;
        private String topic;
        private Map<String, Object> headers = new HashMap<>();

        @Override
        public String getContentType() {
            return (String) headers.get(CONTENT_TYPE);
        }
    }

    public static KafkaMessage fromDelivery(ReceiverRecord<String, byte[]> record) {
        return new KafkaMessage(record.value(), createMessageProps(record));
    }

    private static Properties createMessageProps(ReceiverRecord<String, byte[]> record) {
        Map<String, Object> headers = parseHeaders(record.headers());

        final KafkaMessageProperties properties = new KafkaMessageProperties();
        properties.setHeaders(headers);
        properties.setKey(record.key());
        properties.setTopic(record.topic());
        properties.setContentLength(record.value().length);
        return properties;
    }

    private static Map<String, Object> parseHeaders(Headers headers) {
        Map<String, Object> parsedHeaders = new HashMap<>();
        // Decode with an explicit charset to match the sender, which writes header values as UTF-8
        headers.forEach(header -> parsedHeaders.put(header.key(), new String(header.value(), StandardCharsets.UTF_8)));
        return parsedHeaders;
    }
}
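A short sketch of how a polled record becomes a commons Message (illustrative; record stands for a ReceiverRecord<String, byte[]> obtained from the listener introduced below):

// Illustrative sketch: wrap a received record and read its metadata.
KafkaMessage message = KafkaMessage.fromDelivery(record);
String contentType = message.getProperties().getContentType(); // resolved from the CONTENT_TYPE header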
30 changes: 30 additions & 0 deletions async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/ReactiveMessageListener.java
@@ -0,0 +1,30 @@
package org.reactivecommons.async.kafka.communications;

import lombok.AllArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

import java.util.List;

@AllArgsConstructor
public class ReactiveMessageListener {
    private final ReceiverOptions<String, byte[]> receiverOptions;

    public Flux<ReceiverRecord<String, byte[]>> listen(String groupId, List<String> topics) { // Notification events
        ReceiverOptions<String, byte[]> options = receiverOptions.consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return KafkaReceiver.create(options.subscription(topics))
                .receive();
    }

    public int getMaxConcurrency() {
        Object property = receiverOptions.consumerProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
        if (property instanceof Integer) {
            return (int) property;
        }
        return ConsumerConfig.DEFAULT_MAX_POLL_RECORDS;
    }
}
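A consumption sketch for context (not part of this commit), assuming receiverOptions is a configured ReceiverOptions<String, byte[]>; the group and topic names are illustrative:

// Illustrative sketch: subscribe under a dedicated consumer group and
// adapt each record into the commons Message abstraction.
ReactiveMessageListener listener = new ReactiveMessageListener(receiverOptions);
listener.listen("notifications-group", List.of("user.registered"))
        .map(KafkaMessage::fromDelivery)
        .subscribe(message -> System.out.println("received, content-type: " + message.getProperties().getContentType()));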
91 changes: 91 additions & 0 deletions async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/ReactiveMessageSender.java
@@ -0,0 +1,91 @@
package org.reactivecommons.async.kafka.communications;

import lombok.SneakyThrows;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.kafka.KafkaMessage;
import org.reactivecommons.async.kafka.communications.topology.TopologyCreator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class ReactiveMessageSender {
    private final ConcurrentHashMap<String, MonoSink<Void>> confirmations = new ConcurrentHashMap<>();
    private final CopyOnWriteArrayList<FluxSink<SenderRecord<String, byte[], String>>> fluxSinks = new CopyOnWriteArrayList<>();
    private final AtomicLong counter = new AtomicLong();

    private final ExecutorService executorServiceConfirm =
            Executors.newFixedThreadPool(13, r -> new Thread(r, "KMessageSender1-" + counter.getAndIncrement()));
    private final ExecutorService executorServiceEmit =
            Executors.newFixedThreadPool(13, r -> new Thread(r, "KMessageSender2-" + counter.getAndIncrement()));

    private final int senderCount = 4;

    private final MessageConverter messageConverter;
    private final TopologyCreator topologyCreator;

    public ReactiveMessageSender(KafkaSender<String, byte[]> sender, MessageConverter messageConverter,
                                 TopologyCreator topologyCreator) {
        this.messageConverter = messageConverter;
        this.topologyCreator = topologyCreator;
        for (int i = 0; i < senderCount; ++i) {
            Flux<SenderRecord<String, byte[], String>> source = Flux.create(fluxSinks::add);
            sender.send(source)
                    .doOnNext(this::confirm)
                    .subscribe();
        }
    }

    public <V> Mono<Void> send(V message) {
        return Mono.create(sink -> {
            SenderRecord<String, byte[], String> record = createRecord(message);
            confirmations.put(record.key(), sink);
            executorServiceEmit.submit(() -> fluxSinks.get((int) (System.currentTimeMillis() % senderCount)).next(record));
        });
    }

    private void confirm(SenderResult<String> result) {
        executorServiceConfirm.submit(() -> {
            MonoSink<Void> sink = confirmations.remove(result.correlationMetadata());
            if (sink != null) {
                if (result.exception() != null) {
                    sink.error(result.exception());
                } else {
                    sink.success();
                }
            }
        });
    }

    private <V> SenderRecord<String, byte[], String> createRecord(V object) {
        KafkaMessage message = (KafkaMessage) messageConverter.toMessage(object);
        ProducerRecord<String, byte[]> record = createProducerRecord(message);
        return SenderRecord.create(record, message.getProperties().getKey()); // TODO: Review for Request-Reply
    }

    @SneakyThrows
    private ProducerRecord<String, byte[]> createProducerRecord(KafkaMessage message) {
        topologyCreator.checkTopic(message.getProperties().getTopic());

        List<Header> headers = message.getProperties().getHeaders().entrySet().stream()
                .map(entry -> new RecordHeader(entry.getKey(), entry.getValue()
                        .toString().getBytes(StandardCharsets.UTF_8)))
                .collect(Collectors.toList());

        return new ProducerRecord<>(message.getProperties().getTopic(), null,
                message.getProperties().getKey(), message.getBody(), headers);
    }
}
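Note that send completes its Mono only when confirm receives the matching SenderResult, keyed by the record's correlation metadata (the message key). A hedged usage sketch, where someEvent stands for any payload the configured MessageConverter understands:

// Illustrative sketch: the Mono resolves on broker confirmation,
// or errors if the SenderResult carries an exception.
sender.send(someEvent)
        .doOnSuccess(v -> System.out.println("confirmed by broker"))
        .doOnError(Throwable::printStackTrace)
        .subscribe();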
7 changes: 7 additions & 0 deletions async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/exceptions/TopicNotFoundException.java
@@ -0,0 +1,7 @@
package org.reactivecommons.async.kafka.communications.exceptions;

public class TopicNotFoundException extends RuntimeException {
    public TopicNotFoundException(String message) {
        super(message);
    }
}
26 changes: 26 additions & 0 deletions async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/topology/KafkaCustomizations.java
@@ -0,0 +1,26 @@
package org.reactivecommons.async.kafka.communications.topology;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.HashMap;
import java.util.Map;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class KafkaCustomizations {
    private Map<String, TopicCustomization> topics = new HashMap<>();

    public static KafkaCustomizations withTopic(String topic, TopicCustomization customization) {
        KafkaCustomizations customizations = new KafkaCustomizations();
        customizations.getTopics().put(topic, customization);
        return customizations;
    }

    public KafkaCustomizations addTopic(String topic, TopicCustomization customization) {
        this.getTopics().put(topic, customization);
        return this;
    }
}
19 changes: 19 additions & 0 deletions async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/topology/TopicCustomization.java
@@ -0,0 +1,19 @@
package org.reactivecommons.async.kafka.communications.topology;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class TopicCustomization {
    private String topic;
    private int partitions;
    private short replicationFactor;
    private Map<String, String> config;
}
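Together with KafkaCustomizations above, this gives per-topic overrides when topics are declared. An illustrative configuration (topic name and values are examples only):

// Illustrative sketch: six partitions, replication factor 3, and 7-day
// retention for one topic; all other topics keep broker defaults.
KafkaCustomizations customizations = KafkaCustomizations.withTopic("user.registered",
        TopicCustomization.builder()
                .topic("user.registered")
                .partitions(6)
                .replicationFactor((short) 3)
                .config(Map.of("retention.ms", "604800000"))
                .build());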
76 changes: 76 additions & 0 deletions async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/topology/TopologyCreator.java
@@ -0,0 +1,76 @@
package org.reactivecommons.async.kafka.communications.topology;

import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.reactivecommons.async.kafka.communications.exceptions.TopicNotFoundException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopologyCreator {
    public static final int TIMEOUT_MS = 60_000;
    private final AdminClient adminClient;
    private final KafkaCustomizations customizations;
    private final Map<String, Boolean> existingTopics;

    public TopologyCreator(AdminClient adminClient, KafkaCustomizations customizations) {
        this.adminClient = adminClient;
        this.customizations = customizations;
        this.existingTopics = getTopics();
    }

    @SneakyThrows
    public Map<String, Boolean> getTopics() {
        ListTopicsResult topics = adminClient.listTopics(new ListTopicsOptions().timeoutMs(TIMEOUT_MS));
        return topics.names().get().stream().collect(Collectors.toConcurrentMap(name -> name, name -> true));
    }

    public Mono<Void> createTopics(List<String> topics) {
        TopicCustomization.TopicCustomizationBuilder defaultBuilder = TopicCustomization.builder()
                .partitions(-1)
                .replicationFactor((short) -1);

        return Flux.fromIterable(topics)
                .map(topic -> {
                    if (customizations.getTopics().containsKey(topic)) {
                        return customizations.getTopics().get(topic);
                    }
                    return defaultBuilder.topic(topic).build();
                })
                .map(this::toNewTopic)
                .flatMap(this::createTopic)
                .doOnNext(topic -> existingTopics.put(topic.name(), true))
                .then();
    }

    protected Mono<NewTopic> createTopic(NewTopic topic) {
        return Mono.fromFuture(adminClient.createTopics(List.of(topic))
                        .all()
                        .toCompletionStage()
                        .toCompletableFuture())
                .thenReturn(topic)
                .onErrorResume(TopicExistsException.class, e -> Mono.just(topic));
    }

    protected NewTopic toNewTopic(TopicCustomization customization) {
        NewTopic topic = new NewTopic(customization.getTopic(), customization.getPartitions(), customization.getReplicationFactor());
        if (customization.getConfig() != null) {
            return topic.configs(customization.getConfig());
        }
        return topic;
    }

    public void checkTopic(String topicName) {
        if (!existingTopics.containsKey(topicName)) {
            throw new TopicNotFoundException("Topic not found: " + topicName + ". Please create it before sending a message.");
            // TODO: should refresh topics? getTopics();
        }
    }
}
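A startup sketch for context, assuming adminClient is an AdminClient built from the broker properties: topics are declared once, after which checkTopic guards every send.

// Illustrative sketch: declare topics up front; createTopic tolerates
// TopicExistsException, so repeated startups are safe.
TopologyCreator creator = new TopologyCreator(adminClient, customizations);
creator.createTopics(List.of("user.registered", "user.updated")).block();
creator.checkTopic("user.registered"); // passes once the topic is registered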