fix: grpc conn refresh #690

Merged (3 commits) on Nov 23, 2021
Changes from 1 commit
4 changes: 2 additions & 2 deletions jraft-core/src/test/resources/log4j2.xml
@@ -3,12 +3,12 @@

<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss.SSS} [%t] %-5p %c{1}:%L - %msg%n"/>
</Console>

<RollingFile name="RollingFile" filename="log/jraft-test.log"
filepattern="log/%d{YYYYMMddHHmmss}-jraft-test.log">
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss.SSS} [%t] %-5p %c{1}:%L - %msg%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100 MB"/>
</Policies>
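
Note on the log4j2.xml change: the only edit adds millisecond precision to both PatternLayout timestamps, %d{YYYY-MM-dd HH:mm:ss} becomes %d{YYYY-MM-dd HH:mm:ss.SSS}, so a test log line renders as, for example, 2021-11-23 10:15:42.123 [main] INFO GrpcClient:190 - ... rather than 2021-11-23 10:15:42 [main] INFO GrpcClient:190 - ... (the sample timestamp and logger location here are illustrative, not taken from an actual run).
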
GrpcClient.java

@@ -60,13 +60,15 @@
*/
public class GrpcClient implements RpcClient {

private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class);

private static final int MAX_FAILURES = SystemPropertyUtil.getInt("jraft.grpc.max.connect.failures", 20);
private static final int RESET_BACKOFF_THRESHOLD = SystemPropertyUtil.getInt("jraft.grpc.max.conn.failures.reset_backoff", 2);

private final Map<Endpoint, ManagedChannel> managedChannelPool = new ConcurrentHashMap<>();
private final Map<Endpoint, AtomicInteger> transientFailures = new ConcurrentHashMap<>();
private final Map<String, Message> parserClasses;
private final MarshallerRegistry marshallerRegistry;
private volatile ReplicatorGroup replicatorGroup;
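
The two thresholds above are resolved once, at class-load time, from JVM system properties (jraft.grpc.max.connect.failures, default 20, and jraft.grpc.max.conn.failures.reset_backoff, default 2), so they can be tuned per process with -D flags. A minimal sketch of the assumed lookup semantics, using java.lang.Integer.getInteger as a stand-in for SystemPropertyUtil.getInt (an assumption about that helper's behavior):

// Sketch only: approximates how the two tuning knobs are assumed to be read.
public final class GrpcTuningSketch {

    static int getInt(final String key, final int defaultValue) {
        // e.g. -Djraft.grpc.max.connect.failures=40 overrides the default of 20
        return Integer.getInteger(key, defaultValue);
    }

    public static void main(final String[] args) {
        System.out.println("max connect failures: " + getInt("jraft.grpc.max.connect.failures", 20));
        System.out.println("reset backoff threshold: " + getInt("jraft.grpc.max.conn.failures.reset_backoff", 2));
    }
}
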
@@ -185,44 +187,68 @@ private ManagedChannel getChannel(final Endpoint endpoint) {
.directExecutor() //
.maxInboundMessageSize(GrpcRaftRpcFactory.RPC_MAX_INBOUND_MESSAGE_SIZE) //
.build();

// channel connection event
ch.notifyWhenStateChanged(ConnectivityState.READY, () -> {
final ReplicatorGroup rpGroup = replicatorGroup;
if (rpGroup != null) {
try {
RpcUtils.runInThread(() -> {
final PeerId peer = new PeerId();
if (peer.parse(ep.toString())) {
LOG.info("Peer {} is connected.", peer);
rpGroup.checkReplicator(peer, true);
} else {
LOG.error("Fail to parse peer: {}.", ep);
}
});
} catch (final Throwable t) {
LOG.error("Fail to check replicator {}.", ep, t);
}
}
});
ch.notifyWhenStateChanged(ConnectivityState.TRANSIENT_FAILURE,
() -> LOG.warn("Channel in TRANSIENT_FAILURE state: {}.", ep));
ch.notifyWhenStateChanged(ConnectivityState.SHUTDOWN,
() -> LOG.warn("Channel in SHUTDOWN state: {}.", ep));
ch.notifyWhenStateChanged(ConnectivityState.READY, () -> onChannelReady(ep));
ch.notifyWhenStateChanged(ConnectivityState.TRANSIENT_FAILURE, () -> onChannelFailure(ep, ch));
ch.notifyWhenStateChanged(ConnectivityState.SHUTDOWN, () -> onChannelShutdown(ep));

return ch;
});
}
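
A note on the listener registrations above: gRPC's ManagedChannel.notifyWhenStateChanged(source, callback) is a one-shot notification, the callback fires once on the next transition away from the given source state and is not re-armed automatically. The PR registers one callback per state of interest when the channel is created; a watcher that needs to observe every transition has to re-register itself, roughly as in this sketch (not part of the PR, shown only to illustrate the API contract):

import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;

final class ChannelStateWatcher {

    // Re-arms itself after every callback so that every state transition is observed.
    static void watch(final ManagedChannel ch) {
        final ConnectivityState seen = ch.getState(false);
        ch.notifyWhenStateChanged(seen, () -> {
            // The callback runs once; log the transition and register again.
            System.out.println("channel left " + seen + ", now " + ch.getState(false));
            watch(ch);
        });
    }

    private ChannelStateWatcher() {
    }
}
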

private ManagedChannel removeChannel(final Endpoint endpoint) {
return this.managedChannelPool.remove(endpoint);
}

private void onChannelReady(final Endpoint endpoint) {
LOG.info("The channel {} has successfully established.", endpoint);
final ReplicatorGroup rpGroup = this.replicatorGroup;
if (rpGroup != null) {
try {
RpcUtils.runInThread(() -> {
final PeerId peer = new PeerId();
if (peer.parse(endpoint.toString())) {
LOG.info("Peer {} is connected.", peer);
rpGroup.checkReplicator(peer, true);
} else {
LOG.error("Fail to parse peer: {}.", endpoint);
}
});
} catch (final Throwable t) {
LOG.error("Fail to check replicator {}.", endpoint, t);
}
}
}
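
onChannelReady only pings the replicator group when the endpoint string parses back into a PeerId; jraft's PeerId.parse accepts the usual host:port form (optionally host:port:idx). A small sketch of the expected round trip, assuming the standard com.alipay.sofa.jraft.entity.PeerId and com.alipay.sofa.jraft.util.Endpoint types:

import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.util.Endpoint;

final class PeerParseSketch {

    public static void main(final String[] args) {
        final Endpoint ep = new Endpoint("127.0.0.1", 8081);
        final PeerId peer = new PeerId();
        // Endpoint.toString() is expected to yield "127.0.0.1:8081", which PeerId.parse accepts;
        // a malformed string would return false and trigger the error branch above.
        System.out.println(peer.parse(ep.toString()) + " -> " + peer);
    }
}
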

private void onChannelFailure(final Endpoint endpoint, final ManagedChannel ch) {
LOG.warn("There has been some transient failure on this channel {}.", endpoint);
RpcUtils.runInThread(() -> {
if (ch == null) {
return;
}
// double-check
if (ch.getState(false) == ConnectivityState.TRANSIENT_FAILURE) {
mayResetConnectBackoff(endpoint, ch);
}
});
}
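
The double check above re-reads the state with getState(false), which only observes the current state; passing true would additionally ask an IDLE channel to start connecting. A tiny sketch of the difference (hypothetical helper names, not from the PR):

import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;

final class StateProbeSketch {

    // Passive read: never nudges an IDLE channel into CONNECTING.
    static ConnectivityState peek(final ManagedChannel ch) {
        return ch.getState(false);
    }

    // Active read: requests a connection if the channel is IDLE, then reports the state.
    static ConnectivityState poke(final ManagedChannel ch) {
        return ch.getState(true);
    }

    private StateProbeSketch() {
    }
}
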

private void onChannelShutdown(final Endpoint endpoint) {
LOG.warn("This channel {} has started shutting down. Any new RPCs should fail immediately.", endpoint);
}

private void closeAllChannels() {
for (final Map.Entry<Endpoint, ManagedChannel> entry : this.managedChannelPool.entrySet()) {
final ManagedChannel ch = entry.getValue();
LOG.info("Shutdown managed channel: {}, {}.", entry.getKey(), ch);
ManagedChannelHelper.shutdownAndAwaitTermination(ch);
}
this.managedChannelPool.clear();
}
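
ManagedChannelHelper.shutdownAndAwaitTermination is a jraft utility whose body is not shown in this diff; it presumably follows the standard gRPC shutdown sequence, sketched below under that assumption (the timeout handling is illustrative, not jraft's exact implementation):

import java.util.concurrent.TimeUnit;

import io.grpc.ManagedChannel;

final class ChannelShutdownSketch {

    static void shutdownAndAwait(final ManagedChannel ch, final long timeoutMillis) {
        ch.shutdown(); // stop accepting new calls, let in-flight calls drain
        try {
            if (!ch.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                ch.shutdownNow(); // force-cancel anything still pending
                ch.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }
        } catch (final InterruptedException e) {
            ch.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    private ChannelShutdownSketch() {
    }
}
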

private void closeChannel(final Endpoint endpoint) {
final ManagedChannel ch = this.managedChannelPool.remove(endpoint);
final ManagedChannel ch = removeChannel(endpoint);
LOG.info("Close connection: {}, {}.", endpoint, ch);
if (ch != null) {
ManagedChannelHelper.shutdownAndAwaitTermination(ch);
@@ -231,21 +257,43 @@ private void closeChannel(final Endpoint endpoint) {

private boolean checkChannel(final Endpoint endpoint, final boolean createIfAbsent) {
ManagedChannel ch = this.managedChannelPool.get(endpoint);

if (ch == null && createIfAbsent) {
ch = getChannel(endpoint);
}

if (ch == null) {
return false;
}
final ConnectivityState st = ch.getState(true);

ConnectivityState st = ch.getState(true);

if (st == ConnectivityState.TRANSIENT_FAILURE) {
final AtomicInteger num = this.transientFailures.computeIfAbsent(endpoint, ep -> new AtomicInteger());
if (num.incrementAndGet() > MAX_FAILURES) {
this.transientFailures.remove(endpoint);
LOG.warn("Channel[{}] in {} state {} times, will be reset connect backoff.", endpoint, st, num.get());
ch.resetConnectBackoff();
}
mayResetConnectBackoff(endpoint, ch);
st = ch.getState(false);
}

LOG.debug("Channel[{}] in {} state.", ch, st);

return st != ConnectivityState.TRANSIENT_FAILURE && st != ConnectivityState.SHUTDOWN;
}
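
After the possible backoff reset, checkChannel re-reads the state and accepts anything that is not TRANSIENT_FAILURE or SHUTDOWN, i.e. IDLE, CONNECTING and READY all count as usable. The acceptance rule in isolation (sketch only):

import io.grpc.ConnectivityState;

final class ChannelUsableSketch {

    // IDLE, CONNECTING and READY are accepted; only the two hard-failure states are rejected.
    static boolean usable(final ConnectivityState st) {
        return st != ConnectivityState.TRANSIENT_FAILURE && st != ConnectivityState.SHUTDOWN;
    }

    private ChannelUsableSketch() {
    }
}
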

private int incConnFailuresCount(final Endpoint endpoint) {
return this.transientFailures.computeIfAbsent(endpoint, ep -> new AtomicInteger()).incrementAndGet();
}

private void mayResetConnectBackoff(final Endpoint endpoint, final ManagedChannel ch) {
final int c = incConnFailuresCount(endpoint);
if (c < RESET_BACKOFF_THRESHOLD) {
return;
}

this.transientFailures.remove(endpoint);

LOG.warn("Channel[{}] in TRANSIENT_FAILURE state {} times, will be reset connect backoff.", endpoint, c);

// For sub-channels that are in TRANSIENT_FAILURE state, short-circuit the backoff timer and make
// them reconnect immediately. May also attempt to invoke NameResolver#refresh.
ch.resetConnectBackoff();
}
}
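
Taken together, mayResetConnectBackoff short-circuits gRPC's reconnect backoff (ManagedChannel.resetConnectBackoff) once an endpoint has been observed in TRANSIENT_FAILURE RESET_BACKOFF_THRESHOLD times, and clearing the counter means the next reset again requires that many observations. The counting rule on its own, with no gRPC involved (hypothetical class and names, a sketch of the bookkeeping only):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class BackoffResetCounterSketch {

    private static final int THRESHOLD = 2; // mirrors the default of RESET_BACKOFF_THRESHOLD

    private final Map<String, AtomicInteger> failures = new ConcurrentHashMap<>();

    // Returns true when the caller should reset the channel's connect backoff.
    boolean onTransientFailure(final String endpoint) {
        final int c = this.failures.computeIfAbsent(endpoint, k -> new AtomicInteger()).incrementAndGet();
        if (c < THRESHOLD) {
            return false;
        }
        this.failures.remove(endpoint); // counting starts over after a reset
        return true;
    }
}
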