From 2002ac36e8ee0fd03e385945cb5f95dad08909f0 Mon Sep 17 00:00:00 2001 From: Roman Behma <13855864+begmaroman@users.noreply.github.com> Date: Tue, 9 Apr 2024 09:20:45 -0300 Subject: [PATCH 1/4] Added polling mechanism to sequencer tracker (#77) --- .mockery.yaml | 9 -- cmd/main.go | 25 ++-- config/config.go | 16 +-- config/config_test.go | 6 +- config/default.go | 4 +- docs/running.md | 4 +- etherman/etherman.go | 31 +++-- mocks/eth_client.generated.go | 161 -------------------------- mocks/eth_client_factory.generated.go | 96 --------------- mocks/etherman.generated.go | 119 +++++++++++++++++++ sequencer/tracker.go | 156 ++++++++++++++++++++++--- sequencer/tracker_test.go | 43 ++++++- synchronizer/init.go | 36 +++--- synchronizer/init_test.go | 71 ++++-------- synchronizer/reorg.go | 3 +- test/config/test.dev.toml | 4 +- test/config/test.docker.toml | 4 +- test/config/test.local.toml | 4 +- test/e2e/datacommittee_test.go | 4 +- test/e2e/sequencer_tracker_test.go | 55 ++++++--- types/factories.go | 34 ------ 21 files changed, 431 insertions(+), 454 deletions(-) delete mode 100644 mocks/eth_client.generated.go delete mode 100644 mocks/eth_client_factory.generated.go delete mode 100644 types/factories.go diff --git a/.mockery.yaml b/.mockery.yaml index 83b1530c..0256cef9 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -36,12 +36,3 @@ packages: SequencerTracker: config: filename: sequencer_tracker.generated.go - github.com/0xPolygon/cdk-data-availability/types: - config: - interfaces: - EthClient: - config: - filename: eth_client.generated.go - EthClientFactory: - config: - filename: eth_client_factory.generated.go \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go index 2a787602..dcd1eef0 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -19,7 +19,7 @@ import ( "github.com/0xPolygon/cdk-data-availability/services/status" "github.com/0xPolygon/cdk-data-availability/services/sync" "github.com/0xPolygon/cdk-data-availability/synchronizer" - "github.com/0xPolygon/cdk-data-availability/types" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" _ "github.com/lib/pq" "github.com/urfave/cli/v2" @@ -99,11 +99,13 @@ func start(cliCtx *cli.Context) error { log.Fatal(err) } - // derive address - selfAddr := crypto.PubkeyToAddress(pk.PublicKey) - // ensure synchro/reorg start block is set - err = synchronizer.InitStartBlock(storage, types.NewEthClientFactory(), c.L1) + err = synchronizer.InitStartBlock( + storage, + etm, + c.L1.GenesisBlock, + common.HexToAddress(c.L1.PolygonValidiumAddress), + ) if err != nil { log.Fatal(err) } @@ -114,7 +116,7 @@ func start(cliCtx *cli.Context) error { go sequencerTracker.Start(cliCtx.Context) cancelFuncs = append(cancelFuncs, sequencerTracker.Stop) - detector, err := synchronizer.NewReorgDetector(c.L1.RpcURL, 1*time.Second) + detector, err := synchronizer.NewReorgDetector(c.L1.RpcURL, time.Second) if err != nil { log.Fatal(err) } @@ -125,8 +127,15 @@ func start(cliCtx *cli.Context) error { cancelFuncs = append(cancelFuncs, detector.Stop) - batchSynchronizer, err := synchronizer.NewBatchSynchronizer(c.L1, selfAddr, - storage, detector.Subscribe(), etm, sequencerTracker, client.NewFactory()) + batchSynchronizer, err := synchronizer.NewBatchSynchronizer( + c.L1, + crypto.PubkeyToAddress(pk.PublicKey), + storage, + detector.Subscribe(), + etm, + sequencerTracker, + client.NewFactory(), + ) if err != nil { log.Fatal(err) } diff --git a/config/config.go b/config/config.go index 997be95b..5835e357 100644 --- a/config/config.go +++ b/config/config.go @@ -32,14 +32,14 @@ type Config struct { // L1Config is a struct that defines L1 contract and service settings type L1Config struct { - WsURL string `mapstructure:"WsURL"` - RpcURL string `mapstructure:"RpcURL"` - PolygonValidiumAddress string `mapstructure:"PolygonValidiumAddress"` - DataCommitteeAddress string `mapstructure:"DataCommitteeAddress"` - Timeout types.Duration `mapstructure:"Timeout"` - RetryPeriod types.Duration `mapstructure:"RetryPeriod"` - BlockBatchSize uint `mapstructure:"BlockBatchSize"` - TrackSequencer bool `mapstructure:"TrackSequencer"` + RpcURL string `mapstructure:"RpcURL"` + PolygonValidiumAddress string `mapstructure:"PolygonValidiumAddress"` + DataCommitteeAddress string `mapstructure:"DataCommitteeAddress"` + Timeout types.Duration `mapstructure:"Timeout"` + RetryPeriod types.Duration `mapstructure:"RetryPeriod"` + BlockBatchSize uint `mapstructure:"BlockBatchSize"` + TrackSequencer bool `mapstructure:"TrackSequencer"` + TrackSequencerPollInterval types.Duration `mapstructure:"TrackSequencerPollInterval"` // GenesisBlock represents the block number where PolygonValidium contract is deployed on L1 GenesisBlock uint64 `mapstructure:"GenesisBlock"` diff --git a/config/config_test.go b/config/config_test.go index b0178fd1..b850a248 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -19,13 +19,9 @@ func Test_Defaults(t *testing.T) { path string expectedValue interface{} }{ - { - path: "L1.WsURL", - expectedValue: "ws://127.0.0.1:8546", - }, { path: "L1.RpcURL", - expectedValue: "http://127.0.0.1:8545", + expectedValue: "ws://127.0.0.1:8546", }, { path: "L1.PolygonValidiumAddress", diff --git a/config/default.go b/config/default.go index 75f7f007..5da5f4e7 100644 --- a/config/default.go +++ b/config/default.go @@ -12,8 +12,7 @@ const DefaultValues = ` PrivateKey = {Path = "/pk/test-member.keystore", Password = "testonly"} [L1] -WsURL = "ws://127.0.0.1:8546" -RpcURL = "http://127.0.0.1:8545" +RpcURL = "ws://127.0.0.1:8546" PolygonValidiumAddress = "0x8dAF17A20c9DBA35f005b6324F493785D239719d" DataCommitteeAddress = "0x68B1D87F95878fE05B998F19b66F4baba5De1aed" Timeout = "1m" @@ -21,6 +20,7 @@ RetryPeriod = "5s" BlockBatchSize = "64" GenesisBlock = "0" TrackSequencer = true +TrackSequencerPollInterval = "1m" [Log] Environment = "development" # "production" or "development" diff --git a/docs/running.md b/docs/running.md index 1eca0ae9..4a9eae81 100644 --- a/docs/running.md +++ b/docs/running.md @@ -76,14 +76,14 @@ services: PrivateKey = {Path = "/pk/test-member.keystore", Password = "testonly"} # CHANGE THIS (the password): according to the private key file password [L1] -WsURL = "ws://URLofYourL1Node:8546" # CHANGE THIS: use the URL of your L1 node -RpcURL = "http://URLofYourL1Node:8545" # CHANGE THIS: use the URL of your L1 node +RpcURL = "http://URLofYourL1Node:8545" # CHANGE THIS: use the URL of your L1 node, can be http(s) or ws(s) PolygonValidiumAddress = "0x8dAF17A20c9DBA35f005b6324F493785D239719d" # CHANGE THIS: Address of the Validium smart contract DataCommitteeAddress = "0x68B1D87F95878fE05B998F19b66F4baba5De1aed" # CHANGE THIS: Address of the data availability committee smart contract Timeout = "3m" RetryPeriod = "5s" BlockBatchSize = 32 TrackSequencer = true +TrackSequencerPollInterval = "1m" [Log] Environment = "development" # "production" or "development" diff --git a/etherman/etherman.go b/etherman/etherman.go index 28b033a3..1d407095 100644 --- a/etherman/etherman.go +++ b/etherman/etherman.go @@ -31,9 +31,13 @@ type DataCommittee struct { // Etherman defines functions that should be implemented by Etherman type Etherman interface { + GetTx(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) + HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) + BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) + CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) + GetCurrentDataCommittee() (*DataCommittee, error) GetCurrentDataCommitteeMembers() ([]DataCommitteeMember, error) - GetTx(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) TrustedSequencer(ctx context.Context) (common.Address, error) WatchSetTrustedSequencer( ctx context.Context, @@ -44,7 +48,6 @@ type Etherman interface { ctx context.Context, events chan *polygonvalidium.PolygonvalidiumSetTrustedSequencerURL, ) (event.Subscription, error) - HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) FilterSequenceBatches( opts *bind.FilterOpts, numBatch []uint64, @@ -63,9 +66,9 @@ func New(ctx context.Context, cfg config.L1Config) (Etherman, error) { ctx, cancel := context.WithTimeout(ctx, cfg.Timeout.Duration) defer cancel() - ethClient, err := ethclient.DialContext(ctx, cfg.WsURL) + ethClient, err := ethclient.DialContext(ctx, cfg.RpcURL) if err != nil { - log.Errorf("error connecting to %s: %+v", cfg.WsURL, err) + log.Errorf("error connecting to %s: %+v", cfg.RpcURL, err) return nil, err } @@ -97,6 +100,21 @@ func (e *etherman) GetTx(ctx context.Context, txHash common.Hash) (*types.Transa return e.EthClient.TransactionByHash(ctx, txHash) } +// HeaderByNumber returns header by number from the eth client +func (e *etherman) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + return e.EthClient.HeaderByNumber(ctx, number) +} + +// BlockByNumber returns a block by the given number +func (e *etherman) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + return e.EthClient.BlockByNumber(ctx, number) +} + +// CodeAt returns the contract code of the given account. +func (e *etherman) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) { + return e.EthClient.CodeAt(ctx, account, blockNumber) +} + // TrustedSequencer gets trusted sequencer address func (e *etherman) TrustedSequencer(ctx context.Context) (common.Address, error) { return e.CDKValidium.TrustedSequencer(&bind.CallOpts{ @@ -129,11 +147,6 @@ func (e *etherman) WatchSetTrustedSequencerURL( return e.CDKValidium.WatchSetTrustedSequencerURL(&bind.WatchOpts{Context: ctx}, events) } -// HeaderByNumber returns header by number from the eth client -func (e *etherman) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { - return e.EthClient.HeaderByNumber(ctx, number) -} - // FilterSequenceBatches retrieves filtered batches on CDK validium func (e *etherman) FilterSequenceBatches(opts *bind.FilterOpts, numBatch []uint64) (*polygonvalidium.PolygonvalidiumSequenceBatchesIterator, error) { diff --git a/mocks/eth_client.generated.go b/mocks/eth_client.generated.go deleted file mode 100644 index 2cbfe779..00000000 --- a/mocks/eth_client.generated.go +++ /dev/null @@ -1,161 +0,0 @@ -// Code generated by mockery v2.40.1. DO NOT EDIT. - -package mocks - -import ( - big "math/big" - - common "github.com/ethereum/go-ethereum/common" - - context "context" - - mock "github.com/stretchr/testify/mock" - - types "github.com/ethereum/go-ethereum/core/types" -) - -// EthClient is an autogenerated mock type for the EthClient type -type EthClient struct { - mock.Mock -} - -type EthClient_Expecter struct { - mock *mock.Mock -} - -func (_m *EthClient) EXPECT() *EthClient_Expecter { - return &EthClient_Expecter{mock: &_m.Mock} -} - -// BlockByNumber provides a mock function with given fields: ctx, number -func (_m *EthClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { - ret := _m.Called(ctx, number) - - if len(ret) == 0 { - panic("no return value specified for BlockByNumber") - } - - var r0 *types.Block - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Block, error)); ok { - return rf(ctx, number) - } - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *types.Block); ok { - r0 = rf(ctx, number) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.Block) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { - r1 = rf(ctx, number) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// EthClient_BlockByNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BlockByNumber' -type EthClient_BlockByNumber_Call struct { - *mock.Call -} - -// BlockByNumber is a helper method to define mock.On call -// - ctx context.Context -// - number *big.Int -func (_e *EthClient_Expecter) BlockByNumber(ctx interface{}, number interface{}) *EthClient_BlockByNumber_Call { - return &EthClient_BlockByNumber_Call{Call: _e.mock.On("BlockByNumber", ctx, number)} -} - -func (_c *EthClient_BlockByNumber_Call) Run(run func(ctx context.Context, number *big.Int)) *EthClient_BlockByNumber_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*big.Int)) - }) - return _c -} - -func (_c *EthClient_BlockByNumber_Call) Return(_a0 *types.Block, _a1 error) *EthClient_BlockByNumber_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *EthClient_BlockByNumber_Call) RunAndReturn(run func(context.Context, *big.Int) (*types.Block, error)) *EthClient_BlockByNumber_Call { - _c.Call.Return(run) - return _c -} - -// CodeAt provides a mock function with given fields: ctx, account, blockNumber -func (_m *EthClient) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) { - ret := _m.Called(ctx, account, blockNumber) - - if len(ret) == 0 { - panic("no return value specified for CodeAt") - } - - var r0 []byte - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) ([]byte, error)); ok { - return rf(ctx, account, blockNumber) - } - if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) []byte); ok { - r0 = rf(ctx, account, blockNumber) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]byte) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, common.Address, *big.Int) error); ok { - r1 = rf(ctx, account, blockNumber) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// EthClient_CodeAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CodeAt' -type EthClient_CodeAt_Call struct { - *mock.Call -} - -// CodeAt is a helper method to define mock.On call -// - ctx context.Context -// - account common.Address -// - blockNumber *big.Int -func (_e *EthClient_Expecter) CodeAt(ctx interface{}, account interface{}, blockNumber interface{}) *EthClient_CodeAt_Call { - return &EthClient_CodeAt_Call{Call: _e.mock.On("CodeAt", ctx, account, blockNumber)} -} - -func (_c *EthClient_CodeAt_Call) Run(run func(ctx context.Context, account common.Address, blockNumber *big.Int)) *EthClient_CodeAt_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(common.Address), args[2].(*big.Int)) - }) - return _c -} - -func (_c *EthClient_CodeAt_Call) Return(_a0 []byte, _a1 error) *EthClient_CodeAt_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *EthClient_CodeAt_Call) RunAndReturn(run func(context.Context, common.Address, *big.Int) ([]byte, error)) *EthClient_CodeAt_Call { - _c.Call.Return(run) - return _c -} - -// NewEthClient creates a new instance of EthClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewEthClient(t interface { - mock.TestingT - Cleanup(func()) -}) *EthClient { - mock := &EthClient{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/mocks/eth_client_factory.generated.go b/mocks/eth_client_factory.generated.go deleted file mode 100644 index 6cbfd230..00000000 --- a/mocks/eth_client_factory.generated.go +++ /dev/null @@ -1,96 +0,0 @@ -// Code generated by mockery v2.40.1. DO NOT EDIT. - -package mocks - -import ( - context "context" - - types "github.com/0xPolygon/cdk-data-availability/types" - mock "github.com/stretchr/testify/mock" -) - -// EthClientFactory is an autogenerated mock type for the EthClientFactory type -type EthClientFactory struct { - mock.Mock -} - -type EthClientFactory_Expecter struct { - mock *mock.Mock -} - -func (_m *EthClientFactory) EXPECT() *EthClientFactory_Expecter { - return &EthClientFactory_Expecter{mock: &_m.Mock} -} - -// CreateEthClient provides a mock function with given fields: ctx, url -func (_m *EthClientFactory) CreateEthClient(ctx context.Context, url string) (types.EthClient, error) { - ret := _m.Called(ctx, url) - - if len(ret) == 0 { - panic("no return value specified for CreateEthClient") - } - - var r0 types.EthClient - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (types.EthClient, error)); ok { - return rf(ctx, url) - } - if rf, ok := ret.Get(0).(func(context.Context, string) types.EthClient); ok { - r0 = rf(ctx, url) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(types.EthClient) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, url) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// EthClientFactory_CreateEthClient_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateEthClient' -type EthClientFactory_CreateEthClient_Call struct { - *mock.Call -} - -// CreateEthClient is a helper method to define mock.On call -// - ctx context.Context -// - url string -func (_e *EthClientFactory_Expecter) CreateEthClient(ctx interface{}, url interface{}) *EthClientFactory_CreateEthClient_Call { - return &EthClientFactory_CreateEthClient_Call{Call: _e.mock.On("CreateEthClient", ctx, url)} -} - -func (_c *EthClientFactory_CreateEthClient_Call) Run(run func(ctx context.Context, url string)) *EthClientFactory_CreateEthClient_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *EthClientFactory_CreateEthClient_Call) Return(_a0 types.EthClient, _a1 error) *EthClientFactory_CreateEthClient_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *EthClientFactory_CreateEthClient_Call) RunAndReturn(run func(context.Context, string) (types.EthClient, error)) *EthClientFactory_CreateEthClient_Call { - _c.Call.Return(run) - return _c -} - -// NewEthClientFactory creates a new instance of EthClientFactory. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewEthClientFactory(t interface { - mock.TestingT - Cleanup(func()) -}) *EthClientFactory { - mock := &EthClientFactory{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/mocks/etherman.generated.go b/mocks/etherman.generated.go index 8c0ef4fa..d33c87e2 100644 --- a/mocks/etherman.generated.go +++ b/mocks/etherman.generated.go @@ -34,6 +34,125 @@ func (_m *Etherman) EXPECT() *Etherman_Expecter { return &Etherman_Expecter{mock: &_m.Mock} } +// BlockByNumber provides a mock function with given fields: ctx, number +func (_m *Etherman) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + ret := _m.Called(ctx, number) + + if len(ret) == 0 { + panic("no return value specified for BlockByNumber") + } + + var r0 *types.Block + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Block, error)); ok { + return rf(ctx, number) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *types.Block); ok { + r0 = rf(ctx, number) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, number) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Etherman_BlockByNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BlockByNumber' +type Etherman_BlockByNumber_Call struct { + *mock.Call +} + +// BlockByNumber is a helper method to define mock.On call +// - ctx context.Context +// - number *big.Int +func (_e *Etherman_Expecter) BlockByNumber(ctx interface{}, number interface{}) *Etherman_BlockByNumber_Call { + return &Etherman_BlockByNumber_Call{Call: _e.mock.On("BlockByNumber", ctx, number)} +} + +func (_c *Etherman_BlockByNumber_Call) Run(run func(ctx context.Context, number *big.Int)) *Etherman_BlockByNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*big.Int)) + }) + return _c +} + +func (_c *Etherman_BlockByNumber_Call) Return(_a0 *types.Block, _a1 error) *Etherman_BlockByNumber_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Etherman_BlockByNumber_Call) RunAndReturn(run func(context.Context, *big.Int) (*types.Block, error)) *Etherman_BlockByNumber_Call { + _c.Call.Return(run) + return _c +} + +// CodeAt provides a mock function with given fields: ctx, account, blockNumber +func (_m *Etherman) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) { + ret := _m.Called(ctx, account, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for CodeAt") + } + + var r0 []byte + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) ([]byte, error)); ok { + return rf(ctx, account, blockNumber) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) []byte); ok { + r0 = rf(ctx, account, blockNumber) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Address, *big.Int) error); ok { + r1 = rf(ctx, account, blockNumber) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Etherman_CodeAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CodeAt' +type Etherman_CodeAt_Call struct { + *mock.Call +} + +// CodeAt is a helper method to define mock.On call +// - ctx context.Context +// - account common.Address +// - blockNumber *big.Int +func (_e *Etherman_Expecter) CodeAt(ctx interface{}, account interface{}, blockNumber interface{}) *Etherman_CodeAt_Call { + return &Etherman_CodeAt_Call{Call: _e.mock.On("CodeAt", ctx, account, blockNumber)} +} + +func (_c *Etherman_CodeAt_Call) Run(run func(ctx context.Context, account common.Address, blockNumber *big.Int)) *Etherman_CodeAt_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Address), args[2].(*big.Int)) + }) + return _c +} + +func (_c *Etherman_CodeAt_Call) Return(_a0 []byte, _a1 error) *Etherman_CodeAt_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Etherman_CodeAt_Call) RunAndReturn(run func(context.Context, common.Address, *big.Int) ([]byte, error)) *Etherman_CodeAt_Call { + _c.Call.Return(run) + return _c +} + // FilterSequenceBatches provides a mock function with given fields: opts, numBatch func (_m *Etherman) FilterSequenceBatches(opts *bind.FilterOpts, numBatch []uint64) (*polygonvalidium.PolygonvalidiumSequenceBatchesIterator, error) { ret := _m.Called(opts, numBatch) diff --git a/sequencer/tracker.go b/sequencer/tracker.go index b4ea95b9..b476d840 100644 --- a/sequencer/tracker.go +++ b/sequencer/tracker.go @@ -2,6 +2,7 @@ package sequencer import ( "context" + "strings" "sync" "time" @@ -21,25 +22,35 @@ const ( // Tracker watches the contract for relevant changes to the sequencer type Tracker struct { - client etherman.Etherman + em etherman.Etherman stop chan struct{} timeout time.Duration retry time.Duration addr common.Address url string trackChanges bool + usePolling bool + pollInterval time.Duration + wg sync.WaitGroup lock sync.Mutex startOnce sync.Once } // NewTracker creates a new Tracker -func NewTracker(cfg config.L1Config, ethClient etherman.Etherman) *Tracker { +func NewTracker(cfg config.L1Config, em etherman.Etherman) *Tracker { + pollInterval := time.Minute + if cfg.TrackSequencerPollInterval.Seconds() > 0 { + pollInterval = cfg.TrackSequencerPollInterval.Duration + } + return &Tracker{ - client: ethClient, + em: em, stop: make(chan struct{}), timeout: cfg.Timeout.Duration, retry: cfg.RetryPeriod.Duration, trackChanges: cfg.TrackSequencer, + usePolling: strings.HasPrefix(cfg.RpcURL, "http"), // If http(s), use polling instead of sockets + pollInterval: pollInterval, } } @@ -52,8 +63,8 @@ func (st *Tracker) GetAddr() common.Address { func (st *Tracker) setAddr(addr common.Address) { st.lock.Lock() - defer st.lock.Unlock() st.addr = addr + st.lock.Unlock() } // GetUrl returns the last known URL of the Sequencer @@ -65,8 +76,8 @@ func (st *Tracker) GetUrl() string { func (st *Tracker) setUrl(url string) { st.lock.Lock() - defer st.lock.Unlock() st.url = url + st.lock.Unlock() } // Start starts the SequencerTracker @@ -75,7 +86,7 @@ func (st *Tracker) Start(parentCtx context.Context) { ctx, cancel := context.WithTimeout(parentCtx, st.timeout) defer cancel() - addr, err := st.client.TrustedSequencer(ctx) + addr, err := st.em.TrustedSequencer(ctx) if err != nil { log.Fatalf("failed to get sequencer addr: %v", err) return @@ -84,7 +95,7 @@ func (st *Tracker) Start(parentCtx context.Context) { log.Infof("current sequencer addr: %s", addr.Hex()) st.setAddr(addr) - url, err := st.client.TrustedSequencerURL(ctx) + url, err := st.em.TrustedSequencerURL(ctx) if err != nil { log.Fatalf("failed to get sequencer addr: %v", err) return @@ -103,6 +114,35 @@ func (st *Tracker) Start(parentCtx context.Context) { } func (st *Tracker) trackAddrChanges(ctx context.Context) { + addrChan := make(chan common.Address, 1) + + if st.usePolling { + go st.pollAddrChanges(ctx, addrChan) + } else { + go st.subscribeOnAddrChanges(ctx, addrChan) + } + + for { + select { + case addr := <-addrChan: + if st.GetAddr().Cmp(addr) != 0 { + log.Infof("new trusted sequencer address: %v", addr) + st.setAddr(addr) + } + case <-ctx.Done(): + if ctx.Err() != nil && ctx.Err() != context.DeadlineExceeded { + log.Warnf("context cancelled: %v", ctx.Err()) + } + case <-st.stop: + return + } + } +} + +func (st *Tracker) subscribeOnAddrChanges(ctx context.Context, addrChan chan<- common.Address) { + st.wg.Add(1) + defer st.wg.Done() + events := make(chan *polygonvalidium.PolygonvalidiumSetTrustedSequencer) defer close(events) @@ -110,7 +150,7 @@ func (st *Tracker) trackAddrChanges(ctx context.Context) { initSubscription := func() { if err := backoff.Exponential(func() (err error) { - if sub, err = st.client.WatchSetTrustedSequencer(ctx, events); err != nil { + if sub, err = st.em.WatchSetTrustedSequencer(ctx, events); err != nil { log.Errorf("error subscribing to trusted sequencer event, retrying: %v", err) } @@ -125,12 +165,9 @@ func (st *Tracker) trackAddrChanges(ctx context.Context) { for { select { case e := <-events: - log.Infof("new trusted sequencer address: %v", e.NewTrustedSequencer) - st.setAddr(e.NewTrustedSequencer) + addrChan <- e.NewTrustedSequencer case <-ctx.Done(): - if ctx.Err() != nil && ctx.Err() != context.DeadlineExceeded { - log.Warnf("context cancelled: %v", ctx.Err()) - } + return case err := <-sub.Err(): log.Warnf("subscription error, resubscribing: %v", err) initSubscription() @@ -143,7 +180,63 @@ func (st *Tracker) trackAddrChanges(ctx context.Context) { } } +func (st *Tracker) pollAddrChanges(ctx context.Context, addrChan chan<- common.Address) { + st.wg.Add(1) + defer st.wg.Done() + + ticker := time.NewTicker(st.pollInterval) + for { + select { + case <-ticker.C: + addr, err := st.em.TrustedSequencer(ctx) + if err != nil { + log.Errorf("failed to get sequencer addr: %v", err) + break + } + + if st.GetAddr().Cmp(addr) != 0 { + addrChan <- addr + } + case <-ctx.Done(): + ticker.Stop() + return + case <-st.stop: + ticker.Stop() + return + } + } +} + func (st *Tracker) trackUrlChanges(ctx context.Context) { + urlChan := make(chan string, 1) + + if st.usePolling { + go st.pollUrlChanges(ctx, urlChan) + } else { + go st.subscribeOnUrlChanges(ctx, urlChan) + } + + for { + select { + case url := <-urlChan: + if st.GetUrl() != url { + log.Infof("new trusted sequencer url: %v", url) + st.setUrl(url) + } + case <-ctx.Done(): + if ctx.Err() != nil && ctx.Err() != context.DeadlineExceeded { + log.Warnf("context cancelled: %v", ctx.Err()) + } + case <-st.stop: + return + } + } +} + +func (st *Tracker) subscribeOnUrlChanges(ctx context.Context, urlChan chan<- string) { + st.wg.Add(1) + defer st.wg.Done() + events := make(chan *polygonvalidium.PolygonvalidiumSetTrustedSequencerURL) defer close(events) @@ -151,7 +244,7 @@ func (st *Tracker) trackUrlChanges(ctx context.Context) { initSubscription := func() { if err := backoff.Exponential(func() (err error) { - if sub, err = st.client.WatchSetTrustedSequencerURL(ctx, events); err != nil { + if sub, err = st.em.WatchSetTrustedSequencerURL(ctx, events); err != nil { log.Errorf("error subscribing to trusted sequencer URL event, retrying: %v", err) } @@ -166,12 +259,9 @@ func (st *Tracker) trackUrlChanges(ctx context.Context) { for { select { case e := <-events: - log.Infof("new trusted sequencer url: %v", e.NewTrustedSequencerURL) - st.setUrl(e.NewTrustedSequencerURL) + urlChan <- e.NewTrustedSequencerURL case <-ctx.Done(): - if ctx.Err() != nil && ctx.Err() != context.DeadlineExceeded { - log.Warnf("context cancelled: %v", ctx.Err()) - } + return case err := <-sub.Err(): log.Warnf("subscription error, resubscribing: %v", err) initSubscription() @@ -184,6 +274,33 @@ func (st *Tracker) trackUrlChanges(ctx context.Context) { } } +func (st *Tracker) pollUrlChanges(ctx context.Context, urlChan chan<- string) { + st.wg.Add(1) + defer st.wg.Done() + + ticker := time.NewTicker(st.pollInterval) + for { + select { + case <-ticker.C: + url, err := st.em.TrustedSequencerURL(ctx) + if err != nil { + log.Errorf("failed to get sequencer URL: %v", err) + break + } + + if st.GetUrl() != url { + urlChan <- url + } + case <-ctx.Done(): + ticker.Stop() + return + case <-st.stop: + ticker.Stop() + return + } + } +} + // GetSequenceBatch returns sequence batch for given batch number func (st *Tracker) GetSequenceBatch(batchNum uint64) (*SeqBatch, error) { return GetData(st.GetUrl(), batchNum) @@ -192,4 +309,5 @@ func (st *Tracker) GetSequenceBatch(batchNum uint64) (*SeqBatch, error) { // Stop stops the SequencerTracker func (st *Tracker) Stop() { close(st.stop) + st.wg.Wait() } diff --git a/sequencer/tracker_test.go b/sequencer/tracker_test.go index ee13fb90..3b709735 100644 --- a/sequencer/tracker_test.go +++ b/sequencer/tracker_test.go @@ -23,7 +23,7 @@ func TestTracker(t *testing.T) { updatedURL = "127.0.0.1:9585" ) - t.Run("with enabled tracker", func(t *testing.T) { + t.Run("with enabled subscription tracker", func(t *testing.T) { var ( addressesChan chan *polygonvalidium.PolygonvalidiumSetTrustedSequencer urlsChan chan *polygonvalidium.PolygonvalidiumSetTrustedSequencerURL @@ -88,18 +88,55 @@ func TestTracker(t *testing.T) { NewTrustedSequencerURL: updatedURL, } - tracker.Stop() - // Wait for values to be updated eventually(t, 10, func() bool { return tracker.GetAddr() == updatedAddress && tracker.GetUrl() == updatedURL }) + tracker.Stop() + urlsSubscription.AssertExpectations(t) addressesSubscription.AssertExpectations(t) etherman.AssertExpectations(t) }) + t.Run("with enabled polling tracker", func(t *testing.T) { + ctx := context.Background() + + etherman := mocks.NewEtherman(t) + + etherman.On("TrustedSequencer", mock.Anything).Return(initialAddress, nil) + etherman.On("TrustedSequencerURL", mock.Anything).Return(initialURL, nil) + + etherman.On("TrustedSequencer", mock.Anything).Return(updatedAddress, nil) + etherman.On("TrustedSequencerURL", mock.Anything).Return(updatedURL, nil) + + tracker := sequencer.NewTracker(config.L1Config{ + RpcURL: "http://127.0.0.1:8545", + Timeout: types.NewDuration(time.Second * 10), + RetryPeriod: types.NewDuration(time.Millisecond), + TrackSequencerPollInterval: types.NewDuration(time.Second), + TrackSequencer: true, + }, etherman) + + require.Equal(t, common.Address{}, tracker.GetAddr()) + require.Empty(t, tracker.GetUrl()) + + tracker.Start(ctx) + + require.Equal(t, initialAddress, tracker.GetAddr()) + require.Equal(t, initialURL, tracker.GetUrl()) + + // Wait for values to be updated + eventually(t, 10, func() bool { + return tracker.GetAddr() == updatedAddress && tracker.GetUrl() == updatedURL + }) + + tracker.Stop() + + etherman.AssertExpectations(t) + }) + t.Run("with disabled tracker", func(t *testing.T) { ctx := context.Background() diff --git a/synchronizer/init.go b/synchronizer/init.go index 18e64134..7fad7eaa 100644 --- a/synchronizer/init.go +++ b/synchronizer/init.go @@ -5,10 +5,9 @@ import ( "math/big" "time" - "github.com/0xPolygon/cdk-data-availability/config" "github.com/0xPolygon/cdk-data-availability/db" + "github.com/0xPolygon/cdk-data-availability/etherman" "github.com/0xPolygon/cdk-data-availability/log" - "github.com/0xPolygon/cdk-data-availability/types" "github.com/ethereum/go-ethereum/common" ) @@ -18,7 +17,7 @@ const ( ) // InitStartBlock initializes the L1 sync task by finding the inception block for the CDKValidium contract -func InitStartBlock(db db.DB, ethClientFactory types.EthClientFactory, l1 config.L1Config) error { +func InitStartBlock(db db.DB, em etherman.Etherman, genesisBlock uint64, validiumAddr common.Address) error { ctx, cancel := context.WithTimeout(context.Background(), initBlockTimeout) defer cancel() @@ -30,18 +29,13 @@ func InitStartBlock(db db.DB, ethClientFactory types.EthClientFactory, l1 config // no need to resolve start block, it's already been set return nil } - log.Info("starting search for start block of contract ", l1.PolygonValidiumAddress) - - ethClient, err := ethClientFactory.CreateEthClient(ctx, l1.RpcURL) - if err != nil { - return err - } + log.Info("starting search for start block of contract ", validiumAddr) startBlock := new(big.Int) - if l1.GenesisBlock != 0 { - startBlock.SetUint64(l1.GenesisBlock) + if genesisBlock != 0 { + startBlock.SetUint64(genesisBlock) } else { - startBlock, err = findContractDeploymentBlock(ctx, ethClient, common.HexToAddress(l1.PolygonValidiumAddress)) + startBlock, err = findContractDeploymentBlock(ctx, em, validiumAddr) if err != nil { return err } @@ -50,30 +44,30 @@ func InitStartBlock(db db.DB, ethClientFactory types.EthClientFactory, l1 config return setStartBlock(db, startBlock.Uint64()) } -func findContractDeploymentBlock(ctx context.Context, eth types.EthClient, contract common.Address) (*big.Int, error) { - latestBlock, err := eth.BlockByNumber(ctx, nil) +func findContractDeploymentBlock(ctx context.Context, em etherman.Etherman, contract common.Address) (*big.Int, error) { + latestBlock, err := em.BlockByNumber(ctx, nil) if err != nil { return nil, err } - firstBlock := findCode(ctx, eth, contract, 0, latestBlock.Number().Int64()) + firstBlock := findCode(ctx, em, contract, 0, latestBlock.Number().Int64()) return big.NewInt(firstBlock), nil } // findCode is an O(log(n)) search for the inception block of a contract at the given address -func findCode(ctx context.Context, eth types.EthClient, address common.Address, startBlock, endBlock int64) int64 { +func findCode(ctx context.Context, em etherman.Etherman, address common.Address, startBlock, endBlock int64) int64 { if startBlock == endBlock { return startBlock } midBlock := (startBlock + endBlock) / 2 //nolint:gomnd - if codeLen := codeLen(ctx, eth, address, midBlock); codeLen > minCodeLen { - return findCode(ctx, eth, address, startBlock, midBlock) + if codeLen := codeLen(ctx, em, address, midBlock); codeLen > minCodeLen { + return findCode(ctx, em, address, startBlock, midBlock) } else { - return findCode(ctx, eth, address, midBlock+1, endBlock) + return findCode(ctx, em, address, midBlock+1, endBlock) } } -func codeLen(ctx context.Context, eth types.EthClient, address common.Address, blockNumber int64) int64 { - data, err := eth.CodeAt(ctx, address, big.NewInt(blockNumber)) +func codeLen(ctx context.Context, em etherman.Etherman, address common.Address, blockNumber int64) int64 { + data, err := em.CodeAt(ctx, address, big.NewInt(blockNumber)) if err != nil { return 0 } diff --git a/synchronizer/init_test.go b/synchronizer/init_test.go index ee7d2479..8a3b34bb 100644 --- a/synchronizer/init_test.go +++ b/synchronizer/init_test.go @@ -27,9 +27,6 @@ func Test_InitStartBlock(t *testing.T) { storeLastProcessedBlockArgs []interface{} storeLastProcessedBlockReturns []interface{} commitReturns []interface{} - // eth client factory mocks - createEthClientArgs []interface{} - createEthClientReturns []interface{} // eth client mocks blockByNumberArgs []interface{} blockByNumberReturns []interface{} @@ -40,8 +37,8 @@ func Test_InitStartBlock(t *testing.T) { } l1Config := config.L1Config{ - WsURL: "ws://localhost:8080/ws", - RpcURL: "http://localhost:8081", + RpcURL: "ws://localhost:8080/ws", + // RpcURL: "http://localhost:8081", PolygonValidiumAddress: "0xCDKValidium", DataCommitteeAddress: "0xDAC", Timeout: types.NewDuration(time.Minute), @@ -49,11 +46,10 @@ func Test_InitStartBlock(t *testing.T) { BlockBatchSize: 10, } - testFn := func(config testConfig) { + testFn := func(t *testing.T, config testConfig) { dbMock := mocks.NewDB(t) txMock := mocks.NewTx(t) - ethClientMock := mocks.NewEthClient(t) - ethClientFactoryMock := mocks.NewEthClientFactory(t) + emMock := mocks.NewEtherman(t) if config.getLastProcessedBlockArgs != nil && config.getLastProcessedBlockReturns != nil { dbMock.On("GetLastProcessedBlock", config.getLastProcessedBlockArgs...).Return( @@ -81,46 +77,38 @@ func Test_InitStartBlock(t *testing.T) { returnArgs...).Once() } - if config.createEthClientArgs != nil { - var returnArgs []interface{} - if config.createEthClientReturns != nil { - returnArgs = config.createEthClientReturns - } else { - returnArgs = append(returnArgs, ethClientMock, nil) - } - - ethClientFactoryMock.On("CreateEthClient", config.createEthClientArgs...).Return( - returnArgs...).Once() - } - if config.blockByNumberArgs != nil && config.blockByNumberReturns != nil { - ethClientMock.On("BlockByNumber", config.blockByNumberArgs...).Return( + emMock.On("BlockByNumber", config.blockByNumberArgs...).Return( config.blockByNumberReturns...).Once() } if config.codeAtArgs != nil && config.codeAtReturns != nil { for i, args := range config.codeAtArgs { - ethClientMock.On("CodeAt", args...).Return( + emMock.On("CodeAt", args...).Return( config.codeAtReturns[i]...).Once() } } + err := InitStartBlock( + dbMock, + emMock, + l1Config.GenesisBlock, + common.HexToAddress(l1Config.PolygonValidiumAddress), + ) if config.isErrorExpected { - require.Error(t, InitStartBlock(dbMock, ethClientFactoryMock, l1Config)) + require.Error(t, err) } else { - require.NoError(t, InitStartBlock(dbMock, ethClientFactoryMock, l1Config)) + require.NoError(t, err) } dbMock.AssertExpectations(t) txMock.AssertExpectations(t) - ethClientMock.AssertExpectations(t) - ethClientFactoryMock.AssertExpectations(t) } t.Run("GetLastProcessedBlock returns an error", func(t *testing.T) { t.Parallel() - testFn(testConfig{ + testFn(t, testConfig{ getLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask}, getLastProcessedBlockReturns: []interface{}{uint64(1), errors.New("can't get last processed block")}, isErrorExpected: true, @@ -130,32 +118,19 @@ func Test_InitStartBlock(t *testing.T) { t.Run("no need to resolve start block", func(t *testing.T) { t.Parallel() - testFn(testConfig{ + testFn(t, testConfig{ getLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask}, getLastProcessedBlockReturns: []interface{}{uint64(10), nil}, isErrorExpected: false, }) }) - t.Run("can not create eth client", func(t *testing.T) { - t.Parallel() - - testFn(testConfig{ - getLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask}, - getLastProcessedBlockReturns: []interface{}{uint64(0), nil}, - createEthClientArgs: []interface{}{mock.Anything, l1Config.RpcURL}, - createEthClientReturns: []interface{}{nil, errors.New("error")}, - isErrorExpected: true, - }) - }) - t.Run("can not get block from eth client", func(t *testing.T) { t.Parallel() - testFn(testConfig{ + testFn(t, testConfig{ getLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask}, getLastProcessedBlockReturns: []interface{}{uint64(0), nil}, - createEthClientArgs: []interface{}{mock.Anything, l1Config.RpcURL}, blockByNumberArgs: []interface{}{mock.Anything, mock.Anything}, blockByNumberReturns: []interface{}{nil, errors.New("error")}, isErrorExpected: true, @@ -169,12 +144,11 @@ func Test_InitStartBlock(t *testing.T) { Number: big.NewInt(0), }) - testFn(testConfig{ + testFn(t, testConfig{ getLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask}, getLastProcessedBlockReturns: []interface{}{uint64(0), nil}, beginStateTransactionArgs: []interface{}{mock.Anything}, beginStateTransactionReturns: []interface{}{nil, errors.New("error")}, - createEthClientArgs: []interface{}{mock.Anything, l1Config.RpcURL}, blockByNumberArgs: []interface{}{mock.Anything, mock.Anything}, blockByNumberReturns: []interface{}{block, nil}, isErrorExpected: true, @@ -188,13 +162,12 @@ func Test_InitStartBlock(t *testing.T) { Number: big.NewInt(0), }) - testFn(testConfig{ + testFn(t, testConfig{ getLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask}, getLastProcessedBlockReturns: []interface{}{uint64(0), nil}, beginStateTransactionArgs: []interface{}{mock.Anything}, storeLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask, uint64(0), mock.Anything}, storeLastProcessedBlockReturns: []interface{}{errors.New("error")}, - createEthClientArgs: []interface{}{mock.Anything, l1Config.RpcURL}, blockByNumberArgs: []interface{}{mock.Anything, mock.Anything}, blockByNumberReturns: []interface{}{block, nil}, isErrorExpected: true, @@ -204,12 +177,11 @@ func Test_InitStartBlock(t *testing.T) { t.Run("Commit fails", func(t *testing.T) { t.Parallel() - testFn(testConfig{ + testFn(t, testConfig{ blockByNumberArgs: []interface{}{mock.Anything, mock.Anything}, blockByNumberReturns: []interface{}{ethTypes.NewBlockWithHeader(ðTypes.Header{ Number: big.NewInt(0), }), nil}, - createEthClientArgs: []interface{}{mock.Anything, l1Config.RpcURL}, getLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask}, getLastProcessedBlockReturns: []interface{}{uint64(0), nil}, storeLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask, uint64(0), mock.Anything}, @@ -224,14 +196,13 @@ func Test_InitStartBlock(t *testing.T) { t.Run("Successful init", func(t *testing.T) { t.Parallel() - testFn(testConfig{ + testFn(t, testConfig{ getLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask}, getLastProcessedBlockReturns: []interface{}{uint64(0), nil}, beginStateTransactionArgs: []interface{}{mock.Anything}, storeLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask, uint64(2), mock.Anything}, storeLastProcessedBlockReturns: []interface{}{nil}, commitReturns: []interface{}{nil}, - createEthClientArgs: []interface{}{mock.Anything, l1Config.RpcURL}, blockByNumberArgs: []interface{}{mock.Anything, mock.Anything}, blockByNumberReturns: []interface{}{ethTypes.NewBlockWithHeader(ðTypes.Header{ Number: big.NewInt(3), diff --git a/synchronizer/reorg.go b/synchronizer/reorg.go index d3176c56..71725b17 100644 --- a/synchronizer/reorg.go +++ b/synchronizer/reorg.go @@ -48,8 +48,7 @@ func (rd *ReorgDetector) Start() error { rd.cancel = cancel blocks := make(chan *ethgo.Block) - err := rd.trackBlocks(ctx, blocks) - if err != nil { + if err := rd.trackBlocks(ctx, blocks); err != nil { return err } diff --git a/test/config/test.dev.toml b/test/config/test.dev.toml index 99cb1045..5e08237b 100644 --- a/test/config/test.dev.toml +++ b/test/config/test.dev.toml @@ -1,13 +1,13 @@ PrivateKey = {Path = "config/test-member.keystore", Password = "testonly"} [L1] -WsURL = "wss://rootchain.hardfork.dev/ws" -RpcURL = "https://rootchain.hardfork.dev/" +RpcURL = "wss://rootchain.hardfork.dev/ws" PolygonValidiumAddress = "0x0775AAFB6dD38417581F7C583053Fa3B78FD4FD1" DataCommitteeAddress = "0xE660928f13F51bEbb553063A1317EDC0e7038949" Timeout = "1m" RetryPeriod = "5s" TrackSequencer = true +TrackSequencerPollInterval = "1m" [Log] Environment = "development" # "production" or "development" diff --git a/test/config/test.docker.toml b/test/config/test.docker.toml index 56550f49..8a74271b 100644 --- a/test/config/test.docker.toml +++ b/test/config/test.docker.toml @@ -1,14 +1,14 @@ PrivateKey = {Path = "/pk/test-member.keystore", Password = "testonly"} [L1] -WsURL = "ws://l1:8546" -RpcURL = "http://l1:8545" +RpcURL = "ws://l1:8546" PolygonValidiumAddress = "0x8dAF17A20c9DBA35f005b6324F493785D239719d" DataCommitteeAddress = "0x68B1D87F95878fE05B998F19b66F4baba5De1aed" Timeout = "3m" RetryPeriod = "5s" BlockBatchSize = 32 TrackSequencer = true +TrackSequencerPollInterval = "1m" [Log] Environment = "development" # "production" or "development" diff --git a/test/config/test.local.toml b/test/config/test.local.toml index 7cddbb02..b19a167b 100644 --- a/test/config/test.local.toml +++ b/test/config/test.local.toml @@ -1,14 +1,14 @@ PrivateKey = {Path = "config/test-member.keystore", Password = "testonly"} [L1] -WsURL = "ws://127.0.0.1:8546" -RpcURL = "http://127.0.0.1:8545" +RpcURL = "ws://127.0.0.1:8546" PolygonValidiumAddress = "0x8dAF17A20c9DBA35f005b6324F493785D239719d" DataCommitteeAddress = "0x68B1D87F95878fE05B998F19b66F4baba5De1aed" Timeout = "3m" RetryPeriod = "5s" BlockBatchSize = 8 TrackSequencer = true +TrackSequencerPollInterval = "1m" [Log] Environment = "development" # "production" or "development" diff --git a/test/e2e/datacommittee_test.go b/test/e2e/datacommittee_test.go index 0bdc4a28..34bae1f5 100644 --- a/test/e2e/datacommittee_test.go +++ b/test/e2e/datacommittee_test.go @@ -297,8 +297,8 @@ func createKeyStore(pk *ecdsa.PrivateKey, outputDir, password string) error { func startDACMember(t *testing.T, m member) { dacNodeConfig := config.Config{ L1: config.L1Config{ - WsURL: "ws://l1:8546", - RpcURL: "http://l1:8545", + RpcURL: "ws://l1:8546", + //RpcURL: "http://l1:8545", PolygonValidiumAddress: operations.DefaultL1CDKValidiumSmartContract, DataCommitteeAddress: operations.DefaultL1DataCommitteeContract, Timeout: cTypes.Duration{Duration: time.Second}, diff --git a/test/e2e/sequencer_tracker_test.go b/test/e2e/sequencer_tracker_test.go index d45d659b..4ca1ef70 100644 --- a/test/e2e/sequencer_tracker_test.go +++ b/test/e2e/sequencer_tracker_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/0xPolygon/cdk-data-availability/config" + "github.com/0xPolygon/cdk-data-availability/config/types" "github.com/0xPolygon/cdk-data-availability/etherman" "github.com/0xPolygon/cdk-data-availability/etherman/smartcontracts/etrog/polygonvalidium" "github.com/0xPolygon/cdk-data-availability/sequencer" @@ -20,28 +21,15 @@ import ( func TestSequencerAddrExists(t *testing.T) { err := operations.StartComponent("network") require.NoError(t, err) + defer operations.StopComponent("network") <-time.After(3 * time.Second) // wait for component to start ctx := cli.NewContext(cli.NewApp(), nil, nil) - cfg, err := config.Load(ctx) - require.NoError(t, err) - etm, err := etherman.New(ctx.Context, cfg.L1) - require.NoError(t, err) - - tracker := sequencer.NewTracker(cfg.L1, etm) - - tracker.Start(ctx.Context) - defer tracker.Stop() - - addr := tracker.GetAddr() - require.Equal(t, common.HexToAddress("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"), addr) - - url := tracker.GetUrl() - require.Equal(t, "http://zkevm-json-rpc:8123", url) // the default clientL1, err := ethclient.Dial(operations.DefaultL1NetworkURL) require.NoError(t, err) + validiumContract, err := polygonvalidium.NewPolygonvalidium( common.HexToAddress(operations.DefaultL1CDKValidiumSmartContract), clientL1, @@ -52,11 +40,44 @@ func TestSequencerAddrExists(t *testing.T) { require.NoError(t, err) newUrl := fmt.Sprintf("http://something-else:%d", rand.Intn(10000)) + + initTracker := func(rpcUrl string) *sequencer.Tracker { + cfg, err := config.Load(ctx) + require.NoError(t, err) + + // Make sure ws is used + cfg.L1.RpcURL = rpcUrl + cfg.L1.TrackSequencerPollInterval = types.NewDuration(100 * time.Millisecond) + + etm, err := etherman.New(ctx.Context, cfg.L1) + require.NoError(t, err) + + tracker := sequencer.NewTracker(cfg.L1, etm) + + tracker.Start(ctx.Context) + + addr := tracker.GetAddr() + require.Equal(t, common.HexToAddress("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"), addr) + + url := tracker.GetUrl() + require.Equal(t, "http://zkevm-json-rpc:8123", url) // the default + + return tracker + } + + wsTracker := initTracker("ws://127.0.0.1:8546") + defer wsTracker.Stop() + + httpTracker := initTracker("http://127.0.0.1:8545") + defer httpTracker.Stop() + + // Update URL on L1 contract _, err = validiumContract.SetTrustedSequencerURL(authL1, newUrl) require.NoError(t, err) - // give the tracker a sec to get the event + // Give the tracker a sec to get the event <-time.After(2500 * time.Millisecond) - require.Equal(t, newUrl, tracker.GetUrl()) + require.Equal(t, newUrl, wsTracker.GetUrl()) + require.Equal(t, newUrl, httpTracker.GetUrl()) } diff --git a/types/factories.go b/types/factories.go deleted file mode 100644 index 097f3fa7..00000000 --- a/types/factories.go +++ /dev/null @@ -1,34 +0,0 @@ -package types - -import ( - "context" - "math/big" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethclient" -) - -// EthClient defines functions that an ethereum rpc client should implement -type EthClient interface { - BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) - CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) -} - -// EthClientFactory defines functions for a EthClient factory -type EthClientFactory interface { - CreateEthClient(ctx context.Context, url string) (EthClient, error) -} - -// ethClientFactory is the implementation of EthClientFactory interface -type ethClientFactory struct{} - -// NewEthClientFactory is the constructor of ethClientFactory -func NewEthClientFactory() EthClientFactory { - return ðClientFactory{} -} - -// CreateEthClient creates a new eth client -func (e *ethClientFactory) CreateEthClient(ctx context.Context, url string) (EthClient, error) { - return ethclient.DialContext(ctx, url) -} From 76e1aa74230c44db911f395adeaef3c2b7c9321a Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Wed, 10 Apr 2024 10:18:33 -0400 Subject: [PATCH 2/4] use header rather than block to get latest number during init --- synchronizer/init.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synchronizer/init.go b/synchronizer/init.go index 7fad7eaa..71df83c4 100644 --- a/synchronizer/init.go +++ b/synchronizer/init.go @@ -45,11 +45,11 @@ func InitStartBlock(db db.DB, em etherman.Etherman, genesisBlock uint64, validiu } func findContractDeploymentBlock(ctx context.Context, em etherman.Etherman, contract common.Address) (*big.Int, error) { - latestBlock, err := em.BlockByNumber(ctx, nil) + latestHeader, err := em.HeaderByNumber(ctx, nil) if err != nil { return nil, err } - firstBlock := findCode(ctx, em, contract, 0, latestBlock.Number().Int64()) + firstBlock := findCode(ctx, em, contract, 0, latestHeader.Number.Int64()) return big.NewInt(firstBlock), nil } From 60c2d24dd6eaa22cacd7c656ca34afcb7a02aa73 Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Wed, 10 Apr 2024 10:23:25 -0400 Subject: [PATCH 3/4] fix test --- synchronizer/init_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synchronizer/init_test.go b/synchronizer/init_test.go index 8a3b34bb..9da0abbc 100644 --- a/synchronizer/init_test.go +++ b/synchronizer/init_test.go @@ -78,7 +78,7 @@ func Test_InitStartBlock(t *testing.T) { } if config.blockByNumberArgs != nil && config.blockByNumberReturns != nil { - emMock.On("BlockByNumber", config.blockByNumberArgs...).Return( + emMock.On("HeaderByNumber", config.blockByNumberArgs...).Return( config.blockByNumberReturns...).Once() } From 5b956c5108946eab4eeb8115ada926b2fb941979 Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Wed, 10 Apr 2024 11:26:43 -0400 Subject: [PATCH 4/4] fix mock expectations --- synchronizer/init_test.go | 54 ++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/synchronizer/init_test.go b/synchronizer/init_test.go index 9da0abbc..3b71c063 100644 --- a/synchronizer/init_test.go +++ b/synchronizer/init_test.go @@ -28,10 +28,10 @@ func Test_InitStartBlock(t *testing.T) { storeLastProcessedBlockReturns []interface{} commitReturns []interface{} // eth client mocks - blockByNumberArgs []interface{} - blockByNumberReturns []interface{} - codeAtArgs [][]interface{} - codeAtReturns [][]interface{} + headerByNumberArgs []interface{} + headerByNumberReturns []interface{} + codeAtArgs [][]interface{} + codeAtReturns [][]interface{} isErrorExpected bool } @@ -77,9 +77,9 @@ func Test_InitStartBlock(t *testing.T) { returnArgs...).Once() } - if config.blockByNumberArgs != nil && config.blockByNumberReturns != nil { - emMock.On("HeaderByNumber", config.blockByNumberArgs...).Return( - config.blockByNumberReturns...).Once() + if config.headerByNumberArgs != nil && config.headerByNumberReturns != nil { + emMock.On("HeaderByNumber", config.headerByNumberArgs...).Return( + config.headerByNumberReturns...).Once() } if config.codeAtArgs != nil && config.codeAtReturns != nil { @@ -131,8 +131,8 @@ func Test_InitStartBlock(t *testing.T) { testFn(t, testConfig{ getLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask}, getLastProcessedBlockReturns: []interface{}{uint64(0), nil}, - blockByNumberArgs: []interface{}{mock.Anything, mock.Anything}, - blockByNumberReturns: []interface{}{nil, errors.New("error")}, + headerByNumberArgs: []interface{}{mock.Anything, mock.Anything}, + headerByNumberReturns: []interface{}{nil, errors.New("error")}, isErrorExpected: true, }) }) @@ -140,37 +140,33 @@ func Test_InitStartBlock(t *testing.T) { t.Run("BeginStateTransaction fails", func(t *testing.T) { t.Parallel() - block := ethTypes.NewBlockWithHeader(ðTypes.Header{ - Number: big.NewInt(0), - }) - testFn(t, testConfig{ getLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask}, getLastProcessedBlockReturns: []interface{}{uint64(0), nil}, beginStateTransactionArgs: []interface{}{mock.Anything}, beginStateTransactionReturns: []interface{}{nil, errors.New("error")}, - blockByNumberArgs: []interface{}{mock.Anything, mock.Anything}, - blockByNumberReturns: []interface{}{block, nil}, - isErrorExpected: true, + headerByNumberArgs: []interface{}{mock.Anything, mock.Anything}, + headerByNumberReturns: []interface{}{ethTypes.NewBlockWithHeader(ðTypes.Header{ + Number: big.NewInt(0), + }).Header(), nil}, + isErrorExpected: true, }) }) t.Run("Store off-chain data fails", func(t *testing.T) { t.Parallel() - block := ethTypes.NewBlockWithHeader(ðTypes.Header{ - Number: big.NewInt(0), - }) - testFn(t, testConfig{ getLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask}, getLastProcessedBlockReturns: []interface{}{uint64(0), nil}, beginStateTransactionArgs: []interface{}{mock.Anything}, storeLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask, uint64(0), mock.Anything}, storeLastProcessedBlockReturns: []interface{}{errors.New("error")}, - blockByNumberArgs: []interface{}{mock.Anything, mock.Anything}, - blockByNumberReturns: []interface{}{block, nil}, - isErrorExpected: true, + headerByNumberArgs: []interface{}{mock.Anything, mock.Anything}, + headerByNumberReturns: []interface{}{ethTypes.NewBlockWithHeader(ðTypes.Header{ + Number: big.NewInt(0), + }).Header(), nil}, + isErrorExpected: true, }) }) @@ -178,10 +174,10 @@ func Test_InitStartBlock(t *testing.T) { t.Parallel() testFn(t, testConfig{ - blockByNumberArgs: []interface{}{mock.Anything, mock.Anything}, - blockByNumberReturns: []interface{}{ethTypes.NewBlockWithHeader(ðTypes.Header{ + headerByNumberArgs: []interface{}{mock.Anything, mock.Anything}, + headerByNumberReturns: []interface{}{ethTypes.NewBlockWithHeader(ðTypes.Header{ Number: big.NewInt(0), - }), nil}, + }).Header(), nil}, getLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask}, getLastProcessedBlockReturns: []interface{}{uint64(0), nil}, storeLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask, uint64(0), mock.Anything}, @@ -203,10 +199,10 @@ func Test_InitStartBlock(t *testing.T) { storeLastProcessedBlockArgs: []interface{}{mock.Anything, L1SyncTask, uint64(2), mock.Anything}, storeLastProcessedBlockReturns: []interface{}{nil}, commitReturns: []interface{}{nil}, - blockByNumberArgs: []interface{}{mock.Anything, mock.Anything}, - blockByNumberReturns: []interface{}{ethTypes.NewBlockWithHeader(ðTypes.Header{ + headerByNumberArgs: []interface{}{mock.Anything, mock.Anything}, + headerByNumberReturns: []interface{}{ethTypes.NewBlockWithHeader(ðTypes.Header{ Number: big.NewInt(3), - }), nil}, + }).Header(), nil}, codeAtArgs: [][]interface{}{ {mock.Anything, common.HexToAddress(l1Config.PolygonValidiumAddress), big.NewInt(1)}, {mock.Anything, common.HexToAddress(l1Config.PolygonValidiumAddress), big.NewInt(2)},