Skip to content

Commit

Permalink
Fix failing tests due to event sequencing
Browse files Browse the repository at this point in the history
Signed-off-by: rodion <[email protected]>
  • Loading branch information
rodion-lim-partior committed Sep 5, 2024
1 parent 6c43218 commit 6c15c1a
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 69 deletions.
2 changes: 1 addition & 1 deletion internal/ethereum/event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func newTestListener(t *testing.T, withMethods bool) (*listener, *rpcbackendmock
EventListenerOptions: options,
}

es, _, mRPC, done := testEventStream(t, l1req)
es, _, mRPC, done := testEventStream(t, nil, l1req)

l := es.listeners[*lID]
assert.NotNil(t, l)
Expand Down
177 changes: 109 additions & 68 deletions internal/ethereum/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ import (
"github.com/stretchr/testify/mock"
)

func testEventStream(t *testing.T, listeners ...*ffcapi.EventListenerAddRequest) (*eventStream, chan *ffcapi.ListenerEvent, *rpcbackendmocks.Backend, func()) {
func testEventStream(t *testing.T, additionalMockCalls func(mRPC *rpcbackendmocks.Backend, doneFunc func()), listeners ...*ffcapi.EventListenerAddRequest) (*eventStream, chan *ffcapi.ListenerEvent, *rpcbackendmocks.Backend, func()) {
ctx, c, mRPC, done := newTestConnector(t)
mockStreamLoopEmpty(mRPC)
if additionalMockCalls != nil {
additionalMockCalls(mRPC, done)
}
return testEventStreamExistingConnector(t, ctx, done, c, mRPC, listeners...)
}

Expand Down Expand Up @@ -66,7 +69,7 @@ func testEventStreamExistingConnector(t *testing.T, ctx context.Context, done fu

func TestAddEventListenerMissingFilters(t *testing.T) {

es, _, _, done := testEventStream(t)
es, _, _, done := testEventStream(t, nil)
defer done()

_, err := es.addEventListener(es.ctx, &ffcapi.EventListenerAddRequest{
Expand All @@ -80,7 +83,7 @@ func TestAddEventListenerMissingFilters(t *testing.T) {

func TestAddEventListenerMissingFilterEvent(t *testing.T) {

es, _, _, done := testEventStream(t)
es, _, _, done := testEventStream(t, nil)
defer done()

_, err := es.addEventListener(es.ctx, &ffcapi.EventListenerAddRequest{
Expand All @@ -98,7 +101,7 @@ func TestAddEventListenerMissingFilterEvent(t *testing.T) {

func TestAddEventListenerBadFilterEvent(t *testing.T) {

es, _, _, done := testEventStream(t)
es, _, _, done := testEventStream(t, nil)
defer done()

_, err := es.addEventListener(es.ctx, &ffcapi.EventListenerAddRequest{
Expand All @@ -116,7 +119,7 @@ func TestAddEventListenerBadFilterEvent(t *testing.T) {

func TestAddEventListenerMultipleEvents(t *testing.T) {

es, _, _, done := testEventStream(t)
es, _, _, done := testEventStream(t, nil)
defer done()

l, err := es.addEventListener(es.ctx, &ffcapi.EventListenerAddRequest{
Expand All @@ -137,7 +140,7 @@ func TestAddEventListenerMultipleEvents(t *testing.T) {

func TestAddEventListenerBadOptions(t *testing.T) {

es, _, _, done := testEventStream(t)
es, _, _, done := testEventStream(t, nil)
defer done()

_, err := es.addEventListener(es.ctx, &ffcapi.EventListenerAddRequest{
Expand All @@ -156,7 +159,7 @@ func TestAddEventListenerBadOptions(t *testing.T) {

func TestAddEventListenerBadInitialBlock(t *testing.T) {

es, _, _, done := testEventStream(t)
es, _, _, done := testEventStream(t, nil)
defer done()

_, err := es.addEventListener(es.ctx, &ffcapi.EventListenerAddRequest{
Expand Down Expand Up @@ -187,7 +190,7 @@ func TestStartHeadBlockLimitedByChainHead(t *testing.T) {
},
}

es, _, _, done := testEventStream(t, l1req)
es, _, _, done := testEventStream(t, nil, l1req)
defer done()

assert.Equal(t, int64(testHighBlock), es.headBlock)
Expand All @@ -206,7 +209,46 @@ func TestCatchupThenRejoinLeadGroup(t *testing.T) {
},
}

es, events, mRPC, done := testEventStream(t, l1req)
listenerCaughtUp := make(chan struct{})
additionalMockCalls := func(mRPC *rpcbackendmocks.Backend, doneFunc func()) {
closed := false
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
ethLogs := make([]*logJSONRPC, 0)
filter := *args[3].(*logFilterJSONRPC)
fromBlock := filter.FromBlock.BigInt().Int64()
switch fromBlock {
case 1000:
ethLogs = append(ethLogs, &logJSONRPC{
BlockNumber: ethtypes.NewHexInteger64(1024),
TransactionIndex: ethtypes.NewHexInteger64(64),
LogIndex: ethtypes.NewHexInteger64(2),
BlockHash: ethtypes.MustNewHexBytes0xPrefix("0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"),
Topics: []ethtypes.HexBytes0xPrefix{
ethtypes.MustNewHexBytes0xPrefix("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"),
ethtypes.MustNewHexBytes0xPrefix("0x0000000000000000000000003968ef051b422d3d1cdc182a88bba8dd922e6fa4"),
ethtypes.MustNewHexBytes0xPrefix("0x000000000000000000000000d0f2f5103fd050739a9fb567251bc460cc24d091"),
},
Data: ethtypes.MustNewHexBytes0xPrefix("0x00000000000000000000000000000000000000000000000000000000000003e8"),
})
case DefaultCatchupPageSize + 1000:
if !closed {
close(listenerCaughtUp)
closed = true
}
default:
<-listenerCaughtUp // hold the main group back until we've done the listener catchup
}
*args[1].(*[]*logJSONRPC) = ethLogs
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c", false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(1024),
Hash: ethtypes.MustNewHexBytes0xPrefix("0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"),
}
})
}

es, events, _, done := testEventStream(t, additionalMockCalls, l1req)
defer done()

l2req := &ffcapi.EventListenerAddRequest{
Expand All @@ -221,43 +263,6 @@ func TestCatchupThenRejoinLeadGroup(t *testing.T) {
},
}

closed := false
listenerCaughtUp := make(chan struct{})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
ethLogs := make([]*logJSONRPC, 0)
filter := *args[3].(*logFilterJSONRPC)
fromBlock := filter.FromBlock.BigInt().Int64()
switch fromBlock {
case 1000:
ethLogs = append(ethLogs, &logJSONRPC{
BlockNumber: ethtypes.NewHexInteger64(1024),
TransactionIndex: ethtypes.NewHexInteger64(64),
LogIndex: ethtypes.NewHexInteger64(2),
BlockHash: ethtypes.MustNewHexBytes0xPrefix("0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"),
Topics: []ethtypes.HexBytes0xPrefix{
ethtypes.MustNewHexBytes0xPrefix("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"),
ethtypes.MustNewHexBytes0xPrefix("0x0000000000000000000000003968ef051b422d3d1cdc182a88bba8dd922e6fa4"),
ethtypes.MustNewHexBytes0xPrefix("0x000000000000000000000000d0f2f5103fd050739a9fb567251bc460cc24d091"),
},
Data: ethtypes.MustNewHexBytes0xPrefix("0x00000000000000000000000000000000000000000000000000000000000003e8"),
})
case es.c.catchupPageSize + 1000:
if !closed {
close(listenerCaughtUp)
closed = true
}
default:
<-listenerCaughtUp // hold the main group back until we've done the listener catchup
}
*args[1].(*[]*logJSONRPC) = ethLogs
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c", false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(1024),
Hash: ethtypes.MustNewHexBytes0xPrefix("0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"),
}
})

_, _, err := es.c.EventListenerAdd(es.ctx, l2req)
assert.NoError(t, err)
l := es.listeners[*l2req.ListenerID]
Expand Down Expand Up @@ -343,23 +348,24 @@ func TestExitDuringCatchup(t *testing.T) {
},
}

_, _, mRPC, done := testEventStream(t, l1req)

completed := make(chan struct{})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
ethLogs := make([]*logJSONRPC, 0)
filter := *args[3].(*logFilterJSONRPC)
fromBlock := filter.FromBlock.BigInt().Int64()
switch fromBlock {
default:
go func() {
done()
close(completed)
}()
}
*args[1].(*[]*logJSONRPC) = ethLogs
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil)
additionalMockCalls := func(mRPC *rpcbackendmocks.Backend, doneFunc func()) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
ethLogs := make([]*logJSONRPC, 0)
filter := *args[3].(*logFilterJSONRPC)
fromBlock := filter.FromBlock.BigInt().Int64()
switch fromBlock {
default:
go func() {
doneFunc()
close(completed)
}()
}
*args[1].(*[]*logJSONRPC) = ethLogs
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil)
}
testEventStream(t, additionalMockCalls, l1req)

<-completed
}
Expand All @@ -382,9 +388,14 @@ func TestLeadGroupDeliverEvents(t *testing.T) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) {
*args[1].(*ethtypes.HexInteger) = *ethtypes.NewHexInteger64(testHighBlock)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Once().Run(func(args mock.Arguments) {
hbh := args[1].(*string)
*hbh = "filter_id1"
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil)
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
*args[1].(*string) = "filter_id1"
*args[1].(*string) = "filter_id2"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
Expand Down Expand Up @@ -454,10 +465,15 @@ func TestLeadGroupNearBlockZeroEnsureNonNegative(t *testing.T) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) {
*args[1].(*ethtypes.HexInteger) = *ethtypes.NewHexInteger64(10)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Once().Run(func(args mock.Arguments) {
hbh := args[1].(*string)
*hbh = "filter_id1"
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil)
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
assert.Equal(t, int64(0), args[3].(*logFilterJSONRPC).FromBlock.BigInt().Int64())
*args[1].(*string) = "filter_id1"
*args[1].(*string) = "filter_id2"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
Expand Down Expand Up @@ -495,6 +511,11 @@ func TestLeadGroupCatchupRetry(t *testing.T) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(testHighBlock)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Once().Run(func(args mock.Arguments) {
hbh := args[1].(*string)
*hbh = "filter_id1"
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil)
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(&rpcbackend.RPCError{Message: "pop"}).
Run(func(args mock.Arguments) {
close(retried)
Expand Down Expand Up @@ -527,6 +548,11 @@ func TestStreamLoopNewFilterFail(t *testing.T) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(testHighBlock)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Once().Run(func(args mock.Arguments) {
hbh := args[1].(*string)
*hbh = "filter_id1"
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil)
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(&rpcbackend.RPCError{Message: "pop"}).
Run(func(args mock.Arguments) {
close(retried)
Expand Down Expand Up @@ -608,16 +634,21 @@ func TestStreamLoopChangeFilter(t *testing.T) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(testHighBlock)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Once().Run(func(args mock.Arguments) {
hbh := args[1].(*string)
*hbh = "filter_id1"
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil)
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
l2req.StreamID = es.id
_, _, err := c.EventListenerAdd(ctx, l2req)
assert.NoError(t, err)
*args[1].(*string) = "filter_id1"
*args[1].(*string) = "filter_id2"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
*args[1].(*string) = "filter_id2"
*args[1].(*string) = "filter_id3"
close(reestablishedFilter)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -656,9 +687,14 @@ func TestStreamLoopFilterReset(t *testing.T) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(testHighBlock)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Once().Run(func(args mock.Arguments) {
hbh := args[1].(*string)
*hbh = "filter_id1"
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil)
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
*args[1].(*string) = "filter_id1"
*args[1].(*string) = "filter_id2"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -703,9 +739,14 @@ func TestStreamLoopEnrichFail(t *testing.T) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(testHighBlock)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Once().Run(func(args mock.Arguments) {
hbh := args[1].(*string)
*hbh = "filter_id1"
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
*args[1].(*string) = "filter_id1"
*args[1].(*string) = "filter_id2"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{
Expand Down

0 comments on commit 6c15c1a

Please sign in to comment.