Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: grpc conn refresh #690

Merged
merged 3 commits into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rpc;

import org.junit.Ignore;
import org.junit.Test;

import com.alipay.sofa.jraft.rpc.impl.PingRequestProcessor;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;

/**
*
* @author jiachun.fjc
*/
public class ConnectionRefreshTest {

@Ignore
@Test
public void simulation() throws InterruptedException {
ProtobufMsgFactory.load();

final RpcServer server = RpcFactoryHelper.rpcFactory().createRpcServer(new Endpoint("127.0.0.1", 19991));
server.registerProcessor(new PingRequestProcessor());
server.init(null);

final Endpoint target = new Endpoint("my.test.host1.com", 19991);

final RpcClient client = RpcFactoryHelper.rpcFactory().createRpcClient();
client.init(null);

final RpcRequests.PingRequest req = RpcRequests.PingRequest.newBuilder() //
.setSendTimestamp(System.currentTimeMillis()) //
.build();

for (int i = 0; i < 1000; i++) {
try {
final Object resp = client.invokeSync(target, req, 3000);
System.out.println(resp);
} catch (final Exception e) {
e.printStackTrace();
}
Thread.sleep(1000);
}
}
}
4 changes: 2 additions & 2 deletions jraft-core/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss.SSS} [%t] %-5p %c{1}:%L - %msg%n"/>
</Console>

<RollingFile name="RollingFile" filename="log/jraft-test.log"
filepattern="log/%d{YYYYMMddHHmmss}-jraft-test.log">
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss.SSS} [%t] %-5p %c{1}:%L - %msg%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100 MB"/>
</Policies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@
*/
public class GrpcClient implements RpcClient {

private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class);
private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class);

private static final int MAX_FAILURES = SystemPropertyUtil.getInt(
"jraft.grpc.max.connect.failures", 20);
private static final int RESET_CONN_THRESHOLD = SystemPropertyUtil.getInt(
"jraft.grpc.max.conn.failures.to_reset", 2);

private final Map<Endpoint, ManagedChannel> managedChannelPool = new ConcurrentHashMap<>();
private final Map<Endpoint, AtomicInteger> transientFailures = new ConcurrentHashMap<>();
private final Map<Endpoint, ManagedChannel> managedChannelPool = new ConcurrentHashMap<>();
private final Map<Endpoint, AtomicInteger> transientFailures = new ConcurrentHashMap<>();
private final Map<String, Message> parserClasses;
private final MarshallerRegistry marshallerRegistry;
private volatile ReplicatorGroup replicatorGroup;
Expand Down Expand Up @@ -140,10 +140,16 @@ public void invokeAsync(final Endpoint endpoint, final Object request, final Inv
Requires.requireNonNull(endpoint, "endpoint");
Requires.requireNonNull(request, "request");

final Channel ch = getChannel(endpoint);
final Executor executor = callback.executor() != null ? callback.executor() : DirectExecutor.INSTANCE;

final Channel ch = getCheckedChannel(endpoint);
if (ch == null) {
executor.execute(() -> callback.complete(null, new RemotingException("Fail to connect: " + endpoint)));
return;
}

final MethodDescriptor<Message, Message> method = getCallMethod(request);
final CallOptions callOpts = CallOptions.DEFAULT.withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS);
final Executor executor = callback.executor() != null ? callback.executor() : DirectExecutor.INSTANCE;

ClientCalls.asyncUnaryCall(ch.newCall(method, callOpts), (Message) request, new StreamObserver<Message>() {

Expand Down Expand Up @@ -178,39 +184,102 @@ private MethodDescriptor<Message, Message> getCallMethod(final Object request) {
.build();
}

private ManagedChannel getChannel(final Endpoint endpoint) {
return this.managedChannelPool.computeIfAbsent(endpoint, ep -> {
final ManagedChannel ch = ManagedChannelBuilder.forAddress(ep.getIp(), ep.getPort()) //
.usePlaintext() //
.directExecutor() //
.maxInboundMessageSize(GrpcRaftRpcFactory.RPC_MAX_INBOUND_MESSAGE_SIZE) //
.build();
// channel connection event
ch.notifyWhenStateChanged(ConnectivityState.READY, () -> {
final ReplicatorGroup rpGroup = replicatorGroup;
if (rpGroup != null) {
try {
RpcUtils.runInThread(() -> {
final PeerId peer = new PeerId();
if (peer.parse(ep.toString())) {
LOG.info("Peer {} is connected.", peer);
rpGroup.checkReplicator(peer, true);
} else {
LOG.error("Fail to parse peer: {}.", ep);
}
});
} catch (final Throwable t) {
LOG.error("Fail to check replicator {}.", ep, t);
}
}
});
ch.notifyWhenStateChanged(ConnectivityState.TRANSIENT_FAILURE,
() -> LOG.warn("Channel in TRANSIENT_FAILURE state: {}.", ep));
ch.notifyWhenStateChanged(ConnectivityState.SHUTDOWN,
() -> LOG.warn("Channel in SHUTDOWN state: {}.", ep));
private ManagedChannel getCheckedChannel(final Endpoint endpoint) {
final ManagedChannel ch = getChannel(endpoint, true);

if (checkConnectivity(endpoint, ch)) {
return ch;
});
}

return null;
}

private ManagedChannel getChannel(final Endpoint endpoint, final boolean createIfAbsent) {
if (createIfAbsent) {
return this.managedChannelPool.computeIfAbsent(endpoint, this::newChannel);
} else {
return this.managedChannelPool.get(endpoint);
}
}

private ManagedChannel newChannel(final Endpoint endpoint) {
final ManagedChannel ch = ManagedChannelBuilder.forAddress(endpoint.getIp(), endpoint.getPort()) //
.usePlaintext() //
.directExecutor() //
.maxInboundMessageSize(GrpcRaftRpcFactory.RPC_MAX_INBOUND_MESSAGE_SIZE) //
.build();

LOG.info("Creating new channel to: {}.", endpoint);

// The init channel state is IDLE
notifyWhenStateChanged(ConnectivityState.IDLE, endpoint, ch);

return ch;
}

private ManagedChannel removeChannel(final Endpoint endpoint) {
return this.managedChannelPool.remove(endpoint);
}

private void notifyWhenStateChanged(final ConnectivityState state, final Endpoint endpoint, final ManagedChannel ch) {
ch.notifyWhenStateChanged(state, () -> onStateChanged(endpoint, ch));
}

private void onStateChanged(final Endpoint endpoint, final ManagedChannel ch) {
final ConnectivityState state = ch.getState(false);

LOG.info("The channel {} is in state: {}.", endpoint, state);

switch (state) {
case READY:
notifyReady(endpoint);
notifyWhenStateChanged(ConnectivityState.READY, endpoint, ch);
break;
case TRANSIENT_FAILURE:
notifyFailure(endpoint);
notifyWhenStateChanged(ConnectivityState.TRANSIENT_FAILURE, endpoint, ch);
break;
case SHUTDOWN:
notifyShutdown(endpoint);
break;
case CONNECTING:
notifyWhenStateChanged(ConnectivityState.CONNECTING, endpoint, ch);
break;
case IDLE:
notifyWhenStateChanged(ConnectivityState.IDLE, endpoint, ch);
break;
}
}

private void notifyReady(final Endpoint endpoint) {
LOG.info("The channel {} has successfully established.", endpoint);

clearConnFailuresCount(endpoint);

final ReplicatorGroup rpGroup = this.replicatorGroup;
if (rpGroup != null) {
try {
RpcUtils.runInThread(() -> {
final PeerId peer = new PeerId();
if (peer.parse(endpoint.toString())) {
LOG.info("Peer {} is connected.", peer);
rpGroup.checkReplicator(peer, true);
} else {
LOG.error("Fail to parse peer: {}.", endpoint);
}
});
} catch (final Throwable t) {
LOG.error("Fail to check replicator {}.", endpoint, t);
}
}
}

private void notifyFailure(final Endpoint endpoint) {
LOG.warn("There has been some transient failure on this channel {}.", endpoint);
}

private void notifyShutdown(final Endpoint endpoint) {
LOG.warn("This channel {} has started shutting down. Any new RPCs should fail immediately.", endpoint);
}

private void closeAllChannels() {
Expand All @@ -219,33 +288,71 @@ private void closeAllChannels() {
LOG.info("Shutdown managed channel: {}, {}.", entry.getKey(), ch);
ManagedChannelHelper.shutdownAndAwaitTermination(ch);
}
this.managedChannelPool.clear();
}

private void closeChannel(final Endpoint endpoint) {
final ManagedChannel ch = this.managedChannelPool.remove(endpoint);
final ManagedChannel ch = removeChannel(endpoint);
LOG.info("Close connection: {}, {}.", endpoint, ch);
if (ch != null) {
ManagedChannelHelper.shutdownAndAwaitTermination(ch);
}
}

private boolean checkChannel(final Endpoint endpoint, final boolean createIfAbsent) {
ManagedChannel ch = this.managedChannelPool.get(endpoint);
if (ch == null && createIfAbsent) {
ch = getChannel(endpoint);
}
final ManagedChannel ch = getChannel(endpoint, createIfAbsent);

if (ch == null) {
return false;
}
final ConnectivityState st = ch.getState(true);
if (st == ConnectivityState.TRANSIENT_FAILURE) {
final AtomicInteger num = this.transientFailures.computeIfAbsent(endpoint, ep -> new AtomicInteger());
if (num.incrementAndGet() > MAX_FAILURES) {
this.transientFailures.remove(endpoint);
LOG.warn("Channel[{}] in {} state {} times, will be reset connect backoff.", endpoint, st, num.get());

return checkConnectivity(endpoint, ch);
}

private int incConnFailuresCount(final Endpoint endpoint) {
return this.transientFailures.computeIfAbsent(endpoint, ep -> new AtomicInteger()).incrementAndGet();
}

private void clearConnFailuresCount(final Endpoint endpoint) {
this.transientFailures.remove(endpoint);
}

private boolean checkConnectivity(final Endpoint endpoint, final ManagedChannel ch) {
final ConnectivityState st = ch.getState(false);

if (st != ConnectivityState.TRANSIENT_FAILURE && st != ConnectivityState.SHUTDOWN) {
return true;
}

final int c = incConnFailuresCount(endpoint);
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved
if (c < RESET_CONN_THRESHOLD) {
if (c == RESET_CONN_THRESHOLD - 1) {
// For sub-channels that are in TRANSIENT_FAILURE state, short-circuit the backoff timer and make
// them reconnect immediately. May also attempt to invoke NameResolver#refresh
ch.resetConnectBackoff();
}
return true;
}
return st != ConnectivityState.TRANSIENT_FAILURE && st != ConnectivityState.SHUTDOWN;

clearConnFailuresCount(endpoint);

final ManagedChannel removedCh = removeChannel(endpoint);

if (removedCh == null) {
// The channel has been removed and closed by another
return false;
}

LOG.warn("Channel[{}] in [INACTIVE] state {} times, it has been removed from the pool.", endpoint, c);

if (removedCh != ch) {
// Now that it's removed, close it
ManagedChannelHelper.shutdownAndAwaitTermination(removedCh, 100);
}

ManagedChannelHelper.shutdownAndAwaitTermination(ch, 100);

return false;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rpc;

import org.junit.Ignore;
import org.junit.Test;

import com.alipay.sofa.jraft.rpc.impl.PingRequestProcessor;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;

/**
*
* @author jiachun.fjc
*/
public class ConnectionRefreshTest {

@Ignore
@Test
public void simulation() throws InterruptedException {
ProtobufMsgFactory.load();

final RpcServer server = RpcFactoryHelper.rpcFactory().createRpcServer(new Endpoint("127.0.0.1", 19991));
server.registerProcessor(new PingRequestProcessor());
server.init(null);

final Endpoint target = new Endpoint("my.test.host1.com", 19991);

final RpcClient client = RpcFactoryHelper.rpcFactory().createRpcClient();
client.init(null);

final RpcRequests.PingRequest req = RpcRequests.PingRequest.newBuilder() //
.setSendTimestamp(System.currentTimeMillis()) //
.build();

for (int i = 0; i < 1000; i++) {
try {
final Object resp = client.invokeSync(target, req, 3000);
System.out.println(resp);
} catch (final Exception e) {
e.printStackTrace();
}
Thread.sleep(1000);
}
}
}