Skip to content

Commit

Permalink
Introduce a rate limit on useless application bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Jul 27, 2023
1 parent e99dbf4 commit 7eaa750
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 69 deletions.
13 changes: 11 additions & 2 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) =
peer.behaviourPenalty = stats.behaviourPenalty

# Check if the score is below the threshold and disconnect the peer if necessary
if not g.disconnectIfBadPeer(peer, stats.score, g.parameters.graylistThreshold):
g.disconnectIfBadTrafficPeer(peer)
g.disconnectIfBadPeer(peer, stats.score)

peer.iHaveBudget = IHavePeerBudget
peer.pingBudget = PingsPeerBudget
Expand Down Expand Up @@ -359,6 +358,10 @@ method rpcHandler*(g: GossipSub,
peer: PubSubPeer,
rpcMsg: RPCMsg) {.async.} =

if peer.shouldDisconnectPeer:
discard g.disconnectPeer(peer)
return

Check warning on line 363 in libp2p/protocols/pubsub/gossipsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/gossipsub.nim#L363

Added line #L363 was not covered by tests

if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
g.send(peer, RPCMsg(pong: rpcMsg.ping))
peer.pingBudget.dec
Expand Down Expand Up @@ -644,3 +647,9 @@ method initPubSub*(g: GossipSub)

# init gossip stuff
g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)

method shoulDisconnectPeer*(g: GossipSub, peer: PubSubPeer, score: float64): bool =
  ## Decides whether `peer` should be dropped: bad-peer disconnection must be
  ## enabled, the score must fall below the graylist threshold, and direct
  ## peers are never disconnected.
  ## NOTE(review): identifier is missing a 'd' ("shoulD...") — kept as-is
  ## because the `{.base.}` method in pubsub.nim and its caller use this
  ## exact spelling; renaming must happen in all three places at once.
  if not g.parameters.disconnectBadPeers:
    return false
  if peer.peerId in g.parameters.directPeers:
    return false
  return score < g.parameters.graylistThreshold
60 changes: 7 additions & 53 deletions libp2p/protocols/pubsub/gossipsub/scoring.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@

import std/[tables, sets]
import chronos, chronicles, metrics
import chronos/ratelimit
import "."/[types]
import ".."/[pubsubpeer]
import ../rpc/messages
import "../../.."/[peerid, multiaddress, switch, utils/heartbeat]
import ../pubsub

logScope:
topics = "libp2p gossipsub"

declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"])
declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_score_firstMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_score_meshMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_score_meshFailurePenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
Expand Down Expand Up @@ -90,50 +91,6 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =

{.pop.}

proc getAgent(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:
"unknown"

proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} =
let agent = getAgent(peer)
libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent])

try:
await g.switch.disconnect(peer.peerId)
except CatchableError as exc: # Never cancelled
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg

proc disconnectIfBadPeer*(g: GossipSub, peer: PubSubPeer, score: float64, threshold: float64): bool =
if g.parameters.disconnectBadPeers and score < threshold and
peer.peerId notin g.parameters.directPeers:
debug "disconnecting bad score peer", peer, score, threshold
asyncSpawn(g.disconnectPeer(peer))
return true

return false

proc disconnectIfBadTrafficPeer*(g: GossipSub, peer: PubSubPeer) =
if peer.totalTraffic > 0:
# dividing in this way to avoid integer overflow
let agent = getAgent(peer)
let invalidTrafficRatio = float64(peer.invalidTraffic) / float64(peer.totalTraffic)
let invalidIgnoredTrafficRatio = float64(peer.invalidIgnoredTraffic) / float64(peer.totalTraffic)
let totalInvalidTrafficRatio = invalidTrafficRatio + invalidIgnoredTrafficRatio
libp2p_gossipsub_peers_invalidTraffic_bytes.inc(float64(peer.invalidTraffic), labelValues = [agent])
libp2p_gossipsub_peers_invalidIgnoredTraffic_bytes.inc(float64(peer.invalidIgnoredTraffic), labelValues = [agent])
libp2p_gossipsub_peers_totalTraffic_bytes.inc(float64(peer.totalTraffic), labelValues = [agent])

# inverting the signs as we want to compare if the ratio is above the threshold
if g.disconnectIfBadPeer(peer, -totalInvalidTrafficRatio, -g.parameters.invalidTrafficRatioThreshold):
libp2p_gossipsub_peers_badTrafficScorePeerDisconnections.inc(labelValues = [getAgent(peer)])
debug "Bad traffic score peer disconnected", peer

proc updateScores*(g: GossipSub) = # avoid async
## https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#the-score-function
##
Expand Down Expand Up @@ -202,7 +159,7 @@ proc updateScores*(g: GossipSub) = # avoid async
score += topicScore * topicParams.topicWeight

# Score metrics
let agent = getAgent(peer)
let agent = peer.getAgent()
libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent])
libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent])
libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent])
Expand Down Expand Up @@ -239,7 +196,7 @@ proc updateScores*(g: GossipSub) = # avoid async
score += colocationFactor * g.parameters.ipColocationFactorWeight

# Score metrics
let agent = getAgent(peer)
let agent = peer.getAgent()
libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent])
libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent])
libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent])
Expand All @@ -259,10 +216,7 @@ proc updateScores*(g: GossipSub) = # avoid async

trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted

if not g.disconnectIfBadPeer(peer, stats.score, g.parameters.graylistThreshold):
# this is necessary to disconnect peers that send only non-parseable messages as the message won't go through the normal path
g.disconnectIfBadTrafficPeer(peer)

g.disconnectIfBadPeer(peer, stats.score)
libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])

for peer in evicting:
Expand All @@ -276,8 +230,8 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} =
g.updateScores()

proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) =
peer.invalidTraffic += byteSize(msg)
g.disconnectIfBadTrafficPeer(peer)
if not peer.uselessAppBytesRate.tryConsume(len(msg.data)):
discard g.disconnectPeer(peer)

Check warning on line 235 in libp2p/protocols/pubsub/gossipsub/scoring.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/gossipsub/scoring.nim#L235

Added line #L235 was not covered by tests
for tt in msg.topicIds:
let t = tt
Expand Down
19 changes: 19 additions & 0 deletions libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ declarePublicCounter(libp2p_pubsub_received_ihave, "pubsub broadcast ihave", lab
declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"])

declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"])

type
InitializationError* = object of LPError

Expand Down Expand Up @@ -603,3 +605,20 @@ proc removeObserver*(p: PubSub; observer: PubSubObserver) {.public.} =
let idx = p.observers[].find(observer)
if idx != -1:
p.observers[].del(idx)

proc disconnectPeer*(p: PubSub, peer: PubSubPeer) {.async.} =
  ## Bumps the bad-score disconnection counter for the peer's agent label,
  ## then closes the connection to `peer` via the switch.
  ## A failure to close is logged at trace level and otherwise swallowed.
  libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [peer.getAgent()])
  try:
    await p.switch.disconnect(peer.peerId)
  except CatchableError as exc: # Never cancelled
    trace "Failed to close connection", peer, error = exc.name, msg = exc.msg

method shoulDisconnectPeer*(p: PubSub, peer: PubSubPeer, score: float64): bool {.base.} =
  ## Base predicate: should `peer` be disconnected given its `score`?
  ## Concrete pubsub implementations override this; calling the base
  ## version is a programming error and asserts.
  ## NOTE(review): identifier is missing a 'd' ("shoulD...") — kept as-is,
  ## since dynamic dispatch binds overrides to this exact name.
  doAssert(false, "Not implemented!")

Check warning on line 619 in libp2p/protocols/pubsub/pubsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/pubsub.nim#L619

Added line #L619 was not covered by tests

method disconnectIfBadPeer*(p: PubSub, peer: PubSubPeer, score: float64) {.base.} =
  ## Fire-and-forget disconnect of `peer` when the implementation-specific
  ## `shoulDisconnectPeer` predicate says its `score` warrants it.
  if not p.shoulDisconnectPeer(peer, score):
    return
  debug "disconnecting bad score peer", peer, score
  asyncSpawn p.disconnectPeer(peer)
37 changes: 30 additions & 7 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import std/[sequtils, strutils, tables, hashes, options, sets, deques]
import stew/results
import chronos, chronicles, nimcrypto/sha2, metrics
import chronos/ratelimit
import rpc/[messages, message, protobuf],
../../peerid,
../../peerinfo,
Expand Down Expand Up @@ -65,9 +66,8 @@ type
maxMessageSize: int
appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score
totalTraffic*: int64
invalidTraffic*: int64 # data that was parsed by protobuf but was invalid
invalidIgnoredTraffic*: int64 # data that couldn't be parsed by protobuf
uselessAppBytesRate*: TokenBucket
shouldDisconnectPeer*: bool

RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void]
{.gcsafe, raises: [].}
Expand Down Expand Up @@ -129,6 +129,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
try:
try:
while not conn.atEof:
if p.shouldDisconnectPeer:
debug "Ignoring peer", conn, peer = p
break
trace "waiting for data", conn, peer = p, closed = conn.closed

var data = await conn.readLp(p.maxMessageSize)
Expand All @@ -138,14 +141,22 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =

# In this way we count even ignored fields by protobuf
let msgSize = len(data)
p.totalTraffic += msgSize
var rmsg = decodeRpcMsg(data).valueOr:
p.invalidIgnoredTraffic += msgSize
if not p.uselessAppBytesRate.tryConsume(msgSize):
p.shouldDisconnectPeer = true
debug "failed to decode msg from peer",
conn, peer = p, closed = conn.closed,
err = error
break
p.invalidIgnoredTraffic += msgSize - byteSize(rmsg)

let uselessAppBytesNum = block:
rmsg.control.withValue(control):
msgSize - byteSize(control.ihave) - byteSize(control.iwant)
else: msgSize

if not p.uselessAppBytesRate.tryConsume(uselessAppBytesNum):
p.shouldDisconnectPeer = true

data = newSeq[byte]() # Release memory

trace "decoded msg from peer",
Expand Down Expand Up @@ -322,6 +333,18 @@ proc new*(
codec: codec,
peerId: peerId,
connectedFut: newFuture[void](),
maxMessageSize: maxMessageSize
maxMessageSize: maxMessageSize,
uselessAppBytesRate: TokenBucket.new(1024, 1.seconds),
shouldDisconnectPeer: false,
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))

proc getAgent*(peer: PubSubPeer): string =
  ## Short agent label used for per-agent metrics.
  ## Yields "unknown" when agent metrics are compiled out
  ## (no -d:libp2p_agents_metrics) or the peer's agent string is empty.
  result = "unknown"
  when defined(libp2p_agents_metrics):
    if peer.shortAgent.len > 0:
      result = peer.shortAgent
24 changes: 17 additions & 7 deletions libp2p/protocols/pubsub/rpc/messages.nim
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,21 @@ proc byteSize*(msg: Message): int =
total += topicId.len
return total

proc byteSize*(ihave: seq[ControlIHave]): int =
  ## Payload size in bytes of a list of IHAVE control entries:
  ## each entry's topic id plus every advertised message id.
  for entry in ihave:
    result += entry.topicId.len
    for id in entry.messageIds:
      result += id.len

proc byteSize*(iwant: seq[ControlIWant]): int =
  ## Payload size in bytes of a list of IWANT control entries:
  ## the sum of all requested message id lengths.
  for entry in iwant:
    for id in entry.messageIds:
      result += id.len

proc byteSize*(msg: RPCMsg): int =
var total = 0

Expand All @@ -139,14 +154,9 @@ proc byteSize*(msg: RPCMsg): int =

if msg.control.isSome:
let ctrl = msg.control.get()
for item in ctrl.ihave:
total += item.topicId.len
for msgId in item.messageIds:
total += msgId.len
total += byteSize(ctrl.ihave)

for item in ctrl.iwant:
for msgId in item.messageIds:
total += msgId.len
total += byteSize(ctrl.iwant)

for item in ctrl.graft:
total += item.topicId.len
Expand Down

0 comments on commit 7eaa750

Please sign in to comment.