Skip to content

Commit

Permalink
chore(next): Update starters and extract common reusable code to an a…
Browse files Browse the repository at this point in the history
…ditional module (#119)

* chore(next): Update starters and extract common reusable code to an additional module
  • Loading branch information
juancgalvis authored Aug 27, 2024
1 parent 7c202c5 commit 5e093a5
Show file tree
Hide file tree
Showing 46 changed files with 2,664 additions and 344 deletions.
1 change: 1 addition & 0 deletions async/async-commons/async-commons.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies {

compileOnly 'io.projectreactor:reactor-core'
api 'com.fasterxml.jackson.core:jackson-databind'
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'commons-io:commons-io:2.16.1'
implementation 'io.cloudevents:cloudevents-json-jackson:4.0.1'

Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package org.reactivecommons.async.rabbit.config;
package org.reactivecommons.async.commons;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import lombok.extern.java.Log;
import org.reactivecommons.async.api.DefaultCommandHandler;
import org.reactivecommons.async.api.HandlerRegistry;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
import org.reactivecommons.async.commons.HandlerResolver;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.stream.Stream;

import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Log4j2
@Log
public class HandlerResolverBuilder {

public static HandlerResolver buildResolver(String domain,
Expand Down Expand Up @@ -81,7 +81,7 @@ public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {
if (r.getDomainEventListeners().containsKey(domain)) {
return Stream.concat(r.getDomainEventListeners().get(domain).stream(), getDynamics(domain, r));
}
log.warn("Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
log.log(Level.WARNING, "Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
return Stream.empty();
})
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
Expand All @@ -102,7 +102,7 @@ public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {
if (r.getDomainEventListeners().containsKey(domain)) {
return r.getDomainEventListeners().get(domain).stream();
}
log.warn("Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
log.log(Level.WARNING, "Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
return Stream.empty();
})
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.cloudevents.jackson.JsonFormat;

public class DefaultObjectMapperSupplier implements ObjectMapperSupplier {
Expand All @@ -11,7 +12,8 @@ public ObjectMapper get() {
final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.findAndRegisterModules();
objectMapper.registerModule(JsonFormat.getCloudEventJacksonModule()); // TODO: Review if this is necessary
objectMapper.registerModule(new JavaTimeModule());
objectMapper.registerModule(JsonFormat.getCloudEventJacksonModule());
return objectMapper;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
package org.reactivecommons.async.kafka.communications;

import lombok.AllArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

import java.util.List;

import static org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_MAX_POLL_RECORDS;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;


@AllArgsConstructor
public class ReactiveMessageListener {
private final ReceiverOptions<String, byte[]> receiverOptions;

public Flux<ReceiverRecord<String, byte[]>> listen(String groupId, List<String> topics) { // Notification events
ReceiverOptions<String, byte[]> options = receiverOptions.consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
ReceiverOptions<String, byte[]> options = receiverOptions.consumerProperty(GROUP_ID_CONFIG, groupId);
return KafkaReceiver.create(options.subscription(topics))
.receive();
}

public int getMaxConcurrency() {
Object property = receiverOptions.consumerProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
Object property = receiverOptions.consumerProperty(MAX_POLL_RECORDS_CONFIG);
if (property instanceof Integer) {
return (int) property;
}
return ConsumerConfig.DEFAULT_MAX_POLL_RECORDS;
return DEFAULT_MAX_POLL_RECORDS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@ public class TopologyCreator {
private final AdminClient adminClient;
private final KafkaCustomizations customizations;
private final Map<String, Boolean> existingTopics;
private final boolean checkTopics;

public TopologyCreator(AdminClient adminClient, KafkaCustomizations customizations) {
public TopologyCreator(AdminClient adminClient, KafkaCustomizations customizations, boolean checkTopics) {
this.adminClient = adminClient;
this.customizations = customizations;
this.checkTopics = checkTopics;
this.existingTopics = getTopics();
}

@SneakyThrows
public Map<String, Boolean> getTopics() {
if (!checkTopics) {
return Map.of();
}
ListTopicsResult topics = adminClient.listTopics(new ListTopicsOptions().timeoutMs(TIMEOUT_MS));
return topics.names().get().stream().collect(Collectors.toConcurrentMap(name -> name, name -> true));
}
Expand Down Expand Up @@ -68,7 +73,7 @@ protected NewTopic toNewTopic(TopicCustomization customization) {
}

public void checkTopic(String topicName) {
if (!existingTopics.containsKey(topicName)) {
if (checkTopics && !existingTopics.containsKey(topicName)) {
throw new TopicNotFoundException("Topic not found: " + topicName + ". Please create it before send a message.");
// TODO: should refresh topics?? getTopics();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public ApplicationNotificationsListener(ReactiveMessageListener receiver,

@Override
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
final RegisteredEventListener<Object, Object> handler = resolver.getEventListener(executorPath);
final RegisteredEventListener<Object, Object> handler = resolver.getNotificationListener(executorPath);

Function<Message, Object> converter = resolveConverter(handler);
final EventExecutor<Object> executor = new EventExecutor<>(handler.getHandler(), converter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void shouldCreateTopics() {
create.complete(null);
doReturn(create).when(createTopicsResult).all();
when(adminClient.createTopics(any())).thenReturn(createTopicsResult);
creator = new TopologyCreator(adminClient, customizations);
creator = new TopologyCreator(adminClient, customizations, true);
// Act
Mono<Void> flow = creator.createTopics(List.of("topic1", "topic2"));
// Assert
Expand All @@ -73,7 +73,7 @@ void shouldCheckTopics() {
names.complete(Set.of("topic1", "topic2"));
doReturn(names).when(listTopicsResult).names();
when(adminClient.listTopics(any(ListTopicsOptions.class))).thenReturn(listTopicsResult);
creator = new TopologyCreator(adminClient, customizations);
creator = new TopologyCreator(adminClient, customizations, true);
// Act
creator.checkTopic("topic1");
// Assert
Expand All @@ -87,7 +87,7 @@ void shouldFailWhenCheckTopics() {
names.complete(Set.of("topic1", "topic2"));
doReturn(names).when(listTopicsResult).names();
when(adminClient.listTopics(any(ListTopicsOptions.class))).thenReturn(listTopicsResult);
creator = new TopologyCreator(adminClient, customizations);
creator = new TopologyCreator(adminClient, customizations, true);
// Assert
assertThrows(TopicNotFoundException.class, () ->
// Act
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import com.rabbitmq.client.AMQP;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import lombok.extern.java.Log;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
import org.reactivecommons.async.commons.DiscardNotifier;
Expand Down Expand Up @@ -34,7 +32,6 @@
import static org.reactivecommons.async.commons.Headers.SERVED_QUERY_ID;

@Log
//TODO: Organizar inferencia de tipos de la misma forma que en comandos y eventos
public class ApplicationQueryListener extends GenericMessageListener {
private final MessageConverter converter;
private final HandlerResolver handlerResolver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.longThat;
import static org.mockito.Mockito.verify;
import static reactor.core.publisher.Mono.*;
import static reactor.core.publisher.Mono.error;

@ExtendWith(MockitoExtension.class)
public class ApplicationCommandListenerTest extends ListenerReporterTestSuperClass{
Expand All @@ -34,7 +34,7 @@ void shouldSendErrorMetricToCustomErrorReporter() throws InterruptedException {
final HandlerRegistry registry = HandlerRegistry.register()
.handleCommand("app.command.test", m -> error(new RuntimeException("testEx")), DummyMessage.class);
assertSendErrorToCustomReporter(registry, createSource(Command::getName, command));
verify(errorReporter).reportMetric(eq("command"), eq("app.command.test"), longThat(time -> time > 0 ), eq(false));
verify(errorReporter).reportMetric(eq("command"), eq("app.command.test"), longThat(time -> time >= 0 ), eq(false));
}

@Test
Expand Down
144 changes: 142 additions & 2 deletions docs/docs/reactive-commons/1-getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
sidebar_position: 1
---

# Getting Started

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

<Tabs>
<TabItem value="rabbitmq" label="RabbitMQ" default>
# Getting Started

This quick start tutorial sets up a single node RabbitMQ and runs the sample reactive sender and consumer using Reactive
Commons.
Expand Down Expand Up @@ -119,7 +120,146 @@ If you want to use it, you should read the [Creating a CloudEvent guide](11-crea

</TabItem>
<TabItem value="kafka" label="Kafka">
Comming soon...
This quick start tutorial sets up a single node Kafka and runs the sample reactive sender and consumer using Reactive
Commons.

## Requirements

You need Java JRE installed (Java 17 or later).

## Start Kafka

Start a Kafka broker on your local machine with all the defaults (e.g. port is 9092).

### Containerized

You can run it with Docker or Podman.

The following docker compose has a Kafka broker, a Zookeeper and a Kafka UI.

docker-compose.yml
```yaml
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.4.1
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
ports:
- "9092:9092"
depends_on:
- zookeeper
kafka-ui:
image: provectuslabs/kafka-ui:latest
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
ports:
- "8081:8080"
depends_on:
- kafka
```

```shell
docker-compose up
```

You may set in /etc/hosts (or equivalent) the following entry:

```txt
127.0.0.1 kafka
```

To enter the Kafka UI, open your browser and go to `http://localhost:8081`

## Spring Boot Application

The Spring Boot sample publishes and consumes messages with the `DomainEventBus`. This application illustrates how to
configure Reactive Commons using RabbitMQ in a Spring Boot environment.

To build your own application using the Reactive Commons API, you need to include a dependency to Reactive Commons.

### Current version

![Maven metadata URL](https://img.shields.io/maven-metadata/v?metadataUrl=https%3A%2F%2Frepo1.maven.org%2Fmaven2%2Forg%2Freactivecommons%2Fasync-commons-rabbit-starter%2Fmaven-metadata.xml)

### Dependency

```groovy
dependencies {
implementation "org.reactivecommons:async-kafka-starter:<version>"
}
```

### Configuration properties

Also you need to include the name for your app in the `application.properties`, it is important because this value will
be used
to name the application queues inside RabbitMQ:

```properties
spring.application.name=MyAppName
```

Or in your `application.yaml`

```yaml
spring:
application:
name: MyAppName
```

You can set the RabbitMQ connection properties through spring boot with
the [`spring.kafka.*` properties](https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html)

```yaml
spring:
kafka:
bootstrap-servers: localhost:9092
```

You can also set it in runtime for example from a secret, so you can create the `KafkaProperties` bean like:

```java title="org.reactivecommons.async.rabbit.config.RabbitProperties"
@Configuration
public class MyKafkaConfig {
@Bean
@Primary
public KafkaProperties myRCKafkaProperties() {
KafkaProperties properties = new KafkaProperties();
properties.setBootstrapServers(List.of("localhost:9092"));
return properties;
}
}
```

### Multi Broker Instances of Kafka or Multi Domain support

Enables to you the ability to listen events from different domains.

### Cloud Events

Includes the Cloud Events specification.

If you want to use it, you should read the [Creating a CloudEvent guide](11-creating-a-cloud-event.md)

</TabItem>
</Tabs>

3 changes: 2 additions & 1 deletion docs/docs/reactive-commons/10-wildcards.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
sidebar_position: 10
---

# Wildcards

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

<Tabs>
<TabItem value="rabbitmq" label="RabbitMQ" default>
# Wildcards

You may need to listen variable event names that have the same structure, in that case you have the
method `handleDynamicEvents` in the `HandlerRegistry`, so you can specify a pattern with '*' wildcard, it does not
Expand Down
Loading

0 comments on commit 5e093a5

Please sign in to comment.