Skip to content

Commit

Permalink
Merge pull request #96 from reactive-commons/feature/fix-health
Browse files Browse the repository at this point in the history
Fix health check, update name generator
  • Loading branch information
juancgalvis authored Feb 27, 2024
2 parents 143248f + a9fa253 commit 788a912
Show file tree
Hide file tree
Showing 18 changed files with 118 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ private static String generateName(String applicationName, String suffix) {
bb.putLong(uuid.getMostSignificantBits())
.putLong(uuid.getLeastSignificantBits());
// Convert to base64 and remove trailing =
return applicationName+"-"+ suffix + "-" + encodeToUrlSafeString(bb.array())
String realSuffix = suffix != null && !"".equals(suffix) ? suffix + "." : "";
return applicationName+"."+ realSuffix + encodeToUrlSafeString(bb.array())
.replace("=", "");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ class NameGeneratorTest {
void generateNameFromWithoutSuffix() {
String result = NameGenerator.generateNameFrom("application");
assertFalse(result.contains("="));
assertTrue(result.startsWith("application--"));
assertEquals(35, result.length());
assertTrue(result.startsWith("application."));
assertEquals(34, result.length());
}

@Test
void generateNameFromWithSuffix() {
String result = NameGenerator.generateNameFrom("application", "suffix");
assertFalse(result.contains("="));
assertTrue(result.startsWith("application-suffix-"));
assertTrue(result.startsWith("application.suffix."));
assertEquals(41, result.length());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ public void setDiscardNotifier(String domain, DiscardNotifier discardNotifier) {
getChecked(domain).setDiscardNotifier(discardNotifier);
}

public ConnectionManager addDomain(String domain, ReactiveMessageListener listener, ReactiveMessageSender sender) {
public ConnectionManager addDomain(String domain, ReactiveMessageListener listener, ReactiveMessageSender sender,
ConnectionFactoryProvider provider) {
connections.put(domain, DomainConnections.builder()
.listener(listener)
.sender(sender)
.provider(provider)
.build());
return this;
}
Expand All @@ -65,7 +67,7 @@ public DiscardNotifier getDiscardNotifier(String domain) {
return getChecked(domain).getDiscardNotifier();
}

public ConnectionFactoryProvider getProvider(String domain) {
return getChecked(domain).getProvider();
public Map<String, DomainConnections> getProviders() {
return connections;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.reactivecommons.async.rabbit.config;

import org.reactivecommons.async.rabbit.health.RabbitReactiveHealthIndicator;
import org.reactivecommons.async.rabbit.health.DomainRabbitReactiveHealthIndicator;
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
Expand All @@ -13,7 +13,7 @@
public class RabbitHealthConfig {

@Bean
public RabbitReactiveHealthIndicator rabbitHealthIndicator(ConnectionManager manager) {
return new RabbitReactiveHealthIndicator(manager.getProvider(DEFAULT_DOMAIN)); // TODO: Check every domain connection
public DomainRabbitReactiveHealthIndicator rabbitHealthIndicator(ConnectionManager manager) {
return new DomainRabbitReactiveHealthIndicator(manager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ConnectionManager buildConnectionManager(@Value("${spring.application.nam
ConnectionFactoryProvider provider = createConnectionFactoryProvider(properties);
ReactiveMessageSender sender = createMessageSender(appName, provider, properties, converter);
ReactiveMessageListener listener = createMessageListener(appName, provider, props);
connectionManager.addDomain(domain, listener, sender);
connectionManager.addDomain(domain, listener, sender, provider);
});
return connectionManager;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.reactivecommons.async.rabbit.health;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider;
import org.reactivecommons.async.rabbit.config.ConnectionManager;
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import reactor.core.publisher.Mono;

import java.util.stream.Collectors;

@RequiredArgsConstructor
public class DomainRabbitReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
private static final String VERSION = "version";
private final ConnectionManager manager;

@Override
protected Mono<Health> doHealthCheck(Health.Builder builder) {
return Mono.zip(manager.getProviders()
.entrySet()
.stream()
.map(entry -> checkSingle(entry.getKey(), entry.getValue().getProvider()))
.collect(Collectors.toList()), this::merge);
}

private Health merge(Object[] results) {
Health.Builder builder = Health.up();
for (Object obj : results) {
Status status = (Status) obj;
builder.withDetail(status.getDomain(), status.getVersion());
}
return builder.build();
}

private Mono<Status> checkSingle(String domain, ConnectionFactoryProvider provider) {
return Mono.defer(() -> getVersion(provider))
.map(version -> Status.builder().version(version).domain(domain).build());
}

private Mono<String> getVersion(ConnectionFactoryProvider provider) {
return Mono.just(provider)
.map(ConnectionFactoryProvider::getConnectionFactory)
.map(this::getRawVersion);
}

@SneakyThrows
private String getRawVersion(ConnectionFactory factory) {
try (Connection connection = factory.newConnection()) {
return connection.getServerProperties().get(VERSION).toString();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.reactivecommons.async.rabbit.health;

import lombok.Builder;
import lombok.Getter;

@Getter
@Builder
public class Status {
private final String version;
private final String domain;
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void init() throws NoSuchFieldException, IllegalAccessException {
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
when(listener.getReceiver()).thenReturn(receiver);
when(listener.getMaxConcurrency()).thenReturn(20);
manager.addDomain(DEFAULT_DOMAIN, listener, null);
manager.addDomain(DEFAULT_DOMAIN, listener, null, null);
handlers.add(DEFAULT_DOMAIN, handlerResolver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void init() {
when(listener.getReceiver()).thenReturn(receiver);
when(listener.getMaxConcurrency()).thenReturn(20);
connectionManager = new ConnectionManager();
connectionManager.addDomain(HandlerRegistry.DEFAULT_DOMAIN, listener, sender);
connectionManager.addDomain(HandlerRegistry.DEFAULT_DOMAIN, listener, sender, null);
handlers.add(HandlerRegistry.DEFAULT_DOMAIN, handlerResolver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void init() {
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
when(listener.getReceiver()).thenReturn(receiver);
when(listener.getMaxConcurrency()).thenReturn(20);
manager.addDomain(DEFAULT_DOMAIN, listener, null);
manager.addDomain(DEFAULT_DOMAIN, listener, null, null);
handlers.add(DEFAULT_DOMAIN, handlerResolver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void init() {
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
when(listener.getReceiver()).thenReturn(receiver);
when(listener.getMaxConcurrency()).thenReturn(20);
manager.addDomain(DEFAULT_DOMAIN, listener, sender);
manager.addDomain(DEFAULT_DOMAIN, listener, sender, null);
handlers.add(DEFAULT_DOMAIN, handlerResolver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider;
import org.reactivecommons.async.rabbit.config.ConnectionManager;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Health.Builder;
import org.springframework.boot.actuate.health.Status;
Expand All @@ -22,20 +22,26 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;

@ExtendWith(MockitoExtension.class)
public class RabbitReactiveHealthIndicatorTest {
public class DomainRabbitReactiveHealthIndicatorTest {
@Mock
private ConnectionFactoryProvider provider;
@Mock
private ConnectionFactory factory;
@Mock
private Connection connection;
@InjectMocks
private RabbitReactiveHealthIndicator indicator;

private DomainRabbitReactiveHealthIndicator indicator;

@BeforeEach
void setup() {
ConnectionManager connectionManager = new ConnectionManager();
connectionManager.addDomain(DEFAULT_DOMAIN, null, null, provider);
connectionManager.addDomain("domain2", null, null, provider);
connectionManager.addDomain("domain3", null, null, provider);
indicator = new DomainRabbitReactiveHealthIndicator(connectionManager);
when(provider.getConnectionFactory()).thenReturn(factory);
}

Expand All @@ -51,7 +57,9 @@ void shouldBeUp() throws IOException, TimeoutException {
// Assert
StepVerifier.create(result)
.assertNext(health -> {
assertEquals("1.2.3", health.getDetails().get("version"));
assertEquals("1.2.3", health.getDetails().get(DEFAULT_DOMAIN));
assertEquals("1.2.3", health.getDetails().get("domain2"));
assertEquals("1.2.3", health.getDetails().get("domain3"));
assertEquals(Status.UP, health.getStatus());
})
.verifyComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public String getCommandsQueue() {
public String getReplyQueue() {
final String name = replyQueueName.get();
if (name == null) {
final String replyName = NameGenerator.generateNameFrom(appName);
final String replyName = NameGenerator.generateNameFrom(appName, "reply");
if (replyQueueName.compareAndSet(null, replyName)) {
return replyName;
} else {
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
version=2.0.4
springBootVersion=3.0.2
version=2.0.5
springBootVersion=3.2.3
reactorRabbitVersion=1.5.5
gradleVersionsVersion=0.36.0
toPublish=async-commons,async-commons-api,async-commons-rabbit-standalone,async-commons-rabbit-starter,async-commons-rabbit-starter-eda,domain-events-api,async-rabbit
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apply plugin: 'org.springframework.boot'

dependencies {
implementation project(":async-commons-rabbit-starter-eda")
implementation project(":async-commons-rabbit-starter")
implementation 'org.springframework.boot:spring-boot-starter'
runtimeOnly 'org.springframework.boot:spring-boot-devtools'
implementation 'io.cloudevents:cloudevents-core:2.5.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package sample;

import io.cloudevents.CloudEvent;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;
Expand All @@ -13,6 +12,7 @@
import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway;
import org.reactivecommons.async.impl.config.annotations.EnableDomainEventBus;
import org.reactivecommons.async.impl.config.annotations.EnableEventListeners;
import org.reactivecommons.async.impl.config.annotations.EnableNotificationListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
Expand All @@ -24,6 +24,7 @@

@SpringBootApplication
@EnableEventListeners
@EnableNotificationListener
@EnableCommandListeners
@EnableDomainEventBus
@EnableDirectAsyncGateway
Expand Down Expand Up @@ -79,6 +80,10 @@ public HandlerRegistry handlerRegistrySubs(/*DirectAsyncGateway gateway*/UseCase
log.info(message.getData());
return Mono.empty();
}, String.class)
.listenNotificationEvent("sample.event", message -> {
log.info(message.getData());
return Mono.empty();
}, String.class)
// .serveQuery("query1", message -> {
// log.info("resolving from direct query" + message);
// Map<String, String> mapData = Map.of("1", "data");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import lombok.Data;
import lombok.NoArgsConstructor;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.api.AsyncQuery;
import org.reactivecommons.async.api.DirectAsyncGateway;
Expand Down Expand Up @@ -52,17 +53,18 @@ public Mono<CloudEvent> sampleService(@RequestBody Call call) throws JsonProcess

return directAsyncGateway.requestReply(query, target, CloudEvent.class, "accounts");
}

@PostMapping(path = "/sample/event", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<String> sampleServiceEvent(@RequestBody Call call) throws JsonProcessingException {
// AsyncQuery<?> query = new AsyncQuery<>(queryName, call);
CloudEvent event = CloudEventBuilder.v1() //
.withId(UUID.randomUUID().toString()) //
.withSource(URI.create("https://spring.io/foos"))//
.withType("event") //
.withTime(OffsetDateTime.now())
.withData("application/json", CloudEventBuilderExt.asBytes(call))
.build();

// CloudEvent event = CloudEventBuilder.v1() //
// .withId(UUID.randomUUID().toString()) //
// .withSource(URI.create("https://spring.io/foos"))//
// .withType("event") //
// .withTime(OffsetDateTime.now())
// .withData("application/json", CloudEventBuilderExt.asBytes(call))
// .build();
DomainEvent<String> event = new DomainEvent<>("sample.event", UUID.randomUUID().toString(), "hello");
return Mono.from(domainEventBus.emit(event)).thenReturn("event");
}

Expand All @@ -78,9 +80,9 @@ public Mono<String> sampleServiceCommand(/*@RequestBody Call call*/@RequestParam
// .build();



return directAsyncGateway.sendCommand(new Command<String>("unlock", "jhkj", "userId"), target, delay).thenReturn("command");
}

@PostMapping(path = "/sample/match", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<RespQuery1> sampleServices(@RequestBody Call call) {
AsyncQuery<?> query = new AsyncQuery<>("sample.query.any.that.matches", call);
Expand Down

0 comments on commit 788a912

Please sign in to comment.