Skip to content

Commit

Permalink
:fix: minor fixes
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <[email protected]>
  • Loading branch information
riccardomodanese committed Oct 8, 2024
1 parent 86f628e commit 09a9f9e
Show file tree
Hide file tree
Showing 16 changed files with 183 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
import org.eclipse.kapua.broker.artemis.plugin.security.setting.BrokerSetting;
import org.eclipse.kapua.broker.artemis.plugin.security.setting.BrokerSettingKey;
import org.eclipse.kapua.client.security.KapuaMessageListener;
import org.eclipse.kapua.client.security.MessageHelper;
import org.eclipse.kapua.client.security.ServiceClient;
import org.eclipse.kapua.client.security.ServiceClientMessagingImpl;
import org.eclipse.kapua.client.security.amqpclient.Client;
import org.eclipse.kapua.client.security.client.Client;
import org.eclipse.kapua.commons.cache.LocalCache;
import org.eclipse.kapua.commons.core.AbstractKapuaModule;
import org.eclipse.kapua.commons.setting.system.SystemSetting;
Expand Down Expand Up @@ -72,8 +73,9 @@ ServiceClient authServiceClient(
@Named("clusterName") String clusterName,
@Named("brokerHost") String brokerHost,
@Named("serviceBusClient") Client client,
SystemSetting systemSetting) {
return new ServiceClientMessagingImpl(messageListener, client);
SystemSetting systemSetting,
MessageHelper messageHelper) {
return new ServiceClientMessagingImpl(messageListener, client, messageHelper);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.eclipse.kapua.KapuaErrorCodes;
import org.eclipse.kapua.KapuaRuntimeException;
import org.eclipse.kapua.client.security.KapuaMessageListener;
import org.eclipse.kapua.client.security.amqpclient.Client;
import org.eclipse.kapua.client.security.amqpclient.ClientAMQP;
import org.eclipse.kapua.client.security.amqpclient.ClientAMQP.DestinationType;
import org.eclipse.kapua.client.security.amqp.ClientAMQP;
import org.eclipse.kapua.client.security.amqp.ClientAMQP.DestinationType;
import org.eclipse.kapua.client.security.client.Client;
import org.eclipse.kapua.commons.core.AbstractKapuaModule;
import org.eclipse.kapua.commons.setting.system.SystemSetting;
import org.eclipse.kapua.commons.setting.system.SystemSettingKey;
Expand Down
9 changes: 9 additions & 0 deletions client/security/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

<artifactId>kapua-client-security</artifactId>

<properties>
<kafka-clients.version>3.4.1</kafka-clients.version>
</properties>

<dependencies>
<!-- ActiveMQ Artemis -->
<dependency>
Expand All @@ -48,6 +52,11 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
</dependency>

<dependency>
<groupId>org.eclipse.kapua</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,17 @@
import java.util.concurrent.atomic.AtomicInteger;

import javax.inject.Singleton;
import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.qpid.jms.message.JmsTextMessage;
import org.eclipse.kapua.KapuaErrorCodes;
import org.eclipse.kapua.KapuaRuntimeException;
import org.eclipse.kapua.client.security.ServiceClient.SecurityAction;
import org.eclipse.kapua.client.security.amqpclient.ClientMessageListener;
import org.eclipse.kapua.client.security.bean.AuthResponse;
import org.eclipse.kapua.client.security.bean.EntityResponse;
import org.eclipse.kapua.client.security.bean.MessageConstants;
import org.eclipse.kapua.client.security.bean.Response;
import org.eclipse.kapua.client.security.bean.ResponseContainer;
import org.eclipse.kapua.client.security.client.Message;
import org.eclipse.kapua.client.security.client.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,7 +40,7 @@
* This class is responsible to correlate request/response messages. Only one instance of this must be present at any given time!
*/
@Singleton
public class KapuaMessageListener extends ClientMessageListener implements Closeable {
public class KapuaMessageListener implements MessageListener, Closeable {

protected static Logger logger = LoggerFactory.getLogger(KapuaMessageListener.class);
//Should only be one
Expand Down Expand Up @@ -70,27 +68,27 @@ public class KapuaMessageListener extends ClientMessageListener implements Close
public void onMessage(Message message) {
logger.debug("KapuaMessageListener processing message, instance {} responding", currentInstanceNumber);
try {
SecurityAction securityAction = SecurityAction.valueOf(message.getStringProperty(MessageConstants.HEADER_ACTION));
SecurityAction securityAction = SecurityAction.valueOf((String)message.getProperties().get(MessageConstants.HEADER_ACTION));
switch (securityAction) {
case brokerConnect:
updateResponseContainer(buildAuthResponseFromMessage((JmsTextMessage) message));
updateResponseContainer(buildAuthResponseFromMessage(message));
break;
case brokerDisconnect:
updateResponseContainer(buildAuthResponseFromMessage((JmsTextMessage) message));
updateResponseContainer(buildAuthResponseFromMessage(message));
break;
case getEntity:
updateResponseContainer(buildAccountResponseFromMessage((JmsTextMessage) message));
updateResponseContainer(buildAccountResponseFromMessage(message));
break;
default:
throw new KapuaRuntimeException(KapuaErrorCodes.ILLEGAL_ARGUMENT, "action");
}
} catch (JMSException | IOException e) {
} catch (IOException e) {
metrics.getLoginCallbackError().inc();
logger.error("Error while processing Authentication/Authorization message: {}", e.getMessage(), e);
}
}

private <R extends Response> void updateResponseContainer(R response) throws JMSException, IOException {
private <R extends Response> void updateResponseContainer(R response) throws IOException {
logger.debug("update callback {} on instance {}, map size: {}", response.getRequestId(), this, CALLBACKS.size());
ResponseContainer<R> responseContainer = (ResponseContainer<R>) CALLBACKS.get(response.getRequestId());
if (responseContainer == null) {
Expand All @@ -105,14 +103,12 @@ private <R extends Response> void updateResponseContainer(R response) throws JMS
}
}

private AuthResponse buildAuthResponseFromMessage(JmsTextMessage message) throws JMSException, IOException {
String body = message.getBody(String.class);
return reader.readValue(body, AuthResponse.class);
private AuthResponse buildAuthResponseFromMessage(Message message) throws IOException {
return reader.readValue(message.getBody(), AuthResponse.class);
}

private EntityResponse buildAccountResponseFromMessage(JmsTextMessage message) throws JMSException, IOException {
String body = message.getBody(String.class);
return reader.readValue(body, EntityResponse.class);
private EntityResponse buildAccountResponseFromMessage(Message message) throws IOException {
return reader.readValue(message.getBody(), EntityResponse.class);
}

public void registerCallback(String requestId, ResponseContainer<?> responseContainer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,45 @@
import java.util.Map;
import java.util.UUID;

import javax.inject.Singleton;
import javax.jms.JMSException;

import org.eclipse.kapua.client.security.bean.EntityRequest;
import org.eclipse.kapua.client.security.bean.MessageConstants;
import org.eclipse.kapua.client.security.amqpclient.Message;
import org.eclipse.kapua.client.security.client.Message;
import org.eclipse.kapua.client.security.bean.AuthRequest;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;

@Singleton
public class MessageHelper {

private static ObjectMapper mapper = new ObjectMapper();
private static ObjectWriter writer = mapper.writer();//check if it's thread safe
private ObjectMapper mapper = new ObjectMapper();
private ObjectWriter writer = mapper.writer();//check if it's thread safe

private MessageHelper() {
}

static Message getBrokerConnectMessage(AuthRequest authRequest) throws Exception {
public Message getBrokerConnectMessage(AuthRequest authRequest) throws Exception {
return new Message(
"SYS.SVC.auth.request",
authRequest!=null ? writer.writeValueAsString(authRequest) : "",
buildBaseMessage(authRequest));
}

static Message getBrokerDisconnectMessage(AuthRequest authRequest) throws Exception {
public Message getBrokerDisconnectMessage(AuthRequest authRequest) throws Exception {
return new Message(
"SYS.SVC.auth.request",
authRequest!=null ? writer.writeValueAsString(authRequest) : "",
buildBaseMessage(authRequest));
}

static Message getEntityMessage(EntityRequest entityRequest) throws Exception {
public Message getEntityMessage(EntityRequest entityRequest) throws Exception {
return new Message(
"SYS.SVC.auth.request",
entityRequest!=null ? writer.writeValueAsString(entityRequest) : "",
buildBaseMessage(entityRequest));
}

static Map<String, Object> buildBaseMessage(AuthRequest authRequest) throws JMSException {
private Map<String, Object> buildBaseMessage(AuthRequest authRequest) throws JMSException {
Map<String, Object> properties = new HashMap<>();
properties.put(MessageConstants.HEADER_REQUEST_ID, authRequest.getRequestId());
properties.put(MessageConstants.HEADER_ACTION, authRequest.getAction());
Expand All @@ -63,15 +65,15 @@ static Map<String, Object> buildBaseMessage(AuthRequest authRequest) throws JMSE
return properties;
}

static Map<String, Object> buildBaseMessage(EntityRequest entityRequest) throws JMSException {
private Map<String, Object> buildBaseMessage(EntityRequest entityRequest) throws JMSException {
Map<String, Object> properties = new HashMap<>();
properties.put(MessageConstants.HEADER_REQUEST_ID, entityRequest.getRequestId());
properties.put(MessageConstants.HEADER_ACTION, entityRequest.getAction());
properties.put(MessageConstants.HEADER_NAME, entityRequest.getName());
return properties;
}

static String getNewRequestId() {
public String getNewRequestId() {
return UUID.randomUUID().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
*******************************************************************************/
package org.eclipse.kapua.client.security;

import org.eclipse.kapua.client.security.amqpclient.Client;
import org.eclipse.kapua.client.security.bean.AuthRequest;
import org.eclipse.kapua.client.security.bean.AuthResponse;
import org.eclipse.kapua.client.security.bean.EntityRequest;
import org.eclipse.kapua.client.security.bean.EntityResponse;
import org.eclipse.kapua.client.security.bean.Request;
import org.eclipse.kapua.client.security.bean.ResponseContainer;
import org.eclipse.kapua.client.security.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,22 +31,24 @@ public class ServiceClientMessagingImpl implements ServiceClient {

private static final int TIMEOUT = 5000;
private final KapuaMessageListener messageListener;
private MessageHelper messageHelper;

private Client client;

public ServiceClientMessagingImpl(KapuaMessageListener messageListener, Client client) {
public ServiceClientMessagingImpl(KapuaMessageListener messageListener, Client client, MessageHelper messageHelper) {
this.messageListener = messageListener;
this.client = client;
this.messageHelper = messageHelper;
}

@Override
public AuthResponse brokerConnect(AuthRequest authRequest) throws Exception {
String requestId = MessageHelper.getNewRequestId();
String requestId = messageHelper.getNewRequestId();
authRequest.setRequestId(requestId);
authRequest.setAction(SecurityAction.brokerConnect.name());
ResponseContainer<AuthResponse> responseContainer = ResponseContainer.createAnRegisterNewMessageContainer(messageListener, authRequest);
logRequest(authRequest);
client.sendMessage(MessageHelper.getBrokerConnectMessage(authRequest));
client.sendMessage(messageHelper.getBrokerConnectMessage(authRequest));
synchronized (responseContainer) {
responseContainer.wait(TIMEOUT);
}
Expand All @@ -56,12 +58,12 @@ public AuthResponse brokerConnect(AuthRequest authRequest) throws Exception {

@Override
public AuthResponse brokerDisconnect(AuthRequest authRequest) throws Exception {
String requestId = MessageHelper.getNewRequestId();
String requestId = messageHelper.getNewRequestId();
authRequest.setRequestId(requestId);
authRequest.setAction(SecurityAction.brokerDisconnect.name());
ResponseContainer<AuthResponse> responseContainer = ResponseContainer.createAnRegisterNewMessageContainer(messageListener, authRequest);
logRequest(authRequest);
client.sendMessage(MessageHelper.getBrokerDisconnectMessage(authRequest));
client.sendMessage(messageHelper.getBrokerDisconnectMessage(authRequest));
synchronized (responseContainer) {
responseContainer.wait(TIMEOUT);
}
Expand All @@ -71,11 +73,11 @@ public AuthResponse brokerDisconnect(AuthRequest authRequest) throws Exception {

@Override
public EntityResponse getEntity(EntityRequest entityRequest) throws Exception {
String requestId = MessageHelper.getNewRequestId();
String requestId = messageHelper.getNewRequestId();
entityRequest.setRequestId(requestId);
ResponseContainer<EntityResponse> responseContainer = ResponseContainer.createAnRegisterNewMessageContainer(messageListener, entityRequest);
logRequest(entityRequest);
client.sendMessage(MessageHelper.getEntityMessage(entityRequest));
client.sendMessage(messageHelper.getEntityMessage(entityRequest));
synchronized (responseContainer) {
responseContainer.wait(TIMEOUT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
* Contributors:
* Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.client.security.amqpclient;
package org.eclipse.kapua.client.security.amqp;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
Expand All @@ -23,6 +27,9 @@
import javax.jms.TextMessage;

import org.apache.qpid.jms.JmsConnectionFactory;
import org.eclipse.kapua.client.security.KapuaMessageListener;
import org.eclipse.kapua.client.security.client.Client;
import org.eclipse.kapua.client.security.client.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -51,19 +58,19 @@ public enum DestinationType {
private String requestAddress;
private String responseAddress;
private DestinationType destinationType;
private ClientMessageListener clientMessageListener;
private KapuaMessageListener kapuaMessageListener;
private ExceptionListener exceptionListener;

private boolean active;
private boolean connectionStatus;

public ClientAMQP(String username, String password, String url, String clientId,
String requestAddress, String responseAddress, DestinationType destinationType, ClientMessageListener clientMessageListener) throws JMSException {
String requestAddress, String responseAddress, DestinationType destinationType, KapuaMessageListener kapuaMessageListener) throws JMSException {
this.clientId = clientId;
this.requestAddress = requestAddress;
this.responseAddress = responseAddress;
this.destinationType = destinationType;
this.clientMessageListener = clientMessageListener;
this.kapuaMessageListener = kapuaMessageListener;
connectionFactory = new JmsConnectionFactory(username, password, url);
exceptionListener = new ExceptionListener() {

Expand Down Expand Up @@ -142,10 +149,17 @@ private void connect() {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
logger.info("AMQP client binding message listener to: {}", responseAddress);
consumer = session.createConsumer(createDestination(responseAddress));
consumer.setMessageListener(clientMessageListener);
consumer.setMessageListener(message -> {
try {
kapuaMessageListener.onMessage(toMessage(message));
} catch (JMSException e) {
//nothing to do, just log
//TODO add metric???
logger.error("", e);
}
});
logger.info("AMQP client binding request sender to: {}", requestAddress);
producer = session.createProducer(createDestination(requestAddress));
clientMessageListener.init(session, producer);
connectionStatus = true;
logger.info("Service client {} - restarting attempt... {} DONE (Connection restored)", this, connectAttempt);
} catch (JMSException e) {
Expand All @@ -157,6 +171,28 @@ private void connect() {
}
}

private Message toMessage(javax.jms.Message message) throws JMSException {
return new Message(
message.getJMSDestination().toString(),
message.getBody(String.class),
convertToProperties(message));
}

private Map<String, Object> convertToProperties(javax.jms.Message message) throws JMSException {
Map<String, Object> map = new HashMap<String, Object>();
((Iterator<String>)message.getPropertyNames().asIterator()).forEachRemaining(str -> putProperty(map, message, str));
return map;
}

private void putProperty(Map<String, Object> map, javax.jms.Message message, String key) {
try {
map.put((String)key, message.getObjectProperty((String)key));
} catch (JMSException e) {
//nothing to do
logger.warn("Cannot get property {} value. Error: {}", key, e.getMessage());
}
}

private Destination createDestination(String address) throws JMSException {
if (DestinationType.queue.equals(destinationType)) {
return session.createQueue(address);
Expand Down
Loading

0 comments on commit 09a9f9e

Please sign in to comment.