diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 16d70b23be..98edd34fe3 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -157,8 +157,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) = peer.behaviourPenalty = stats.behaviourPenalty # Check if the score is below the threshold and disconnect the peer if necessary - if not g.disconnectIfBadPeer(peer, stats.score, g.parameters.graylistThreshold): - g.disconnectIfBadTrafficPeer(peer) + g.disconnectIfBadPeer(peer, stats.score) peer.iHaveBudget = IHavePeerBudget peer.pingBudget = PingsPeerBudget @@ -359,6 +358,10 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = + if peer.shouldDisconnectPeer: + discard g.disconnectPeer(peer) + return + if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0: g.send(peer, RPCMsg(pong: rpcMsg.ping)) peer.pingBudget.dec @@ -644,3 +647,9 @@ method initPubSub*(g: GossipSub) # init gossip stuff g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength) + +method shoulDisconnectPeer*(g: GossipSub, peer: PubSubPeer, score: float64): bool = + if g.parameters.disconnectBadPeers and score < g.parameters.graylistThreshold and + peer.peerId notin g.parameters.directPeers: + return true + return false \ No newline at end of file diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index ec85cfbe64..384e172e61 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -11,16 +11,17 @@ import std/[tables, sets] import chronos, chronicles, metrics +import chronos/ratelimit import "."/[types] import ".."/[pubsubpeer] import ../rpc/messages import "../../.."/[peerid, multiaddress, switch, utils/heartbeat] +import ../pubsub logScope: topics = "libp2p gossipsub" declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"]) -declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_firstMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_meshMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_meshFailurePenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) @@ -90,50 +91,6 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = {.pop.} -proc getAgent(peer: PubSubPeer): string = - return - when defined(libp2p_agents_metrics): - if peer.shortAgent.len > 0: - peer.shortAgent - else: - "unknown" - else: - "unknown" - -proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} = - let agent = getAgent(peer) - libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent]) - - try: - await g.switch.disconnect(peer.peerId) - except CatchableError as exc: # Never cancelled - trace "Failed to close connection", peer, error = exc.name, msg = exc.msg - -proc disconnectIfBadPeer*(g: GossipSub, peer: PubSubPeer, score: float64, threshold: float64): bool = - if g.parameters.disconnectBadPeers and score < threshold and - peer.peerId notin g.parameters.directPeers: - debug "disconnecting bad score peer", peer, score, threshold - asyncSpawn(g.disconnectPeer(peer)) - return true - - return false - -proc disconnectIfBadTrafficPeer*(g: GossipSub, peer: PubSubPeer) = - if peer.totalTraffic > 0: - # dividing in this way to avoid integer overflow - let agent = getAgent(peer) - let invalidTrafficRatio = float64(peer.invalidTraffic) / float64(peer.totalTraffic) - let invalidIgnoredTrafficRatio = float64(peer.invalidIgnoredTraffic) / float64(peer.totalTraffic) - let totalInvalidTrafficRatio = invalidTrafficRatio + invalidIgnoredTrafficRatio - libp2p_gossipsub_peers_invalidTraffic_bytes.inc(float64(peer.invalidTraffic), labelValues = [agent]) - libp2p_gossipsub_peers_invalidIgnoredTraffic_bytes.inc(float64(peer.invalidIgnoredTraffic), labelValues = [agent]) - libp2p_gossipsub_peers_totalTraffic_bytes.inc(float64(peer.totalTraffic), labelValues = [agent]) - - # inverting the signs as we want to compare if the ratio is above the threshold - if g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, -g.parameters.invalidTrafficRatioThreshold): - libp2p_gossipsub_peers_badTrafficScorePeerDisconnections.inc(labelValues = [getAgent(peer)]) - debug "Bad traffic score peer disconnected", peer - proc updateScores*(g: GossipSub) = # avoid async ## https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#the-score-function ## @@ -202,7 +159,7 @@ proc updateScores*(g: GossipSub) = # avoid async score += topicScore * topicParams.topicWeight # Score metrics - let agent = getAgent(peer) + let agent = peer.getAgent() libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent]) libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent]) libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent]) @@ -239,7 +196,7 @@ proc updateScores*(g: GossipSub) = # avoid async score += colocationFactor * g.parameters.ipColocationFactorWeight # Score metrics - let agent = getAgent(peer) + let agent = peer.getAgent() libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent]) libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent]) libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent]) @@ -259,10 +216,7 @@ proc updateScores*(g: GossipSub) = # avoid async trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted - if not g.disconnectIfBadPeer(peer, stats.score, g.parameters.graylistThreshold): - # this is necessary to disconnect peers that send only non-parseable messages as the message won't go through the normal path - g.disconnectIfBadTrafficPeer(peer) - + g.disconnectIfBadPeer(peer, stats.score) libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent]) for peer in evicting: @@ -276,8 +230,8 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} = g.updateScores() proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) = - peer.invalidTraffic += byteSize(msg) - g.disconnectIfBadTrafficPeer(peer) + if not peer.uselessAppBytesRate.tryConsume(len(msg.data)): + discard g.disconnectPeer(peer) for tt in msg.topicIds: let t = tt diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 02436a9b4b..e3e32fb55f 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -79,6 +79,8 @@ declarePublicCounter(libp2p_pubsub_received_ihave, "pubsub broadcast ihave", lab declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", labels = ["topic"]) declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"]) +declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"]) + type InitializationError* = object of LPError @@ -603,3 +605,20 @@ proc removeObserver*(p: PubSub; observer: PubSubObserver) {.public.} = let idx = p.observers[].find(observer) if idx != -1: p.observers[].del(idx) + +proc disconnectPeer*(p: PubSub, peer: PubSubPeer) {.async.} = + let agent = peer.getAgent() + libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent]) + + try: + await p.switch.disconnect(peer.peerId) + except CatchableError as exc: # Never cancelled + trace "Failed to close connection", peer, error = exc.name, msg = exc.msg + +method shoulDisconnectPeer*(p: PubSub, peer: PubSubPeer, score: float64): bool {.base.} = + doAssert(false, "Not implemented!") + +method disconnectIfBadPeer*(p: PubSub, peer: PubSubPeer, score: float64) {.base.} = + if p.shoulDisconnectPeer(peer, score): + debug "disconnecting bad score peer", peer, score + asyncSpawn(p.disconnectPeer(peer)) \ No newline at end of file diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 9c119aff22..97ece0bc86 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -12,6 +12,7 @@ import std/[sequtils, strutils, tables, hashes, options, sets, deques] import stew/results import chronos, chronicles, nimcrypto/sha2, metrics +import chronos/ratelimit import rpc/[messages, message, protobuf], ../../peerid, ../../peerinfo, @@ -65,9 +66,8 @@ type maxMessageSize: int appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score - totalTraffic*: int64 - invalidTraffic*: int64 # data that was parsed by protobuf but was invalid - invalidIgnoredTraffic*: int64 # data that couldn't be parsed by protobuf + uselessAppBytesRate*: TokenBucket + shouldDisconnectPeer*: bool RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe, raises: [].} @@ -129,6 +129,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = try: try: while not conn.atEof: + if p.shouldDisconnectPeer: + debug "Ignoring peer", conn, peer = p + break trace "waiting for data", conn, peer = p, closed = conn.closed var data = await conn.readLp(p.maxMessageSize) @@ -138,14 +141,22 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = # In this way we count even ignored fields by protobuf let msgSize = len(data) - p.totalTraffic += msgSize var rmsg = decodeRpcMsg(data).valueOr: - p.invalidIgnoredTraffic += msgSize + if not p.uselessAppBytesRate.tryConsume(msgSize): + p.shouldDisconnectPeer = true debug "failed to decode msg from peer", conn, peer = p, closed = conn.closed, err = error break - p.invalidIgnoredTraffic += msgSize - byteSize(rmsg) + + let uselessAppBytesNum = block: + rmsg.control.withValue(control): + msgSize - byteSize(control.ihave) - byteSize(control.iwant) + else: msgSize + + if not p.uselessAppBytesRate.tryConsume(uselessAppBytesNum): + p.shouldDisconnectPeer = true + data = newSeq[byte]() # Release memory trace "decoded msg from peer", @@ -322,6 +333,18 @@ proc new*( codec: codec, peerId: peerId, connectedFut: newFuture[void](), - maxMessageSize: maxMessageSize + maxMessageSize: maxMessageSize, + uselessAppBytesRate: TokenBucket.new(1024, 1.seconds), + shouldDisconnectPeer: false, ) result.sentIHaves.addFirst(default(HashSet[MessageId])) + +proc getAgent*(peer: PubSubPeer): string = + return + when defined(libp2p_agents_metrics): + if peer.shortAgent.len > 0: + peer.shortAgent + else: + "unknown" + else: + "unknown" \ No newline at end of file diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 6693716db7..3e82fb3921 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -127,6 +127,21 @@ proc byteSize*(msg: Message): int = total += topicId.len return total +proc byteSize*(ihave: seq[ControlIHave]): int = + var total = 0 + for item in ihave: + total += item.topicId.len + for msgId in item.messageIds: + total += msgId.len + return total + +proc byteSize*(iwant: seq[ControlIWant]): int = + var total = 0 + for item in iwant: + for msgId in item.messageIds: + total += msgId.len + return total + proc byteSize*(msg: RPCMsg): int = var total = 0 @@ -139,14 +154,9 @@ proc byteSize*(msg: RPCMsg): int = if msg.control.isSome: let ctrl = msg.control.get() - for item in ctrl.ihave: - total += item.topicId.len - for msgId in item.messageIds: - total += msgId.len + total += byteSize(ctrl.ihave) - for item in ctrl.iwant: - for msgId in item.messageIds: - total += msgId.len + total += byteSize(ctrl.iwant) for item in ctrl.graft: total += item.topicId.len