Skip to content

Commit

Permalink
Chunk Impl for Parallel Sender Recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
Arindam2407 committed Nov 22, 2023
1 parent 3598a23 commit ac2bc98
Showing 1 changed file with 125 additions and 5 deletions.
130 changes: 125 additions & 5 deletions crates/primitives/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use alloy_rlp::{
use bytes::{Buf, BytesMut};
use derive_more::{AsRef, Deref};
use once_cell::sync::Lazy;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use rayon::prelude::*;
use reth_codecs::{add_arbitrary_tests, derive_arbitrary, Compact};
use serde::{Deserialize, Serialize};
use std::mem;
use std::{mem, sync::mpsc, thread};

pub use access_list::{AccessList, AccessListItem};
pub use eip1559::TxEip1559;
Expand Down Expand Up @@ -833,6 +833,16 @@ impl TransactionSignedNoHash {
self.signature.recover_signer(signature_hash)
}

/// Recover signer helper for parallel impl.
///
/// Returns `None` if the transaction's signature is invalid, see also [Self::recover_signer].
pub fn recover_signer_helper(&self, rlp_buf: &mut Vec<u8>) -> Option<Address> {
let tx = self;
tx.transaction.encode_without_signature(rlp_buf);

tx.signature.recover_signer(keccak256(rlp_buf))
}

/// Converts into a transaction type with its hash: [`TransactionSigned`].
pub fn with_hash(self) -> TransactionSigned {
self.into()
Expand All @@ -849,7 +859,57 @@ impl TransactionSignedNoHash {
if num_txes < *PARALLEL_SENDER_RECOVERY_THRESHOLD {
txes.into_iter().map(|tx| tx.recover_signer()).collect()
} else {
txes.into_par_iter().map(|tx| tx.recover_signer()).collect()
let mut recovered_signers: Vec<Address> = Vec::new();
let mut channels = Vec::new();
rayon::scope(|s| {
let (chunk_size, chunks) =
if num_txes < (rayon::current_num_threads() * rayon::current_num_threads()) {
(num_txes, 2)
} else {
let chunk_size = num_txes /
(rayon::current_num_threads() * rayon::current_num_threads());
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let mut iter = txes.into_iter();
(0..chunks).for_each(|i| {
let chunk: Vec<&TransactionSignedNoHash> = if i == chunks - 1 {
iter.by_ref().take(num_txes % chunk_size).collect()
} else {
iter.by_ref().take(chunk_size).collect()
};
let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel();
channels.push(recovered_senders_rx);
// Spawn the sender recovery task onto the global rayon pool
// This task will send the results through the channel after it recovered
// the senders.
s.spawn(move |_| {
let mut rlp_buf = Vec::with_capacity(128);
for tx in chunk {
rlp_buf.clear();
let recovery_result = tx.recover_signer_helper(&mut rlp_buf);
let _ = recovered_senders_tx.send(recovery_result);
}
});
})
});
thread::spawn(move || {
for channel in channels {
while let Ok(recovered) = channel.recv() {
match recovered {
Some(signer) => {
recovered_signers.push(signer);
}
None => {
return None;
}
}
}
}
Some(recovered_signers)
})
.join()
.unwrap()
}
}
}
Expand Down Expand Up @@ -992,6 +1052,16 @@ impl TransactionSigned {
self.signature.recover_signer(signature_hash)
}

/// Recover signer helper for parallel impl.
///
/// Returns `None` if the transaction's signature is invalid, see also [Self::recover_signer].
pub fn recover_signer_helper(&self, rlp_buf: &mut Vec<u8>) -> Option<Address> {
let tx = self;
tx.transaction.encode_without_signature(rlp_buf);

tx.signature.recover_signer(keccak256(rlp_buf))
}

/// Recovers a list of signers from a transaction list iterator
///
/// Returns `None`, if some transaction's signature is invalid, see also
Expand All @@ -1003,7 +1073,57 @@ impl TransactionSigned {
if num_txes < *PARALLEL_SENDER_RECOVERY_THRESHOLD {
txes.into_iter().map(|tx| tx.recover_signer()).collect()
} else {
txes.into_par_iter().map(|tx| tx.recover_signer()).collect()
let mut recovered_signers: Vec<Address> = Vec::new();
let mut channels = Vec::new();
rayon::scope(|s| {
let (chunk_size, chunks) =
if num_txes < (rayon::current_num_threads() * rayon::current_num_threads()) {
(num_txes, 2)
} else {
let chunk_size = num_txes /
(rayon::current_num_threads() * rayon::current_num_threads());
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let mut iter = txes.into_iter();
(0..chunks).for_each(|i| {
let chunk: Vec<&TransactionSigned> = if i == chunks - 1 {
iter.by_ref().take(num_txes % chunk_size).collect()
} else {
iter.by_ref().take(chunk_size).collect()
};
let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel();
channels.push(recovered_senders_rx);
// Spawn the sender recovery task onto the global rayon pool
// This task will send the results through the channel after it recovered
// the senders.
s.spawn(move |_| {
let mut rlp_buf = Vec::with_capacity(128);
for tx in chunk {
rlp_buf.clear();
let recovery_result = tx.recover_signer_helper(&mut rlp_buf);
let _ = recovered_senders_tx.send(recovery_result);
}
});
})
});
thread::spawn(move || {
for channel in channels {
while let Ok(recovered) = channel.recv() {
match recovered {
Some(signer) => {
recovered_signers.push(signer);
}
None => {
return None;
}
}
}
}
Some(recovered_signers)
})
.join()
.unwrap()
}
}

Expand Down Expand Up @@ -1243,7 +1363,7 @@ impl TransactionSigned {
/// To decode EIP-4844 transactions in `eth_sendRawTransaction`, use
/// [PooledTransactionsElement::decode_enveloped].
pub fn decode_enveloped(data: &mut &[u8]) -> alloy_rlp::Result<Self> {
if data.is_empty() {
if data.is_empty() {
return Err(RlpError::InputTooShort)
}

Expand Down

0 comments on commit ac2bc98

Please sign in to comment.