diff --git a/core/tx_pool.go b/core/tx_pool.go index e7d5062fd523..28943e009629 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -280,7 +280,6 @@ type TxPool struct { queueTxEventCh chan *types.Transaction reorgDoneCh chan chan struct{} reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop - reorgPauseCh chan bool // requests to pause scheduleReorgLoop realTxActivityShutdownCh chan struct{} wg sync.WaitGroup // tracks loop, scheduleReorgLoop initDoneCh chan struct{} // is closed once the pool is initialized (for tests) @@ -317,7 +316,6 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block reorgDoneCh: make(chan chan struct{}), reorgShutdownCh: make(chan struct{}), realTxActivityShutdownCh: make(chan struct{}), - reorgPauseCh: make(chan bool), initDoneCh: make(chan struct{}), gasPrice: new(big.Int).SetUint64(config.PriceLimit), } @@ -1229,14 +1227,13 @@ func (pool *TxPool) scheduleReorgLoop() { curDone chan struct{} // non-nil while runReorg is active nextDone = make(chan struct{}) launchNextRun bool - reorgsPaused bool reset *txpoolResetRequest dirtyAccounts *accountSet queuedEvents = make(map[common.Address]*txSortedMap) ) for { // Launch next background reorg if needed - if curDone == nil && launchNextRun && !reorgsPaused { + if curDone == nil && launchNextRun { // Run the background reorg and announcements go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents) @@ -1288,7 +1285,6 @@ func (pool *TxPool) scheduleReorgLoop() { } close(nextDone) return - case reorgsPaused = <-pool.reorgPauseCh: } } } @@ -1308,9 +1304,10 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt promoteAddrs = dirtyAccounts.flatten() } pool.mu.Lock() + var affectedAccounts map[common.Address]bool if reset != nil { // Reset from the old head to the new, rescheduling any reorged transactions - pool.reset(reset.oldHead, reset.newHead) + affectedAccounts = pool.reset(reset.oldHead, reset.newHead) // Nonces were reset, discard any events that became stale for addr := range events { @@ -1332,7 +1329,7 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt // remove any transaction that has been included in the block or was invalidated // because of another transaction (e.g. higher gas price). if reset != nil { - pool.demoteUnexecutables() + pool.demoteUnexecutables(affectedAccounts) if reset.newHead != nil && pool.chainconfig.IsCurie(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) { l1BaseFee := fees.GetL1BaseFee(pool.currentState) pendingBaseFee := misc.CalcBaseFee(pool.chainconfig, reset.newHead, l1BaseFee) @@ -1380,9 +1377,18 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt // reset retrieves the current state of the blockchain and ensures the content // of the transaction pool is valid with regard to the chain state. -func (pool *TxPool) reset(oldHead, newHead *types.Header) { +func (pool *TxPool) reset(oldHead, newHead *types.Header) map[common.Address]bool { // If we're reorging an old state, reinject all dropped transactions var reinject types.Transactions + affectedAccounts := make(map[common.Address]bool) + collectAffectedAccounts := func(txs types.Transactions) { + if affectedAccounts != nil { + for _, tx := range txs { + addr, _ := types.Sender(pool.signer, tx) + affectedAccounts[addr] = true + } + } + } if oldHead != nil && oldHead.Hash() != newHead.ParentHash { // If the reorg is too deep, avoid doing it (will happen during fast sync) @@ -1391,6 +1397,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { log.Debug("Skipping deep transaction reorg", "depth", depth) + affectedAccounts = nil // do a deep txPool reorg } else { // Reorg seems shallow enough to pull in all transactions into memory var discarded, included types.Transactions @@ -1407,7 +1414,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { // If we reorged to a same or higher number, then it's not a case of setHead log.Warn("Transaction pool reset with missing oldhead", "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) - return + return nil } // If the reorg ended up on a lower number, it's indicative of setHead being the cause log.Debug("Skipping transaction reset caused by setHead", @@ -1418,44 +1425,48 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { discarded = append(discarded, rem.Transactions()...) if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) - return + return nil } } for add.NumberU64() > rem.NumberU64() { included = append(included, add.Transactions()...) if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) - return + return nil } } for rem.Hash() != add.Hash() { discarded = append(discarded, rem.Transactions()...) if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) - return + return nil } included = append(included, add.Transactions()...) if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) - return + return nil } } reinject = types.TxDifference(discarded, included) + collectAffectedAccounts(reinject) + collectAffectedAccounts(included) } } } // Initialize the internal state to the current head if newHead == nil { + affectedAccounts = nil newHead = pool.chain.CurrentBlock().Header() // Special case during testing } statedb, err := pool.chain.StateAt(newHead.Root) if err != nil { log.Error("Failed to reset txpool state", "err", err) - return + return nil } pool.currentState = statedb pool.pendingNonces = newTxNoncer(statedb) pool.currentMaxGas = newHead.GasLimit + collectAffectedAccounts(pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()).Transactions()) // Inject any transactions discarded due to reorgs log.Debug("Reinjecting stale transactions", "count", len(reinject)) @@ -1472,6 +1483,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { // Update current head pool.currentHead = next + return affectedAccounts } // promoteExecutables moves transactions that have become processable from the @@ -1706,9 +1718,14 @@ func (pool *TxPool) truncateQueue() { // Note: transactions are not marked as removed in the priced list because re-heaping // is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful // to trigger a re-heap is this function -func (pool *TxPool) demoteUnexecutables() { +func (pool *TxPool) demoteUnexecutables(affectedAccounts map[common.Address]bool) { + log.Info("Demoting unexecutable transactions", "affected", len(affectedAccounts)) // Iterate over all accounts and demote any non-executable transactions for addr, list := range pool.pending { + if affectedAccounts != nil && !affectedAccounts[addr] { + continue + } + nonce := pool.currentState.GetNonce(addr) // Drop all transactions that are deemed too old (low nonce) @@ -1772,24 +1789,6 @@ func (pool *TxPool) calculateTxsLifecycle(txs types.Transactions, t time.Time) { } } -// PauseReorgs stops any new reorg jobs to be started but doesn't interrupt any existing ones that are in flight -// Keep in mind this function might block, although it is not expected to block for any significant amount of time -func (pool *TxPool) PauseReorgs() { - select { - case pool.reorgPauseCh <- true: - case <-pool.reorgShutdownCh: - } -} - -// ResumeReorgs allows new reorg jobs to be started. -// Keep in mind this function might block, although it is not expected to block for any significant amount of time -func (pool *TxPool) ResumeReorgs() { - select { - case pool.reorgPauseCh <- false: - case <-pool.reorgShutdownCh: - } -} - // addressByHeartbeat is an account address tagged with its last activity timestamp. type addressByHeartbeat struct { address common.Address diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 525deda66b9b..3d5172dc00d7 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -2463,7 +2463,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.demoteUnexecutables() + pool.demoteUnexecutables(nil) } } diff --git a/miner/scroll_worker.go b/miner/scroll_worker.go index 02f331bc200a..8f7c76c69025 100644 --- a/miner/scroll_worker.go +++ b/miner/scroll_worker.go @@ -569,9 +569,6 @@ func (w *worker) processTxPool() (bool, error) { // Fill the block with all available pending transactions. pending := w.eth.TxPool().PendingWithMax(false, w.config.MaxAccountsNum) - // Allow txpool to be reorged as we build current block - w.eth.TxPool().ResumeReorgs() - // Split the pending transactions into locals and remotes localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending for _, account := range w.eth.TxPool().Locals() { @@ -892,10 +889,6 @@ func (w *worker) commit() (common.Hash, error) { } } - // A new block event will trigger a reorg in the txpool, pause reorgs to defer this until we fetch txns for next block. - // We may end up trying to process txns that we already included in the previous block, but they will all fail the nonce check - w.eth.TxPool().PauseReorgs() - // Commit block and state to database. _, err = w.chain.WriteBlockWithState(block, w.current.receipts, w.current.coalescedLogs, w.current.state, true) if err != nil { diff --git a/params/version.go b/params/version.go index dec887edfcb2..47191375efc0 100644 --- a/params/version.go +++ b/params/version.go @@ -24,7 +24,7 @@ import ( const ( VersionMajor = 5 // Major version component of the current release VersionMinor = 7 // Minor version component of the current release - VersionPatch = 21 // Patch version component of the current release + VersionPatch = 22 // Patch version component of the current release VersionMeta = "mainnet" // Version metadata to append to the version string )