Skip to content

Commit

Permalink
Introduce a rate limit
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Jul 31, 2023
1 parent 979709c commit 397e2c3
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 71 deletions.
7 changes: 6 additions & 1 deletion libp2p/protocols/pubsub/floodsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ method unsubscribePeer*(f: FloodSub, peer: PeerId) =

method rpcHandler*(f: FloodSub,
peer: PubSubPeer,
rpcMsg: RPCMsg) {.async.} =
data: seq[byte]) {.async.} =
let rpcMsg = decodeAndCheckRateLimit(f, peer, data)

for i in 0..<min(f.topicsHigh, rpcMsg.subscriptions.len):
template sub: untyped = rpcMsg.subscriptions[i]
f.handleSubscribe(peer, sub.topic, sub.subscribe)
Expand Down Expand Up @@ -227,3 +229,6 @@ method initPubSub*(f: FloodSub)
hmacDrbgGenerate(f.rng[], f.seenSalt)

f.init()

method shoulDisconnectPeer*(f: FloodSub, peer: PubSubPeer, score: float64): bool =
return true

Check warning on line 234 in libp2p/protocols/pubsub/floodsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/floodsub.nim#L233-L234

Added lines #L233 - L234 were not covered by tests
9 changes: 3 additions & 6 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
behaviourPenaltyWeight: -1.0,
behaviourPenaltyDecay: 0.999,
disconnectBadPeers: false,
enablePX: false,
invalidTrafficRatioThreshold: 0.3,
enablePX: false
)

proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
Expand Down Expand Up @@ -372,11 +371,9 @@ proc validateAndRelay(g: GossipSub,

method rpcHandler*(g: GossipSub,
peer: PubSubPeer,
rpcMsg: RPCMsg) {.async.} =
data: seq[byte]) {.async.} =

if peer.shouldDisconnectPeer:
discard g.disconnectPeer(peer)
return
let rpcMsg = decodeAndCheckRateLimit(g, peer, data)

if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
g.send(peer, RPCMsg(pong: rpcMsg.ping))
Expand Down
76 changes: 56 additions & 20 deletions libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import std/[tables, sequtils, sets, strutils]
import chronos, chronicles, metrics
import chronos/ratelimit
import ./errors as pubsub_errors,
./pubsubpeer,
./rpc/[message, messages, protobuf],
Expand Down Expand Up @@ -263,9 +264,61 @@ proc updateMetrics*(p: PubSub, rpcMsg: RPCMsg) =
else:
libp2p_pubsub_received_prune.inc(labelValues = ["generic"])

proc disconnectPeer*(p: PubSub, peer: PubSubPeer) {.async.} =
let agent = peer.getAgent()
libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent])

try:
await p.switch.disconnect(peer.peerId)
debug "Peer disconnected", peer
except CatchableError as exc: # Never cancelled
debug "Failed to close connection", peer, error = exc.name, msg = exc.msg

Check warning on line 275 in libp2p/protocols/pubsub/pubsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/pubsub.nim#L275

Added line #L275 was not covered by tests

method shoulDisconnectPeer*(p: PubSub, peer: PubSubPeer, score: float64): bool {.base.} =
doAssert(false, "Not implemented!")

Check warning on line 278 in libp2p/protocols/pubsub/pubsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/pubsub.nim#L278

Added line #L278 was not covered by tests

method disconnectIfBadPeer*(p: PubSub, peer: PubSubPeer, score: float64) {.base.} =
if p.shoulDisconnectPeer(peer, score):
debug "disconnecting bad score peer", peer, score
asyncSpawn(p.disconnectPeer(peer))

proc decodeAndCheckRateLimit*(p: PubSub, peer: PubSubPeer, data: seq[byte]): RPCMsg {.raises:[PeerRateLimitError].} =
# In this way we count even ignored fields by protobuf
let msgSize = len(data)
var rmsg = decodeRpcMsg(data).valueOr:
debug "failed to decode msg from peer", peer, err = error
if not peer.uselessAppBytesRate.tryConsume(msgSize):
discard p.disconnectPeer(peer)
raise newException(PeerRateLimitError, "Peer sent too much useless data that couldn't be decoded")

let usefulMsgBytesNum =

Check warning on line 294 in libp2p/protocols/pubsub/pubsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/pubsub.nim#L294

Added line #L294 was not covered by tests
if p.verifySignature:
byteSize(rmsg.messages)
else:
rmsg.messages.mapIt(it.data.len).foldl(a + b, 0) +
rmsg.messages
.mapIt(

Check warning on line 300 in libp2p/protocols/pubsub/pubsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/pubsub.nim#L300

Added line #L300 was not covered by tests
it.topicIds.mapIt(it.len).foldl(a + b, 0)
)
.foldl(a + b, 0)

Check warning on line 304 in libp2p/protocols/pubsub/pubsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/pubsub.nim#L304

Added line #L304 was not covered by tests
var uselessAppBytesNum = msgSize - usefulMsgBytesNum
rmsg.control.withValue(control):

Check warning on line 306 in libp2p/protocols/pubsub/pubsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/pubsub.nim#L306

Added line #L306 was not covered by tests
uselessAppBytesNum -= byteSize(control.ihave) - byteSize(control.iwant)

if not peer.uselessAppBytesRate.tryConsume(uselessAppBytesNum):
debug "Peer sent too many useless data", peer, msgSize, uselessAppBytesNum
discard p.disconnectPeer(peer)
raise newException(PeerRateLimitError, "Peer sent too much useless application data")

Check warning on line 312 in libp2p/protocols/pubsub/pubsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/pubsub.nim#L310-L312

Added lines #L310 - L312 were not covered by tests

debug "decoded msg from peer", peer, msg = rmsg.shortLog
# trigger hooks
peer.recvObservers(rmsg)
return rmsg

Check warning on line 318 in libp2p/protocols/pubsub/pubsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/pubsub.nim#L318

Added line #L318 was not covered by tests
method rpcHandler*(p: PubSub,
peer: PubSubPeer,
rpcMsg: RPCMsg): Future[void] {.base, async.} =
data: seq[byte]): Future[void] {.base, async.} =

Check warning on line 321 in libp2p/protocols/pubsub/pubsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/pubsub.nim#L321

Added line #L321 was not covered by tests
## Handler that must be overridden by concrete implementation
raiseAssert "Unimplemented"

Expand Down Expand Up @@ -356,9 +409,9 @@ method handleConn*(p: PubSub,
## that we're interested in
##

proc handler(peer: PubSubPeer, msg: RPCMsg): Future[void] =
proc handler(peer: PubSubPeer, data: seq[byte]): Future[void] =
# call pubsub rpc handler
p.rpcHandler(peer, msg)
p.rpcHandler(peer, data)

let peer = p.getOrCreatePeer(conn.peerId, @[proto])

Expand Down Expand Up @@ -605,20 +658,3 @@ proc removeObserver*(p: PubSub; observer: PubSubObserver) {.public.} =
let idx = p.observers[].find(observer)
if idx != -1:
p.observers[].del(idx)

proc disconnectPeer*(p: PubSub, peer: PubSubPeer) {.async.} =
let agent = peer.getAgent()
libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent])

try:
await p.switch.disconnect(peer.peerId)
except CatchableError as exc: # Never cancelled
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg

method shoulDisconnectPeer*(p: PubSub, peer: PubSubPeer, score: float64): bool {.base.} =
doAssert(false, "Not implemented!")

method disconnectIfBadPeer*(p: PubSub, peer: PubSubPeer, score: float64) {.base.} =
if p.shoulDisconnectPeer(peer, score):
debug "disconnecting bad score peer", peer, score
asyncSpawn(p.disconnectPeer(peer))
43 changes: 11 additions & 32 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ when defined(libp2p_expensive_metrics):
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])

type
PeerRateLimitError* = object of CatchableError

PubSubObserver* = ref object
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [].}
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [].}
Expand Down Expand Up @@ -70,7 +72,7 @@ type
uselessAppBytesRate*: TokenBucket
shouldDisconnectPeer*: bool

RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void]
RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
{.gcsafe, raises: [].}

when defined(libp2p_agents_metrics):
Expand Down Expand Up @@ -110,7 +112,7 @@ func outbound*(p: PubSubPeer): bool =
else:
false

proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
proc recvObservers*(p: PubSubPeer, msg: var RPCMsg) =
# trigger hooks
if not(isNil(p.observers)) and p.observers[].len > 0:
for obs in p.observers[]:
Expand All @@ -130,49 +132,26 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
try:
try:
while not conn.atEof:
if p.shouldDisconnectPeer:
debug "Ignoring peer", conn, peer = p
break
trace "waiting for data", conn, peer = p, closed = conn.closed

var data = await conn.readLp(p.maxMessageSize)
trace "read data from peer",
conn, peer = p, closed = conn.closed,
data = data.shortLog

# In this way we count even ignored fields by protobuf
let msgSize = len(data)
var rmsg = decodeRpcMsg(data).valueOr:
if not p.uselessAppBytesRate.tryConsume(msgSize):
p.shouldDisconnectPeer = true
debug "failed to decode msg from peer",
conn, peer = p, closed = conn.closed,
err = error
break

let uselessAppBytesNum = block:
rmsg.control.withValue(control):
msgSize - byteSize(control.ihave) - byteSize(control.iwant)
else: msgSize

if not p.uselessAppBytesRate.tryConsume(uselessAppBytesNum):
p.shouldDisconnectPeer = true

data = newSeq[byte]() # Release memory

trace "decoded msg from peer",
conn, peer = p, closed = conn.closed,
msg = rmsg.shortLog
# trigger hooks
p.recvObservers(rmsg)

when defined(libp2p_expensive_metrics):
for m in rmsg.messages:
for t in m.topicIDs:
# metrics
libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t])

await p.handler(p, rmsg)
await p.handler(p, data)
data = newSeq[byte]() # Release memory
except PeerRateLimitError as exc:
debug "Peer rate limit exceeded, exiting read while", conn, peer = p, error = exc.msg
except CatchableError as exc:
debug "Exception occurred in PubSubPeer.handle",
conn, peer = p, closed = conn.closed, exc = exc.msg
finally:
await conn.close()
except CancelledError:
Expand Down
3 changes: 3 additions & 0 deletions libp2p/protocols/pubsub/rpc/messages.nim
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ proc byteSize*(msg: Message): int =
total += topicId.len
return total

proc byteSize*(msgs: seq[Message]): int =
msgs.mapIt(byteSize(it)).foldl(a + b, 0)

proc byteSize*(ihave: seq[ControlIHave]): int =
var total = 0
for item in ihave:
Expand Down
25 changes: 13 additions & 12 deletions tests/pubsub/testgossipinternal.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import ../../libp2p/crypto/crypto
import ../../libp2p/stream/bufferstream
import ../../libp2p/switch
import ../../libp2p/muxers/muxer
import ../../libp2p/protocols/pubsub/rpc/protobuf

import ../helpers

Expand Down Expand Up @@ -170,7 +171,7 @@ suite "GossipSub internal":
asyncTest "`replenishFanout` Degree Lo":
let gossipSub = TestGossipSub.init(newStandardSwitch())

proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
discard

let topic = "foobar"
Expand All @@ -197,7 +198,7 @@ suite "GossipSub internal":
asyncTest "`dropFanoutPeers` drop expired fanout topics":
let gossipSub = TestGossipSub.init(newStandardSwitch())

proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
discard

let topic = "foobar"
Expand Down Expand Up @@ -227,7 +228,7 @@ suite "GossipSub internal":
asyncTest "`dropFanoutPeers` leave unexpired fanout topics":
let gossipSub = TestGossipSub.init(newStandardSwitch())

proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
discard

let topic1 = "foobar1"
Expand Down Expand Up @@ -264,7 +265,7 @@ suite "GossipSub internal":
asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers":
let gossipSub = TestGossipSub.init(newStandardSwitch())

proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
discard

let topic = "foobar"
Expand Down Expand Up @@ -325,7 +326,7 @@ suite "GossipSub internal":
asyncTest "`getGossipPeers` - should not crash on missing topics in mesh":
let gossipSub = TestGossipSub.init(newStandardSwitch())

proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
discard

let topic = "foobar"
Expand Down Expand Up @@ -365,7 +366,7 @@ suite "GossipSub internal":
asyncTest "`getGossipPeers` - should not crash on missing topics in fanout":
let gossipSub = TestGossipSub.init(newStandardSwitch())

proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
discard

let topic = "foobar"
Expand Down Expand Up @@ -406,7 +407,7 @@ suite "GossipSub internal":
asyncTest "`getGossipPeers` - should not crash on missing topics in gossip":
let gossipSub = TestGossipSub.init(newStandardSwitch())

proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
discard

let topic = "foobar"
Expand Down Expand Up @@ -447,7 +448,7 @@ suite "GossipSub internal":
asyncTest "Drop messages of topics without subscription":
let gossipSub = TestGossipSub.init(newStandardSwitch())

proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
check false

let topic = "foobar"
Expand All @@ -470,7 +471,7 @@ suite "GossipSub internal":
let peer = gossipSub.getPubSubPeer(peerId)
inc seqno
let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno))
await gossipSub.rpcHandler(peer, RPCMsg(messages: @[msg]))
await gossipSub.rpcHandler(peer, encodeRpcMsg(RPCMsg(messages: @[msg]), false))

check gossipSub.mcache.msgs.len == 0

Expand All @@ -481,7 +482,7 @@ suite "GossipSub internal":
let gossipSub = TestGossipSub.init(newStandardSwitch())
gossipSub.parameters.disconnectBadPeers = true
gossipSub.parameters.appSpecificWeight = 1.0
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
check false

let topic = "foobar"
Expand Down Expand Up @@ -525,7 +526,7 @@ suite "GossipSub internal":
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)

await gossipSub.rpcHandler(peer, lotOfSubs)
await gossipSub.rpcHandler(peer, encodeRpcMsg(lotOfSubs, false))

check:
gossipSub.gossipsub.len == gossipSub.topicsHigh
Expand Down Expand Up @@ -656,7 +657,7 @@ suite "GossipSub internal":
asyncTest "handleIHave/Iwant tests":
let gossipSub = TestGossipSub.init(newStandardSwitch())

proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
check false
proc handler2(topic: string, data: seq[byte]) {.async.} = discard

Expand Down

0 comments on commit 397e2c3

Please sign in to comment.