diff --git a/.gitignore b/.gitignore index 8a16dda7..47bd6ef3 100644 --- a/.gitignore +++ b/.gitignore @@ -644,4 +644,5 @@ MigrationBackup/ # End of https://www.toptal.com/developers/gitignore/api/macos,linux,windows,gradle,java,intellij,visualstudio,eclipse contiperf-report -samples/async/local-example/ \ No newline at end of file +samples/async/local-example/ +.kafka-env \ No newline at end of file diff --git a/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java b/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java index 0f73e4b1..5cda7121 100644 --- a/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java +++ b/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java @@ -5,7 +5,12 @@ import org.junit.jupiter.api.Test; import org.reactivecommons.api.domain.Command; import org.reactivecommons.api.domain.DomainEvent; -import org.reactivecommons.async.api.handlers.*; +import org.reactivecommons.async.api.handlers.CloudCommandHandler; +import org.reactivecommons.async.api.handlers.CloudEventHandler; +import org.reactivecommons.async.api.handlers.DomainCommandHandler; +import org.reactivecommons.async.api.handlers.DomainEventHandler; +import org.reactivecommons.async.api.handlers.QueryHandler; +import org.reactivecommons.async.api.handlers.QueryHandlerDelegate; import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; @@ -152,7 +157,7 @@ void handleDomainCommand() { @Test void handleCloudEventCommand() { - SomeCloudEventCommandHandler cloudCommandHandler = new SomeCloudEventCommandHandler(); + SomeCloudCommandHandler cloudCommandHandler = new SomeCloudCommandHandler(); registry.handleCloudEventCommand(name, cloudCommandHandler); 
@@ -197,7 +202,7 @@ void serveQueryWithLambda() { @Test void serveQueryWithTypeInference() { QueryHandler handler = new SomeQueryHandler(); - registry.serveQuery(name, handler,SomeDataClass.class); + registry.serveQuery(name, handler, SomeDataClass.class); assertThat(registry.getHandlers()).anySatisfy(registered -> { assertThat(registered).extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass) .containsExactly(name, SomeDataClass.class); @@ -252,7 +257,7 @@ public Mono handle(Command message) { } } - private static class SomeCloudEventCommandHandler implements CloudCommandHandler { + private static class SomeCloudCommandHandler implements CloudCommandHandler { @Override public Mono handle(CloudEvent message) { return null; diff --git a/async/async-kafka/async-kafka.gradle b/async/async-kafka/async-kafka.gradle new file mode 100644 index 00000000..a24b92b5 --- /dev/null +++ b/async/async-kafka/async-kafka.gradle @@ -0,0 +1,12 @@ +ext { + artifactId = 'async-kafka' + artifactDescription = 'Async Kafka' +} + +dependencies { + api project(':async-commons-api') + api project(':domain-events-api') + api project(':async-commons') + api 'io.projectreactor.kafka:reactor-kafka:1.3.23' + api 'io.cloudevents:cloudevents-json-jackson:4.0.1' +} \ No newline at end of file diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaDomainEventBus.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaDomainEventBus.java new file mode 100644 index 00000000..d24f0a76 --- /dev/null +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaDomainEventBus.java @@ -0,0 +1,23 @@ +package org.reactivecommons.async.kafka; + +import io.cloudevents.CloudEvent; +import lombok.AllArgsConstructor; +import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.api.domain.DomainEventBus; +import org.reactivecommons.async.kafka.communications.ReactiveMessageSender; +import 
org.reactivestreams.Publisher; + +@AllArgsConstructor +public class KafkaDomainEventBus implements DomainEventBus { + private final ReactiveMessageSender sender; + + @Override + public Publisher emit(DomainEvent event) { + return sender.send(event); + } + + @Override + public Publisher emit(CloudEvent event) { + return sender.send(event); + } +} diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaMessage.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaMessage.java new file mode 100644 index 00000000..7c5092de --- /dev/null +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaMessage.java @@ -0,0 +1,52 @@ +package org.reactivecommons.async.kafka; + +import lombok.Data; +import org.apache.kafka.common.header.Headers; +import org.reactivecommons.async.commons.communications.Message; +import reactor.kafka.receiver.ReceiverRecord; + +import java.util.HashMap; +import java.util.Map; + +import static org.reactivecommons.async.kafka.converters.json.KafkaJacksonMessageConverter.CONTENT_TYPE; + + +@Data +public class KafkaMessage implements Message { + private final byte[] body; + private final Properties properties; + + @Data + public static class KafkaMessageProperties implements Properties { + private long contentLength; + private String key; + private String topic; + private Map headers = new HashMap<>(); + + @Override + public String getContentType() { + return (String) headers.get(CONTENT_TYPE); + } + } + + public static KafkaMessage fromDelivery(ReceiverRecord record) { + return new KafkaMessage(record.value(), createMessageProps(record)); + } + + private static Properties createMessageProps(ReceiverRecord record) { + Map headers = parseHeaders(record.headers()); + + final KafkaMessageProperties properties = new KafkaMessageProperties(); + properties.setHeaders(headers); + properties.setKey(record.key()); + properties.setTopic(record.topic()); + 
properties.setContentLength(record.value().length); + return properties; + } + + private static Map parseHeaders(Headers headers) { + Map parsedHeaders = new HashMap<>(); + headers.forEach(header -> parsedHeaders.put(header.key(), new String(header.value()))); + return parsedHeaders; + } +} diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/ReactiveMessageListener.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/ReactiveMessageListener.java new file mode 100644 index 00000000..2a02808d --- /dev/null +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/ReactiveMessageListener.java @@ -0,0 +1,30 @@ +package org.reactivecommons.async.kafka.communications; + +import lombok.AllArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import reactor.core.publisher.Flux; +import reactor.kafka.receiver.KafkaReceiver; +import reactor.kafka.receiver.ReceiverOptions; +import reactor.kafka.receiver.ReceiverRecord; + +import java.util.List; + + +@AllArgsConstructor +public class ReactiveMessageListener { + private final ReceiverOptions receiverOptions; + + public Flux> listen(String groupId, List topics) { // Notification events + ReceiverOptions options = receiverOptions.consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + return KafkaReceiver.create(options.subscription(topics)) + .receive(); + } + + public int getMaxConcurrency() { + Object property = receiverOptions.consumerProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG); + if (property instanceof Integer) { + return (int) property; + } + return ConsumerConfig.DEFAULT_MAX_POLL_RECORDS; + } +} diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/ReactiveMessageSender.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/ReactiveMessageSender.java new file mode 100644 index 00000000..7e13d05a --- /dev/null +++ 
b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/ReactiveMessageSender.java @@ -0,0 +1,91 @@ +package org.reactivecommons.async.kafka.communications; + +import lombok.SneakyThrows; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.kafka.KafkaMessage; +import org.reactivecommons.async.kafka.communications.topology.TopologyCreator; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; +import reactor.kafka.sender.KafkaSender; +import reactor.kafka.sender.SenderRecord; +import reactor.kafka.sender.SenderResult; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +public class ReactiveMessageSender { + private final ConcurrentHashMap> confirmations = new ConcurrentHashMap<>(); + private final CopyOnWriteArrayList>> fluxSinks = new CopyOnWriteArrayList<>(); + private final AtomicLong counter = new AtomicLong(); + + private final ExecutorService executorServiceConfirm = Executors.newFixedThreadPool(13, r -> new Thread(r, "KMessageSender1-" + counter.getAndIncrement())); + private final ExecutorService executorServiceEmit = Executors.newFixedThreadPool(13, r -> new Thread(r, "KMessageSender2-" + counter.getAndIncrement())); + + private final int senderCount = 4; + + private final MessageConverter messageConverter; + private final TopologyCreator topologyCreator; + + public ReactiveMessageSender(KafkaSender sender, MessageConverter 
messageConverter, + TopologyCreator topologyCreator) { + this.messageConverter = messageConverter; + this.topologyCreator = topologyCreator; + for (int i = 0; i < senderCount; ++i) { + Flux> source = Flux.create(fluxSinks::add); + sender.send(source) + .doOnNext(this::confirm) + .subscribe(); + } + } + + public Mono send(V message) { + return Mono.create(sink -> { + SenderRecord record = createRecord(message); + confirmations.put(record.key(), sink); + executorServiceEmit.submit(() -> fluxSinks.get((int) (System.currentTimeMillis() % senderCount)).next(record)); + }); + } + + private void confirm(SenderResult result) { + executorServiceConfirm.submit(() -> { + MonoSink sink = confirmations.remove(result.correlationMetadata()); + if (sink != null) { + if (result.exception() != null) { + sink.error(result.exception()); + } else { + sink.success(); + } + } + }); + } + + private SenderRecord createRecord(V object) { + KafkaMessage message = (KafkaMessage) messageConverter.toMessage(object); + ProducerRecord record = createProducerRecord(message); + return SenderRecord.create(record, message.getProperties().getKey()); // TODO: Review for Request-Reply + } + + @SneakyThrows + private ProducerRecord createProducerRecord(KafkaMessage message) { + topologyCreator.checkTopic(message.getProperties().getTopic()); + + List
headers = message.getProperties().getHeaders().entrySet().stream() + .map(entry -> new RecordHeader(entry.getKey(), entry.getValue() + .toString().getBytes(StandardCharsets.UTF_8))) + .collect(Collectors.toList()); + + return new ProducerRecord<>(message.getProperties().getTopic(), null, + message.getProperties().getKey(), message.getBody(), headers); + } +} diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/exceptions/TopicNotFoundException.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/exceptions/TopicNotFoundException.java new file mode 100644 index 00000000..3f8496e8 --- /dev/null +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/exceptions/TopicNotFoundException.java @@ -0,0 +1,7 @@ +package org.reactivecommons.async.kafka.communications.exceptions; + +public class TopicNotFoundException extends RuntimeException { + public TopicNotFoundException(String message) { + super(message); + } +} diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/topology/KafkaCustomizations.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/topology/KafkaCustomizations.java new file mode 100644 index 00000000..242b7ab9 --- /dev/null +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/topology/KafkaCustomizations.java @@ -0,0 +1,26 @@ +package org.reactivecommons.async.kafka.communications.topology; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.HashMap; +import java.util.Map; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class KafkaCustomizations { + private Map topics = new HashMap<>(); + + public static KafkaCustomizations withTopic(String topic, TopicCustomization customization) { + KafkaCustomizations customizations = new KafkaCustomizations(); + customizations.getTopics().put(topic, 
customization); + return customizations; + } + + public KafkaCustomizations addTopic(String topic, TopicCustomization customization) { + this.getTopics().put(topic, customization); + return this; + } +} diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/topology/TopicCustomization.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/topology/TopicCustomization.java new file mode 100644 index 00000000..64821e9f --- /dev/null +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/topology/TopicCustomization.java @@ -0,0 +1,19 @@ +package org.reactivecommons.async.kafka.communications.topology; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class TopicCustomization { + private String topic; + private int partitions; + private short replicationFactor; + private Map config; +} diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/topology/TopologyCreator.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/topology/TopologyCreator.java new file mode 100644 index 00000000..9c1385cc --- /dev/null +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/topology/TopologyCreator.java @@ -0,0 +1,76 @@ +package org.reactivecommons.async.kafka.communications.topology; + +import lombok.SneakyThrows; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.errors.TopicExistsException; +import org.reactivecommons.async.kafka.communications.exceptions.TopicNotFoundException; +import reactor.core.publisher.Flux; +import 
reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class TopologyCreator { + public static final int TIMEOUT_MS = 60_000; + private final AdminClient adminClient; + private final KafkaCustomizations customizations; + private final Map existingTopics; + + public TopologyCreator(AdminClient adminClient, KafkaCustomizations customizations) { + this.adminClient = adminClient; + this.customizations = customizations; + this.existingTopics = getTopics(); + } + + @SneakyThrows + public Map getTopics() { + ListTopicsResult topics = adminClient.listTopics(new ListTopicsOptions().timeoutMs(TIMEOUT_MS)); + return topics.names().get().stream().collect(Collectors.toConcurrentMap(name -> name, name -> true)); + } + + public Mono createTopics(List topics) { + TopicCustomization.TopicCustomizationBuilder defaultBuilder = TopicCustomization.builder() + .partitions(-1) + .replicationFactor((short) -1); + + return Flux.fromIterable(topics) + .map(topic -> { + if (customizations.getTopics().containsKey(topic)) { + return customizations.getTopics().get(topic); + } + return defaultBuilder.topic(topic).build(); + }) + .map(this::toNewTopic) + .flatMap(this::createTopic) + .doOnNext(topic -> existingTopics.put(topic.name(), true)) + .then(); + } + + protected Mono createTopic(NewTopic topic) { + return Mono.fromFuture(adminClient.createTopics(List.of(topic)) + .all() + .toCompletionStage() + .toCompletableFuture()) + .thenReturn(topic) + .onErrorResume(TopicExistsException.class, e -> Mono.just(topic)); + } + + protected NewTopic toNewTopic(TopicCustomization customization) { + NewTopic topic = new NewTopic(customization.getTopic(), customization.getPartitions(), customization.getReplicationFactor()); + if (customization.getConfig() != null) { + return topic.configs(customization.getConfig()); + } + return topic; + } + + public void checkTopic(String topicName) { + if (!existingTopics.containsKey(topicName)) { + 
throw new TopicNotFoundException("Topic not found: " + topicName + ". Please create it before send a message."); + // TODO: should refresh topics?? getTopics(); + } + } +} diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverter.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverter.java new file mode 100644 index 00000000..d1f79703 --- /dev/null +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverter.java @@ -0,0 +1,61 @@ +package org.reactivecommons.async.kafka.converters.json; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.async.commons.communications.Message; +import org.reactivecommons.async.commons.converters.json.JacksonMessageConverter; +import org.reactivecommons.async.commons.exceptions.MessageConversionException; +import org.reactivecommons.async.kafka.KafkaMessage; +import org.reactivecommons.async.kafka.KafkaMessage.KafkaMessageProperties; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +public class KafkaJacksonMessageConverter extends JacksonMessageConverter { + + public KafkaJacksonMessageConverter(ObjectMapper objectMapper) { + super(objectMapper); + } + + @Override + public Message toMessage(Object object) { + byte[] bytes; + try { + String jsonString = this.objectMapper.writeValueAsString(object); + bytes = jsonString.getBytes(StandardCharsets.UTF_8); + } catch (IOException e) { + throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e); + } + KafkaMessageProperties props = buildProperties(object); + return new KafkaMessage(bytes, props); + } + + private KafkaMessageProperties buildProperties(Object message) { + KafkaMessageProperties props = new 
KafkaMessageProperties(); + Map headers = new HashMap<>(); + props.setHeaders(headers); + + if (message instanceof CloudEvent) { + CloudEvent cloudEvent = (CloudEvent) message; + props.setKey(cloudEvent.getId()); + props.setTopic(cloudEvent.getType()); + + headers.put(CONTENT_TYPE, APPLICATION_CLOUD_EVENT_JSON); + return props; + } + + if (message instanceof DomainEvent) { + DomainEvent domainEvent = (DomainEvent) message; + props.setKey(domainEvent.getEventId()); + props.setTopic(domainEvent.getName()); + + headers.put(CONTENT_TYPE, APPLICATION_JSON); + return props; + } + // TODO: Add Command and AsyncQuery support + throw new IllegalArgumentException("Message type not supported: " + message.getClass().getName()); + } +} diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/ApplicationEventListener.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/ApplicationEventListener.java new file mode 100644 index 00000000..50a19375 --- /dev/null +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/ApplicationEventListener.java @@ -0,0 +1,75 @@ +package org.reactivecommons.async.kafka.listeners; + +import lombok.extern.java.Log; +import org.reactivecommons.async.api.handlers.CloudEventHandler; +import org.reactivecommons.async.api.handlers.DomainEventHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import org.reactivecommons.async.commons.DiscardNotifier; +import org.reactivecommons.async.commons.EventExecutor; +import org.reactivecommons.async.commons.HandlerResolver; +import org.reactivecommons.async.commons.communications.Message; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.kafka.communications.ReactiveMessageListener; +import reactor.core.publisher.Mono; +import reactor.kafka.receiver.ReceiverRecord; + +import 
java.util.function.Function; + +@Log +public class ApplicationEventListener extends GenericMessageListener { + + private final MessageConverter messageConverter; + private final HandlerResolver resolver; + + + public ApplicationEventListener(ReactiveMessageListener receiver, + HandlerResolver resolver, + MessageConverter messageConverter, + boolean withDLQRetry, + boolean createTopology, + long maxRetries, + int retryDelay, + DiscardNotifier discardNotifier, + CustomReporter errorReporter, + String appName) { + super(receiver, withDLQRetry, createTopology, maxRetries, retryDelay, discardNotifier, + "event", errorReporter, appName + "-events", resolver.getEventNames()); + this.resolver = resolver; + this.messageConverter = messageConverter; + } + + @Override + protected Function> rawMessageHandler(String executorPath) { + final RegisteredEventListener handler = resolver.getEventListener(executorPath); + + Function converter = resolveConverter(handler); + final EventExecutor executor = new EventExecutor<>(handler.getHandler(), converter); + + return msj -> executor + .execute(msj) + .cast(Object.class); + } + + protected String getExecutorPath(ReceiverRecord msj) { + return msj.topic(); + } + + @Override + protected Object parseMessageForReporter(Message msj) { + return messageConverter.readDomainEventStructure(msj); + } + + private Function resolveConverter(RegisteredEventListener registeredEventListener) { + if (registeredEventListener.getHandler() instanceof DomainEventHandler) { + final Class eventClass = registeredEventListener.getInputClass(); + return msj -> messageConverter.readDomainEvent(msj, eventClass); + } + if (registeredEventListener.getHandler() instanceof CloudEventHandler) { + return messageConverter::readCloudEvent; + } + throw new RuntimeException("Unknown handler type"); + } +} + + diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/ApplicationNotificationsListener.java 
b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/ApplicationNotificationsListener.java new file mode 100644 index 00000000..bf4305e5 --- /dev/null +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/ApplicationNotificationsListener.java @@ -0,0 +1,78 @@ +package org.reactivecommons.async.kafka.listeners; + +import lombok.extern.java.Log; +import org.reactivecommons.async.api.handlers.CloudEventHandler; +import org.reactivecommons.async.api.handlers.DomainEventHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import org.reactivecommons.async.commons.DiscardNotifier; +import org.reactivecommons.async.commons.EventExecutor; +import org.reactivecommons.async.commons.HandlerResolver; +import org.reactivecommons.async.commons.communications.Message; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.kafka.communications.ReactiveMessageListener; +import reactor.core.publisher.Mono; +import reactor.kafka.receiver.ReceiverRecord; + +import java.util.Optional; +import java.util.UUID; +import java.util.function.Function; + +@Log +public class ApplicationNotificationsListener extends GenericMessageListener { + + private final MessageConverter messageConverter; + private final HandlerResolver resolver; + + + public ApplicationNotificationsListener(ReactiveMessageListener receiver, + HandlerResolver resolver, + MessageConverter messageConverter, + boolean withDLQRetry, + boolean createTopology, + long maxRetries, + int retryDelay, + DiscardNotifier discardNotifier, + CustomReporter errorReporter, + String appName) { + super(receiver, withDLQRetry, createTopology, maxRetries, retryDelay, discardNotifier, + "event", errorReporter, appName + "-notification-" + UUID.randomUUID(), + resolver.getNotificationNames()); + this.resolver = resolver; + this.messageConverter = 
messageConverter; + } + + @Override + protected Function> rawMessageHandler(String executorPath) { + final RegisteredEventListener handler = resolver.getEventListener(executorPath); + + Function converter = resolveConverter(handler); + final EventExecutor executor = new EventExecutor<>(handler.getHandler(), converter); + + return msj -> executor + .execute(msj) + .cast(Object.class); + } + + protected String getExecutorPath(ReceiverRecord msj) { + return msj.topic(); + } + + @Override + protected Object parseMessageForReporter(Message msj) { + return messageConverter.readDomainEventStructure(msj); + } + + private Function resolveConverter(RegisteredEventListener registeredEventListener) { + if (registeredEventListener.getHandler() instanceof DomainEventHandler) { + final Class eventClass = registeredEventListener.getInputClass(); + return msj -> messageConverter.readDomainEvent(msj, eventClass); + } + if (registeredEventListener.getHandler() instanceof CloudEventHandler) { + return messageConverter::readCloudEvent; + } + throw new RuntimeException("Unknown handler type"); + } +} + + diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/GenericMessageListener.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/GenericMessageListener.java new file mode 100644 index 00000000..da47e237 --- /dev/null +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/GenericMessageListener.java @@ -0,0 +1,233 @@ +package org.reactivecommons.async.kafka.listeners; + +import lombok.extern.java.Log; +import org.reactivecommons.async.commons.DiscardNotifier; +import org.reactivecommons.async.commons.FallbackStrategy; +import org.reactivecommons.async.commons.communications.Message; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.commons.utils.LoggerSubscriber; +import org.reactivecommons.async.kafka.KafkaMessage; +import 
org.reactivecommons.async.kafka.communications.ReactiveMessageListener; +import org.reactivecommons.async.kafka.communications.topology.TopologyCreator; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.kafka.receiver.ReceiverRecord; +import reactor.util.retry.Retry; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.logging.Level; + +import static java.lang.String.format; +import static java.util.function.Function.identity; + +@Log +public abstract class GenericMessageListener { + public static final int DEFAULT_RETRIES = 10; + private final ConcurrentHashMap>> handlers = new ConcurrentHashMap<>(); + private final ReactiveMessageListener messageListener; + private final Scheduler scheduler = Schedulers.newParallel(getClass().getSimpleName(), 12); + private final Scheduler errorReporterScheduler = Schedulers.newBoundedElastic(4, 256, + "errorReporterScheduler"); + + private final List topics; + private final String groupId; + private final boolean useDLQ; + private final boolean createTopology; + private final long maxRetries; + private final Duration retryDelay; + private final DiscardNotifier discardNotifier; + private final String objectType; + private final CustomReporter customReporter; + private volatile Flux> messageFlux; + + public GenericMessageListener(ReactiveMessageListener listener, boolean useDLQ, boolean createTopology, + long maxRetries, long retryDelay, DiscardNotifier discardNotifier, + String objectType, CustomReporter customReporter, String groupId, List topics) { + this.groupId = groupId; + this.topics = topics; + this.messageListener = listener; + this.createTopology = createTopology; + this.maxRetries = maxRetries; + this.retryDelay = Duration.ofMillis(retryDelay); + this.useDLQ = useDLQ; 
+ this.discardNotifier = discardNotifier; + this.objectType = objectType; + this.customReporter = customReporter; + } + + private boolean hasRetries() { + return maxRetries != -1; + } + + protected Mono setUpBindings(TopologyCreator creator) { + return creator.createTopics(topics); + } + + public void startListener(TopologyCreator creator) { + log.log(Level.INFO, "Using max concurrency {0}, in receiver for topics: {1}", + + new Object[]{messageListener.getMaxConcurrency(), topics}); + if (useDLQ) { + log.log(Level.INFO, "ATTENTION! Using DLQ Strategy for retries with {0} + 1 Max Retries configured!", + new Object[]{maxRetries}); + } else { + log.log(Level.INFO, "ATTENTION! Using infinite fast retries as Retry Strategy"); + } + + if (createTopology) { + this.messageFlux = setUpBindings(creator) + .thenMany(this.messageListener.listen(groupId, topics) + .doOnError(err -> log.log(Level.SEVERE, "Error listening queue", err)) + .transform(this::consumeFaultTolerant)); + } else { + this.messageFlux = this.messageListener.listen(groupId, topics) + .doOnError(err -> log.log(Level.SEVERE, "Error listening queue", err)) + .transform(this::consumeFaultTolerant); + } + + onTerminate(); + } + + private void onTerminate() { + messageFlux.doOnTerminate(this::onTerminate) + .subscribe(new LoggerSubscriber<>(getClass().getName())); + } + + private Flux> consumeFaultTolerant(Flux> messageFlux) { + return messageFlux.flatMap(msj -> { + final Instant init = Instant.now(); + return handle(msj, init) + .doOnSuccess(record -> record.receiverOffset().acknowledge()) + .onErrorResume(err -> requeueOrAck(msj, err, init)); + }, messageListener.getMaxConcurrency()); + } + + protected Mono> handle(ReceiverRecord msj, Instant initTime) { + try { + final String executorPath = getExecutorPath(msj); + final Function> handler = getExecutor(executorPath); + final Message message = KafkaMessage.fromDelivery(msj); + + Mono flow = Mono.defer(() -> handler.apply(message)) + 
.transform(enrichPostProcess(message)); + if (hasRetries()) { + flow = flow.retryWhen(Retry.fixedDelay(maxRetries, retryDelay)) + .onErrorMap(err -> { + if (err.getMessage() != null && err.getMessage().contains("Retries exhausted")) { + log.warning(err.getMessage()); + return err.getCause(); + } + return err; + }); + } + return flow.doOnSuccess(o -> logExecution(executorPath, initTime, true)) + .subscribeOn(scheduler) + .thenReturn(msj); + } catch (Exception e) { + log.log(Level.SEVERE, format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! " + + "Severe Warning! ", msj.key())); + return Mono.error(e); + } + } + + private Mono> requeueOrAck(ReceiverRecord msj, Throwable err, + Instant init) { + final Message message = KafkaMessage.fromDelivery(msj); + reportErrorMetric(msj, init); + sendErrorToCustomReporter(err, message, hasRetries()); + if (hasRetries()) { // Discard + logError(err, msj, FallbackStrategy.DEFINITIVE_DISCARD); + if (useDLQ) { + return discardNotifier + .notifyDiscard(message) + .doOnSuccess(_a -> msj.receiverOffset().acknowledge()) + .thenReturn(msj); + } + return Mono.just(msj); + } else { // infinity fast retries + logError(err, msj, FallbackStrategy.FAST_RETRY); + return Mono.just(msj).delayElement(retryDelay); + } + } + + private void logExecution(String executorPath, Instant initTime, boolean success) { + try { + final Instant afterExecutionTime = Instant.now(); + final long timeElapsed = Duration.between(initTime, afterExecutionTime).toMillis(); + doLogExecution(executorPath, timeElapsed); + customReporter.reportMetric(objectType, executorPath, timeElapsed, success); + } catch (Exception e) { + log.log(Level.WARNING, "Unable to send execution metrics!", e); + } + + } + + private void reportErrorMetric(ReceiverRecord msj, Instant initTime) { + String executorPath; + try { + executorPath = getExecutorPath(msj); + } catch (Exception e) { + executorPath = "unknown"; + } + logExecution(executorPath, initTime, false); + } + 
+ private void doLogExecution(String executorPath, long timeElapsed) { + log.log(Level.FINE, String.format("%s with path %s handled, took %d ms", + objectType, executorPath, timeElapsed)); + } + + + protected void logError(Throwable err, ReceiverRecord msj, FallbackStrategy strategy) { + String messageID = msj.key(); + try { + log.log(Level.SEVERE, format("Error encounter while processing message %s: %s", messageID, err.toString()), err); + log.warning(format("Message %s Headers: %s", messageID, msj.headers().toString())); + log.warning(format("Message %s Body: %s", messageID, new String(msj.value()))); + } catch (Exception e) { + log.log(Level.SEVERE, "Error Login message Content!!", e); + } finally { + log.warning(format(strategy.message, messageID)); + } + } + + private Function> getExecutor(String path) { + final Function> handler = handlers.get(path); + return handler != null ? handler : computeRawMessageHandler(path); + } + + private Function> computeRawMessageHandler(String commandId) { + return handlers.computeIfAbsent(commandId, s -> + rawMessageHandler(commandId) + ); + } + + protected abstract Function> rawMessageHandler(String executorPath); + + protected abstract String getExecutorPath(ReceiverRecord msj); + + protected Function, Mono> enrichPostProcess(Message msg) { + return identity(); + } + + private void sendErrorToCustomReporter(final Throwable err, final Message message, final boolean redelivered) { + try { + customReporter.reportError(err, message, parseMessageForReporter(message), redelivered) + .subscribeOn(errorReporterScheduler) + .doOnError(t -> log.log(Level.WARNING, "Error sending error to external reporter", t)) + .subscribe(); + } catch (Throwable t) { + log.log(Level.WARNING, "Error in scheduler when sending error to external reporter", t); + } + } + + protected abstract Object parseMessageForReporter(Message msj); +} + + diff --git a/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/KafkaMessageTest.java 
b/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/KafkaMessageTest.java new file mode 100644 index 00000000..4ac1dec5 --- /dev/null +++ b/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/KafkaMessageTest.java @@ -0,0 +1,57 @@ +package org.reactivecommons.async.kafka; + +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.reactivecommons.async.commons.communications.Message; +import reactor.kafka.receiver.ReceiverRecord; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class KafkaMessageTest { + + @Mock + private ReceiverRecord record; + + @Test + void shouldParse() { + // Arrange + RecordHeaders headers = new RecordHeaders(); + headers.add("content-type", "application/json".getBytes()); + when(record.value()).thenReturn("value".getBytes()); + when(record.key()).thenReturn("key"); + when(record.topic()).thenReturn("topic"); + when(record.headers()).thenReturn(headers); + // Act + Message message = KafkaMessage.fromDelivery(record); + // Assert + assertEquals("key", message.getProperties().getKey()); + assertEquals("topic", message.getProperties().getTopic()); + assertEquals("application/json", message.getProperties().getContentType()); + assertEquals(5, message.getProperties().getContentLength()); + assertEquals("value", new String(message.getBody())); + } + + @Test + void shouldParseWhenNoContentType() { + // Arrange + RecordHeaders headers = new RecordHeaders(); + when(record.value()).thenReturn("value".getBytes()); + when(record.key()).thenReturn("key"); + when(record.topic()).thenReturn("topic"); + when(record.headers()).thenReturn(headers); + // Act + Message message = 
package org.reactivecommons.async.kafka.communications.topology;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

/**
 * Unit tests for {@link KafkaCustomizations}: the {@code withTopic} static factory
 * and incremental registration through {@code addTopic}.
 */
class KafkaCustomizationsTest {

    private KafkaCustomizations customizations;

    @BeforeEach
    void setUp() {
        customizations = new KafkaCustomizations();
    }

    @Test
    void testWithTopic() {
        // withTopic should build a holder containing exactly the given topic customization
        String topic = "testTopic";
        Map<String, String> config = new HashMap<>();
        config.put("cleanup.policy", "compact");
        TopicCustomization customization = new TopicCustomization(topic, 3, (short) 1, config);

        KafkaCustomizations result = KafkaCustomizations.withTopic(topic, customization);

        assertNotNull(result);
        assertEquals(1, result.getTopics().size());
        assertEquals(customization, result.getTopics().get(topic));
    }

    @Test
    void testAddTopic() {
        // addTopic should accumulate customizations keyed by topic name
        String topic1 = "testTopic1";
        Map<String, String> config1 = new HashMap<>();
        config1.put("cleanup.policy", "compact");
        TopicCustomization customization1 = new TopicCustomization(topic1, 3, (short) 1, config1);
        customizations.addTopic(topic1, customization1);

        String topic2 = "testTopic2";
        Map<String, String> config2 = new HashMap<>();
        config2.put("retention.ms", "60000");
        TopicCustomization customization2 = new TopicCustomization(topic2, 5, (short) 2, config2);
        customizations.addTopic(topic2, customization2);

        assertEquals(2, customizations.getTopics().size());
        assertEquals(customization1, customizations.getTopics().get(topic1));
        assertEquals(customization2, customizations.getTopics().get(topic2));
    }
}
package org.reactivecommons.async.kafka.communications.topology;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.reactivecommons.async.kafka.communications.exceptions.TopicNotFoundException;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Tests for {@link TopologyCreator}: creating topics and checking topic existence
 * against a mocked Kafka {@link AdminClient}.
 */
@ExtendWith(MockitoExtension.class)
class TopologyCreatorTest {

    @Mock
    private AdminClient adminClient;
    @Mock
    private ListTopicsResult listTopicsResult;
    @Mock
    private CreateTopicsResult createTopicsResult;

    private TopologyCreator creator;
    private KafkaCustomizations customizations;

    @BeforeEach
    void setUp() {
        Map<String, String> config = new HashMap<>();
        config.put("cleanup.policy", "compact");
        customizations = KafkaCustomizations.withTopic("topic1",
                new TopicCustomization("topic1", 3, (short) 1, config));
    }

    /** Stubs the admin client so listTopics() reports the given names as already existing. */
    private void givenExistingTopics(Set<String> topics) {
        KafkaFutureImpl<Set<String>> names = new KafkaFutureImpl<>();
        names.complete(topics);
        doReturn(names).when(listTopicsResult).names();
        when(adminClient.listTopics(any(ListTopicsOptions.class))).thenReturn(listTopicsResult);
    }

    @Test
    void shouldCreateTopics() {
        givenExistingTopics(Set.of("topic1", "topic2"));
        KafkaFutureImpl<Void> create = new KafkaFutureImpl<>();
        create.complete(null);
        doReturn(create).when(createTopicsResult).all();
        when(adminClient.createTopics(any())).thenReturn(createTopicsResult);
        creator = new TopologyCreator(adminClient, customizations);

        Mono<Void> flow = creator.createTopics(List.of("topic1", "topic2"));

        StepVerifier.create(flow)
                .verifyComplete();
    }

    @Test
    void shouldCheckTopics() {
        givenExistingTopics(Set.of("topic1", "topic2"));
        creator = new TopologyCreator(adminClient, customizations);

        creator.checkTopic("topic1");

        verify(listTopicsResult, times(1)).names();
    }

    @Test
    void shouldFailWhenCheckTopics() {
        givenExistingTopics(Set.of("topic1", "topic2"));
        creator = new TopologyCreator(adminClient, customizations);

        assertThrows(TopicNotFoundException.class, () -> creator.checkTopic("topic3"));
    }
}
static ObjectMapper objectMapper; + + @BeforeAll + static void setUp() { + ObjectMapperSupplier supplier = new DefaultObjectMapperSupplier(); + objectMapper = supplier.get(); + converter = new KafkaJacksonMessageConverter(objectMapper); + } + + @Test + void shouldSerializeDomainEvent() { + // Arrange + String id = UUID.randomUUID().toString(); + MyEvent event = new MyEvent("name", 1); + DomainEvent testEvent = new DomainEvent<>("test", id, event); + String expectedJson = "{\"name\":\"test\",\"eventId\":\"" + id + "\",\"data\":{\"name\":\"name\",\"age\":1}}"; + // Act + Message message = converter.toMessage(testEvent); + // Assert + assertEquals("test", message.getProperties().getTopic()); + assertEquals(id, message.getProperties().getKey()); + assertEquals("application/json", message.getProperties().getContentType()); + assertEquals(expectedJson, new String(message.getBody())); + } + + @Test + void shouldSerializeCloudEvent() throws JsonProcessingException { + // Arrange + String id = UUID.randomUUID().toString(); + MyEvent event = new MyEvent("name", 1); + OffsetDateTime dateTime = OffsetDateTime.now(); + CloudEvent testCloudEvent = CloudEventBuilder.v1() + .withId(id) + .withSource(URI.create("https://reactivecommons.org/events")) + .withType("test") + .withDataContentType("application/json") + .withTime(dateTime) + .withData(objectMapper.writeValueAsBytes(event)) + .build(); + + String expectedJson = "{\"specversion\":\"1.0\",\"id\":\"" + id + + "\",\"source\":\"https://reactivecommons.org/events\",\"type\":\"test\"," + + "\"datacontenttype\":\"application/json\",\"time\":\"" + dateTime + + "\",\"data\":{\"name\":\"name\",\"age\":1}}"; + // Act + Message message = converter.toMessage(testCloudEvent); + // Assert + assertEquals("test", message.getProperties().getTopic()); + assertEquals(id, message.getProperties().getKey()); + assertEquals("application/cloudevents+json", message.getProperties().getContentType()); + assertEquals(expectedJson, new 
String(message.getBody())); + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class MyEvent { + private String name; + private int age; + } +} diff --git a/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/listeners/ApplicationEventListenerTest.java b/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/listeners/ApplicationEventListenerTest.java new file mode 100644 index 00000000..e05ef076 --- /dev/null +++ b/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/listeners/ApplicationEventListenerTest.java @@ -0,0 +1,99 @@ +package org.reactivecommons.async.kafka.listeners; + +import io.cloudevents.CloudEvent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.async.api.handlers.CloudEventHandler; +import org.reactivecommons.async.api.handlers.DomainEventHandler; +import org.reactivecommons.async.api.handlers.EventHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import org.reactivecommons.async.commons.HandlerResolver; +import org.reactivecommons.async.commons.communications.Message; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.kafka.communications.ReactiveMessageListener; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings({"rawtypes", "unchecked"}) +@ExtendWith(MockitoExtension.class) +class ApplicationEventListenerTest { + + @Mock + private ReactiveMessageListener 
receiver; + @Mock + private HandlerResolver resolver; + @Mock + private MessageConverter messageConverter; + @Mock + private Message message; + + private ApplicationEventListener applicationEventListener; + + @BeforeEach + void setup() { + applicationEventListener = new ApplicationEventListener( + receiver, + resolver, + messageConverter, + true, + true, + 3, + 1000, + null, + null, + "testApp" + ); + + } + + @Test + void shouldHandleRawMessageSuccessfully() { + DomainEvent event = new DomainEvent<>("sample", "id", "data"); + DomainEventHandler domainEventHandler = mock(DomainEventHandler.class); + when(domainEventHandler.handle(event)).thenReturn(Mono.just("Handled")); + RegisteredEventListener registeredEventListenerMock = mock(RegisteredEventListener.class); + when(registeredEventListenerMock.getHandler()).thenReturn(domainEventHandler); + when(registeredEventListenerMock.getInputClass()).thenReturn(Object.class); + when(resolver.getEventListener(anyString())).thenReturn(registeredEventListenerMock); + when(messageConverter.readDomainEvent(any(Message.class), any(Class.class))).thenReturn(event); + + Mono flow = applicationEventListener.rawMessageHandler("executorPath").apply(message); + + StepVerifier.create(flow) + .expectNext("Handled") + .verifyComplete(); + + verify(resolver, times(1)).getEventListener(anyString()); + verify(messageConverter, times(1)).readDomainEvent(any(Message.class), any(Class.class)); + } + + @Test + void shouldHandleRawMessageSuccessfullyWhenCloudEvent() { + CloudEvent event = mock(CloudEvent.class); + EventHandler domainEventHandler = mock(CloudEventHandler.class); + when(domainEventHandler.handle(event)).thenReturn(Mono.empty()); + RegisteredEventListener registeredEventListenerMock = mock(RegisteredEventListener.class); + when(registeredEventListenerMock.getHandler()).thenReturn(domainEventHandler); + when(resolver.getEventListener(anyString())).thenReturn(registeredEventListenerMock); + 
when(messageConverter.readCloudEvent(any(Message.class))).thenReturn(event); + + Mono flow = applicationEventListener.rawMessageHandler("executorPath").apply(message); + + StepVerifier.create(flow) + .verifyComplete(); + + verify(resolver, times(1)).getEventListener(anyString()); + verify(messageConverter, times(1)).readCloudEvent(any(Message.class)); + } +} \ No newline at end of file diff --git a/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/listeners/GenericMessageListenerTest.java b/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/listeners/GenericMessageListenerTest.java new file mode 100644 index 00000000..fb7b23b7 --- /dev/null +++ b/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/listeners/GenericMessageListenerTest.java @@ -0,0 +1,125 @@ +package org.reactivecommons.async.kafka.listeners; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import org.reactivecommons.async.commons.DiscardNotifier; +import org.reactivecommons.async.commons.communications.Message; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.kafka.communications.ReactiveMessageListener; +import org.reactivecommons.async.kafka.communications.topology.TopologyCreator; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; +import reactor.kafka.receiver.ReceiverOffset; +import reactor.kafka.receiver.ReceiverRecord; +import reactor.test.StepVerifier; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import static 
org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings({"rawtypes", "unchecked"}) +@ExtendWith(MockitoExtension.class) +class GenericMessageListenerTest { + + @Mock + private ReactiveMessageListener receiver; + @Mock + private Message message; + @Mock + private TopologyCreator topologyCreator; + + @Mock + private RegisteredEventListener handler; + + SampleListener setup(Function> handler) { + return new SampleListener( + receiver, + true, + true, + 1, + 1, + mock(DiscardNotifier.class), + "event", + mock(CustomReporter.class), + "appName", + List.of("topic"), + handler + ); + + } + + @Test + void shouldStartListener() { + // Arrange + ReceiverRecord record = mock(ReceiverRecord.class); + when(record.topic()).thenReturn("topic"); + when(record.value()).thenReturn("message".getBytes(StandardCharsets.UTF_8)); + Headers header = new RecordHeaders().add("contentType", "application/json".getBytes(StandardCharsets.UTF_8)); + when(record.headers()).thenReturn(header); + when(record.key()).thenReturn("key"); + ReceiverOffset receiverOffset = mock(ReceiverOffset.class); + when(record.receiverOffset()).thenReturn(receiverOffset); + + Flux> flux = Flux.just(record); + when(receiver.listen(anyString(), any(List.class))).thenReturn(flux); + when(receiver.getMaxConcurrency()).thenReturn(1); + when(topologyCreator.createTopics(any(List.class))).thenReturn(Mono.empty()); + + final AtomicReference> sink = new AtomicReference<>(); + Mono flow = Mono.create(sink::set); + SampleListener sampleListener = setup(message1 -> { + sink.get().success(""); + return Mono.empty(); + }); + // Act + sampleListener.startListener(topologyCreator); + StepVerifier.create(flow).expectNext("").verifyComplete(); + // Assert + 
verify(topologyCreator, times(1)).createTopics(any(List.class)); + verify(receiverOffset, atLeastOnce()).acknowledge(); + } + + + public static class SampleListener extends GenericMessageListener { + private final Function> handler; + + public SampleListener(ReactiveMessageListener listener, boolean useDLQ, boolean createTopology, long maxRetries, + long retryDelay, DiscardNotifier discardNotifier, String objectType, + CustomReporter customReporter, String groupId, List topics, + Function> handler) { + super(listener, useDLQ, createTopology, maxRetries, retryDelay, discardNotifier, objectType, customReporter, + groupId, topics); + this.handler = handler; + } + + @Override + protected Function> rawMessageHandler(String executorPath) { + return handler; + } + + @Override + protected String getExecutorPath(ReceiverRecord msj) { + return msj.topic(); + } + + @Override + protected Object parseMessageForReporter(Message msj) { + return null; + } + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java index 8efba6ce..1226625f 100644 --- a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java @@ -23,6 +23,7 @@ import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier; import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier; import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.commons.DLQDiscardNotifier; import org.reactivecommons.async.rabbit.DynamicRegistryImp; import org.reactivecommons.async.rabbit.RabbitDomainEventBus; import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener; diff --git 
a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGatewayTest.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGatewayTest.java index be1346bb..d55addd5 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGatewayTest.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGatewayTest.java @@ -19,7 +19,6 @@ import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier; import org.reactivecommons.async.commons.reply.ReactiveReplyRouter; import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; -import org.reactivecommons.async.commons.converters.json.JacksonMessageConverter; import org.reactivecommons.async.rabbit.converters.json.RabbitJacksonMessageConverter; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverterTest.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverterTest.java index fb272ab4..0376ae96 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverterTest.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverterTest.java @@ -1,6 +1,5 @@ package org.reactivecommons.async.rabbit.converters.json; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import io.cloudevents.CloudEvent; @@ -13,7 +12,6 @@ import org.reactivecommons.async.api.AsyncQuery; import org.reactivecommons.async.commons.communications.Message; import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier; -import 
org.reactivecommons.async.commons.converters.json.JacksonMessageConverter; import java.io.IOException; import java.net.URI; @@ -65,7 +63,7 @@ void readValue() { } @Test - void readCloudEvent() throws JsonProcessingException { + void readCloudEvent() { Date date = new Date(); CloudEvent command = CloudEventBuilder.v1() // .withId(UUID.randomUUID().toString()) // diff --git a/gradle.properties b/gradle.properties index 7a1b628c..279b9609 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ version=5.0.0-alpha -toPublish=domain-events-api,async-commons-api,async-commons,async-rabbit,async-commons-rabbit-standalone,async-commons-rabbit-starter +toPublish=domain-events-api,async-commons-api,async-commons,async-rabbit,async-commons-rabbit-standalone,async-commons-rabbit-starter,async-kafka,async-kafka-starter onlyUpdater=true \ No newline at end of file diff --git a/samples/async/async-kafka-sender-client/async-kafka-sender-client.gradle b/samples/async/async-kafka-sender-client/async-kafka-sender-client.gradle new file mode 100644 index 00000000..78766eea --- /dev/null +++ b/samples/async/async-kafka-sender-client/async-kafka-sender-client.gradle @@ -0,0 +1,9 @@ +apply plugin: 'org.springframework.boot' + +dependencies { + implementation project(':shared') + implementation project(':async-kafka-starter') + implementation 'org.springframework.boot:spring-boot-starter-webflux' + implementation 'org.springframework.boot:spring-boot-starter-actuator' + implementation 'io.micrometer:micrometer-registry-prometheus' +} \ No newline at end of file diff --git a/samples/async/async-kafka-sender-client/src/main/java/sample/EDASampleSenderApp.java b/samples/async/async-kafka-sender-client/src/main/java/sample/EDASampleSenderApp.java new file mode 100644 index 00000000..f2c87a97 --- /dev/null +++ b/samples/async/async-kafka-sender-client/src/main/java/sample/EDASampleSenderApp.java @@ -0,0 +1,17 @@ +package sample; + +import lombok.extern.java.Log; +import 
org.reactivecommons.async.kafka.annotations.EnableDomainEventBus; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@Log +//@EnableDirectAsyncGateway +@EnableDomainEventBus +@SpringBootApplication +public class EDASampleSenderApp { + + public static void main(String[] args) { + SpringApplication.run(EDASampleSenderApp.class, args); + } +} diff --git a/samples/async/async-kafka-sender-client/src/main/java/sample/KafkaConfig.java b/samples/async/async-kafka-sender-client/src/main/java/sample/KafkaConfig.java new file mode 100644 index 00000000..525236ea --- /dev/null +++ b/samples/async/async-kafka-sender-client/src/main/java/sample/KafkaConfig.java @@ -0,0 +1,26 @@ +package sample; + +import org.reactivecommons.async.kafka.config.RCKafkaConfig; +import org.reactivecommons.async.kafka.config.props.RCAsyncPropsKafka; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + +import java.io.IOException; +import java.nio.file.Path; + +@Configuration +public class KafkaConfig { + + @Bean + @Primary + public RCAsyncPropsKafka kafkaProps() throws IOException { + RCAsyncPropsKafka kafkaProps = new RCAsyncPropsKafka(); + kafkaProps.setCreateTopology(true); + kafkaProps.setMaxRetries(5); + kafkaProps.setRetryDelay(1000); + kafkaProps.setWithDLQRetry(true); + kafkaProps.setKafkaProps(RCKafkaConfig.readPropsFromDotEnv(Path.of(".kafka-env"))); + return kafkaProps; + } +} diff --git a/samples/async/async-kafka-sender-client/src/main/java/sample/ListenerConfig.java b/samples/async/async-kafka-sender-client/src/main/java/sample/ListenerConfig.java new file mode 100644 index 00000000..ababe387 --- /dev/null +++ b/samples/async/async-kafka-sender-client/src/main/java/sample/ListenerConfig.java @@ -0,0 +1,36 @@ +package sample; + +import io.cloudevents.CloudEvent; +import 
package sample;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.jackson.JsonCloudEventData;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import sample.model.Constants;

import java.net.URI;
import java.time.OffsetDateTime;
import java.util.UUID;

import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;

/**
 * Sample endpoints that publish notification events through the Kafka-backed
 * {@link DomainEventBus}: two plain {@link DomainEvent}s and one CloudEvent.
 * Commented-out request/reply and command examples (and the unused {@code target}
 * field and imports they referenced) were removed as dead code; recover them from
 * git history if needed.
 */
@RestController
@RequiredArgsConstructor
public class SampleRestController {

    private final DomainEventBus domainEventBus;
    private final ObjectMapperSupplier supplier;

    /** Emits the DATA_RESET notification event with an empty payload. */
    @DeleteMapping(path = "/api/teams/fail", produces = APPLICATION_JSON_VALUE)
    public Mono<DomainEvent<String>> resetData() {
        DomainEvent<String> event = new DomainEvent<>(Constants.DATA_RESET, UUID.randomUUID().toString(), "");
        return Mono.from(domainEventBus.emit(event)).thenReturn(event);
    }

    /** Emits a plain "event-name" notification event with an empty payload. */
    @DeleteMapping(path = "/api/teams/ok", produces = APPLICATION_JSON_VALUE)
    public Mono<DomainEvent<String>> event() {
        DomainEvent<String> event = new DomainEvent<>("event-name", UUID.randomUUID().toString(), "");
        return Mono.from(domainEventBus.emit(event)).thenReturn(event);
    }

    /** Emits a CloudEvent whose JSON data is a serialized {@link SampleEvent}. */
    @DeleteMapping(path = "/api/teams/cloud/event", produces = APPLICATION_JSON_VALUE)
    public Mono<CloudEvent> cloudEvent() throws JsonProcessingException {
        SampleEvent eventData = new SampleEvent();
        eventData.setName("Juan");
        eventData.setDescription("A software developer");

        CloudEvent event = CloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withSource(URI.create("https://reactive-commons.org/foos"))
                .withType("event-name")
                .withTime(OffsetDateTime.now())
                // Wrap the Jackson tree so the CloudEvents SDK serializes it as JSON data
                .withData("application/json", JsonCloudEventData.wrap(supplier.get().valueToTree(eventData)))
                .build();
        return Mono.from(domainEventBus.emit(event)).thenReturn(event);
    }

    /** Payload carried by the sample CloudEvent. */
    @Data
    public static class SampleEvent {
        private String name;
        private String description;
    }
}
org.reactivecommons.async.commons.converters.json.CloudEventBuilderExt; diff --git a/starters/async-kafka-starter/async-kafka-starter.gradle b/starters/async-kafka-starter/async-kafka-starter.gradle new file mode 100644 index 00000000..e5edebab --- /dev/null +++ b/starters/async-kafka-starter/async-kafka-starter.gradle @@ -0,0 +1,16 @@ +ext { + artifactId = 'async-commons-kafka-starter' + artifactDescription = 'Async Commons Starter' +} + +dependencies { + api project(':async-kafka') + compileOnly 'org.springframework.boot:spring-boot-starter' + compileOnly 'org.springframework.boot:spring-boot-starter-actuator' + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' + + annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' + + testImplementation 'io.projectreactor:reactor-test' + testImplementation 'org.springframework.boot:spring-boot-starter-actuator' +} \ No newline at end of file diff --git a/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/annotations/EnableDomainEventBus.java b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/annotations/EnableDomainEventBus.java new file mode 100644 index 00000000..f95879f0 --- /dev/null +++ b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/annotations/EnableDomainEventBus.java @@ -0,0 +1,23 @@ +package org.reactivecommons.async.kafka.annotations; + +import org.reactivecommons.async.kafka.config.RCKafkaConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Documented +@Import(RCKafkaConfig.class) +@Configuration +public @interface EnableDomainEventBus { +} 
+ + + diff --git a/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/annotations/EnableEventListeners.java b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/annotations/EnableEventListeners.java new file mode 100644 index 00000000..b70cee39 --- /dev/null +++ b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/annotations/EnableEventListeners.java @@ -0,0 +1,23 @@ +package org.reactivecommons.async.kafka.annotations; + +import org.reactivecommons.async.kafka.config.RCKafkaConfig; +import org.reactivecommons.async.kafka.config.RCKafkaEventListenerConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Documented +@Import({RCKafkaEventListenerConfig.class, RCKafkaConfig.class}) +@Configuration +public @interface EnableEventListeners { +} + + + diff --git a/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/annotations/EnableNotificationListener.java b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/annotations/EnableNotificationListener.java new file mode 100644 index 00000000..a96a0f26 --- /dev/null +++ b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/annotations/EnableNotificationListener.java @@ -0,0 +1,24 @@ +package org.reactivecommons.async.kafka.annotations; + +import org.reactivecommons.async.kafka.config.RCKafkaConfig; +import org.reactivecommons.async.kafka.config.RCKafkaNotificationEventListenerConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.Documented; 
+import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Documented +@Import({RCKafkaNotificationEventListenerConfig.class, RCKafkaConfig.class}) +@Configuration +public @interface EnableNotificationListener { +} + + + diff --git a/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/RCKafkaConfig.java b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/RCKafkaConfig.java new file mode 100644 index 00000000..4ecfd4ad --- /dev/null +++ b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/RCKafkaConfig.java @@ -0,0 +1,146 @@ +package org.reactivecommons.async.kafka.config; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.reactivecommons.api.domain.DomainEventBus; +import org.reactivecommons.async.commons.DLQDiscardNotifier; +import org.reactivecommons.async.commons.DiscardNotifier; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier; +import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.commons.ext.DefaultCustomReporter; +import org.reactivecommons.async.kafka.KafkaDomainEventBus; +import org.reactivecommons.async.kafka.communications.ReactiveMessageListener; +import org.reactivecommons.async.kafka.communications.ReactiveMessageSender; +import 
org.reactivecommons.async.kafka.communications.topology.KafkaCustomizations; +import org.reactivecommons.async.kafka.communications.topology.TopologyCreator; +import org.reactivecommons.async.kafka.config.props.RCAsyncPropsKafka; +import org.reactivecommons.async.kafka.config.props.RCKafkaProps; +import org.reactivecommons.async.kafka.converters.json.KafkaJacksonMessageConverter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import reactor.kafka.receiver.ReceiverOptions; +import reactor.kafka.sender.KafkaSender; +import reactor.kafka.sender.SenderOptions; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; + +@Configuration +@EnableConfigurationProperties({RCAsyncPropsKafka.class}) +public class RCKafkaConfig { + // Sender + @Bean + @ConditionalOnMissingBean(DomainEventBus.class) + public DomainEventBus kafkaDomainEventBus(ReactiveMessageSender sender) { + return new KafkaDomainEventBus(sender); + } + + @Bean + @ConditionalOnMissingBean(ReactiveMessageSender.class) + public ReactiveMessageSender kafkaReactiveMessageSender(KafkaSender kafkaSender, + MessageConverter converter, TopologyCreator topologyCreator) { + return new ReactiveMessageSender(kafkaSender, converter, topologyCreator); + } + + @Bean + @ConditionalOnMissingBean(KafkaSender.class) + public 
KafkaSender kafkaSender(RCAsyncPropsKafka config, @Value("${spring.application.name}") String clientId) { + RCKafkaProps props = config.getKafkaProps(); + props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); + props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + SenderOptions senderOptions = SenderOptions.create(props); + return KafkaSender.create(senderOptions); + } + + // Receiver + + @Bean + @ConditionalOnMissingBean(ReactiveMessageListener.class) + public ReactiveMessageListener kafkaReactiveMessageListener(ReceiverOptions receiverOptions) { + return new ReactiveMessageListener(receiverOptions); + } + + @Bean + @ConditionalOnMissingBean(ReceiverOptions.class) + public ReceiverOptions kafkaReceiverOptions(RCAsyncPropsKafka config) { + RCKafkaProps props = config.getKafkaProps(); + props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + return ReceiverOptions.create(props); + } + + // Shared + + @Bean + @ConditionalOnMissingBean(TopologyCreator.class) + public TopologyCreator kafkaTopologyCreator(RCAsyncPropsKafka config, KafkaCustomizations customizations) { + AdminClient adminClient = AdminClient.create(config.getKafkaProps()); + return new TopologyCreator(adminClient, customizations); + } + + @Bean + @ConditionalOnMissingBean(KafkaCustomizations.class) + public KafkaCustomizations defaultKafkaCustomizations() { + return new KafkaCustomizations(); + } + + @Bean + @ConditionalOnMissingBean(MessageConverter.class) + public MessageConverter kafkaJacksonMessageConverter(ObjectMapperSupplier objectMapperSupplier) { + return new KafkaJacksonMessageConverter(objectMapperSupplier.get()); + } + + @Bean + @ConditionalOnMissingBean(DiscardNotifier.class) + public DiscardNotifier kafkaDiscardNotifier(DomainEventBus domainEventBus, MessageConverter messageConverter) { + return new 
DLQDiscardNotifier(domainEventBus, messageConverter);
+    }
+
+    @Bean
+    @ConditionalOnMissingBean(ObjectMapperSupplier.class)
+    public ObjectMapperSupplier defaultObjectMapperSupplier() {
+        return new DefaultObjectMapperSupplier();
+    }
+
+    @Bean
+    @ConditionalOnMissingBean(CustomReporter.class)
+    public CustomReporter defaultKafkaCustomReporter() {
+        return new DefaultCustomReporter();
+    }
+
+    // Utilities
+
+    /**
+     * Loads Kafka properties from a dot-env style file: one KEY=VALUE pair per line,
+     * lines starting with '#' are comments. Blank lines and lines without an '=' are
+     * skipped (previously they caused an ArrayIndexOutOfBoundsException), and any line
+     * terminator is accepted so CRLF files no longer leave a trailing '\r' in values.
+     */
+    public static RCKafkaProps readPropsFromDotEnv(Path path) throws IOException {
+        String env = Files.readString(path);
+        String[] split = env.split("\\R"); // \R matches \n, \r\n and \r
+        RCKafkaProps props = new RCKafkaProps();
+        for (String s : split) {
+            if (s.isBlank() || s.startsWith("#") || !s.contains("=")) {
+                continue; // skip blanks, comments and malformed lines
+            }
+            String[] split1 = s.split("=", 2);
+            props.put(split1[0], split1[1]);
+        }
+        return props;
+    }
+
+    // Builds a PLAIN SASL login-module config string for the given credentials.
+    // NOTE(review): "jass" looks like a typo for "jaas" (Java Authentication and
+    // Authorization Service); name kept as-is for backward compatibility.
+    public static String jassConfig(String username, String password) {
+        return String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", username, password);
+    }
+}
diff --git a/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/RCKafkaEventListenerConfig.java b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/RCKafkaEventListenerConfig.java
new file mode 100644
index 00000000..3e638095
--- /dev/null
+++ b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/RCKafkaEventListenerConfig.java
@@ -0,0 +1,60 @@
+package org.reactivecommons.async.kafka.config;
+
+import org.reactivecommons.async.api.DefaultCommandHandler;
+import org.reactivecommons.async.api.HandlerRegistry;
+import org.reactivecommons.async.commons.DiscardNotifier;
+import org.reactivecommons.async.commons.HandlerResolver;
+import org.reactivecommons.async.commons.converters.MessageConverter;
+import org.reactivecommons.async.commons.ext.CustomReporter;
+import org.reactivecommons.async.commons.utils.resolver.HandlerResolverUtil;
+import
org.reactivecommons.async.kafka.communications.ReactiveMessageListener; +import org.reactivecommons.async.kafka.communications.topology.TopologyCreator; +import org.reactivecommons.async.kafka.config.props.RCAsyncPropsKafka; +import org.reactivecommons.async.kafka.listeners.ApplicationEventListener; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import reactor.core.publisher.Mono; + +import java.util.Map; + +@Configuration +public class RCKafkaEventListenerConfig { + + @Bean + public ApplicationEventListener applicationEventListener(ReactiveMessageListener listener, + HandlerResolver resolver, + MessageConverter messageConverter, + TopologyCreator creator, + DiscardNotifier discardNotifier, + CustomReporter customReporter, + RCAsyncPropsKafka props, + @Value("${spring.application.name}") String appName) { + ApplicationEventListener eventListener = new ApplicationEventListener(listener, + resolver, + messageConverter, + props.getWithDLQRetry(), + props.getCreateTopology(), + props.getMaxRetries(), + props.getRetryDelay(), + discardNotifier, + customReporter, + appName); + + eventListener.startListener(creator); + + return eventListener; + } + + @Bean + public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandler defaultCommandHandler) { + final Map registries = context.getBeansOfType(HandlerRegistry.class); + return HandlerResolverUtil.fromHandlerRegistries(registries.values(), defaultCommandHandler); + } + + @Bean + public DefaultCommandHandler defaultCommandHandler() { + return command -> Mono.empty(); + } +} diff --git a/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/RCKafkaNotificationEventListenerConfig.java 
b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/RCKafkaNotificationEventListenerConfig.java
new file mode 100644
index 00000000..d9532cef
--- /dev/null
+++ b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/RCKafkaNotificationEventListenerConfig.java
@@ -0,0 +1,60 @@
+package org.reactivecommons.async.kafka.config;
+
+import org.reactivecommons.async.api.DefaultCommandHandler;
+import org.reactivecommons.async.api.HandlerRegistry;
+import org.reactivecommons.async.commons.DiscardNotifier;
+import org.reactivecommons.async.commons.HandlerResolver;
+import org.reactivecommons.async.commons.converters.MessageConverter;
+import org.reactivecommons.async.commons.ext.CustomReporter;
+import org.reactivecommons.async.commons.utils.resolver.HandlerResolverUtil;
+import org.reactivecommons.async.kafka.communications.ReactiveMessageListener;
+import org.reactivecommons.async.kafka.communications.topology.TopologyCreator;
+import org.reactivecommons.async.kafka.config.props.RCAsyncPropsKafka;
+import org.reactivecommons.async.kafka.listeners.ApplicationNotificationsListener;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import reactor.core.publisher.Mono;
+
+import java.util.Map;
+
+@Configuration
+public class RCKafkaNotificationEventListenerConfig {
+
+    // Renamed from "applicationEventListener": RCKafkaEventListenerConfig declares a
+    // @Bean method with that exact name, so importing both configurations (e.g. using
+    // @EnableEventListeners together with @EnableNotificationListener) would fail with
+    // a bean-definition override error under Spring Boot's default settings.
+    @Bean
+    public ApplicationNotificationsListener applicationNotificationsListener(ReactiveMessageListener listener,
+                                                                             HandlerResolver resolver,
+                                                                             MessageConverter messageConverter,
+                                                                             TopologyCreator creator,
+                                                                             DiscardNotifier discardNotifier,
+                                                                             CustomReporter customReporter,
+                                                                             RCAsyncPropsKafka props,
+                                                                             @Value("${spring.application.name}") String appName) {
+        ApplicationNotificationsListener eventListener = new ApplicationNotificationsListener(listener,
+                resolver,
+                messageConverter,
+                props.getWithDLQRetry(),
+                props.getCreateTopology(),
+                props.getMaxRetries(),
+                props.getRetryDelay(),
+                discardNotifier,
+                customReporter,
+                appName);
+
+        eventListener.startListener(creator);
+
+        return eventListener;
+    }
+
+    // Shared with RCKafkaEventListenerConfig: only contribute these beans when no bean
+    // of the same type exists yet, so enabling both listener configurations does not
+    // register duplicate HandlerResolver/DefaultCommandHandler definitions.
+    @Bean
+    @ConditionalOnMissingBean(HandlerResolver.class)
+    public HandlerResolver notificationHandlerResolver(ApplicationContext context, DefaultCommandHandler defaultCommandHandler) {
+        final Map registries = context.getBeansOfType(HandlerRegistry.class);
+        return HandlerResolverUtil.fromHandlerRegistries(registries.values(), defaultCommandHandler);
+    }
+
+    @Bean
+    @ConditionalOnMissingBean(DefaultCommandHandler.class)
+    public DefaultCommandHandler notificationDefaultCommandHandler() {
+        return command -> Mono.empty();
+    }
+}
diff --git a/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/props/RCAsyncPropsKafka.java b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/props/RCAsyncPropsKafka.java
new file mode 100644
index 00000000..b09e7b46
--- /dev/null
+++ b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/props/RCAsyncPropsKafka.java
@@ -0,0 +1,28 @@
+package org.reactivecommons.async.kafka.config.props;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.NestedConfigurationProperty;
+
+
+@Getter
+@Setter
+@ConfigurationProperties(prefix = "reactive.commons.kafka")
+public class RCAsyncPropsKafka {
+
+    @NestedConfigurationProperty
+    private RCKafkaProps kafkaProps = new RCKafkaProps();
+
+    /**
+     * -1 will be considered default value.
+     * When withDLQRetry is true, it will be retried 10 times.
+     * When withDLQRetry is false, it will be retried indefinitely.
+ */ + private Integer maxRetries = -1; + + private Integer retryDelay = 1000; + + private Boolean withDLQRetry = false; + private Boolean createTopology = true; +} diff --git a/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/props/RCKafkaProps.java b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/props/RCKafkaProps.java new file mode 100644 index 00000000..9e7e1cd3 --- /dev/null +++ b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/props/RCKafkaProps.java @@ -0,0 +1,6 @@ +package org.reactivecommons.async.kafka.config.props; + +import java.util.HashMap; + +public class RCKafkaProps extends HashMap { +}