Skip to content

Commit

Permalink
ethmonitor & ethreceipts lazy initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
marino39 committed Apr 5, 2024
1 parent 3386ca0 commit 17c5e4c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 57 deletions.
45 changes: 21 additions & 24 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync/atomic"
"time"

"github.com/0xsequence/ethkit"
"github.com/0xsequence/ethkit/ethrpc"
"github.com/0xsequence/ethkit/go-ethereum"
"github.com/0xsequence/ethkit/go-ethereum/common"
Expand Down Expand Up @@ -112,7 +111,7 @@ type Monitor struct {
provider ethrpc.RawInterface

chain *Chain
chainID *ethkit.Lazy[big.Int]
chainID *big.Int
nextBlockNumber *big.Int
nextBlockNumberMu sync.Mutex
pollInterval atomic.Int64
Expand Down Expand Up @@ -151,14 +150,7 @@ func NewMonitor(provider ethrpc.RawInterface, options ...Options) (*Monitor, err
}
}

chainID := ethkit.NewLazy(func() *big.Int {
chainId, err := getChainID(provider)
if err != nil {
return nil
}
return chainId
})

var err error
var cache cachestore.Store[[]byte]
if opts.CacheBackend != nil {
cache, err = cachestorectl.Open[[]byte](opts.CacheBackend, cachestore.WithLockExpiry(5*time.Second))
Expand All @@ -177,14 +169,23 @@ func NewMonitor(provider ethrpc.RawInterface, options ...Options) (*Monitor, err
alert: opts.Alerter,
provider: provider,
chain: newChain(opts.BlockRetentionLimit, opts.Bootstrap),
chainID: chainID,
chainID: nil,
cache: cache,
publishCh: make(chan Blocks),
publishQueue: newQueue(opts.BlockRetentionLimit * 2),
subscribers: make([]*subscriber, 0),
}, nil
}

func (m *Monitor) lazyInit(ctx context.Context) error {
var err error
m.chainID, err = getChainID(ctx, m.provider)
if err != nil {
return err
}
return nil
}

func (m *Monitor) Run(ctx context.Context) error {
if m.IsRunning() {
return fmt.Errorf("ethmonitor: already running")
Expand All @@ -195,6 +196,10 @@ func (m *Monitor) Run(ctx context.Context) error {
atomic.StoreInt32(&m.running, 1)
defer atomic.StoreInt32(&m.running, 0)

if err := m.lazyInit(ctx); err != nil {
return err
}

// Check if in bootstrap mode -- in which case we expect nextBlockNumber
// to already be set.
if m.options.Bootstrap && m.chain.blocks == nil {
Expand Down Expand Up @@ -615,7 +620,7 @@ func (m *Monitor) filterLogs(ctx context.Context, blockHash common.Hash, topics
topicsDigest.Write([]byte{'\n'})
}

key := fmt.Sprintf("ethmonitor:%s:Logs:hash=%s;topics=%d", m.getChainID().String(), blockHash.String(), topicsDigest.Sum64())
key := fmt.Sprintf("ethmonitor:%s:Logs:hash=%s;topics=%d", m.chainID.String(), blockHash.String(), topicsDigest.Sum64())
resp, err := m.cache.GetOrSetWithLockEx(ctx, key, getter, m.options.CacheExpiry)
if err != nil {
return nil, resp, err
Expand Down Expand Up @@ -819,7 +824,7 @@ func (m *Monitor) fetchBlockByHash(ctx context.Context, hash common.Hash) (*type
}

// fetch with distributed mutex
key := fmt.Sprintf("ethmonitor:%s:BlockHash:%s", m.getChainID().String(), hash.String())
key := fmt.Sprintf("ethmonitor:%s:BlockHash:%s", m.chainID.String(), hash.String())
resp, err := m.cache.GetOrSetWithLockEx(ctx, key, getter, m.options.CacheExpiry)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -1026,18 +1031,10 @@ func (m *Monitor) setPayload(value []byte) []byte {
}
}

func (m *Monitor) getChainID() *big.Int {
if val := m.chainID.Get(); val == nil {
return big.NewInt(-1)
} else {
return val
}
}

func getChainID(provider ethrpc.Interface) (*big.Int, error) {
func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) {
var chainID *big.Int
err := breaker.Do(context.Background(), func() error {
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
err := breaker.Do(ctx, func() error {
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

id, err := provider.ChainID(ctx)
Expand Down
43 changes: 26 additions & 17 deletions ethreceipts/ethreceipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,6 @@ func NewReceiptsListener(log logger.Logger, provider ethrpc.Interface, monitor *
return nil, err
}

if opts.NumBlocksToFinality <= 0 {
chainID, err := getChainID(provider)
if err != nil {
chainID = big.NewInt(1) // assume mainnet in case of unlikely error
}
network, ok := ethrpc.Networks[chainID.Uint64()]
if ok {
opts.NumBlocksToFinality = network.NumBlocksToFinality
}
}
if opts.NumBlocksToFinality <= 0 {
opts.NumBlocksToFinality = 1 // absolute min is 1
}

return &ReceiptsListener{
options: opts,
log: log,
Expand All @@ -164,6 +150,25 @@ func NewReceiptsListener(log logger.Logger, provider ethrpc.Interface, monitor *
}, nil
}

func (l *ReceiptsListener) lazyInit(ctx context.Context) error {
if l.options.NumBlocksToFinality <= 0 {
chainID, err := getChainID(ctx, l.provider)
if err != nil {
chainID = big.NewInt(1) // assume mainnet in case of unlikely error
}
network, ok := ethrpc.Networks[chainID.Uint64()]
if ok {
l.options.NumBlocksToFinality = network.NumBlocksToFinality
}
}

if l.options.NumBlocksToFinality <= 0 {
l.options.NumBlocksToFinality = 1 // absolute min is 1
}

return nil
}

func (l *ReceiptsListener) Run(ctx context.Context) error {
if l.IsRunning() {
return fmt.Errorf("ethreceipts: already running")
Expand All @@ -174,6 +179,10 @@ func (l *ReceiptsListener) Run(ctx context.Context) error {
atomic.StoreInt32(&l.running, 1)
defer atomic.StoreInt32(&l.running, 0)

if err := l.lazyInit(ctx); err != nil {
return err
}

l.log.Info("ethreceipts: running")

return l.listener()
Expand Down Expand Up @@ -801,10 +810,10 @@ func (l *ReceiptsListener) latestBlockNum() *big.Int {
return latestBlockNum
}

func getChainID(provider ethrpc.Interface) (*big.Int, error) {
func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) {
var chainID *big.Int
err := breaker.Do(context.Background(), func() error {
id, err := provider.ChainID(context.Background())
err := breaker.Do(ctx, func() error {
id, err := provider.ChainID(ctx)
if err != nil {
return err
}
Expand Down
16 changes: 0 additions & 16 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,3 @@ func ToSliceValues[T any](in []*T) []T {
}
return out
}

type Lazy[T any] struct {
once func() *T
val *T
}

func NewLazy[T any](once func() *T) *Lazy[T] {
return &Lazy[T]{once: once}
}

func (l *Lazy[T]) Get() *T {
if l.val == nil {
l.val = l.once()
}
return l.val
}

0 comments on commit 17c5e4c

Please sign in to comment.