Skip to content

Commit

Permalink
Merge pull request #16 from Arindam2407/chunk_impl
Browse files Browse the repository at this point in the history
Commit 1 to main
  • Loading branch information
Arindam2407 authored Nov 28, 2023
2 parents 503c401 + a23ca3e commit 0f951e4
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 35 deletions.
62 changes: 31 additions & 31 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

100 changes: 96 additions & 4 deletions crates/primitives/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use alloy_rlp::{
use bytes::{Buf, BytesMut};
use derive_more::{AsRef, Deref};
use once_cell::sync::Lazy;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use rayon::prelude::*;
use reth_codecs::{add_arbitrary_tests, derive_arbitrary, Compact};
use serde::{Deserialize, Serialize};
use std::mem;
use std::{mem, sync::mpsc, thread};

pub use access_list::{AccessList, AccessListItem};
pub use eip1559::TxEip1559;
Expand Down Expand Up @@ -849,7 +849,53 @@ impl TransactionSignedNoHash {
if num_txes < *PARALLEL_SENDER_RECOVERY_THRESHOLD {
txes.into_iter().map(|tx| tx.recover_signer()).collect()
} else {
txes.into_par_iter().map(|tx| tx.recover_signer()).collect()
let mut recovered_signers: Vec<Address> = Vec::new();
let mut channels = Vec::new();
rayon::scope(|s| {
let (chunk_size, chunks) = if num_txes < 16 {
(num_txes, 2)
} else {
let chunk_size = num_txes / (num_txes / 16);
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let mut iter = txes.into_iter();
(0..chunks).for_each(|i| {
let chunk: Vec<&TransactionSignedNoHash> = if i == chunks - 1 {
iter.by_ref().take(num_txes % chunk_size).collect()
} else {
iter.by_ref().take(chunk_size).collect()
};
let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel();
channels.push(recovered_senders_rx);
// Spawn the sender recovery task onto the global rayon pool
// This task will send the results through the channel after it recovered
// the senders.
s.spawn(move |_| {
for tx in chunk {
let recovery_result = tx.recover_signer();
let _ = recovered_senders_tx.send(recovery_result);
}
});
})
});
thread::spawn(move || {
for channel in channels {
while let Ok(recovered) = channel.recv() {
match recovered {
Some(signer) => {
recovered_signers.push(signer);
}
None => {
return None;
}
}
}
}
Some(recovered_signers)
})
.join()
.unwrap()
}
}
}
Expand Down Expand Up @@ -1003,7 +1049,53 @@ impl TransactionSigned {
if num_txes < *PARALLEL_SENDER_RECOVERY_THRESHOLD {
txes.into_iter().map(|tx| tx.recover_signer()).collect()
} else {
txes.into_par_iter().map(|tx| tx.recover_signer()).collect()
let mut recovered_signers: Vec<Address> = Vec::new();
let mut channels = Vec::new();
rayon::scope(|s| {
let (chunk_size, chunks) = if num_txes < 16 {
(num_txes, 2)
} else {
let chunk_size = num_txes / (num_txes / 16);
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let mut iter = txes.into_iter();
(0..chunks).for_each(|i| {
let chunk: Vec<&TransactionSigned> = if i == chunks - 1 {
iter.by_ref().take(num_txes % chunk_size).collect()
} else {
iter.by_ref().take(chunk_size).collect()
};
let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel();
channels.push(recovered_senders_rx);
// Spawn the sender recovery task onto the global rayon pool
// This task will send the results through the channel after it recovered
// the senders.
s.spawn(move |_| {
for tx in chunk {
let recovery_result = tx.recover_signer();
let _ = recovered_senders_tx.send(recovery_result);
}
});
})
});
thread::spawn(move || {
for channel in channels {
while let Ok(recovered) = channel.recv() {
match recovered {
Some(signer) => {
recovered_signers.push(signer);
}
None => {
return None;
}
}
}
}
Some(recovered_signers)
})
.join()
.unwrap()
}
}

Expand Down

0 comments on commit 0f951e4

Please sign in to comment.