Skip to content

Commit

Permalink
Merge pull request #92 from 0xPolygon/ccampbell/limit-unprocessed-query
Browse files Browse the repository at this point in the history
Limit number of keys to process simultaneously
  • Loading branch information
christophercampbell authored Jun 21, 2024
2 parents d98a596 + d11e731 commit 30f1bf2
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 29 deletions.
8 changes: 4 additions & 4 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type DB interface {
GetLastProcessedBlock(ctx context.Context, task string) (uint64, error)

StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error
GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, error)
GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error)
DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error

Exists(ctx context.Context, key common.Hash) bool
Expand Down Expand Up @@ -113,10 +113,10 @@ func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchK
}

// GetUnresolvedBatchKeys returns the unresolved batch keys from the database
func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, error) {
const getUnresolvedBatchKeysSQL = "SELECT num, hash FROM data_node.unresolved_batches;"
func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) {
const getUnresolvedBatchKeysSQL = "SELECT num, hash FROM data_node.unresolved_batches LIMIT $1;"

rows, err := db.pg.QueryxContext(ctx, getUnresolvedBatchKeysSQL)
rows, err := db.pg.QueryxContext(ctx, getUnresolvedBatchKeysSQL, limit)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ func Test_DB_GetUnresolvedBatchKeys(t *testing.T) {
// Seed data
seedUnresolvedBatchKeys(t, wdb, mock, tt.bks)

expected := mock.ExpectQuery(`SELECT num, hash FROM data_node\.unresolved_batches`)
var limit = uint(10)
expected := mock.ExpectQuery(`SELECT num, hash FROM data_node\.unresolved_batches LIMIT \$1\;`).WithArgs(limit)

if tt.returnErr != nil {
expected.WillReturnError(tt.returnErr)
Expand All @@ -256,7 +257,7 @@ func Test_DB_GetUnresolvedBatchKeys(t *testing.T) {

dbPG := New(wdb)

data, err := dbPG.GetUnresolvedBatchKeys(context.Background())
data, err := dbPG.GetUnresolvedBatchKeys(context.Background(), limit)
if tt.returnErr != nil {
require.ErrorIs(t, err, tt.returnErr)
} else {
Expand Down
29 changes: 15 additions & 14 deletions mocks/db.generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions synchronizer/batches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) {
t.Parallel()

testFn(t, testConfig{
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything},
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)},
getUnresolvedBatchKeysReturns: []interface{}{nil, errors.New("error")},
isErrorExpected: true,
})
Expand All @@ -653,7 +653,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) {
t.Parallel()

testFn(t, testConfig{
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything},
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)},
getUnresolvedBatchKeysReturns: []interface{}{nil, nil},
isErrorExpected: false,
})
Expand All @@ -663,7 +663,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) {
t.Parallel()

testFn(t, testConfig{
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything},
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)},
getUnresolvedBatchKeysReturns: []interface{}{
[]types.BatchKey{{
Number: 10,
Expand Down Expand Up @@ -691,7 +691,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) {
t.Parallel()

testFn(t, testConfig{
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything},
getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)},
getUnresolvedBatchKeysReturns: []interface{}{
[]types.BatchKey{{
Number: 10,
Expand Down
5 changes: 3 additions & 2 deletions synchronizer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
)

const (
initBlockTimeout = 15 * time.Second
minCodeLen = 2
initBlockTimeout = 15 * time.Second
minCodeLen = 2
maxUnprocessedBatch = 100
)

// InitStartBlock initializes the L1 sync task by finding the inception block for the CDKValidium contract
Expand Down
2 changes: 1 addition & 1 deletion synchronizer/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func getUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB) ([]types.B
ctx, cancel := context.WithTimeout(parentCtx, dbTimeout)
defer cancel()

return db.GetUnresolvedBatchKeys(ctx)
return db.GetUnresolvedBatchKeys(ctx, maxUnprocessedBatch)
}

func deleteUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error {
Expand Down
4 changes: 2 additions & 2 deletions synchronizer/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func Test_getUnresolvedBatchKeys(t *testing.T) {
db: func(t *testing.T) db.DB {
mockDB := mocks.NewDB(t)

mockDB.On("GetUnresolvedBatchKeys", mock.Anything).
mockDB.On("GetUnresolvedBatchKeys", mock.Anything, uint(100)).
Return(nil, testError)

return mockDB
Expand All @@ -323,7 +323,7 @@ func Test_getUnresolvedBatchKeys(t *testing.T) {
db: func(t *testing.T) db.DB {
mockDB := mocks.NewDB(t)

mockDB.On("GetUnresolvedBatchKeys", mock.Anything).Return(testData, nil)
mockDB.On("GetUnresolvedBatchKeys", mock.Anything, uint(100)).Return(testData, nil)

return mockDB
},
Expand Down

0 comments on commit 30f1bf2

Please sign in to comment.