From e9a4037f8da643a6d72d8d8c33dbd5d7f520be99 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Sat, 9 Oct 2021 19:10:44 +0800 Subject: [PATCH 1/3] fix: when the grpc connection failures too many times, reset the connect backoff and reconnect immediately --- jraft-core/src/test/resources/log4j2.xml | 4 +- .../sofa/jraft/rpc/impl/GrpcClient.java | 118 ++++++++++++------ 2 files changed, 85 insertions(+), 37 deletions(-) diff --git a/jraft-core/src/test/resources/log4j2.xml b/jraft-core/src/test/resources/log4j2.xml index 0d4541357..d53e7655a 100644 --- a/jraft-core/src/test/resources/log4j2.xml +++ b/jraft-core/src/test/resources/log4j2.xml @@ -3,12 +3,12 @@ - + - + diff --git a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java index 7d29a54ad..4e658d6b6 100644 --- a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java +++ b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java @@ -60,13 +60,15 @@ */ public class GrpcClient implements RpcClient { - private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class); + private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class); - private static final int MAX_FAILURES = SystemPropertyUtil.getInt( - "jraft.grpc.max.connect.failures", 20); + private static final int RESET_BACKOFF_THRESHOLD = SystemPropertyUtil + .getInt( + "jraft.grpc.max.conn.failures.reset_backoff", + 2); - private final Map managedChannelPool = new ConcurrentHashMap<>(); - private final Map transientFailures = new ConcurrentHashMap<>(); + private final Map managedChannelPool = new ConcurrentHashMap<>(); + private final Map transientFailures = new ConcurrentHashMap<>(); private final Map parserClasses; private final MarshallerRegistry marshallerRegistry; private volatile ReplicatorGroup replicatorGroup; @@ -185,44 +187,68 @@ private ManagedChannel getChannel(final Endpoint endpoint) { .directExecutor() // .maxInboundMessageSize(GrpcRaftRpcFactory.RPC_MAX_INBOUND_MESSAGE_SIZE) // .build(); + // channel connection event - ch.notifyWhenStateChanged(ConnectivityState.READY, () -> { - final ReplicatorGroup rpGroup = replicatorGroup; - if (rpGroup != null) { - try { - RpcUtils.runInThread(() -> { - final PeerId peer = new PeerId(); - if (peer.parse(ep.toString())) { - LOG.info("Peer {} is connected.", peer); - rpGroup.checkReplicator(peer, true); - } else { - LOG.error("Fail to parse peer: {}.", ep); - } - }); - } catch (final Throwable t) { - LOG.error("Fail to check replicator {}.", ep, t); - } - } - }); - ch.notifyWhenStateChanged(ConnectivityState.TRANSIENT_FAILURE, - () -> LOG.warn("Channel in TRANSIENT_FAILURE state: {}.", ep)); - ch.notifyWhenStateChanged(ConnectivityState.SHUTDOWN, - () -> LOG.warn("Channel in SHUTDOWN state: {}.", ep)); + ch.notifyWhenStateChanged(ConnectivityState.READY, () -> onChannelReady(ep)); + ch.notifyWhenStateChanged(ConnectivityState.TRANSIENT_FAILURE, () -> onChannelFailure(ep, ch)); + ch.notifyWhenStateChanged(ConnectivityState.SHUTDOWN, () -> onChannelShutdown(ep)); return ch; }); } + private ManagedChannel removeChannel(final Endpoint endpoint) { + return this.managedChannelPool.remove(endpoint); + } + + private void onChannelReady(final Endpoint endpoint) { + LOG.info("The channel {} has successfully established.", endpoint); + final ReplicatorGroup rpGroup = this.replicatorGroup; + if (rpGroup != null) { + try { + RpcUtils.runInThread(() -> { + final PeerId peer = new PeerId(); + if (peer.parse(endpoint.toString())) { + LOG.info("Peer {} is connected.", peer); + rpGroup.checkReplicator(peer, true); + } else { + LOG.error("Fail to parse peer: {}.", endpoint); + } + }); + } catch (final Throwable t) { + LOG.error("Fail to check replicator {}.", endpoint, t); + } + } + } + + private void onChannelFailure(final Endpoint endpoint, final ManagedChannel ch) { + LOG.warn("There has been some transient failure on this channel {}.", endpoint); + RpcUtils.runInThread(() -> { + if (ch == null) { + return; + } + // double-check + if (ch.getState(false) == ConnectivityState.TRANSIENT_FAILURE) { + mayResetConnectBackoff(endpoint, ch); + } + }); + } + + private void onChannelShutdown(final Endpoint endpoint) { + LOG.warn("This channel {} has started shutting down. Any new RPCs should fail immediately.", endpoint); + } + private void closeAllChannels() { for (final Map.Entry entry : this.managedChannelPool.entrySet()) { final ManagedChannel ch = entry.getValue(); LOG.info("Shutdown managed channel: {}, {}.", entry.getKey(), ch); ManagedChannelHelper.shutdownAndAwaitTermination(ch); } + this.managedChannelPool.clear(); } private void closeChannel(final Endpoint endpoint) { - final ManagedChannel ch = this.managedChannelPool.remove(endpoint); + final ManagedChannel ch = removeChannel(endpoint); LOG.info("Close connection: {}, {}.", endpoint, ch); if (ch != null) { ManagedChannelHelper.shutdownAndAwaitTermination(ch); @@ -231,21 +257,43 @@ private void closeChannel(final Endpoint endpoint) { private boolean checkChannel(final Endpoint endpoint, final boolean createIfAbsent) { ManagedChannel ch = this.managedChannelPool.get(endpoint); + if (ch == null && createIfAbsent) { ch = getChannel(endpoint); } + if (ch == null) { return false; } - final ConnectivityState st = ch.getState(true); + + ConnectivityState st = ch.getState(true); + if (st == ConnectivityState.TRANSIENT_FAILURE) { - final AtomicInteger num = this.transientFailures.computeIfAbsent(endpoint, ep -> new AtomicInteger()); - if (num.incrementAndGet() > MAX_FAILURES) { - this.transientFailures.remove(endpoint); - LOG.warn("Channel[{}] in {} state {} times, will be reset connect backoff.", endpoint, st, num.get()); - ch.resetConnectBackoff(); - } + mayResetConnectBackoff(endpoint, ch); + st = ch.getState(false); } + + LOG.debug("Channel[{}] in {} state.", ch, st); + return st != ConnectivityState.TRANSIENT_FAILURE && st != ConnectivityState.SHUTDOWN; } + + private int incConnFailuresCount(final Endpoint endpoint) { + return this.transientFailures.computeIfAbsent(endpoint, ep -> new AtomicInteger()).incrementAndGet(); + } + + private void mayResetConnectBackoff(final Endpoint endpoint, final ManagedChannel ch) { + final int c = incConnFailuresCount(endpoint); + if (c < RESET_BACKOFF_THRESHOLD) { + return; + } + + this.transientFailures.remove(endpoint); + + LOG.warn("Channel[{}] in TRANSIENT_FAILURE state {} times, will be reset connect backoff.", endpoint, c); + + // For sub-channels that are in TRANSIENT_FAILURE state, short-circuit the backoff timer and make + // them reconnect immediately. May also attempt to invoke NameResolver#refresh + ch.resetConnectBackoff(); + } } From e2765f6900e33fdc3f1b418252a748f84f858325 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Mon, 18 Oct 2021 14:54:39 +0800 Subject: [PATCH 2/3] by CR --- .../java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java index 4e658d6b6..4491c6f68 100644 --- a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java +++ b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java @@ -203,6 +203,9 @@ private ManagedChannel removeChannel(final Endpoint endpoint) { private void onChannelReady(final Endpoint endpoint) { LOG.info("The channel {} has successfully established.", endpoint); + + clearConnFailuresCount(endpoint); + final ReplicatorGroup rpGroup = this.replicatorGroup; if (rpGroup != null) { try { @@ -282,6 +285,10 @@ private int incConnFailuresCount(final Endpoint endpoint) { return this.transientFailures.computeIfAbsent(endpoint, ep -> new AtomicInteger()).incrementAndGet(); } + private void clearConnFailuresCount(final Endpoint endpoint) { + this.transientFailures.remove(endpoint); + } + private void mayResetConnectBackoff(final Endpoint endpoint, final ManagedChannel ch) { final int c = incConnFailuresCount(endpoint); if (c < RESET_BACKOFF_THRESHOLD) { From feffd395ed7c923345f7a3cfb0695aabdeb8d7a0 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Fri, 19 Nov 2021 11:09:58 +0800 Subject: [PATCH 3/3] grpc conn fix --- .../sofa/jraft/rpc/ConnectionRefreshTest.java | 60 +++++++ .../sofa/jraft/rpc/impl/GrpcClient.java | 166 ++++++++++++------ .../sofa/jraft/rpc/ConnectionRefreshTest.java | 60 +++++++ 3 files changed, 229 insertions(+), 57 deletions(-) create mode 100644 jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java create mode 100644 jraft-extension/rpc-grpc-impl/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java new file mode 100644 index 000000000..92cf3fec1 --- /dev/null +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rpc; + +import org.junit.Ignore; +import org.junit.Test; + +import com.alipay.sofa.jraft.rpc.impl.PingRequestProcessor; +import com.alipay.sofa.jraft.util.Endpoint; +import com.alipay.sofa.jraft.util.RpcFactoryHelper; + +/** + * + * @author jiachun.fjc + */ +public class ConnectionRefreshTest { + + @Ignore + @Test + public void simulation() throws InterruptedException { + ProtobufMsgFactory.load(); + + final RpcServer server = RpcFactoryHelper.rpcFactory().createRpcServer(new Endpoint("127.0.0.1", 19991)); + server.registerProcessor(new PingRequestProcessor()); + server.init(null); + + final Endpoint target = new Endpoint("my.test.host1.com", 19991); + + final RpcClient client = RpcFactoryHelper.rpcFactory().createRpcClient(); + client.init(null); + + final RpcRequests.PingRequest req = RpcRequests.PingRequest.newBuilder() // + .setSendTimestamp(System.currentTimeMillis()) // + .build(); + + for (int i = 0; i < 1000; i++) { + try { + final Object resp = client.invokeSync(target, req, 3000); + System.out.println(resp); + } catch (final Exception e) { + e.printStackTrace(); + } + Thread.sleep(1000); + } + } +} diff --git a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java index 4491c6f68..256611a79 100644 --- a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java +++ b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java @@ -60,15 +60,13 @@ */ public class GrpcClient implements RpcClient { - private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class); + private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class); - private static final int RESET_BACKOFF_THRESHOLD = SystemPropertyUtil - .getInt( - "jraft.grpc.max.conn.failures.reset_backoff", - 2); + private static final int RESET_CONN_THRESHOLD = SystemPropertyUtil.getInt( + "jraft.grpc.max.conn.failures.to_reset", 2); - private final Map managedChannelPool = new ConcurrentHashMap<>(); - private final Map transientFailures = new ConcurrentHashMap<>(); + private final Map managedChannelPool = new ConcurrentHashMap<>(); + private final Map transientFailures = new ConcurrentHashMap<>(); private final Map parserClasses; private final MarshallerRegistry marshallerRegistry; private volatile ReplicatorGroup replicatorGroup; @@ -142,10 +140,16 @@ public void invokeAsync(final Endpoint endpoint, final Object request, final Inv Requires.requireNonNull(endpoint, "endpoint"); Requires.requireNonNull(request, "request"); - final Channel ch = getChannel(endpoint); + final Executor executor = callback.executor() != null ? callback.executor() : DirectExecutor.INSTANCE; + + final Channel ch = getCheckedChannel(endpoint); + if (ch == null) { + executor.execute(() -> callback.complete(null, new RemotingException("Fail to connect: " + endpoint))); + return; + } + final MethodDescriptor method = getCallMethod(request); final CallOptions callOpts = CallOptions.DEFAULT.withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS); - final Executor executor = callback.executor() != null ? callback.executor() : DirectExecutor.INSTANCE; ClientCalls.asyncUnaryCall(ch.newCall(method, callOpts), (Message) request, new StreamObserver() { @@ -180,28 +184,74 @@ private MethodDescriptor getCallMethod(final Object request) { .build(); } - private ManagedChannel getChannel(final Endpoint endpoint) { - return this.managedChannelPool.computeIfAbsent(endpoint, ep -> { - final ManagedChannel ch = ManagedChannelBuilder.forAddress(ep.getIp(), ep.getPort()) // - .usePlaintext() // - .directExecutor() // - .maxInboundMessageSize(GrpcRaftRpcFactory.RPC_MAX_INBOUND_MESSAGE_SIZE) // - .build(); - - // channel connection event - ch.notifyWhenStateChanged(ConnectivityState.READY, () -> onChannelReady(ep)); - ch.notifyWhenStateChanged(ConnectivityState.TRANSIENT_FAILURE, () -> onChannelFailure(ep, ch)); - ch.notifyWhenStateChanged(ConnectivityState.SHUTDOWN, () -> onChannelShutdown(ep)); + private ManagedChannel getCheckedChannel(final Endpoint endpoint) { + final ManagedChannel ch = getChannel(endpoint, true); + if (checkConnectivity(endpoint, ch)) { return ch; - }); + } + + return null; + } + + private ManagedChannel getChannel(final Endpoint endpoint, final boolean createIfAbsent) { + if (createIfAbsent) { + return this.managedChannelPool.computeIfAbsent(endpoint, this::newChannel); + } else { + return this.managedChannelPool.get(endpoint); + } + } + + private ManagedChannel newChannel(final Endpoint endpoint) { + final ManagedChannel ch = ManagedChannelBuilder.forAddress(endpoint.getIp(), endpoint.getPort()) // + .usePlaintext() // + .directExecutor() // + .maxInboundMessageSize(GrpcRaftRpcFactory.RPC_MAX_INBOUND_MESSAGE_SIZE) // + .build(); + + LOG.info("Creating new channel to: {}.", endpoint); + + // The init channel state is IDLE + notifyWhenStateChanged(ConnectivityState.IDLE, endpoint, ch); + + return ch; } private ManagedChannel removeChannel(final Endpoint endpoint) { return this.managedChannelPool.remove(endpoint); } - private void onChannelReady(final Endpoint endpoint) { + private void notifyWhenStateChanged(final ConnectivityState state, final Endpoint endpoint, final ManagedChannel ch) { + ch.notifyWhenStateChanged(state, () -> onStateChanged(endpoint, ch)); + } + + private void onStateChanged(final Endpoint endpoint, final ManagedChannel ch) { + final ConnectivityState state = ch.getState(false); + + LOG.info("The channel {} is in state: {}.", endpoint, state); + + switch (state) { + case READY: + notifyReady(endpoint); + notifyWhenStateChanged(ConnectivityState.READY, endpoint, ch); + break; + case TRANSIENT_FAILURE: + notifyFailure(endpoint); + notifyWhenStateChanged(ConnectivityState.TRANSIENT_FAILURE, endpoint, ch); + break; + case SHUTDOWN: + notifyShutdown(endpoint); + break; + case CONNECTING: + notifyWhenStateChanged(ConnectivityState.CONNECTING, endpoint, ch); + break; + case IDLE: + notifyWhenStateChanged(ConnectivityState.IDLE, endpoint, ch); + break; + } + } + + private void notifyReady(final Endpoint endpoint) { LOG.info("The channel {} has successfully established.", endpoint); clearConnFailuresCount(endpoint); @@ -224,20 +274,11 @@ private void onChannelReady(final Endpoint endpoint) { } } - private void onChannelFailure(final Endpoint endpoint, final ManagedChannel ch) { + private void notifyFailure(final Endpoint endpoint) { LOG.warn("There has been some transient failure on this channel {}.", endpoint); - RpcUtils.runInThread(() -> { - if (ch == null) { - return; - } - // double-check - if (ch.getState(false) == ConnectivityState.TRANSIENT_FAILURE) { - mayResetConnectBackoff(endpoint, ch); - } - }); } - private void onChannelShutdown(final Endpoint endpoint) { + private void notifyShutdown(final Endpoint endpoint) { LOG.warn("This channel {} has started shutting down. Any new RPCs should fail immediately.", endpoint); } @@ -259,26 +300,13 @@ private void closeChannel(final Endpoint endpoint) { } private boolean checkChannel(final Endpoint endpoint, final boolean createIfAbsent) { - ManagedChannel ch = this.managedChannelPool.get(endpoint); - - if (ch == null && createIfAbsent) { - ch = getChannel(endpoint); - } + final ManagedChannel ch = getChannel(endpoint, createIfAbsent); if (ch == null) { return false; } - ConnectivityState st = ch.getState(true); - - if (st == ConnectivityState.TRANSIENT_FAILURE) { - mayResetConnectBackoff(endpoint, ch); - st = ch.getState(false); - } - - LOG.debug("Channel[{}] in {} state.", ch, st); - - return st != ConnectivityState.TRANSIENT_FAILURE && st != ConnectivityState.SHUTDOWN; + return checkConnectivity(endpoint, ch); } private int incConnFailuresCount(final Endpoint endpoint) { @@ -289,18 +317,42 @@ private void clearConnFailuresCount(final Endpoint endpoint) { this.transientFailures.remove(endpoint); } - private void mayResetConnectBackoff(final Endpoint endpoint, final ManagedChannel ch) { + private boolean checkConnectivity(final Endpoint endpoint, final ManagedChannel ch) { + final ConnectivityState st = ch.getState(false); + + if (st != ConnectivityState.TRANSIENT_FAILURE && st != ConnectivityState.SHUTDOWN) { + return true; + } + final int c = incConnFailuresCount(endpoint); - if (c < RESET_BACKOFF_THRESHOLD) { - return; + if (c < RESET_CONN_THRESHOLD) { + if (c == RESET_CONN_THRESHOLD - 1) { + // For sub-channels that are in TRANSIENT_FAILURE state, short-circuit the backoff timer and make + // them reconnect immediately. May also attempt to invoke NameResolver#refresh + ch.resetConnectBackoff(); + } + return true; } - this.transientFailures.remove(endpoint); + clearConnFailuresCount(endpoint); + + final ManagedChannel removedCh = removeChannel(endpoint); + + if (removedCh == null) { + // The channel has been removed and closed by another + return false; + } + + LOG.warn("Channel[{}] in [INACTIVE] state {} times, it has been removed from the pool.", endpoint, c); - LOG.warn("Channel[{}] in TRANSIENT_FAILURE state {} times, will be reset connect backoff.", endpoint, c); + if (removedCh != ch) { + // Now that it's removed, close it + ManagedChannelHelper.shutdownAndAwaitTermination(removedCh, 100); + } + + ManagedChannelHelper.shutdownAndAwaitTermination(ch, 100); - // For sub-channels that are in TRANSIENT_FAILURE state, short-circuit the backoff timer and make - // them reconnect immediately. May also attempt to invoke NameResolver#refresh - ch.resetConnectBackoff(); + return false; } + } diff --git a/jraft-extension/rpc-grpc-impl/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java b/jraft-extension/rpc-grpc-impl/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java new file mode 100644 index 000000000..92cf3fec1 --- /dev/null +++ b/jraft-extension/rpc-grpc-impl/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rpc; + +import org.junit.Ignore; +import org.junit.Test; + +import com.alipay.sofa.jraft.rpc.impl.PingRequestProcessor; +import com.alipay.sofa.jraft.util.Endpoint; +import com.alipay.sofa.jraft.util.RpcFactoryHelper; + +/** + * + * @author jiachun.fjc + */ +public class ConnectionRefreshTest { + + @Ignore + @Test + public void simulation() throws InterruptedException { + ProtobufMsgFactory.load(); + + final RpcServer server = RpcFactoryHelper.rpcFactory().createRpcServer(new Endpoint("127.0.0.1", 19991)); + server.registerProcessor(new PingRequestProcessor()); + server.init(null); + + final Endpoint target = new Endpoint("my.test.host1.com", 19991); + + final RpcClient client = RpcFactoryHelper.rpcFactory().createRpcClient(); + client.init(null); + + final RpcRequests.PingRequest req = RpcRequests.PingRequest.newBuilder() // + .setSendTimestamp(System.currentTimeMillis()) // + .build(); + + for (int i = 0; i < 1000; i++) { + try { + final Object resp = client.invokeSync(target, req, 3000); + System.out.println(resp); + } catch (final Exception e) { + e.printStackTrace(); + } + Thread.sleep(1000); + } + } +}