diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index 78af347..b9b22af 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -10,7 +10,6 @@ import ( "sync/atomic" "time" - "github.com/0xsequence/ethkit" "github.com/0xsequence/ethkit/ethrpc" "github.com/0xsequence/ethkit/go-ethereum" "github.com/0xsequence/ethkit/go-ethereum/common" @@ -112,7 +111,7 @@ type Monitor struct { provider ethrpc.RawInterface chain *Chain - chainID *ethkit.Lazy[big.Int] + chainID *big.Int nextBlockNumber *big.Int nextBlockNumberMu sync.Mutex pollInterval atomic.Int64 @@ -151,14 +150,7 @@ func NewMonitor(provider ethrpc.RawInterface, options ...Options) (*Monitor, err } } - chainID := ethkit.NewLazy(func() *big.Int { - chainId, err := getChainID(provider) - if err != nil { - return nil - } - return chainId - }) - + var err error var cache cachestore.Store[[]byte] if opts.CacheBackend != nil { cache, err = cachestorectl.Open[[]byte](opts.CacheBackend, cachestore.WithLockExpiry(5*time.Second)) @@ -177,7 +169,7 @@ func NewMonitor(provider ethrpc.RawInterface, options ...Options) (*Monitor, err alert: opts.Alerter, provider: provider, chain: newChain(opts.BlockRetentionLimit, opts.Bootstrap), - chainID: chainID, + chainID: nil, cache: cache, publishCh: make(chan Blocks), publishQueue: newQueue(opts.BlockRetentionLimit * 2), @@ -185,6 +177,15 @@ func NewMonitor(provider ethrpc.RawInterface, options ...Options) (*Monitor, err }, nil } +func (m *Monitor) lazyInit(ctx context.Context) error { + var err error + m.chainID, err = getChainID(ctx, m.provider) + if err != nil { + return err + } + return nil +} + func (m *Monitor) Run(ctx context.Context) error { if m.IsRunning() { return fmt.Errorf("ethmonitor: already running") @@ -195,6 +196,10 @@ func (m *Monitor) Run(ctx context.Context) error { atomic.StoreInt32(&m.running, 1) defer atomic.StoreInt32(&m.running, 0) + if err := m.lazyInit(ctx); err != nil { + return err + } + // Check if in bootstrap mode -- in which case we expect nextBlockNumber // to already be set. if m.options.Bootstrap && m.chain.blocks == nil { @@ -615,7 +620,7 @@ func (m *Monitor) filterLogs(ctx context.Context, blockHash common.Hash, topics topicsDigest.Write([]byte{'\n'}) } - key := fmt.Sprintf("ethmonitor:%s:Logs:hash=%s;topics=%d", m.getChainID().String(), blockHash.String(), topicsDigest.Sum64()) + key := fmt.Sprintf("ethmonitor:%s:Logs:hash=%s;topics=%d", m.chainID.String(), blockHash.String(), topicsDigest.Sum64()) resp, err := m.cache.GetOrSetWithLockEx(ctx, key, getter, m.options.CacheExpiry) if err != nil { return nil, resp, err @@ -819,7 +824,7 @@ func (m *Monitor) fetchBlockByHash(ctx context.Context, hash common.Hash) (*type } // fetch with distributed mutex - key := fmt.Sprintf("ethmonitor:%s:BlockHash:%s", m.getChainID().String(), hash.String()) + key := fmt.Sprintf("ethmonitor:%s:BlockHash:%s", m.chainID.String(), hash.String()) resp, err := m.cache.GetOrSetWithLockEx(ctx, key, getter, m.options.CacheExpiry) if err != nil { return nil, nil, err @@ -1026,18 +1031,10 @@ func (m *Monitor) setPayload(value []byte) []byte { } } -func (m *Monitor) getChainID() *big.Int { - if val := m.chainID.Get(); val == nil { - return big.NewInt(-1) - } else { - return val - } -} - -func getChainID(provider ethrpc.Interface) (*big.Int, error) { +func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) { var chainID *big.Int - err := breaker.Do(context.Background(), func() error { - ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) + err := breaker.Do(ctx, func() error { + ctx, cancel := context.WithTimeout(ctx, 4*time.Second) defer cancel() id, err := provider.ChainID(ctx) diff --git a/ethreceipts/ethreceipts.go b/ethreceipts/ethreceipts.go index e0d740a..8cfd9f2 100644 --- a/ethreceipts/ethreceipts.go +++ b/ethreceipts/ethreceipts.go @@ -134,20 +134,6 @@ func NewReceiptsListener(log logger.Logger, provider ethrpc.Interface, monitor * return nil, err } - if opts.NumBlocksToFinality <= 0 { - chainID, err := getChainID(provider) - if err != nil { - chainID = big.NewInt(1) // assume mainnet in case of unlikely error - } - network, ok := ethrpc.Networks[chainID.Uint64()] - if ok { - opts.NumBlocksToFinality = network.NumBlocksToFinality - } - } - if opts.NumBlocksToFinality <= 0 { - opts.NumBlocksToFinality = 1 // absolute min is 1 - } - return &ReceiptsListener{ options: opts, log: log, @@ -164,6 +150,25 @@ func NewReceiptsListener(log logger.Logger, provider ethrpc.Interface, monitor * }, nil } +func (l *ReceiptsListener) lazyInit(ctx context.Context) error { + if l.options.NumBlocksToFinality <= 0 { + chainID, err := getChainID(ctx, l.provider) + if err != nil { + chainID = big.NewInt(1) // assume mainnet in case of unlikely error + } + network, ok := ethrpc.Networks[chainID.Uint64()] + if ok { + l.options.NumBlocksToFinality = network.NumBlocksToFinality + } + } + + if l.options.NumBlocksToFinality <= 0 { + l.options.NumBlocksToFinality = 1 // absolute min is 1 + } + + return nil +} + func (l *ReceiptsListener) Run(ctx context.Context) error { if l.IsRunning() { return fmt.Errorf("ethreceipts: already running") @@ -174,6 +179,10 @@ func (l *ReceiptsListener) Run(ctx context.Context) error { atomic.StoreInt32(&l.running, 1) defer atomic.StoreInt32(&l.running, 0) + if err := l.lazyInit(ctx); err != nil { + return err + } + l.log.Info("ethreceipts: running") return l.listener() @@ -801,10 +810,10 @@ func (l *ReceiptsListener) latestBlockNum() *big.Int { return latestBlockNum } -func getChainID(provider ethrpc.Interface) (*big.Int, error) { +func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) { var chainID *big.Int - err := breaker.Do(context.Background(), func() error { - id, err := provider.ChainID(context.Background()) + err := breaker.Do(ctx, func() error { + id, err := provider.ChainID(ctx) if err != nil { return err } diff --git a/types.go b/types.go index 34e29c0..c8f7ade 100644 --- a/types.go +++ b/types.go @@ -27,19 +27,3 @@ func ToSliceValues[T any](in []*T) []T { } return out } - -type Lazy[T any] struct { - once func() *T - val *T -} - -func NewLazy[T any](once func() *T) *Lazy[T] { - return &Lazy[T]{once: once} -} - -func (l *Lazy[T]) Get() *T { - if l.val == nil { - l.val = l.once() - } - return l.val -}