Skip to content

Commit

Permalink
Merge branch 'main' into asmaa/consensus_dropped_messages
Browse files Browse the repository at this point in the history
  • Loading branch information
asmaastarkware authored Jul 22, 2024
2 parents e7bcf31 + 55cc85b commit a1103b3
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 65 deletions.
178 changes: 125 additions & 53 deletions crates/sequencing/papyrus_consensus/run_consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,45 @@
import re
import socket
import urllib.error
import urllib.request
from contextlib import closing


# The SECRET_KEY is used for building the BOOT_NODE_PEER_ID, so they are coupled and must be used together.
SECRET_KEY = "0xabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcd"
BOOT_NODE_PEER_ID = "12D3KooWDFYi71juk6dYWo3UDvqs5gAzGDc124LSvcR5d187Tdvi"

MONITORING_PERIOD = 10


class Node:
    """A single papyrus node subprocess plus the monitoring state needed to
    track the consensus height it has reached."""

    def __init__(self, validator_id, monitoring_gateway_server_port, cmd):
        self.validator_id = validator_id
        self.monitoring_gateway_server_port = monitoring_gateway_server_port
        self.cmd = cmd
        self.process = None
        # (height, timestamp) — the last observed height and when it changed.
        self.height_and_timestamp = (None, None)

    def start(self):
        # setsid puts the node (and the sed it pipes into) in its own process
        # group so stop() can signal the whole pipeline at once.
        self.process = subprocess.Popen(self.cmd, shell=True, preexec_fn=os.setsid)

    def stop(self):
        if self.process:
            os.killpg(os.getpgid(self.process.pid), signal.SIGINT)
            self.process.wait()

    def get_height(self):
        """Return the most recently decided height, or None if the node is not
        ready or consensus has not yet reached any height.

        Uses urllib + re instead of shelling out to `curl | grep -oP`:
        `grep -P` is GNU-only (unavailable on macOS) and curl may be missing.
        """
        port = self.monitoring_gateway_server_port
        url = f"http://localhost:{port}/monitoring/metrics"
        try:
            with urllib.request.urlopen(url, timeout=5) as response:
                metrics = response.read().decode()
        except (urllib.error.URLError, OSError):
            # Node not up yet (connection refused/timeout) — same as curl -s failing.
            return None
        match = re.search(r"papyrus_consensus_height (\d+)", metrics)
        return int(match.group(1)) if match else None

    def check_height(self):
        """Refresh height_and_timestamp if the height changed; return it."""
        height = self.get_height()
        if self.height_and_timestamp[0] != height:
            if self.height_and_timestamp[0] is not None and height is not None:
                assert height > self.height_and_timestamp[0], "Height should be increasing."
            self.height_and_timestamp = (height, time.time())

        return self.height_and_timestamp


def find_free_port():
    """Ask the OS for a currently-free TCP port and return its number.

    NOTE(review): the middle of this function was lost to diff truncation in
    the source; the bind/setsockopt lines below are the standard idiom and
    should be confirmed against the original file.
    """
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        # Port 0 asks the kernel to pick any available port.
        s.bind(("", 0))
        # Allow immediate reuse so the node can bind this port right after us.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return s.getsockname()[1]


def run_command(command):
    """Run *command* through the shell, raising CalledProcessError on failure."""
    completed = subprocess.run(command, shell=True, check=True)
    return completed
# Chosen once at import time so every peer can embed the bootnode's port in
# its bootstrap multiaddr before any node has started.
BOOTNODE_TCP_PORT = find_free_port()


# Returns whether the simulation should exit.
def monitor_simulation(nodes, start_time, duration, stagnation_timeout):
    """Exit when the run duration (if any) has elapsed or any node's height
    has not advanced for longer than stagnation_timeout seconds."""
    now = time.time()
    if duration is not None and (now - start_time) > duration:
        return True
    stagnated = []
    for node in nodes:
        height, last_update = node.check_height()
        print(f"Node: {node.validator_id}, height: {height}")
        # A node that never reached any height (height is None) is not
        # considered stagnated — it may still be starting up.
        if height is not None and (now - last_update) > stagnation_timeout:
            stagnated.append(node.validator_id)
    if not stagnated:
        return False
    print(f"Nodes {stagnated} have stagnated. Exiting simulation.")
    return True

def run_simulation(nodes, duration, stagnation_timeout):
    """Start all nodes, then poll them every MONITORING_PERIOD seconds until
    the duration elapses, a node stagnates, or the user presses Ctrl+C.
    Always stops every node on the way out.

    (Stale interleaved lines of the removed `run_parallel_commands` helper
    were stripped from this span.)
    """
    for node in nodes:
        node.start()

    start_time = time.time()
    try:
        while True:
            time.sleep(MONITORING_PERIOD)
            print(f"\nTime elapsed: {time.time() - start_time}s")
            should_exit = monitor_simulation(nodes, start_time, duration, stagnation_timeout)
            if should_exit:
                break
    except KeyboardInterrupt:
        print("\nTerminating subprocesses...")
    finally:
        for node in nodes:
            node.stop()


def build_node(base_layer_node_url, temp_dir, num_validators, i):
    """Build (but do not start) the Node for validator *i*.

    Validator 1 is the bootstrap node: it binds BOOTNODE_TCP_PORT and uses
    SECRET_KEY (which determines BOOT_NODE_PEER_ID); all other validators
    point their bootstrap multiaddr at it.

    (Stale interleaved lines of the removed `peernode_command` helper were
    stripped from this span.)
    """
    is_bootstrap = i == 1
    tcp_port = BOOTNODE_TCP_PORT if is_bootstrap else find_free_port()
    monitoring_gateway_server_port = find_free_port()

    cmd = (
        f"RUST_LOG=papyrus_consensus=debug,papyrus=info "
        f"target/release/papyrus_node --network.#is_none false "
        f"--base_layer.node_url {base_layer_node_url} "
        f"--storage.db_config.path_prefix {temp_dir}/data{i} "
        f"--consensus.#is_none false --consensus.validator_id 0x{i} "
        f"--consensus.num_validators {num_validators} "
        f"--network.tcp_port {tcp_port} "
        f"--rpc.server_address 127.0.0.1:{find_free_port()} "
        f"--monitoring_gateway.server_address 127.0.0.1:{monitoring_gateway_server_port} "
        f"--collect_metrics true "
    )

    if is_bootstrap:
        cmd += (
            f"--network.secret_key {SECRET_KEY} "
            # Use sed to strip special formatting characters from the log.
            + f"| sed -r 's/\\x1B\\[[0-9;]*[mK]//g' > {temp_dir}/validator{i}.txt"
        )

    else:
        cmd += (
            f"--network.bootstrap_peer_multiaddr.#is_none false "
            f"--network.bootstrap_peer_multiaddr /ip4/127.0.0.1/tcp/{BOOTNODE_TCP_PORT}/p2p/{BOOT_NODE_PEER_ID} "
            # Use sed to strip special formatting characters from the log.
            + f"| sed -r 's/\\x1B\\[[0-9;]*[mK]//g' > {temp_dir}/validator{i}.txt"
        )

    return Node(
        validator_id=i,
        monitoring_gateway_server_port=monitoring_gateway_server_port,
        cmd=cmd,
    )


def build_all_nodes(base_layer_node_url, temp_dir, num_validators):
    """Build Node objects in start order (a stale old `def main(...)` header
    that leaked into this span from the diff was removed).

    Validators are started in a specific order to ensure proper network formation:
    1. The bootnode (validator 1) is started first for network peering.
    2. Validators 2+ are started next to join the network through the bootnode.
    3. Validator 0, which is the proposer, is started last so the validators don't miss the proposals.
    """
    nodes = []

    nodes.append(build_node(base_layer_node_url, temp_dir, num_validators, 1))  # Bootstrap

    for i in range(2, num_validators):
        nodes.append(build_node(base_layer_node_url, temp_dir, num_validators, i))

    nodes.append(build_node(base_layer_node_url, temp_dir, num_validators, 0))  # Proposer

    return nodes


def main(base_layer_node_url, num_validators, stagnation_threshold, duration):
    """Build the papyrus_node binary, launch num_validators nodes, and run
    the monitored simulation until it decides to exit.

    (Stale interleaved lines of the removed command-list implementation were
    stripped from this span.)
    """
    assert num_validators >= 2, "At least 2 validators are required for the simulation."
    # Building the Papyrus Node package assuming its output will be located in the papyrus target directory.
    print("Running cargo build...")
    subprocess.run("cargo build --release --package papyrus_node", shell=True, check=True)

    temp_dir = tempfile.mkdtemp()
    print(f"Output files will be stored in: {temp_dir}")

    # One storage directory per validator.
    # NOTE(review): the loop header here was hidden behind a diff-expand
    # marker in the source; `for i in range(num_validators):` matches the
    # per-validator data{i} paths used by build_node — confirm against the
    # original file.
    for i in range(num_validators):
        data_dir = os.path.join(temp_dir, f"data{i}")
        os.makedirs(data_dir)

    nodes = build_all_nodes(base_layer_node_url, temp_dir, num_validators)

    print("Running validators...")
    run_simulation(nodes, duration, stagnation_threshold)
    print(f"Output files were stored in: {temp_dir}")
    print("Simulation complete.")


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run Papyrus Node simulation.")
parser.add_argument("--base_layer_node_url", required=True)
parser.add_argument("--num_validators", type=int, required=True)
parser.add_argument("--duration", type=int, required=True)
parser.add_argument(
"--stagnation_threshold",
type=int,
required=False,
default=60,
help="Time in seconds to check for height stagnation.",
)
parser.add_argument("--duration", type=int, required=False, default=None)

args = parser.parse_args()
main(args.base_layer_node_url, args.num_validators, args.duration)
main(args.base_layer_node_url, args.num_validators, args.stagnation_threshold, args.duration)
4 changes: 3 additions & 1 deletion crates/sequencing/papyrus_consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ where
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<BlockT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
{
let mut shc = SingleHeightConsensus::new(height, context, validator_id).await;
let validators = context.validators(height).await;
let mut shc =
SingleHeightConsensus::new(Arc::clone(&context), height, validator_id, validators);

if let Some(decision) = shc.start().await? {
return Ok(decision);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ pub(crate) struct SingleHeightConsensus<BlockT: ConsensusBlock> {
}

impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
pub(crate) async fn new(
height: BlockNumber,
pub(crate) fn new(
context: Arc<dyn ConsensusContext<Block = BlockT>>,
height: BlockNumber,
id: ValidatorId,
validators: Vec<ValidatorId>,
) -> Self {
let validators = context.validators(height).await;
// TODO(matan): Use actual weights, not just `len`.
let state_machine = StateMachine::new(validators.len() as u32);
Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ async fn proposer() {
let block = TestBlock { content: vec![1, 2, 3], id: BlockHash(Felt::ONE) };
let block_id = block.id();
// Set expectations for how the test should run:
context
.expect_validators()
.returning(move |_| vec![node_id, 2_u32.into(), 3_u32.into(), 4_u32.into()]);
context.expect_proposer().returning(move |_, _| node_id);
let block_clone = block.clone();
context.expect_build_proposal().returning(move |_| {
Expand Down Expand Up @@ -56,7 +53,12 @@ async fn proposer() {
.withf(move |msg: &ConsensusMessage| msg == &precommit(block_id, 0, node_id))
.returning(move |_| Ok(()));

let mut shc = SingleHeightConsensus::new(BlockNumber(0), Arc::new(context), node_id).await;
let mut shc = SingleHeightConsensus::new(
Arc::new(context),
BlockNumber(0),
node_id,
vec![node_id, 2_u32.into(), 3_u32.into(), 4_u32.into()],
);

// Sends proposal and prevote.
assert!(matches!(shc.start().await, Ok(None)));
Expand Down Expand Up @@ -96,9 +98,6 @@ async fn validator() {
let block_id = block.id();

// Set expectations for how the test should run:
context
.expect_validators()
.returning(move |_| vec![node_id, proposer, 3_u32.into(), 4_u32.into()]);
context.expect_proposer().returning(move |_, _| proposer);
let block_clone = block.clone();
context.expect_validate_proposal().returning(move |_, _| {
Expand All @@ -116,7 +115,12 @@ async fn validator() {
.returning(move |_| Ok(()));

// Creation calls to `context.validators`.
let mut shc = SingleHeightConsensus::new(BlockNumber(0), Arc::new(context), node_id).await;
let mut shc = SingleHeightConsensus::new(
Arc::new(context),
BlockNumber(0),
node_id,
vec![node_id, proposer, 3_u32.into(), 4_u32.into()],
);

// Send the proposal from the peer.
let (fin_sender, fin_receiver) = oneshot::channel();
Expand Down

0 comments on commit a1103b3

Please sign in to comment.