GossipSub Rate Limit #920

Merged: 26 commits, Sep 22, 2023

Commits:
8845e7e
Disconnect peers with too high invalid traffic
diegomrsantos Jun 23, 2023
f8ca0f3
Improvements after code review
diegomrsantos Jun 23, 2023
2dcfa1a
Disconnect peers that sent only non-parseable msgs
diegomrsantos Jun 26, 2023
3fec42c
calculate average traffic
diegomrsantos Jul 3, 2023
c3ef6d7
fix metrics and add tests
diegomrsantos Jul 4, 2023
dfe9fe1
number of bad traffic score peers disconnected
diegomrsantos Jul 4, 2023
6f53761
rename metric and create invalidTrafficRatioThreshold param
diegomrsantos Jul 4, 2023
df78830
Disconnect peers which have bad traffic score right after they try to…
diegomrsantos Jul 25, 2023
a2ebbad
Improvements after my own review
diegomrsantos Jul 25, 2023
e45a32a
Fix metric names and values
diegomrsantos Jul 25, 2023
b267cab
Fix sign
diegomrsantos Jul 25, 2023
2bf7fe5
Introduce a useless application byte rate limit
diegomrsantos Jul 27, 2023
abbb438
Introduce a rate limit
diegomrsantos Jul 31, 2023
0396724
Add rate limit only to gossipsub
diegomrsantos Aug 1, 2023
fc11af4
Decouple decoding and rate limit
diegomrsantos Aug 2, 2023
d54b101
Just measure at the beginning instead of disconnecting, for test purp…
diegomrsantos Aug 2, 2023
b2f2109
Simplify data size calculation
diegomrsantos Aug 2, 2023
63fa57f
improvements
diegomrsantos Aug 10, 2023
855b770
Use params for rate limit
diegomrsantos Aug 16, 2023
1000bf0
Fixes after code review
diegomrsantos Aug 16, 2023
3ffbb1c
Fix bug
diegomrsantos Aug 28, 2023
c7884d5
Make rate limit disabled by default if the config isn't provided
diegomrsantos Sep 6, 2023
f443a26
Fix test
diegomrsantos Sep 8, 2023
0f1def6
Improve naming
diegomrsantos Sep 8, 2023
bfa50dc
Remove procs
diegomrsantos Sep 8, 2023
18f0570
Renaming
diegomrsantos Sep 13, 2023
13 changes: 11 additions & 2 deletions libp2p/protocols/pubsub/floodsub.nim
@@ -15,7 +15,7 @@ import ./pubsub,
./pubsubpeer,
./timedcache,
./peertable,
./rpc/[message, messages],
./rpc/[message, messages, protobuf],
../../crypto/crypto,
../../stream/connection,
../../peerid,
@@ -95,7 +95,16 @@ method unsubscribePeer*(f: FloodSub, peer: PeerId) =

method rpcHandler*(f: FloodSub,
peer: PubSubPeer,
rpcMsg: RPCMsg) {.async.} =
data: seq[byte]) {.async.} =

var rpcMsg = decodeRpcMsg(data).valueOr:
debug "failed to decode msg from peer", peer, err = error
raise newException(CatchableError, "")

trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
# trigger hooks
peer.recvObservers(rpcMsg)

for i in 0..<min(f.topicsHigh, rpcMsg.subscriptions.len):
template sub: untyped = rpcMsg.subscriptions[i]
f.handleSubscribe(peer, sub.topic, sub.subscribe)
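With this change, each pubsub implementation receives the raw wire bytes and decodes them itself, which lets gossipsub account for the full wire size even when decoding fails (see rateLimit below). A minimal sketch of the new handler contract, using only names that appear in this diff (handleRpc itself is a hypothetical name):

    proc handleRpc(f: FloodSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
      # decodeRpcMsg returns a Result; `valueOr` binds `error` on failure
      var rpcMsg = decodeRpcMsg(data).valueOr:
        debug "failed to decode msg from peer", peer, err = error
        raise newException(CatchableError, "")
      trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
      peer.recvObservers(rpcMsg)  # trigger hooks before dispatching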
74 changes: 67 additions & 7 deletions libp2p/protocols/pubsub/gossipsub.nim
@@ -13,13 +13,14 @@

import std/[sets, sequtils]
import chronos, chronicles, metrics
import chronos/ratelimit
import ./pubsub,
./floodsub,
./pubsubpeer,
./peertable,
./mcache,
./timedcache,
./rpc/[messages, message],
./rpc/[messages, message, protobuf],
../protocol,
../../stream/connection,
../../peerinfo,
@@ -78,7 +79,8 @@
disconnectBadPeers: false,
enablePX: false,
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
iwantTimeout: 3 * GossipSubHeartbeatInterval
iwantTimeout: 3 * GossipSubHeartbeatInterval,
overheadRateLimitConfOpt: Opt.none(tuple[bytes: int, interval: Duration])
Review thread on this line:

Contributor: Suggested change:
-        overheadRateLimitConfOpt: Opt.none(tuple[bytes: int, interval: Duration])
+        overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration])

Author: This is different from the actual rate limit; it is a config. Adding the Opt makes using withValue cleaner.

Contributor (@Menduist, Sep 13, 2023): Everything in that structure is a config; it's even named GossipSubParams. And since when do we put the type of a variable in its name?

Author: Done

)
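The withValue pattern the author mentions runs its body only when the Opt holds a value; the getOrCreatePeer override later in this diff uses it exactly this way:

    g.parameters.overheadRateLimitConfOpt.withValue(overheadRateLimitConf):
      peer.overheadRateLimitOpt =
        Opt.some(TokenBucket.new(overheadRateLimitConf.bytes, overheadRateLimitConf.interval))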

proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
@@ -160,7 +162,7 @@
peer.behaviourPenalty = stats.behaviourPenalty

# Check if the score is below the threshold and disconnect the peer if necessary
g.disconnectBadPeerCheck(peer, stats.score)
g.disconnectIfBadScorePeer(peer, stats.score)

peer.iHaveBudget = IHavePeerBudget
peer.pingBudget = PingsPeerBudget
@@ -316,7 +318,7 @@
of ValidationResult.Reject:
debug "Dropping message after validation, reason: reject",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg.topicIds)
g.punishInvalidMessage(peer, msg)
return
of ValidationResult.Ignore:
debug "Dropping message after validation, reason: ignore",
@@ -376,9 +378,57 @@
except CatchableError as exc:
info "validateAndRelay failed", msg=exc.msg

proc dataAndTopicsIdSize(msgs: seq[Message]): int =
msgs.mapIt(it.data.len + it.topicIds.mapIt(it.len).foldl(a + b, 0)).foldl(a + b, 0)

proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: int) {.raises:[PeerRateLimitError, CatchableError].} =
  # This way we also count bytes from fields that protobuf ignored

var rmsg = rpcMsgOpt.valueOr:
peer.overheadRateLimitOpt.withValue(overheadRateLimit):

[Codecov: added line gossipsub.nim#L388 not covered by tests]
if not overheadRateLimit.tryConsume(msgSize):
libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes.
debug "Peer sent a msg that couldn't be decoded and it's above rate limit", peer, uselessAppBytesNum = msgSize

[Codecov: added lines gossipsub.nim#L390-L391 not covered by tests]
# discard g.disconnectPeer(peer)
# debug "Peer disconnected", peer, uselessAppBytesNum = msgSize
# raise newException(PeerRateLimitError, "Peer sent a msg that couldn't be decoded and it's above rate limit")

raise newException(CatchableError, "Peer msg couldn't be decoded")

[Codecov: added line gossipsub.nim#L396 not covered by tests]

let usefulMsgBytesNum =
if g.verifySignature:
byteSize(rmsg.messages)
else:
dataAndTopicsIdSize(rmsg.messages)

var uselessAppBytesNum = msgSize - usefulMsgBytesNum
rmsg.control.withValue(control):
uselessAppBytesNum -= (byteSize(control.ihave) + byteSize(control.iwant))

peer.overheadRateLimitOpt.withValue(overheadRateLimit):
if not overheadRateLimit.tryConsume(uselessAppBytesNum):
libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes.

[Codecov: added line gossipsub.nim#L410 not covered by tests]
debug "Peer sent too much useless application data and it's above rate limit.", peer, msgSize, uselessAppBytesNum, rmsg
# discard g.disconnectPeer(peer)
# debug "Peer disconnected", peer, msgSize, uselessAppBytesNum
# raise newException(PeerRateLimitError, "Peer sent too much useless application data and it's above rate limit.")

method rpcHandler*(g: GossipSub,
peer: PubSubPeer,
rpcMsg: RPCMsg) {.async.} =
data: seq[byte]) {.async.} =

let msgSize = data.len
var rpcMsg = decodeRpcMsg(data).valueOr:
debug "failed to decode msg from peer", peer, err = error
rateLimit(g, peer, Opt.none(RPCMsg), msgSize)
return

trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
rateLimit(g, peer, Opt.some(rpcMsg), msgSize)

# trigger hooks
peer.recvObservers(rpcMsg)

if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
g.send(peer, RPCMsg(pong: rpcMsg.ping))
peer.pingBudget.dec
@@ -442,14 +492,14 @@
# always validate if signature is present or required
debug "Dropping message due to failed signature verification",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg.topicIds)
g.punishInvalidMessage(peer, msg)

[Codecov: added line gossipsub.nim#L495 not covered by tests]
continue

if msg.seqno.len > 0 and msg.seqno.len != 8:
  # if seqno is present, it must be 8 bytes long
debug "Dropping message due to invalid seqno length",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg.topicIds)
g.punishInvalidMessage(peer, msg)

[Codecov: added line gossipsub.nim#L502 not covered by tests]
continue

# g.anonymize needs no evaluation when receiving messages
@@ -671,3 +721,13 @@

# init gossip stuff
g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)

method getOrCreatePeer*(
g: GossipSub,
peerId: PeerId,
protos: seq[string]): PubSubPeer =

let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos)
g.parameters.overheadRateLimitConfOpt.withValue(overheadRateLimitConf):
peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimitConf.bytes, overheadRateLimitConf.interval))

[Codecov: added line gossipsub.nim#L732 not covered by tests]
return peer
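A note on the accounting rateLimit performs: when signatures are verified, the whole encoded message (byteSize(rmsg.messages)) counts as useful, since the signature bytes must be processed; otherwise only payload and topic ids do (dataAndTopicsIdSize). IHAVE/IWANT control bytes are subtracted too, and whatever remains of the wire size is charged against the peer's overhead budget. A self-contained sketch of that arithmetic with assumed numbers, not taken from the PR:

    # uselessAppBytesNum as computed above: wire size minus useful
    # message bytes minus IHAVE/IWANT control bytes
    proc uselessBytes(msgSize, usefulMsgBytes, controlBytes: int): int =
      msgSize - usefulMsgBytes - controlBytes

    doAssert uselessBytes(1200, 1000, 150) == 50  # 50 bytes of protocol overhead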
54 changes: 22 additions & 32 deletions libp2p/protocols/pubsub/gossipsub/scoring.nim
@@ -11,9 +11,12 @@

import std/[tables, sets]
import chronos, chronicles, metrics
import chronos/ratelimit
import "."/[types]
import ".."/[pubsubpeer]
import ../rpc/messages
import "../../.."/[peerid, multiaddress, switch, utils/heartbeat]
import ../pubsub

logScope:
topics = "libp2p gossipsub"
@@ -27,6 +30,7 @@
declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"])
declarePublicCounter(libp2p_gossipsub_peers_rate_limit_disconnections, "The number of peer disconnections by gossipsub because of rate limit", labels = ["agent"])

proc init*(_: type[TopicParams]): TopicParams =
TopicParams(
@@ -85,27 +89,18 @@

{.pop.}

proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} =
let agent =
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:
"unknown"
libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent])

proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} =
try:
await g.switch.disconnect(peer.peerId)
except CatchableError as exc: # Never cancelled
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg

proc disconnectBadPeerCheck*(g: GossipSub, peer: PubSubPeer, score: float64) =
proc disconnectIfBadScorePeer*(g: GossipSub, peer: PubSubPeer, score: float64) =
if g.parameters.disconnectBadPeers and score < g.parameters.graylistThreshold and
peer.peerId notin g.parameters.directPeers:
debug "disconnecting bad score peer", peer, score = peer.score
asyncSpawn(g.disconnectPeer(peer))
libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [peer.getAgent()])

proc updateScores*(g: GossipSub) = # avoid async
## https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#the-score-function
@@ -175,14 +170,7 @@
score += topicScore * topicParams.topicWeight

# Score metrics
let agent =
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:
"unknown"
let agent = peer.getAgent()
libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent])
libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent])
libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent])
@@ -219,14 +207,7 @@
score += colocationFactor * g.parameters.ipColocationFactorWeight

# Score metrics
let agent =
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:
"unknown"
let agent = peer.getAgent()
libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent])
libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent])
libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent])
@@ -246,8 +227,7 @@

trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted

g.disconnectBadPeerCheck(peer, stats.score)

g.disconnectIfBadScorePeer(peer, stats.score)
libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])

for peer in evicting:
@@ -260,8 +240,18 @@
trace "running scoring heartbeat", instance = cast[int](g)
g.updateScores()

proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
for tt in topics:
proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) =
let uselessAppBytesNum = msg.data.len
peer.overheadRateLimitOpt.withValue(overheadRateLimit):
if not overheadRateLimit.tryConsume(uselessAppBytesNum):
debug "Peer sent invalid message and it's above rate limit", peer, uselessAppBytesNum
libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes.

[Codecov: added lines scoring.nim#L246-L248 not covered by tests]
# discard g.disconnectPeer(peer)
# debug "Peer disconnected", peer, uselessAppBytesNum
# raise newException(PeerRateLimitError, "Peer sent invalid message and it's above rate limit")


for tt in msg.topicIds:
let t = tt
if t notin g.topics:
continue
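The repeated agent-label blocks deleted above are collapsed into peer.getAgent(). Its definition is not shown in this excerpt; presumably it wraps the same when-defined logic that was removed, along these lines:

    proc getAgent*(peer: PubSubPeer): string =
      when defined(libp2p_agents_metrics):
        if peer.shortAgent.len > 0:
          peer.shortAgent
        else:
          "unknown"
      else:
        "unknown"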
2 changes: 2 additions & 0 deletions libp2p/protocols/pubsub/gossipsub/types.nim
@@ -145,6 +145,8 @@ type
bandwidthEstimatebps*: int # This is currently used only for limiting flood publishing; 0 disables flood-limiting completely
iwantTimeout*: Duration

overheadRateLimitConfOpt*: Opt[tuple[bytes: int, interval: Duration]]

BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]

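This tuple feeds the TokenBucket created per peer in getOrCreatePeer (gossipsub.nim above): a budget of `bytes` tokens refilled every `interval`. A rough illustration of the semantics as used in this PR; the exact chronos/ratelimit refill behaviour is assumed:

    import chronos, chronos/ratelimit

    let bucket = TokenBucket.new(1000, 1.seconds)
    doAssert bucket.tryConsume(600)    # 400 tokens remain
    doAssert bucket.tryConsume(400)    # budget exhausted
    doAssert not bucket.tryConsume(1)  # rejected until the interval refills the bucket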
12 changes: 7 additions & 5 deletions libp2p/protocols/pubsub/pubsub.nim
@@ -17,6 +17,7 @@

import std/[tables, sequtils, sets, strutils]
import chronos, chronicles, metrics
import chronos/ratelimit
import ./errors as pubsub_errors,
./pubsubpeer,
./rpc/[message, messages, protobuf],
@@ -263,7 +264,7 @@

method rpcHandler*(p: PubSub,
peer: PubSubPeer,
rpcMsg: RPCMsg): Future[void] {.base, async.} =
data: seq[byte]): Future[void] {.base, async.} =

[Codecov: added line pubsub.nim#L267 not covered by tests]
## Handler that must be overridden by a concrete implementation
raiseAssert "Unimplemented"

@@ -278,10 +279,11 @@
of PubSubPeerEventKind.Disconnected:
discard

proc getOrCreatePeer*(
method getOrCreatePeer*(
p: PubSub,
peerId: PeerId,
protos: seq[string]): PubSubPeer =
protos: seq[string]): PubSubPeer {.base, gcsafe.} =

p.peers.withValue(peerId, peer):
return peer[]

@@ -354,9 +356,9 @@
## that we're interested in
##

proc handler(peer: PubSubPeer, msg: RPCMsg): Future[void] =
proc handler(peer: PubSubPeer, data: seq[byte]): Future[void] =
# call pubsub rpc handler
p.rpcHandler(peer, msg)
p.rpcHandler(peer, data)

let peer = p.getOrCreatePeer(conn.peerId, @[proto])

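Putting it together, an application opts in by populating the new parameter; with the default Opt.none nothing is rate-limited (commit c7884d5). A hedged sketch: GossipSubParams.init() is assumed to be the initializer behind the defaults shown earlier, and the field is renamed to overheadRateLimit per the review discussion above:

    var params = GossipSubParams.init()  # assumed initializer
    params.overheadRateLimitConfOpt =
      Opt.some((bytes: 128 * 1024, interval: 1.seconds))  # illustrative budget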