Skip to content

Commit

Permalink
Attempt fix for redis shutdown issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Martin committed May 11, 2024
1 parent da98be4 commit 16d9573
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RedisConnection extends MessengerConnection implements IMessengerConnection {
private final Vector<RedisSubscriber> subscribers = new Vector<>();
private final Map<PacketIdentification, List<PacketListener<? extends Packet.Wrapper>>> listeners = new HashMap<>();
private final RedisPublisher publisher;
private final RedisClient.Builder clientBuilder;
private boolean isAlive = false;
private final AtomicBoolean alive = new AtomicBoolean(false);
private ExecutorService executorService;
private final FailService failService;
private final AESCryptor cryptor;
Expand All @@ -37,7 +38,7 @@ public RedisConnection(RedisClient.Builder clientBuilder, AESCryptor cryptor) {
}

protected void subscribe(IMessageCacheService<?> cache, PluginLogger logger, Packet.Node senderUUID) {
if(!this.isAlive) return;
if(!this.alive.get()) return;

this.executorService.submit(() -> {
try {
Expand All @@ -62,24 +63,28 @@ protected void subscribe(IMessageCacheService<?> cache, PluginLogger logger, Pac
}

public void startListening(IMessageCacheService<?> cache, PluginLogger logger, Packet.Node senderUUID) {
if(this.isAlive) throw new IllegalStateException("The RedisService is already running! You can't start it again! Shut it down with `.kill()` first and then try again!");
if(this.alive.get()) throw new IllegalStateException("The RedisService is already running! You can't start it again! Shut it down with `.kill()` first and then try again!");
this.executorService = Executors.newFixedThreadPool(2);

this.isAlive = true;
this.alive.set(true);

this.subscribe(cache, logger, senderUUID);
}

@Override
public void kill() {
this.isAlive = false;
this.alive.set(false);
this.failService.kill();

for (Iterator<RedisSubscriber> iterator = this.subscribers.elements().asIterator(); iterator.hasNext(); ) {
RedisSubscriber subscriber = iterator.next();
subscriber.shutdown();
}

try {
this.publisher.kill();
} catch (Exception ignore) {}

try {
this.executorService.shutdown();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@

import group.aelysium.rustyconnector.core.lib.crypt.AESCryptor;
import group.aelysium.rustyconnector.toolkit.core.packet.Packet;
import group.aelysium.rustyconnector.toolkit.core.serviceable.interfaces.Service;
import io.lettuce.core.RedisChannelHandler;
import io.lettuce.core.RedisConnectionStateAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RedisPublisher {
public class RedisPublisher implements Service {
private final RedisClient client;
private StatefulRedisPubSubConnection<String, String> connection;
private final AESCryptor cryptor;
private final AtomicBoolean killed = new AtomicBoolean(false);
protected RedisPublisher(RedisClient client, AESCryptor cryptor) {
this.client = client;
this.client.addListener(new RedisPublisherListener());
Expand All @@ -35,6 +38,8 @@ public void shutdown() {
* @throws IllegalStateException If you attempt to send a received RedisMessage.
*/
public void publish(Packet packet) {
if(killed.get()) return;

String signedPacket;
try {
signedPacket = this.cryptor.encrypt(packet.toString());
Expand All @@ -50,6 +55,11 @@ public void publish(Packet packet) {
async.publish(this.client.dataChannel(), signedPacket);
}

public void kill() {
this.killed.set(true);
this.connection.close();
}

static class RedisPublisherListener extends RedisConnectionStateAdapter {
@Override
public void onRedisExceptionCaught(RedisChannelHandler<?, ?> connection, Throwable cause) {
Expand Down

0 comments on commit 16d9573

Please sign in to comment.