Skip to content

Commit

Permalink
Optimize Sender Recovery Process (#11385)
Browse files Browse the repository at this point in the history
  • Loading branch information
TheDhejavu authored Oct 18, 2024
1 parent 9c8f5d8 commit 587c91f
Showing 1 changed file with 86 additions and 60 deletions.
146 changes: 86 additions & 60 deletions crates/stages/stages/src/stages/sender_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const BATCH_SIZE: usize = 100_000;
/// Maximum number of senders to recover per rayon worker job.
const WORKER_CHUNK_SIZE: usize = 100;

/// Type alias for a sender that transmits the result of sender recovery.
type RecoveryResultSender = mpsc::Sender<Result<(u64, Address), Box<SenderRecoveryStageError>>>;

/// The sender recovery stage iterates over existing transactions,
/// recovers the transaction signer and stores them
/// in [`TransactionSenders`][reth_db::tables::TransactionSenders] table.
Expand Down Expand Up @@ -100,8 +103,10 @@ where
.map(|start| start..std::cmp::min(start + BATCH_SIZE as u64, tx_range.end))
.collect::<Vec<Range<u64>>>();

let tx_batch_sender = setup_range_recovery(provider);

for range in batch {
recover_range(range, provider, &mut senders_cursor)?;
recover_range(range, provider, tx_batch_sender.clone(), &mut senders_cursor)?;
}

Ok(ExecOutput {
Expand Down Expand Up @@ -136,15 +141,16 @@ where
fn recover_range<Provider, CURSOR>(
tx_range: Range<u64>,
provider: &Provider,
tx_batch_sender: mpsc::Sender<Vec<(Range<u64>, RecoveryResultSender)>>,
senders_cursor: &mut CURSOR,
) -> Result<(), StageError>
where
Provider: DBProvider + HeaderProvider + StaticFileProviderFactory,
CURSOR: DbCursorRW<tables::TransactionSenders>,
{
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders batch");
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Sending batch for processing");

// Preallocate channels
// Preallocate channels for each chunks in the batch
let (chunks, receivers): (Vec<_>, Vec<_>) = tx_range
.clone()
.step_by(WORKER_CHUNK_SIZE)
Expand All @@ -156,62 +162,9 @@ where
})
.unzip();

let static_file_provider = provider.static_file_provider();

// We do not use `tokio::task::spawn_blocking` because, during a shutdown,
// there will be a timeout grace period in which Tokio does not allow spawning
// additional blocking tasks. This would cause this function to return
// `SenderRecoveryStageError::RecoveredSendersMismatch` at the end.
//
// However, using `std::thread::spawn` allows us to utilize the timeout grace
// period to complete some work without throwing errors during the shutdown.
std::thread::spawn(move || {
for (chunk_range, recovered_senders_tx) in chunks {
// Read the raw value, and let the rayon worker to decompress & decode.
let chunk = match static_file_provider.fetch_range_with_predicate(
StaticFileSegment::Transactions,
chunk_range.clone(),
|cursor, number| {
Ok(cursor
.get_one::<TransactionMask<RawValue<TransactionSignedNoHash>>>(
number.into(),
)?
.map(|tx| (number, tx)))
},
|_| true,
) {
Ok(chunk) => chunk,
Err(err) => {
// We exit early since we could not process this chunk.
let _ = recovered_senders_tx
.send(Err(Box::new(SenderRecoveryStageError::StageError(err.into()))));
break
}
};

// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has read the transaction
// and calculated the sender.
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for (number, tx) in chunk {
let res = tx
.value()
.map_err(|err| Box::new(SenderRecoveryStageError::StageError(err.into())))
.and_then(|tx| recover_sender((number, tx), &mut rlp_buf));

let is_err = res.is_err();

let _ = recovered_senders_tx.send(res);

// Finish early
if is_err {
break
}
}
});
}
});
if let Some(err) = tx_batch_sender.send(chunks).err() {
return Err(StageError::Fatal(err.into()));
}

debug!(target: "sync::stages::sender_recovery", ?tx_range, "Appending recovered senders to the database");

Expand All @@ -235,6 +188,7 @@ where
provider.sealed_header(block_number)?.ok_or_else(|| {
ProviderError::HeaderNotFound(block_number.into())
})?;

Err(StageError::Block {
block: Box::new(sealed_header),
error: BlockErrorKind::Validation(
Expand Down Expand Up @@ -269,10 +223,82 @@ where
.into(),
));
}

Ok(())
}

/// Spawns a thread to handle the recovery of transaction senders for
/// specified chunks of a given batch. It processes incoming ranges, fetching and recovering
/// transactions in parallel using global rayon pool
fn setup_range_recovery<Provider>(
provider: &Provider,
) -> mpsc::Sender<Vec<(Range<u64>, RecoveryResultSender)>>
where
Provider: DBProvider + HeaderProvider + StaticFileProviderFactory,
{
let (tx_sender, tx_receiver) = mpsc::channel::<Vec<(Range<u64>, RecoveryResultSender)>>();
let static_file_provider = provider.static_file_provider();

// We do not use `tokio::task::spawn_blocking` because, during a shutdown,
// there will be a timeout grace period in which Tokio does not allow spawning
// additional blocking tasks. This would cause this function to return
// `SenderRecoveryStageError::RecoveredSendersMismatch` at the end.
//
// However, using `std::thread::spawn` allows us to utilize the timeout grace
// period to complete some work without throwing errors during the shutdown.
std::thread::spawn(move || {
while let Ok(chunks) = tx_receiver.recv() {
for (chunk_range, recovered_senders_tx) in chunks {
// Read the raw value, and let the rayon worker to decompress & decode.
let chunk = match static_file_provider.fetch_range_with_predicate(
StaticFileSegment::Transactions,
chunk_range.clone(),
|cursor, number| {
Ok(cursor
.get_one::<TransactionMask<RawValue<TransactionSignedNoHash>>>(
number.into(),
)?
.map(|tx| (number, tx)))
},
|_| true,
) {
Ok(chunk) => chunk,
Err(err) => {
// We exit early since we could not process this chunk.
let _ = recovered_senders_tx
.send(Err(Box::new(SenderRecoveryStageError::StageError(err.into()))));
break
}
};

// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has read the
// transaction and calculated the sender.
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for (number, tx) in chunk {
let res = tx
.value()
.map_err(|err| {
Box::new(SenderRecoveryStageError::StageError(err.into()))
})
.and_then(|tx| recover_sender((number, tx), &mut rlp_buf));

let is_err = res.is_err();

let _ = recovered_senders_tx.send(res);

// Finish early
if is_err {
break
}
}
});
}
}
});
tx_sender
}

#[inline]
fn recover_sender(
(tx_id, tx): (TxNumber, TransactionSignedNoHash),
Expand Down

0 comments on commit 587c91f

Please sign in to comment.