Skip to content

Commit

Permalink
CAMEL-20121 reconnect SMPP session after receiving Unbind
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Ambach <[email protected]>
  • Loading branch information
der-ambi committed Jan 15, 2024
1 parent 8cbaae6 commit e984c08
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ public SmppConsumer(SmppEndpoint endpoint, SmppConfiguration config, Processor p
configuration.getSessionStateListener().onStateChange(newState, oldState, source);
}

if (newState.equals(SessionState.CLOSED)) {
LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
if (newState.equals(SessionState.UNBOUND) || newState.equals(SessionState.CLOSED)) {
LOG.warn(newState.equals(SessionState.UNBOUND)
? "Session to {} was unbound - trying to reconnect" : "Lost connection to: {} - trying to reconnect...",
getEndpoint().getConnectionString());
closeSession();

reconnect(configuration.getInitialReconnectDelay());
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ public SmppProducer(SmppEndpoint endpoint, SmppConfiguration config) {
configuration.getSessionStateListener().onStateChange(newState, oldState, source);
}

if (newState.equals(SessionState.CLOSED)) {
LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
if (newState.equals(SessionState.UNBOUND) || newState.equals(SessionState.CLOSED)) {
LOG.warn(newState.equals(SessionState.UNBOUND)
? "Session to {} was unbound - trying to reconnect" : "Lost connection to: {} - trying to reconnect...",
getEndpoint().getConnectionString());
closeSession();
reconnect(configuration.getInitialReconnectDelay());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,37 @@
*/
package org.apache.camel.component.smpp;

import java.util.concurrent.ScheduledExecutorService;

import org.apache.camel.CamelContext;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
import org.apache.camel.spi.ExchangeFactory;
import org.apache.camel.support.task.BackgroundTask;
import org.apache.camel.support.task.budget.Budgets;
import org.apache.camel.util.ReflectionHelper;
import org.jsmpp.bean.BindType;
import org.jsmpp.bean.NumberingPlanIndicator;
import org.jsmpp.bean.TypeOfNumber;
import org.jsmpp.extra.SessionState;
import org.jsmpp.session.BindParameter;
import org.jsmpp.session.MessageReceiverListener;
import org.jsmpp.session.SMPPSession;
import org.jsmpp.session.SessionStateListener;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.MockedStatic;

import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -156,4 +169,36 @@ public void getterShouldReturnTheSetValues() {
assertSame(endpoint, consumer.getEndpoint());
assertSame(configuration, consumer.getConfiguration());
}

@ParameterizedTest
@EnumSource(value = SessionState.class, names = { "UNBOUND", "CLOSED" })
public void internalSessionStateListenerShouldCloseSessionAndReconnect(SessionState sessionState) throws Exception {
try (MockedStatic<SmppUtils> smppUtilsMock = mockStatic(SmppUtils.class)) {
SessionStateListener sessionStateListener = (SessionStateListener) ReflectionHelper
.getField(SmppConsumer.class.getDeclaredField("internalSessionStateListener"), consumer);
ScheduledExecutorService reconnectService = (ScheduledExecutorService) ReflectionHelper
.getField(SmppConsumer.class.getDeclaredField("reconnectService"), consumer);
when(endpoint.getConnectionString())
.thenReturn("smpp://smppclient@localhost:2775");
BindParameter expectedBindParameter = new BindParameter(
BindType.BIND_RX,
"smppclient",
"password",
"cp",
TypeOfNumber.UNKNOWN,
NumberingPlanIndicator.UNKNOWN,
"");
when(session.connectAndBind("localhost", Integer.valueOf(2775), expectedBindParameter))
.thenReturn("1");
smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), anyString(), anyLong(), anyLong(), anyInt()))
.thenReturn(new BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService)
.withBudget(Budgets.timeBudget().build()).build());

consumer.doStart();

sessionStateListener.onStateChange(sessionState, SessionState.BOUND_RX, null);
verify(session).unbindAndClose();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,34 @@
*/
package org.apache.camel.component.smpp;

import java.util.concurrent.ScheduledExecutorService;

import org.apache.camel.Exchange;
import org.apache.camel.support.task.BackgroundTask;
import org.apache.camel.support.task.budget.Budgets;
import org.apache.camel.util.ReflectionHelper;
import org.jsmpp.bean.BindType;
import org.jsmpp.bean.InterfaceVersion;
import org.jsmpp.bean.NumberingPlanIndicator;
import org.jsmpp.bean.TypeOfNumber;
import org.jsmpp.extra.SessionState;
import org.jsmpp.session.BindParameter;
import org.jsmpp.session.SMPPSession;
import org.jsmpp.session.SessionStateListener;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.MockedStatic;

import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -126,4 +140,37 @@ public void getterShouldReturnTheSetValues() {
assertSame(endpoint, producer.getEndpoint());
assertSame(configuration, producer.getConfiguration());
}

@ParameterizedTest
@EnumSource(value = SessionState.class, names = { "UNBOUND", "CLOSED" })
public void internalSessionStateListenerShouldCloseSessionAndReconnect(SessionState sessionState) throws Exception {
try (MockedStatic<SmppUtils> smppUtilsMock = mockStatic(SmppUtils.class)) {
ScheduledExecutorService reconnectService = (ScheduledExecutorService) ReflectionHelper
.getField(SmppProducer.class.getDeclaredField("reconnectService"), producer);
SessionStateListener sessionStateListener = (SessionStateListener) ReflectionHelper
.getField(SmppProducer.class.getDeclaredField("internalSessionStateListener"), producer);
when(endpoint.getConnectionString())
.thenReturn("smpp://smppclient@localhost:2775");
BindParameter expectedBindParameters = new BindParameter(
BindType.BIND_TX,
"smppclient",
"password",
"cp",
TypeOfNumber.UNKNOWN,
NumberingPlanIndicator.UNKNOWN,
"",
InterfaceVersion.IF_50);
when(session.connectAndBind("localhost", Integer.valueOf(2775), expectedBindParameters))
.thenReturn("1");
when(endpoint.isSingleton()).thenReturn(true);
smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), anyString(), anyLong(), anyLong(), anyInt()))
.thenReturn(new BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService)
.withBudget(Budgets.timeBudget().build()).build());

producer.doStart();

sessionStateListener.onStateChange(SessionState.CLOSED, SessionState.BOUND_TX, null);
verify(session).unbindAndClose();
}
}
}

0 comments on commit e984c08

Please sign in to comment.