Skip to content

Commit

Permalink
feat(bin): disable ETA for execution stage (paradigmxyz#5839)
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin authored Dec 22, 2023
1 parent f3aa296 commit 099c833
Showing 1 changed file with 63 additions and 38 deletions.
101 changes: 63 additions & 38 deletions bin/reth/src/node/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,30 @@ impl<DB> NodeState<DB> {
target,
};

let progress = OptionalField(
let stage_progress = OptionalField(
checkpoint.entities().and_then(|entities| entities.fmt_percentage()),
);
let eta = current_stage.eta.fmt_for_stage(stage_id);

info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
target = %OptionalField(target),
%progress,
%eta,
"Executing stage",
);
if let Some(stage_eta) = current_stage.eta.fmt_for_stage(stage_id) {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
target = %OptionalField(target),
%stage_progress,
%stage_eta,
"Executing stage",
);
} else {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
target = %OptionalField(target),
%stage_progress,
"Executing stage",
);
}

self.current_stage = Some(current_stage);
}
Expand All @@ -97,29 +107,31 @@ impl<DB> NodeState<DB> {
current_stage.eta.update(checkpoint);

let target = OptionalField(current_stage.target);
let progress = OptionalField(
let stage_progress = OptionalField(
checkpoint.entities().and_then(|entities| entities.fmt_percentage()),
);

if done {
let message =
if done { "Stage finished executing" } else { "Stage committed progress" };

if let Some(stage_eta) = current_stage.eta.fmt_for_stage(stage_id) {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
%target,
%progress,
"Stage finished executing",
%stage_progress,
%stage_eta,
message,
)
} else {
let eta = current_stage.eta.fmt_for_stage(stage_id);
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
%target,
%progress,
%eta,
"Stage committed progress",
%stage_progress,
message,
)
}
}
Expand Down Expand Up @@ -316,22 +328,34 @@ where
if let Some(CurrentStage { stage_id, eta, checkpoint, target }) =
&this.state.current_stage
{
let progress = OptionalField(
let stage_progress = OptionalField(
checkpoint.entities().and_then(|entities| entities.fmt_percentage()),
);
let eta = eta.fmt_for_stage(*stage_id);

info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%freelist,
stage = %stage_id,
checkpoint = checkpoint.block_number,
target = %OptionalField(*target),
%progress,
%eta,
"Status"
);
if let Some(stage_eta) = eta.fmt_for_stage(*stage_id) {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%freelist,
stage = %stage_id,
checkpoint = checkpoint.block_number,
target = %OptionalField(*target),
%stage_progress,
%stage_eta,
"Status"
);
} else {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%freelist,
stage = %stage_id,
checkpoint = checkpoint.block_number,
target = %OptionalField(*target),
%stage_progress,
"Status"
);
}
} else if let Some(latest_block) = this.state.latest_block {
info!(
target: "reth::cli",
Expand Down Expand Up @@ -410,13 +434,14 @@ impl Eta {

/// Format ETA for a given stage.
///
/// NOTE: Currently ETA is disabled for Headers and Bodies stages until we find better
/// heuristics for calculation.
fn fmt_for_stage(&self, stage: StageId) -> String {
if matches!(stage, StageId::Headers | StageId::Bodies) {
String::from("unknown")
/// NOTE: Currently ETA is enabled only for the stages that have predictable progress.
/// It's not the case for network-dependent ([StageId::Headers] and [StageId::Bodies]) and
/// [StageId::Execution] stages.
fn fmt_for_stage(&self, stage: StageId) -> Option<String> {
if matches!(stage, StageId::Headers | StageId::Bodies | StageId::Execution) {
None
} else {
format!("{}", self)
Some(self.to_string())
}
}
}
Expand Down

0 comments on commit 099c833

Please sign in to comment.