Skip to content

Commit

Permalink
fix(storage): Refactor improving performance and atomicity.
Browse files Browse the repository at this point in the history
Overview:
This commit overhauls the Storage layer to set the foundation for a high-performance filtering system based on bitmap indexing. The refactoring introduces a more efficient data handling model and resolves critical issues that affected stability and performance.

Key Changes:
- Introduced the concept of 'WriteBatch' in the Storage interface to support efficient data operations.
- Removed the DB interface to eliminate excessive abstraction, making difficult the implementation of database-specific optimizations.
- Fixed the handling of byte arrays in Pebble's closer method to prevent data corruption. The Storage layer now ensures that data is parsed correctly before the Get operation is closed, eliminating the risk of random failures.
- Implemented batching for data writting from remote nodes, enhancing data consistency and speed.
- Restructured Mock objects into an internal package, restricting access to internals and eliminating the need for specific build tags.

Signed-off-by: Antonio Navarro Perez <[email protected]>
  • Loading branch information
ajnavarro committed Feb 1, 2024
1 parent 9fef618 commit 81c1266
Show file tree
Hide file tree
Showing 23 changed files with 530 additions and 513 deletions.
2 changes: 0 additions & 2 deletions .github/golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ run:
modules-download-mode: readonly
allow-parallel-runners: false
go: ""
build-tags:
- testmocks
skip-dirs:
- serve/filters/mocks

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
uses: actions/checkout@v4

- name: Go test
run: go test -tags "testmocks" -shuffle=on -coverprofile coverage.out -timeout 5m ./...
run: go test -shuffle=on -coverprofile coverage.out -timeout 5m ./...

test-with-race:
runs-on: ubuntu-latest
Expand All @@ -29,4 +29,4 @@ jobs:
uses: actions/checkout@v4

- name: Go race test
run: go test -tags "testmocks" -race -shuffle=on -timeout 5m ./...
run: go test -race -shuffle=on -timeout 5m ./...
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ fixalign:
.PHONY: test
test:
go clean -testcache
go test -v -tags "testmocks" ./...
go test -v ./...
9 changes: 5 additions & 4 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"flag"
"fmt"

"github.com/peterbourgon/ff/v3/ffcli"
"go.uber.org/zap"

"github.com/gnolang/tx-indexer/client"
"github.com/gnolang/tx-indexer/events"
"github.com/gnolang/tx-indexer/fetch"
"github.com/gnolang/tx-indexer/serve"
"github.com/gnolang/tx-indexer/storage"
"github.com/peterbourgon/ff/v3/ffcli"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -112,7 +113,7 @@ func (c *startCfg) exec(ctx context.Context) error {
}

// Create a DB instance
db, err := storage.New(c.dbPath)
db, err := storage.NewPebble(c.dbPath)
if err != nil {
return fmt.Errorf("unable to open storage DB, %w", err)
}
Expand Down Expand Up @@ -165,7 +166,7 @@ func (c *startCfg) exec(ctx context.Context) error {
// setupJSONRPC sets up the JSONRPC instance
func setupJSONRPC(
listenAddress string,
db *storage.Storage,
db *storage.Pebble,
em *events.Manager,
logger *zap.Logger,
) *serve.JSONRPC {
Expand Down
29 changes: 19 additions & 10 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"sort"
"time"

storageErrors "github.com/gnolang/tx-indexer/storage/errors"
"github.com/gnolang/tx-indexer/types"
queue "github.com/madz-lab/insertion-queue"
"go.uber.org/zap"

"github.com/gnolang/tx-indexer/storage"
storageErrors "github.com/gnolang/tx-indexer/storage/errors"
"github.com/gnolang/tx-indexer/types"
)

const (
Expand All @@ -22,7 +24,7 @@ const (
// Fetcher is an instance of the block indexer
// fetcher
type Fetcher struct {
storage Storage
storage storage.Storage
client Client
events Events

Expand All @@ -38,7 +40,7 @@ type Fetcher struct {
// New creates a new data fetcher instance
// that gets blockchain data from a remote chain
func New(
storage Storage,
storage storage.Storage,
client Client,
events Events,
opts ...Option,
Expand Down Expand Up @@ -176,9 +178,11 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
// Pop the next chunk
f.chunkBuffer.PopFront()

wb := f.storage.WriteBatch()

// Save the fetched data
for blockIndex, block := range item.chunk.blocks {
if saveErr := f.storage.SaveBlock(block); saveErr != nil {
if saveErr := wb.SetBlock(block); saveErr != nil {
// This is a design choice that really highlights the strain
// of keeping legacy testnets running. Current TM2 testnets
// have blocks / transactions that are no longer compatible
Expand All @@ -189,21 +193,21 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
continue
}

f.logger.Debug("Saved block data", zap.Int64("number", block.Height))
f.logger.Debug("Added block data to batch", zap.Int64("number", block.Height))

// Get block results
txResults := item.chunk.results[blockIndex]

// Save the fetched transaction results
for _, txResult := range txResults {
if err := f.storage.SaveTx(txResult); err != nil {
if err := wb.SetTx(txResult); err != nil {
f.logger.Error("unable to save tx", zap.String("err", err.Error()))

continue
}

f.logger.Debug(
"Saved tx",
"Added tx to batch",
zap.String("hash", base64.StdEncoding.EncodeToString(txResult.Tx.Hash())),
)
}
Expand All @@ -218,15 +222,20 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
}

f.logger.Info(
"Saved block and tx data for range",
"Added to batch block and tx data for range",
zap.Int64("from", item.chunkRange.from),
zap.Int64("to", item.chunkRange.to),
)

// Save the latest height data
if err := f.storage.SaveLatestHeight(item.chunkRange.to); err != nil {
if err := wb.SetLatestHeight(item.chunkRange.to); err != nil {
defer wb.Rollback()

Check failure on line 232 in fetch/fetch.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

deferInLoop: Possible resource leak, 'defer' is called in the 'for' loop (gocritic)

Check failure on line 232 in fetch/fetch.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

Error return value of `wb.Rollback` is not checked (errcheck)
return fmt.Errorf("unable to save latest height info, %w", err)

Check failure on line 233 in fetch/fetch.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

return with no blank line before (nlreturn)
}

if err := wb.Commit(); err != nil {
return fmt.Errorf("error persisting block information into storage, %w", err)
}
}
}
}
Expand Down
Loading

0 comments on commit 81c1266

Please sign in to comment.