diff --git a/rollup/l1/reader.go b/rollup/l1/reader.go index 8cc294801fdf..566d0fc9717c 100644 --- a/rollup/l1/reader.go +++ b/rollup/l1/reader.go @@ -26,7 +26,7 @@ const ( type Reader struct { ctx context.Context config Config - client EthClient + client Client filterer *L1MessageQueueFilterer scrollChainABI *abi.ABI @@ -42,7 +42,7 @@ type Config struct { } // NewReader initializes a new Reader instance -func NewReader(ctx context.Context, config Config, l1Client EthClient) (*Reader, error) { +func NewReader(ctx context.Context, config Config, l1Client Client) (*Reader, error) { if config.ScrollChainAddress == (common.Address{}) { return nil, errors.New("must pass non-zero scrollChainAddress to L1Client") } diff --git a/rollup/l1/tracker.go b/rollup/l1/tracker.go index cdd6c517db38..d7fe262d1750 100644 --- a/rollup/l1/tracker.go +++ b/rollup/l1/tracker.go @@ -4,12 +4,14 @@ import ( "context" "fmt" "math/big" - "sort" + "slices" "sync" "time" + "github.com/pkg/errors" + + "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/core/types" - "github.com/scroll-tech/go-ethereum/event" "github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/rpc" ) @@ -18,10 +20,15 @@ type Tracker struct { ctx context.Context cancel context.CancelFunc - client EthClient - lastSyncedHeader *types.Header - subList headerSubList - scope event.SubscriptionScope + client Client + + headers *common.ShrinkingMap[uint64, *types.Header] + lastSafeHeader *types.Header + lastFinalizedHeader *types.Header + + subscriptionCounter int // used to assign unique IDs to subscriptions + subscriptions map[ConfirmationRule][]*subscription // sorted by confirmationRule ascending + mu sync.RWMutex } const ( @@ -29,70 +36,17 @@ const ( defaultSyncInterval = 12 * time.Second ) -func NewTracker(ctx context.Context, l1Client EthClient) (*Tracker, error) { +func NewTracker(ctx context.Context, l1Client Client) *Tracker { ctx, cancel := context.WithCancel(ctx) l1Tracker := &Tracker{ - ctx: ctx, - cancel: cancel, - - client: l1Client, - } - l1Tracker.Start() - return l1Tracker, nil -} - -func (t *Tracker) headerByDepth(depth ChainDepth, latestNumber *big.Int) (*types.Header, error) { - var blockNumber *big.Int - switch depth { - case LatestBlock: - blockNumber = big.NewInt(int64(rpc.LatestBlockNumber)) - case SafeBlock: - blockNumber = big.NewInt(int64(rpc.SafeBlockNumber)) - case FinalizedBlock: - blockNumber = big.NewInt(int64(rpc.FinalizedBlockNumber)) - default: - blockNumber = big.NewInt(0).Sub(latestNumber, big.NewInt(int64(depth))) - } - header, err := t.client.HeaderByNumber(t.ctx, blockNumber) - if err != nil { - return nil, err + ctx: ctx, + cancel: cancel, + client: l1Client, + headers: common.NewShrinkingMap[uint64, *types.Header](1000), + subscriptions: make(map[ConfirmationRule][]*subscription), } - return header, nil -} -func (t *Tracker) newHead(header *types.Header) { - t.lastSyncedHeader = header - t.subList.stopSending() - t.subList.sendNewHeads(t.headerByDepth, header) -} - -func (t *Tracker) syncLatestHead() error { - header, err := t.client.HeaderByNumber(t.ctx, big.NewInt(int64(rpc.LatestBlockNumber))) - if err != nil { - return err - } - if !header.Number.IsInt64() { - return fmt.Errorf("received unexpected block number in L1Client: %v", header.Number) - } - // sync is continuous - if t.lastSyncedHeader != nil || header.ParentHash != t.lastSyncedHeader.Hash() { - t.newHead(header) - } else { // reorg happened or some blocks were not synced - // todo: clear cache - t.newHead(header) - } - return nil -} - -func (t *Tracker) Subscribe(channel HeaderChan, depth ChainDepth) event.Subscription { - sub := &headerSub{ - list: &t.subList, - depth: depth, - channel: channel, - err: make(chan error, 1), - } - t.subList.add(sub) - return t.scope.Track(sub) + return l1Tracker } func (t *Tracker) Start() { @@ -120,102 +74,255 @@ func (t *Tracker) Start() { }() } -func (t *Tracker) Stop() { - log.Info("stopping Tracker") - t.cancel() - t.scope.Close() - log.Info("Tracker stopped") -} +func (t *Tracker) syncLatestHead() error { + newHeader, err := t.client.HeaderByNumber(t.ctx, big.NewInt(int64(rpc.LatestBlockNumber))) + if err != nil { + return errors.Wrapf(err, "failed to get latest header") + } + if !newHeader.Number.IsUint64() { + return fmt.Errorf("received unexpected block number in L1Client: %v", newHeader.Number) + } + + storedHeader, exists := t.headers.Get(newHeader.Number.Uint64()) + if exists { + // We already processed the header, nothing to do. + if storedHeader.Hash() == newHeader.Hash() { + return nil + } + + // Since we already processed a header at this height with different hash this means a L1 reorg happened. + // TODO: reset cache. + + // Notify all subscribers to new LatestChainHead at their respective confirmation depth. + err = t.notifyLatest(newHeader, true) + if err != nil { + return errors.Wrapf(err, "failed to notify subscribers of new latest header") + } + + return nil + } + + // Notify all subscribers to new LatestChainHead at their respective confirmation depth. + err = t.notifyLatest(newHeader, false) + if err != nil { + return errors.Wrapf(err, "failed to notify subscribers of new latest header") + } -type headerSubList struct { - mu sync.Mutex - list []*headerSub + return nil } -func (l *headerSubList) add(sub *headerSub) { - l.mu.Lock() - l.list = append(l.list, sub) - sort.Slice(l.list, func(i, j int) bool { - return l.list[i].depth < l.list[j].depth - }) - l.mu.Unlock() +func (t *Tracker) notifyLatest(newHeader *types.Header, reorg bool) error { + // TODO: add mutex for headers, lastSafeHeader, lastFinalizedHeader + t.headers.Set(newHeader.Number.Uint64(), newHeader) + + t.mu.RLock() + defer t.mu.RUnlock() + + for _, sub := range t.subscriptions[LatestChainHead] { + // Ignore subscriptions with deeper ConfirmationRule than the new block. + if newHeader.Number.Uint64() < uint64(sub.confirmationRule) { + continue + } + + // 1 confirmation == latest block + // 2 confirmations == latest block - 1 + // ... + // n confirmations == latest block - (n-1) + depth := uint64(sub.confirmationRule - 1) + headerToNotifyNumber := newHeader.Number.Uint64() - depth + headerToNotify, exists := t.headers.Get(headerToNotifyNumber) + if !exists { + return fmt.Errorf("header %d not found for confirmationRule %d", headerToNotifyNumber, sub.confirmationRule) + } + + if reorg && sub.lastSentHeader != nil { + // The subscriber is subscribed to a deeper ConfirmationRule than the reorg depth -> this reorg doesn't affect the subscriber. + // Since the subscribers are sorted by ConfirmationRule, we can return here. + if sub.lastSentHeader.Number.Uint64() < headerToNotify.Number.Uint64() { + return nil + } + + // We already sent this header to the subscriber. This shouldn't happen here since we're handling a reorg and + // by definition the last sent header should be different from the header we're notifying about if the header number is the same. + if sub.lastSentHeader.Hash() == headerToNotify.Hash() { + continue + } + } + + sub.callback(sub.lastSentHeader, headerToNotify, reorg) + sub.lastSentHeader = headerToNotify + } + + return nil } -func (l *headerSubList) remove(sub *headerSub) { - l.mu.Lock() - index := -1 - for i, subl := range l.list { - if subl == sub { - index = i - break +func (t *Tracker) syncSafeHead() error { + newHeader, err := t.client.HeaderByNumber(t.ctx, big.NewInt(int64(rpc.SafeBlockNumber))) + if err != nil { + return errors.Wrapf(err, "failed to get latest safe header") + } + if !newHeader.Number.IsUint64() { + return fmt.Errorf("received unexpected block number in L1Client: %v", newHeader.Number) + } + + if t.lastSafeHeader != nil { + // We already saw this header, nothing to do. + if t.lastSafeHeader.Hash() == newHeader.Hash() { + return nil + } + + // This means there was a L1 reorg and the safe block changed. While this is possible, it should be very rare. + if t.lastSafeHeader.Number.Uint64() <= newHeader.Number.Uint64() { + t.notifySafeHead(newHeader, true) + return nil } } - if index != -1 { - l.list = append(l.list[:index], l.list[index+1:]...) + + // Notify all subscribers to new SafeChainHead. + t.notifySafeHead(newHeader, false) + + return nil +} + +func (t *Tracker) notifySafeHead(newHeader *types.Header, reorg bool) { + t.lastSafeHeader = newHeader + + t.mu.RLock() + defer t.mu.RUnlock() + + for _, sub := range t.subscriptions[SafeChainHead] { + sub.callback(sub.lastSentHeader, newHeader, reorg) + sub.lastSentHeader = newHeader } - l.mu.Unlock() } -func (l *headerSubList) sendNewHeads(fetchHeaderFunc func(ChainDepth, *big.Int) (*types.Header, error), header *types.Header) { - l.mu.Lock() - for _, sub := range l.list { - sub.value, _ = fetchHeaderFunc(sub.depth, header.Number) - } - for _, sub := range l.list { - if sub.value != nil { - stopChan := make(chan bool) - sub.stopChan = stopChan - // start new goroutine to send new head - // if subscriber will not read from channel until next update, this goroutine will be stopped and new started - go func(channel HeaderChan, value *types.Header) { - select { - case channel <- value: - case <-stopChan: - } - }(sub.channel, sub.value) +func (t *Tracker) syncFinalizedHead() error { + newHeader, err := t.client.HeaderByNumber(t.ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) + if err != nil { + return errors.Wrapf(err, "failed to get latest finalized header") + } + if !newHeader.Number.IsUint64() { + return fmt.Errorf("received unexpected block number in L1Client: %v", newHeader.Number) + } + + if t.lastFinalizedHeader != nil { + // We already saw this header, nothing to do. + if t.lastFinalizedHeader.Hash() == newHeader.Hash() { + return nil + } + + // This means the finalized block changed as read from L1. The Ethereum protocol guarantees that this can never + // happen. Must be some issue with the RPC node. + if t.lastFinalizedHeader.Number.Uint64() <= newHeader.Number.Uint64() { + log.Crit("RPC node faulty: finalized block number decreased", "old", t.lastFinalizedHeader.Number, "new", newHeader.Number, "old hash", t.lastFinalizedHeader.Hash(), "new hash", newHeader.Hash()) } } - l.mu.Unlock() + + t.notifyFinalizedHead(newHeader) + + // TODO: prune old headers + + return nil } -func (l *headerSubList) stopSending() { - l.mu.Lock() - for _, sub := range l.list { - if sub.stopChan != nil { - close(sub.stopChan) - sub.stopChan = nil +func (t *Tracker) notifyFinalizedHead(newHeader *types.Header) { + t.lastFinalizedHeader = newHeader + + t.mu.RLock() + defer t.mu.RUnlock() + + // Notify all subscribers to new FinalizedChainHead. + for _, sub := range t.subscriptions[FinalizedChainHead] { + sub.callback(sub.lastSentHeader, newHeader, false) + sub.lastSentHeader = newHeader + } +} + +func (t *Tracker) Subscribe(confirmationRule ConfirmationRule, callback SubscriptionCallback) (unsubscribe func()) { + t.mu.Lock() + defer t.mu.Unlock() + + // Validate ConfirmationRule configuration. Invalid rules will cause a panic as it is a programming error. + var confirmationType ConfirmationRule + switch { + case confirmationRule == FinalizedChainHead: + confirmationType = FinalizedChainHead + case confirmationRule == SafeChainHead: + confirmationType = SafeChainHead + case confirmationRule >= LatestChainHead && confirmationRule <= maxConfirmationRule: + confirmationType = LatestChainHead + default: + log.Crit("invalid confirmation rule", "confirmationRule", confirmationRule) + } + + sub := newSubscription(t.subscriptionCounter, confirmationRule, callback) + + subscriptionsByType := t.subscriptions[confirmationType] + subscriptionsByType = append(subscriptionsByType, sub) + + slices.SortFunc(subscriptionsByType, func(a *subscription, b *subscription) int { + if a.confirmationRule > b.confirmationRule { + return 1 + } else if a.confirmationRule < b.confirmationRule { + return -1 + } else { + // IDs are unique and monotonically increasing, therefore there is always a clear order. + if a.id > b.id { + return 1 + } else { + return -1 + } + } + }) + + t.subscriptions[confirmationType] = subscriptionsByType + t.subscriptionCounter++ + + return func() { + t.mu.Lock() + defer t.mu.Unlock() + + for i, s := range t.subscriptions[sub.confirmationRule] { + if s.id == sub.id { + subscriptionsByType = append(subscriptionsByType[:i], subscriptionsByType[i+1:]...) + break + } } + t.subscriptions[confirmationRule] = subscriptionsByType } - l.mu.Unlock() } -type headerSub struct { - list *headerSubList - depth ChainDepth - channel HeaderChan - errOnce sync.Once - err chan error - value *types.Header // value to send next time - stopChan chan bool // channel used to stop existing goroutine sending new header +func (t *Tracker) Stop() { + log.Info("stopping Tracker") + t.cancel() + log.Info("Tracker stopped") } -func (sub *headerSub) Unsubscribe() { - sub.errOnce.Do(func() { - sub.list.remove(sub) - close(sub.err) - }) +type subscription struct { + id int + confirmationRule ConfirmationRule + callback SubscriptionCallback + lastSentHeader *types.Header } -func (sub *headerSub) Err() <-chan error { - return sub.err +func newSubscription(id int, confirmationRule ConfirmationRule, callback SubscriptionCallback) *subscription { + return &subscription{ + id: id, + confirmationRule: confirmationRule, + callback: callback, + } } -type ChainDepth int64 +type ConfirmationRule int8 + +// maxConfirmationRule is the maximum number of confirmations we can subscribe to. +// This is equal to the best case scenario where Ethereum L1 is finalizing 2 epochs in the past (64 blocks). +const maxConfirmationRule = ConfirmationRule(64) const ( - SafeBlock = ChainDepth(-3) - FinalizedBlock = ChainDepth(-2) - LatestBlock = ChainDepth(-1) + FinalizedChainHead = ConfirmationRule(-2) + SafeChainHead = ConfirmationRule(-1) + LatestChainHead = ConfirmationRule(1) ) -type HeaderChan chan *types.Header +type SubscriptionCallback func(last, new *types.Header, reorg bool) diff --git a/rollup/l1/tracker_test.go b/rollup/l1/tracker_test.go new file mode 100644 index 000000000000..2e5e18ed5793 --- /dev/null +++ b/rollup/l1/tracker_test.go @@ -0,0 +1,144 @@ +package l1 + +import ( + "context" + "fmt" + "math/big" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/scroll-tech/go-ethereum" + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/consensus/ethash" + "github.com/scroll-tech/go-ethereum/core" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/params" + "github.com/scroll-tech/go-ethereum/rpc" +) + +type mockETHClient struct { + chain []*types.Block + chainHeads map[rpc.BlockNumber]*types.Block +} + +func newMockETHClient() *mockETHClient { + genesis := &core.Genesis{ + Config: params.TestChainConfig, + } + _, chain, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 100, func(i int, gen *core.BlockGen) {}) + + return &mockETHClient{ + chain: chain, + chainHeads: map[rpc.BlockNumber]*types.Block{ + rpc.LatestBlockNumber: chain[0], + rpc.FinalizedBlockNumber: chain[0], + rpc.SafeBlockNumber: chain[0], + }, + } +} + +func (m mockETHClient) setLatestBlock(blockNum int) { + m.chainHeads[rpc.LatestBlockNumber] = m.chain[blockNum-1] +} +func (m mockETHClient) setFinalizedBlock(blockNum int) { + m.chainHeads[rpc.FinalizedBlockNumber] = m.chain[blockNum-1] +} +func (m mockETHClient) setSafeBlock(blockNum int) { + m.chainHeads[rpc.SafeBlockNumber] = m.chain[blockNum-1] +} + +func (m *mockETHClient) createFork() { + +} + +func (m mockETHClient) BlockNumber(ctx context.Context) (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (m mockETHClient) ChainID(ctx context.Context) (*big.Int, error) { + //TODO implement me + panic("implement me") +} + +func (m mockETHClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { + //TODO implement me + panic("implement me") +} + +func (m mockETHClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + if block, ok := m.chainHeads[rpc.BlockNumber(number.Int64())]; ok { + return block.Header(), nil + } + + if number.Uint64() >= uint64(len(m.chain)) { + return nil, fmt.Errorf("block %d not found", number) + } + + return m.chain[number.Uint64()].Header(), nil +} + +func (m mockETHClient) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { + //TODO implement me + panic("implement me") +} + +func (m mockETHClient) TransactionByHash(ctx context.Context, txHash common.Hash) (tx *types.Transaction, isPending bool, err error) { + //TODO implement me + panic("implement me") +} + +func (m mockETHClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { + //TODO implement me + panic("implement me") +} + +func TestTracker(t *testing.T) { + client := newMockETHClient() + tracker := NewTracker(context.Background(), client) + + tracker.Subscribe(LatestChainHead, func(last, new *types.Header, reorg bool) { + fmt.Println("sub 1: new block", new.Number, reorg) + }) + + tracker.Subscribe(3, func(last, new *types.Header, reorg bool) { + fmt.Println("sub 2: new block", new.Number, reorg) + }) + + tracker.Subscribe(FinalizedChainHead, func(last, new *types.Header, reorg bool) { + fmt.Println("sub 3 (finalized): new block", new.Number, reorg) + }) + + err := tracker.syncLatestHead() + require.NoError(t, err) + + fmt.Println("----------------------------------") + client.setLatestBlock(2) + + err = tracker.syncLatestHead() + require.NoError(t, err) + + fmt.Println("----------------------------------") + client.setLatestBlock(3) + + err = tracker.syncLatestHead() + require.NoError(t, err) + + fmt.Println("----------------------------------") + client.setFinalizedBlock(1) + + err = tracker.syncFinalizedHead() + require.NoError(t, err) + + // TODO: + // - test invalid confirmation rules + // - test reorg + // - test multiple subscribers with same confirmation rules and reorg + // - test multiple subscribers with different confirmation rules and reorg + // - test finalized, safe + // - test finalized panic if reorg, safe reorg + // - test pruning of headers when finalized header arrives + // - test unsubscribe + // - test running with Start and RPC errors -> recovering automatically +} diff --git a/rollup/l1/types.go b/rollup/l1/types.go index 337a4c0253b3..12d2850f76be 100644 --- a/rollup/l1/types.go +++ b/rollup/l1/types.go @@ -9,9 +9,7 @@ import ( "github.com/scroll-tech/go-ethereum/core/types" ) -// We cannot use ethclient.Client directly as that would lead -// to circular dependency between eth, rollup, and ethclient. -type EthClient interface { +type Client interface { BlockNumber(ctx context.Context) (uint64, error) ChainID(ctx context.Context) (*big.Int, error) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)