Skip to content

Commit

Permalink
Add GossipSub ping (#912)
Browse files Browse the repository at this point in the history
  • Loading branch information
Menduist authored Jun 21, 2023
1 parent 224f92e commit 1c4d083
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 0 deletions.
4 changes: 4 additions & 0 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) =
peer.behaviourPenalty = stats.behaviourPenalty

peer.iHaveBudget = IHavePeerBudget
peer.pingBudget = PingsPeerBudget

method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
case event.kind
Expand Down Expand Up @@ -352,6 +353,9 @@ proc validateAndRelay(g: GossipSub,
method rpcHandler*(g: GossipSub,
peer: PubSubPeer,
rpcMsg: RPCMsg) {.async.} =
if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
g.send(peer, RPCMsg(pong: rpcMsg.ping))
peer.pingBudget.dec
for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
template sub: untyped = rpcMsg.subscriptions[i]
g.handleSubscribe(peer, sub.topic, sub.subscribe)
Expand Down
1 change: 1 addition & 0 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
if peer.sentIHaves.len > g.parameters.historyLength:
discard peer.sentIHaves.popLast()
peer.iHaveBudget = IHavePeerBudget
peer.pingBudget = PingsPeerBudget

var meshMetrics = MeshMetrics()

Expand Down
1 change: 1 addition & 0 deletions libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const

const
BackoffSlackTime* = 2 # seconds
PingsPeerBudget* = 100 # maximum of 6.4kb/heartbeat (6.4kb/s with default 1 second/hb)
IHavePeerBudget* = 10
# the max amount of IHave to expose, not by spec, but go as example
# rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572
Expand Down
1 change: 1 addition & 0 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type
score*: float64
sentIHaves*: Deque[HashSet[MessageId]]
iHaveBudget*: int
pingBudget*: int
maxMessageSize: int
appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score
Expand Down
2 changes: 2 additions & 0 deletions libp2p/protocols/pubsub/rpc/messages.nim
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type
subscriptions*: seq[SubOpts]
messages*: seq[Message]
control*: Option[ControlMessage]
ping*: seq[byte]
pong*: seq[byte]

func withSubs*(
T: type RPCMsg, topics: openArray[string], subscribe: bool): T =
Expand Down
8 changes: 8 additions & 0 deletions libp2p/protocols/pubsub/rpc/protobuf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@ proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
pb.write(2, item, anonymize)
if msg.control.isSome():
pb.write(3, msg.control.get())
# nim-libp2p extension, using fields which are unlikely to be used
# by other extensions
if msg.ping.len > 0:
pb.write(60, msg.ping)
if msg.pong.len > 0:
pb.write(61, msg.pong)
if len(pb.buffer) > 0:
pb.finish()
pb.buffer
Expand All @@ -327,4 +333,6 @@ proc decodeRpcMsg*(msg: seq[byte]): ProtoResult[RPCMsg] {.inline.} =
assign(rpcMsg.get().messages, ? pb.decodeMessages())
assign(rpcMsg.get().subscriptions, ? pb.decodeSubscriptions())
assign(rpcMsg.get().control, ? pb.decodeControl())
discard ? pb.getField(60, rpcMsg.get().ping)
discard ? pb.getField(61, rpcMsg.get().pong)
rpcMsg

0 comments on commit 1c4d083

Please sign in to comment.