Skip to content

Commit

Permalink
feat: write peers to datastore (#374)
Browse files Browse the repository at this point in the history
* write peers to datastore

* update docs
  • Loading branch information
minhd-vu authored Sep 19, 2024
1 parent 9ab6f2a commit 8c71a3c
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 0 deletions.
6 changes: 6 additions & 0 deletions cmd/p2p/sensor/sensor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sensor

import (
"context"
"crypto/ecdsa"
"encoding/json"
"errors"
Expand Down Expand Up @@ -50,6 +51,7 @@ type (
ShouldWriteBlockEvents bool
ShouldWriteTransactions bool
ShouldWriteTransactionEvents bool
ShouldWritePeers bool
ShouldRunPprof bool
PprofPort uint
ShouldRunPrometheus bool
Expand Down Expand Up @@ -166,6 +168,7 @@ var SensorCmd = &cobra.Command{
ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents,
ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions,
ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents,
ShouldWritePeers: inputSensorParams.ShouldWritePeers,
TTL: inputSensorParams.TTL,
})

Expand Down Expand Up @@ -268,6 +271,8 @@ var SensorCmd = &cobra.Command{
if err := removePeerMessages(msgCounter, server.Peers()); err != nil {
log.Error().Err(err).Msg("Failed to clean up peer messages")
}

db.WritePeers(context.Background(), server.Peers())
case peer := <-opts.Peers:
// Update the peer list and the nodes file.
if _, ok := peers[peer.ID()]; !ok {
Expand Down Expand Up @@ -481,6 +486,7 @@ increase CPU and memory usage.`)
SensorCmd.Flags().BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true,
`Whether to write transaction events to the database. This option could
significantly increase CPU and memory usage.`)
SensorCmd.Flags().BoolVar(&inputSensorParams.ShouldWritePeers, "write-peers", true, "Whether to write peers to the database")
SensorCmd.Flags().BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "Whether to run pprof")
SensorCmd.Flags().UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "Port pprof runs on")
SensorCmd.Flags().BoolVar(&inputSensorParams.ShouldRunPrometheus, "prom", true, "Whether to run Prometheus")
Expand Down
1 change: 1 addition & 0 deletions doc/polycli_p2p_sensor.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ If no nodes.json file exists, it will be created.
--ttl duration Time to live (default 336h0m0s)
--write-block-events Whether to write block events to the database (default true)
-B, --write-blocks Whether to write blocks to the database (default true)
--write-peers Whether to write peers to the database (default true)
--write-tx-events Whether to write transaction events to the database. This option could
significantly increase CPU and memory usage. (default true)
-t, --write-txs Whether to write transactions to the database. This option could significantly
Expand Down
5 changes: 5 additions & 0 deletions p2p/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
)

Expand Down Expand Up @@ -35,6 +36,9 @@ type Database interface {
// ShouldWriteTransactionEvents return true, respectively.
WriteTransactions(context.Context, *enode.Node, []*types.Transaction)

// WritePeers will write the connected peers to the database.
WritePeers(context.Context, []*p2p.Peer)

// HasBlock will return whether the block is in the database. If the database
// client has not been initialized this will always return true.
HasBlock(context.Context, common.Hash) bool
Expand All @@ -44,6 +48,7 @@ type Database interface {
ShouldWriteBlockEvents() bool
ShouldWriteTransactions() bool
ShouldWriteTransactionEvents() bool
ShouldWritePeers() bool

// NodeList will return a list of enode URLs.
NodeList(ctx context.Context, limit int) ([]string, error)
Expand Down
48 changes: 48 additions & 0 deletions p2p/database/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/rs/zerolog/log"
"google.golang.org/api/iterator"
Expand All @@ -21,6 +22,7 @@ const (
BlockEventsKind = "block_events"
TransactionsKind = "transactions"
TransactionEventsKind = "transaction_events"
PeersKind = "peers"
MaxAttempts = 10
)

Expand All @@ -34,6 +36,7 @@ type Datastore struct {
shouldWriteBlockEvents bool
shouldWriteTransactions bool
shouldWriteTransactionEvents bool
shouldWritePeers bool
jobs chan struct{}
ttl time.Duration
}
Expand Down Expand Up @@ -101,6 +104,13 @@ type DatastoreTransaction struct {
Type int16
}

type DatastorePeer struct {
URL string
LastSeenBy string
TimeLastSeen time.Time
TTL time.Time
}

// DatastoreOptions is used when creating a NewDatastore.
type DatastoreOptions struct {
ProjectID string
Expand All @@ -111,6 +121,7 @@ type DatastoreOptions struct {
ShouldWriteBlockEvents bool
ShouldWriteTransactions bool
ShouldWriteTransactionEvents bool
ShouldWritePeers bool
TTL time.Duration
}

Expand All @@ -130,6 +141,7 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
shouldWriteBlockEvents: opts.ShouldWriteBlockEvents,
shouldWriteTransactions: opts.ShouldWriteTransactions,
shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents,
shouldWritePeers: opts.ShouldWritePeers,
jobs: make(chan struct{}, opts.MaxConcurrency),
ttl: opts.TTL,
}
Expand Down Expand Up @@ -234,6 +246,38 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs
}
}

// WritePeers writes the connected peers to datastore.
func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer) {
if d.client == nil || !d.ShouldWritePeers() {
return
}

d.jobs <- struct{}{}
go func() {

keys := make([]*datastore.Key, 0, len(peers))
dsPeers := make([]*DatastorePeer, 0, len(peers))
now := time.Now()

for _, peer := range peers {
keys = append(keys, datastore.NameKey(PeersKind, peer.ID().String(), nil))
dsPeers = append(dsPeers, &DatastorePeer{
URL: peer.Node().URLv4(),
LastSeenBy: d.sensorID,
TimeLastSeen: now,
TTL: now.Add(d.ttl),
})
}

_, err := d.client.PutMulti(ctx, keys, dsPeers)
if err != nil {
log.Error().Err(err).Msg("Failed to write peers")
}

<-d.jobs
}()
}

func (d *Datastore) MaxConcurrentWrites() int {
return d.maxConcurrency
}
Expand All @@ -254,6 +298,10 @@ func (d *Datastore) ShouldWriteTransactionEvents() bool {
return d.shouldWriteTransactionEvents
}

func (d *Datastore) ShouldWritePeers() bool {
return d.shouldWritePeers
}

func (d *Datastore) HasBlock(ctx context.Context, hash common.Hash) bool {
if d.client == nil {
return true
Expand Down

0 comments on commit 8c71a3c

Please sign in to comment.