From a9fa2536e34378badfb9927538c5944728400ae2 Mon Sep 17 00:00:00 2001 From: Juan C Galvis <8420868+juancgalvis@users.noreply.github.com> Date: Tue, 27 Feb 2024 08:01:14 -0500 Subject: [PATCH] Fix health check, update name generator --- .../async/commons/utils/NameGenerator.java | 3 +- .../commons/utils/NameGeneratorTest.java | 6 +- .../rabbit/config/ConnectionManager.java | 8 ++- .../rabbit/config/RabbitHealthConfig.java | 6 +- .../async/rabbit/config/RabbitMqConfig.java | 2 +- .../DomainRabbitReactiveHealthIndicator.java | 55 +++++++++++++++++++ .../health/RabbitReactiveHealthIndicator.java | 35 ------------ .../async/rabbit/health/Status.java | 11 ++++ .../config/CommandListenersConfigTest.java | 2 +- .../config/EventListenersConfigTest.java | 2 +- .../NotificacionListenersConfigTest.java | 2 +- .../config/QueryListenerConfigTest.java | 2 +- ...ainRabbitReactiveHealthIndicatorTest.java} | 18 ++++-- .../config/props/BrokerConfigProps.java | 2 +- gradle.properties | 4 +- .../async-receiver-sample.gradle | 2 +- .../main/java/sample/SampleReceiverApp.java | 7 ++- .../java/sample/SampleRestController.java | 20 ++++--- 18 files changed, 118 insertions(+), 69 deletions(-) create mode 100644 async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicator.java delete mode 100644 async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/RabbitReactiveHealthIndicator.java create mode 100644 async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/Status.java rename async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/{RabbitReactiveHealthIndicatorTest.java => DomainRabbitReactiveHealthIndicatorTest.java} (74%) diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/NameGenerator.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/NameGenerator.java index 3d723fc3..294478cb 100644 --- a/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/NameGenerator.java +++ b/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/NameGenerator.java @@ -20,7 +20,8 @@ private static String generateName(String applicationName, String suffix) { bb.putLong(uuid.getMostSignificantBits()) .putLong(uuid.getLeastSignificantBits()); // Convert to base64 and remove trailing = - return applicationName+"-"+ suffix + "-" + encodeToUrlSafeString(bb.array()) + String realSuffix = suffix != null && !"".equals(suffix) ? suffix + "." : ""; + return applicationName+"."+ realSuffix + encodeToUrlSafeString(bb.array()) .replace("=", ""); } diff --git a/async/async-commons/src/test/java/org/reactivecommons/async/commons/utils/NameGeneratorTest.java b/async/async-commons/src/test/java/org/reactivecommons/async/commons/utils/NameGeneratorTest.java index 36cf0fe0..47b373c5 100644 --- a/async/async-commons/src/test/java/org/reactivecommons/async/commons/utils/NameGeneratorTest.java +++ b/async/async-commons/src/test/java/org/reactivecommons/async/commons/utils/NameGeneratorTest.java @@ -10,15 +10,15 @@ class NameGeneratorTest { void generateNameFromWithoutSuffix() { String result = NameGenerator.generateNameFrom("application"); assertFalse(result.contains("=")); - assertTrue(result.startsWith("application--")); - assertEquals(35, result.length()); + assertTrue(result.startsWith("application.")); + assertEquals(34, result.length()); } @Test void generateNameFromWithSuffix() { String result = NameGenerator.generateNameFrom("application", "suffix"); assertFalse(result.contains("=")); - assertTrue(result.startsWith("application-suffix-")); + assertTrue(result.startsWith("application.suffix.")); assertEquals(41, result.length()); } } diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/ConnectionManager.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/ConnectionManager.java index 086497f5..a2ac9760 100644 --- a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/ConnectionManager.java +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/ConnectionManager.java @@ -36,10 +36,12 @@ public void setDiscardNotifier(String domain, DiscardNotifier discardNotifier) { getChecked(domain).setDiscardNotifier(discardNotifier); } - public ConnectionManager addDomain(String domain, ReactiveMessageListener listener, ReactiveMessageSender sender) { + public ConnectionManager addDomain(String domain, ReactiveMessageListener listener, ReactiveMessageSender sender, + ConnectionFactoryProvider provider) { connections.put(domain, DomainConnections.builder() .listener(listener) .sender(sender) + .provider(provider) .build()); return this; } @@ -65,7 +67,7 @@ public DiscardNotifier getDiscardNotifier(String domain) { return getChecked(domain).getDiscardNotifier(); } - public ConnectionFactoryProvider getProvider(String domain) { - return getChecked(domain).getProvider(); + public Map getProviders() { + return connections; } } diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitHealthConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitHealthConfig.java index b1c6f5aa..f89906ef 100644 --- a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitHealthConfig.java +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitHealthConfig.java @@ -1,6 +1,6 @@ package org.reactivecommons.async.rabbit.config; -import org.reactivecommons.async.rabbit.health.RabbitReactiveHealthIndicator; +import org.reactivecommons.async.rabbit.health.DomainRabbitReactiveHealthIndicator; import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.context.annotation.Bean; @@ -13,7 +13,7 @@ public class RabbitHealthConfig { @Bean - public RabbitReactiveHealthIndicator rabbitHealthIndicator(ConnectionManager manager) { - return new RabbitReactiveHealthIndicator(manager.getProvider(DEFAULT_DOMAIN)); // TODO: Check every domain connection + public DomainRabbitReactiveHealthIndicator rabbitHealthIndicator(ConnectionManager manager) { + return new DomainRabbitReactiveHealthIndicator(manager); } } diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java index 4bef0b22..f5acbe70 100644 --- a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java @@ -68,7 +68,7 @@ public ConnectionManager buildConnectionManager(@Value("${spring.application.nam ConnectionFactoryProvider provider = createConnectionFactoryProvider(properties); ReactiveMessageSender sender = createMessageSender(appName, provider, properties, converter); ReactiveMessageListener listener = createMessageListener(appName, provider, props); - connectionManager.addDomain(domain, listener, sender); + connectionManager.addDomain(domain, listener, sender, provider); }); return connectionManager; } diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicator.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicator.java new file mode 100644 index 00000000..2685ae97 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicator.java @@ -0,0 +1,55 @@ +package org.reactivecommons.async.rabbit.health; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider; +import org.reactivecommons.async.rabbit.config.ConnectionManager; +import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; +import org.springframework.boot.actuate.health.Health; +import reactor.core.publisher.Mono; + +import java.util.stream.Collectors; + +@RequiredArgsConstructor +public class DomainRabbitReactiveHealthIndicator extends AbstractReactiveHealthIndicator { + private static final String VERSION = "version"; + private final ConnectionManager manager; + + @Override + protected Mono doHealthCheck(Health.Builder builder) { + return Mono.zip(manager.getProviders() + .entrySet() + .stream() + .map(entry -> checkSingle(entry.getKey(), entry.getValue().getProvider())) + .collect(Collectors.toList()), this::merge); + } + + private Health merge(Object[] results) { + Health.Builder builder = Health.up(); + for (Object obj : results) { + Status status = (Status) obj; + builder.withDetail(status.getDomain(), status.getVersion()); + } + return builder.build(); + } + + private Mono checkSingle(String domain, ConnectionFactoryProvider provider) { + return Mono.defer(() -> getVersion(provider)) + .map(version -> Status.builder().version(version).domain(domain).build()); + } + + private Mono getVersion(ConnectionFactoryProvider provider) { + return Mono.just(provider) + .map(ConnectionFactoryProvider::getConnectionFactory) + .map(this::getRawVersion); + } + + @SneakyThrows + private String getRawVersion(ConnectionFactory factory) { + try (Connection connection = factory.newConnection()) { + return connection.getServerProperties().get(VERSION).toString(); + } + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/RabbitReactiveHealthIndicator.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/RabbitReactiveHealthIndicator.java deleted file mode 100644 index ddb38a8a..00000000 --- a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/RabbitReactiveHealthIndicator.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.reactivecommons.async.rabbit.health; - -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; -import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider; -import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; -import org.springframework.boot.actuate.health.Health; -import reactor.core.publisher.Mono; - -@RequiredArgsConstructor -public class RabbitReactiveHealthIndicator extends AbstractReactiveHealthIndicator { - private static final String VERSION = "version"; - private final ConnectionFactoryProvider provider; - - @Override - protected Mono doHealthCheck(Health.Builder builder) { - return Mono.defer(this::getVersion) - .map(version -> builder.up().withDetail(VERSION, version).build()); - } - - private Mono getVersion() { - return Mono.just(provider) - .map(ConnectionFactoryProvider::getConnectionFactory) - .map(this::getRawVersion); - } - - @SneakyThrows - private String getRawVersion(ConnectionFactory factory) { - try (Connection connection = factory.newConnection()) { - return connection.getServerProperties().get(VERSION).toString(); - } - } -} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/Status.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/Status.java new file mode 100644 index 00000000..b3be58a2 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/Status.java @@ -0,0 +1,11 @@ +package org.reactivecommons.async.rabbit.health; + +import lombok.Builder; +import lombok.Getter; + +@Getter +@Builder +public class Status { + private final String version; + private final String domain; +} diff --git a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/CommandListenersConfigTest.java b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/CommandListenersConfigTest.java index 0b1d0d25..c322f6e1 100644 --- a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/CommandListenersConfigTest.java +++ b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/CommandListenersConfigTest.java @@ -54,7 +54,7 @@ public void init() throws NoSuchFieldException, IllegalAccessException { when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never()); when(listener.getReceiver()).thenReturn(receiver); when(listener.getMaxConcurrency()).thenReturn(20); - manager.addDomain(DEFAULT_DOMAIN, listener, null); + manager.addDomain(DEFAULT_DOMAIN, listener, null, null); handlers.add(DEFAULT_DOMAIN, handlerResolver); } diff --git a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/EventListenersConfigTest.java b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/EventListenersConfigTest.java index f09b2bb5..d5f58c05 100644 --- a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/EventListenersConfigTest.java +++ b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/EventListenersConfigTest.java @@ -55,7 +55,7 @@ public void init() { when(listener.getReceiver()).thenReturn(receiver); when(listener.getMaxConcurrency()).thenReturn(20); connectionManager = new ConnectionManager(); - connectionManager.addDomain(HandlerRegistry.DEFAULT_DOMAIN, listener, sender); + connectionManager.addDomain(HandlerRegistry.DEFAULT_DOMAIN, listener, sender, null); handlers.add(HandlerRegistry.DEFAULT_DOMAIN, handlerResolver); } diff --git a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfigTest.java b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfigTest.java index 9e728b8a..53d92633 100644 --- a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfigTest.java +++ b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfigTest.java @@ -54,7 +54,7 @@ public void init() { when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never()); when(listener.getReceiver()).thenReturn(receiver); when(listener.getMaxConcurrency()).thenReturn(20); - manager.addDomain(DEFAULT_DOMAIN, listener, null); + manager.addDomain(DEFAULT_DOMAIN, listener, null, null); handlers.add(DEFAULT_DOMAIN, handlerResolver); } diff --git a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/QueryListenerConfigTest.java b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/QueryListenerConfigTest.java index 85224ea8..c7546e27 100644 --- a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/QueryListenerConfigTest.java +++ b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/QueryListenerConfigTest.java @@ -54,7 +54,7 @@ public void init() { when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never()); when(listener.getReceiver()).thenReturn(receiver); when(listener.getMaxConcurrency()).thenReturn(20); - manager.addDomain(DEFAULT_DOMAIN, listener, sender); + manager.addDomain(DEFAULT_DOMAIN, listener, sender, null); handlers.add(DEFAULT_DOMAIN, handlerResolver); } diff --git a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/RabbitReactiveHealthIndicatorTest.java b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicatorTest.java similarity index 74% rename from async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/RabbitReactiveHealthIndicatorTest.java rename to async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicatorTest.java index 2d173758..fb533280 100644 --- a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/RabbitReactiveHealthIndicatorTest.java +++ b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicatorTest.java @@ -5,10 +5,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider; +import org.reactivecommons.async.rabbit.config.ConnectionManager; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health.Builder; import org.springframework.boot.actuate.health.Status; @@ -22,20 +22,26 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.when; +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; @ExtendWith(MockitoExtension.class) -public class RabbitReactiveHealthIndicatorTest { +public class DomainRabbitReactiveHealthIndicatorTest { @Mock private ConnectionFactoryProvider provider; @Mock private ConnectionFactory factory; @Mock private Connection connection; - @InjectMocks - private RabbitReactiveHealthIndicator indicator; + + private DomainRabbitReactiveHealthIndicator indicator; @BeforeEach void setup() { + ConnectionManager connectionManager = new ConnectionManager(); + connectionManager.addDomain(DEFAULT_DOMAIN, null, null, provider); + connectionManager.addDomain("domain2", null, null, provider); + connectionManager.addDomain("domain3", null, null, provider); + indicator = new DomainRabbitReactiveHealthIndicator(connectionManager); when(provider.getConnectionFactory()).thenReturn(factory); } @@ -51,7 +57,9 @@ void shouldBeUp() throws IOException, TimeoutException { // Assert StepVerifier.create(result) .assertNext(health -> { - assertEquals("1.2.3", health.getDetails().get("version")); + assertEquals("1.2.3", health.getDetails().get(DEFAULT_DOMAIN)); + assertEquals("1.2.3", health.getDetails().get("domain2")); + assertEquals("1.2.3", health.getDetails().get("domain3")); assertEquals(Status.UP, health.getStatus()); }) .verifyComplete(); diff --git a/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/BrokerConfigProps.java b/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/BrokerConfigProps.java index 6ea6025e..4035d066 100644 --- a/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/BrokerConfigProps.java +++ b/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/BrokerConfigProps.java @@ -39,7 +39,7 @@ public String getCommandsQueue() { public String getReplyQueue() { final String name = replyQueueName.get(); if (name == null) { - final String replyName = NameGenerator.generateNameFrom(appName); + final String replyName = NameGenerator.generateNameFrom(appName, "reply"); if (replyQueueName.compareAndSet(null, replyName)) { return replyName; } else { diff --git a/gradle.properties b/gradle.properties index d4e7b07d..2ee7db9c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ -version=2.0.4 -springBootVersion=3.0.2 +version=2.0.5 +springBootVersion=3.2.3 reactorRabbitVersion=1.5.5 gradleVersionsVersion=0.36.0 toPublish=async-commons,async-commons-api,async-commons-rabbit-standalone,async-commons-rabbit-starter,async-commons-rabbit-starter-eda,domain-events-api,async-rabbit diff --git a/samples/async/receiver-responder/async-receiver-sample.gradle b/samples/async/receiver-responder/async-receiver-sample.gradle index 2675840d..a0b30457 100644 --- a/samples/async/receiver-responder/async-receiver-sample.gradle +++ b/samples/async/receiver-responder/async-receiver-sample.gradle @@ -1,7 +1,7 @@ apply plugin: 'org.springframework.boot' dependencies { - implementation project(":async-commons-rabbit-starter-eda") + implementation project(":async-commons-rabbit-starter") implementation 'org.springframework.boot:spring-boot-starter' runtimeOnly 'org.springframework.boot:spring-boot-devtools' implementation 'io.cloudevents:cloudevents-core:2.5.0' diff --git a/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java b/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java index 0b137bdc..e3af69b1 100644 --- a/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java +++ b/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java @@ -1,6 +1,5 @@ package sample; -import io.cloudevents.CloudEvent; import lombok.AllArgsConstructor; import lombok.Data; import lombok.RequiredArgsConstructor; @@ -13,6 +12,7 @@ import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway; import org.reactivecommons.async.impl.config.annotations.EnableDomainEventBus; import org.reactivecommons.async.impl.config.annotations.EnableEventListeners; +import org.reactivecommons.async.impl.config.annotations.EnableNotificationListener; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @@ -24,6 +24,7 @@ @SpringBootApplication @EnableEventListeners +@EnableNotificationListener @EnableCommandListeners @EnableDomainEventBus @EnableDirectAsyncGateway @@ -79,6 +80,10 @@ public HandlerRegistry handlerRegistrySubs(/*DirectAsyncGateway gateway*/UseCase log.info(message.getData()); return Mono.empty(); }, String.class) + .listenNotificationEvent("sample.event", message -> { + log.info(message.getData()); + return Mono.empty(); + }, String.class) // .serveQuery("query1", message -> { // log.info("resolving from direct query" + message); // Map mapData = Map.of("1", "data"); diff --git a/samples/async/sender-client/src/main/java/sample/SampleRestController.java b/samples/async/sender-client/src/main/java/sample/SampleRestController.java index 5fb54ea3..d1922f8b 100644 --- a/samples/async/sender-client/src/main/java/sample/SampleRestController.java +++ b/samples/async/sender-client/src/main/java/sample/SampleRestController.java @@ -7,6 +7,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import org.reactivecommons.api.domain.Command; +import org.reactivecommons.api.domain.DomainEvent; import org.reactivecommons.api.domain.DomainEventBus; import org.reactivecommons.async.api.AsyncQuery; import org.reactivecommons.async.api.DirectAsyncGateway; @@ -52,17 +53,18 @@ public Mono sampleService(@RequestBody Call call) throws JsonProcess return directAsyncGateway.requestReply(query, target, CloudEvent.class, "accounts"); } + @PostMapping(path = "/sample/event", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public Mono sampleServiceEvent(@RequestBody Call call) throws JsonProcessingException { // AsyncQuery query = new AsyncQuery<>(queryName, call); - CloudEvent event = CloudEventBuilder.v1() // - .withId(UUID.randomUUID().toString()) // - .withSource(URI.create("https://spring.io/foos"))// - .withType("event") // - .withTime(OffsetDateTime.now()) - .withData("application/json", CloudEventBuilderExt.asBytes(call)) - .build(); - +// CloudEvent event = CloudEventBuilder.v1() // +// .withId(UUID.randomUUID().toString()) // +// .withSource(URI.create("https://spring.io/foos"))// +// .withType("event") // +// .withTime(OffsetDateTime.now()) +// .withData("application/json", CloudEventBuilderExt.asBytes(call)) +// .build(); + DomainEvent event = new DomainEvent<>("sample.event", UUID.randomUUID().toString(), "hello"); return Mono.from(domainEventBus.emit(event)).thenReturn("event"); } @@ -78,9 +80,9 @@ public Mono sampleServiceCommand(/*@RequestBody Call call*/@RequestParam // .build(); - return directAsyncGateway.sendCommand(new Command("unlock", "jhkj", "userId"), target, delay).thenReturn("command"); } + @PostMapping(path = "/sample/match", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public Mono sampleServices(@RequestBody Call call) { AsyncQuery query = new AsyncQuery<>("sample.query.any.that.matches", call);