Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assign specific jobs to dedicated workers #564

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
Open
19 changes: 3 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ num-bigint = "0.4.5"
num-traits = "0.2.19"
nunny = "0.2.1"
once_cell = "1.19.0"
paladin-core = "0.4.2"
paladin-core = { git = "https://github.com/0xPolygonZero/paladin.git", branch = "arpit/507-2" }
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved
parking_lot = "0.12.3"
paste = "1.0.15"
pest = "2.7.10"
Expand Down Expand Up @@ -139,4 +139,3 @@ trybuild = "1.0"

[workspace.lints.clippy]
too_long_first_doc_paragraph = "allow"

14 changes: 13 additions & 1 deletion zero_bin/leader/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::path::PathBuf;

use alloy::transports::http::reqwest::Url;
use clap::{Parser, Subcommand, ValueHint};
use clap::{Parser, Subcommand, ValueEnum, ValueHint};
use prover::cli::CliProverConfig;
use rpc::RpcType;
use zero_bin_common::prover_state::cli::CliProverStateConfig;

const WORKER_HELP_HEADING: &str = "Worker Config options";

/// zero-bin leader config
#[derive(Parser)]
pub(crate) struct Cli {
Expand All @@ -22,6 +24,16 @@ pub(crate) struct Cli {
// mode.
#[clap(flatten)]
pub(crate) prover_state_config: CliProverStateConfig,

// Mode to use for worker for setup (split or unified)
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved
#[arg(long = "worker-run-mode", help_heading = WORKER_HELP_HEADING, value_enum, default_value = "unified")]
pub(crate) worker_run_mode: WorkerRunMode,
}

#[derive(ValueEnum, Clone, PartialEq, Debug)]
pub enum WorkerRunMode {
Split,
Unified,
}

#[derive(Subcommand)]
Expand Down
9 changes: 6 additions & 3 deletions zero_bin/leader/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ pub struct ProofParams {

/// The main function for the client.
pub(crate) async fn client_main(
runtime: Runtime,
block_proof_runtime: Runtime,
segment_proof_runtime: Runtime,
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved
rpc_params: RpcParams,
block_interval: BlockInterval,
mut params: ProofParams,
Expand Down Expand Up @@ -82,13 +83,15 @@ pub(crate) async fn client_main(
// verify the whole sequence.
let proved_blocks = prover::prove(
block_prover_inputs,
&runtime,
&block_proof_runtime,
&segment_proof_runtime,
params.previous_proof.take(),
params.prover_config,
params.proof_output_dir.clone(),
)
.await;
runtime.close().await?;
block_proof_runtime.close().await?;
segment_proof_runtime.close().await?;
let proved_blocks = proved_blocks?;

if params.prover_config.test_only {
Expand Down
27 changes: 20 additions & 7 deletions zero_bin/leader/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,31 @@ use tracing::{debug, error, info};

/// The main function for the HTTP mode.
pub(crate) async fn http_main(
runtime: Runtime,
block_proof_runtime: Runtime,
segment_proof_runtime: Runtime,
port: u16,
output_dir: PathBuf,
prover_config: ProverConfig,
) -> Result<()> {
let addr = SocketAddr::from(([0, 0, 0, 0], port));
debug!("listening on {}", addr);

let runtime = Arc::new(runtime);
let block_proof_runtime = Arc::new(block_proof_runtime);
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved
let segment_proof_runtime = Arc::new(segment_proof_runtime);
let app = Router::new().route(
"/prove",
post({
let runtime = runtime.clone();
move |body| prove(body, runtime, output_dir.clone(), prover_config)
let block_proof_runtime = block_proof_runtime.clone();
let segment_proof_runtime = segment_proof_runtime.clone();
move |body| {
prove(
body,
block_proof_runtime,
segment_proof_runtime,
output_dir.clone(),
prover_config,
)
}
}),
);
let listener = tokio::net::TcpListener::bind(&addr).await?;
Expand Down Expand Up @@ -63,7 +74,8 @@ struct HttpProverInput {

async fn prove(
Json(payload): Json<HttpProverInput>,
runtime: Arc<Runtime>,
block_proof_runtime: Arc<Runtime>,
segment_proof_runtime: Arc<Runtime>,
output_dir: PathBuf,
prover_config: ProverConfig,
) -> StatusCode {
Expand All @@ -75,7 +87,7 @@ async fn prove(
payload
.prover_input
.prove_test(
&runtime,
&segment_proof_runtime,
payload.previous.map(futures::future::ok),
prover_config,
)
Expand All @@ -84,7 +96,8 @@ async fn prove(
payload
.prover_input
.prove(
&runtime,
&block_proof_runtime,
&segment_proof_runtime,
payload.previous.map(futures::future::ok),
prover_config,
)
Expand Down
45 changes: 40 additions & 5 deletions zero_bin/leader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ fn get_previous_proof(path: Option<PathBuf>) -> Result<Option<GeneratedBlockProo
Ok(Some(proof))
}

const SEGMENT_PROOF_ROUTING_KEY: &str = "segment_proof";
muursh marked this conversation as resolved.
Show resolved Hide resolved
const BLOCK_PROOF_ROUTING_KEY: &str = "block_proof";
const DEFAULT_ROUTING_KEY: &str = paladin::runtime::DEFAULT_ROUTING_KEY;
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved

#[tokio::main]
async fn main() -> Result<()> {
load_dotenvy_vars_if_present();
Expand All @@ -55,7 +59,25 @@ async fn main() -> Result<()> {

let args = cli::Cli::parse();

let runtime = Runtime::from_config(&args.paladin, register()).await?;
let mut block_proof_routing_key = DEFAULT_ROUTING_KEY.to_string();
let mut segment_proof_routing_key = DEFAULT_ROUTING_KEY.to_string();
if args.worker_run_mode == cli::WorkerRunMode::Split {
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved
// If we're running in split mode, we need to set the routing key for the
// block proof and segment proof.
info!("Workers running in split mode");
block_proof_routing_key = BLOCK_PROOF_ROUTING_KEY.to_string();
segment_proof_routing_key = SEGMENT_PROOF_ROUTING_KEY.to_string();
}

let mut block_proof_paladin_args = args.paladin.clone();
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved
block_proof_paladin_args.task_bus_routing_key = Some(block_proof_routing_key);

let mut segment_proof_paladin_args = args.paladin.clone();
segment_proof_paladin_args.task_bus_routing_key = Some(segment_proof_routing_key);

let block_proof_runtime = Runtime::from_config(&block_proof_paladin_args, register()).await?;
let segment_proof_runtime =
Runtime::from_config(&segment_proof_paladin_args, register()).await?;

let prover_config: ProverConfig = args.prover_config.into();

Expand All @@ -73,7 +95,13 @@ async fn main() -> Result<()> {
Command::Clean => zero_bin_common::prover_state::persistence::delete_all()?,
Command::Stdio { previous_proof } => {
let previous_proof = get_previous_proof(previous_proof)?;
stdio::stdio_main(runtime, previous_proof, prover_config).await?;
stdio::stdio_main(
block_proof_runtime,
segment_proof_runtime,
previous_proof,
prover_config,
)
.await?;
}
Command::Http { port, output_dir } => {
// check if output_dir exists, is a directory, and is writable
Expand All @@ -85,7 +113,14 @@ async fn main() -> Result<()> {
panic!("output-dir is not a writable directory");
}

http::http_main(runtime, port, output_dir, prover_config).await?;
http::http_main(
block_proof_runtime,
segment_proof_runtime,
port,
output_dir,
prover_config,
)
.await?;
}
Command::Rpc {
rpc_url,
Expand All @@ -99,7 +134,6 @@ async fn main() -> Result<()> {
backoff,
max_retries,
} => {
let runtime = Runtime::from_config(&args.paladin, register()).await?;
let previous_proof = get_previous_proof(previous_proof)?;
let mut block_interval = BlockInterval::new(&block_interval)?;

Expand All @@ -113,7 +147,8 @@ async fn main() -> Result<()> {

info!("Proving interval {block_interval}");
client_main(
runtime,
block_proof_runtime,
segment_proof_runtime,
RpcParams {
rpc_url,
rpc_type,
Expand Down
17 changes: 13 additions & 4 deletions zero_bin/leader/src/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use tracing::info;

/// The main function for the stdio mode.
pub(crate) async fn stdio_main(
runtime: Runtime,
block_proof_runtime: Runtime,
segment_proof_runtime: Runtime,
previous: Option<GeneratedBlockProof>,
prover_config: ProverConfig,
) -> Result<()> {
Expand All @@ -21,9 +22,17 @@ pub(crate) async fn stdio_main(
.map(Into::into)
.collect::<Vec<BlockProverInputFuture>>();

let proved_blocks =
prover::prove(block_prover_inputs, &runtime, previous, prover_config, None).await;
runtime.close().await?;
let proved_blocks = prover::prove(
block_prover_inputs,
&block_proof_runtime,
&segment_proof_runtime,
previous,
prover_config,
None,
)
.await;
block_proof_runtime.close().await?;
segment_proof_runtime.close().await?;
let proved_blocks = proved_blocks?;

if prover_config.test_only {
Expand Down
25 changes: 16 additions & 9 deletions zero_bin/prover/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ impl BlockProverInput {

pub async fn prove(
self,
runtime: &Runtime,
block_proof_runtime: &Runtime,
segment_proof_runtime: &Runtime,
previous: Option<impl Future<Output = Result<GeneratedBlockProof>>>,
prover_config: ProverConfig,
) -> Result<GeneratedBlockProof> {
Expand Down Expand Up @@ -99,7 +100,7 @@ impl BlockProverInput {

Directive::map(IndexedStream::from(segment_data_iterator), &seg_prove_ops)
.fold(&seg_agg_ops)
.run(runtime)
.run(segment_proof_runtime)
.map(move |e| {
e.map(|p| (idx, proof_gen::proof_types::BatchAggregatableProof::from(p)))
})
Expand All @@ -109,7 +110,7 @@ impl BlockProverInput {
// Fold the batch aggregated proof stream into a single proof.
let final_batch_proof =
Directive::fold(IndexedStream::new(batch_proof_futs), &batch_agg_ops)
.run(runtime)
.run(block_proof_runtime)
.await?;

if let proof_gen::proof_types::BatchAggregatableProof::Agg(proof) = final_batch_proof {
Expand All @@ -126,7 +127,7 @@ impl BlockProverInput {
prev,
save_inputs_on_error,
})
.run(runtime)
.run(block_proof_runtime)
.await?;

info!("Successfully proved block {block_number}");
Expand All @@ -139,7 +140,7 @@ impl BlockProverInput {

pub async fn prove_test(
self,
runtime: &Runtime,
segment_proof_runtime: &Runtime,
previous: Option<impl Future<Output = Result<GeneratedBlockProof>>>,
prover_config: ProverConfig,
) -> Result<GeneratedBlockProof> {
Expand Down Expand Up @@ -175,7 +176,7 @@ impl BlockProverInput {
);

simulation
.run(runtime)
.run(segment_proof_runtime)
.await?
.try_for_each(|_| future::ok(()))
.await?;
Expand Down Expand Up @@ -204,7 +205,8 @@ impl BlockProverInput {
/// block proofs as well.
pub async fn prove(
block_prover_inputs: Vec<BlockProverInputFuture>,
runtime: &Runtime,
block_proof_runtime: &Runtime,
segment_proof_runtime: &Runtime,
previous_proof: Option<GeneratedBlockProof>,
prover_config: ProverConfig,
proof_output_dir: Option<PathBuf>,
Expand All @@ -226,7 +228,7 @@ pub async fn prove(
// Prove the block
let block_proof = if prover_config.test_only {
block
.prove_test(runtime, previous_block_proof, prover_config)
.prove_test(segment_proof_runtime, previous_block_proof, prover_config)
.then(move |proof| async move {
let proof = proof?;
let block_number = proof.b_height;
Expand All @@ -250,7 +252,12 @@ pub async fn prove(
.await?
} else {
block
.prove(runtime, previous_block_proof, prover_config)
.prove(
block_proof_runtime,
segment_proof_runtime,
previous_block_proof,
prover_config,
)
.then(move |proof| async move {
let proof = proof?;
let block_number = proof.b_height;
Expand Down
Loading
Loading