diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java
new file mode 100644
index 000000000..92cf3fec1
--- /dev/null
+++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.rpc;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.alipay.sofa.jraft.rpc.impl.PingRequestProcessor;
+import com.alipay.sofa.jraft.util.Endpoint;
+import com.alipay.sofa.jraft.util.RpcFactoryHelper;
+
+/**
+ *
+ * @author jiachun.fjc
+ */
+public class ConnectionRefreshTest {
+
+    @Ignore
+    @Test
+    public void simulation() throws InterruptedException {
+        ProtobufMsgFactory.load();
+
+        final RpcServer server = RpcFactoryHelper.rpcFactory().createRpcServer(new Endpoint("127.0.0.1", 19991));
+        server.registerProcessor(new PingRequestProcessor());
+        server.init(null);
+
+        final Endpoint target = new Endpoint("my.test.host1.com", 19991);
+
+        final RpcClient client = RpcFactoryHelper.rpcFactory().createRpcClient();
+        client.init(null);
+
+        final RpcRequests.PingRequest req = RpcRequests.PingRequest.newBuilder() //
+            .setSendTimestamp(System.currentTimeMillis()) //
+            .build();
+
+        for (int i = 0; i < 1000; i++) {
+            try {
+                final Object resp = client.invokeSync(target, req, 3000);
+                System.out.println(resp);
+            } catch (final Exception e) {
+                e.printStackTrace();
+            }
+            Thread.sleep(1000);
+        }
+    }
+}
diff --git a/jraft-core/src/test/resources/log4j2.xml b/jraft-core/src/test/resources/log4j2.xml
index 0d4541357..d53e7655a 100644
--- a/jraft-core/src/test/resources/log4j2.xml
+++ b/jraft-core/src/test/resources/log4j2.xml
@@ -3,12 +3,12 @@
-
+
-
+
diff --git a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java
index 7d29a54ad..256611a79 100644
--- a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java
+++ b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java
@@ -60,13 +60,13 @@
  */
 public class GrpcClient implements RpcClient {
 
-    private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class);
+    private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class);
 
-    private static final int MAX_FAILURES = SystemPropertyUtil.getInt(
-        "jraft.grpc.max.connect.failures", 20);
+    private static final int RESET_CONN_THRESHOLD = SystemPropertyUtil.getInt(
+        "jraft.grpc.max.conn.failures.to_reset", 2);
 
-    private final Map<Endpoint, ManagedChannel> managedChannelPool = new ConcurrentHashMap<>();
-    private final Map<Endpoint, AtomicInteger> transientFailures = new ConcurrentHashMap<>();
+    private final Map<Endpoint, ManagedChannel> managedChannelPool = new ConcurrentHashMap<>();
+    private final Map<Endpoint, AtomicInteger> transientFailures = new ConcurrentHashMap<>();
     private final Map<String, Message> parserClasses;
     private final MarshallerRegistry marshallerRegistry;
     private volatile ReplicatorGroup replicatorGroup;
@@ -140,10 +140,16 @@ public void invokeAsync(final Endpoint endpoint, final Object request, final Inv
         Requires.requireNonNull(endpoint, "endpoint");
         Requires.requireNonNull(request, "request");
 
-        final Channel ch = getChannel(endpoint);
+        final Executor executor = callback.executor() != null ? callback.executor() : DirectExecutor.INSTANCE;
+
+        final Channel ch = getCheckedChannel(endpoint);
+        if (ch == null) {
+            executor.execute(() -> callback.complete(null, new RemotingException("Fail to connect: " + endpoint)));
+            return;
+        }
+
         final MethodDescriptor<Message, Message> method = getCallMethod(request);
         final CallOptions callOpts = CallOptions.DEFAULT.withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS);
-        final Executor executor = callback.executor() != null ? callback.executor() : DirectExecutor.INSTANCE;
 
         ClientCalls.asyncUnaryCall(ch.newCall(method, callOpts), (Message) request, new StreamObserver<Message>() {
@@ -178,39 +184,102 @@ private MethodDescriptor getCallMethod(final Object request) {
             .build();
     }
 
-    private ManagedChannel getChannel(final Endpoint endpoint) {
-        return this.managedChannelPool.computeIfAbsent(endpoint, ep -> {
-            final ManagedChannel ch = ManagedChannelBuilder.forAddress(ep.getIp(), ep.getPort()) //
-                .usePlaintext() //
-                .directExecutor() //
-                .maxInboundMessageSize(GrpcRaftRpcFactory.RPC_MAX_INBOUND_MESSAGE_SIZE) //
-                .build();
-            // channel connection event
-            ch.notifyWhenStateChanged(ConnectivityState.READY, () -> {
-                final ReplicatorGroup rpGroup = replicatorGroup;
-                if (rpGroup != null) {
-                    try {
-                        RpcUtils.runInThread(() -> {
-                            final PeerId peer = new PeerId();
-                            if (peer.parse(ep.toString())) {
-                                LOG.info("Peer {} is connected.", peer);
-                                rpGroup.checkReplicator(peer, true);
-                            } else {
-                                LOG.error("Fail to parse peer: {}.", ep);
-                            }
-                        });
-                    } catch (final Throwable t) {
-                        LOG.error("Fail to check replicator {}.", ep, t);
-                    }
-                }
-            });
-            ch.notifyWhenStateChanged(ConnectivityState.TRANSIENT_FAILURE,
-                () -> LOG.warn("Channel in TRANSIENT_FAILURE state: {}.", ep));
-            ch.notifyWhenStateChanged(ConnectivityState.SHUTDOWN,
-                () -> LOG.warn("Channel in SHUTDOWN state: {}.", ep));
+    private ManagedChannel getCheckedChannel(final Endpoint endpoint) {
+        final ManagedChannel ch = getChannel(endpoint, true);
+        if (checkConnectivity(endpoint, ch)) {
             return ch;
-        });
+        }
+
+        return null;
+    }
+
+    private ManagedChannel getChannel(final Endpoint endpoint, final boolean createIfAbsent) {
+        if (createIfAbsent) {
+            return this.managedChannelPool.computeIfAbsent(endpoint, this::newChannel);
+        } else {
+            return this.managedChannelPool.get(endpoint);
+        }
+    }
+
+    private ManagedChannel newChannel(final Endpoint endpoint) {
+        final ManagedChannel ch = ManagedChannelBuilder.forAddress(endpoint.getIp(), endpoint.getPort()) //
+            .usePlaintext() //
+            .directExecutor() //
+            .maxInboundMessageSize(GrpcRaftRpcFactory.RPC_MAX_INBOUND_MESSAGE_SIZE) //
+            .build();
+
+        LOG.info("Creating new channel to: {}.", endpoint);
+
+        // The initial channel state is IDLE
+        notifyWhenStateChanged(ConnectivityState.IDLE, endpoint, ch);
+
+        return ch;
+    }
+
+    private ManagedChannel removeChannel(final Endpoint endpoint) {
+        return this.managedChannelPool.remove(endpoint);
+    }
+
+    private void notifyWhenStateChanged(final ConnectivityState state, final Endpoint endpoint,
+                                        final ManagedChannel ch) {
+        ch.notifyWhenStateChanged(state, () -> onStateChanged(endpoint, ch));
+    }
+
+    private void onStateChanged(final Endpoint endpoint, final ManagedChannel ch) {
+        final ConnectivityState state = ch.getState(false);
+
+        LOG.info("The channel {} is in state: {}.", endpoint, state);
+
+        switch (state) {
+            case READY:
+                notifyReady(endpoint);
+                notifyWhenStateChanged(ConnectivityState.READY, endpoint, ch);
+                break;
+            case TRANSIENT_FAILURE:
+                notifyFailure(endpoint);
+                notifyWhenStateChanged(ConnectivityState.TRANSIENT_FAILURE, endpoint, ch);
+                break;
+            case SHUTDOWN:
+                notifyShutdown(endpoint);
+                break;
+            case CONNECTING:
+                notifyWhenStateChanged(ConnectivityState.CONNECTING, endpoint, ch);
+                break;
+            case IDLE:
+                notifyWhenStateChanged(ConnectivityState.IDLE, endpoint, ch);
+                break;
+        }
+    }
+
+    private void notifyReady(final Endpoint endpoint) {
+        LOG.info("The channel {} has been successfully established.", endpoint);
+
+        clearConnFailuresCount(endpoint);
+
+        final ReplicatorGroup rpGroup = this.replicatorGroup;
+        if (rpGroup != null) {
+            try {
+                RpcUtils.runInThread(() -> {
+                    final PeerId peer = new PeerId();
+                    if (peer.parse(endpoint.toString())) {
+                        LOG.info("Peer {} is connected.", peer);
+                        rpGroup.checkReplicator(peer, true);
+                    } else {
+                        LOG.error("Fail to parse peer: {}.", endpoint);
+                    }
+                });
+            } catch (final Throwable t) {
+                LOG.error("Fail to check replicator {}.", endpoint, t);
+            }
+        }
+    }
+
+    private void notifyFailure(final Endpoint endpoint) {
+        LOG.warn("There has been some transient failure on this channel {}.", endpoint);
+    }
+
+    private void notifyShutdown(final Endpoint endpoint) {
+        LOG.warn("This channel {} has started shutting down. Any new RPCs should fail immediately.", endpoint);
     }
 
     private void closeAllChannels() {
@@ -219,10 +288,11 @@ private void closeAllChannels() {
             LOG.info("Shutdown managed channel: {}, {}.", entry.getKey(), ch);
             ManagedChannelHelper.shutdownAndAwaitTermination(ch);
         }
+        this.managedChannelPool.clear();
     }
 
     private void closeChannel(final Endpoint endpoint) {
-        final ManagedChannel ch = this.managedChannelPool.remove(endpoint);
+        final ManagedChannel ch = removeChannel(endpoint);
         LOG.info("Close connection: {}, {}.", endpoint, ch);
         if (ch != null) {
             ManagedChannelHelper.shutdownAndAwaitTermination(ch);
@@ -230,22 +300,59 @@ private void closeChannel(final Endpoint endpoint) {
     }
 
     private boolean checkChannel(final Endpoint endpoint, final boolean createIfAbsent) {
-        ManagedChannel ch = this.managedChannelPool.get(endpoint);
-        if (ch == null && createIfAbsent) {
-            ch = getChannel(endpoint);
-        }
+        final ManagedChannel ch = getChannel(endpoint, createIfAbsent);
+
         if (ch == null) {
             return false;
         }
-        final ConnectivityState st = ch.getState(true);
-        if (st == ConnectivityState.TRANSIENT_FAILURE) {
-            final AtomicInteger num = this.transientFailures.computeIfAbsent(endpoint, ep -> new AtomicInteger());
-            if (num.incrementAndGet() > MAX_FAILURES) {
-                this.transientFailures.remove(endpoint);
-                LOG.warn("Channel[{}] in {} state {} times, will be reset connect backoff.", endpoint, st, num.get());
+
+        return checkConnectivity(endpoint, ch);
+    }
+
+    private int incConnFailuresCount(final Endpoint endpoint) {
+        return this.transientFailures.computeIfAbsent(endpoint, ep -> new AtomicInteger()).incrementAndGet();
+    }
+
+    private void clearConnFailuresCount(final Endpoint endpoint) {
+        this.transientFailures.remove(endpoint);
+    }
+
+    private boolean checkConnectivity(final Endpoint endpoint, final ManagedChannel ch) {
+        final ConnectivityState st = ch.getState(false);
+
+        if (st != ConnectivityState.TRANSIENT_FAILURE && st != ConnectivityState.SHUTDOWN) {
+            return true;
+        }
+
+        final int c = incConnFailuresCount(endpoint);
+        if (c < RESET_CONN_THRESHOLD) {
+            if (c == RESET_CONN_THRESHOLD - 1) {
+                // For sub-channels that are in TRANSIENT_FAILURE state, short-circuit the backoff timer and make
+                // them reconnect immediately. May also attempt to invoke NameResolver#refresh
                 ch.resetConnectBackoff();
             }
+            return true;
         }
-        return st != ConnectivityState.TRANSIENT_FAILURE && st != ConnectivityState.SHUTDOWN;
+
+        clearConnFailuresCount(endpoint);
+
+        final ManagedChannel removedCh = removeChannel(endpoint);
+
+        if (removedCh == null) {
+            // The channel has been removed and closed by another thread
+            return false;
+        }
+
+        LOG.warn("Channel[{}] in [INACTIVE] state {} times; it has been removed from the pool.", endpoint, c);
+
+        if (removedCh != ch) {
+            // Now that it's removed, close it
+            ManagedChannelHelper.shutdownAndAwaitTermination(removedCh, 100);
+        }
+
+        ManagedChannelHelper.shutdownAndAwaitTermination(ch, 100);
+
+        return false;
     }
+
 }
diff --git a/jraft-extension/rpc-grpc-impl/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java b/jraft-extension/rpc-grpc-impl/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java
new file mode 100644
index 000000000..92cf3fec1
--- /dev/null
+++ b/jraft-extension/rpc-grpc-impl/src/test/java/com/alipay/sofa/jraft/rpc/ConnectionRefreshTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.rpc;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.alipay.sofa.jraft.rpc.impl.PingRequestProcessor;
+import com.alipay.sofa.jraft.util.Endpoint;
+import com.alipay.sofa.jraft.util.RpcFactoryHelper;
+
+/**
+ *
+ * @author jiachun.fjc
+ */
+public class ConnectionRefreshTest {
+
+    @Ignore
+    @Test
+    public void simulation() throws InterruptedException {
+        ProtobufMsgFactory.load();
+
+        final RpcServer server = RpcFactoryHelper.rpcFactory().createRpcServer(new Endpoint("127.0.0.1", 19991));
+        server.registerProcessor(new PingRequestProcessor());
+        server.init(null);
+
+        final Endpoint target = new Endpoint("my.test.host1.com", 19991);
+
+        final RpcClient client = RpcFactoryHelper.rpcFactory().createRpcClient();
+        client.init(null);
+
+        final RpcRequests.PingRequest req = RpcRequests.PingRequest.newBuilder() //
+            .setSendTimestamp(System.currentTimeMillis()) //
+            .build();
+
+        for (int i = 0; i < 1000; i++) {
+            try {
+                final Object resp = client.invokeSync(target, req, 3000);
+                System.out.println(resp);
+            } catch (final Exception e) {
+                e.printStackTrace();
+            }
+            Thread.sleep(1000);
+        }
+    }
+}
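The heart of this patch is the connectivity check GrpcClient now runs before handing out a channel: tolerate a TRANSIENT_FAILURE briefly, short-circuit the reconnect backoff once, and after RESET_CONN_THRESHOLD failed checks evict the channel from the pool so the next call rebuilds it (and re-resolves the hostname). The standalone Java sketch below reproduces that policy against a bare gRPC ManagedChannel, for anyone who wants to experiment with it outside of jraft. It is an illustration under stated assumptions, not code from this patch: the class name ChannelHealthSketch, the string key, the fixed threshold of 2, the 100 ms shutdown wait, and the simplification of resetting the backoff on every early failure are all invented for the example; only the gRPC calls themselves (getState(false), resetConnectBackoff(), shutdown(), awaitTermination()) are real APIs.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class ChannelHealthSketch {

    // Mirrors the spirit of "jraft.grpc.max.conn.failures.to_reset"; the value here is arbitrary.
    private static final int RESET_CONN_THRESHOLD = 2;

    private final ConcurrentMap<String, ManagedChannel> pool = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, AtomicInteger> failures = new ConcurrentHashMap<>();

    /** Returns a usable channel, or null if the endpoint currently looks unreachable. */
    public ManagedChannel getCheckedChannel(final String host, final int port) {
        final String key = host + ":" + port;
        final ManagedChannel ch = this.pool.computeIfAbsent(key,
            k -> ManagedChannelBuilder.forAddress(host, port).usePlaintext().build());

        final ConnectivityState st = ch.getState(false);
        if (st != ConnectivityState.TRANSIENT_FAILURE && st != ConnectivityState.SHUTDOWN) {
            // Channel looks healthy (or is still connecting): clear the strike counter and hand it out.
            this.failures.remove(key);
            return ch;
        }

        final int strikes = this.failures.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
        if (strikes < RESET_CONN_THRESHOLD) {
            // Below the threshold: skip the exponential backoff and ask gRPC to reconnect right away.
            // (The patch above resets only once, just before the threshold; resetting on every early
            // strike keeps this sketch short.)
            ch.resetConnectBackoff();
            return ch;
        }

        // Threshold reached: evict and close the channel so the next call builds a fresh one,
        // which also forces a new name resolution when the target is a hostname.
        this.failures.remove(key);
        this.pool.remove(key, ch);
        ch.shutdown();
        try {
            ch.awaitTermination(100, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }
}

Two details worth keeping in mind when reading the real implementation above: ManagedChannel#notifyWhenStateChanged registers a one-shot callback, which is why onStateChanged re-registers itself after READY, TRANSIENT_FAILURE, CONNECTING and IDLE notifications; and the checks use getState(false), so observing the channel never triggers a connection attempt by itself.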