From 81c126634ac074fccdbadf255384fa72decaafa9 Mon Sep 17 00:00:00 2001 From: Antonio Navarro Perez Date: Thu, 1 Feb 2024 15:03:29 +0100 Subject: [PATCH] fix(storage): Refactor improving performance and atomicity. Overview: This commit overhauls the Storage layer to set the foundation for a high-performance filtering system based on bitmap indexing. The refactoring introduces a more efficient data handling model and resolves critical issues that affected stability and performance. Key Changes: - Introduced the concept of 'WriteBatch' in the Storage interface to support efficient data operations. - Removed the DB interface to eliminate excessive abstraction, making difficult the implementation of database-specific optimizations. - Fixed the handling of byte arrays in Pebble's closer method to prevent data corruption. The Storage layer now ensures that data is parsed correctly before the Get operation is closed, eliminating the risk of random failures. - Implemented batching for data writting from remote nodes, enhancing data consistency and speed. - Restructured Mock objects into an internal package, restricting access to internals and eliminating the need for specific build tags. Signed-off-by: Antonio Navarro Perez --- .github/golangci.yaml | 2 - .github/workflows/test.yaml | 4 +- Makefile | 2 +- cmd/start.go | 9 +- fetch/fetch.go | 29 +-- fetch/fetch_test.go | 187 +++++++++++--------- fetch/mocks_test.go | 48 +---- fetch/types.go | 17 +- internal/mock/filters.go | 45 +++++ internal/mock/storage.go | 100 +++++++++++ serve/filters/manager.go | 6 +- serve/filters/manager_test.go | 15 +- serve/filters/mocks/mocks.go | 74 -------- serve/filters/subscription/block_test.go | 9 +- serve/filters/types.go | 10 -- serve/handlers/subs/subs_test.go | 37 ++-- serve/jsonrpc.go | 16 +- storage/pebble.go | 131 ++++++++++++++ storage/pebble/pebble.go | 51 ------ storage/pebble/pebble_test.go | 76 -------- storage/{storage_test.go => pebble_test.go} | 33 ++-- storage/storage.go | 90 ---------- storage/types.go | 52 ++++++ 23 files changed, 530 insertions(+), 513 deletions(-) create mode 100644 internal/mock/filters.go create mode 100644 internal/mock/storage.go delete mode 100644 serve/filters/mocks/mocks.go create mode 100644 storage/pebble.go delete mode 100644 storage/pebble/pebble.go delete mode 100644 storage/pebble/pebble_test.go rename storage/{storage_test.go => pebble_test.go} (83%) delete mode 100644 storage/storage.go create mode 100644 storage/types.go diff --git a/.github/golangci.yaml b/.github/golangci.yaml index 7539e04b..cade26c4 100644 --- a/.github/golangci.yaml +++ b/.github/golangci.yaml @@ -7,8 +7,6 @@ run: modules-download-mode: readonly allow-parallel-runners: false go: "" - build-tags: - - testmocks skip-dirs: - serve/filters/mocks diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 9afd7321..83a11d24 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -15,7 +15,7 @@ jobs: uses: actions/checkout@v4 - name: Go test - run: go test -tags "testmocks" -shuffle=on -coverprofile coverage.out -timeout 5m ./... + run: go test -shuffle=on -coverprofile coverage.out -timeout 5m ./... test-with-race: runs-on: ubuntu-latest @@ -29,4 +29,4 @@ jobs: uses: actions/checkout@v4 - name: Go race test - run: go test -tags "testmocks" -race -shuffle=on -timeout 5m ./... + run: go test -race -shuffle=on -timeout 5m ./... diff --git a/Makefile b/Makefile index 9c155e6d..0d55d775 100644 --- a/Makefile +++ b/Makefile @@ -22,4 +22,4 @@ fixalign: .PHONY: test test: go clean -testcache - go test -v -tags "testmocks" ./... \ No newline at end of file + go test -v ./... \ No newline at end of file diff --git a/cmd/start.go b/cmd/start.go index 0e760703..ff8da971 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -6,13 +6,14 @@ import ( "flag" "fmt" + "github.com/peterbourgon/ff/v3/ffcli" + "go.uber.org/zap" + "github.com/gnolang/tx-indexer/client" "github.com/gnolang/tx-indexer/events" "github.com/gnolang/tx-indexer/fetch" "github.com/gnolang/tx-indexer/serve" "github.com/gnolang/tx-indexer/storage" - "github.com/peterbourgon/ff/v3/ffcli" - "go.uber.org/zap" ) const ( @@ -112,7 +113,7 @@ func (c *startCfg) exec(ctx context.Context) error { } // Create a DB instance - db, err := storage.New(c.dbPath) + db, err := storage.NewPebble(c.dbPath) if err != nil { return fmt.Errorf("unable to open storage DB, %w", err) } @@ -165,7 +166,7 @@ func (c *startCfg) exec(ctx context.Context) error { // setupJSONRPC sets up the JSONRPC instance func setupJSONRPC( listenAddress string, - db *storage.Storage, + db *storage.Pebble, em *events.Manager, logger *zap.Logger, ) *serve.JSONRPC { diff --git a/fetch/fetch.go b/fetch/fetch.go index f3b59893..970e2feb 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -8,10 +8,12 @@ import ( "sort" "time" - storageErrors "github.com/gnolang/tx-indexer/storage/errors" - "github.com/gnolang/tx-indexer/types" queue "github.com/madz-lab/insertion-queue" "go.uber.org/zap" + + "github.com/gnolang/tx-indexer/storage" + storageErrors "github.com/gnolang/tx-indexer/storage/errors" + "github.com/gnolang/tx-indexer/types" ) const ( @@ -22,7 +24,7 @@ const ( // Fetcher is an instance of the block indexer // fetcher type Fetcher struct { - storage Storage + storage storage.Storage client Client events Events @@ -38,7 +40,7 @@ type Fetcher struct { // New creates a new data fetcher instance // that gets blockchain data from a remote chain func New( - storage Storage, + storage storage.Storage, client Client, events Events, opts ...Option, @@ -176,9 +178,11 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error { // Pop the next chunk f.chunkBuffer.PopFront() + wb := f.storage.WriteBatch() + // Save the fetched data for blockIndex, block := range item.chunk.blocks { - if saveErr := f.storage.SaveBlock(block); saveErr != nil { + if saveErr := wb.SetBlock(block); saveErr != nil { // This is a design choice that really highlights the strain // of keeping legacy testnets running. Current TM2 testnets // have blocks / transactions that are no longer compatible @@ -189,21 +193,21 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error { continue } - f.logger.Debug("Saved block data", zap.Int64("number", block.Height)) + f.logger.Debug("Added block data to batch", zap.Int64("number", block.Height)) // Get block results txResults := item.chunk.results[blockIndex] // Save the fetched transaction results for _, txResult := range txResults { - if err := f.storage.SaveTx(txResult); err != nil { + if err := wb.SetTx(txResult); err != nil { f.logger.Error("unable to save tx", zap.String("err", err.Error())) continue } f.logger.Debug( - "Saved tx", + "Added tx to batch", zap.String("hash", base64.StdEncoding.EncodeToString(txResult.Tx.Hash())), ) } @@ -218,15 +222,20 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error { } f.logger.Info( - "Saved block and tx data for range", + "Added to batch block and tx data for range", zap.Int64("from", item.chunkRange.from), zap.Int64("to", item.chunkRange.to), ) // Save the latest height data - if err := f.storage.SaveLatestHeight(item.chunkRange.to); err != nil { + if err := wb.SetLatestHeight(item.chunkRange.to); err != nil { + defer wb.Rollback() return fmt.Errorf("unable to save latest height info, %w", err) } + + if err := wb.Commit(); err != nil { + return fmt.Errorf("error persisting block information into storage, %w", err) + } } } } diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index 9e38fe3d..a4ed6296 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -13,13 +13,16 @@ import ( "github.com/gnolang/gno/tm2/pkg/bft/state" "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/gno/tm2/pkg/std" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + clientTypes "github.com/gnolang/tx-indexer/client/types" "github.com/gnolang/tx-indexer/events" + "github.com/gnolang/tx-indexer/internal/mock" + "github.com/gnolang/tx-indexer/storage" storageErrors "github.com/gnolang/tx-indexer/storage/errors" indexerTypes "github.com/gnolang/tx-indexer/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" ) func TestFetcher_FetchTransactions_Invalid(t *testing.T) { @@ -31,8 +34,8 @@ func TestFetcher_FetchTransactions_Invalid(t *testing.T) { var ( fetchErr = errors.New("random DB error") - mockStorage = &mockStorage{ - getLatestSavedHeightFn: func() (int64, error) { + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (int64, error) { return 0, fetchErr }, } @@ -84,31 +87,35 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { latestSaved = int64(0) - mockStorage = &mockStorage{ - getLatestSavedHeightFn: func() (int64, error) { + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (int64, error) { if latestSaved == 0 { return 0, storageErrors.ErrNotFound } return latestSaved, nil }, - saveBlockFn: func(block *types.Block) error { - savedBlocks = append(savedBlocks, block) - - // Check if all blocks are saved - if block.Height == int64(blockNum) { - // At this point, we can cancel the process - cancelFn() - } + GetWriteBatchFn: func() storage.Batch { + return &mock.WriteBatch{ + SetBlockFn: func(block *types.Block) error { + savedBlocks = append(savedBlocks, block) + + // Check if all blocks are saved + if block.Height == int64(blockNum) { + // At this point, we can cancel the process + cancelFn() + } - latestSaved = block.Height + latestSaved = block.Height - return nil - }, - saveTxFn: func(result *types.TxResult) error { - savedTxs = append(savedTxs, result) + return nil + }, + SetTxFn: func(result *types.TxResult) error { + savedTxs = append(savedTxs, result) - return nil + return nil + }, + } }, } @@ -235,31 +242,35 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { latestSaved = int64(0) - mockStorage = &mockStorage{ - getLatestSavedHeightFn: func() (int64, error) { + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (int64, error) { if latestSaved == 0 { return 0, storageErrors.ErrNotFound } return latestSaved, nil }, - saveBlockFn: func(block *types.Block) error { - savedBlocks = append(savedBlocks, block) - - // Check if all blocks are saved - if block.Height == int64(blockNum) { - // At this point, we can cancel the process - cancelFn() - } + GetWriteBatchFn: func() storage.Batch { + return &mock.WriteBatch{ + SetBlockFn: func(block *types.Block) error { + savedBlocks = append(savedBlocks, block) + + // Check if all blocks are saved + if block.Height == int64(blockNum) { + // At this point, we can cancel the process + cancelFn() + } - latestSaved = block.Height + latestSaved = block.Height - return nil - }, - saveTxFn: func(result *types.TxResult) error { - savedTxs = append(savedTxs, result) + return nil + }, + SetTxFn: func(result *types.TxResult) error { + savedTxs = append(savedTxs, result) - return nil + return nil + }, + } }, } @@ -405,25 +416,29 @@ func TestFetcher_FetchTransactions_Valid_EmptyBlocks(t *testing.T) { }, } - mockStorage = &mockStorage{ - getLatestSavedHeightFn: func() (int64, error) { + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (int64, error) { return 0, storageErrors.ErrNotFound }, - saveBlockFn: func(block *types.Block) error { - savedBlocks = append(savedBlocks, block) - - // Check if all blocks are saved - if block.Height == int64(blockNum) { - // At this point, we can cancel the process - cancelFn() - } + GetWriteBatchFn: func() storage.Batch { + return &mock.WriteBatch{ + SetBlockFn: func(block *types.Block) error { + savedBlocks = append(savedBlocks, block) + + // Check if all blocks are saved + if block.Height == int64(blockNum) { + // At this point, we can cancel the process + cancelFn() + } - return nil - }, - saveTxFn: func(_ *types.TxResult) error { - t.Fatalf("should not save txs") + return nil + }, + SetTxFn: func(_ *types.TxResult) error { + t.Fatalf("should not save txs") - return nil + return nil + }, + } }, } @@ -507,25 +522,29 @@ func TestFetcher_FetchTransactions_Valid_EmptyBlocks(t *testing.T) { }, } - mockStorage = &mockStorage{ - getLatestSavedHeightFn: func() (int64, error) { + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (int64, error) { return 0, storageErrors.ErrNotFound }, - saveBlockFn: func(block *types.Block) error { - savedBlocks = append(savedBlocks, block) - - // Check if all blocks are saved - if block.Height == int64(blockNum) { - // At this point, we can cancel the process - cancelFn() - } + GetWriteBatchFn: func() storage.Batch { + return &mock.WriteBatch{ + SetBlockFn: func(block *types.Block) error { + savedBlocks = append(savedBlocks, block) + + // Check if all blocks are saved + if block.Height == int64(blockNum) { + // At this point, we can cancel the process + cancelFn() + } - return nil - }, - saveTxFn: func(_ *types.TxResult) error { - t.Fatalf("should not save txs") + return nil + }, + SetTxFn: func(_ *types.TxResult) error { + t.Fatalf("should not save txs") - return nil + return nil + }, + } }, } @@ -623,25 +642,29 @@ func TestFetcher_InvalidBlocks(t *testing.T) { }, } - mockStorage = &mockStorage{ - getLatestSavedHeightFn: func() (int64, error) { + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (int64, error) { return 0, storageErrors.ErrNotFound }, - saveBlockFn: func(block *types.Block) error { - savedBlocks = append(savedBlocks, block) + GetWriteBatchFn: func() storage.Batch { + return &mock.WriteBatch{ + SetBlockFn: func(block *types.Block) error { + savedBlocks = append(savedBlocks, block) + + // Check if all blocks are saved + if block.Height == int64(blockNum) { + // At this point, we can cancel the process + cancelFn() + } + + return fmt.Errorf("unable to save block %d", block.Height) + }, + SetTxFn: func(_ *types.TxResult) error { + t.Fatalf("should not save txs") - // Check if all blocks are saved - if block.Height == int64(blockNum) { - // At this point, we can cancel the process - cancelFn() + return nil + }, } - - return fmt.Errorf("unable to save block %d", block.Height) - }, - saveTxFn: func(_ *types.TxResult) error { - t.Fatalf("should not save txs") - - return nil }, } diff --git a/fetch/mocks_test.go b/fetch/mocks_test.go index c7735bfc..dc966054 100644 --- a/fetch/mocks_test.go +++ b/fetch/mocks_test.go @@ -2,57 +2,11 @@ package fetch import ( core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types" - "github.com/gnolang/gno/tm2/pkg/bft/types" + clientTypes "github.com/gnolang/tx-indexer/client/types" "github.com/gnolang/tx-indexer/events" ) -type ( - getLatestHeightDelegate func() (int64, error) - saveLatestHeightDelegate func(int64) error - saveBlockDelegate func(*types.Block) error - saveTxDelegate func(*types.TxResult) error -) - -type mockStorage struct { - getLatestSavedHeightFn getLatestHeightDelegate - saveLatestHeightFn saveLatestHeightDelegate - saveBlockFn saveBlockDelegate - saveTxFn saveTxDelegate -} - -func (m *mockStorage) GetLatestHeight() (int64, error) { - if m.getLatestSavedHeightFn != nil { - return m.getLatestSavedHeightFn() - } - - return 0, nil -} - -func (m *mockStorage) SaveLatestHeight(blockNum int64) error { - if m.saveLatestHeightFn != nil { - return m.saveLatestHeightFn(blockNum) - } - - return nil -} - -func (m *mockStorage) SaveTx(tx *types.TxResult) error { - if m.saveTxFn != nil { - return m.saveTxFn(tx) - } - - return nil -} - -func (m *mockStorage) SaveBlock(block *types.Block) error { - if m.saveBlockFn != nil { - return m.saveBlockFn(block) - } - - return nil -} - type ( getLatestBlockNumberDelegate func() (int64, error) getBlockDelegate func(int64) (*core_types.ResultBlock, error) diff --git a/fetch/types.go b/fetch/types.go index 8bbcdb18..80a0cac0 100644 --- a/fetch/types.go +++ b/fetch/types.go @@ -2,26 +2,11 @@ package fetch import ( core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types" - "github.com/gnolang/gno/tm2/pkg/bft/types" + clientTypes "github.com/gnolang/tx-indexer/client/types" "github.com/gnolang/tx-indexer/events" ) -// Storage defines the transaction storage interface -type Storage interface { - // GetLatestHeight returns the latest block height from the storage - GetLatestHeight() (int64, error) - - // SaveLatestHeight saves the latest block height to the storage - SaveLatestHeight(int64) error - - // SaveBlock saves the block to the permanent storage - SaveBlock(block *types.Block) error - - // SaveTx saves the transaction to the permanent storage - SaveTx(tx *types.TxResult) error -} - // Client defines the interface for the node (client) communication type Client interface { // GetLatestBlockNumber returns the latest block height from the chain diff --git a/internal/mock/filters.go b/internal/mock/filters.go new file mode 100644 index 00000000..ddc8b5d2 --- /dev/null +++ b/internal/mock/filters.go @@ -0,0 +1,45 @@ +package mock + +import ( + "github.com/gnolang/tx-indexer/events" +) + +type ( + writeDataFnDelegate func(any) error +) + +type Conn struct { + WriteDataFn writeDataFnDelegate +} + +func (m *Conn) WriteData(data any) error { + if m.WriteDataFn != nil { + return m.WriteDataFn(data) + } + + return nil +} + +type ( + subscribeDelegate func([]events.Type) *events.Subscription + cancelSubscriptionDelegate func(events.SubscriptionID) +) + +type Events struct { + SubscribeFn subscribeDelegate + CancelSubscriptionFn cancelSubscriptionDelegate +} + +func (m *Events) Subscribe(eventTypes []events.Type) *events.Subscription { + if m.SubscribeFn != nil { + return m.SubscribeFn(eventTypes) + } + + return nil +} + +func (m *Events) CancelSubscription(id events.SubscriptionID) { + if m.CancelSubscriptionFn != nil { + m.CancelSubscriptionFn(id) + } +} diff --git a/internal/mock/storage.go b/internal/mock/storage.go new file mode 100644 index 00000000..26b667e0 --- /dev/null +++ b/internal/mock/storage.go @@ -0,0 +1,100 @@ +package mock + +import ( + "github.com/gnolang/gno/tm2/pkg/bft/types" + + "github.com/gnolang/tx-indexer/storage" +) + +var _ storage.Storage = &Storage{} + +type Storage struct { + GetLatestSavedHeightFn func() (int64, error) + GetWriteBatchFn func() storage.Batch + GetBlockFn func(int64) (*types.Block, error) + GetTxFn func([]byte) (*types.TxResult, error) +} + +func (m *Storage) GetLatestHeight() (int64, error) { + if m.GetLatestSavedHeightFn != nil { + return m.GetLatestSavedHeightFn() + } + + return 0, nil +} + +// GetBlock fetches the block by its number +func (m *Storage) GetBlock(blockNum int64) (*types.Block, error) { + if m.GetBlockFn != nil { + return m.GetBlockFn(blockNum) + } + + panic("not implemented") +} + +// GetTx fetches the tx using its hash +func (m *Storage) GetTx(tx []byte) (*types.TxResult, error) { + if m.GetTxFn != nil { + return m.GetTxFn(tx) + } + + panic("not implemented") +} + +// WriteBatch provides a batch intended to do a write action that +// can be cancelled or commited all at the same time +func (m *Storage) WriteBatch() storage.Batch { + if m.GetWriteBatchFn != nil { + return m.GetWriteBatchFn() + } + + panic("not implemented") +} + +func (m *Storage) Close() error { + return nil +} + +type WriteBatch struct { + SetLatestHeightFn func(int64) error + SetBlockFn func(*types.Block) error + SetTxFn func(*types.TxResult) error +} + +// SetLatestHeight saves the latest block height to the storage +func (mb *WriteBatch) SetLatestHeight(h int64) error { + if mb.SetLatestHeightFn != nil { + return mb.SetLatestHeightFn(h) + } + + return nil +} + +// SetBlock saves the block to the permanent storage +func (mb *WriteBatch) SetBlock(block *types.Block) error { + if mb.SetBlockFn != nil { + return mb.SetBlockFn(block) + } + + return nil +} + +// SetTx saves the transaction to the permanent storage +func (mb *WriteBatch) SetTx(tx *types.TxResult) error { + if mb.SetTxFn != nil { + return mb.SetTxFn(tx) + } + + return nil +} + +// Commit stores all the provided info on the storage and make +// it available for other storage readers +func (mb *WriteBatch) Commit() error { + return nil +} + +// Rollback rollbacks the operation not persisting the provided changes +func (mb *WriteBatch) Rollback() error { + return nil +} diff --git a/serve/filters/manager.go b/serve/filters/manager.go index c5313e40..d5897183 100644 --- a/serve/filters/manager.go +++ b/serve/filters/manager.go @@ -6,17 +6,19 @@ import ( "time" "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/tx-indexer/events" "github.com/gnolang/tx-indexer/serve/conns" "github.com/gnolang/tx-indexer/serve/filters/filter" filterSubscription "github.com/gnolang/tx-indexer/serve/filters/subscription" + "github.com/gnolang/tx-indexer/storage" commonTypes "github.com/gnolang/tx-indexer/types" ) // Manager manages all running filters type Manager struct { ctx context.Context - storage Storage // temporarily unused + storage storage.Storage // temporarily unused events Events filters *filterMap subscriptions *subscriptionMap @@ -26,7 +28,7 @@ type Manager struct { // NewFilterManager creates new filter manager object func NewFilterManager( ctx context.Context, - storage Storage, + storage storage.Storage, events Events, opts ...Option, ) *Manager { diff --git a/serve/filters/manager_test.go b/serve/filters/manager_test.go index cfabce54..7e8314d6 100644 --- a/serve/filters/manager_test.go +++ b/serve/filters/manager_test.go @@ -7,12 +7,13 @@ import ( "time" tm2Types "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/gnolang/tx-indexer/events" + "github.com/gnolang/tx-indexer/internal/mock" "github.com/gnolang/tx-indexer/serve/filters/filter" - "github.com/gnolang/tx-indexer/serve/filters/mocks" "github.com/gnolang/tx-indexer/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) // generateBlocks generates dummy blocks @@ -39,7 +40,7 @@ func Test_BlockFilters(t *testing.T) { filterManager := NewFilterManager( context.Background(), - &mocks.MockStorage{}, + &mock.Storage{}, events.NewManager(), ) @@ -77,7 +78,7 @@ func Test_NewBlockEvents(t *testing.T) { blocks = generateBlocks(t, 10) blockCh = make(chan events.Event) - mockEvents = &mocks.MockEvents{ + mockEvents = &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{ SubCh: blockCh, @@ -89,7 +90,7 @@ func Test_NewBlockEvents(t *testing.T) { // Init filter manager filterManager := NewFilterManager( context.Background(), - &mocks.MockStorage{}, + &mock.Storage{}, mockEvents, ) @@ -163,7 +164,7 @@ func Test_FilterCleanup(t *testing.T) { // Create filter manager filterManager := NewFilterManager( context.Background(), - &mocks.MockStorage{}, + &mock.Storage{}, events.NewManager(), WithCleanupInterval(10*time.Millisecond), ) diff --git a/serve/filters/mocks/mocks.go b/serve/filters/mocks/mocks.go deleted file mode 100644 index b2256665..00000000 --- a/serve/filters/mocks/mocks.go +++ /dev/null @@ -1,74 +0,0 @@ -//go:build testmocks - -package mocks - -import ( - tm2Types "github.com/gnolang/gno/tm2/pkg/bft/types" - "github.com/gnolang/tx-indexer/events" -) - -type ( - writeDataFnDelegate func(any) error -) - -type MockConn struct { - WriteDataFn writeDataFnDelegate -} - -func (m *MockConn) WriteData(data any) error { - if m.WriteDataFn != nil { - return m.WriteDataFn(data) - } - - return nil -} - -type ( - getBlockDelegate func(int64) (*tm2Types.Block, error) - getTxDelegate func([]byte) (*tm2Types.TxResult, error) -) - -type MockStorage struct { - GetBlockFn getBlockDelegate - GetTxFn getTxDelegate -} - -func (m *MockStorage) GetBlock(blockNum int64) (*tm2Types.Block, error) { - if m.GetBlockFn != nil { - return m.GetBlockFn(blockNum) - } - - return nil, nil -} - -func (m *MockStorage) GetTx(hash []byte) (*tm2Types.TxResult, error) { - if m.GetTxFn != nil { - return m.GetTxFn(hash) - } - - return nil, nil -} - -type ( - subscribeDelegate func([]events.Type) *events.Subscription - cancelSubscriptionDelegate func(events.SubscriptionID) -) - -type MockEvents struct { - SubscribeFn subscribeDelegate - CancelSubscriptionFn cancelSubscriptionDelegate -} - -func (m *MockEvents) Subscribe(eventTypes []events.Type) *events.Subscription { - if m.SubscribeFn != nil { - return m.SubscribeFn(eventTypes) - } - - return nil -} - -func (m *MockEvents) CancelSubscription(id events.SubscriptionID) { - if m.CancelSubscriptionFn != nil { - m.CancelSubscriptionFn(id) - } -} diff --git a/serve/filters/subscription/block_test.go b/serve/filters/subscription/block_test.go index 2cdef697..1ef5e9b3 100644 --- a/serve/filters/subscription/block_test.go +++ b/serve/filters/subscription/block_test.go @@ -4,11 +4,12 @@ import ( "testing" "github.com/gnolang/gno/tm2/pkg/bft/types" - "github.com/gnolang/tx-indexer/serve/encode" - "github.com/gnolang/tx-indexer/serve/filters/mocks" - "github.com/gnolang/tx-indexer/serve/spec" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/gnolang/tx-indexer/internal/mock" + "github.com/gnolang/tx-indexer/serve/encode" + "github.com/gnolang/tx-indexer/serve/spec" ) func TestBlockSubscription_WriteResponse(t *testing.T) { @@ -29,7 +30,7 @@ func TestBlockSubscription_WriteResponse(t *testing.T) { expectedBlockResponse := spec.NewJSONSubscribeResponse("", encodedResponse) - mockConn := &mocks.MockConn{ + mockConn := &mock.Conn{ WriteDataFn: func(data any) error { capturedWrite = data diff --git a/serve/filters/types.go b/serve/filters/types.go index b3d6ff6c..f89dcd65 100644 --- a/serve/filters/types.go +++ b/serve/filters/types.go @@ -20,16 +20,6 @@ type Events interface { CancelSubscription(events.SubscriptionID) } -// Storage represents the permanent storage abstraction -// required by the filter manager -type Storage interface { - // GetBlock fetches the block by its number - GetBlock(int64) (*types.Block, error) - - // GetTx fetches the tx using its hash - GetTx([]byte) (*types.TxResult, error) -} - // Filter interface is used for different filter types type Filter interface { // GetType returns the filter type diff --git a/serve/handlers/subs/subs_test.go b/serve/handlers/subs/subs_test.go index 44ef45dc..a62b573b 100644 --- a/serve/handlers/subs/subs_test.go +++ b/serve/handlers/subs/subs_test.go @@ -10,17 +10,18 @@ import ( "github.com/gnolang/gno/tm2/pkg/amino" "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/gnolang/tx-indexer/events" + "github.com/gnolang/tx-indexer/internal/mock" "github.com/gnolang/tx-indexer/serve/conns" "github.com/gnolang/tx-indexer/serve/filters" "github.com/gnolang/tx-indexer/serve/filters/filter" - "github.com/gnolang/tx-indexer/serve/filters/mocks" "github.com/gnolang/tx-indexer/serve/filters/subscription" "github.com/gnolang/tx-indexer/serve/metadata" "github.com/gnolang/tx-indexer/serve/spec" indexerTypes "github.com/gnolang/tx-indexer/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) // generateBlocks generates dummy blocks @@ -63,8 +64,8 @@ func TestNewBlockFilter_Valid(t *testing.T) { fm := filters.NewFilterManager( context.Background(), - &mocks.MockStorage{}, - &mocks.MockEvents{ + &mock.Storage{}, + &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{} }, @@ -132,8 +133,8 @@ func TestUninstallFilter_Valid(t *testing.T) { fm := filters.NewFilterManager( context.Background(), - &mocks.MockStorage{}, - &mocks.MockEvents{ + &mock.Storage{}, + &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{} }, @@ -156,8 +157,8 @@ func TestUninstallFilter_Valid(t *testing.T) { fm := filters.NewFilterManager( context.Background(), - &mocks.MockStorage{}, - &mocks.MockEvents{ + &mock.Storage{}, + &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{} }, @@ -236,7 +237,7 @@ func TestGetFilterChanges_Valid(t *testing.T) { eventsCh = make(chan events.Event) - mockEvents = &mocks.MockEvents{ + mockEvents = &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{ ID: events.SubscriptionID(1), @@ -248,7 +249,7 @@ func TestGetFilterChanges_Valid(t *testing.T) { fm := filters.NewFilterManager( context.Background(), - &mocks.MockStorage{}, + &mock.Storage{}, mockEvents, ) @@ -358,7 +359,7 @@ func TestSubscribe_InvalidParams(t *testing.T) { getWSConnectionFn: func(wsID string) conns.WSConnection { require.Equal(t, id, wsID) - return &mocks.MockConn{} // connection found + return &mock.Conn{} // connection found }, } ) @@ -397,7 +398,7 @@ func TestSubscribe_Valid(t *testing.T) { WebSocketID: &connID, } - mockEvents = &mocks.MockEvents{ + mockEvents = &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{ ID: events.SubscriptionID(1), @@ -407,7 +408,7 @@ func TestSubscribe_Valid(t *testing.T) { } writtenData = make([]any, 0) - mockConn = &mocks.MockConn{ + mockConn = &mock.Conn{ WriteDataFn: func(data any) error { defer wg.Done() writtenData = append(writtenData, data) @@ -426,7 +427,7 @@ func TestSubscribe_Valid(t *testing.T) { fm := filters.NewFilterManager( context.Background(), - &mocks.MockStorage{}, + &mock.Storage{}, mockEvents, ) @@ -578,13 +579,13 @@ func TestUnsubscribe_Valid(t *testing.T) { WebSocketID: &connID, } - mockEvents = &mocks.MockEvents{ + mockEvents = &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{} }, } - mockConn = &mocks.MockConn{} + mockConn = &mock.Conn{} mockConnFetcher = &mockConnectionFetcher{ getWSConnectionFn: func(id string) conns.WSConnection { @@ -597,7 +598,7 @@ func TestUnsubscribe_Valid(t *testing.T) { fm := filters.NewFilterManager( context.Background(), - &mocks.MockStorage{}, + &mock.Storage{}, mockEvents, ) diff --git a/serve/jsonrpc.go b/serve/jsonrpc.go index 9a42f89f..31a32735 100644 --- a/serve/jsonrpc.go +++ b/serve/jsonrpc.go @@ -9,6 +9,13 @@ import ( "net/http" "time" + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + "github.com/google/uuid" + "github.com/olahol/melody" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "github.com/gnolang/tx-indexer/serve/conns" "github.com/gnolang/tx-indexer/serve/conns/wsconn" "github.com/gnolang/tx-indexer/serve/filters" @@ -20,12 +27,7 @@ import ( "github.com/gnolang/tx-indexer/serve/writer" httpWriter "github.com/gnolang/tx-indexer/serve/writer/http" wsWriter "github.com/gnolang/tx-indexer/serve/writer/ws" - "github.com/go-chi/chi/v5" - "github.com/go-chi/chi/v5/middleware" - "github.com/google/uuid" - "github.com/olahol/melody" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" + "github.com/gnolang/tx-indexer/storage" ) const ( @@ -188,7 +190,7 @@ func (j *JSONRPC) RegisterBlockEndpoints(db block.Storage) { ) } -func (j *JSONRPC) RegisterSubEndpoints(db filters.Storage) { +func (j *JSONRPC) RegisterSubEndpoints(db storage.Storage) { fm := filters.NewFilterManager(context.Background(), db, j.events) subsHandler := subs.NewHandler( diff --git a/storage/pebble.go b/storage/pebble.go new file mode 100644 index 00000000..451fec73 --- /dev/null +++ b/storage/pebble.go @@ -0,0 +1,131 @@ +package storage + +import ( + "errors" + "fmt" + + "github.com/cockroachdb/pebble" + "github.com/gnolang/gno/tm2/pkg/bft/types" + + storageErrors "github.com/gnolang/tx-indexer/storage/errors" +) + +var _ Storage = &Pebble{} + +// Storage is the instance of an embedded storage +type Pebble struct { + db *pebble.DB +} + +// New creates a new storage instance at the given path +func NewPebble(path string) (*Pebble, error) { + db, err := pebble.Open(path, &pebble.Options{ + // TODO: EventListener + // Start with defaults + }) + if err != nil { + return nil, fmt.Errorf("unable to create DB, %w", err) + } + + return &Pebble{ + db: db, + }, nil +} + +// GetLatestHeight fetches the latest saved height from storage +func (s *Pebble) GetLatestHeight() (int64, error) { + height, c, err := s.db.Get(latestHeightKey) + if errors.Is(err, pebble.ErrNotFound) { + return 0, storageErrors.ErrNotFound + } + if err != nil { + return 0, err + } + + defer c.Close() + + return decodeInt64(height), nil +} + +// GetBlock fetches the specified block from storage, if any +func (s *Pebble) GetBlock(blockNum int64) (*types.Block, error) { + block, c, err := s.db.Get(append(blockPrefix, encodeInt64(blockNum)...)) + if errors.Is(err, pebble.ErrNotFound) { + return nil, storageErrors.ErrNotFound + } + if err != nil { + return nil, err + } + + defer c.Close() + + return decodeBlock(block) +} + +// GetTx fetches the specified tx result from storage, if any +func (s *Pebble) GetTx(txHash []byte) (*types.TxResult, error) { + tx, c, err := s.db.Get(append(txResultKey, txHash...)) + if errors.Is(err, pebble.ErrNotFound) { + return nil, storageErrors.ErrNotFound + } + if err != nil { + return nil, err + } + + defer c.Close() + + return decodeTx(tx) +} + +func (s *Pebble) WriteBatch() Batch { + return &PebbleBatch{ + b: s.db.NewBatch(), + } +} + +func (s *Pebble) Close() error { + return s.db.Close() +} + +var _ Batch = &PebbleBatch{} + +type PebbleBatch struct { + b *pebble.Batch +} + +func (b *PebbleBatch) SetLatestHeight(h int64) error { + return b.b.Set(latestHeightKey, encodeInt64(h), pebble.NoSync) +} + +func (b *PebbleBatch) SetBlock(block *types.Block) error { + eb, err := encodeBlock(block) + if err != nil { + return err + } + + return b.b.Set( + append(blockPrefix, encodeInt64(block.Height)...), + eb, + pebble.NoSync, + ) +} + +func (b *PebbleBatch) SetTx(tx *types.TxResult) error { + encodedTx, err := encodeTx(tx) + if err != nil { + return err + } + return b.b.Set( + append(txResultKey, tx.Tx.Hash()...), + encodedTx, + pebble.NoSync, + ) +} +func (b *PebbleBatch) Commit() error { + return b.b.Commit(pebble.Sync) +} + +// Rollback closes the pebble batch without persisting any data. error output is always nil. +func (b *PebbleBatch) Rollback() error { + return b.b.Close() +} diff --git a/storage/pebble/pebble.go b/storage/pebble/pebble.go deleted file mode 100644 index c1156f6c..00000000 --- a/storage/pebble/pebble.go +++ /dev/null @@ -1,51 +0,0 @@ -package pebble - -import ( - "errors" - - "github.com/cockroachdb/pebble" - storageErrors "github.com/gnolang/tx-indexer/storage/errors" -) - -type Storage struct { - db *pebble.DB -} - -// NewDB initializes a new pebble DB instance at the given path -func NewDB(path string) (*Storage, error) { - db, err := pebble.Open(path, &pebble.Options{}) - if err != nil { - return nil, err - } - - return &Storage{ - db: db, - }, nil -} - -// Set stores a value for the given key -func (p *Storage) Set(key, value []byte) error { - return p.db.Set(key, value, pebble.Sync) -} - -// Get retrieves the value for a given key -func (p *Storage) Get(key []byte) ([]byte, error) { - value, closer, err := p.db.Get(key) - if errors.Is(err, pebble.ErrNotFound) { - // Wrap the error - return nil, storageErrors.ErrNotFound - } - - if err != nil { - return nil, err - } - - defer closer.Close() - - return value, nil -} - -// Close closes the database connection -func (p *Storage) Close() error { - return p.db.Close() -} diff --git a/storage/pebble/pebble_test.go b/storage/pebble/pebble_test.go deleted file mode 100644 index 26f54aff..00000000 --- a/storage/pebble/pebble_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package pebble - -import ( - "crypto/rand" - "testing" - - storageErrors "github.com/gnolang/tx-indexer/storage/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// generateRandomBytes generates random bytes -func generateRandomBytes(t *testing.T, length int) []byte { - t.Helper() - - bytes := make([]byte, length) - _, err := rand.Read(bytes) - require.NoError(t, err) - - return bytes -} - -// generateRandomPairs generates random key value pairs -func generateRandomPairs(t *testing.T, count int) map[string][]byte { - t.Helper() - - pairs := make(map[string][]byte, count) - - for i := 0; i < count; i++ { - key := generateRandomBytes(t, 8) - value := generateRandomBytes(t, 32) - - pairs[string(key)] = value - } - - return pairs -} - -func TestPebble_GetMissingItem(t *testing.T) { - t.Parallel() - - // Initialize the pebble DB - store, err := NewDB(t.TempDir()) - require.NoError(t, err) - - defer store.Close() - - // Fetch a non-existent value - _, err = store.Get([]byte("non_existent_key")) - if !assert.ErrorIs(t, err, storageErrors.ErrNotFound) { - t.Errorf("Expected error not found when getting non-existent key") - } -} - -func TestPebble_WriteRead(t *testing.T) { - t.Parallel() - - // Initialize the pebble DB - store, err := NewDB(t.TempDir()) - require.NoError(t, err) - - defer store.Close() - - pairs := generateRandomPairs(t, 50) - - for key, value := range pairs { - // Set the key - require.NoError(t, store.Set([]byte(key), value)) - - // Get the value - retrievedValue, err := store.Get([]byte(key)) - require.NoError(t, err) - - assert.Equal(t, value, retrievedValue) - } -} diff --git a/storage/storage_test.go b/storage/pebble_test.go similarity index 83% rename from storage/storage_test.go rename to storage/pebble_test.go index 436106c7..5242d0a1 100644 --- a/storage/storage_test.go +++ b/storage/pebble_test.go @@ -7,15 +7,16 @@ import ( "github.com/gnolang/gno/tm2/pkg/amino" "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/gno/tm2/pkg/std" - storageErrors "github.com/gnolang/tx-indexer/storage/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + storageErrors "github.com/gnolang/tx-indexer/storage/errors" ) func TestStorage_New(t *testing.T) { t.Parallel() - s, err := New(t.TempDir()) + s, err := NewPebble(t.TempDir()) require.NotNil(t, s) assert.NoError(t, err) @@ -25,7 +26,7 @@ func TestStorage_New(t *testing.T) { func TestStorage_LatestHeight(t *testing.T) { t.Parallel() - s, err := New(t.TempDir()) + s, err := NewPebble(t.TempDir()) require.NoError(t, err) defer func() { @@ -39,7 +40,10 @@ func TestStorage_LatestHeight(t *testing.T) { // Save the latest height and grab it for i := int64(0); i < 100; i++ { - require.NoError(t, s.SaveLatestHeight(i)) + b := s.WriteBatch() + + require.NoError(t, b.SetLatestHeight(i)) + require.NoError(t, b.Commit()) latest, err = s.GetLatestHeight() @@ -51,7 +55,7 @@ func TestStorage_LatestHeight(t *testing.T) { func TestStorage_Block(t *testing.T) { t.Parallel() - s, err := New(t.TempDir()) + s, err := NewPebble(t.TempDir()) require.NoError(t, err) defer func() { @@ -61,12 +65,16 @@ func TestStorage_Block(t *testing.T) { blocks := generateRandomBlocks(t, 100) // Save the blocks and fetch them + b := s.WriteBatch() for _, block := range blocks { - assert.NoError(t, s.SaveBlock(block)) + assert.NoError(t, b.SetBlock(block)) + } + + require.NoError(t, b.Commit()) + for _, block := range blocks { savedBlock, err := s.GetBlock(block.Height) require.NoError(t, err) - assert.Equal(t, block, savedBlock) } } @@ -74,7 +82,7 @@ func TestStorage_Block(t *testing.T) { func TestStorage_Tx(t *testing.T) { t.Parallel() - s, err := New(t.TempDir()) + s, err := NewPebble(t.TempDir()) require.NoError(t, err) defer func() { @@ -83,13 +91,18 @@ func TestStorage_Tx(t *testing.T) { txs := generateRandomTxs(t, 100) + wb := s.WriteBatch() + // Save the txs and fetch them for _, tx := range txs { - assert.NoError(t, s.SaveTx(tx)) + assert.NoError(t, wb.SetTx(tx)) + } + require.NoError(t, wb.Commit()) + + for _, tx := range txs { savedTx, err := s.GetTx(tx.Tx.Hash()) require.NoError(t, err) - assert.Equal(t, tx, savedTx) } } diff --git a/storage/storage.go b/storage/storage.go deleted file mode 100644 index 6a4acac4..00000000 --- a/storage/storage.go +++ /dev/null @@ -1,90 +0,0 @@ -package storage - -import ( - "fmt" - - "github.com/gnolang/gno/tm2/pkg/bft/types" - "github.com/gnolang/tx-indexer/storage/pebble" -) - -// Storage is the instance of an embedded storage -type Storage struct { - db *pebble.Storage -} - -// New creates a new storage instance at the given path -func New(path string) (*Storage, error) { - db, err := pebble.NewDB(path) - if err != nil { - return nil, fmt.Errorf("unable to create DB, %w", err) - } - - return &Storage{ - db: db, - }, nil -} - -func (s *Storage) Close() error { - return s.db.Close() -} - -// GetLatestHeight fetches the latest saved height from storage -func (s *Storage) GetLatestHeight() (int64, error) { - height, err := s.db.Get(latestHeightKey) - if err != nil { - return 0, err - } - - return decodeInt64(height), nil -} - -// SaveLatestHeight saves the latest height to storage -func (s *Storage) SaveLatestHeight(height int64) error { - return s.db.Set(latestHeightKey, encodeInt64(height)) -} - -// GetBlock fetches the specified block from storage, if any -func (s *Storage) GetBlock(blockNum int64) (*types.Block, error) { - block, err := s.db.Get(append(blockPrefix, encodeInt64(blockNum)...)) - if err != nil { - return nil, err - } - - return decodeBlock(block) -} - -// SaveBlock saves the specified block to storage -func (s *Storage) SaveBlock(block *types.Block) error { - encodedBlock, err := encodeBlock(block) - if err != nil { - return err - } - - return s.db.Set( - append(blockPrefix, encodeInt64(block.Height)...), - encodedBlock, - ) -} - -// GetTx fetches the specified tx result from storage, if any -func (s *Storage) GetTx(txHash []byte) (*types.TxResult, error) { - tx, err := s.db.Get(append(txResultKey, txHash...)) - if err != nil { - return nil, err - } - - return decodeTx(tx) -} - -// SaveTx saves the specified tx result to storage -func (s *Storage) SaveTx(tx *types.TxResult) error { - encodedTx, err := encodeTx(tx) - if err != nil { - return err - } - - return s.db.Set( - append(txResultKey, tx.Tx.Hash()...), - encodedTx, - ) -} diff --git a/storage/types.go b/storage/types.go new file mode 100644 index 00000000..b5c0a272 --- /dev/null +++ b/storage/types.go @@ -0,0 +1,52 @@ +package storage + +import ( + "io" + + "github.com/gnolang/gno/tm2/pkg/bft/types" +) + +// Storage represents the permanent storage abstraction +// for reading and writting operations +type Storage interface { + io.Closer + StorageRead + StorageWrite +} + +// StorageRead defines the transaction storage interface for read methods +type StorageRead interface { + io.Closer + // GetLatestHeight returns the latest block height from the storage + GetLatestHeight() (int64, error) + + // GetBlock fetches the block by its number + GetBlock(int64) (*types.Block, error) + + // GetTx fetches the tx using its hash + GetTx([]byte) (*types.TxResult, error) +} + +// StorageWrite defines the transaction storage interface for write methods +type StorageWrite interface { + io.Closer + // WriteBatch provides a batch intended to do a write action that + // can be cancelled or commited all at the same time + WriteBatch() Batch +} + +type Batch interface { + // SetLatestHeight saves the latest block height to the storage + SetLatestHeight(int64) error + // SetBlock saves the block to the permanent storage + SetBlock(block *types.Block) error + // SetTx saves the transaction to the permanent storage + SetTx(tx *types.TxResult) error + + // Commit stores all the provided info on the storage and make + // it available for other storage readers + Commit() error + + // Rollback rollbacks the operation not persisting the provided changes + Rollback() error +}