diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs index 9702ae19eb4d..c69d5c026cf7 100644 --- a/crates/primitives/src/transaction/mod.rs +++ b/crates/primitives/src/transaction/mod.rs @@ -8,10 +8,10 @@ use alloy_rlp::{ use bytes::{Buf, BytesMut}; use derive_more::{AsRef, Deref}; use once_cell::sync::Lazy; -use rayon::prelude::{IntoParallelIterator, ParallelIterator}; +use rayon::prelude::*; use reth_codecs::{add_arbitrary_tests, derive_arbitrary, Compact}; use serde::{Deserialize, Serialize}; -use std::mem; +use std::{mem, sync::mpsc, thread}; pub use access_list::{AccessList, AccessListItem}; pub use eip1559::TxEip1559; @@ -833,6 +833,16 @@ impl TransactionSignedNoHash { self.signature.recover_signer(signature_hash) } + /// Recover signer helper for parallel impl. + /// + /// Returns `None` if the transaction's signature is invalid, see also [Self::recover_signer]. + pub fn recover_signer_helper(&self, rlp_buf: &mut Vec) -> Option
{ + let tx = self; + tx.transaction.encode_without_signature(rlp_buf); + + tx.signature.recover_signer(keccak256(rlp_buf)) + } + /// Converts into a transaction type with its hash: [`TransactionSigned`]. pub fn with_hash(self) -> TransactionSigned { self.into() @@ -849,7 +859,57 @@ impl TransactionSignedNoHash { if num_txes < *PARALLEL_SENDER_RECOVERY_THRESHOLD { txes.into_iter().map(|tx| tx.recover_signer()).collect() } else { - txes.into_par_iter().map(|tx| tx.recover_signer()).collect() + let mut recovered_signers: Vec
= Vec::new(); + let mut channels = Vec::new(); + rayon::scope(|s| { + let (chunk_size, chunks) = + if num_txes < (rayon::current_num_threads() * rayon::current_num_threads()) { + (num_txes, 2) + } else { + let chunk_size = num_txes / + (rayon::current_num_threads() * rayon::current_num_threads()); + let chunks = num_txes / chunk_size + 1; + (chunk_size, chunks) + }; + let mut iter = txes.into_iter(); + (0..chunks).for_each(|i| { + let chunk: Vec<&TransactionSignedNoHash> = if i == chunks - 1 { + iter.by_ref().take(num_txes % chunk_size).collect() + } else { + iter.by_ref().take(chunk_size).collect() + }; + let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel(); + channels.push(recovered_senders_rx); + // Spawn the sender recovery task onto the global rayon pool + // This task will send the results through the channel after it recovered + // the senders. + s.spawn(move |_| { + let mut rlp_buf = Vec::with_capacity(128); + for tx in chunk { + rlp_buf.clear(); + let recovery_result = tx.recover_signer_helper(&mut rlp_buf); + let _ = recovered_senders_tx.send(recovery_result); + } + }); + }) + }); + thread::spawn(move || { + for channel in channels { + while let Ok(recovered) = channel.recv() { + match recovered { + Some(signer) => { + recovered_signers.push(signer); + } + None => { + return None; + } + } + } + } + Some(recovered_signers) + }) + .join() + .unwrap() } } } @@ -992,6 +1052,16 @@ impl TransactionSigned { self.signature.recover_signer(signature_hash) } + /// Recover signer helper for parallel impl. + /// + /// Returns `None` if the transaction's signature is invalid, see also [Self::recover_signer]. + pub fn recover_signer_helper(&self, rlp_buf: &mut Vec) -> Option
{ + let tx = self; + tx.transaction.encode_without_signature(rlp_buf); + + tx.signature.recover_signer(keccak256(rlp_buf)) + } + /// Recovers a list of signers from a transaction list iterator /// /// Returns `None`, if some transaction's signature is invalid, see also @@ -1003,7 +1073,57 @@ impl TransactionSigned { if num_txes < *PARALLEL_SENDER_RECOVERY_THRESHOLD { txes.into_iter().map(|tx| tx.recover_signer()).collect() } else { - txes.into_par_iter().map(|tx| tx.recover_signer()).collect() + let mut recovered_signers: Vec
= Vec::new(); + let mut channels = Vec::new(); + rayon::scope(|s| { + let (chunk_size, chunks) = + if num_txes < (rayon::current_num_threads() * rayon::current_num_threads()) { + (num_txes, 2) + } else { + let chunk_size = num_txes / + (rayon::current_num_threads() * rayon::current_num_threads()); + let chunks = num_txes / chunk_size + 1; + (chunk_size, chunks) + }; + let mut iter = txes.into_iter(); + (0..chunks).for_each(|i| { + let chunk: Vec<&TransactionSigned> = if i == chunks - 1 { + iter.by_ref().take(num_txes % chunk_size).collect() + } else { + iter.by_ref().take(chunk_size).collect() + }; + let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel(); + channels.push(recovered_senders_rx); + // Spawn the sender recovery task onto the global rayon pool + // This task will send the results through the channel after it recovered + // the senders. + s.spawn(move |_| { + let mut rlp_buf = Vec::with_capacity(128); + for tx in chunk { + rlp_buf.clear(); + let recovery_result = tx.recover_signer_helper(&mut rlp_buf); + let _ = recovered_senders_tx.send(recovery_result); + } + }); + }) + }); + thread::spawn(move || { + for channel in channels { + while let Ok(recovered) = channel.recv() { + match recovered { + Some(signer) => { + recovered_signers.push(signer); + } + None => { + return None; + } + } + } + } + Some(recovered_signers) + }) + .join() + .unwrap() } } @@ -1243,7 +1363,7 @@ impl TransactionSigned { /// To decode EIP-4844 transactions in `eth_sendRawTransaction`, use /// [PooledTransactionsElement::decode_enveloped]. pub fn decode_enveloped(data: &mut &[u8]) -> alloy_rlp::Result { - if data.is_empty() { + if data.is_empty() { return Err(RlpError::InputTooShort) }