Skip to content

Commit

Permalink
Merge pull request #92 from reactive-commons/feature/delayed-command
Browse files Browse the repository at this point in the history
Add delayed command capability
  • Loading branch information
juancgalvis authored Oct 30, 2023
2 parents 1cb8290 + 43bd7af commit b862461
Show file tree
Hide file tree
Showing 15 changed files with 110 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@
import reactor.core.publisher.Mono;

public interface DirectAsyncGateway {
public static final String DELAYED = "rc-delay";

<T> Mono<Void> sendCommand(Command<T> command, String targetName);

<T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis);

<T> Mono<Void> sendCommand(Command<T> command, String targetName, String domain);

<T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis, String domain);

Mono<Void> sendCommand(CloudEvent command, String targetName);

Mono<Void> sendCommand(CloudEvent command, String targetName, String domain);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ApplicationCommandListener applicationCommandListener(ConnectionManager m
MessageConverter converter,
CustomReporter errorReporter) {
ApplicationCommandListener commandListener = new ApplicationCommandListener(manager.getListener(DEFAULT_DOMAIN), appName, handlers.get(DEFAULT_DOMAIN),
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getDelayedCommands(), asyncProps.getMaxRetries(),
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), manager.getDiscardNotifier(DEFAULT_DOMAIN), errorReporter);

commandListener.startListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ public class AsyncProps {
private Integer retryDelay = 1000;

private Boolean withDLQRetry = false;
private Boolean delayedCommands = false;

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ApplicationCommandListener applicationCommandListener(ReactiveMessageList
DiscardNotifier discardNotifier,
CustomReporter errorReporter) {
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getDelayedCommands(), asyncProps.getMaxRetries(),
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier, errorReporter);

commandListener.startListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ public class AsyncProps {
private Integer retryDelay = 1000;

private Boolean withDLQRetry = false;
private Boolean delayedCommands = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,28 @@ public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router,

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName) {
return sendCommand(command, targetName, DEFAULT_DOMAIN);
return sendCommand(command, targetName, 0, DEFAULT_DOMAIN);
}

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis) {
return sendCommand(command, targetName, delayMillis, DEFAULT_DOMAIN);
}

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, String domain) {
return resolveSender(domain).sendWithConfirm(command, exchange, targetName, Collections.emptyMap(), persistentCommands);
return sendCommand(command, targetName, 0, domain);
}

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis, String domain) {
Map<String, Object> headers = new HashMap<>();
String realTarget = targetName;
if (delayMillis > 0) {
headers.put(DELAYED, String.valueOf(delayMillis));
realTarget = targetName + "-delayed";
}
return resolveSender(domain).sendWithConfirm(command, exchange, realTarget, headers, persistentCommands);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package org.reactivecommons.async.rabbit.communications;

import com.rabbitmq.client.AMQP;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.exceptions.SendFailureNoAckException;
import org.reactivecommons.async.commons.communications.Message;
import reactor.core.publisher.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.OutboundMessageResult;
import reactor.rabbitmq.Sender;
Expand All @@ -13,14 +16,16 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static org.reactivecommons.async.api.DirectAsyncGateway.DELAYED;
import static org.reactivecommons.async.commons.Headers.SOURCE_APPLICATION;

public class ReactiveMessageSender {

private final Sender sender;
private final String sourceApplication;
private final MessageConverter messageConverter;
Expand Down Expand Up @@ -61,7 +66,7 @@ public <T> Mono<Void> sendWithConfirm(T message, String exchange, String routing
return Mono.create(monoSink -> {
Consumer<Boolean> notifier = new AckNotifier(monoSink);
final MyOutboundMessage outboundMessage = toOutboundMessage(message, exchange, routingKey, headers, notifier, persistent);
executorService2.submit(() -> fluxSinkConfirm.get((int) (System.currentTimeMillis()%numberOfSenderSubscriptions)).next(outboundMessage));
executorService2.submit(() -> fluxSinkConfirm.get((int) (System.currentTimeMillis() % numberOfSenderSubscriptions)).next(outboundMessage));
});
}

Expand All @@ -73,11 +78,11 @@ public <T> Mono<Void> sendNoConfirm(T message, String exchange, String routingKe

public <T> Flux<OutboundMessageResult> sendWithConfirmBatch(Flux<T> messages, String exchange, String routingKey, Map<String, Object> headers, boolean persistent) {
return messages.map(message -> toOutboundMessage(message, exchange, routingKey, headers, persistent))
.as(sender::sendWithPublishConfirms)
.flatMap(result -> result.isAck() ?
Mono.empty() :
Mono.error(new SendFailureNoAckException("Event no ACK in communications"))
);
.as(sender::sendWithPublishConfirms)
.flatMap(result -> result.isAck() ?
Mono.empty() :
Mono.error(new SendFailureNoAckException("Event no ACK in communications"))
);
}

private static class AckNotifier implements Consumer<Boolean> {
Expand All @@ -98,8 +103,7 @@ public void accept(Boolean ack) {
}



static class MyOutboundMessage extends OutboundMessage{
static class MyOutboundMessage extends OutboundMessage {

private final Consumer<Boolean> ackNotifier;

Expand Down Expand Up @@ -130,14 +134,18 @@ private AMQP.BasicProperties buildMessageProperties(Message message, Map<String,
final Map<String, Object> baseHeaders = new HashMap<>(properties.getHeaders());
baseHeaders.putAll(headers);
baseHeaders.put(SOURCE_APPLICATION, sourceApplication);
return new AMQP.BasicProperties.Builder()
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder()
.contentType(properties.getContentType())
.appId(sourceApplication)
.contentEncoding(properties.getContentEncoding())
.deliveryMode(persistent ? 2 : 1)
.timestamp(new Date())
.messageId(UUID.randomUUID().toString())
.headers(baseHeaders).build();
.headers(baseHeaders);
if (headers.containsKey(DELAYED)) {
builder.expiration((String) headers.get(DELAYED));
}
return builder.build();
}

public reactor.rabbitmq.Sender getSender() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,15 @@ public Mono<AMQP.Queue.DeclareOk> declareDLQ(String originQueue, String retryTar
}

public Mono<AMQP.Queue.DeclareOk> declareQueue(String name, String dlqExchange, Optional<Integer> maxLengthBytesOpt) {
return declareQueue(name, dlqExchange, maxLengthBytesOpt, Optional.empty());
}

public Mono<AMQP.Queue.DeclareOk> declareQueue(String name, String dlqExchange, Optional<Integer> maxLengthBytesOpt,
Optional<String> dlRoutingKey) {
final Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", dlqExchange);
maxLengthBytesOpt.ifPresent(maxLengthBytes -> args.put("x-max-length-bytes", maxLengthBytes));
dlRoutingKey.ifPresent(routingKey -> args.put("x-dead-letter-routing-key", routingKey));
QueueSpecification specification = QueueSpecification.queue(name)
.durable(true)
.arguments(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,20 @@
import com.rabbitmq.client.AMQP;
import lombok.extern.java.Log;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.commons.CommandExecutor;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.rabbit.HandlerResolver;
import org.reactivecommons.async.rabbit.RabbitMessage;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import org.reactivecommons.async.commons.ext.CustomReporter;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.QueueSpecification;

import java.util.Optional;
import java.util.function.Function;
Expand All @@ -29,18 +28,20 @@ public class ApplicationCommandListener extends GenericMessageListener {
private final HandlerResolver resolver;
private final String directExchange;
private final boolean withDLQRetry;
private final boolean delayedCommands;
private final int retryDelay;
private final Optional<Integer> maxLengthBytes;

//TODO: change large constructor parameters number
public ApplicationCommandListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, String directExchange, MessageConverter messageConverter, boolean withDLQRetry, long maxRetries, int retryDelay, Optional<Integer> maxLengthBytes, DiscardNotifier discardNotifier, CustomReporter errorReporter) {
public ApplicationCommandListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, String directExchange, MessageConverter messageConverter, boolean withDLQRetry, boolean delayedCommands, long maxRetries, int retryDelay, Optional<Integer> maxLengthBytes, DiscardNotifier discardNotifier, CustomReporter errorReporter) {
super(queueName, listener, withDLQRetry, maxRetries, discardNotifier, "command", errorReporter);
this.retryDelay = retryDelay;
this.withDLQRetry = withDLQRetry;
this.delayedCommands = delayedCommands;
this.resolver = resolver;
this.directExchange = directExchange;
this.messageConverter = messageConverter;
this.maxLengthBytes =maxLengthBytes;
this.maxLengthBytes = maxLengthBytes;
}

protected Mono<Void> setUpBindings(TopologyCreator creator) {
Expand All @@ -51,12 +52,28 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
final Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(queueName, directExchange, retryDelay, maxLengthBytes);
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName));
final Mono<AMQP.Queue.BindOk> bindingDLQ = creator.bind(BindingSpecification.binding(directExchange + ".DLQ", queueName, queueName + ".DLQ"));
return declareExchange.then(declareExchangeDLQ).then(declareDLQ).then(declareQueue).then(bindingDLQ).then(binding).then();
return declareExchange.then(declareExchangeDLQ)
.then(declareDLQ)
.then(declareQueue)
.then(bindingDLQ)
.then(binding)
.then(declareDelayedTopology(creator))
.then();
} else {
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, maxLengthBytes);
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName));
return declareExchange.then(declareQueue).then(binding).then();
return declareExchange.then(declareQueue).then(binding).then(declareDelayedTopology(creator)).then();
}
}

private Mono<Void> declareDelayedTopology(TopologyCreator creator) {
if (delayedCommands) {
String delayedQueue = queueName + "-delayed";
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(delayedQueue, directExchange, maxLengthBytes, Optional.of(queueName));
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, delayedQueue, delayedQueue));
return declareQueue.then(binding).then();
}
return Mono.empty();
}


Expand All @@ -75,7 +92,7 @@ protected String getExecutorPath(AcknowledgableDelivery msj) {
}

@Override
protected Object parseMessageForReporter(Message message){
protected Object parseMessageForReporter(Message message) {
return messageConverter.readCommandStructure(message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ private AMQP.BasicProperties createProps() {
class StubGenericMessageListener extends ApplicationCommandListener {

public StubGenericMessageListener(String queueName, ReactiveMessageListener listener, boolean useDLQRetries, long maxRetries, DiscardNotifier discardNotifier, String objectType, HandlerResolver handlerResolver, MessageConverter messageConverter, CustomReporter errorReporter) {
super(listener, queueName, handlerResolver, "directExchange", messageConverter, true, 10, 10, Optional.empty(), discardNotifier, errorReporter);
super(listener, queueName, handlerResolver, "directExchange", messageConverter, true, false, 10, 10, Optional.empty(), discardNotifier, errorReporter);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected GenericMessageListener createMessageListener(HandlerResolver handlerRe
class StubGenericMessageListener extends ApplicationCommandListener {

public StubGenericMessageListener(HandlerResolver handlerResolver) {
super(reactiveMessageListener, "queueName", handlerResolver, "directExchange", messageConverter, true, 10, 10, Optional.empty(), discardNotifier, errorReporter);
super(reactiveMessageListener, "queueName", handlerResolver, "directExchange", messageConverter, true, false,10, 10, Optional.empty(), discardNotifier, errorReporter);
}
}
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=2.0.3
version=2.0.4
springBootVersion=3.0.2
reactorRabbitVersion=1.5.5
gradleVersionsVersion=0.36.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ public HandlerRegistry handlerRegistrySubs(/*DirectAsyncGateway gateway*/UseCase
// }, Call.class);

// .handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class)
.listenEvent("event", message -> {
log.info(message.getData().toString());
return useCase.sendCommand(message.getData());
}, CloudEvent.class)
.handleCommand("command", message -> {
log.info(message.getData().toString());
// .listenEvent("event", message -> {
// log.info(message.getData().toString());
// return useCase.sendCommand(message.getData());
// }, CloudEvent.class)
.handleCommand("unlock", message -> {
log.info(message.getData());
return Mono.empty();
}, CloudEvent.class)
}, String.class)
// .serveQuery("query1", message -> {
// log.info("resolving from direct query" + message);
// Map<String, String> mapData = Map.of("1", "data");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ spring:
app:
async:
max-retries: 10
with-d-l-q-retry: true
# with-d-l-q-retry: true
delayed-commands: true
retry-delay: 1000 # son milisegundos
# connections:
# app:
Expand Down
Loading

0 comments on commit b862461

Please sign in to comment.