From e9eb37305adb43f4051a0bba0532d16c8f41bd70 Mon Sep 17 00:00:00 2001 From: Cedric Fung Date: Mon, 8 Jul 2024 05:23:42 +0000 Subject: [PATCH] send messages to all connections of a peer --- p2p/handle.go | 30 ++++++++++++------------------ p2p/peer.go | 29 +++++++++++++++++------------ 2 files changed, 29 insertions(+), 30 deletions(-) diff --git a/p2p/handle.go b/p2p/handle.go index f3d66e220..9d018390b 100644 --- a/p2p/handle.go +++ b/p2p/handle.go @@ -422,7 +422,7 @@ func parseNetworkMessage(version uint8, data []byte) (*PeerMessage, error) { } msg.Snapshot = snap.Snapshot case PeerMessageTypeRelay: - msg.Data = data[1:] + msg.Data = data case PeerMessageTypeConsumers: msg.Data = data[1:] } @@ -431,14 +431,14 @@ func parseNetworkMessage(version uint8, data []byte) (*PeerMessage, error) { func (me *Peer) relayOrHandlePeerMessage(relayerId crypto.Hash, msg *PeerMessage) error { logger.Verbosef("me.relayOrHandlePeerMessage(%s, %s) => %s %v", me.Address, me.IdForNetwork, relayerId, msg.Data) - if len(msg.Data) < 64 { + if len(msg.Data) < 65 { return nil } var from, to crypto.Hash - copy(from[:], msg.Data[:32]) - copy(to[:], msg.Data[32:64]) + copy(from[:], msg.Data[1:33]) + copy(to[:], msg.Data[33:65]) if to == me.IdForNetwork { - rm, err := parseNetworkMessage(msg.version, msg.Data[64:]) + rm, err := parseNetworkMessage(msg.version, msg.Data[65:]) logger.Verbosef("me.relayOrHandlePeerMessage.ME(%s, %s) => %s %v %v", me.Address, me.IdForNetwork, from, rm, err) if err != nil { return err @@ -450,21 +450,19 @@ func (me *Peer) relayOrHandlePeerMessage(relayerId crypto.Hash, msg *PeerMessage } var relayers []*Peer - peer := me.GetNeighbor(to) - if peer != nil { - relayers = []*Peer{peer} + if nbrs := me.GetNeighbors(to); len(nbrs) > 0 { + relayers = nbrs } else { relayers = me.GetRemoteRelayers(to) } - data := append([]byte{PeerMessageTypeRelay}, msg.Data...) - rk := crypto.Blake3Hash(data) + rk := crypto.Blake3Hash(msg.Data) rk = crypto.Blake3Hash(append(rk[:], []byte("REMOTE")...)) for _, peer := range relayers { if peer.IdForNetwork == relayerId { return nil } rk := crypto.Blake3Hash(append(rk[:], peer.IdForNetwork[:]...)) - success := me.offerToPeerWithCacheCheck(peer, MsgPriorityNormal, &ChanMsg{rk[:], data}) + success := me.offerToPeerWithCacheCheck(peer, MsgPriorityNormal, &ChanMsg{rk[:], msg.Data}) if !success { logger.Verbosef("me.offerToPeerWithCacheCheck(%s) relayer timeout\n", peer.IdForNetwork) } @@ -477,10 +475,6 @@ func (me *Peer) updateRemoteRelayerConsumers(relayerId crypto.Hash, data []byte) if !me.IsRelayer() { return nil } - relayer := me.GetNeighbor(relayerId) - if relayer == nil || !relayer.IsRelayer() { - return nil - } pl := len(crypto.Key{}) + 137 for c := len(data) / pl; c > 0; c-- { var id crypto.Hash @@ -492,7 +486,7 @@ func (me *Peer) updateRemoteRelayerConsumers(relayerId crypto.Hash, data []byte) if token.PeerId != id { panic(id) } - me.remoteRelayers.Add(id, relayer.IdForNetwork) + me.remoteRelayers.Add(id, relayerId) data = data[pl:] } return nil @@ -514,8 +508,8 @@ func (me *Peer) handlePeerMessage(peerId crypto.Hash, msg *PeerMessage) error { if err != nil { return err } - peer := me.GetNeighbor(peerId) - if peer != nil { + nbrs := me.GetNeighbors(peerId) + for _, peer := range nbrs { peer.syncRing.Offer(msg.Graph) } return nil diff --git a/p2p/peer.go b/p2p/peer.go index 50bc21bd6..cdd40f1c6 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -428,11 +428,13 @@ func (me *Peer) sendToPeer(to crypto.Hash, typ byte, key, data []byte, priority } me.sentMetric.handle(typ) - peer := me.GetNeighbor(to) - if peer != nil { - success := peer.offer(priority, &ChanMsg{key, data}) - if !success { - return fmt.Errorf("peer send %d timeout", priority) + nbrs := me.GetNeighbors(to) + if len(nbrs) > 0 { + for _, peer := range nbrs { + success := peer.offer(priority, &ChanMsg{key, data}) + if !success { + logger.Verbosef("peer.offer(%s) send timeout\n", peer.IdForNetwork) + } } return nil } @@ -463,12 +465,17 @@ func (me *Peer) sendSnapshotMessageToPeer(to crypto.Hash, snap crypto.Hash, typ return me.sendToPeer(to, typ, key, data, MsgPriorityNormal) } -func (me *Peer) GetNeighbor(key crypto.Hash) *Peer { +func (me *Peer) GetNeighbors(key crypto.Hash) []*Peer { + var nbrs []*Peer p := me.relayers.Get(key) if p != nil { - return p + nbrs = append(nbrs, p) } - return me.consumers.Get(key) + p = me.consumers.Get(key) + if p != nil { + nbrs = append(nbrs, p) + } + return nbrs } func (me *Peer) GetRemoteRelayers(key crypto.Hash) []*Peer { @@ -478,10 +485,8 @@ func (me *Peer) GetRemoteRelayers(key crypto.Hash) []*Peer { var relayers []*Peer ids := me.remoteRelayers.Get(key) for _, id := range ids { - p := me.GetNeighbor(id) - if p != nil { - relayers = append(relayers, p) - } + nbrs := me.GetNeighbors(id) + relayers = append(relayers, nbrs...) } return relayers }