Skip to content

Commit

Permalink
Implement substrate tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mpetrun5 committed Oct 16, 2023
1 parent 9b84fb4 commit b9d5369
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 16 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ genmocks:
mockgen -source=chains/evm/transactor/signAndSend/signAndSend.go -destination=./mock/signAndSend.go -package mock
mockgen -source=./store/store.go -destination=./mock/store.go -package mock
mockgen -source=./relayer/message/handler.go -destination=./mock/message.go -package mock
mockgen -source=./chains/evm/listener/listener.go -destination=./mock/listener.go -package mock
mockgen -source=./chains/evm/listener/listener.go -destination=./mock/evmListener.go -package mock
mockgen -destination=./mock/substrateListener.go -package mock github.com/ChainSafe/sygma-core/chains/substrate/listener ChainConnection
2 changes: 1 addition & 1 deletion chains/evm/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ loop:
default:
head, err := l.client.LatestBlock()
if err != nil {
l.log.Error().Err(err).Msg("Unable to get latest block")
l.log.Warn().Err(err).Msg("Unable to get latest block")
time.Sleep(l.blockRetryInterval)
continue
}
Expand Down
7 changes: 4 additions & 3 deletions chains/evm/listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,20 @@ func (s *ListenerTestSuite) Test_ListenToEvents_StoresBlockIfEventHandlingSucces
cancel()
}

func (s *ListenerTestSuite) Test_ListenToEvents_IgnoresBlocStorerError() {
func (s *ListenerTestSuite) Test_ListenToEvents_IgnoresBlockStorerError() {
startBlock := big.NewInt(100)
endBlock := big.NewInt(105)
head := big.NewInt(110)

s.mockClient.EXPECT().LatestBlock().Return(head, nil)
// prevent infinite runs
s.mockClient.EXPECT().LatestBlock().Return(big.NewInt(95), nil)
s.mockBlockDeltaMeter.EXPECT().TrackBlockDelta(uint8(1), head, endBlock)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockBlockStorer.EXPECT().StoreBlock(endBlock, s.domainID).Return(fmt.Errorf("error"))

// prevent infinite runs
s.mockClient.EXPECT().LatestBlock().Return(big.NewInt(95), nil)

ctx, cancel := context.WithCancel(context.Background())

go s.listener.ListenToEvents(ctx, big.NewInt(100))
Expand Down
25 changes: 15 additions & 10 deletions chains/substrate/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math/big"
"time"

"github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand All @@ -19,10 +18,6 @@ type EventHandler interface {
}

type ChainConnection interface {
UpdateMetatdata() error
GetHeaderLatest() (*types.Header, error)
GetBlockHash(blockNumber uint64) (types.Hash, error)
GetBlockEvents(hash types.Hash) ([]*parser.Event, error)
GetFinalizedHead() (types.Hash, error)
GetBlock(blockHash types.Hash) (*types.SignedBlock, error)
}
Expand All @@ -31,10 +26,15 @@ type BlockStorer interface {
StoreBlock(block *big.Int, domainID uint8) error
}

type BlockDeltaMeter interface {
TrackBlockDelta(domainID uint8, head *big.Int, current *big.Int)
}

type SubstrateListener struct {
conn ChainConnection
blockstore BlockStorer
eventHandlers []EventHandler
metrics BlockDeltaMeter

blockRetryInterval time.Duration
blockInterval *big.Int
Expand All @@ -43,7 +43,7 @@ type SubstrateListener struct {
log zerolog.Logger
}

func NewSubstrateListener(connection ChainConnection, blockstore BlockStorer, eventHandlers []EventHandler, domainID uint8, blockRetryInterval time.Duration, blockInterval *big.Int) *SubstrateListener {
func NewSubstrateListener(connection ChainConnection, eventHandlers []EventHandler, blockstore BlockStorer, metrics BlockDeltaMeter, domainID uint8, blockRetryInterval time.Duration, blockInterval *big.Int) *SubstrateListener {
return &SubstrateListener{
log: log.With().Uint8("domainID", domainID).Logger(),
domainID: domainID,
Expand All @@ -52,27 +52,29 @@ func NewSubstrateListener(connection ChainConnection, blockstore BlockStorer, ev
eventHandlers: eventHandlers,
blockRetryInterval: blockRetryInterval,
blockInterval: blockInterval,
metrics: metrics,
}
}

func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big.Int) {
endBlock := big.NewInt(0)

go func() {
loop:
for {
select {
case <-ctx.Done():
return
default:
hash, err := l.conn.GetFinalizedHead()
if err != nil {
l.log.Error().Err(err).Msg("Failed to fetch finalized header")
l.log.Warn().Err(err).Msg("Failed to fetch finalized header")
time.Sleep(l.blockRetryInterval)
continue
}
head, err := l.conn.GetBlock(hash)
if err != nil {
l.log.Error().Err(err).Msg("Failed to fetch block")
l.log.Warn().Err(err).Msg("Failed to fetch block")
time.Sleep(l.blockRetryInterval)
continue
}
Expand All @@ -88,15 +90,18 @@ func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big.
continue
}

l.metrics.TrackBlockDelta(l.domainID, big.NewInt(int64(head.Block.Header.Number)), endBlock)
l.log.Debug().Msgf("Fetching substrate events for block range %s-%s", startBlock, endBlock)

for _, handler := range l.eventHandlers {
err := handler.HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1)))
if err != nil {
l.log.Warn().Err(err).Msg("Error handling substrate events")
continue
continue loop
}
}

err = l.blockstore.StoreBlock(startBlock, l.domainID)
err = l.blockstore.StoreBlock(endBlock, l.domainID)
if err != nil {
l.log.Error().Str("block", startBlock.String()).Err(err).Msg("Failed to write latest block to blockstore")
}
Expand Down
212 changes: 212 additions & 0 deletions chains/substrate/listener/listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package listener_test

import (
"context"
"fmt"
"math/big"
"testing"
"time"

"github.com/ChainSafe/sygma-core/chains/substrate/listener"
"github.com/ChainSafe/sygma-core/mock"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/stretchr/testify/suite"
"go.uber.org/mock/gomock"
)

type ListenerTestSuite struct {
suite.Suite
listener *listener.SubstrateListener
mockClient *mock.MockChainConnection
mockEventHandler *mock.MockEventHandler
mockBlockStorer *mock.MockBlockStorer
mockBlockDeltaMeter *mock.MockBlockDeltaMeter
domainID uint8
}

func TestRunTestSuite(t *testing.T) {
suite.Run(t, new(ListenerTestSuite))
}

func (s *ListenerTestSuite) SetupTest() {
ctrl := gomock.NewController(s.T())
s.domainID = 1
s.mockClient = mock.NewMockChainConnection(ctrl)
s.mockEventHandler = mock.NewMockEventHandler(ctrl)
s.mockBlockStorer = mock.NewMockBlockStorer(ctrl)
s.mockBlockDeltaMeter = mock.NewMockBlockDeltaMeter(ctrl)
s.listener = listener.NewSubstrateListener(
s.mockClient,
[]listener.EventHandler{s.mockEventHandler, s.mockEventHandler},
s.mockBlockStorer,
s.mockBlockDeltaMeter,
s.domainID,
time.Millisecond*75,
big.NewInt(5),
)
}

func (s *ListenerTestSuite) Test_ListenToEvents_RetriesIfFinalizedHeadUnavailable() {
s.mockClient.EXPECT().GetFinalizedHead().Return(types.Hash{}, fmt.Errorf("error"))

ctx, cancel := context.WithCancel(context.Background())
go s.listener.ListenToEvents(ctx, big.NewInt(100))

time.Sleep(time.Millisecond * 50)
cancel()
}

func (s *ListenerTestSuite) Test_ListenToEvents_RetriesIfBlockUnavailable() {
s.mockClient.EXPECT().GetFinalizedHead().Return(types.Hash{}, nil)
s.mockClient.EXPECT().GetBlock(gomock.Any()).Return(nil, fmt.Errorf("error"))

ctx, cancel := context.WithCancel(context.Background())
go s.listener.ListenToEvents(ctx, big.NewInt(100))

time.Sleep(time.Millisecond * 50)
cancel()
}

func (s *ListenerTestSuite) Test_ListenToEvents_SleepsIfBlockTooNew() {
s.mockClient.EXPECT().GetFinalizedHead().Return(types.Hash{}, nil)
s.mockClient.EXPECT().GetBlock(gomock.Any()).Return(&types.SignedBlock{
Block: types.Block{
Header: types.Header{
Number: 104,
},
},
}, nil)

ctx, cancel := context.WithCancel(context.Background())
go s.listener.ListenToEvents(ctx, big.NewInt(100))

time.Sleep(time.Millisecond * 50)
cancel()
}

func (s *ListenerTestSuite) Test_ListenToEvents_RetriesInCaseOfHandlerFailure() {
startBlock := big.NewInt(100)
endBlock := big.NewInt(105)
head := big.NewInt(110)

// First pass
s.mockClient.EXPECT().GetFinalizedHead().Return(types.Hash{}, nil)
s.mockClient.EXPECT().GetBlock(gomock.Any()).Return(&types.SignedBlock{
Block: types.Block{
Header: types.Header{
Number: types.BlockNumber(head.Int64()),
},
},
}, nil)
s.mockBlockDeltaMeter.EXPECT().TrackBlockDelta(uint8(1), head, endBlock)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(fmt.Errorf("error"))
// Second pass
s.mockClient.EXPECT().GetFinalizedHead().Return(types.Hash{}, nil)
s.mockClient.EXPECT().GetBlock(gomock.Any()).Return(&types.SignedBlock{
Block: types.Block{
Header: types.Header{
Number: types.BlockNumber(head.Int64()),
},
},
}, nil)
s.mockBlockDeltaMeter.EXPECT().TrackBlockDelta(uint8(1), head, endBlock)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockBlockStorer.EXPECT().StoreBlock(endBlock, s.domainID).Return(nil)
// third pass
s.mockClient.EXPECT().GetFinalizedHead().Return(types.Hash{}, nil)
s.mockClient.EXPECT().GetBlock(gomock.Any()).Return(&types.SignedBlock{
Block: types.Block{
Header: types.Header{
Number: 100,
},
},
}, nil)

ctx, cancel := context.WithCancel(context.Background())

go s.listener.ListenToEvents(ctx, big.NewInt(100))

time.Sleep(time.Millisecond * 50)
cancel()
}

func (s *ListenerTestSuite) Test_ListenToEvents_IgnoresBlockStorerError() {
startBlock := big.NewInt(100)
endBlock := big.NewInt(105)
head := big.NewInt(110)

// First pass
s.mockClient.EXPECT().GetFinalizedHead().Return(types.Hash{}, nil)
s.mockClient.EXPECT().GetBlock(gomock.Any()).Return(&types.SignedBlock{
Block: types.Block{
Header: types.Header{
Number: types.BlockNumber(head.Int64()),
},
},
}, nil)
s.mockBlockDeltaMeter.EXPECT().TrackBlockDelta(uint8(1), head, endBlock)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockBlockStorer.EXPECT().StoreBlock(endBlock, s.domainID).Return(fmt.Errorf("error"))
// second pass
s.mockClient.EXPECT().GetFinalizedHead().Return(types.Hash{}, nil)
s.mockClient.EXPECT().GetBlock(gomock.Any()).Return(&types.SignedBlock{
Block: types.Block{
Header: types.Header{
Number: 95,
},
},
}, nil)

ctx, cancel := context.WithCancel(context.Background())
go s.listener.ListenToEvents(ctx, big.NewInt(100))

time.Sleep(time.Millisecond * 50)
cancel()
}

func (s *ListenerTestSuite) Test_ListenToEvents_UsesHeadAsStartBlockIfNilPassed() {
startBlock := big.NewInt(110)
endBlock := big.NewInt(115)
oldHead := big.NewInt(110)
newHead := big.NewInt(120)

s.mockClient.EXPECT().GetFinalizedHead().Return(types.Hash{}, nil)
s.mockClient.EXPECT().GetBlock(gomock.Any()).Return(&types.SignedBlock{
Block: types.Block{
Header: types.Header{
Number: types.BlockNumber(oldHead.Int64()),
},
},
}, nil)
s.mockClient.EXPECT().GetFinalizedHead().Return(types.Hash{}, nil)
s.mockClient.EXPECT().GetBlock(gomock.Any()).Return(&types.SignedBlock{
Block: types.Block{
Header: types.Header{
Number: types.BlockNumber(newHead.Int64()),
},
},
}, nil)
s.mockClient.EXPECT().GetFinalizedHead().Return(types.Hash{}, nil)
s.mockClient.EXPECT().GetBlock(gomock.Any()).Return(&types.SignedBlock{
Block: types.Block{
Header: types.Header{
Number: types.BlockNumber(95),
},
},
}, nil)

s.mockBlockDeltaMeter.EXPECT().TrackBlockDelta(uint8(1), big.NewInt(120), endBlock)

s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockBlockStorer.EXPECT().StoreBlock(endBlock, s.domainID).Return(nil)

ctx, cancel := context.WithCancel(context.Background())

go s.listener.ListenToEvents(ctx, nil)

time.Sleep(time.Millisecond * 100)
cancel()
}
2 changes: 1 addition & 1 deletion mock/listener.go → mock/evmListener.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b9d5369

Please sign in to comment.