diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
index ee9c9012f..9e4518ef3 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
@@ -22,6 +22,7 @@
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
@@ -55,8 +56,8 @@ public class RSCClient implements LivyClient {
   private final RSCConf conf;
   private final Promise contextInfoPromise;
   private final Map> jobs;
-  private final ClientProtocol protocol;
-  private final Promise driverRpc;
+  private volatile ClientProtocol protocol;
+  private volatile Promise driverRpc;
   private final int executorGroupId;
   private final EventLoopGroup eventLoopGroup;
   private final Promise serverUriPromise;
@@ -126,11 +127,75 @@ public void onSuccess(Rpc rpc) throws Exception {
           Utils.addListener(rpc.getChannel().closeFuture(), new FutureListener() {
             @Override
             public void onSuccess(Void unused) {
-              if (isAlive) {
+              if (!isAlive) {
+                LOG.warn("RSCClient is already closed!");
+                return;
+              }
+
+              // NOTE(review): this listener presumably runs on a Netty event loop thread, so the
+              // blocking get()/sleep() calls below would stall it while retrying; consider moving
+              // the retry loop to a dedicated thread. The replacement channel also gets no close
+              // listener here, so a second disconnect would go unnoticed.
+              long waitTimeMs = conf.getTimeAsMs(RPC_CLIENT_RETRY_INITIAL_WAIT_TIME);
+              long connectTimeoutMs = conf.getTimeAsMs(RPC_CLIENT_CONNECT_TIMEOUT);
+              // The retry limit is a count, not a duration, so read it as an int.
+              int maxRetryTimes = conf.getInt(RPC_CLIENT_RETRY_MAX_TIMES);
+              Random random = new Random();
+              boolean reconnected = false;
+
+              // The attempt counter is local so that every disconnect gets the full retry budget.
+              for (int attempt = 1; attempt <= maxRetryTimes && isAlive; attempt++) {
+                LOG.info("Try to connect context for the {} times !", attempt);
+                try {
+                  ClientProtocol newProtocol = new ClientProtocol();
+                  Promise<Rpc> client = Rpc.createClient(conf,
+                    eventLoopGroup,
+                    info.remoteAddress,
+                    info.remotePort,
+                    info.clientId,
+                    info.secret,
+                    newProtocol);
+                  Rpc rpcRetry = client.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
+                  // Complete the promise before publishing it so that readers of driverRpc never
+                  // observe a pending promise.
+                  Promise<Rpc> newDriverRpc = ImmediateEventExecutor.INSTANCE.newPromise();
+                  newDriverRpc.setSuccess(rpcRetry);
+                  protocol = newProtocol;
+                  driverRpc = newDriverRpc;
+                  LOG.debug("Retry to connect context succeed !");
+                  reconnected = true;
+                  break;
+                } catch (InterruptedException ie) {
+                  // Restore the interrupt status and give up; the client is stopped below.
+                  Thread.currentThread().interrupt();
+                  break;
+                } catch (Exception e) {
+                  LOG.warn("Retry to connect context failed for the {} times wait {}ms retry!",
+                    attempt, waitTimeMs, e);
+                }
+
+                if (attempt == maxRetryTimes) {
+                  // Out of attempts; do not sleep before giving up.
+                  break;
+                }
+                try {
+                  Thread.sleep(waitTimeMs);
+                } catch (InterruptedException ie) {
+                  Thread.currentThread().interrupt();
+                  break;
+                }
+                // Exponential backoff plus up to 100 s of random jitter.
+                waitTimeMs = waitTimeMs * 2 + random.nextInt(100000);
+              }
+
+              if (!reconnected && isAlive) {
                 LOG.warn("Client RPC channel closed unexpectedly.");
                 try {
                   stop(false);
-                } catch (Exception e) { /* stop() itself prints warning. */ }
+                } catch (Exception e) {
+                  // stop() itself prints a warning; log here as well so the failure has context.
+                  LOG.error("Failed to stop client after RPC channel closed.", e);
+                }
               }
             }
           });
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
index 4c45956d7..9239b705f 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
@@ -63,6 +63,8 @@ public static enum Entry implements ConfEntry {
     RPC_SERVER_ADDRESS("rpc.server.address", null),
     RPC_CLIENT_HANDSHAKE_TIMEOUT("server.connect.timeout", "90s"),
     RPC_CLIENT_CONNECT_TIMEOUT("client.connect.timeout", "10s"),
+    RPC_CLIENT_RETRY_INITIAL_WAIT_TIME("client.connect.retry.initial.wait.time", "20s"),
+    RPC_CLIENT_RETRY_MAX_TIMES("client.connect.retry.max.times", 3),
     RPC_CHANNEL_LOG_LEVEL("channel.log.level", null),
     RPC_MAX_MESSAGE_SIZE("rpc.max.size", 50 * 1024 * 1024),
     RPC_MAX_THREADS("rpc.threads", 8),