From 8845e7e7a3ebf1981304803aeec076a67db3dd82 Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 23 Jun 2023 15:01:04 +0200 Subject: [PATCH 01/26] Disconnect peers with too high invalid traffic --- libp2p/protocols/pubsub/gossipsub.nim | 9 ++++---- libp2p/protocols/pubsub/gossipsub/scoring.nim | 21 +++++++++++++------ libp2p/protocols/pubsub/pubsubpeer.nim | 8 +++++++ 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index ffca594711..e1a21bb050 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -160,7 +160,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) = peer.behaviourPenalty = stats.behaviourPenalty # Check if the score is below the threshold and disconnect the peer if necessary - g.disconnectBadPeerCheck(peer, stats.score) + discard g.tryDisconnectBadPeer(peer, stats.score, g.parameters.graylistThreshold) peer.iHaveBudget = IHavePeerBudget peer.pingBudget = PingsPeerBudget @@ -305,6 +305,7 @@ proc validateAndRelay(g: GossipSub, msgId, msgIdSalted: MessageId, peer: PubSubPeer) {.async.} = try: + let msgSize = sizeof(msg) let validation = await g.validate(msg) var seenPeers: HashSet[PubSubPeer] @@ -316,7 +317,7 @@ proc validateAndRelay(g: GossipSub, of ValidationResult.Reject: debug "Dropping message after validation, reason: reject", msgId = shortLog(msgId), peer - g.punishInvalidMessage(peer, msg.topicIds) + g.punishInvalidMessage(peer, msg) return of ValidationResult.Ignore: debug "Dropping message after validation, reason: ignore", @@ -442,14 +443,14 @@ method rpcHandler*(g: GossipSub, # always validate if signature is present or required debug "Dropping message due to failed signature verification", msgId = shortLog(msgId), peer - g.punishInvalidMessage(peer, msg.topicIds) + g.punishInvalidMessage(peer, msg) continue if msg.seqno.len > 0 and msg.seqno.len != 8: # if we have seqno should be 8 bytes long debug "Dropping message due to invalid seqno length", msgId = shortLog(msgId), peer - g.punishInvalidMessage(peer, msg.topicIds) + g.punishInvalidMessage(peer, msg) continue # g.anonymize needs no evaluation when receiving messages diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 3785e37d1c..2d564f4156 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -13,6 +13,7 @@ import std/[tables, sets] import chronos, chronicles, metrics import "."/[types] import ".."/[pubsubpeer] +import ../rpc/messages import "../../.."/[peerid, multiaddress, switch, utils/heartbeat] logScope: @@ -101,11 +102,13 @@ proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} = except CatchableError as exc: # Never cancelled trace "Failed to close connection", peer, error = exc.name, msg = exc.msg -proc disconnectBadPeerCheck*(g: GossipSub, peer: PubSubPeer, score: float64) = - if g.parameters.disconnectBadPeers and score < g.parameters.graylistThreshold and +proc tryDisconnectBadPeer*(g: GossipSub, peer: PubSubPeer, score: float64, threshold: float64): bool = + if g.parameters.disconnectBadPeers and score < threshold and peer.peerId notin g.parameters.directPeers: - debug "disconnecting bad score peer", peer, score = peer.score + debug "disconnecting bad score peer", peer, score, threshold asyncSpawn(g.disconnectPeer(peer)) + return true + return false proc updateScores*(g: GossipSub) = # avoid async ## 
https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#the-score-function @@ -246,7 +249,11 @@ proc updateScores*(g: GossipSub) = # avoid async trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted - g.disconnectBadPeerCheck(peer, stats.score) + if not g.tryDisconnectBadPeer(peer, stats.score, g.parameters.graylistThreshold): + if peer.totalTraffic > 0: + # dividing in this way to avoid integer overflow + let invalidTrafficRatio: float64 = (float64(peer.invalidIgnoredTraffic) / float64(peer.totalTraffic)) + (float64(peer.invalidTraffic) / float64(peer.totalTraffic)) + discard g.tryDisconnectBadPeer(peer, -invalidTrafficRatio, -0.30'f64) #g.parameters.maxInvalidTrafficRatio) libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent]) @@ -260,8 +267,9 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} = trace "running scoring heartbeat", instance = cast[int](g) g.updateScores() -proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, topics: seq[string]) = - for tt in topics: +proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) = + peer.invalidTraffic += sizeof(msg) + for tt in msg.topicIds: let t = tt if t notin g.topics: continue @@ -276,6 +284,7 @@ proc addCapped*[T](stat: var T, diff, cap: T) = proc rewardDelivered*( g: GossipSub, peer: PubSubPeer, topics: openArray[string], first: bool, delay = ZeroDuration) = + for tt in topics: let t = tt if t notin g.topics: diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 1dcd28286a..22df7aff6d 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -66,6 +66,9 @@ type maxMessageSize: int appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score + totalTraffic*: int64 + invalidTraffic*: int64 + invalidIgnoredTraffic*: int64 RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe, raises: [].} @@ -134,11 +137,16 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = conn, peer = p, closed = conn.closed, data = data.shortLog + # In this way we count even ignored fields by protobuf + let msgSize = sizeof(data) + p.totalTraffic += msgSize var rmsg = decodeRpcMsg(data).valueOr: + p.invalidIgnoredTraffic += msgSize debug "failed to decode msg from peer", conn, peer = p, closed = conn.closed, err = error break + p.invalidIgnoredTraffic += msgSize - sizeof(rmsg) data = newSeq[byte]() # Release memory trace "decoded msg from peer", From f8ca0f35251ea2e150bbe43df44d0a9d0527376d Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 23 Jun 2023 17:05:16 +0200 Subject: [PATCH 02/26] Improvements after code review --- libp2p/protocols/pubsub/gossipsub.nim | 8 +++- libp2p/protocols/pubsub/gossipsub/scoring.nim | 12 ++--- libp2p/protocols/pubsub/pubsubpeer.nim | 4 +- libp2p/protocols/pubsub/rpc/messages.nim | 45 +++++++++++++++++++ 4 files changed, 57 insertions(+), 12 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index e1a21bb050..f820af6328 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -160,7 +160,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) = peer.behaviourPenalty = stats.behaviourPenalty # Check if the score is below the threshold and disconnect the peer if necessary - discard g.tryDisconnectBadPeer(peer, stats.score, g.parameters.graylistThreshold) + g.disconnectIfBadPeer(peer, stats.score, 
g.parameters.graylistThreshold) peer.iHaveBudget = IHavePeerBudget peer.pingBudget = PingsPeerBudget @@ -380,6 +380,12 @@ proc validateAndRelay(g: GossipSub, method rpcHandler*(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = + + if peer.totalTraffic > 0: + # dividing in this way to avoid integer overflow + let invalidTrafficRatio: float64 = (float64(peer.invalidIgnoredTraffic) / float64(peer.totalTraffic)) + (float64(peer.invalidTraffic) / float64(peer.totalTraffic)) + g.disconnectIfBadPeer(peer, -invalidTrafficRatio, -0.30'f64) #g.parameters.maxInvalidTrafficRatio) + if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0: g.send(peer, RPCMsg(pong: rpcMsg.ping)) peer.pingBudget.dec diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 2d564f4156..ebf05ce127 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -102,13 +102,11 @@ proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} = except CatchableError as exc: # Never cancelled trace "Failed to close connection", peer, error = exc.name, msg = exc.msg -proc tryDisconnectBadPeer*(g: GossipSub, peer: PubSubPeer, score: float64, threshold: float64): bool = +proc disconnectIfBadPeer*(g: GossipSub, peer: PubSubPeer, score: float64, threshold: float64) = if g.parameters.disconnectBadPeers and score < threshold and peer.peerId notin g.parameters.directPeers: debug "disconnecting bad score peer", peer, score, threshold asyncSpawn(g.disconnectPeer(peer)) - return true - return false proc updateScores*(g: GossipSub) = # avoid async ## https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#the-score-function @@ -249,11 +247,7 @@ proc updateScores*(g: GossipSub) = # avoid async trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted - if not g.tryDisconnectBadPeer(peer, stats.score, g.parameters.graylistThreshold): - if peer.totalTraffic > 0: - # dividing in this way to avoid integer overflow - let invalidTrafficRatio: float64 = (float64(peer.invalidIgnoredTraffic) / float64(peer.totalTraffic)) + (float64(peer.invalidTraffic) / float64(peer.totalTraffic)) - discard g.tryDisconnectBadPeer(peer, -invalidTrafficRatio, -0.30'f64) #g.parameters.maxInvalidTrafficRatio) + g.disconnectIfBadPeer(peer, stats.score, g.parameters.graylistThreshold) libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent]) @@ -268,7 +262,7 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} = g.updateScores() proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) = - peer.invalidTraffic += sizeof(msg) + peer.invalidTraffic += byteSize(msg) for tt in msg.topicIds: let t = tt if t notin g.topics: diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 22df7aff6d..2b76b13c75 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -138,7 +138,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = data = data.shortLog # In this way we count even ignored fields by protobuf - let msgSize = sizeof(data) + let msgSize = len(data) p.totalTraffic += msgSize var rmsg = decodeRpcMsg(data).valueOr: p.invalidIgnoredTraffic += msgSize @@ -146,7 +146,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = conn, peer = p, closed = conn.closed, err = error break - p.invalidIgnoredTraffic += msgSize - sizeof(rmsg) + p.invalidIgnoredTraffic += msgSize - byteSize(rmsg.get()) data = 
newSeq[byte]() # Release memory trace "decoded msg from peer", diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index ce6dd318bc..3c22769f6f 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -116,3 +116,48 @@ func shortLog*(m: RPCMsg): auto = messages: mapIt(m.messages, it.shortLog), control: m.control.get(ControlMessage()).shortLog ) + +proc byteSize*(msg: Message): int = + var total = 0 + total += msg.fromPeer.len + total += msg.data.len + total += msg.seqno.len + total += msg.signature.len + total += msg.key.len + for topicId in msg.topicIds: + total += topicId.len + return total + +proc byteSize*(msg: RPCMsg): int = + var total = 0 + + for sub in msg.subscriptions: + total += sub.topic.len + total += sizeof(sub.subscribe) + + for msg in msg.messages: + total += byteSize(msg) + + if msg.control.isSome: + let ctrl = msg.control.get() + for item in ctrl.ihave: + total += item.topicId.len + total += item.messageIds.len * sizeof(byte) # Assuming MessageId is seq[byte] + + for item in ctrl.iwant: + total += item.messageIds.len * sizeof(byte) # Assuming MessageId is seq[byte] + + for item in ctrl.graft: + total += item.topicId.len + + for item in ctrl.prune: + total += item.topicId.len + total += sizeof(item.backoff) + for peer in item.peers: + total += peer.peerId.len + total += peer.signedPeerRecord.len * sizeof(byte) + + total += msg.ping.len * sizeof(byte) + total += msg.pong.len * sizeof(byte) + + return total From 2dcfa1ae3ef01fa19ac9a821c9b09e5fa79c2907 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 26 Jun 2023 16:24:52 +0200 Subject: [PATCH 03/26] Disconnect peers that sent only non-parseable msgs --- libp2p/protocols/pubsub/gossipsub.nim | 7 +-- libp2p/protocols/pubsub/gossipsub/scoring.nim | 53 +++++++++++-------- libp2p/protocols/pubsub/pubsubpeer.nim | 6 +-- 3 files changed, 35 insertions(+), 31 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index f820af6328..effc3541eb 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -160,7 +160,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) = peer.behaviourPenalty = stats.behaviourPenalty # Check if the score is below the threshold and disconnect the peer if necessary - g.disconnectIfBadPeer(peer, stats.score, g.parameters.graylistThreshold) + discard g.disconnectIfBadPeer(peer, stats.score, g.parameters.graylistThreshold) peer.iHaveBudget = IHavePeerBudget peer.pingBudget = PingsPeerBudget @@ -381,11 +381,6 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = - if peer.totalTraffic > 0: - # dividing in this way to avoid integer overflow - let invalidTrafficRatio: float64 = (float64(peer.invalidIgnoredTraffic) / float64(peer.totalTraffic)) + (float64(peer.invalidTraffic) / float64(peer.totalTraffic)) - g.disconnectIfBadPeer(peer, -invalidTrafficRatio, -0.30'f64) #g.parameters.maxInvalidTrafficRatio) - if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0: g.send(peer, RPCMsg(pong: rpcMsg.ping)) peer.pingBudget.dec diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index ebf05ce127..c26805e716 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -28,6 +28,8 @@ declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed go declareGauge(libp2p_gossipsub_peers_score_appScore, 
"Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_score_invalidIgnoredTrafficRatio, "Invalid Ignored Traffic Ratio", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_score_invalidTrafficRatio, "Invalid Traffic Ratio", labels = ["agent"]) proc init*(_: type[TopicParams]): TopicParams = TopicParams( @@ -86,15 +88,18 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = {.pop.} -proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} = - let agent = - when defined(libp2p_agents_metrics): +proc getAgent(peer: PubSubPeer): string = + return + when defined(libp2p_agents_metrics): if peer.shortAgent.len > 0: peer.shortAgent else: "unknown" else: - "unknown" + "unknown" + +proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} = + let agent = getAgent(peer) libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent]) try: @@ -102,11 +107,25 @@ proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} = except CatchableError as exc: # Never cancelled trace "Failed to close connection", peer, error = exc.name, msg = exc.msg -proc disconnectIfBadPeer*(g: GossipSub, peer: PubSubPeer, score: float64, threshold: float64) = +proc disconnectIfBadPeer*(g: GossipSub, peer: PubSubPeer, score: float64, threshold: float64): bool = if g.parameters.disconnectBadPeers and score < threshold and peer.peerId notin g.parameters.directPeers: debug "disconnecting bad score peer", peer, score, threshold asyncSpawn(g.disconnectPeer(peer)) + return true + + return false + +proc disconnectIfBadTrafficPeer(g: GossipSub, peer: PubSubPeer) = + if peer.totalTraffic > 0: + # dividing in this way to avoid integer overflow + let agent = getAgent(peer) + let invalidTrafficRatio: float64 = float64(peer.invalidTraffic) / float64(peer.totalTraffic) + let invalidIgnoredTrafficRatio: float64 = float64(peer.invalidIgnoredTraffic) / float64(peer.totalTraffic) + let totalInvalidTrafficRatio: float64 = invalidTrafficRatio + invalidIgnoredTrafficRatio + libp2p_gossipsub_peers_score_invalidTrafficRatio.set(invalidTrafficRatio, labelValues = [agent]) + libp2p_gossipsub_peers_score_invalidIgnoredTrafficRatio.set(invalidIgnoredTrafficRatio, labelValues = [agent]) + discard g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, -0.30'f64) #g.parameters.maxInvalidTrafficRatio) proc updateScores*(g: GossipSub) = # avoid async ## https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#the-score-function @@ -176,14 +195,7 @@ proc updateScores*(g: GossipSub) = # avoid async score += topicScore * topicParams.topicWeight # Score metrics - let agent = - when defined(libp2p_agents_metrics): - if peer.shortAgent.len > 0: - peer.shortAgent - else: - "unknown" - else: - "unknown" + let agent = getAgent(peer) libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent]) libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent]) libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent]) @@ -220,14 +232,7 @@ proc updateScores*(g: GossipSub) = # avoid async score += colocationFactor * g.parameters.ipColocationFactorWeight # Score metrics - let agent = - when defined(libp2p_agents_metrics): 
- if peer.shortAgent.len > 0: - peer.shortAgent - else: - "unknown" - else: - "unknown" + let agent = getAgent(peer) libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent]) libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent]) libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent]) @@ -247,7 +252,9 @@ proc updateScores*(g: GossipSub) = # avoid async trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted - g.disconnectIfBadPeer(peer, stats.score, g.parameters.graylistThreshold) + if not g.disconnectIfBadPeer(peer, stats.score, g.parameters.graylistThreshold): + # this is necessary to disconnect peers that send only non-parseable messages as the message won't go through the normal path + g.disconnectIfBadTrafficPeer(peer) libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent]) @@ -263,6 +270,8 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} = proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) = peer.invalidTraffic += byteSize(msg) + g.disconnectIfBadTrafficPeer(peer) + for tt in msg.topicIds: let t = tt if t notin g.topics: diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 2b76b13c75..b6e67652a0 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -67,8 +67,8 @@ type appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score totalTraffic*: int64 - invalidTraffic*: int64 - invalidIgnoredTraffic*: int64 + invalidTraffic*: int64 # data that was parsed by protobuf but was invalid + invalidIgnoredTraffic*: int64 # data that couldn't be parsed by protobuf RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe, raises: [].} @@ -146,7 +146,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = conn, peer = p, closed = conn.closed, err = error break - p.invalidIgnoredTraffic += msgSize - byteSize(rmsg.get()) + p.invalidIgnoredTraffic += msgSize - byteSize(rmsg) data = newSeq[byte]() # Release memory trace "decoded msg from peer", From 3fec42cb5242df5e9441fcb883156208424e478f Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 3 Jul 2023 19:03:08 +0200 Subject: [PATCH 04/26] calculate average traffic --- libp2p/protocols/pubsub/gossipsub/scoring.nim | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index c26805e716..2edf0d2bbe 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -28,8 +28,8 @@ declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed go declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"]) -declareGauge(libp2p_gossipsub_peers_score_invalidIgnoredTrafficRatio, "Invalid Ignored Traffic Ratio", labels = ["agent"]) -declareGauge(libp2p_gossipsub_peers_score_invalidTrafficRatio, "Invalid Traffic Ratio", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_score_averageInvalidIgnoredTrafficRatio, "Average Invalid Ignored Traffic Ratio", labels = ["agent"]) 
+declareGauge(libp2p_gossipsub_peers_score_averageInvalidTrafficRatio, "Average Invalid Traffic Ratio", labels = ["agent"]) proc init*(_: type[TopicParams]): TopicParams = TopicParams( @@ -98,6 +98,14 @@ proc getAgent(peer: PubSubPeer): string = else: "unknown" +proc numberOfPeersForAgent(g: GossipSub, agent: string): int = + var count = 0 + for peerId in g.peers.keys: + let peer = g.peers.getOrDefault(peerId) + if getAgent(peer) == agent: + count += 1 + return count + proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} = let agent = getAgent(peer) libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent]) @@ -120,11 +128,14 @@ proc disconnectIfBadTrafficPeer(g: GossipSub, peer: PubSubPeer) = if peer.totalTraffic > 0: # dividing in this way to avoid integer overflow let agent = getAgent(peer) - let invalidTrafficRatio: float64 = float64(peer.invalidTraffic) / float64(peer.totalTraffic) - let invalidIgnoredTrafficRatio: float64 = float64(peer.invalidIgnoredTraffic) / float64(peer.totalTraffic) - let totalInvalidTrafficRatio: float64 = invalidTrafficRatio + invalidIgnoredTrafficRatio - libp2p_gossipsub_peers_score_invalidTrafficRatio.set(invalidTrafficRatio, labelValues = [agent]) - libp2p_gossipsub_peers_score_invalidIgnoredTrafficRatio.set(invalidIgnoredTrafficRatio, labelValues = [agent]) + let invalidTrafficRatio = float64(peer.invalidTraffic) / float64(peer.totalTraffic) + let invalidIgnoredTrafficRatio = float64(peer.invalidIgnoredTraffic) / float64(peer.totalTraffic) + let totalInvalidTrafficRatio = invalidTrafficRatio + invalidIgnoredTrafficRatio + let numberOfPeersForAgent = float64(numberOfPeersForAgent(g, agent)) + libp2p_gossipsub_peers_score_averageInvalidTrafficRatio + .inc(invalidTrafficRatio / (if numberOfPeersForAgent != 0: numberOfPeersForAgent else: 1), labelValues = [agent]) + libp2p_gossipsub_peers_score_averageInvalidIgnoredTrafficRatio + .inc(invalidIgnoredTrafficRatio / (if numberOfPeersForAgent != 0: numberOfPeersForAgent else: 1), labelValues = [agent]) discard g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, -0.30'f64) #g.parameters.maxInvalidTrafficRatio) proc updateScores*(g: GossipSub) = # avoid async From c3ef6d7d36a32630f65cfb7d556cd6d7092fde71 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 4 Jul 2023 15:26:10 +0200 Subject: [PATCH 05/26] fix metrics and add tests --- libp2p/protocols/pubsub/gossipsub/scoring.nim | 13 +++--- libp2p/protocols/pubsub/rpc/messages.nim | 6 ++- tests/pubsub/testmessage.nim | 44 +++++++++++++++++++ 3 files changed, 55 insertions(+), 8 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 2edf0d2bbe..7d52b124ec 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -28,8 +28,9 @@ declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed go declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"]) -declareGauge(libp2p_gossipsub_peers_score_averageInvalidIgnoredTrafficRatio, "Average Invalid Ignored Traffic Ratio", labels = ["agent"]) -declareGauge(libp2p_gossipsub_peers_score_averageInvalidTrafficRatio, "Average Invalid Traffic Ratio", labels = ["agent"]) 
+declareGauge(libp2p_gossipsub_peers_score_invalidIgnoredTrafficMB, "Invalid Ignored Traffic (MB)", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_score_invalidTrafficMB, "Invalid Traffic (MB)", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_score_totalTrafficMB, "Total Traffic (MB)", labels = ["agent"]) proc init*(_: type[TopicParams]): TopicParams = TopicParams( @@ -132,10 +133,10 @@ proc disconnectIfBadTrafficPeer(g: GossipSub, peer: PubSubPeer) = let invalidIgnoredTrafficRatio = float64(peer.invalidIgnoredTraffic) / float64(peer.totalTraffic) let totalInvalidTrafficRatio = invalidTrafficRatio + invalidIgnoredTrafficRatio let numberOfPeersForAgent = float64(numberOfPeersForAgent(g, agent)) - libp2p_gossipsub_peers_score_averageInvalidTrafficRatio - .inc(invalidTrafficRatio / (if numberOfPeersForAgent != 0: numberOfPeersForAgent else: 1), labelValues = [agent]) - libp2p_gossipsub_peers_score_averageInvalidIgnoredTrafficRatio - .inc(invalidIgnoredTrafficRatio / (if numberOfPeersForAgent != 0: numberOfPeersForAgent else: 1), labelValues = [agent]) + libp2p_gossipsub_peers_score_invalidTrafficMB.inc(float64(peer.invalidTraffic) / 1_000_000, labelValues = [agent]) + libp2p_gossipsub_peers_score_invalidIgnoredTrafficMB.inc(float64(peer.invalidIgnoredTraffic) / 1_000_000, labelValues = [agent]) + libp2p_gossipsub_peers_score_totalTrafficMB.inc(float64(peer.totalTraffic) / 1_000_000, labelValues = [agent]) + discard g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, -0.30'f64) #g.parameters.maxInvalidTrafficRatio) proc updateScores*(g: GossipSub) = # avoid async diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 3c22769f6f..6222a0833a 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -142,10 +142,12 @@ proc byteSize*(msg: RPCMsg): int = let ctrl = msg.control.get() for item in ctrl.ihave: total += item.topicId.len - total += item.messageIds.len * sizeof(byte) # Assuming MessageId is seq[byte] + for msgId in item.messageIds: + total += msgId.len for item in ctrl.iwant: - total += item.messageIds.len * sizeof(byte) # Assuming MessageId is seq[byte] + for msgId in item.messageIds: + total += msgId.len for item in ctrl.graft: total += item.topicId.len diff --git a/tests/pubsub/testmessage.nim b/tests/pubsub/testmessage.nim index 7bc4b267a3..99310e7fbe 100644 --- a/tests/pubsub/testmessage.nim +++ b/tests/pubsub/testmessage.nim @@ -73,3 +73,47 @@ suite "Message": check: msgIdResult.isErr msgIdResult.error == ValidationResult.Reject + + test "byteSize for Message": + var msg = Message( + fromPeer: PeerId(data: @[]), # Empty seq[byte] + data: @[1'u8, 2, 3], # 3 bytes + seqno: @[1'u8], # 1 byte + signature: @[], # Empty seq[byte] + key: @[1'u8], # 1 byte + topicIds: @["abc", "defgh"] # 3 + 5 = 8 bytes + ) + + check byteSize(msg) == 3 + 1 + 1 + 8 # Total: 13 bytes + + test "byteSize for RPCMsg": + var msg = RPCMsg( + subscriptions: @[ + SubOpts(topic: "abc", subscribe: true), + SubOpts(topic: "def", subscribe: false) + ], # 3 + 3 + 2 * sizeof(bool) bytes + messages: @[ + Message(fromPeer: PeerId(data: @[]), data: @[1'u8, 2, 3], seqno: @[1'u8], signature: @[], key: @[1'u8], topicIds: @["abc", "defgh"]), + Message(fromPeer: PeerId(data: @[]), data: @[], seqno: @[], signature: @[], key: @[], topicIds: @["abc"]) + ], # byteSize: 13 + 3 = 16 bytes + control: some(ControlMessage( + ihave: @[ + ControlIHave(topicId: "ghi", messageIds: @[@[1'u8, 2, 3]]) + ], # 3 + 3 bytes + iwant: @[ 
+ ControlIWant(messageIds: @[@[1'u8, 2]]) + ], # 2 bytes + graft: @[ + ControlGraft(topicId: "jkl") + ], # 3 bytes + prune: @[ + ControlPrune(topicId: "mno", peers: @[PeerInfoMsg(peerId: PeerId(data: @[]), signedPeerRecord: @[])], backoff: 1) + ] # 3 + sizeof(uint64) bytes + )), + ping: @[], # Empty seq[byte] + pong: @[] # Empty seq[byte] + ) + + let boolSize = sizeof(bool) + let uint64Size = sizeof(uint64) + check byteSize(msg) == (3 + 3 + 2 * boolSize) + 16 + (3 + 3 + 2 + 3 + 3 + uint64Size) \ No newline at end of file From dfe9fe1068c697c0f7ba85c6a2e89e42a41e1194 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 4 Jul 2023 17:15:11 +0200 Subject: [PATCH 06/26] number of bad traffic score peers disconnected --- libp2p/protocols/pubsub/gossipsub/scoring.nim | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 7d52b124ec..10e8f9047e 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -31,6 +31,7 @@ declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub declareGauge(libp2p_gossipsub_peers_score_invalidIgnoredTrafficMB, "Invalid Ignored Traffic (MB)", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_invalidTrafficMB, "Invalid Traffic (MB)", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_totalTrafficMB, "Total Traffic (MB)", labels = ["agent"]) +declareCounter(libp2p_gossipsub_peers_score_badTrafficScorePeersDisconnection, "The number of peers disconnected by gossipsub because of bad traffic", labels = ["agent"]) proc init*(_: type[TopicParams]): TopicParams = TopicParams( @@ -137,7 +138,9 @@ proc disconnectIfBadTrafficPeer(g: GossipSub, peer: PubSubPeer) = libp2p_gossipsub_peers_score_invalidIgnoredTrafficMB.inc(float64(peer.invalidIgnoredTraffic) / 1_000_000, labelValues = [agent]) libp2p_gossipsub_peers_score_totalTrafficMB.inc(float64(peer.totalTraffic) / 1_000_000, labelValues = [agent]) - discard g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, -0.30'f64) #g.parameters.maxInvalidTrafficRatio) + if g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, -0.30'f64): #g.parameters.maxInvalidTrafficRatio) + libp2p_gossipsub_peers_score_badTrafficScorePeersDisconnection.inc(labelValues = [getAgent(peer)]) + debug "Bad traffic score peer disconnected", peer proc updateScores*(g: GossipSub) = # avoid async ## https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#the-score-function From 6f53761a6479e60ec7e53228b3373a73b8046b09 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 4 Jul 2023 22:18:13 +0200 Subject: [PATCH 07/26] rename metric and create invalidTrafficRatioThreshold param --- libp2p/protocols/pubsub/gossipsub/scoring.nim | 6 +++--- libp2p/protocols/pubsub/gossipsub/types.nim | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 10e8f9047e..77c6fbbbda 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -31,7 +31,7 @@ declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub declareGauge(libp2p_gossipsub_peers_score_invalidIgnoredTrafficMB, "Invalid Ignored Traffic (MB)", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_invalidTrafficMB, "Invalid Traffic (MB)", labels = ["agent"]) 
declareGauge(libp2p_gossipsub_peers_score_totalTrafficMB, "Total Traffic (MB)", labels = ["agent"]) -declareCounter(libp2p_gossipsub_peers_score_badTrafficScorePeersDisconnection, "The number of peers disconnected by gossipsub because of bad traffic", labels = ["agent"]) +declareCounter(libp2p_gossipsub_peers_score_badTrafficScorePeerDisconnections, "The number of peer disconnections by gossipsub because of bad traffic", labels = ["agent"]) proc init*(_: type[TopicParams]): TopicParams = TopicParams( @@ -138,8 +138,8 @@ proc disconnectIfBadTrafficPeer(g: GossipSub, peer: PubSubPeer) = libp2p_gossipsub_peers_score_invalidIgnoredTrafficMB.inc(float64(peer.invalidIgnoredTraffic) / 1_000_000, labelValues = [agent]) libp2p_gossipsub_peers_score_totalTrafficMB.inc(float64(peer.totalTraffic) / 1_000_000, labelValues = [agent]) - if g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, -0.30'f64): #g.parameters.maxInvalidTrafficRatio) - libp2p_gossipsub_peers_score_badTrafficScorePeersDisconnection.inc(labelValues = [getAgent(peer)]) + if g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, g.parameters.invalidTrafficRatioThreshold): + libp2p_gossipsub_peers_score_badTrafficScorePeerDisconnections.inc(labelValues = [getAgent(peer)]) debug "Bad traffic score peer disconnected", peer proc updateScores*(g: GossipSub) = # avoid async diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 25a9e636a8..a5edc154ad 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -141,6 +141,7 @@ type disconnectBadPeers*: bool enablePX*: bool + invalidTrafficRatioThreshold*: float64 bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 
0 disables flood-limiting completely iwantTimeout*: Duration From df78830c11f27cc41162977b193eff50748c715d Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 25 Jul 2023 15:19:08 +0200 Subject: [PATCH 08/26] Disconnect peers which have bad traffic score right after they try to connect --- libp2p/protocols/pubsub/gossipsub.nim | 3 ++- libp2p/protocols/pubsub/gossipsub/scoring.nim | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index effc3541eb..6d171d5cbf 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -160,7 +160,8 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) = peer.behaviourPenalty = stats.behaviourPenalty # Check if the score is below the threshold and disconnect the peer if necessary - discard g.disconnectIfBadPeer(peer, stats.score, g.parameters.graylistThreshold) + if not g.disconnectIfBadPeer(peer, stats.score, g.parameters.graylistThreshold): + g.disconnectIfBadTrafficPeer(peer) peer.iHaveBudget = IHavePeerBudget peer.pingBudget = PingsPeerBudget diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 77c6fbbbda..5ec07c1f97 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -126,7 +126,7 @@ proc disconnectIfBadPeer*(g: GossipSub, peer: PubSubPeer, score: float64, thresh return false -proc disconnectIfBadTrafficPeer(g: GossipSub, peer: PubSubPeer) = +proc disconnectIfBadTrafficPeer*(g: GossipSub, peer: PubSubPeer) = if peer.totalTraffic > 0: # dividing in this way to avoid integer overflow let agent = getAgent(peer) From a2ebbad60c1c5a329c957e1d44a7e7e55258fe30 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 25 Jul 2023 16:13:54 +0200 Subject: [PATCH 09/26] Improvements after my own review --- libp2p/protocols/pubsub/gossipsub.nim | 1 - libp2p/protocols/pubsub/gossipsub/scoring.nim | 31 +++++++------------ 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 6d171d5cbf..f38e2fc65e 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -306,7 +306,6 @@ proc validateAndRelay(g: GossipSub, msgId, msgIdSalted: MessageId, peer: PubSubPeer) {.async.} = try: - let msgSize = sizeof(msg) let validation = await g.validate(msg) var seenPeers: HashSet[PubSubPeer] diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 5ec07c1f97..0c6c938cf5 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -28,10 +28,10 @@ declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed go declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"]) -declareGauge(libp2p_gossipsub_peers_score_invalidIgnoredTrafficMB, "Invalid Ignored Traffic (MB)", labels = ["agent"]) -declareGauge(libp2p_gossipsub_peers_score_invalidTrafficMB, "Invalid Traffic (MB)", labels = ["agent"]) -declareGauge(libp2p_gossipsub_peers_score_totalTrafficMB, "Total Traffic (MB)", labels = ["agent"]) 
-declareCounter(libp2p_gossipsub_peers_score_badTrafficScorePeerDisconnections, "The number of peer disconnections by gossipsub because of bad traffic", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_invalidIgnoredTrafficMB, "Invalid Ignored Traffic (MB)", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_invalidTrafficMB, "Invalid Traffic (MB)", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_totalTrafficMB, "Total Traffic (MB)", labels = ["agent"]) +declareCounter(libp2p_gossipsub_peers_badTrafficScorePeerDisconnections, "The number of peer disconnections by gossipsub because of bad traffic", labels = ["agent"]) proc init*(_: type[TopicParams]): TopicParams = TopicParams( @@ -91,22 +91,14 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = {.pop.} proc getAgent(peer: PubSubPeer): string = - return - when defined(libp2p_agents_metrics): + return + when defined(libp2p_agents_metrics): if peer.shortAgent.len > 0: peer.shortAgent else: "unknown" else: - "unknown" - -proc numberOfPeersForAgent(g: GossipSub, agent: string): int = - var count = 0 - for peerId in g.peers.keys: - let peer = g.peers.getOrDefault(peerId) - if getAgent(peer) == agent: - count += 1 - return count + "unknown" proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} = let agent = getAgent(peer) @@ -133,13 +125,12 @@ proc disconnectIfBadTrafficPeer*(g: GossipSub, peer: PubSubPeer) = let invalidTrafficRatio = float64(peer.invalidTraffic) / float64(peer.totalTraffic) let invalidIgnoredTrafficRatio = float64(peer.invalidIgnoredTraffic) / float64(peer.totalTraffic) let totalInvalidTrafficRatio = invalidTrafficRatio + invalidIgnoredTrafficRatio - let numberOfPeersForAgent = float64(numberOfPeersForAgent(g, agent)) - libp2p_gossipsub_peers_score_invalidTrafficMB.inc(float64(peer.invalidTraffic) / 1_000_000, labelValues = [agent]) - libp2p_gossipsub_peers_score_invalidIgnoredTrafficMB.inc(float64(peer.invalidIgnoredTraffic) / 1_000_000, labelValues = [agent]) - libp2p_gossipsub_peers_score_totalTrafficMB.inc(float64(peer.totalTraffic) / 1_000_000, labelValues = [agent]) + libp2p_gossipsub_peers_invalidTrafficMB.inc(float64(peer.invalidTraffic) / 1_000_000, labelValues = [agent]) + libp2p_gossipsub_peers_invalidIgnoredTrafficMB.inc(float64(peer.invalidIgnoredTraffic) / 1_000_000, labelValues = [agent]) + libp2p_gossipsub_peers_totalTrafficMB.inc(float64(peer.totalTraffic) / 1_000_000, labelValues = [agent]) if g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, g.parameters.invalidTrafficRatioThreshold): - libp2p_gossipsub_peers_score_badTrafficScorePeerDisconnections.inc(labelValues = [getAgent(peer)]) + libp2p_gossipsub_peers_badTrafficScorePeerDisconnections.inc(labelValues = [getAgent(peer)]) debug "Bad traffic score peer disconnected", peer proc updateScores*(g: GossipSub) = # avoid async From e45a32a9744d9f643627ce72962500d92bfebba2 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 25 Jul 2023 17:41:59 +0200 Subject: [PATCH 10/26] Fix metric names and values --- libp2p/protocols/pubsub/gossipsub/scoring.nim | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 0c6c938cf5..ac0009bc33 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -28,9 +28,9 @@ declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed go declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed 
gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"]) -declareGauge(libp2p_gossipsub_peers_invalidIgnoredTrafficMB, "Invalid Ignored Traffic (MB)", labels = ["agent"]) -declareGauge(libp2p_gossipsub_peers_invalidTrafficMB, "Invalid Traffic (MB)", labels = ["agent"]) -declareGauge(libp2p_gossipsub_peers_totalTrafficMB, "Total Traffic (MB)", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_invalidIgnoredTraffic_bytes, "Invalid Ignored Traffic", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_invalidTraffic_bytes, "Invalid Traffic", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_totalTraffic_bytes, "Total Traffic", labels = ["agent"]) declareCounter(libp2p_gossipsub_peers_badTrafficScorePeerDisconnections, "The number of peer disconnections by gossipsub because of bad traffic", labels = ["agent"]) proc init*(_: type[TopicParams]): TopicParams = @@ -125,9 +125,9 @@ proc disconnectIfBadTrafficPeer*(g: GossipSub, peer: PubSubPeer) = let invalidTrafficRatio = float64(peer.invalidTraffic) / float64(peer.totalTraffic) let invalidIgnoredTrafficRatio = float64(peer.invalidIgnoredTraffic) / float64(peer.totalTraffic) let totalInvalidTrafficRatio = invalidTrafficRatio + invalidIgnoredTrafficRatio - libp2p_gossipsub_peers_invalidTrafficMB.inc(float64(peer.invalidTraffic) / 1_000_000, labelValues = [agent]) - libp2p_gossipsub_peers_invalidIgnoredTrafficMB.inc(float64(peer.invalidIgnoredTraffic) / 1_000_000, labelValues = [agent]) - libp2p_gossipsub_peers_totalTrafficMB.inc(float64(peer.totalTraffic) / 1_000_000, labelValues = [agent]) + libp2p_gossipsub_peers_invalidTraffic_bytes.inc(float64(peer.invalidTraffic), labelValues = [agent]) + libp2p_gossipsub_peers_invalidIgnoredTraffic_bytes.inc(float64(peer.invalidIgnoredTraffic), labelValues = [agent]) + libp2p_gossipsub_peers_totalTraffic_bytes.inc(float64(peer.totalTraffic), labelValues = [agent]) if g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, g.parameters.invalidTrafficRatioThreshold): libp2p_gossipsub_peers_badTrafficScorePeerDisconnections.inc(labelValues = [getAgent(peer)]) @@ -293,7 +293,6 @@ proc addCapped*[T](stat: var T, diff, cap: T) = proc rewardDelivered*( g: GossipSub, peer: PubSubPeer, topics: openArray[string], first: bool, delay = ZeroDuration) = - for tt in topics: let t = tt if t notin g.topics: From b267cab0aa0468a67dfe1c5a407a63b999828da8 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 25 Jul 2023 18:20:36 +0200 Subject: [PATCH 11/26] Fix sign --- libp2p/protocols/pubsub/gossipsub/scoring.nim | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index ac0009bc33..ec85cfbe64 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -129,7 +129,8 @@ proc disconnectIfBadTrafficPeer*(g: GossipSub, peer: PubSubPeer) = libp2p_gossipsub_peers_invalidIgnoredTraffic_bytes.inc(float64(peer.invalidIgnoredTraffic), labelValues = [agent]) libp2p_gossipsub_peers_totalTraffic_bytes.inc(float64(peer.totalTraffic), labelValues = [agent]) - if g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, g.parameters.invalidTrafficRatioThreshold): + # inverting the signs as we want to compare if the ratio is above the threshold + if 
g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, -g.parameters.invalidTrafficRatioThreshold): libp2p_gossipsub_peers_badTrafficScorePeerDisconnections.inc(labelValues = [getAgent(peer)]) debug "Bad traffic score peer disconnected", peer From 2bf7fe515c79fe80d6d17108b9497d7846e7e690 Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 27 Jul 2023 23:21:20 +0200 Subject: [PATCH 12/26] Introduce a useless application byte rate limit --- libp2p/protocols/pubsub/gossipsub.nim | 13 +++- libp2p/protocols/pubsub/gossipsub/scoring.nim | 60 +++---------------- libp2p/protocols/pubsub/pubsub.nim | 19 ++++++ libp2p/protocols/pubsub/pubsubpeer.nim | 37 +++++++++--- libp2p/protocols/pubsub/rpc/messages.nim | 24 +++++--- 5 files changed, 84 insertions(+), 69 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index f38e2fc65e..ee54d417b1 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -160,8 +160,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) = peer.behaviourPenalty = stats.behaviourPenalty # Check if the score is below the threshold and disconnect the peer if necessary - if not g.disconnectIfBadPeer(peer, stats.score, g.parameters.graylistThreshold): - g.disconnectIfBadTrafficPeer(peer) + g.disconnectIfBadPeer(peer, stats.score) peer.iHaveBudget = IHavePeerBudget peer.pingBudget = PingsPeerBudget @@ -381,6 +380,10 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = + if peer.shouldDisconnectPeer: + discard g.disconnectPeer(peer) + return + if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0: g.send(peer, RPCMsg(pong: rpcMsg.ping)) peer.pingBudget.dec @@ -673,3 +676,9 @@ method initPubSub*(g: GossipSub) # init gossip stuff g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength) + +method shoulDisconnectPeer*(g: GossipSub, peer: PubSubPeer, score: float64): bool = + if g.parameters.disconnectBadPeers and score < g.parameters.graylistThreshold and + peer.peerId notin g.parameters.directPeers: + return true + return false \ No newline at end of file diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index ec85cfbe64..384e172e61 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -11,16 +11,17 @@ import std/[tables, sets] import chronos, chronicles, metrics +import chronos/ratelimit import "."/[types] import ".."/[pubsubpeer] import ../rpc/messages import "../../.."/[peerid, multiaddress, switch, utils/heartbeat] +import ../pubsub logScope: topics = "libp2p gossipsub" declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"]) -declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_firstMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_meshMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_meshFailurePenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) @@ -90,50 +91,6 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = {.pop.} -proc getAgent(peer: PubSubPeer): string = - return - when defined(libp2p_agents_metrics): - if peer.shortAgent.len > 0: - peer.shortAgent - else: - "unknown" - else: - "unknown" - -proc 
disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} = - let agent = getAgent(peer) - libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent]) - - try: - await g.switch.disconnect(peer.peerId) - except CatchableError as exc: # Never cancelled - trace "Failed to close connection", peer, error = exc.name, msg = exc.msg - -proc disconnectIfBadPeer*(g: GossipSub, peer: PubSubPeer, score: float64, threshold: float64): bool = - if g.parameters.disconnectBadPeers and score < threshold and - peer.peerId notin g.parameters.directPeers: - debug "disconnecting bad score peer", peer, score, threshold - asyncSpawn(g.disconnectPeer(peer)) - return true - - return false - -proc disconnectIfBadTrafficPeer*(g: GossipSub, peer: PubSubPeer) = - if peer.totalTraffic > 0: - # dividing in this way to avoid integer overflow - let agent = getAgent(peer) - let invalidTrafficRatio = float64(peer.invalidTraffic) / float64(peer.totalTraffic) - let invalidIgnoredTrafficRatio = float64(peer.invalidIgnoredTraffic) / float64(peer.totalTraffic) - let totalInvalidTrafficRatio = invalidTrafficRatio + invalidIgnoredTrafficRatio - libp2p_gossipsub_peers_invalidTraffic_bytes.inc(float64(peer.invalidTraffic), labelValues = [agent]) - libp2p_gossipsub_peers_invalidIgnoredTraffic_bytes.inc(float64(peer.invalidIgnoredTraffic), labelValues = [agent]) - libp2p_gossipsub_peers_totalTraffic_bytes.inc(float64(peer.totalTraffic), labelValues = [agent]) - - # inverting the signs as we want to compare if the ratio is above the threshold - if g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, -g.parameters.invalidTrafficRatioThreshold): - libp2p_gossipsub_peers_badTrafficScorePeerDisconnections.inc(labelValues = [getAgent(peer)]) - debug "Bad traffic score peer disconnected", peer - proc updateScores*(g: GossipSub) = # avoid async ## https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#the-score-function ## @@ -202,7 +159,7 @@ proc updateScores*(g: GossipSub) = # avoid async score += topicScore * topicParams.topicWeight # Score metrics - let agent = getAgent(peer) + let agent = peer.getAgent() libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent]) libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent]) libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent]) @@ -239,7 +196,7 @@ proc updateScores*(g: GossipSub) = # avoid async score += colocationFactor * g.parameters.ipColocationFactorWeight # Score metrics - let agent = getAgent(peer) + let agent = peer.getAgent() libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent]) libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent]) libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent]) @@ -259,10 +216,7 @@ proc updateScores*(g: GossipSub) = # avoid async trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted - if not g.disconnectIfBadPeer(peer, stats.score, g.parameters.graylistThreshold): - # this is necessary to disconnect peers that send only non-parseable messages as the message won't go through the normal path - g.disconnectIfBadTrafficPeer(peer) - + g.disconnectIfBadPeer(peer, stats.score) libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent]) for peer in evicting: @@ -276,8 +230,8 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} = g.updateScores() proc 
punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) = - peer.invalidTraffic += byteSize(msg) - g.disconnectIfBadTrafficPeer(peer) + if not peer.uselessAppBytesRate.tryConsume(len(msg.data)): + discard g.disconnectPeer(peer) for tt in msg.topicIds: let t = tt diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 02436a9b4b..e3e32fb55f 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -79,6 +79,8 @@ declarePublicCounter(libp2p_pubsub_received_ihave, "pubsub broadcast ihave", lab declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", labels = ["topic"]) declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"]) +declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"]) + type InitializationError* = object of LPError @@ -603,3 +605,20 @@ proc removeObserver*(p: PubSub; observer: PubSubObserver) {.public.} = let idx = p.observers[].find(observer) if idx != -1: p.observers[].del(idx) + +proc disconnectPeer*(p: PubSub, peer: PubSubPeer) {.async.} = + let agent = peer.getAgent() + libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent]) + + try: + await p.switch.disconnect(peer.peerId) + except CatchableError as exc: # Never cancelled + trace "Failed to close connection", peer, error = exc.name, msg = exc.msg + +method shoulDisconnectPeer*(p: PubSub, peer: PubSubPeer, score: float64): bool {.base.} = + doAssert(false, "Not implemented!") + +method disconnectIfBadPeer*(p: PubSub, peer: PubSubPeer, score: float64) {.base.} = + if p.shoulDisconnectPeer(peer, score): + debug "disconnecting bad score peer", peer, score + asyncSpawn(p.disconnectPeer(peer)) \ No newline at end of file diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index b6e67652a0..cdca134a48 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -12,6 +12,7 @@ import std/[sequtils, strutils, tables, hashes, options, sets, deques] import stew/results import chronos, chronicles, nimcrypto/sha2, metrics +import chronos/ratelimit import rpc/[messages, message, protobuf], ../../peerid, ../../peerinfo, @@ -66,9 +67,8 @@ type maxMessageSize: int appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score - totalTraffic*: int64 - invalidTraffic*: int64 # data that was parsed by protobuf but was invalid - invalidIgnoredTraffic*: int64 # data that couldn't be parsed by protobuf + uselessAppBytesRate*: TokenBucket + shouldDisconnectPeer*: bool RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe, raises: [].} @@ -130,6 +130,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = try: try: while not conn.atEof: + if p.shouldDisconnectPeer: + debug "Ignoring peer", conn, peer = p + break trace "waiting for data", conn, peer = p, closed = conn.closed var data = await conn.readLp(p.maxMessageSize) @@ -139,14 +142,22 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = # In this way we count even ignored fields by protobuf let msgSize = len(data) - p.totalTraffic += msgSize var rmsg = decodeRpcMsg(data).valueOr: - p.invalidIgnoredTraffic += msgSize + if not p.uselessAppBytesRate.tryConsume(msgSize): + p.shouldDisconnectPeer = true debug "failed to decode msg from peer", conn, peer = p, closed = conn.closed, err = error break - p.invalidIgnoredTraffic += 
msgSize - byteSize(rmsg) + + let uselessAppBytesNum = block: + rmsg.control.withValue(control): + msgSize - byteSize(control.ihave) - byteSize(control.iwant) + else: msgSize + + if not p.uselessAppBytesRate.tryConsume(uselessAppBytesNum): + p.shouldDisconnectPeer = true + data = newSeq[byte]() # Release memory trace "decoded msg from peer", @@ -323,7 +334,19 @@ proc new*( codec: codec, peerId: peerId, connectedFut: newFuture[void](), - maxMessageSize: maxMessageSize + maxMessageSize: maxMessageSize, + uselessAppBytesRate: TokenBucket.new(1024, 1.seconds), + shouldDisconnectPeer: false, ) result.sentIHaves.addFirst(default(HashSet[MessageId])) result.heDontWants.addFirst(default(HashSet[MessageId])) + +proc getAgent*(peer: PubSubPeer): string = + return + when defined(libp2p_agents_metrics): + if peer.shortAgent.len > 0: + peer.shortAgent + else: + "unknown" + else: + "unknown" \ No newline at end of file diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 6222a0833a..ad9959dbd7 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -128,6 +128,21 @@ proc byteSize*(msg: Message): int = total += topicId.len return total +proc byteSize*(ihave: seq[ControlIHave]): int = + var total = 0 + for item in ihave: + total += item.topicId.len + for msgId in item.messageIds: + total += msgId.len + return total + +proc byteSize*(iwant: seq[ControlIWant]): int = + var total = 0 + for item in iwant: + for msgId in item.messageIds: + total += msgId.len + return total + proc byteSize*(msg: RPCMsg): int = var total = 0 @@ -140,14 +155,9 @@ proc byteSize*(msg: RPCMsg): int = if msg.control.isSome: let ctrl = msg.control.get() - for item in ctrl.ihave: - total += item.topicId.len - for msgId in item.messageIds: - total += msgId.len + total += byteSize(ctrl.ihave) - for item in ctrl.iwant: - for msgId in item.messageIds: - total += msgId.len + total += byteSize(ctrl.iwant) for item in ctrl.graft: total += item.topicId.len From abbb43857952df7e58704ec4bd5aaa022037fa13 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 31 Jul 2023 16:33:17 +0200 Subject: [PATCH 13/26] Introduce a rate limit --- libp2p/protocols/pubsub/floodsub.nim | 7 ++- libp2p/protocols/pubsub/gossipsub.nim | 6 +- libp2p/protocols/pubsub/pubsub.nim | 76 +++++++++++++++++------- libp2p/protocols/pubsub/pubsubpeer.nim | 43 ++++---------- libp2p/protocols/pubsub/rpc/messages.nim | 3 + tests/pubsub/testgossipinternal.nim | 25 ++++---- 6 files changed, 91 insertions(+), 69 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index baea4d2640..70e272e453 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -95,7 +95,9 @@ method unsubscribePeer*(f: FloodSub, peer: PeerId) = method rpcHandler*(f: FloodSub, peer: PubSubPeer, - rpcMsg: RPCMsg) {.async.} = + data: seq[byte]) {.async.} = + let rpcMsg = decodeAndCheckRateLimit(f, peer, data) + for i in 0.. 
0: g.send(peer, RPCMsg(pong: rpcMsg.ping)) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index e3e32fb55f..2d5aaf23f8 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -17,6 +17,7 @@ import std/[tables, sequtils, sets, strutils] import chronos, chronicles, metrics +import chronos/ratelimit import ./errors as pubsub_errors, ./pubsubpeer, ./rpc/[message, messages, protobuf], @@ -263,9 +264,61 @@ proc updateMetrics*(p: PubSub, rpcMsg: RPCMsg) = else: libp2p_pubsub_received_prune.inc(labelValues = ["generic"]) +proc disconnectPeer*(p: PubSub, peer: PubSubPeer) {.async.} = + let agent = peer.getAgent() + libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent]) + + try: + await p.switch.disconnect(peer.peerId) + debug "Peer disconnected", peer + except CatchableError as exc: # Never cancelled + debug "Failed to close connection", peer, error = exc.name, msg = exc.msg + +method shoulDisconnectPeer*(p: PubSub, peer: PubSubPeer, score: float64): bool {.base.} = + doAssert(false, "Not implemented!") + +method disconnectIfBadPeer*(p: PubSub, peer: PubSubPeer, score: float64) {.base.} = + if p.shoulDisconnectPeer(peer, score): + debug "disconnecting bad score peer", peer, score + asyncSpawn(p.disconnectPeer(peer)) + +proc decodeAndCheckRateLimit*(p: PubSub, peer: PubSubPeer, data: seq[byte]): RPCMsg {.raises:[PeerRateLimitError].} = + # In this way we count even ignored fields by protobuf + let msgSize = len(data) + var rmsg = decodeRpcMsg(data).valueOr: + debug "failed to decode msg from peer", peer, err = error + if not peer.uselessAppBytesRate.tryConsume(msgSize): + discard p.disconnectPeer(peer) + raise newException(PeerRateLimitError, "Peer sent too much useless data that couldn't be decoded") + + let usefulMsgBytesNum = + if p.verifySignature: + byteSize(rmsg.messages) + else: + rmsg.messages.mapIt(it.data.len).foldl(a + b, 0) + + rmsg.messages + .mapIt( + it.topicIds.mapIt(it.len).foldl(a + b, 0) + ) + .foldl(a + b, 0) + + var uselessAppBytesNum = msgSize - usefulMsgBytesNum + rmsg.control.withValue(control): + uselessAppBytesNum -= byteSize(control.ihave) - byteSize(control.iwant) + + if not peer.uselessAppBytesRate.tryConsume(uselessAppBytesNum): + debug "Peer sent too many useless data", peer, msgSize, uselessAppBytesNum + discard p.disconnectPeer(peer) + raise newException(PeerRateLimitError, "Peer sent too much useless application data") + + debug "decoded msg from peer", peer, msg = rmsg.shortLog + # trigger hooks + peer.recvObservers(rmsg) + return rmsg + method rpcHandler*(p: PubSub, peer: PubSubPeer, - rpcMsg: RPCMsg): Future[void] {.base, async.} = + data: seq[byte]): Future[void] {.base, async.} = ## Handler that must be overridden by concrete implementation raiseAssert "Unimplemented" @@ -356,9 +409,9 @@ method handleConn*(p: PubSub, ## that we're interested in ## - proc handler(peer: PubSubPeer, msg: RPCMsg): Future[void] = + proc handler(peer: PubSubPeer, data: seq[byte]): Future[void] = # call pubsub rpc handler - p.rpcHandler(peer, msg) + p.rpcHandler(peer, data) let peer = p.getOrCreatePeer(conn.peerId, @[proto]) @@ -605,20 +658,3 @@ proc removeObserver*(p: PubSub; observer: PubSubObserver) {.public.} = let idx = p.observers[].find(observer) if idx != -1: p.observers[].del(idx) - -proc disconnectPeer*(p: PubSub, peer: PubSubPeer) {.async.} = - let agent = peer.getAgent() - libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent]) - - try: - await 
p.switch.disconnect(peer.peerId) - except CatchableError as exc: # Never cancelled - trace "Failed to close connection", peer, error = exc.name, msg = exc.msg - -method shoulDisconnectPeer*(p: PubSub, peer: PubSubPeer, score: float64): bool {.base.} = - doAssert(false, "Not implemented!") - -method disconnectIfBadPeer*(p: PubSub, peer: PubSubPeer, score: float64) {.base.} = - if p.shoulDisconnectPeer(peer, score): - debug "disconnecting bad score peer", peer, score - asyncSpawn(p.disconnectPeer(peer)) \ No newline at end of file diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index cdca134a48..17f872cb50 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -33,6 +33,8 @@ when defined(libp2p_expensive_metrics): declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"]) type + PeerRateLimitError* = object of CatchableError + PubSubObserver* = ref object onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [].} onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [].} @@ -70,7 +72,7 @@ type uselessAppBytesRate*: TokenBucket shouldDisconnectPeer*: bool - RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] + RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void] {.gcsafe, raises: [].} when defined(libp2p_agents_metrics): @@ -110,7 +112,7 @@ func outbound*(p: PubSubPeer): bool = else: false -proc recvObservers(p: PubSubPeer, msg: var RPCMsg) = +proc recvObservers*(p: PubSubPeer, msg: var RPCMsg) = # trigger hooks if not(isNil(p.observers)) and p.observers[].len > 0: for obs in p.observers[]: @@ -130,9 +132,6 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = try: try: while not conn.atEof: - if p.shouldDisconnectPeer: - debug "Ignoring peer", conn, peer = p - break trace "waiting for data", conn, peer = p, closed = conn.closed var data = await conn.readLp(p.maxMessageSize) @@ -140,39 +139,19 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = conn, peer = p, closed = conn.closed, data = data.shortLog - # In this way we count even ignored fields by protobuf - let msgSize = len(data) - var rmsg = decodeRpcMsg(data).valueOr: - if not p.uselessAppBytesRate.tryConsume(msgSize): - p.shouldDisconnectPeer = true - debug "failed to decode msg from peer", - conn, peer = p, closed = conn.closed, - err = error - break - - let uselessAppBytesNum = block: - rmsg.control.withValue(control): - msgSize - byteSize(control.ihave) - byteSize(control.iwant) - else: msgSize - - if not p.uselessAppBytesRate.tryConsume(uselessAppBytesNum): - p.shouldDisconnectPeer = true - - data = newSeq[byte]() # Release memory - - trace "decoded msg from peer", - conn, peer = p, closed = conn.closed, - msg = rmsg.shortLog - # trigger hooks - p.recvObservers(rmsg) - when defined(libp2p_expensive_metrics): for m in rmsg.messages: for t in m.topicIDs: # metrics libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t]) - await p.handler(p, rmsg) + await p.handler(p, data) + data = newSeq[byte]() # Release memory + except PeerRateLimitError as exc: + debug "Peer rate limit exceeded, exiting read while", conn, peer = p, error = exc.msg + except CatchableError as exc: + debug "Exception occurred in PubSubPeer.handle", + conn, peer = p, closed = conn.closed, exc = exc.msg finally: await conn.close() except CancelledError: diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 
ad9959dbd7..cfa070ed05 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -128,6 +128,9 @@ proc byteSize*(msg: Message): int = total += topicId.len return total +proc byteSize*(msgs: seq[Message]): int = + msgs.mapIt(byteSize(it)).foldl(a + b, 0) + proc byteSize*(ihave: seq[ControlIHave]): int = var total = 0 for item in ihave: diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 909e1d613a..00b5170414 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -10,6 +10,7 @@ import ../../libp2p/crypto/crypto import ../../libp2p/stream/bufferstream import ../../libp2p/switch import ../../libp2p/muxers/muxer +import ../../libp2p/protocols/pubsub/rpc/protobuf import ../helpers @@ -170,7 +171,7 @@ suite "GossipSub internal": asyncTest "`replenishFanout` Degree Lo": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard let topic = "foobar" @@ -197,7 +198,7 @@ suite "GossipSub internal": asyncTest "`dropFanoutPeers` drop expired fanout topics": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard let topic = "foobar" @@ -227,7 +228,7 @@ suite "GossipSub internal": asyncTest "`dropFanoutPeers` leave unexpired fanout topics": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard let topic1 = "foobar1" @@ -264,7 +265,7 @@ suite "GossipSub internal": asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard let topic = "foobar" @@ -325,7 +326,7 @@ suite "GossipSub internal": asyncTest "`getGossipPeers` - should not crash on missing topics in mesh": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard let topic = "foobar" @@ -365,7 +366,7 @@ suite "GossipSub internal": asyncTest "`getGossipPeers` - should not crash on missing topics in fanout": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard let topic = "foobar" @@ -406,7 +407,7 @@ suite "GossipSub internal": asyncTest "`getGossipPeers` - should not crash on missing topics in gossip": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard let topic = "foobar" @@ -447,7 +448,7 @@ suite "GossipSub internal": asyncTest "Drop messages of topics without subscription": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = check false let topic = "foobar" @@ -470,7 +471,7 @@ suite "GossipSub internal": let peer = gossipSub.getPubSubPeer(peerId) inc seqno let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno)) - 
await gossipSub.rpcHandler(peer, RPCMsg(messages: @[msg])) + await gossipSub.rpcHandler(peer, encodeRpcMsg(RPCMsg(messages: @[msg]), false)) check gossipSub.mcache.msgs.len == 0 @@ -481,7 +482,7 @@ suite "GossipSub internal": let gossipSub = TestGossipSub.init(newStandardSwitch()) gossipSub.parameters.disconnectBadPeers = true gossipSub.parameters.appSpecificWeight = 1.0 - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = check false let topic = "foobar" @@ -525,7 +526,7 @@ suite "GossipSub internal": conn.peerId = peerId let peer = gossipSub.getPubSubPeer(peerId) - await gossipSub.rpcHandler(peer, lotOfSubs) + await gossipSub.rpcHandler(peer, encodeRpcMsg(lotOfSubs, false)) check: gossipSub.gossipsub.len == gossipSub.topicsHigh @@ -656,7 +657,7 @@ suite "GossipSub internal": asyncTest "handleIHave/Iwant tests": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = check false proc handler2(topic: string, data: seq[byte]) {.async.} = discard From 039672424bdd5966d82f0814aaf6e6395433c55f Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 2 Aug 2023 01:19:04 +0200 Subject: [PATCH 14/26] Add rate limit only to gossipsub --- libp2p/protocols/pubsub/floodsub.nim | 14 +++-- libp2p/protocols/pubsub/gossipsub.nim | 48 ++++++++++++++--- libp2p/protocols/pubsub/gossipsub/scoring.nim | 19 +++++-- libp2p/protocols/pubsub/gossipsub/types.nim | 1 - libp2p/protocols/pubsub/pubsub.nim | 54 ------------------- libp2p/protocols/pubsub/pubsubpeer.nim | 6 +-- 6 files changed, 66 insertions(+), 76 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 70e272e453..19b548db25 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -15,7 +15,7 @@ import ./pubsub, ./pubsubpeer, ./timedcache, ./peertable, - ./rpc/[message, messages], + ./rpc/[message, messages, protobuf], ../../crypto/crypto, ../../stream/connection, ../../peerid, @@ -96,7 +96,14 @@ method unsubscribePeer*(f: FloodSub, peer: PeerId) = method rpcHandler*(f: FloodSub, peer: PubSubPeer, data: seq[byte]) {.async.} = - let rpcMsg = decodeAndCheckRateLimit(f, peer, data) + let msgSize = len(data) + var rpcMsg = decodeRpcMsg(data).valueOr: + debug "failed to decode msg from peer", peer, err = error + raise newException(CatchableError, "") + + debug "decoded msg from peer", peer, msg = rpcMsg.shortLog + # trigger hooks + peer.recvObservers(rpcMsg) for i in 0.. 
0: g.send(peer, RPCMsg(pong: rpcMsg.ping)) peer.pingBudget.dec @@ -674,9 +712,3 @@ method initPubSub*(g: GossipSub) # init gossip stuff g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength) - -method shoulDisconnectPeer*(g: GossipSub, peer: PubSubPeer, score: float64): bool = - if g.parameters.disconnectBadPeers and score < g.parameters.graylistThreshold and - peer.peerId notin g.parameters.directPeers: - return true - return false \ No newline at end of file diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 384e172e61..8ce2c6c32d 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -22,6 +22,7 @@ logScope: topics = "libp2p gossipsub" declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"]) +declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_firstMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_meshMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_meshFailurePenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) @@ -29,9 +30,6 @@ declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed go declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"]) -declareGauge(libp2p_gossipsub_peers_invalidIgnoredTraffic_bytes, "Invalid Ignored Traffic", labels = ["agent"]) -declareGauge(libp2p_gossipsub_peers_invalidTraffic_bytes, "Invalid Traffic", labels = ["agent"]) -declareGauge(libp2p_gossipsub_peers_totalTraffic_bytes, "Total Traffic", labels = ["agent"]) declareCounter(libp2p_gossipsub_peers_badTrafficScorePeerDisconnections, "The number of peer disconnections by gossipsub because of bad traffic", labels = ["agent"]) proc init*(_: type[TopicParams]): TopicParams = @@ -91,6 +89,19 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = {.pop.} +proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} = + try: + await g.switch.disconnect(peer.peerId) + except CatchableError as exc: # Never cancelled + trace "Failed to close connection", peer, error = exc.name, msg = exc.msg + +proc disconnectIfBadScorePeer*(g: GossipSub, peer: PubSubPeer, score: float64) = + if g.parameters.disconnectBadPeers and score < g.parameters.graylistThreshold and + peer.peerId notin g.parameters.directPeers: + debug "disconnecting bad score peer", peer, score = peer.score + asyncSpawn(g.disconnectPeer(peer)) + libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [peer.getAgent()]) + proc updateScores*(g: GossipSub) = # avoid async ## https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#the-score-function ## @@ -216,7 +227,7 @@ proc updateScores*(g: GossipSub) = # avoid async trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted - g.disconnectIfBadPeer(peer, stats.score) + g.disconnectIfBadScorePeer(peer, stats.score) libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent]) for peer in 
evicting: diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index a5edc154ad..25a9e636a8 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -141,7 +141,6 @@ type disconnectBadPeers*: bool enablePX*: bool - invalidTrafficRatioThreshold*: float64 bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely iwantTimeout*: Duration diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 2d5aaf23f8..3d97436218 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -80,8 +80,6 @@ declarePublicCounter(libp2p_pubsub_received_ihave, "pubsub broadcast ihave", lab declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", labels = ["topic"]) declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"]) -declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"]) - type InitializationError* = object of LPError @@ -264,58 +262,6 @@ proc updateMetrics*(p: PubSub, rpcMsg: RPCMsg) = else: libp2p_pubsub_received_prune.inc(labelValues = ["generic"]) -proc disconnectPeer*(p: PubSub, peer: PubSubPeer) {.async.} = - let agent = peer.getAgent() - libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent]) - - try: - await p.switch.disconnect(peer.peerId) - debug "Peer disconnected", peer - except CatchableError as exc: # Never cancelled - debug "Failed to close connection", peer, error = exc.name, msg = exc.msg - -method shoulDisconnectPeer*(p: PubSub, peer: PubSubPeer, score: float64): bool {.base.} = - doAssert(false, "Not implemented!") - -method disconnectIfBadPeer*(p: PubSub, peer: PubSubPeer, score: float64) {.base.} = - if p.shoulDisconnectPeer(peer, score): - debug "disconnecting bad score peer", peer, score - asyncSpawn(p.disconnectPeer(peer)) - -proc decodeAndCheckRateLimit*(p: PubSub, peer: PubSubPeer, data: seq[byte]): RPCMsg {.raises:[PeerRateLimitError].} = - # In this way we count even ignored fields by protobuf - let msgSize = len(data) - var rmsg = decodeRpcMsg(data).valueOr: - debug "failed to decode msg from peer", peer, err = error - if not peer.uselessAppBytesRate.tryConsume(msgSize): - discard p.disconnectPeer(peer) - raise newException(PeerRateLimitError, "Peer sent too much useless data that couldn't be decoded") - - let usefulMsgBytesNum = - if p.verifySignature: - byteSize(rmsg.messages) - else: - rmsg.messages.mapIt(it.data.len).foldl(a + b, 0) + - rmsg.messages - .mapIt( - it.topicIds.mapIt(it.len).foldl(a + b, 0) - ) - .foldl(a + b, 0) - - var uselessAppBytesNum = msgSize - usefulMsgBytesNum - rmsg.control.withValue(control): - uselessAppBytesNum -= byteSize(control.ihave) - byteSize(control.iwant) - - if not peer.uselessAppBytesRate.tryConsume(uselessAppBytesNum): - debug "Peer sent too many useless data", peer, msgSize, uselessAppBytesNum - discard p.disconnectPeer(peer) - raise newException(PeerRateLimitError, "Peer sent too much useless application data") - - debug "decoded msg from peer", peer, msg = rmsg.shortLog - # trigger hooks - peer.recvObservers(rmsg) - return rmsg - method rpcHandler*(p: PubSub, peer: PubSubPeer, data: seq[byte]): Future[void] {.base, async.} = diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 17f872cb50..984ff05b64 100644 --- 
a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -70,7 +70,6 @@ type appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score uselessAppBytesRate*: TokenBucket - shouldDisconnectPeer*: bool RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void] {.gcsafe, raises: [].} @@ -314,8 +313,7 @@ proc new*( peerId: peerId, connectedFut: newFuture[void](), maxMessageSize: maxMessageSize, - uselessAppBytesRate: TokenBucket.new(1024, 1.seconds), - shouldDisconnectPeer: false, + uselessAppBytesRate: TokenBucket.new(1024, 1.seconds) ) result.sentIHaves.addFirst(default(HashSet[MessageId])) result.heDontWants.addFirst(default(HashSet[MessageId])) @@ -328,4 +326,4 @@ proc getAgent*(peer: PubSubPeer): string = else: "unknown" else: - "unknown" \ No newline at end of file + "unknown" From fc11af46bbaf8fcd6493a4d149c095b6871b1a40 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 2 Aug 2023 13:18:50 +0200 Subject: [PATCH 15/26] Decouple decoding and rate limit --- libp2p/protocols/pubsub/gossipsub.nim | 28 +++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 22e6a74009..55a17d5e12 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -377,19 +377,13 @@ proc validateAndRelay(g: GossipSub, except CatchableError as exc: info "validateAndRelay failed", msg=exc.msg -proc decodeAndCheckRateLimit*(g: GossipSub, peer: PubSubPeer, data: seq[byte]): RPCMsg {.raises:[PeerRateLimitError].} = +proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: int) {.raises:[PeerRateLimitError].} = # In this way we count even ignored fields by protobuf - let msgSize = len(data) - var rmsg = decodeRpcMsg(data).valueOr: - debug "failed to decode msg from peer", peer, err = error + var rmsg = rpcMsgOpt.valueOr: if not peer.uselessAppBytesRate.tryConsume(msgSize): discard g.disconnectPeer(peer) raise newException(PeerRateLimitError, "Peer sent too much useless data that couldn't be decoded") - debug "decoded msg from peer", peer, msg = rmsg.shortLog - # trigger hooks - peer.recvObservers(rmsg) - let usefulMsgBytesNum = if g.verifySignature: byteSize(rmsg.messages) @@ -406,19 +400,25 @@ proc decodeAndCheckRateLimit*(g: GossipSub, peer: PubSubPeer, data: seq[byte]): uselessAppBytesNum -= byteSize(control.ihave) - byteSize(control.iwant) if not peer.uselessAppBytesRate.tryConsume(uselessAppBytesNum): - debug "Peer sent too many useless data", peer, msgSize, uselessAppBytesNum + debug "Peer sent too much useless application data", peer, msgSize, uselessAppBytesNum discard g.disconnectPeer(peer) - raise newException(PeerRateLimitError, "Peer sent too much useless application data") - - return rmsg + raise newException(PeerRateLimitError, "Peer sent too much useless application data.") method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} = + let msgSize = data.len + let rpcMsgResult = decodeRpcMsg(data) + var rpcMsg = rpcMsgResult.valueOr: + debug "failed to decode msg from peer", peer, err = error + rateLimit(g, peer, Opt.none(RPCMsg), msgSize) + return - let rpcMsg = decodeAndCheckRateLimit(g, peer, data) - + debug "decoded msg from peer", peer, msg = rpcMsg.shortLog + rateLimit(g, peer, Opt.some(rpcMsg), msgSize) + # trigger hooks + peer.recvObservers(rpcMsg) if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0: g.send(peer, 
RPCMsg(pong: rpcMsg.ping)) From d54b1016d8e51548151b039b76153eca3e1bf410 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 2 Aug 2023 14:16:38 +0200 Subject: [PATCH 16/26] Just measure at the beginning instead of disconnecting, for test purposes --- libp2p/protocols/pubsub/gossipsub.nim | 4 ++-- libp2p/protocols/pubsub/gossipsub/scoring.nim | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 55a17d5e12..05e80a7c1f 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -381,7 +381,7 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: # In this way we count even ignored fields by protobuf var rmsg = rpcMsgOpt.valueOr: if not peer.uselessAppBytesRate.tryConsume(msgSize): - discard g.disconnectPeer(peer) + libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. # discard g.disconnectPeer(peer) raise newException(PeerRateLimitError, "Peer sent too much useless data that couldn't be decoded") let usefulMsgBytesNum = @@ -401,7 +401,7 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: if not peer.uselessAppBytesRate.tryConsume(uselessAppBytesNum): debug "Peer sent too much useless application data", peer, msgSize, uselessAppBytesNum - discard g.disconnectPeer(peer) + libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. # discard g.disconnectPeer(peer) raise newException(PeerRateLimitError, "Peer sent too much useless application data.") method rpcHandler*(g: GossipSub, diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 8ce2c6c32d..5465faa4aa 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -30,7 +30,7 @@ declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed go declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"]) -declareCounter(libp2p_gossipsub_peers_badTrafficScorePeerDisconnections, "The number of peer disconnections by gossipsub because of bad traffic", labels = ["agent"]) +declarePublicCounter(libp2p_gossipsub_peers_rate_limit_disconnections, "The number of peer disconnections by gossipsub because of rate limit", labels = ["agent"]) proc init*(_: type[TopicParams]): TopicParams = TopicParams( @@ -242,7 +242,7 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} = proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) = if not peer.uselessAppBytesRate.tryConsume(len(msg.data)): - discard g.disconnectPeer(peer) + libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. 
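# ---------------------------------------------------------------------------
# [editor's note: illustrative sketch, not part of the patch]
# The rate checks above all lean on chronos' TokenBucket: a byte budget that
# refills over a fixed interval and is charged with tryConsume. A minimal
# standalone example, using the budget/interval configured at this point in
# the series (1024 bytes per second):
import chronos
import chronos/ratelimit

let bucket = TokenBucket.new(1024, 1.seconds)
echo bucket.tryConsume(700)  # true: fits within the remaining budget
echo bucket.tryConsume(700)  # false when called right away: only ~324 bytes left until refill
# ---------------------------------------------------------------------------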
# discard g.disconnectPeer(peer) for tt in msg.topicIds: let t = tt From b2f21099353deee2158eef585c3b11dbe58abfcf Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 2 Aug 2023 18:59:02 +0200 Subject: [PATCH 17/26] Simplify data size calculation --- libp2p/protocols/pubsub/gossipsub.nim | 10 +++---- libp2p/protocols/pubsub/rpc/messages.nim | 31 --------------------- tests/pubsub/testmessage.nim | 34 ------------------------ 3 files changed, 4 insertions(+), 71 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 05e80a7c1f..261843a411 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -377,6 +377,9 @@ proc validateAndRelay(g: GossipSub, except CatchableError as exc: info "validateAndRelay failed", msg=exc.msg +proc dataAndTopicsIdSize(msgs: seq[Message]): int = + msgs.mapIt(it.data.len + it.topicIds.mapIt(it.len).foldl(a + b, 0)).foldl(a + b, 0) + proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: int) {.raises:[PeerRateLimitError].} = # In this way we count even ignored fields by protobuf var rmsg = rpcMsgOpt.valueOr: @@ -388,12 +391,7 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: if g.verifySignature: byteSize(rmsg.messages) else: - rmsg.messages.mapIt(it.data.len).foldl(a + b, 0) + - rmsg.messages - .mapIt( - it.topicIds.mapIt(it.len).foldl(a + b, 0) - ) - .foldl(a + b, 0) + dataAndTopicsIdSize(rmsg.messages) var uselessAppBytesNum = msgSize - usefulMsgBytesNum rmsg.control.withValue(control): diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index cfa070ed05..d4cbf85dad 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -145,34 +145,3 @@ proc byteSize*(iwant: seq[ControlIWant]): int = for msgId in item.messageIds: total += msgId.len return total - -proc byteSize*(msg: RPCMsg): int = - var total = 0 - - for sub in msg.subscriptions: - total += sub.topic.len - total += sizeof(sub.subscribe) - - for msg in msg.messages: - total += byteSize(msg) - - if msg.control.isSome: - let ctrl = msg.control.get() - total += byteSize(ctrl.ihave) - - total += byteSize(ctrl.iwant) - - for item in ctrl.graft: - total += item.topicId.len - - for item in ctrl.prune: - total += item.topicId.len - total += sizeof(item.backoff) - for peer in item.peers: - total += peer.peerId.len - total += peer.signedPeerRecord.len * sizeof(byte) - - total += msg.ping.len * sizeof(byte) - total += msg.pong.len * sizeof(byte) - - return total diff --git a/tests/pubsub/testmessage.nim b/tests/pubsub/testmessage.nim index 99310e7fbe..ffc28698f6 100644 --- a/tests/pubsub/testmessage.nim +++ b/tests/pubsub/testmessage.nim @@ -83,37 +83,3 @@ suite "Message": key: @[1'u8], # 1 byte topicIds: @["abc", "defgh"] # 3 + 5 = 8 bytes ) - - check byteSize(msg) == 3 + 1 + 1 + 8 # Total: 13 bytes - - test "byteSize for RPCMsg": - var msg = RPCMsg( - subscriptions: @[ - SubOpts(topic: "abc", subscribe: true), - SubOpts(topic: "def", subscribe: false) - ], # 3 + 3 + 2 * sizeof(bool) bytes - messages: @[ - Message(fromPeer: PeerId(data: @[]), data: @[1'u8, 2, 3], seqno: @[1'u8], signature: @[], key: @[1'u8], topicIds: @["abc", "defgh"]), - Message(fromPeer: PeerId(data: @[]), data: @[], seqno: @[], signature: @[], key: @[], topicIds: @["abc"]) - ], # byteSize: 13 + 3 = 16 bytes - control: some(ControlMessage( - ihave: @[ - ControlIHave(topicId: "ghi", messageIds: @[@[1'u8, 2, 3]]) 
- ], # 3 + 3 bytes - iwant: @[ - ControlIWant(messageIds: @[@[1'u8, 2]]) - ], # 2 bytes - graft: @[ - ControlGraft(topicId: "jkl") - ], # 3 bytes - prune: @[ - ControlPrune(topicId: "mno", peers: @[PeerInfoMsg(peerId: PeerId(data: @[]), signedPeerRecord: @[])], backoff: 1) - ] # 3 + sizeof(uint64) bytes - )), - ping: @[], # Empty seq[byte] - pong: @[] # Empty seq[byte] - ) - - let boolSize = sizeof(bool) - let uint64Size = sizeof(uint64) - check byteSize(msg) == (3 + 3 + 2 * boolSize) + 16 + (3 + 3 + 2 + 3 + 3 + uint64Size) \ No newline at end of file From 63fa57f77986c4657634cb698579114d2eb602e1 Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 10 Aug 2023 16:16:21 +0200 Subject: [PATCH 18/26] improvements --- libp2p/protocols/pubsub/floodsub.nim | 2 +- libp2p/protocols/pubsub/gossipsub.nim | 24 ++++++++++++------- libp2p/protocols/pubsub/gossipsub/scoring.nim | 10 ++++++-- libp2p/protocols/pubsub/pubsubpeer.nim | 2 +- 4 files changed, 26 insertions(+), 12 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 19b548db25..bf4d70c0ea 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -101,7 +101,7 @@ method rpcHandler*(f: FloodSub, debug "failed to decode msg from peer", peer, err = error raise newException(CatchableError, "") - debug "decoded msg from peer", peer, msg = rpcMsg.shortLog + trace "decoded msg from peer", peer, msg = rpcMsg.shortLog # trigger hooks peer.recvObservers(rpcMsg) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 261843a411..c218928d70 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -380,12 +380,18 @@ proc validateAndRelay(g: GossipSub, proc dataAndTopicsIdSize(msgs: seq[Message]): int = msgs.mapIt(it.data.len + it.topicIds.mapIt(it.len).foldl(a + b, 0)).foldl(a + b, 0) -proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: int) {.raises:[PeerRateLimitError].} = +proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: int) {.raises:[PeerRateLimitError, CatchableError].} = # In this way we count even ignored fields by protobuf + var rmsg = rpcMsgOpt.valueOr: if not peer.uselessAppBytesRate.tryConsume(msgSize): - libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. # discard g.disconnectPeer(peer) - raise newException(PeerRateLimitError, "Peer sent too much useless data that couldn't be decoded") + libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. 
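# ---------------------------------------------------------------------------
# [editor's note: illustrative sketch, not part of the patch]
# What gets charged against the bucket is the overhead: wire bytes that are
# neither message payload, topic ids, nor IHAVE/IWANT message ids. A simplified
# standalone version of that accounting, where SketchMsg is a hypothetical
# stand-in for the decoded Message type:
import std/sequtils

type SketchMsg = object
  data: seq[byte]
  topicIds: seq[string]

proc dataAndTopicsIdSize(msgs: seq[SketchMsg]): int =
  # bytes the application actually cares about: payloads plus their topic ids
  msgs.mapIt(it.data.len + it.topicIds.mapIt(it.len).foldl(a + b, 0)).foldl(a + b, 0)

proc overheadBytes(wireSize: int, msgs: seq[SketchMsg],
                   ihaveBytes, iwantBytes: int): int =
  wireSize - dataAndTopicsIdSize(msgs) - ihaveBytes - iwantBytes

let msgs = @[SketchMsg(data: newSeq[byte](100), topicIds: @["foobar"])]
echo overheadBytes(150, msgs, 0, 0)  # 150 - (100 + 6) = 44 overhead bytes
# ---------------------------------------------------------------------------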
+ debug "Peer sent a msg that couldn't be decoded and it's above rate limit", peer, uselessAppBytesNum = msgSize + # discard g.disconnectPeer(peer) + # debug "Peer disconnected", peer, uselessAppBytesNum = msgSize + # raise newException(PeerRateLimitError, "Peer sent a msg that couldn't be decoded and it's above rate limit") + + raise newException(CatchableError, "Peer msg couldn't be decoded") let usefulMsgBytesNum = if g.verifySignature: @@ -395,12 +401,14 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: var uselessAppBytesNum = msgSize - usefulMsgBytesNum rmsg.control.withValue(control): - uselessAppBytesNum -= byteSize(control.ihave) - byteSize(control.iwant) + uselessAppBytesNum -= (byteSize(control.ihave) + byteSize(control.iwant)) if not peer.uselessAppBytesRate.tryConsume(uselessAppBytesNum): - debug "Peer sent too much useless application data", peer, msgSize, uselessAppBytesNum - libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. # discard g.disconnectPeer(peer) - raise newException(PeerRateLimitError, "Peer sent too much useless application data.") + libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. + debug "Peer sent too much useless application data and it's above rate limit.", peer, msgSize, uselessAppBytesNum + # discard g.disconnectPeer(peer) + # debug "Peer disconnected", peer, msgSize, uselessAppBytesNum + # raise newException(PeerRateLimitError, "Peer sent too much useless application data and it's above rate limit.") method rpcHandler*(g: GossipSub, peer: PubSubPeer, @@ -412,7 +420,7 @@ method rpcHandler*(g: GossipSub, rateLimit(g, peer, Opt.none(RPCMsg), msgSize) return - debug "decoded msg from peer", peer, msg = rpcMsg.shortLog + trace "decoded msg from peer", peer, msg = rpcMsg.shortLog rateLimit(g, peer, Opt.some(rpcMsg), msgSize) # trigger hooks diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 5465faa4aa..8a7c8a369e 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -241,8 +241,14 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} = g.updateScores() proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) = - if not peer.uselessAppBytesRate.tryConsume(len(msg.data)): - libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. # discard g.disconnectPeer(peer) + let uselessAppBytesNum = msg.data.len + if not peer.uselessAppBytesRate.tryConsume(uselessAppBytesNum): + debug "Peer sent invalid message and it's above rate limit", peer, uselessAppBytesNum + libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. 
+ # discard g.disconnectPeer(peer) + # debug "Peer disconnected", peer, uselessAppBytesNum + # raise newException(PeerRateLimitError, "Peer sent invalid message and it's above rate limit") + for tt in msg.topicIds: let t = tt diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 984ff05b64..92dc983b5a 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -313,7 +313,7 @@ proc new*( peerId: peerId, connectedFut: newFuture[void](), maxMessageSize: maxMessageSize, - uselessAppBytesRate: TokenBucket.new(1024, 1.seconds) + uselessAppBytesRate: TokenBucket.new(1024, 500.milliseconds) ) result.sentIHaves.addFirst(default(HashSet[MessageId])) result.heDontWants.addFirst(default(HashSet[MessageId])) From 855b7704120581a9d231c64bc97d0d34abe37c22 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 16 Aug 2023 13:03:26 +0200 Subject: [PATCH 19/26] Use params for rate limit --- libp2p/protocols/pubsub/gossipsub.nim | 38 ++++++++++++------- libp2p/protocols/pubsub/gossipsub/scoring.nim | 13 ++++--- libp2p/protocols/pubsub/gossipsub/types.nim | 2 + libp2p/protocols/pubsub/pubsub.nim | 5 ++- libp2p/protocols/pubsub/pubsubpeer.nim | 7 ++-- tests/pubsub/testgossipinternal.nim | 2 +- 6 files changed, 42 insertions(+), 25 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index c218928d70..ee610ff62e 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -79,7 +79,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = disconnectBadPeers: false, enablePX: false, bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps - iwantTimeout: 3 * GossipSubHeartbeatInterval + iwantTimeout: 3 * GossipSubHeartbeatInterval, + uselessAppBytesRate: (1024, 500.milliseconds) ) proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = @@ -384,12 +385,13 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: # In this way we count even ignored fields by protobuf var rmsg = rpcMsgOpt.valueOr: - if not peer.uselessAppBytesRate.tryConsume(msgSize): - libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. - debug "Peer sent a msg that couldn't be decoded and it's above rate limit", peer, uselessAppBytesNum = msgSize - # discard g.disconnectPeer(peer) - # debug "Peer disconnected", peer, uselessAppBytesNum = msgSize - # raise newException(PeerRateLimitError, "Peer sent a msg that couldn't be decoded and it's above rate limit") + peer.uselessAppBytesRateOpt.withValue(uselessAppBytesRate): + if not uselessAppBytesRate.tryConsume(msgSize): + libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. 
+ debug "Peer sent a msg that couldn't be decoded and it's above rate limit", peer, uselessAppBytesNum = msgSize + # discard g.disconnectPeer(peer) + # debug "Peer disconnected", peer, uselessAppBytesNum = msgSize + # raise newException(PeerRateLimitError, "Peer sent a msg that couldn't be decoded and it's above rate limit") raise newException(CatchableError, "Peer msg couldn't be decoded") @@ -403,12 +405,13 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: rmsg.control.withValue(control): uselessAppBytesNum -= (byteSize(control.ihave) + byteSize(control.iwant)) - if not peer.uselessAppBytesRate.tryConsume(uselessAppBytesNum): - libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. - debug "Peer sent too much useless application data and it's above rate limit.", peer, msgSize, uselessAppBytesNum - # discard g.disconnectPeer(peer) - # debug "Peer disconnected", peer, msgSize, uselessAppBytesNum - # raise newException(PeerRateLimitError, "Peer sent too much useless application data and it's above rate limit.") + peer.uselessAppBytesRateOpt.withValue(uselessAppBytesRate): + if not uselessAppBytesRate.tryConsume(msgSize): + libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. + debug "Peer sent too much useless application data and it's above rate limit.", peer, msgSize, uselessAppBytesNum + # discard g.disconnectPeer(peer) + # debug "Peer disconnected", peer, msgSize, uselessAppBytesNum + # raise newException(PeerRateLimitError, "Peer sent too much useless application data and it's above rate limit.") method rpcHandler*(g: GossipSub, peer: PubSubPeer, @@ -718,3 +721,12 @@ method initPubSub*(g: GossipSub) # init gossip stuff g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength) + +method getOrCreatePeer*( + g: GossipSub, + peerId: PeerId, + protos: seq[string]): PubSubPeer = + + let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos) + peer.uselessAppBytesRateOpt = Opt.some(TokenBucket.new(g.parameters.uselessAppBytesRate.bytes, g.parameters.uselessAppBytesRate.interval)) + return peer \ No newline at end of file diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 8a7c8a369e..ac03590d8a 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -242,12 +242,13 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} = proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) = let uselessAppBytesNum = msg.data.len - if not peer.uselessAppBytesRate.tryConsume(uselessAppBytesNum): - debug "Peer sent invalid message and it's above rate limit", peer, uselessAppBytesNum - libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. 
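# ---------------------------------------------------------------------------
# [editor's note: illustrative sketch, not part of the patch]
# From this patch on the per-peer bucket is an Opt[TokenBucket] (named
# uselessAppBytesRateOpt here, renamed to overheadRateLimitOpt later in the
# series): if no bucket was configured, the limit is simply off. A standalone
# sketch of that gating, where limitOpt is a hypothetical stand-in for the
# peer field:
import chronos
import chronos/ratelimit
import stew/results

var limitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)  # disabled by default

proc overheadOk(bytes: int): bool =
  let bucket = limitOpt.valueOr:
    return true  # no bucket configured -> never rate limited
  bucket.tryConsume(bytes)

echo overheadOk(4096)  # true: limiter disabled
limitOpt = Opt.some(TokenBucket.new(1024, 500.milliseconds))
echo overheadOk(4096)  # false: a single 4 KiB charge exceeds the 1 KiB budget
# ---------------------------------------------------------------------------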
- # discard g.disconnectPeer(peer) - # debug "Peer disconnected", peer, uselessAppBytesNum - # raise newException(PeerRateLimitError, "Peer sent invalid message and it's above rate limit") + peer.uselessAppBytesRateOpt.withValue(uselessAppBytesRate): + if not uselessAppBytesRate.tryConsume(uselessAppBytesNum): + debug "Peer sent invalid message and it's above rate limit", peer, uselessAppBytesNum + libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. + # discard g.disconnectPeer(peer) + # debug "Peer disconnected", peer, uselessAppBytesNum + # raise newException(PeerRateLimitError, "Peer sent invalid message and it's above rate limit") for tt in msg.topicIds: diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 25a9e636a8..b27fe76d5a 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -145,6 +145,8 @@ type bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely iwantTimeout*: Duration + uselessAppBytesRate*: tuple[bytes: int, interval: Duration] + BackoffTable* = Table[string, Table[PeerId, Moment]] ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]] diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 3d97436218..ef4d680024 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -279,10 +279,11 @@ method onPubSubPeerEvent*(p: PubSub, peer: PubSubPeer, event: PubSubPeerEvent) { of PubSubPeerEventKind.Disconnected: discard -proc getOrCreatePeer*( +method getOrCreatePeer*( p: PubSub, peerId: PeerId, - protos: seq[string]): PubSubPeer = + protos: seq[string]): PubSubPeer {.base, gcsafe.} = + p.peers.withValue(peerId, peer): return peer[] diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 92dc983b5a..646c907eb8 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -69,7 +69,7 @@ type maxMessageSize: int appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score - uselessAppBytesRate*: TokenBucket + uselessAppBytesRateOpt*: Opt[TokenBucket] RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void] {.gcsafe, raises: [].} @@ -304,7 +304,8 @@ proc new*( getConn: GetConn, onEvent: OnEvent, codec: string, - maxMessageSize: int): T = + maxMessageSize: int, + uselessAppBytesRateOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T = result = T( getConn: getConn, @@ -313,7 +314,7 @@ proc new*( peerId: peerId, connectedFut: newFuture[void](), maxMessageSize: maxMessageSize, - uselessAppBytesRate: TokenBucket.new(1024, 500.milliseconds) + uselessAppBytesRateOpt: uselessAppBytesRateOpt ) result.sentIHaves.addFirst(default(HashSet[MessageId])) result.heDontWants.addFirst(default(HashSet[MessageId])) diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 00b5170414..7f9a859209 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -23,7 +23,7 @@ proc getPubSubPeer(p: TestGossipSub, peerId: PeerId): PubSubPeer = proc getConn(): Future[Connection] = p.switch.dial(peerId, GossipSubCodec) - let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec, 1024 * 1024) + let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec, 
1024 * 1024, Opt.some(TokenBucket.new(1024, 500.milliseconds))) debug "created new pubsub peer", peerId p.peers[peerId] = pubSubPeer From 1000bf0bcf5eeae6f6c8cf4a6fbe0a6d2eaad1f1 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 16 Aug 2023 18:30:54 +0200 Subject: [PATCH 20/26] Fixes after code review --- libp2p/protocols/pubsub/floodsub.nim | 2 +- libp2p/protocols/pubsub/gossipsub.nim | 8 ++++---- tests/pubsub/testmessage.nim | 2 ++ 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index bf4d70c0ea..819161c0c7 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -96,7 +96,7 @@ method unsubscribePeer*(f: FloodSub, peer: PeerId) = method rpcHandler*(f: FloodSub, peer: PubSubPeer, data: seq[byte]) {.async.} = - let msgSize = len(data) + var rpcMsg = decodeRpcMsg(data).valueOr: debug "failed to decode msg from peer", peer, err = error raise newException(CatchableError, "") diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index ee610ff62e..248e6e1030 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -401,7 +401,7 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: else: dataAndTopicsIdSize(rmsg.messages) - var uselessAppBytesNum = msgSize - usefulMsgBytesNum + var uselessAppBytesNum = msgSize - usefulMsgBytesNum rmsg.control.withValue(control): uselessAppBytesNum -= (byteSize(control.ihave) + byteSize(control.iwant)) @@ -416,9 +416,9 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} = + let msgSize = data.len - let rpcMsgResult = decodeRpcMsg(data) - var rpcMsg = rpcMsgResult.valueOr: + var rpcMsg = decodeRpcMsg(data).valueOr: debug "failed to decode msg from peer", peer, err = error rateLimit(g, peer, Opt.none(RPCMsg), msgSize) return @@ -729,4 +729,4 @@ method getOrCreatePeer*( let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos) peer.uselessAppBytesRateOpt = Opt.some(TokenBucket.new(g.parameters.uselessAppBytesRate.bytes, g.parameters.uselessAppBytesRate.interval)) - return peer \ No newline at end of file + return peer diff --git a/tests/pubsub/testmessage.nim b/tests/pubsub/testmessage.nim index ffc28698f6..f4f062fa33 100644 --- a/tests/pubsub/testmessage.nim +++ b/tests/pubsub/testmessage.nim @@ -83,3 +83,5 @@ suite "Message": key: @[1'u8], # 1 byte topicIds: @["abc", "defgh"] # 3 + 5 = 8 bytes ) + + check byteSize(msg) == 3 + 1 + 1 + 8 # Total: 13 bytes \ No newline at end of file From 3ffbb1c59c3b80cad1478dac6c038a5c6f6e0caa Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 28 Aug 2023 19:43:52 +0200 Subject: [PATCH 21/26] Fix bug --- libp2p/protocols/pubsub/gossipsub.nim | 6 +-- libp2p/protocols/pubsub/rpc/messages.nim | 54 ++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 248e6e1030..f6ab234fe7 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -80,7 +80,7 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = enablePX: false, bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps iwantTimeout: 3 * GossipSubHeartbeatInterval, - uselessAppBytesRate: (1024, 500.milliseconds) + uselessAppBytesRate: (10000, 500.milliseconds) ) proc 
validateParameters*(parameters: GossipSubParams): Result[void, cstring] = @@ -406,9 +406,9 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: uselessAppBytesNum -= (byteSize(control.ihave) + byteSize(control.iwant)) peer.uselessAppBytesRateOpt.withValue(uselessAppBytesRate): - if not uselessAppBytesRate.tryConsume(msgSize): + if not uselessAppBytesRate.tryConsume(uselessAppBytesNum): libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. - debug "Peer sent too much useless application data and it's above rate limit.", peer, msgSize, uselessAppBytesNum + debug "Peer sent too much useless application data and it's above rate limit.", peer, msgSize, uselessAppBytesNum, rmsg # discard g.disconnectPeer(peer) # debug "Peer disconnected", peer, msgSize, uselessAppBytesNum # raise newException(PeerRateLimitError, "Peer sent too much useless application data and it's above rate limit.") diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index d4cbf85dad..f8f17ca83a 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -145,3 +145,57 @@ proc byteSize*(iwant: seq[ControlIWant]): int = for msgId in item.messageIds: total += msgId.len return total + +proc `$` (msg: PeerInfoMsg): string = + try: + return "PeerInfoMsg(peerId: " & $msg.peerId & ", signedPeerRecord: " & $msg.signedPeerRecord & ")" + except Exception: + return "PeerInfoMsg: exception raised" + +proc `$` (msg: SubOpts): string = + try: + return "SubOpts(subscribe: " & $msg.subscribe & ", topic: " & $msg.topic & ")" + except Exception: + return "SubOpts: exception raised" + +proc `$` (msg: Message): string = + try: + return "Message(fromPeer: " & $msg.fromPeer & ", data: " & $msg.data & ", seqno: " & $msg.seqno & ", topicIds: " & $msg.topicIds & ", signature: " & $msg.signature & ", key: " & $msg.key & ")" + except Exception: + return "Message: exception raised" + +proc `$` (msg: ControlIHave): string = + try: + return "ControlIHave(topicId: " & $msg.topicId & ", messageIds: " & $msg.messageIds & ")" + except Exception: + return "ControlIHave: exception raised" + +proc `$` (msg: ControlIWant): string = + try: + return "ControlIWant(messageIds: " & $msg.messageIds & ")" + except Exception: + return "ControlIWant: exception raised" + +proc `$` (msg: ControlGraft): string = + try: + return "ControlGraft(topicId: " & $msg.topicId & ")" + except Exception: + return "ControlGraft: exception raised" + +proc `$` (msg: ControlPrune): string = + try: + return "ControlPrune(topicId: " & $msg.topicId & ", peers: " & $msg.peers & ", backoff: " & $msg.backoff & ")" + except Exception: + return "ControlPrune: exception raised" + +proc `$` (msg: ControlMessage): string = + try: + return "ControlMessage(ihave: " & $msg.ihave & ", iwant: " & $msg.iwant & ", graft: " & $msg.graft & ", prune: " & $msg.prune & ", idontwant: " & $msg.idontwant & ")" + except Exception: + return "ControlMessage: exception raised" + +proc `$` (msg: RPCMsg): string = + try: + return "RPCMsg(subscriptions: " & $msg.subscriptions & ", messages: " & $msg.messages & ", control: " & $msg.control & ", ping: " & $msg.ping & ", pong: " & $msg.pong & ")" + except Exception: + return "RPCMsg: exception raised" From c7884d5f85cfcf7a4d8cbf01725363de864db8e0 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 6 Sep 2023 18:30:56 +0200 Subject: [PATCH 22/26] Make rate limit disable by 
default if the config isn't provided --- libp2p/protocols/pubsub/gossipsub.nim | 5 +++-- libp2p/protocols/pubsub/gossipsub/types.nim | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index f6ab234fe7..26e9e51435 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -80,7 +80,7 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = enablePX: false, bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps iwantTimeout: 3 * GossipSubHeartbeatInterval, - uselessAppBytesRate: (10000, 500.milliseconds) + uselessAppBytesRateConfOpt: Opt.none(tuple[bytes: int, interval: Duration]) ) proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = @@ -728,5 +728,6 @@ method getOrCreatePeer*( protos: seq[string]): PubSubPeer = let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos) - peer.uselessAppBytesRateOpt = Opt.some(TokenBucket.new(g.parameters.uselessAppBytesRate.bytes, g.parameters.uselessAppBytesRate.interval)) + g.parameters.uselessAppBytesRateConfOpt.withValue(uselessAppBytesRateConf): + peer.uselessAppBytesRateOpt = Opt.some(TokenBucket.new(uselessAppBytesRateConf.bytes, uselessAppBytesRateConf.interval)) return peer diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index b27fe76d5a..187e1dc5dc 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -145,7 +145,7 @@ type bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely iwantTimeout*: Duration - uselessAppBytesRate*: tuple[bytes: int, interval: Duration] + uselessAppBytesRateConfOpt*: Opt[tuple[bytes: int, interval: Duration]] BackoffTable* = Table[string, Table[PeerId, Moment]] ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]] From f443a26f4e3184a1feb8d49f74bea7f2ed10fb2f Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 8 Sep 2023 16:55:35 +0200 Subject: [PATCH 23/26] Fix test --- tests/pubsub/testgossipinternal.nim | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 7f9a859209..a764c8e52f 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -734,7 +734,7 @@ suite "GossipSub internal": var iwantCount = 0 - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = check false proc handler2(topic: string, data: seq[byte]) {.async.} = discard @@ -780,7 +780,7 @@ suite "GossipSub internal": data: actualMessageData )] ) - await gossipSub.rpcHandler(firstPeer, rpcMsg) + await gossipSub.rpcHandler(firstPeer, encodeRpcMsg(rpcMsg, false)) check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes()) @@ -793,7 +793,7 @@ suite "GossipSub internal": gossipSub.parameters.iwantTimeout = 10.milliseconds await gossipSub.start() - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard + proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard proc handler2(topic: string, data: seq[byte]) {.async.} = discard let topic = "foobar" From 0f1def6bcb96d0187c44f23e64021abd28f9fe16 Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 8 Sep 2023 17:02:41 +0200 Subject: [PATCH 24/26] Improve naming --- libp2p/protocols/pubsub/gossipsub.nim | 14 +++++++------- 
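# ---------------------------------------------------------------------------
# [editor's note: illustrative sketch, not part of the patch]
# Since PATCH 22 the limit is off unless the application passes a config; the
# patch below renames the parameter to overheadRateLimitConfOpt (earlier
# revisions in the series call it uselessAppBytesRateConfOpt, and the name may
# still change before merge). Enabling it when building the gossipsub
# parameters would look roughly like this:
import chronos
import stew/results
import libp2p/protocols/pubsub/gossipsub

var params = GossipSubParams.init()
# Opt.none(...) by default, i.e. no overhead rate limiting:
params.overheadRateLimitConfOpt =
  Opt.some((bytes: 1024, interval: 500.milliseconds))
# ---------------------------------------------------------------------------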
libp2p/protocols/pubsub/gossipsub/scoring.nim | 4 ++-- libp2p/protocols/pubsub/gossipsub/types.nim | 2 +- libp2p/protocols/pubsub/pubsubpeer.nim | 6 +++--- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 26e9e51435..9251406ce6 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -80,7 +80,7 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = enablePX: false, bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps iwantTimeout: 3 * GossipSubHeartbeatInterval, - uselessAppBytesRateConfOpt: Opt.none(tuple[bytes: int, interval: Duration]) + overheadRateLimitConfOpt: Opt.none(tuple[bytes: int, interval: Duration]) ) proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = @@ -385,8 +385,8 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: # In this way we count even ignored fields by protobuf var rmsg = rpcMsgOpt.valueOr: - peer.uselessAppBytesRateOpt.withValue(uselessAppBytesRate): - if not uselessAppBytesRate.tryConsume(msgSize): + peer.overheadRateLimitOpt.withValue(overheadRateLimit): + if not overheadRateLimit.tryConsume(msgSize): libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. debug "Peer sent a msg that couldn't be decoded and it's above rate limit", peer, uselessAppBytesNum = msgSize # discard g.disconnectPeer(peer) @@ -405,8 +405,8 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: rmsg.control.withValue(control): uselessAppBytesNum -= (byteSize(control.ihave) + byteSize(control.iwant)) - peer.uselessAppBytesRateOpt.withValue(uselessAppBytesRate): - if not uselessAppBytesRate.tryConsume(uselessAppBytesNum): + peer.overheadRateLimitOpt.withValue(overheadRateLimit): + if not overheadRateLimit.tryConsume(uselessAppBytesNum): libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. 
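# ---------------------------------------------------------------------------
# [editor's note: illustrative sketch, not part of the patch]
# The counter above is labelled by the remote client's agent string: getAgent()
# returns the shortened agent when the build defines libp2p_agents_metrics and
# "unknown" otherwise. A standalone nim-metrics example with a hypothetical
# counter name, to avoid clashing with the one this series declares:
import metrics

declarePublicCounter(example_rate_limit_hits,
  "rate limit hits by agent", labels = ["agent"])

example_rate_limit_hits.inc(labelValues = ["unknown"])
# ---------------------------------------------------------------------------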
debug "Peer sent too much useless application data and it's above rate limit.", peer, msgSize, uselessAppBytesNum, rmsg # discard g.disconnectPeer(peer) @@ -728,6 +728,6 @@ method getOrCreatePeer*( protos: seq[string]): PubSubPeer = let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos) - g.parameters.uselessAppBytesRateConfOpt.withValue(uselessAppBytesRateConf): - peer.uselessAppBytesRateOpt = Opt.some(TokenBucket.new(uselessAppBytesRateConf.bytes, uselessAppBytesRateConf.interval)) + g.parameters.overheadRateLimitConfOpt.withValue(overheadRateLimitConf): + peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimitConf.bytes, overheadRateLimitConf.interval)) return peer diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index ac03590d8a..f1c1e923f3 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -242,8 +242,8 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} = proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) = let uselessAppBytesNum = msg.data.len - peer.uselessAppBytesRateOpt.withValue(uselessAppBytesRate): - if not uselessAppBytesRate.tryConsume(uselessAppBytesNum): + peer.overheadRateLimitOpt.withValue(overheadRateLimit): + if not overheadRateLimit.tryConsume(uselessAppBytesNum): debug "Peer sent invalid message and it's above rate limit", peer, uselessAppBytesNum libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. # discard g.disconnectPeer(peer) diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 187e1dc5dc..61f6a0d6e5 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -145,7 +145,7 @@ type bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 
From bfa50dcea8c4b2d017c72963bd5581d6aa84eeca Mon Sep 17 00:00:00 2001
From: Diego
Date: Fri, 8 Sep 2023 17:48:29 +0200
Subject: [PATCH 25/26] Remove procs

---
 libp2p/protocols/pubsub/rpc/messages.nim | 54 ------------------------
 1 file changed, 54 deletions(-)

diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim
index f8f17ca83a..d4cbf85dad 100644
--- a/libp2p/protocols/pubsub/rpc/messages.nim
+++ b/libp2p/protocols/pubsub/rpc/messages.nim
@@ -145,57 +145,3 @@ proc byteSize*(iwant: seq[ControlIWant]): int =
     for msgId in item.messageIds:
       total += msgId.len
   return total
-
-proc `$` (msg: PeerInfoMsg): string =
-  try:
-    return "PeerInfoMsg(peerId: " & $msg.peerId & ", signedPeerRecord: " & $msg.signedPeerRecord & ")"
-  except Exception:
-    return "PeerInfoMsg: exception raised"
-
-proc `$` (msg: SubOpts): string =
-  try:
-    return "SubOpts(subscribe: " & $msg.subscribe & ", topic: " & $msg.topic & ")"
-  except Exception:
-    return "SubOpts: exception raised"
-
-proc `$` (msg: Message): string =
-  try:
-    return "Message(fromPeer: " & $msg.fromPeer & ", data: " & $msg.data & ", seqno: " & $msg.seqno & ", topicIds: " & $msg.topicIds & ", signature: " & $msg.signature & ", key: " & $msg.key & ")"
-  except Exception:
-    return "Message: exception raised"
-
-proc `$` (msg: ControlIHave): string =
-  try:
-    return "ControlIHave(topicId: " & $msg.topicId & ", messageIds: " & $msg.messageIds & ")"
-  except Exception:
-    return "ControlIHave: exception raised"
-
-proc `$` (msg: ControlIWant): string =
-  try:
-    return "ControlIWant(messageIds: " & $msg.messageIds & ")"
-  except Exception:
-    return "ControlIWant: exception raised"
-
-proc `$` (msg: ControlGraft): string =
-  try:
-    return "ControlGraft(topicId: " & $msg.topicId & ")"
-  except Exception:
-    return "ControlGraft: exception raised"
-
-proc `$` (msg: ControlPrune): string =
-  try:
-    return "ControlPrune(topicId: " & $msg.topicId & ", peers: " & $msg.peers & ", backoff: " & $msg.backoff & ")"
-  except Exception:
-    return "ControlPrune: exception raised"
-
-proc `$` (msg: ControlMessage): string =
-  try:
-    return "ControlMessage(ihave: " & $msg.ihave & ", iwant: " & $msg.iwant & ", graft: " & $msg.graft & ", prune: " & $msg.prune & ", idontwant: " & $msg.idontwant & ")"
-  except Exception:
-    return "ControlMessage: exception raised"
-
-proc `$` (msg: RPCMsg): string =
-  try:
-    return "RPCMsg(subscriptions: " & $msg.subscriptions & ", messages: " & $msg.messages & ", control: " & $msg.control & ", ping: " & $msg.ping & ", pong: " & $msg.pong & ")"
-  except Exception:
-    return "RPCMsg: exception raised"

From 18f0570c2f76d48460d588221d76791da198a0fa Mon Sep 17 00:00:00 2001
From: Diego
Date: Wed, 13 Sep 2023 12:35:22 +0200
Subject: [PATCH 26/26] Renaming

---
 libp2p/protocols/pubsub/gossipsub.nim       | 6 +++---
 libp2p/protocols/pubsub/gossipsub/types.nim | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 9251406ce6..a4ee9fbcb7 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -80,7 +80,7 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
       enablePX: false,
       bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
       iwantTimeout: 3 * GossipSubHeartbeatInterval,
-      overheadRateLimitConfOpt: Opt.none(tuple[bytes: int, interval: Duration])
+      overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration])
     )
 
 proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
@@ -728,6 +728,6 @@ method getOrCreatePeer*(
   protos: seq[string]): PubSubPeer =
 
   let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos)
-  g.parameters.overheadRateLimitConfOpt.withValue(overheadRateLimitConf):
-    peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimitConf.bytes, overheadRateLimitConf.interval))
+  g.parameters.overheadRateLimit.withValue(overheadRateLimit):
+    peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval))
   return peer
diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim
index 61f6a0d6e5..01ca871cac 100644
--- a/libp2p/protocols/pubsub/gossipsub/types.nim
+++ b/libp2p/protocols/pubsub/gossipsub/types.nim
@@ -145,7 +145,7 @@ type
     bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely
 
     iwantTimeout*: Duration
-    overheadRateLimitConfOpt*: Opt[tuple[bytes: int, interval: Duration]]
+    overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
 
   BackoffTable* = Table[string, Table[PeerId, Moment]]
   ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
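Taken together, the series leaves the limit disabled unless the application opts in through the parameter renamed above. A hedged sketch of enabling it, with purely illustrative numbers (the exact Opt import depends on how the application already pulls in stew/results):

    import chronos                               # milliseconds
    import libp2p/protocols/pubsub/gossipsub     # GossipSubParams

    var params = GossipSubParams.init()
    # Example budget only: ~10 kB of overhead/invalid bytes per peer per 500 ms.
    # With the default Opt.none no TokenBucket is created and nothing is limited.
    params.overheadRateLimit = Opt.some((bytes: 10_000, interval: 500.milliseconds))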