Skip to content

Commit

Permalink
[LIVY-1000][RSC] Supports reconnection after the RPC link between RSC…
Browse files Browse the repository at this point in the history
…Client and RSCDriver is disconnected
  • Loading branch information
wangdengshan committed Jun 20, 2024
1 parent 728fcf0 commit a37bbab
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
54 changes: 50 additions & 4 deletions rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -55,8 +56,8 @@ public class RSCClient implements LivyClient {
private final RSCConf conf;
private final Promise<ContextInfo> contextInfoPromise;
private final Map<String, JobHandleImpl<?>> jobs;
private final ClientProtocol protocol;
private final Promise<Rpc> driverRpc;
private volatile ClientProtocol protocol;
private volatile Promise<Rpc> driverRpc;
private final int executorGroupId;
private final EventLoopGroup eventLoopGroup;
private final Promise<URI> serverUriPromise;
Expand All @@ -68,6 +69,8 @@ public class RSCClient implements LivyClient {
// Record the last activity timestamp of the repl
private volatile long replLastActivity = System.nanoTime();

private AtomicInteger retryTimes = new AtomicInteger(1);

RSCClient(RSCConf conf, Promise<ContextInfo> ctx, Process driverProcess) throws IOException {
this.conf = conf;
this.contextInfoPromise = ctx;
Expand Down Expand Up @@ -126,11 +129,54 @@ public void onSuccess(Rpc rpc) throws Exception {
Utils.addListener(rpc.getChannel().closeFuture(), new FutureListener<Void>() {
@Override
public void onSuccess(Void unused) {
if (isAlive) {
if (!isAlive) {
LOG.warn("RSCClient is already closed!");
return;
}
long initialWaitTime = conf.getTimeAsMs(RPC_CLIENT_RETRY_INITIAL_WAIT_TIME);
long maxRetryTimes = conf.getTimeAsMs(RPC_CLIENT_RETRY_MAX_TIMES);
Random random = new Random();
while (retryTimes.get() < maxRetryTimes) {
LOG.info("Try to connect context for the {} times !", retryTimes.get());
try {
protocol = new ClientProtocol();
Promise<Rpc> client = Rpc.createClient(conf,
eventLoopGroup,
info.remoteAddress,
info.remotePort,
info.clientId,
info.secret,
protocol);
Rpc rpcRetry = client.get(conf.getTimeAsMs(RPC_CLIENT_CONNECT_TIMEOUT),
TimeUnit.MILLISECONDS);
driverRpc = ImmediateEventExecutor.INSTANCE.newPromise();
driverRpc.setSuccess(rpcRetry);
LOG.debug("Retry to connect context succeed !");
break;
} catch (Exception e) {
LOG.warn("Retry to connect context failed for the {} times wait {}ms retry!",
retryTimes.get(), initialWaitTime, e);

try {
Thread.sleep(initialWaitTime);
} catch (InterruptedException ex) {
LOG.error("", ex);
}

retryTimes.addAndGet(1);
initialWaitTime *= 2;
initialWaitTime += random.nextInt(100000);
}
}

if (retryTimes.get() >= maxRetryTimes) {
LOG.warn("Client RPC channel closed unexpectedly.");
try {
stop(false);
} catch (Exception e) { /* stop() itself prints warning. */ }
} catch (Exception e) {
/* stop() itself prints warning. */
LOG.error("", e);
}
}
}
});
Expand Down
2 changes: 2 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public static enum Entry implements ConfEntry {
RPC_SERVER_ADDRESS("rpc.server.address", null),
RPC_CLIENT_HANDSHAKE_TIMEOUT("server.connect.timeout", "90s"),
RPC_CLIENT_CONNECT_TIMEOUT("client.connect.timeout", "10s"),
RPC_CLIENT_RETRY_INITIAL_WAIT_TIME("client.connect.retry.initial.wait.time", "20s"),
RPC_CLIENT_RETRY_MAX_TIMES("client.connect.retry.max.times", "3"),
RPC_CHANNEL_LOG_LEVEL("channel.log.level", null),
RPC_MAX_MESSAGE_SIZE("rpc.max.size", 50 * 1024 * 1024),
RPC_MAX_THREADS("rpc.threads", 8),
Expand Down

0 comments on commit a37bbab

Please sign in to comment.