Skip to content

Commit

Permalink
feat: Revamp bootstrap process (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine authored Feb 1, 2024
1 parent 9e8e766 commit 4e533fb
Show file tree
Hide file tree
Showing 27 changed files with 350 additions and 720 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ jobs:
toolchain: stable
override: true

- name: Lint Rust codes
run: |
cargo clippy
- name: Build and Check Rust unit tests and harness tests all pass
run: |
make build
Expand All @@ -50,3 +46,7 @@ jobs:
pip3 install -r requirements.txt
make test
cd ../../
- name: Lint Rust codes
run: |
cargo clippy
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
resolver = "2"
members = [
"raftify",
"harness",
"examples",
"examples/memstore",
"examples/memstore/dynamic-members",
"examples/memstore/static-members",
"harness"
]
default-members = [
"raftify",
"harness",
"examples",
"examples/memstore",
"examples/memstore/dynamic-members",
"examples/memstore/static-members",
"harness"
]

[workspace.package]
Expand Down
5 changes: 0 additions & 5 deletions binding/python/src/bindings/raft_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,6 @@ impl PyRaftNode {
future_into_py(py, async move { Ok(raft_node.get_cluster_size().await) })
}

pub fn set_bootstrap_done<'a>(&'a mut self, py: Python<'a>) -> PyResult<&'a PyAny> {
let raft_node = self.inner.clone();
future_into_py(py, async move { Ok(raft_node.set_bootstrap_done().await) })
}

pub fn store<'a>(&'a mut self, py: Python<'a>) -> PyResult<&'a PyAny> {
let raft_node = self.inner.clone();
future_into_py(py, async move { Ok(raft_node.store().await) })
Expand Down
6 changes: 3 additions & 3 deletions examples/memstore/dynamic-members/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
.unwrap();
let node_id = ticket.reserved_id;

let raft = Raft::new_follower(
let raft = Raft::bootstrap(
node_id,
options.raft_addr,
store.clone(),
cfg.clone(),
None,
Some(ticket.peers.clone().into()),
logger.clone(),
)?;

Expand All @@ -83,7 +83,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
}
None => {
log::info!("Bootstrap a Raft Cluster");
let raft = Raft::bootstrap_cluster(
let raft = Raft::bootstrap(
1,
options.raft_addr,
store.clone(),
Expand Down
6 changes: 5 additions & 1 deletion examples/memstore/src/web_server_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ async fn put(data: web::Data<(HashStore, Raft)>, path: web::Path<(u64, String)>)
key: path.0,
value: path.1.clone(),
};
data.1.raft_node.propose(log_entry.encode().unwrap()).await;
data.1
.raft_node
.propose(log_entry.encode().unwrap())
.await
.unwrap();

"OK".to_string()
}
Expand Down
58 changes: 12 additions & 46 deletions examples/memstore/static-members/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,13 @@ struct Options {
#[structopt(long)]
raft_addr: String,
#[structopt(long)]
peer_addr: Option<String>,
#[structopt(long)]
web_server: Option<String>,
#[structopt(long)]
restore_wal_from: Option<u64>,
#[structopt(long)]
restore_wal_snapshot_from: Option<u64>,
}

fn validate_options(options: Options) -> Options {
if options.peer_addr.is_some() && options.restore_wal_from.is_some() {
panic!("Cannot restore WAL from follower node");
} else if options.peer_addr.is_some() && options.restore_wal_snapshot_from.is_some() {
panic!("Follower node should receive snapshot from leader, not restoring it from storage");
} else {
options
}
}

#[actix_rt::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let decorator = slog_term::TermDecorator::new().build();
Expand All @@ -66,7 +54,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {

set_custom_formatter(CustomFormatter::<LogEntry, HashStore>::new());

let options = validate_options(Options::from_args());
let options = Options::from_args();
let store = HashStore::new();
let initial_peers = load_peers().await?;

Expand All @@ -78,38 +66,16 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
.get_node_id_by_addr(options.raft_addr.clone())
.unwrap();

let (raft, raft_handle) = match options.peer_addr {
Some(peer_addr) => {
log::info!("Running in Follower mode");

let raft = Raft::new_follower(
node_id,
options.raft_addr,
store.clone(),
cfg,
Some(initial_peers.clone()),
logger.clone(),
)?;

let handle = tokio::spawn(raft.clone().run());
Raft::member_bootstrap_ready(peer_addr, node_id, logger).await?;

(raft, handle)
}
None => {
log::info!("Node {} bootstrapped a raft cluster", node_id);
let raft = Raft::bootstrap_cluster(
node_id,
options.raft_addr,
store.clone(),
cfg,
Some(initial_peers),
logger.clone(),
)?;
let handle = tokio::spawn(raft.clone().run());
(raft, handle)
}
};
let raft = Raft::bootstrap(
node_id,
options.raft_addr,
store.clone(),
cfg.clone(),
Some(initial_peers.clone()),
logger.clone(),
)?;

let handle = tokio::spawn(raft.clone().run());

if let Some(addr) = options.web_server {
let _web_server = tokio::spawn(
Expand All @@ -130,7 +96,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
);
}

let result = tokio::try_join!(raft_handle)?;
let result = tokio::try_join!(handle)?;
result.0?;
Ok(())
}
1 change: 0 additions & 1 deletion harness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ raftify.workspace = true

actix-rt = "2.0"
actix-web = "4.0.0"
async-trait = "0.1.48"
bincode = "1.3"
log = { version = "0.4", features = ["std"] }
serde = { version = "1.0", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions harness/src/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub const WEB_SERVER_ADDRS: [&str; 5] = [
"127.0.0.1:60085",
];

pub const ZERO_NODE_EXAMPLE: &str = "0-node-example.toml";
pub const ONE_NODE_EXAMPLE: &str = "1-node-example.toml";
pub const THREE_NODE_EXAMPLE: &str = "3-node-example.toml";
pub const FIVE_NODE_EXAMPLE: &str = "5-node-example.toml";
2 changes: 1 addition & 1 deletion harness/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod config;
pub mod constant;
pub mod raft_server;
pub mod raft;
pub mod state_machine;
pub mod utils;
76 changes: 32 additions & 44 deletions harness/src/raft_server.rs → harness/src/raft.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use futures::future;
use once_cell::sync::Lazy;
use raftify::{
raft::{
formatter::set_custom_formatter,
logger::{Logger, Slogger},
},
raft::{formatter::set_custom_formatter, logger::Slogger},
CustomFormatter, Peers, Raft as Raft_, Result,
};
use std::{
Expand All @@ -23,34 +20,28 @@ pub type Raft = Raft_<LogEntry, HashStore>;

pub static RAFTS: Lazy<Mutex<HashMap<u64, Raft>>> = Lazy::new(|| Mutex::new(HashMap::new()));

fn run_raft(node_id: &u64, peers: Peers) -> Result<JoinHandle<Result<()>>> {
fn run_raft(node_id: &u64, peers: Peers, should_be_leader: bool) -> Result<JoinHandle<Result<()>>> {
let peer = peers.get(node_id).unwrap();
let cfg = build_config();
let store = HashStore::new();
let logger = build_logger();

let raft = match node_id {
1 => Raft::bootstrap_cluster(
1,
peer.addr,
store,
cfg,
Some(peers.clone()),
Arc::new(Slogger {
slog: logger.clone(),
}),
),
_ => Raft::new_follower(
*node_id,
peer.addr,
store,
cfg,
Some(peers.clone()),
Arc::new(Slogger {
slog: logger.clone(),
}),
),
}
let peers = if should_be_leader {
None
} else {
Some(peers.clone())
};

let raft = Raft::bootstrap(
*node_id,
peer.addr,
store,
cfg,
peers,
Arc::new(Slogger {
slog: logger.clone(),
}),
)
.expect("Raft build failed!");

RAFTS.lock().unwrap().insert(*node_id, raft.clone());
Expand All @@ -60,13 +51,14 @@ fn run_raft(node_id: &u64, peers: Peers) -> Result<JoinHandle<Result<()>>> {
Ok(raft_handle)
}

pub async fn run_rafts(peers: Peers) -> Result<()> {
pub async fn build_raft_cluster(peers: Peers) -> Result<()> {
set_custom_formatter(CustomFormatter::<LogEntry, HashStore>::new());

let mut raft_handles = vec![];
let should_be_leader = peers.len() <= 1;

for (node_id, _) in peers.iter() {
let raft_handle = run_raft(&node_id, peers.clone())?;
let raft_handle = run_raft(&node_id, peers.clone(), should_be_leader)?;
raft_handles.push(raft_handle);
println!("Node {} starting...", node_id);
}
Expand All @@ -87,19 +79,8 @@ pub async fn run_rafts(peers: Peers) -> Result<()> {
Ok(())
}

pub async fn handle_bootstrap(peers: Peers, logger: Arc<dyn Logger>) -> Result<()> {
let leader_addr = peers.get(&1).unwrap().addr;

for (node_id, _) in peers.iter() {
if node_id != 1 {
Raft::member_bootstrap_ready(leader_addr, node_id, logger.clone()).await?;
}
}

Ok(())
}

pub async fn spawn_extra_node(peer_addr: &str, raft_addr: &str) -> Result<JoinHandle<Result<()>>> {
// Note that this function lock RAFTS, so it should not be called while holding RAFTS lock.
pub async fn spawn_extra_node(raft_addr: &str, peer_addr: &str) -> Result<JoinHandle<Result<()>>> {
let logger = Arc::new(Slogger {
slog: build_logger(),
});
Expand All @@ -111,8 +92,15 @@ pub async fn spawn_extra_node(peer_addr: &str, raft_addr: &str) -> Result<JoinHa
let cfg = build_config();
let store = HashStore::new();

let raft = Raft::new_follower(node_id, raft_addr, store, cfg, None, logger)
.expect("Raft build failed!");
let raft = Raft::bootstrap(
node_id,
raft_addr,
store,
cfg,
Some(join_ticket.peers.clone().into()),
logger,
)
.expect("Raft build failed!");

RAFTS.lock().unwrap().insert(node_id, raft.clone());

Expand Down
3 changes: 1 addition & 2 deletions harness/src/state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use async_trait::async_trait;
use bincode::{deserialize, serialize};
use raftify::{AbstractLogEntry, AbstractStateMachine, Result};
use raftify::{async_trait, AbstractLogEntry, AbstractStateMachine, Result};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
Expand Down
20 changes: 15 additions & 5 deletions harness/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{fs, time::Duration};
use tokio::time::sleep;
use toml;

use crate::raft_server::Raft;
use crate::{constant::ZERO_NODE_EXAMPLE, raft::Raft};

pub fn build_logger() -> slog::Logger {
let decorator = slog_term::TermDecorator::new().build();
Expand Down Expand Up @@ -43,8 +43,12 @@ pub struct TomlInnerRaftConfig {
pub peers: Vec<TomlRaftPeer>,
}

pub async fn load_peers(filename: &str) -> Result<Peers, Box<dyn std::error::Error>> {
let path = Path::new("fixtures").join(filename);
pub async fn load_peers(example_filename: &str) -> Result<Peers, Box<dyn std::error::Error>> {
if example_filename == ZERO_NODE_EXAMPLE {
return Ok(Peers::with_empty());
}

let path = Path::new("fixtures").join(example_filename);
let config_str = fs::read_to_string(path)?;

let raft_config: TomlRaftConfig = toml::from_str(&config_str)?;
Expand All @@ -60,7 +64,10 @@ pub async fn load_peers(filename: &str) -> Result<Peers, Box<dyn std::error::Err
}

pub async fn wait_for_until_cluster_size_increase(raft: Raft, target: usize) {
println!("Waiting for cluster size to increase to {}...", target);
raft.logger.debug(&format!(
"Waiting for cluster size to increase to... {}",
target
));

loop {
let size = raft.cluster_size().await;
Expand All @@ -75,7 +82,10 @@ pub async fn wait_for_until_cluster_size_increase(raft: Raft, target: usize) {
}

pub async fn wait_for_until_cluster_size_decrease(raft: Raft, target: usize) {
println!("Waiting for cluster size to decrease to {}...", target);
raft.logger.debug(&format!(
"Waiting for cluster size to decrease to {}...",
target
));

loop {
let size = raft.cluster_size().await;
Expand Down
Loading

0 comments on commit 4e533fb

Please sign in to comment.