diff --git a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java index 13b6d0b3..1501035b 100644 --- a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java +++ b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java @@ -5,10 +5,16 @@ import reactor.core.publisher.Mono; public interface DirectAsyncGateway { + public static final String DELAYED = "rc-delay"; + Mono sendCommand(Command command, String targetName); + Mono sendCommand(Command command, String targetName, long delayMillis); + Mono sendCommand(Command command, String targetName, String domain); + Mono sendCommand(Command command, String targetName, long delayMillis, String domain); + Mono sendCommand(CloudEvent command, String targetName); Mono sendCommand(CloudEvent command, String targetName, String domain); diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java index ed7342b0..fe7d2ad9 100644 --- a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java @@ -28,7 +28,7 @@ public ApplicationCommandListener applicationCommandListener(ConnectionManager m MessageConverter converter, CustomReporter errorReporter) { ApplicationCommandListener commandListener = new ApplicationCommandListener(manager.getListener(DEFAULT_DOMAIN), appName, handlers.get(DEFAULT_DOMAIN), - asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), + asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getDelayedCommands(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), manager.getDiscardNotifier(DEFAULT_DOMAIN), errorReporter); commandListener.startListener(); diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java index 906a65ca..b23213d4 100644 --- a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java @@ -44,5 +44,6 @@ public class AsyncProps { private Integer retryDelay = 1000; private Boolean withDLQRetry = false; + private Boolean delayedCommands = false; } diff --git a/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java b/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java index 1507feca..3b72f858 100644 --- a/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java +++ b/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java @@ -29,7 +29,7 @@ public ApplicationCommandListener applicationCommandListener(ReactiveMessageList DiscardNotifier discardNotifier, CustomReporter errorReporter) { ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver, - asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), + asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getDelayedCommands(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier, errorReporter); commandListener.startListener(); diff --git a/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java b/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java index 8dd69382..2c7e9f8c 100644 --- a/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java +++ b/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java @@ -33,4 +33,5 @@ public class AsyncProps { private Integer retryDelay = 1000; private Boolean withDLQRetry = false; + private Boolean delayedCommands = false; } diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java index 9fa35e10..afb5e4da 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java @@ -59,12 +59,28 @@ public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, @Override public Mono sendCommand(Command command, String targetName) { - return sendCommand(command, targetName, DEFAULT_DOMAIN); + return sendCommand(command, targetName, 0, DEFAULT_DOMAIN); + } + + @Override + public Mono sendCommand(Command command, String targetName, long delayMillis) { + return sendCommand(command, targetName, delayMillis, DEFAULT_DOMAIN); } @Override public Mono sendCommand(Command command, String targetName, String domain) { - return resolveSender(domain).sendWithConfirm(command, exchange, targetName, Collections.emptyMap(), persistentCommands); + return sendCommand(command, targetName, 0, domain); + } + + @Override + public Mono sendCommand(Command command, String targetName, long delayMillis, String domain) { + Map headers = new HashMap<>(); + String realTarget = targetName; + if (delayMillis > 0) { + headers.put(DELAYED, String.valueOf(delayMillis)); + realTarget = targetName + "-delayed"; + } + return resolveSender(domain).sendWithConfirm(command, exchange, realTarget, headers, persistentCommands); } @Override diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSender.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSender.java index 4726ee84..51bfdacb 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSender.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSender.java @@ -1,10 +1,13 @@ package org.reactivecommons.async.rabbit.communications; import com.rabbitmq.client.AMQP; +import org.reactivecommons.async.commons.communications.Message; import org.reactivecommons.async.commons.converters.MessageConverter; import org.reactivecommons.async.commons.exceptions.SendFailureNoAckException; -import org.reactivecommons.async.commons.communications.Message; -import reactor.core.publisher.*; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; import reactor.rabbitmq.OutboundMessage; import reactor.rabbitmq.OutboundMessageResult; import reactor.rabbitmq.Sender; @@ -13,14 +16,16 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import static org.reactivecommons.async.api.DirectAsyncGateway.DELAYED; import static org.reactivecommons.async.commons.Headers.SOURCE_APPLICATION; public class ReactiveMessageSender { - private final Sender sender; private final String sourceApplication; private final MessageConverter messageConverter; @@ -61,7 +66,7 @@ public Mono sendWithConfirm(T message, String exchange, String routing return Mono.create(monoSink -> { Consumer notifier = new AckNotifier(monoSink); final MyOutboundMessage outboundMessage = toOutboundMessage(message, exchange, routingKey, headers, notifier, persistent); - executorService2.submit(() -> fluxSinkConfirm.get((int) (System.currentTimeMillis()%numberOfSenderSubscriptions)).next(outboundMessage)); + executorService2.submit(() -> fluxSinkConfirm.get((int) (System.currentTimeMillis() % numberOfSenderSubscriptions)).next(outboundMessage)); }); } @@ -73,11 +78,11 @@ public Mono sendNoConfirm(T message, String exchange, String routingKe public Flux sendWithConfirmBatch(Flux messages, String exchange, String routingKey, Map headers, boolean persistent) { return messages.map(message -> toOutboundMessage(message, exchange, routingKey, headers, persistent)) - .as(sender::sendWithPublishConfirms) - .flatMap(result -> result.isAck() ? - Mono.empty() : - Mono.error(new SendFailureNoAckException("Event no ACK in communications")) - ); + .as(sender::sendWithPublishConfirms) + .flatMap(result -> result.isAck() ? + Mono.empty() : + Mono.error(new SendFailureNoAckException("Event no ACK in communications")) + ); } private static class AckNotifier implements Consumer { @@ -98,8 +103,7 @@ public void accept(Boolean ack) { } - - static class MyOutboundMessage extends OutboundMessage{ + static class MyOutboundMessage extends OutboundMessage { private final Consumer ackNotifier; @@ -130,14 +134,18 @@ private AMQP.BasicProperties buildMessageProperties(Message message, Map baseHeaders = new HashMap<>(properties.getHeaders()); baseHeaders.putAll(headers); baseHeaders.put(SOURCE_APPLICATION, sourceApplication); - return new AMQP.BasicProperties.Builder() + AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder() .contentType(properties.getContentType()) .appId(sourceApplication) .contentEncoding(properties.getContentEncoding()) .deliveryMode(persistent ? 2 : 1) .timestamp(new Date()) .messageId(UUID.randomUUID().toString()) - .headers(baseHeaders).build(); + .headers(baseHeaders); + if (headers.containsKey(DELAYED)) { + builder.expiration((String) headers.get(DELAYED)); + } + return builder.build(); } public reactor.rabbitmq.Sender getSender() { diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/TopologyCreator.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/TopologyCreator.java index d6c00e55..6369d382 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/TopologyCreator.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/TopologyCreator.java @@ -56,9 +56,15 @@ public Mono declareDLQ(String originQueue, String retryTar } public Mono declareQueue(String name, String dlqExchange, Optional maxLengthBytesOpt) { + return declareQueue(name, dlqExchange, maxLengthBytesOpt, Optional.empty()); + } + + public Mono declareQueue(String name, String dlqExchange, Optional maxLengthBytesOpt, + Optional dlRoutingKey) { final Map args = new HashMap<>(); args.put("x-dead-letter-exchange", dlqExchange); maxLengthBytesOpt.ifPresent(maxLengthBytes -> args.put("x-max-length-bytes", maxLengthBytes)); + dlRoutingKey.ifPresent(routingKey -> args.put("x-dead-letter-routing-key", routingKey)); QueueSpecification specification = QueueSpecification.queue(name) .durable(true) .arguments(args); diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListener.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListener.java index 7194688d..a2e48fba 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListener.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListener.java @@ -3,21 +3,20 @@ import com.rabbitmq.client.AMQP; import lombok.extern.java.Log; import org.reactivecommons.api.domain.Command; -import org.reactivecommons.async.commons.communications.Message; -import org.reactivecommons.async.commons.converters.MessageConverter; -import org.reactivecommons.async.commons.DiscardNotifier; import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; import org.reactivecommons.async.commons.CommandExecutor; +import org.reactivecommons.async.commons.DiscardNotifier; +import org.reactivecommons.async.commons.communications.Message; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.ext.CustomReporter; import org.reactivecommons.async.rabbit.HandlerResolver; import org.reactivecommons.async.rabbit.RabbitMessage; import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener; import org.reactivecommons.async.rabbit.communications.TopologyCreator; -import org.reactivecommons.async.commons.ext.CustomReporter; import reactor.core.publisher.Mono; import reactor.rabbitmq.AcknowledgableDelivery; import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.ExchangeSpecification; -import reactor.rabbitmq.QueueSpecification; import java.util.Optional; import java.util.function.Function; @@ -29,18 +28,20 @@ public class ApplicationCommandListener extends GenericMessageListener { private final HandlerResolver resolver; private final String directExchange; private final boolean withDLQRetry; + private final boolean delayedCommands; private final int retryDelay; private final Optional maxLengthBytes; //TODO: change large constructor parameters number - public ApplicationCommandListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, String directExchange, MessageConverter messageConverter, boolean withDLQRetry, long maxRetries, int retryDelay, Optional maxLengthBytes, DiscardNotifier discardNotifier, CustomReporter errorReporter) { + public ApplicationCommandListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, String directExchange, MessageConverter messageConverter, boolean withDLQRetry, boolean delayedCommands, long maxRetries, int retryDelay, Optional maxLengthBytes, DiscardNotifier discardNotifier, CustomReporter errorReporter) { super(queueName, listener, withDLQRetry, maxRetries, discardNotifier, "command", errorReporter); this.retryDelay = retryDelay; this.withDLQRetry = withDLQRetry; + this.delayedCommands = delayedCommands; this.resolver = resolver; this.directExchange = directExchange; this.messageConverter = messageConverter; - this.maxLengthBytes =maxLengthBytes; + this.maxLengthBytes = maxLengthBytes; } protected Mono setUpBindings(TopologyCreator creator) { @@ -51,12 +52,28 @@ protected Mono setUpBindings(TopologyCreator creator) { final Mono declareDLQ = creator.declareDLQ(queueName, directExchange, retryDelay, maxLengthBytes); final Mono binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName)); final Mono bindingDLQ = creator.bind(BindingSpecification.binding(directExchange + ".DLQ", queueName, queueName + ".DLQ")); - return declareExchange.then(declareExchangeDLQ).then(declareDLQ).then(declareQueue).then(bindingDLQ).then(binding).then(); + return declareExchange.then(declareExchangeDLQ) + .then(declareDLQ) + .then(declareQueue) + .then(bindingDLQ) + .then(binding) + .then(declareDelayedTopology(creator)) + .then(); } else { final Mono declareQueue = creator.declareQueue(queueName, maxLengthBytes); final Mono binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName)); - return declareExchange.then(declareQueue).then(binding).then(); + return declareExchange.then(declareQueue).then(binding).then(declareDelayedTopology(creator)).then(); + } + } + + private Mono declareDelayedTopology(TopologyCreator creator) { + if (delayedCommands) { + String delayedQueue = queueName + "-delayed"; + final Mono declareQueue = creator.declareQueue(delayedQueue, directExchange, maxLengthBytes, Optional.of(queueName)); + final Mono binding = creator.bind(BindingSpecification.binding(directExchange, delayedQueue, delayedQueue)); + return declareQueue.then(binding).then(); } + return Mono.empty(); } @@ -75,7 +92,7 @@ protected String getExecutorPath(AcknowledgableDelivery msj) { } @Override - protected Object parseMessageForReporter(Message message){ + protected Object parseMessageForReporter(Message message) { return messageConverter.readCommandStructure(message); } diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerPerfTest.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerPerfTest.java index 4e3a1e3f..914c4e7e 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerPerfTest.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerPerfTest.java @@ -322,7 +322,7 @@ private AMQP.BasicProperties createProps() { class StubGenericMessageListener extends ApplicationCommandListener { public StubGenericMessageListener(String queueName, ReactiveMessageListener listener, boolean useDLQRetries, long maxRetries, DiscardNotifier discardNotifier, String objectType, HandlerResolver handlerResolver, MessageConverter messageConverter, CustomReporter errorReporter) { - super(listener, queueName, handlerResolver, "directExchange", messageConverter, true, 10, 10, Optional.empty(), discardNotifier, errorReporter); + super(listener, queueName, handlerResolver, "directExchange", messageConverter, true, false, 10, 10, Optional.empty(), discardNotifier, errorReporter); } } diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerTest.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerTest.java index e42dd11f..293cff15 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerTest.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerTest.java @@ -54,7 +54,7 @@ protected GenericMessageListener createMessageListener(HandlerResolver handlerRe class StubGenericMessageListener extends ApplicationCommandListener { public StubGenericMessageListener(HandlerResolver handlerResolver) { - super(reactiveMessageListener, "queueName", handlerResolver, "directExchange", messageConverter, true, 10, 10, Optional.empty(), discardNotifier, errorReporter); + super(reactiveMessageListener, "queueName", handlerResolver, "directExchange", messageConverter, true, false,10, 10, Optional.empty(), discardNotifier, errorReporter); } } } diff --git a/gradle.properties b/gradle.properties index c2fbf536..d4e7b07d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=2.0.3 +version=2.0.4 springBootVersion=3.0.2 reactorRabbitVersion=1.5.5 gradleVersionsVersion=0.36.0 diff --git a/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java b/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java index 9fe79f02..0b137bdc 100644 --- a/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java +++ b/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java @@ -71,14 +71,14 @@ public HandlerRegistry handlerRegistrySubs(/*DirectAsyncGateway gateway*/UseCase // }, Call.class); // .handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class) - .listenEvent("event", message -> { - log.info(message.getData().toString()); - return useCase.sendCommand(message.getData()); - }, CloudEvent.class) - .handleCommand("command", message -> { - log.info(message.getData().toString()); +// .listenEvent("event", message -> { +// log.info(message.getData().toString()); +// return useCase.sendCommand(message.getData()); +// }, CloudEvent.class) + .handleCommand("unlock", message -> { + log.info(message.getData()); return Mono.empty(); - }, CloudEvent.class) + }, String.class) // .serveQuery("query1", message -> { // log.info("resolving from direct query" + message); // Map mapData = Map.of("1", "data"); diff --git a/samples/async/receiver-responder/src/main/resources/application.yaml b/samples/async/receiver-responder/src/main/resources/application.yaml index 5bba31da..fa6c922a 100644 --- a/samples/async/receiver-responder/src/main/resources/application.yaml +++ b/samples/async/receiver-responder/src/main/resources/application.yaml @@ -6,7 +6,8 @@ spring: app: async: max-retries: 10 - with-d-l-q-retry: true +# with-d-l-q-retry: true + delayed-commands: true retry-delay: 1000 # son milisegundos # connections: # app: diff --git a/samples/async/sender-client/src/main/java/sample/SampleRestController.java b/samples/async/sender-client/src/main/java/sample/SampleRestController.java index cb365daf..5fb54ea3 100644 --- a/samples/async/sender-client/src/main/java/sample/SampleRestController.java +++ b/samples/async/sender-client/src/main/java/sample/SampleRestController.java @@ -6,14 +6,17 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import org.reactivecommons.api.domain.Command; import org.reactivecommons.api.domain.DomainEventBus; import org.reactivecommons.async.api.AsyncQuery; import org.reactivecommons.async.api.DirectAsyncGateway; import org.reactivecommons.async.rabbit.converters.json.CloudEventBuilderExt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; @@ -63,18 +66,20 @@ public Mono sampleServiceEvent(@RequestBody Call call) throws JsonProces return Mono.from(domainEventBus.emit(event)).thenReturn("event"); } - @PostMapping(path = "/sample/command", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) - public Mono sampleServiceCommand(@RequestBody Call call) throws JsonProcessingException { + @GetMapping(path = "/sample/command", /*consumes = MediaType.APPLICATION_JSON_VALUE,*/ produces = MediaType.APPLICATION_JSON_VALUE) + public Mono sampleServiceCommand(/*@RequestBody Call call*/@RequestParam("delay") long delay) throws JsonProcessingException { // AsyncQuery query = new AsyncQuery<>(queryName, call); - CloudEvent command = CloudEventBuilder.v1() // - .withId(UUID.randomUUID().toString()) // - .withSource(URI.create("https://spring.io/foos"))// - .withType("command") // - .withTime(OffsetDateTime.now()) - .withData("application/json", CloudEventBuilderExt.asBytes(call)) - .build(); +// CloudEvent command = CloudEventBuilder.v1() // +// .withId(UUID.randomUUID().toString()) // +// .withSource(URI.create("https://spring.io/foos"))// +// .withType("command") // +// .withTime(OffsetDateTime.now()) +// .withData("application/json", CloudEventBuilderExt.asBytes(call)) +// .build(); + + - return directAsyncGateway.sendCommand(command, target, "accounts").thenReturn("command"); + return directAsyncGateway.sendCommand(new Command("unlock", "jhkj", "userId"), target, delay).thenReturn("command"); } @PostMapping(path = "/sample/match", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public Mono sampleServices(@RequestBody Call call) {