From 09a9f9e5683b961ec5414a6bef72b7505274eec0 Mon Sep 17 00:00:00 2001 From: riccardomodanese Date: Tue, 8 Oct 2024 17:46:01 +0200 Subject: [PATCH] :fix: minor fixes Signed-off-by: riccardomodanese --- .../security/ArtemisSecurityModule.java | 8 +-- .../security/ArtemisSecurityModuleClient.java | 6 +-- client/security/pom.xml | 9 ++++ .../client/security/KapuaMessageListener.java | 30 +++++------- .../kapua/client/security/MessageHelper.java | 26 +++++----- .../security/ServiceClientMessagingImpl.java | 18 ++++--- .../{amqpclient => amqp}/ClientAMQP.java | 48 +++++++++++++++--- .../amqpclient/ClientMessageListener.java | 36 -------------- .../{amqpclient => client}/Client.java | 2 +- .../{amqpclient => client}/Message.java | 10 +++- .../security/client/MessageListener.java | 19 +++++++ .../commons/event/jms/JMSServiceEventBus.java | 49 +++++-------------- .../kapua/commons/event/jms/Subscription.java | 41 ++++++++++++++++ .../kapua/commons/security/KapuaSession.java | 2 +- .../eclipse/kapua/event/ServiceEventBus.java | 4 +- .../AuthenticationServiceConverter.java | 7 +-- 16 files changed, 183 insertions(+), 132 deletions(-) rename client/security/src/main/java/org/eclipse/kapua/client/security/{amqpclient => amqp}/ClientAMQP.java (78%) delete mode 100644 client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/ClientMessageListener.java rename client/security/src/main/java/org/eclipse/kapua/client/security/{amqpclient => client}/Client.java (93%) rename client/security/src/main/java/org/eclipse/kapua/client/security/{amqpclient => client}/Message.java (75%) create mode 100644 client/security/src/main/java/org/eclipse/kapua/client/security/client/MessageListener.java create mode 100644 commons/src/main/java/org/eclipse/kapua/commons/event/jms/Subscription.java diff --git a/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/ArtemisSecurityModule.java b/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/ArtemisSecurityModule.java index 993da3be9b2..3ab1a268d4c 100644 --- a/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/ArtemisSecurityModule.java +++ b/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/ArtemisSecurityModule.java @@ -20,9 +20,10 @@ import org.eclipse.kapua.broker.artemis.plugin.security.setting.BrokerSetting; import org.eclipse.kapua.broker.artemis.plugin.security.setting.BrokerSettingKey; import org.eclipse.kapua.client.security.KapuaMessageListener; +import org.eclipse.kapua.client.security.MessageHelper; import org.eclipse.kapua.client.security.ServiceClient; import org.eclipse.kapua.client.security.ServiceClientMessagingImpl; -import org.eclipse.kapua.client.security.amqpclient.Client; +import org.eclipse.kapua.client.security.client.Client; import org.eclipse.kapua.commons.cache.LocalCache; import org.eclipse.kapua.commons.core.AbstractKapuaModule; import org.eclipse.kapua.commons.setting.system.SystemSetting; @@ -72,8 +73,9 @@ ServiceClient authServiceClient( @Named("clusterName") String clusterName, @Named("brokerHost") String brokerHost, @Named("serviceBusClient") Client client, - SystemSetting systemSetting) { - return new ServiceClientMessagingImpl(messageListener, client); + SystemSetting systemSetting, + MessageHelper messageHelper) { + return new ServiceClientMessagingImpl(messageListener, client, messageHelper); } } diff --git a/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/ArtemisSecurityModuleClient.java b/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/ArtemisSecurityModuleClient.java index 036ac81ce46..ac23f0f8736 100644 --- a/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/ArtemisSecurityModuleClient.java +++ b/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/ArtemisSecurityModuleClient.java @@ -21,9 +21,9 @@ import org.eclipse.kapua.KapuaErrorCodes; import org.eclipse.kapua.KapuaRuntimeException; import org.eclipse.kapua.client.security.KapuaMessageListener; -import org.eclipse.kapua.client.security.amqpclient.Client; -import org.eclipse.kapua.client.security.amqpclient.ClientAMQP; -import org.eclipse.kapua.client.security.amqpclient.ClientAMQP.DestinationType; +import org.eclipse.kapua.client.security.amqp.ClientAMQP; +import org.eclipse.kapua.client.security.amqp.ClientAMQP.DestinationType; +import org.eclipse.kapua.client.security.client.Client; import org.eclipse.kapua.commons.core.AbstractKapuaModule; import org.eclipse.kapua.commons.setting.system.SystemSetting; import org.eclipse.kapua.commons.setting.system.SystemSettingKey; diff --git a/client/security/pom.xml b/client/security/pom.xml index 6b49651d606..942baaa3d53 100644 --- a/client/security/pom.xml +++ b/client/security/pom.xml @@ -22,6 +22,10 @@ kapua-client-security + + 3.4.1 + + @@ -48,6 +52,11 @@ org.apache.qpid qpid-jms-client + + org.apache.kafka + kafka-clients + ${kafka-clients.version} + org.eclipse.kapua diff --git a/client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java b/client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java index 7610e1ab5d8..e629f1e4faf 100644 --- a/client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java +++ b/client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java @@ -19,19 +19,17 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.inject.Singleton; -import javax.jms.JMSException; -import javax.jms.Message; -import org.apache.qpid.jms.message.JmsTextMessage; import org.eclipse.kapua.KapuaErrorCodes; import org.eclipse.kapua.KapuaRuntimeException; import org.eclipse.kapua.client.security.ServiceClient.SecurityAction; -import org.eclipse.kapua.client.security.amqpclient.ClientMessageListener; import org.eclipse.kapua.client.security.bean.AuthResponse; import org.eclipse.kapua.client.security.bean.EntityResponse; import org.eclipse.kapua.client.security.bean.MessageConstants; import org.eclipse.kapua.client.security.bean.Response; import org.eclipse.kapua.client.security.bean.ResponseContainer; +import org.eclipse.kapua.client.security.client.Message; +import org.eclipse.kapua.client.security.client.MessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +40,7 @@ * This class is responsible to correlate request/response messages. Only one instance of this must be present at any given time! */ @Singleton -public class KapuaMessageListener extends ClientMessageListener implements Closeable { +public class KapuaMessageListener implements MessageListener, Closeable { protected static Logger logger = LoggerFactory.getLogger(KapuaMessageListener.class); //Should only be one @@ -70,27 +68,27 @@ public class KapuaMessageListener extends ClientMessageListener implements Close public void onMessage(Message message) { logger.debug("KapuaMessageListener processing message, instance {} responding", currentInstanceNumber); try { - SecurityAction securityAction = SecurityAction.valueOf(message.getStringProperty(MessageConstants.HEADER_ACTION)); + SecurityAction securityAction = SecurityAction.valueOf((String)message.getProperties().get(MessageConstants.HEADER_ACTION)); switch (securityAction) { case brokerConnect: - updateResponseContainer(buildAuthResponseFromMessage((JmsTextMessage) message)); + updateResponseContainer(buildAuthResponseFromMessage(message)); break; case brokerDisconnect: - updateResponseContainer(buildAuthResponseFromMessage((JmsTextMessage) message)); + updateResponseContainer(buildAuthResponseFromMessage(message)); break; case getEntity: - updateResponseContainer(buildAccountResponseFromMessage((JmsTextMessage) message)); + updateResponseContainer(buildAccountResponseFromMessage(message)); break; default: throw new KapuaRuntimeException(KapuaErrorCodes.ILLEGAL_ARGUMENT, "action"); } - } catch (JMSException | IOException e) { + } catch (IOException e) { metrics.getLoginCallbackError().inc(); logger.error("Error while processing Authentication/Authorization message: {}", e.getMessage(), e); } } - private void updateResponseContainer(R response) throws JMSException, IOException { + private void updateResponseContainer(R response) throws IOException { logger.debug("update callback {} on instance {}, map size: {}", response.getRequestId(), this, CALLBACKS.size()); ResponseContainer responseContainer = (ResponseContainer) CALLBACKS.get(response.getRequestId()); if (responseContainer == null) { @@ -105,14 +103,12 @@ private void updateResponseContainer(R response) throws JMS } } - private AuthResponse buildAuthResponseFromMessage(JmsTextMessage message) throws JMSException, IOException { - String body = message.getBody(String.class); - return reader.readValue(body, AuthResponse.class); + private AuthResponse buildAuthResponseFromMessage(Message message) throws IOException { + return reader.readValue(message.getBody(), AuthResponse.class); } - private EntityResponse buildAccountResponseFromMessage(JmsTextMessage message) throws JMSException, IOException { - String body = message.getBody(String.class); - return reader.readValue(body, EntityResponse.class); + private EntityResponse buildAccountResponseFromMessage(Message message) throws IOException { + return reader.readValue(message.getBody(), EntityResponse.class); } public void registerCallback(String requestId, ResponseContainer responseContainer) { diff --git a/client/security/src/main/java/org/eclipse/kapua/client/security/MessageHelper.java b/client/security/src/main/java/org/eclipse/kapua/client/security/MessageHelper.java index b1b5cc42ef4..d6f7d7fbbc7 100644 --- a/client/security/src/main/java/org/eclipse/kapua/client/security/MessageHelper.java +++ b/client/security/src/main/java/org/eclipse/kapua/client/security/MessageHelper.java @@ -16,43 +16,45 @@ import java.util.Map; import java.util.UUID; +import javax.inject.Singleton; import javax.jms.JMSException; import org.eclipse.kapua.client.security.bean.EntityRequest; import org.eclipse.kapua.client.security.bean.MessageConstants; -import org.eclipse.kapua.client.security.amqpclient.Message; +import org.eclipse.kapua.client.security.client.Message; import org.eclipse.kapua.client.security.bean.AuthRequest; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +@Singleton public class MessageHelper { - private static ObjectMapper mapper = new ObjectMapper(); - private static ObjectWriter writer = mapper.writer();//check if it's thread safe + private ObjectMapper mapper = new ObjectMapper(); + private ObjectWriter writer = mapper.writer();//check if it's thread safe - private MessageHelper() { - } - - static Message getBrokerConnectMessage(AuthRequest authRequest) throws Exception { + public Message getBrokerConnectMessage(AuthRequest authRequest) throws Exception { return new Message( + "SYS.SVC.auth.request", authRequest!=null ? writer.writeValueAsString(authRequest) : "", buildBaseMessage(authRequest)); } - static Message getBrokerDisconnectMessage(AuthRequest authRequest) throws Exception { + public Message getBrokerDisconnectMessage(AuthRequest authRequest) throws Exception { return new Message( + "SYS.SVC.auth.request", authRequest!=null ? writer.writeValueAsString(authRequest) : "", buildBaseMessage(authRequest)); } - static Message getEntityMessage(EntityRequest entityRequest) throws Exception { + public Message getEntityMessage(EntityRequest entityRequest) throws Exception { return new Message( + "SYS.SVC.auth.request", entityRequest!=null ? writer.writeValueAsString(entityRequest) : "", buildBaseMessage(entityRequest)); } - static Map buildBaseMessage(AuthRequest authRequest) throws JMSException { + private Map buildBaseMessage(AuthRequest authRequest) throws JMSException { Map properties = new HashMap<>(); properties.put(MessageConstants.HEADER_REQUEST_ID, authRequest.getRequestId()); properties.put(MessageConstants.HEADER_ACTION, authRequest.getAction()); @@ -63,7 +65,7 @@ static Map buildBaseMessage(AuthRequest authRequest) throws JMSE return properties; } - static Map buildBaseMessage(EntityRequest entityRequest) throws JMSException { + private Map buildBaseMessage(EntityRequest entityRequest) throws JMSException { Map properties = new HashMap<>(); properties.put(MessageConstants.HEADER_REQUEST_ID, entityRequest.getRequestId()); properties.put(MessageConstants.HEADER_ACTION, entityRequest.getAction()); @@ -71,7 +73,7 @@ static Map buildBaseMessage(EntityRequest entityRequest) throws return properties; } - static String getNewRequestId() { + public String getNewRequestId() { return UUID.randomUUID().toString(); } } diff --git a/client/security/src/main/java/org/eclipse/kapua/client/security/ServiceClientMessagingImpl.java b/client/security/src/main/java/org/eclipse/kapua/client/security/ServiceClientMessagingImpl.java index 2b73108bce3..da1ffbb75e9 100644 --- a/client/security/src/main/java/org/eclipse/kapua/client/security/ServiceClientMessagingImpl.java +++ b/client/security/src/main/java/org/eclipse/kapua/client/security/ServiceClientMessagingImpl.java @@ -12,13 +12,13 @@ *******************************************************************************/ package org.eclipse.kapua.client.security; -import org.eclipse.kapua.client.security.amqpclient.Client; import org.eclipse.kapua.client.security.bean.AuthRequest; import org.eclipse.kapua.client.security.bean.AuthResponse; import org.eclipse.kapua.client.security.bean.EntityRequest; import org.eclipse.kapua.client.security.bean.EntityResponse; import org.eclipse.kapua.client.security.bean.Request; import org.eclipse.kapua.client.security.bean.ResponseContainer; +import org.eclipse.kapua.client.security.client.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,22 +31,24 @@ public class ServiceClientMessagingImpl implements ServiceClient { private static final int TIMEOUT = 5000; private final KapuaMessageListener messageListener; + private MessageHelper messageHelper; private Client client; - public ServiceClientMessagingImpl(KapuaMessageListener messageListener, Client client) { + public ServiceClientMessagingImpl(KapuaMessageListener messageListener, Client client, MessageHelper messageHelper) { this.messageListener = messageListener; this.client = client; + this.messageHelper = messageHelper; } @Override public AuthResponse brokerConnect(AuthRequest authRequest) throws Exception { - String requestId = MessageHelper.getNewRequestId(); + String requestId = messageHelper.getNewRequestId(); authRequest.setRequestId(requestId); authRequest.setAction(SecurityAction.brokerConnect.name()); ResponseContainer responseContainer = ResponseContainer.createAnRegisterNewMessageContainer(messageListener, authRequest); logRequest(authRequest); - client.sendMessage(MessageHelper.getBrokerConnectMessage(authRequest)); + client.sendMessage(messageHelper.getBrokerConnectMessage(authRequest)); synchronized (responseContainer) { responseContainer.wait(TIMEOUT); } @@ -56,12 +58,12 @@ public AuthResponse brokerConnect(AuthRequest authRequest) throws Exception { @Override public AuthResponse brokerDisconnect(AuthRequest authRequest) throws Exception { - String requestId = MessageHelper.getNewRequestId(); + String requestId = messageHelper.getNewRequestId(); authRequest.setRequestId(requestId); authRequest.setAction(SecurityAction.brokerDisconnect.name()); ResponseContainer responseContainer = ResponseContainer.createAnRegisterNewMessageContainer(messageListener, authRequest); logRequest(authRequest); - client.sendMessage(MessageHelper.getBrokerDisconnectMessage(authRequest)); + client.sendMessage(messageHelper.getBrokerDisconnectMessage(authRequest)); synchronized (responseContainer) { responseContainer.wait(TIMEOUT); } @@ -71,11 +73,11 @@ public AuthResponse brokerDisconnect(AuthRequest authRequest) throws Exception { @Override public EntityResponse getEntity(EntityRequest entityRequest) throws Exception { - String requestId = MessageHelper.getNewRequestId(); + String requestId = messageHelper.getNewRequestId(); entityRequest.setRequestId(requestId); ResponseContainer responseContainer = ResponseContainer.createAnRegisterNewMessageContainer(messageListener, entityRequest); logRequest(entityRequest); - client.sendMessage(MessageHelper.getEntityMessage(entityRequest)); + client.sendMessage(messageHelper.getEntityMessage(entityRequest)); synchronized (responseContainer) { responseContainer.wait(TIMEOUT); } diff --git a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/ClientAMQP.java b/client/security/src/main/java/org/eclipse/kapua/client/security/amqp/ClientAMQP.java similarity index 78% rename from client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/ClientAMQP.java rename to client/security/src/main/java/org/eclipse/kapua/client/security/amqp/ClientAMQP.java index 43f6697bed3..c3e9b0b1af6 100644 --- a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/ClientAMQP.java +++ b/client/security/src/main/java/org/eclipse/kapua/client/security/amqp/ClientAMQP.java @@ -10,7 +10,11 @@ * Contributors: * Eurotech - initial API and implementation *******************************************************************************/ -package org.eclipse.kapua.client.security.amqpclient; +package org.eclipse.kapua.client.security.amqp; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -23,6 +27,9 @@ import javax.jms.TextMessage; import org.apache.qpid.jms.JmsConnectionFactory; +import org.eclipse.kapua.client.security.KapuaMessageListener; +import org.eclipse.kapua.client.security.client.Client; +import org.eclipse.kapua.client.security.client.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,19 +58,19 @@ public enum DestinationType { private String requestAddress; private String responseAddress; private DestinationType destinationType; - private ClientMessageListener clientMessageListener; + private KapuaMessageListener kapuaMessageListener; private ExceptionListener exceptionListener; private boolean active; private boolean connectionStatus; public ClientAMQP(String username, String password, String url, String clientId, - String requestAddress, String responseAddress, DestinationType destinationType, ClientMessageListener clientMessageListener) throws JMSException { + String requestAddress, String responseAddress, DestinationType destinationType, KapuaMessageListener kapuaMessageListener) throws JMSException { this.clientId = clientId; this.requestAddress = requestAddress; this.responseAddress = responseAddress; this.destinationType = destinationType; - this.clientMessageListener = clientMessageListener; + this.kapuaMessageListener = kapuaMessageListener; connectionFactory = new JmsConnectionFactory(username, password, url); exceptionListener = new ExceptionListener() { @@ -142,10 +149,17 @@ private void connect() { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); logger.info("AMQP client binding message listener to: {}", responseAddress); consumer = session.createConsumer(createDestination(responseAddress)); - consumer.setMessageListener(clientMessageListener); + consumer.setMessageListener(message -> { + try { + kapuaMessageListener.onMessage(toMessage(message)); + } catch (JMSException e) { + //nothing to do, just log + //TODO add metric??? + logger.error("", e); + } + }); logger.info("AMQP client binding request sender to: {}", requestAddress); producer = session.createProducer(createDestination(requestAddress)); - clientMessageListener.init(session, producer); connectionStatus = true; logger.info("Service client {} - restarting attempt... {} DONE (Connection restored)", this, connectAttempt); } catch (JMSException e) { @@ -157,6 +171,28 @@ private void connect() { } } + private Message toMessage(javax.jms.Message message) throws JMSException { + return new Message( + message.getJMSDestination().toString(), + message.getBody(String.class), + convertToProperties(message)); + } + + private Map convertToProperties(javax.jms.Message message) throws JMSException { + Map map = new HashMap(); + ((Iterator)message.getPropertyNames().asIterator()).forEachRemaining(str -> putProperty(map, message, str)); + return map; + } + + private void putProperty(Map map, javax.jms.Message message, String key) { + try { + map.put((String)key, message.getObjectProperty((String)key)); + } catch (JMSException e) { + //nothing to do + logger.warn("Cannot get property {} value. Error: {}", key, e.getMessage()); + } + } + private Destination createDestination(String address) throws JMSException { if (DestinationType.queue.equals(destinationType)) { return session.createQueue(address); diff --git a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/ClientMessageListener.java b/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/ClientMessageListener.java deleted file mode 100644 index 38849365696..00000000000 --- a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/ClientMessageListener.java +++ /dev/null @@ -1,36 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others - * - * This program and the accompanying materials are made - * available under the terms of the Eclipse Public License 2.0 - * which is available at https://www.eclipse.org/legal/epl-2.0/ - * - * SPDX-License-Identifier: EPL-2.0 - * - * Contributors: - * Eurotech - initial API and implementation - *******************************************************************************/ -package org.eclipse.kapua.client.security.amqpclient; - -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -public abstract class ClientMessageListener implements MessageListener { - - protected Session session; - protected MessageProducer producer; - - /** - * Helpful method to enrich the lister with Session and MessageProducer useful in a request reply context - * (so the lister should send a reply once a message is received) - * - * @param session - * @param producer - */ - public void init(Session session, MessageProducer producer) { - this.session = session; - this.producer = producer; - } - -} diff --git a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java b/client/security/src/main/java/org/eclipse/kapua/client/security/client/Client.java similarity index 93% rename from client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java rename to client/security/src/main/java/org/eclipse/kapua/client/security/client/Client.java index 8a4bb42f2e6..f634f216687 100644 --- a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java +++ b/client/security/src/main/java/org/eclipse/kapua/client/security/client/Client.java @@ -10,7 +10,7 @@ * Contributors: * Eurotech - initial API and implementation *******************************************************************************/ -package org.eclipse.kapua.client.security.amqpclient; +package org.eclipse.kapua.client.security.client; /** * Client definition to handle request/reply through different messaging layers diff --git a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Message.java b/client/security/src/main/java/org/eclipse/kapua/client/security/client/Message.java similarity index 75% rename from client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Message.java rename to client/security/src/main/java/org/eclipse/kapua/client/security/client/Message.java index 258a330df0c..c0cb1d14305 100644 --- a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Message.java +++ b/client/security/src/main/java/org/eclipse/kapua/client/security/client/Message.java @@ -10,20 +10,26 @@ * Contributors: * Eurotech - initial API and implementation *******************************************************************************/ -package org.eclipse.kapua.client.security.amqpclient; +package org.eclipse.kapua.client.security.client; import java.util.Map; public class Message { + private String destination; private Map properties; private String body; - public Message(String body, Map properties) { + public Message(String destination, String body, Map properties) { + this.destination = destination; this.body = body; this.properties = properties; } + public String getDestination() { + return destination; + } + public String getBody() { return body; } diff --git a/client/security/src/main/java/org/eclipse/kapua/client/security/client/MessageListener.java b/client/security/src/main/java/org/eclipse/kapua/client/security/client/MessageListener.java new file mode 100644 index 00000000000..c66aa253ece --- /dev/null +++ b/client/security/src/main/java/org/eclipse/kapua/client/security/client/MessageListener.java @@ -0,0 +1,19 @@ +/******************************************************************************* + * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Eurotech - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.security.client; + +public interface MessageListener { + + void onMessage(Message message) throws Exception; + +} diff --git a/commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java b/commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java index b56342792a2..6fad2f25272 100644 --- a/commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java +++ b/commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java @@ -76,7 +76,7 @@ public class JMSServiceEventBus implements ServiceEventBus, ServiceEventBusDrive private final SystemSetting systemSetting; private final CommonsMetric commonsMetric; private final List subscriptionList = new ArrayList<>(); - private final ServiceEventMarshaler eventBusMarshaler; + private final ServiceEventMarshaler serviceEventMarshaler; private final String eventPattern; /** @@ -85,11 +85,11 @@ public class JMSServiceEventBus implements ServiceEventBus, ServiceEventBusDrive @Inject public JMSServiceEventBus(SystemSetting systemSetting, CommonsMetric commonsMetric, - ServiceEventMarshaler eventBusMarshaler, + ServiceEventMarshaler serviceEventMarshaler, String eventPattern) { this.systemSetting = systemSetting; this.commonsMetric = commonsMetric; - this.eventBusMarshaler = eventBusMarshaler; + this.serviceEventMarshaler = serviceEventMarshaler; this.eventPattern = eventPattern; this.eventBusJMSConnectionBridge = new EventBusJMSConnectionBridge(); this.producerPoolMinSize = systemSetting.getInt(SystemSettingKey.EVENT_BUS_PRODUCER_POOL_MIN_SIZE); @@ -137,10 +137,6 @@ public synchronized void subscribe(String address, String name, final ServiceEve } } - private void setSession(ServiceEvent kapuaEvent) { - KapuaSession.createFrom(kapuaEvent.getScopeId(), kapuaEvent.getUserId()); - } - public Boolean isConnected() { return eventBusJMSConnectionBridge.isConnected(); } @@ -328,7 +324,7 @@ synchronized void subscribe(Subscription subscription) try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; - final ServiceEvent kapuaEvent = eventBusMarshaler.unmarshal(textMessage.getText()); + final ServiceEvent kapuaEvent = serviceEventMarshaler.unmarshal(textMessage.getText()); setSession(kapuaEvent); KapuaSecurityUtils.doPrivileged(() -> { try { @@ -339,7 +335,6 @@ synchronized void subscribe(Subscription subscription) ServiceEventScope.end(); } }); - } else { LOGGER.error("Discarding wrong event message type '{}'", message != null ? message.getClass() : "null"); } @@ -356,6 +351,10 @@ synchronized void subscribe(Subscription subscription) } } + private void setSession(ServiceEvent kapuaEvent) { + KapuaSession.createFrom(kapuaEvent.getScopeId(), kapuaEvent.getUserId()); + } + private class Sender { // TODO manage the session/producer in a stronger way (if the client disconnects due to a network error the connection will not be restored) @@ -373,8 +372,8 @@ public void sendMessage(ServiceEvent kapuaEvent) throws Exception { try { TextMessage message = jmsSession.createTextMessage(); // Serialize outgoing kapua event based on platform configuration - message.setText(eventBusMarshaler.marshal(kapuaEvent)); - message.setStringProperty(ServiceEventMarshaler.CONTENT_TYPE_KEY, eventBusMarshaler.getContentType()); + message.setText(serviceEventMarshaler.marshal(kapuaEvent)); + message.setStringProperty(ServiceEventMarshaler.CONTENT_TYPE_KEY, serviceEventMarshaler.getContentType()); jmsProducer.send(message); } catch (JMSException | KapuaException e) { LOGGER.error("Message publish interrupted: {}", e.getMessage()); @@ -479,30 +478,4 @@ public void stop() { } } - private class Subscription { - - String name; - String address; - ServiceEventBusListener kapuaEventListener; - - public Subscription(String address, String name, ServiceEventBusListener kapuaEventListener) { - this.name = name; - this.address = address; - this.kapuaEventListener = kapuaEventListener; - } - - public String getName() { - return name; - } - - public String getAddress() { - return address; - } - - public ServiceEventBusListener getKapuaEventListener() { - return kapuaEventListener; - } - - } - -} +} \ No newline at end of file diff --git a/commons/src/main/java/org/eclipse/kapua/commons/event/jms/Subscription.java b/commons/src/main/java/org/eclipse/kapua/commons/event/jms/Subscription.java new file mode 100644 index 00000000000..ce0f81fc5ef --- /dev/null +++ b/commons/src/main/java/org/eclipse/kapua/commons/event/jms/Subscription.java @@ -0,0 +1,41 @@ +/******************************************************************************* + * Copyright (c) 2017, 2022 Eurotech and/or its affiliates and others + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Eurotech - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.commons.event.jms; + +import org.eclipse.kapua.event.ServiceEventBusListener; + +public class Subscription { + + private String name; + private String address; + private ServiceEventBusListener kapuaEventListener; + + public Subscription(String address, String name, ServiceEventBusListener kapuaEventListener) { + this.name = name; + this.address = address; + this.kapuaEventListener = kapuaEventListener; + } + + public String getName() { + return name; + } + + public String getAddress() { + return address; + } + + public ServiceEventBusListener getKapuaEventListener() { + return kapuaEventListener; + } + +} \ No newline at end of file diff --git a/commons/src/main/java/org/eclipse/kapua/commons/security/KapuaSession.java b/commons/src/main/java/org/eclipse/kapua/commons/security/KapuaSession.java index 3bc9dc94cb6..22cdc5e39a3 100644 --- a/commons/src/main/java/org/eclipse/kapua/commons/security/KapuaSession.java +++ b/commons/src/main/java/org/eclipse/kapua/commons/security/KapuaSession.java @@ -38,7 +38,7 @@ public class KapuaSession implements Serializable { // TODO to be moved inside configuration service or something like that "fully.qualified.classname.methodname" ( for the constructor) static { TRUSTED_CLASSES.add("org.eclipse.kapua.commons.security.KapuaSecurityUtils.doPrivileged"); - TRUSTED_CLASSES.add("org.eclipse.kapua.commons.event.jms.JMSServiceEventBus.setSession"); + TRUSTED_CLASSES.add("org.eclipse.kapua.commons.event.jms.EventListener.onMessage"); TRUSTED_CLASSES.add("org.eclipse.kapua.job.engine.app.core.filter.RebuildTrustedSessionFilter.isAccessAllowed"); } diff --git a/service/api/src/main/java/org/eclipse/kapua/event/ServiceEventBus.java b/service/api/src/main/java/org/eclipse/kapua/event/ServiceEventBus.java index f22f4365bc6..6a435395e3f 100644 --- a/service/api/src/main/java/org/eclipse/kapua/event/ServiceEventBus.java +++ b/service/api/src/main/java/org/eclipse/kapua/event/ServiceEventBus.java @@ -33,8 +33,8 @@ public interface ServiceEventBus { * * @param address address to listen for events * @param name subscriber name. It's used to share events between multiple instances of the same consumer. - * @param eventListener listener to invoke when an event is received + * @param serviceEventBusListener listener to invoke when an event is received * @throws ServiceEventBusException */ - void subscribe(String address, String name, ServiceEventBusListener eventListener) throws ServiceEventBusException; + void subscribe(String address, String name, ServiceEventBusListener serviceEventBusListener) throws ServiceEventBusException; } diff --git a/service/authentication/src/main/java/org/eclipse/kapua/service/authentication/AuthenticationServiceConverter.java b/service/authentication/src/main/java/org/eclipse/kapua/service/authentication/AuthenticationServiceConverter.java index a07377e7d97..cfbc23138d9 100644 --- a/service/authentication/src/main/java/org/eclipse/kapua/service/authentication/AuthenticationServiceConverter.java +++ b/service/authentication/src/main/java/org/eclipse/kapua/service/authentication/AuthenticationServiceConverter.java @@ -18,7 +18,7 @@ import org.apache.camel.Converter; import org.apache.camel.Exchange; -import org.apache.camel.component.jms.JmsMessage; +import org.apache.camel.Message; import org.eclipse.kapua.KapuaException; import org.eclipse.kapua.client.security.bean.AuthRequest; import org.eclipse.kapua.client.security.bean.EntityRequest; @@ -57,7 +57,7 @@ public AuthenticationServiceConverter(MetricsAuthentication metricsAuthenticatio @Converter public AuthRequest convertToAuthRequest(Exchange exchange, Object value) throws KapuaException { try { - String body = ((JmsMessage) exchange.getIn()).getBody(String.class); + String body = ((Message) exchange.getIn()).getBody(String.class); AuthRequest authRequest = reader.readValue(body, AuthRequest.class); metrics.getConverter().inc(); return authRequest; @@ -70,7 +70,7 @@ public AuthRequest convertToAuthRequest(Exchange exchange, Object value) throws @Converter public EntityRequest convertToGetEntity(Exchange exchange, Object value) throws KapuaException { try { - String body = ((JmsMessage) exchange.getIn()).getBody(String.class); + String body = ((Message) exchange.getIn()).getBody(String.class); EntityRequest entityRequest = reader.readValue(body, EntityRequest.class); metrics.getConverter().inc(); return entityRequest; @@ -79,4 +79,5 @@ public EntityRequest convertToGetEntity(Exchange exchange, Object value) throws throw KapuaException.internalError(e, "Error while converting getEntity message"); } } + }