diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f8a8e541..745ae7a4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -26,10 +26,6 @@ jobs: toolchain: stable override: true - - name: Lint Rust codes - run: | - cargo clippy - - name: Build and Check Rust unit tests and harness tests all pass run: | make build @@ -50,3 +46,7 @@ jobs: pip3 install -r requirements.txt make test cd ../../ + + - name: Lint Rust codes + run: | + cargo clippy diff --git a/Cargo.lock b/Cargo.lock index 25f34eca..66af53c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -920,7 +920,6 @@ version = "0.1.42" dependencies = [ "actix-rt", "actix-web", - "async-trait", "bincode", "futures", "log", diff --git a/Cargo.toml b/Cargo.toml index be9f77e3..31d18d90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,19 +2,19 @@ resolver = "2" members = [ "raftify", + "harness", "examples", "examples/memstore", "examples/memstore/dynamic-members", "examples/memstore/static-members", - "harness" ] default-members = [ "raftify", + "harness", "examples", "examples/memstore", "examples/memstore/dynamic-members", "examples/memstore/static-members", - "harness" ] [workspace.package] diff --git a/binding/python/src/bindings/raft_node.rs b/binding/python/src/bindings/raft_node.rs index 916e161f..a4e3d43b 100644 --- a/binding/python/src/bindings/raft_node.rs +++ b/binding/python/src/bindings/raft_node.rs @@ -121,11 +121,6 @@ impl PyRaftNode { future_into_py(py, async move { Ok(raft_node.get_cluster_size().await) }) } - pub fn set_bootstrap_done<'a>(&'a mut self, py: Python<'a>) -> PyResult<&'a PyAny> { - let raft_node = self.inner.clone(); - future_into_py(py, async move { Ok(raft_node.set_bootstrap_done().await) }) - } - pub fn store<'a>(&'a mut self, py: Python<'a>) -> PyResult<&'a PyAny> { let raft_node = self.inner.clone(); future_into_py(py, async move { Ok(raft_node.store().await) }) diff --git a/examples/memstore/dynamic-members/src/main.rs b/examples/memstore/dynamic-members/src/main.rs index 39e98631..621b5ceb 100644 --- a/examples/memstore/dynamic-members/src/main.rs +++ b/examples/memstore/dynamic-members/src/main.rs @@ -66,12 +66,12 @@ async fn main() -> std::result::Result<(), Box> { .unwrap(); let node_id = ticket.reserved_id; - let raft = Raft::new_follower( + let raft = Raft::bootstrap( node_id, options.raft_addr, store.clone(), cfg.clone(), - None, + Some(ticket.peers.clone().into()), logger.clone(), )?; @@ -83,7 +83,7 @@ async fn main() -> std::result::Result<(), Box> { } None => { log::info!("Bootstrap a Raft Cluster"); - let raft = Raft::bootstrap_cluster( + let raft = Raft::bootstrap( 1, options.raft_addr, store.clone(), diff --git a/examples/memstore/src/web_server_api.rs b/examples/memstore/src/web_server_api.rs index 8a830772..390917d4 100644 --- a/examples/memstore/src/web_server_api.rs +++ b/examples/memstore/src/web_server_api.rs @@ -14,7 +14,11 @@ async fn put(data: web::Data<(HashStore, Raft)>, path: web::Path<(u64, String)>) key: path.0, value: path.1.clone(), }; - data.1.raft_node.propose(log_entry.encode().unwrap()).await; + data.1 + .raft_node + .propose(log_entry.encode().unwrap()) + .await + .unwrap(); "OK".to_string() } diff --git a/examples/memstore/static-members/src/main.rs b/examples/memstore/static-members/src/main.rs index 922668a0..4f9d9b21 100644 --- a/examples/memstore/static-members/src/main.rs +++ b/examples/memstore/static-members/src/main.rs @@ -27,8 +27,6 @@ struct Options { #[structopt(long)] raft_addr: String, #[structopt(long)] - peer_addr: Option, - #[structopt(long)] web_server: Option, #[structopt(long)] restore_wal_from: Option, @@ -36,16 +34,6 @@ struct Options { restore_wal_snapshot_from: Option, } -fn validate_options(options: Options) -> Options { - if options.peer_addr.is_some() && options.restore_wal_from.is_some() { - panic!("Cannot restore WAL from follower node"); - } else if options.peer_addr.is_some() && options.restore_wal_snapshot_from.is_some() { - panic!("Follower node should receive snapshot from leader, not restoring it from storage"); - } else { - options - } -} - #[actix_rt::main] async fn main() -> std::result::Result<(), Box> { let decorator = slog_term::TermDecorator::new().build(); @@ -66,7 +54,7 @@ async fn main() -> std::result::Result<(), Box> { set_custom_formatter(CustomFormatter::::new()); - let options = validate_options(Options::from_args()); + let options = Options::from_args(); let store = HashStore::new(); let initial_peers = load_peers().await?; @@ -78,38 +66,16 @@ async fn main() -> std::result::Result<(), Box> { .get_node_id_by_addr(options.raft_addr.clone()) .unwrap(); - let (raft, raft_handle) = match options.peer_addr { - Some(peer_addr) => { - log::info!("Running in Follower mode"); - - let raft = Raft::new_follower( - node_id, - options.raft_addr, - store.clone(), - cfg, - Some(initial_peers.clone()), - logger.clone(), - )?; - - let handle = tokio::spawn(raft.clone().run()); - Raft::member_bootstrap_ready(peer_addr, node_id, logger).await?; - - (raft, handle) - } - None => { - log::info!("Node {} bootstrapped a raft cluster", node_id); - let raft = Raft::bootstrap_cluster( - node_id, - options.raft_addr, - store.clone(), - cfg, - Some(initial_peers), - logger.clone(), - )?; - let handle = tokio::spawn(raft.clone().run()); - (raft, handle) - } - }; + let raft = Raft::bootstrap( + node_id, + options.raft_addr, + store.clone(), + cfg.clone(), + Some(initial_peers.clone()), + logger.clone(), + )?; + + let handle = tokio::spawn(raft.clone().run()); if let Some(addr) = options.web_server { let _web_server = tokio::spawn( @@ -130,7 +96,7 @@ async fn main() -> std::result::Result<(), Box> { ); } - let result = tokio::try_join!(raft_handle)?; + let result = tokio::try_join!(handle)?; result.0?; Ok(()) } diff --git a/harness/Cargo.toml b/harness/Cargo.toml index f6f0d462..e5af68fa 100644 --- a/harness/Cargo.toml +++ b/harness/Cargo.toml @@ -13,7 +13,6 @@ raftify.workspace = true actix-rt = "2.0" actix-web = "4.0.0" -async-trait = "0.1.48" bincode = "1.3" log = { version = "0.4", features = ["std"] } serde = { version = "1.0", features = ["derive"] } diff --git a/harness/src/constant.rs b/harness/src/constant.rs index a63288d7..1a156ed4 100644 --- a/harness/src/constant.rs +++ b/harness/src/constant.rs @@ -16,6 +16,7 @@ pub const WEB_SERVER_ADDRS: [&str; 5] = [ "127.0.0.1:60085", ]; +pub const ZERO_NODE_EXAMPLE: &str = "0-node-example.toml"; pub const ONE_NODE_EXAMPLE: &str = "1-node-example.toml"; pub const THREE_NODE_EXAMPLE: &str = "3-node-example.toml"; pub const FIVE_NODE_EXAMPLE: &str = "5-node-example.toml"; diff --git a/harness/src/lib.rs b/harness/src/lib.rs index f1855de7..b86bb0d4 100644 --- a/harness/src/lib.rs +++ b/harness/src/lib.rs @@ -1,5 +1,5 @@ pub mod config; pub mod constant; -pub mod raft_server; +pub mod raft; pub mod state_machine; pub mod utils; diff --git a/harness/src/raft_server.rs b/harness/src/raft.rs similarity index 60% rename from harness/src/raft_server.rs rename to harness/src/raft.rs index 14966bbc..3993a262 100644 --- a/harness/src/raft_server.rs +++ b/harness/src/raft.rs @@ -1,10 +1,7 @@ use futures::future; use once_cell::sync::Lazy; use raftify::{ - raft::{ - formatter::set_custom_formatter, - logger::{Logger, Slogger}, - }, + raft::{formatter::set_custom_formatter, logger::Slogger}, CustomFormatter, Peers, Raft as Raft_, Result, }; use std::{ @@ -23,34 +20,28 @@ pub type Raft = Raft_; pub static RAFTS: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); -fn run_raft(node_id: &u64, peers: Peers) -> Result>> { +fn run_raft(node_id: &u64, peers: Peers, should_be_leader: bool) -> Result>> { let peer = peers.get(node_id).unwrap(); let cfg = build_config(); let store = HashStore::new(); let logger = build_logger(); - let raft = match node_id { - 1 => Raft::bootstrap_cluster( - 1, - peer.addr, - store, - cfg, - Some(peers.clone()), - Arc::new(Slogger { - slog: logger.clone(), - }), - ), - _ => Raft::new_follower( - *node_id, - peer.addr, - store, - cfg, - Some(peers.clone()), - Arc::new(Slogger { - slog: logger.clone(), - }), - ), - } + let peers = if should_be_leader { + None + } else { + Some(peers.clone()) + }; + + let raft = Raft::bootstrap( + *node_id, + peer.addr, + store, + cfg, + peers, + Arc::new(Slogger { + slog: logger.clone(), + }), + ) .expect("Raft build failed!"); RAFTS.lock().unwrap().insert(*node_id, raft.clone()); @@ -60,13 +51,14 @@ fn run_raft(node_id: &u64, peers: Peers) -> Result>> { Ok(raft_handle) } -pub async fn run_rafts(peers: Peers) -> Result<()> { +pub async fn build_raft_cluster(peers: Peers) -> Result<()> { set_custom_formatter(CustomFormatter::::new()); let mut raft_handles = vec![]; + let should_be_leader = peers.len() <= 1; for (node_id, _) in peers.iter() { - let raft_handle = run_raft(&node_id, peers.clone())?; + let raft_handle = run_raft(&node_id, peers.clone(), should_be_leader)?; raft_handles.push(raft_handle); println!("Node {} starting...", node_id); } @@ -87,19 +79,8 @@ pub async fn run_rafts(peers: Peers) -> Result<()> { Ok(()) } -pub async fn handle_bootstrap(peers: Peers, logger: Arc) -> Result<()> { - let leader_addr = peers.get(&1).unwrap().addr; - - for (node_id, _) in peers.iter() { - if node_id != 1 { - Raft::member_bootstrap_ready(leader_addr, node_id, logger.clone()).await?; - } - } - - Ok(()) -} - -pub async fn spawn_extra_node(peer_addr: &str, raft_addr: &str) -> Result>> { +// Note that this function lock RAFTS, so it should not be called while holding RAFTS lock. +pub async fn spawn_extra_node(raft_addr: &str, peer_addr: &str) -> Result>> { let logger = Arc::new(Slogger { slog: build_logger(), }); @@ -111,8 +92,15 @@ pub async fn spawn_extra_node(peer_addr: &str, raft_addr: &str) -> Result slog::Logger { let decorator = slog_term::TermDecorator::new().build(); @@ -43,8 +43,12 @@ pub struct TomlInnerRaftConfig { pub peers: Vec, } -pub async fn load_peers(filename: &str) -> Result> { - let path = Path::new("fixtures").join(filename); +pub async fn load_peers(example_filename: &str) -> Result> { + if example_filename == ZERO_NODE_EXAMPLE { + return Ok(Peers::with_empty()); + } + + let path = Path::new("fixtures").join(example_filename); let config_str = fs::read_to_string(path)?; let raft_config: TomlRaftConfig = toml::from_str(&config_str)?; @@ -60,7 +64,10 @@ pub async fn load_peers(filename: &str) -> Result, logger: slog::Log let mut persisted_peers_configs: HashMap = HashMap::new(); - for (_, entry) in entries.iter().enumerate() { + for entry in entries.iter() { let conf_change_v2 = match entry.get_entry_type() { EntryType::EntryConfChange => to_confchange_v2(ConfChange::decode(entry.get_data())?), EntryType::EntryConfChangeV2 => ConfChangeV2::decode(entry.get_data())?, @@ -73,7 +73,7 @@ pub fn dump_peers(path: &str, peers: HashMap, logger: slog::Log for (k, v) in diffs_to_remove.into_iter() { let mut cs = ConfChangeSingle::default(); - cs.set_node_id(*k as u64); + cs.set_node_id(*k); cs.set_change_type(ConfChangeType::RemoveNode); conf_changes.push(cs); cc_addrs.push(*v); @@ -82,13 +82,13 @@ pub fn dump_peers(path: &str, peers: HashMap, logger: slog::Log // TODO: Support AddLearnerNode for (k, v) in diffs_to_add.into_iter() { let mut cs = ConfChangeSingle::default(); - cs.set_node_id(*k as u64); + cs.set_node_id(*k); cs.set_change_type(ConfChangeType::AddNode); conf_changes.push(cs); cc_addrs.push(*v); } - if conf_changes.len() >= 1 { + if !conf_changes.is_empty() { new_cc_v2.set_context(serialize(&cc_addrs)?); new_cc_v2.set_changes(conf_changes); diff --git a/raftify/src/peers.rs b/raftify/src/peers.rs index bec09c00..b9b03362 100644 --- a/raftify/src/peers.rs +++ b/raftify/src/peers.rs @@ -1,7 +1,11 @@ use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, net::ToSocketAddrs}; +use std::{ + collections::HashMap, + net::{SocketAddr, ToSocketAddrs}, +}; use super::Peer; +use crate::error::Result; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Peers { @@ -14,6 +18,22 @@ impl Default for Peers { } } +impl From for HashMap { + fn from(peers: Peers) -> Self { + peers.inner.into_iter().map(|(k, v)| (k, v.addr)).collect() + } +} + +impl From> for Peers { + fn from(map: HashMap) -> Self { + let inner = map + .into_iter() + .map(|(k, addr)| (k, Peer { addr, client: None })) + .collect(); + Peers { inner } + } +} + impl Peers { pub fn new(self_id: u64, self_addr: A) -> Self { let mut inner = HashMap::new(); @@ -68,6 +88,11 @@ impl Peers { self.inner.insert(id, peer); } + pub async fn connect(&mut self, id: u64) -> Result<()> { + let peer = self.get_mut(&id).unwrap(); + peer.connect().await + } + pub fn reserve_id(&mut self) -> u64 { match self.inner.keys().max() { Some(id) => id + 1, diff --git a/raftify/src/raft_facade.rs b/raftify/src/raft_facade.rs index 36cf8355..64e380d7 100644 --- a/raftify/src/raft_facade.rs +++ b/raftify/src/raft_facade.rs @@ -4,21 +4,18 @@ use std::{ collections::HashMap, net::{SocketAddr, ToSocketAddrs}, sync::Arc, - time::Duration, }; use tokio::{ signal, sync::{mpsc, oneshot}, - time::sleep, }; -use tonic::Request; use super::{ create_client, error::{Error, Result}, raft_node::RaftNode, raft_server::RaftServer, - raft_service::{self, MemberBootstrapReadyArgs, ResultCode}, + raft_service::{self, ResultCode}, request_message::ServerRequestMsg, AbstractLogEntry, AbstractStateMachine, Config, LogStore, Peers, }; @@ -44,7 +41,7 @@ pub struct ClusterJoinTicket { impl Raft { - pub fn bootstrap_cluster( + pub fn bootstrap( node_id: u64, raft_addr: A, fsm: FSM, @@ -53,62 +50,20 @@ impl, ) -> Result { let raft_addr = raft_addr.to_socket_addrs()?.next().unwrap(); - let initial_peers = initial_peers.unwrap_or(Peers::new(node_id, raft_addr)); + let should_be_leader = initial_peers.is_none(); + let peers = initial_peers.unwrap_or(Peers::new(node_id, raft_addr)); let (local_tx, local_rx) = mpsc::channel(100); let (server_tx, server_rx) = mpsc::channel(100); - let bootstrap_done = initial_peers.is_empty() || initial_peers.len() <= 1; - let raft_node = RaftNode::bootstrap_cluster( + let raft_node = RaftNode::bootstrap( node_id, + should_be_leader, fsm, config.clone(), - initial_peers, + peers, raft_addr, logger.clone(), - bootstrap_done, - server_rx, - server_tx.clone(), - local_rx, - local_tx.clone(), - )?; - - let raft_server = - RaftServer::new(server_tx.clone(), raft_addr, config.clone(), logger.clone()); - - Ok(Self { - raft_addr, - server_tx: server_tx.clone(), - raft_node, - raft_server, - config, - logger, - }) - } - - pub fn new_follower( - node_id: u64, - raft_addr: A, - fsm: FSM, - config: Config, - initial_peers: Option, - logger: Arc, - ) -> Result { - let raft_addr = raft_addr.to_socket_addrs()?.next().unwrap(); - let initial_peers = initial_peers.unwrap_or(Peers::new(node_id, raft_addr)); - - let (local_tx, local_rx) = mpsc::channel(100); - let (server_tx, server_rx) = mpsc::channel(100); - let bootstrap_done = initial_peers.is_empty() || initial_peers.len() <= 1; - - let raft_node = RaftNode::new_follower( - node_id, - fsm, - config.clone(), - initial_peers, - raft_addr, - logger.clone(), - bootstrap_done, server_rx, server_tx.clone(), local_rx, @@ -235,57 +190,6 @@ impl( - leader_addr: A, - node_id: u64, - logger: Arc, - ) -> Result<()> { - let mut ctrl_c = Box::pin(signal::ctrl_c()); - - let mut leader_client = loop { - tokio::select! { - res = create_client(&leader_addr) => { - match res { - Ok(client) => break client, - Err(e) => { - logger.error(&format!( - "Connection to the Leader node failed. Cause: \"{}\". Retry after 1s...", - e - )); - sleep(Duration::from_secs(1)).await; - } - } - } - _ = ctrl_c.as_mut() => { - logger.info("Ctrl+C signal detected. Shutting down..."); - return Err(Error::CtrlC) - } - } - }; - - let response = leader_client - .member_bootstrap_ready(Request::new(MemberBootstrapReadyArgs { node_id })) - .await? - .into_inner(); - - match response.code() { - ResultCode::Ok => { - logger.debug(&format!( - "Node {} send the bootstrap ready request successfully.", - node_id - )); - } - ResultCode::Error => { - logger.error("Failed to send the bootstrap ready request."); - } - ResultCode::WrongLeader => { - logger.error("Wrong leader address. Check leader changes while sending bootstrap ready request."); - } - } - - Ok(()) - } - pub async fn snapshot(&self) -> Result<()> { let storage = self.raft_node.storage().await; diff --git a/raftify/src/raft_node/bootstrap.rs b/raftify/src/raft_node/bootstrap.rs index 82e8ac22..6bd01b26 100644 --- a/raftify/src/raft_node/bootstrap.rs +++ b/raftify/src/raft_node/bootstrap.rs @@ -15,6 +15,8 @@ use crate::{ }; /// Commit the configuration change to add all follower nodes to the cluster. +#[deprecated] +#[allow(dead_code)] pub async fn bootstrap_peers( peers: Arc>, raw_node: &mut RawNode, diff --git a/raftify/src/raft_node/mod.rs b/raftify/src/raft_node/mod.rs index fd67bf79..dd665c80 100644 --- a/raftify/src/raft_node/mod.rs +++ b/raftify/src/raft_node/mod.rs @@ -25,6 +25,7 @@ use response_sender::ResponseSender; use utils::inspect_raftnode; use crate::{ + create_client, error::{Result, SendMessageError}, raft::{ eraftpb::{ @@ -34,8 +35,7 @@ use crate::{ formatter::{format_confchangev2, format_message}, raw_node::RawNode, }, - raft_node::bootstrap::bootstrap_peers, - raft_service::{self, ChangeConfigResultType, ResultCode}, + raft_service::{self, ChangeConfigResultType, ProposeArgs}, request_message::{LocalRequestMsg, SelfMessage, ServerRequestMsg}, response_message::{ ConfChangeResponseResult, LocalResponseMsg, RequestIdResponseResult, ResponseMessage, @@ -67,60 +67,27 @@ impl< > RaftNode { #[allow(clippy::too_many_arguments)] - pub fn bootstrap_cluster( + pub fn bootstrap( node_id: u64, + should_be_leader: bool, fsm: FSM, config: Config, initial_peers: Peers, raft_addr: SocketAddr, logger: Arc, - bootstrap_done: bool, server_rcv: mpsc::Receiver, server_snd: mpsc::Sender, local_rcv: mpsc::Receiver>, local_snd: mpsc::Sender>, ) -> Result { - RaftNodeCore::::bootstrap_cluster( + RaftNodeCore::::bootstrap( node_id, + should_be_leader, fsm, config, initial_peers, raft_addr, logger, - bootstrap_done, - server_rcv, - server_snd, - local_rcv, - local_snd.clone(), - ) - .map(|core| Self { - inner: Arc::new(OneShotMutex::new(core)), - local_sender: local_snd.clone(), - }) - } - - #[allow(clippy::too_many_arguments)] - pub fn new_follower( - node_id: u64, - fsm: FSM, - config: Config, - peers: Peers, - raft_addr: SocketAddr, - logger: Arc, - bootstrap_done: bool, - server_rcv: mpsc::Receiver, - server_snd: mpsc::Sender, - local_rcv: mpsc::Receiver>, - local_snd: mpsc::Sender>, - ) -> Result { - RaftNodeCore::::new_follower( - node_id, - fsm, - config, - peers, - raft_addr, - logger, - bootstrap_done, server_rcv, server_snd, local_rcv, @@ -259,70 +226,105 @@ impl< } } - pub async fn propose(&self, proposal: Vec) { + pub async fn propose(&self, proposal: Vec) -> Result<()> { let (tx, rx) = oneshot::channel(); self.local_sender - .send(LocalRequestMsg::Propose { proposal, chan: tx }) + .send(LocalRequestMsg::Propose { + proposal: proposal.clone(), + chan: tx, + }) .await .unwrap(); + let resp = rx.await.unwrap(); match resp { - LocalResponseMsg::Propose {} => (), + LocalResponseMsg::Propose { result } => match result { + ResponseResult::Success => (), + ResponseResult::Error(e) => return Err(e), + ResponseResult::WrongLeader { leader_addr, .. } => { + let mut client = create_client(leader_addr).await?; + client + .propose(Request::new(ProposeArgs { msg: proposal })) + .await?; + } + }, _ => unreachable!(), } + Ok(()) } - pub async fn get_cluster_size(&self) -> usize { + pub async fn change_config(&self, conf_change: ConfChangeV2) -> ConfChangeResponseResult { let (tx, rx) = oneshot::channel(); self.local_sender - .send(LocalRequestMsg::GetClusterSize { chan: tx }) + .send(LocalRequestMsg::ChangeConfig { + conf_change: conf_change.clone(), + chan: tx, + }) .await .unwrap(); + let resp = rx.await.unwrap(); match resp { - LocalResponseMsg::GetClusterSize { size } => size, + LocalResponseMsg::ConfigChange { result } => match result { + ConfChangeResponseResult::WrongLeader { leader_addr, .. } => { + let mut client = create_client(leader_addr).await.unwrap(); + let res = client.change_config(conf_change.clone()).await.unwrap(); + + let result = res.into_inner(); + + if result.result_type + == raft_service::ChangeConfigResultType::ChangeConfigSuccess as i32 + { + ConfChangeResponseResult::JoinSuccess { + assigned_id: result.assigned_id, + peers: deserialize(result.peers.as_slice()).unwrap(), + } + } else { + ConfChangeResponseResult::Error(Error::Unknown) + } + } + _ => result, + }, _ => unreachable!(), } } - pub async fn quit(&self) { + pub async fn get_cluster_size(&self) -> usize { let (tx, rx) = oneshot::channel(); self.local_sender - .send(LocalRequestMsg::Quit { chan: tx }) + .send(LocalRequestMsg::GetClusterSize { chan: tx }) .await .unwrap(); + let resp = rx.await.unwrap(); match resp { - LocalResponseMsg::Quit {} => (), + LocalResponseMsg::GetClusterSize { size } => size, _ => unreachable!(), } } - pub async fn leave(&self) { + pub async fn quit(&self) { let (tx, rx) = oneshot::channel(); self.local_sender - .send(LocalRequestMsg::Leave { chan: tx }) + .send(LocalRequestMsg::Quit { chan: tx }) .await .unwrap(); let resp = rx.await.unwrap(); match resp { - LocalResponseMsg::ChangeConfig { result: _result } => (), + LocalResponseMsg::Quit {} => (), _ => unreachable!(), } } - pub async fn change_config(&self, conf_change: ConfChangeV2) -> ConfChangeResponseResult { + pub async fn leave(&self) { let (tx, rx) = oneshot::channel(); self.local_sender - .send(LocalRequestMsg::ChangeConfig { - conf_change, - chan: tx, - }) + .send(LocalRequestMsg::Leave { chan: tx }) .await .unwrap(); let resp = rx.await.unwrap(); match resp { - LocalResponseMsg::ChangeConfig { result } => result, + LocalResponseMsg::ConfigChange { result: _result } => (), _ => unreachable!(), } } @@ -373,19 +375,6 @@ impl< } } - pub async fn set_bootstrap_done(&self) { - let (tx, rx) = oneshot::channel(); - self.local_sender - .send(LocalRequestMsg::SetBootstrapDone { chan: tx }) - .await - .unwrap(); - let resp = rx.await.unwrap(); - match resp { - LocalResponseMsg::SetBootstrapDone {} => (), - _ => unreachable!(), - } - } - pub async fn run(self) -> Result<()> { self.inner .lock() @@ -409,8 +398,6 @@ pub struct RaftNodeCore< should_exit: bool, last_snapshot_created: Instant, logger: Arc, - bootstrap_done: bool, - peers_bootstrap_ready: Option>, response_senders: HashMap>, server_rcv: mpsc::Receiver, @@ -431,14 +418,14 @@ impl< > RaftNodeCore { #[allow(clippy::too_many_arguments)] - pub fn bootstrap_cluster( + pub fn bootstrap( node_id: u64, + should_be_leader: bool, fsm: FSM, mut config: Config, initial_peers: Peers, raft_addr: SocketAddr, logger: Arc, - bootstrap_done: bool, server_rcv: mpsc::Receiver, server_snd: mpsc::Sender, local_rcv: mpsc::Receiver>, @@ -458,8 +445,10 @@ impl< let mut snapshot = storage.snapshot(0, storage.last_index()?)?; let conf_state = snapshot.mut_metadata().mut_conf_state(); + let voters = initial_peers.inner.clone().into_keys().collect::>(); + if conf_state.voters.is_empty() { - conf_state.set_voters(vec![node_id]); + conf_state.set_voters(voters); } match (config.restore_wal_from, config.restore_wal_snapshot_from) { @@ -494,113 +483,23 @@ impl< let response_seq = AtomicU64::new(0); let last_snapshot_created = Instant::now(); - raw_node.raft.become_candidate(); - raw_node.raft.become_leader(); - - let peers_bootstrap_ready = if !initial_peers.is_empty() { - Some(HashMap::from([(node_id, true)])) - } else { - None - }; - let (self_snd, self_rcv) = mpsc::channel(100); - Ok(RaftNodeCore { - raw_node, - fsm, - response_seq, - config, - raft_addr, - logger, - last_snapshot_created, - should_exit: false, - peers: Arc::new(Mutex::new(initial_peers)), - bootstrap_done, - peers_bootstrap_ready, - response_senders: HashMap::new(), - server_rcv, - server_snd, - local_rcv, - local_snd, - self_snd, - self_rcv, - _phantom_log_entry_typ: PhantomData, - }) - } - - #[allow(clippy::too_many_arguments)] - pub fn new_follower( - node_id: u64, - fsm: FSM, - mut config: Config, - peers: Peers, - raft_addr: SocketAddr, - logger: Arc, - bootstrap_done: bool, - server_rcv: mpsc::Receiver, - server_snd: mpsc::Sender, - local_rcv: mpsc::Receiver>, - local_snd: mpsc::Sender>, - ) -> Result { - config.raft_config.id = node_id; - config.validate()?; - - let storage_pth = get_storage_path(config.log_dir.as_str(), node_id); - - if let (None, None) = (config.restore_wal_from, config.restore_wal_snapshot_from) { - clear_storage_path(storage_pth.as_str())?; - ensure_directory_exist(storage_pth.as_str())?; - }; - - let storage = HeedStorage::create(storage_pth.as_str(), &config, logger.clone())?; - let mut snapshot = storage.snapshot(0, storage.last_index()?)?; - - match (config.restore_wal_from, config.restore_wal_snapshot_from) { - (Some(restore_wal_from), None) => { - if restore_wal_from != node_id { - std::fs::copy( - get_data_mdb_path(config.log_dir.as_str(), restore_wal_from), - get_data_mdb_path(config.log_dir.as_str(), node_id), - )?; - } - - let meta = snapshot.mut_metadata(); - meta.set_index(storage.entries_last_index()?); - } - (None, Some(_restore_wal_snapshot_from)) => { - // Follower doesn't need to restore snapshot because they will be sent by leader. - - // if restore_wal_snapshot_from != node_id { - // std::fs::copy( - // get_data_mdb_path(config.log_dir.as_str(), restore_wal_snapshot_from), - // get_data_mdb_path(config.log_dir.as_str(), node_id), - // )?; - // } - } - (Some(_), Some(_)) => { - unreachable!() - } - _ => {} + if should_be_leader { + raw_node.raft.become_candidate(); + raw_node.raft.become_leader(); } - let raw_node = RawNode::new(&config.raft_config, storage, logger.clone())?; - let response_seq = AtomicU64::new(0); - let last_snapshot_created = Instant::now(); - - let (self_snd, self_rcv) = mpsc::channel(100); - Ok(RaftNodeCore { raw_node, fsm, response_seq, config, raft_addr, - logger: logger.clone(), + logger, last_snapshot_created, should_exit: false, - peers: Arc::new(Mutex::new(peers)), - bootstrap_done, - peers_bootstrap_ready: None, + peers: Arc::new(Mutex::new(initial_peers)), response_senders: HashMap::new(), server_rcv, server_snd, @@ -628,18 +527,17 @@ impl< self.peers.lock().await.to_owned() } - pub async fn add_peer(&mut self, id: u64, addr: A) { - self.peers.lock().await.add_peer(id, addr) + pub async fn add_peer(&mut self, id: u64, addr: A) -> Result<()> { + let mut peers = self.peers.lock().await; + peers.add_peer(id, addr); + peers.connect(id).await } - pub async fn add_peers(&mut self, peers: HashMap) { + pub async fn add_peers(&mut self, peers: HashMap) -> Result<()> { for (id, peer_addr) in peers.iter() { - self.add_peer(id.to_owned(), *peer_addr).await; + self.add_peer(id.to_owned(), *peer_addr).await?; } - } - - pub fn set_bootstrap_done(&mut self) { - self.bootstrap_done = true; + Ok(()) } async fn send_message( @@ -656,7 +554,7 @@ impl< Some(peer) => { if peer.client.is_none() { if let Err(e) = peer.connect().await { - logger.trace(format!("Connection error: {:?}", e).as_str()); + logger.debug(format!("Connection error: {:?}", e).as_str()); ok = Err(SendMessageError::ConnectionError(node_id.to_string())); } } @@ -685,14 +583,6 @@ impl< } async fn send_messages(&mut self, messages: Vec) { - if !self.bootstrap_done { - self.logger.warn(format!( - "Skipping sending messages because bootstrap is not done yet. Skipped messages: {messages:?}", - messages=messages - ).as_str()); - return; - } - for message in messages { tokio::spawn(RaftNodeCore::::send_message( message, @@ -733,28 +623,22 @@ impl< let peer_addr = ticket.leader_addr; - loop { - let mut leader_client = - RaftServiceClient::connect(format!("http://{}", peer_addr)).await?; - let response = leader_client - .change_config(change.clone()) - .await? - .into_inner(); - - match response.result_type() { - ChangeConfigResultType::ChangeConfigWrongLeader => { - // TODO: Handle this - // response.data(); - continue; - } - ChangeConfigResultType::ChangeConfigSuccess => break Ok(()), - ChangeConfigResultType::ChangeConfigUnknownError => return Err(Error::JoinError), - ChangeConfigResultType::ChangeConfigRejected => { - return Err(Error::Rejected("Join request rejected".to_string())) - } - ChangeConfigResultType::ChangeConfigTimeoutError => { - return Err(Error::Timeout); - } + let mut leader_client = RaftServiceClient::connect(format!("http://{}", peer_addr)).await?; + let response = leader_client + .change_config(change.clone()) + .await? + .into_inner(); + + match response.result_type() { + ChangeConfigResultType::ChangeConfigSuccess => Ok(()), + ChangeConfigResultType::ChangeConfigUnknownError => Err(Error::JoinError), + ChangeConfigResultType::ChangeConfigRejected => { + Err(Error::Rejected("Join request rejected".to_string())) + } + ChangeConfigResultType::ChangeConfigTimeoutError => Err(Error::Timeout), + ChangeConfigResultType::ChangeConfigWrongLeader => { + // Should be handled in RaftServiceClient + unreachable!() } } } @@ -766,7 +650,11 @@ impl< if let Some(sender) = self.response_senders.remove(&response_seq) { match sender { ResponseSender::Local(sender) => { - sender.send(LocalResponseMsg::Propose {}).unwrap(); + sender + .send(LocalResponseMsg::Propose { + result: ResponseResult::Success, + }) + .unwrap(); } ResponseSender::Server(sender) => { sender @@ -890,7 +778,7 @@ impl< match sender { ResponseSender::Local(sender) => { sender - .send(LocalResponseMsg::ChangeConfig { result: response }) + .send(LocalResponseMsg::ConfigChange { result: response }) .unwrap(); } ResponseSender::Server(sender) => { @@ -908,90 +796,6 @@ impl< inspect_raftnode(&self.raw_node) } - pub async fn wait_for_initial_peers_bootstrap(&mut self) -> Result<()> { - if !self.is_leader() { - self.logger - .warn("Waiting for initial peers bootstrap is only allowed on leader node."); - return Ok(()); - } - - let peers_bootstrap_ready = self.peers_bootstrap_ready.clone().unwrap(); - - { - let peers = self.peers.lock().await; - if !(peers.len() == peers_bootstrap_ready.len() - && peers_bootstrap_ready.iter().all(|v| *v.1)) - { - self.logger - .trace("Waiting for all follower nodes to be ready to join the cluster..."); - - return Ok(()); - } - } - - self.logger.info( - "Received all follower nodes' join requests, preparing to bootstrap the cluster...", - ); - - self.bootstrap_peers().await?; - - let mut peers = self.peers.lock().await; - - for node_id in peers_bootstrap_ready.keys() { - if *node_id == self.raw_node.raft.id { - continue; - } - - let peer = peers - .get_mut(node_id) - .expect(&format!("Peer {} not found!", node_id)); - - // ??? - if let Err(err) = peer.connect().await { - self.logger - .error(&format!("Failed to connect to node {}: {}", node_id, err)); - return Err(err); - } - - let response = peer - .client - .as_mut() - .unwrap() - .cluster_bootstrap_ready(Request::new(raft_service::Empty {})) - .await? - .into_inner(); - - match response.code() { - ResultCode::Ok => {} - ResultCode::Error => { - self.logger - .error(&format!("Node {} failed to join the cluster.", node_id)); - } - ResultCode::WrongLeader => { - self.logger.error(&format!( - "Node {} failed to join the cluster because of wrong leader.", - node_id - )); - } - } - } - - Ok(()) - } - - /// Commit the configuration change to add all follower nodes to the cluster. - async fn bootstrap_peers(&mut self) -> Result<()> { - assert!(self.is_leader()); - - let last_term = self.raw_node.raft.raft_log.last_term(); - self.make_snapshot(self.raw_node.store().last_index()?, last_term) - .await?; - - bootstrap_peers(self.peers.clone(), &mut self.raw_node).await?; - self.bootstrap_done = true; - Ok(()) - } - async fn handle_propose_request( &mut self, proposal: Vec, @@ -1011,7 +815,13 @@ impl< .to_string(); let raft_response: ResponseMessage = match chan { - ResponseSender::Local(_) => LocalResponseMsg::Propose {}.into(), + ResponseSender::Local(_) => LocalResponseMsg::Propose { + result: ResponseResult::WrongLeader { + leader_id, + leader_addr, + }, + } + .into(), ResponseSender::Server(_) => ServerResponseMsg::Propose { result: ResponseResult::WrongLeader { leader_id, @@ -1058,17 +868,21 @@ impl< let peers = self.peers.lock().await; let leader_addr = peers.get(&leader_id).unwrap().addr.to_string(); - let result = ConfChangeResponseResult::WrongLeader { + let wrong_leader_result = ConfChangeResponseResult::WrongLeader { leader_id, leader_addr, }; match chan { ResponseSender::Local(chan) => chan - .send(LocalResponseMsg::ChangeConfig { result }) + .send(LocalResponseMsg::ConfigChange { + result: wrong_leader_result, + }) .unwrap(), ResponseSender::Server(chan) => chan - .send(ServerResponseMsg::ConfigChange { result }) + .send(ServerResponseMsg::ConfigChange { + result: wrong_leader_result, + }) .unwrap(), } } else { @@ -1126,11 +940,11 @@ impl< .unwrap(); } LocalRequestMsg::AddPeer { id, addr, chan } => { - self.add_peer(id, addr).await; + self.add_peer(id, addr).await?; chan.send(LocalResponseMsg::AddPeer {}).unwrap(); } LocalRequestMsg::AddPeers { peers, chan } => { - self.add_peers(peers).await; + self.add_peers(peers).await?; chan.send(LocalResponseMsg::AddPeers {}).unwrap(); } LocalRequestMsg::Store { chan } => { @@ -1197,10 +1011,6 @@ impl< self.raw_node.step(*message)?; chan.send(LocalResponseMsg::SendMessage {}).unwrap(); } - LocalRequestMsg::SetBootstrapDone { chan } => { - self.set_bootstrap_done(); - chan.send(LocalResponseMsg::SetBootstrapDone {}).unwrap(); - } } Ok(()) @@ -1218,28 +1028,6 @@ impl< pub async fn handle_server_request_msg(&mut self, message: ServerRequestMsg) -> Result<()> { match message { - ServerRequestMsg::ClusterBootstrapReady { chan } => { - self.logger.info(&format!("Node {} received the ClusterBootstrapReady message that all initial nodes's join requests collected. Start to bootstrap process...", self.get_id())); - chan.send(ServerResponseMsg::ClusterBootstrapReady { - result: ResponseResult::Success, - }) - .unwrap(); - self.bootstrap_done = true; - } - ServerRequestMsg::MemberBootstrapReady { node_id, chan } => { - assert!(self.is_leader()); - self.logger - .info(&format!("Node {} requested to join the cluster.", node_id)); - self.peers_bootstrap_ready - .as_mut() - .unwrap() - .insert(node_id, true); - - chan.send(ServerResponseMsg::MemberBootstrapReady { - result: ResponseResult::Success, - }) - .unwrap(); - } ServerRequestMsg::ChangeConfig { conf_change, chan } => { self.handle_confchange_request(conf_change, ResponseSender::Server(chan)) .await?; @@ -1259,7 +1047,6 @@ impl< } ServerRequestMsg::RequestId { raft_addr, chan } => { if !self.is_leader() { - // TODO: retry strategy in case of failure let leader_id = self.get_leader_id(); let peers = self.peers.lock().await; let leader_addr = peers.get(&leader_id).unwrap().addr.to_string(); @@ -1330,12 +1117,6 @@ impl< return Ok(()); } - if !self.bootstrap_done { - if self.is_leader() { - self.wait_for_initial_peers_bootstrap().await?; - } - } - tokio::select! { msg = timeout(fixed_tick_timer, self.self_rcv.recv()) => { if let Ok(Some(msg)) = msg { @@ -1358,10 +1139,7 @@ impl< now = Instant::now(); if elapsed > tick_timer { tick_timer = fixed_tick_timer; - - if self.bootstrap_done { - self.raw_node.tick(); - } + self.raw_node.tick(); } else { tick_timer -= elapsed; } diff --git a/raftify/src/raft_server.rs b/raftify/src/raft_server.rs index b580b650..f66dce68 100644 --- a/raftify/src/raft_server.rs +++ b/raftify/src/raft_server.rs @@ -25,7 +25,12 @@ use super::{ response_message::{RequestIdResponseResult, ServerResponseMsg}, Config, Error, }; -use crate::raft::eraftpb::{ConfChangeV2, Message as RaftMessage}; +use crate::{ + create_client, + raft::eraftpb::{ConfChangeV2, Message as RaftMessage}, + raft_service::ProposeArgs, + response_message::{ConfChangeResponseResult, ResponseResult}, +}; #[derive(Clone)] pub struct RaftServer { @@ -92,7 +97,7 @@ impl RaftService for RaftServer { let (tx, rx) = oneshot::channel(); sender .send(ServerRequestMsg::RequestId { - raft_addr: request_args.raft_addr, + raft_addr: request_args.raft_addr.clone(), chan: tx, }) .await @@ -111,18 +116,21 @@ impl RaftService for RaftServer { reserved_id, leader_addr: self.addr.to_string(), peers: serialize(&peers).unwrap(), + ..Default::default() })), - RequestIdResponseResult::WrongLeader { - leader_id, - leader_addr, - } => Ok(Response::new(raft_service::RequestIdResponse { - code: raft_service::ResultCode::WrongLeader as i32, - leader_id, - leader_addr, - reserved_id: 0, - peers: vec![], - })), - _ => unreachable!(), + RequestIdResponseResult::Error(e) => { + Ok(Response::new(raft_service::RequestIdResponse { + code: raft_service::ResultCode::Error as i32, + error: e.to_string().as_bytes().to_vec(), + ..Default::default() + })) + } + RequestIdResponseResult::WrongLeader { leader_addr, .. } => { + let mut client = create_client(leader_addr).await.unwrap(); + let reply = client.request_id(request_args).await?.into_inner(); + + Ok(Response::new(reply)) + } }, _ => unreachable!(), } @@ -137,7 +145,7 @@ impl RaftService for RaftServer { let (tx, rx) = oneshot::channel(); let message = ServerRequestMsg::ChangeConfig { - conf_change: request_args, + conf_change: request_args.clone(), chan: tx, }; @@ -155,16 +163,48 @@ impl RaftService for RaftServer { ) .await { - Ok(Ok(_raft_response)) => { + Ok(Ok(raft_response)) => { + match raft_response { + ServerResponseMsg::ConfigChange { result } => match result { + ConfChangeResponseResult::JoinSuccess { assigned_id, peers } => { + reply.result_type = + raft_service::ChangeConfigResultType::ChangeConfigSuccess as i32; + reply.assigned_id = assigned_id; + reply.peers = serialize(&peers).unwrap(); + } + ConfChangeResponseResult::RemoveSuccess {} => { + reply.result_type = + raft_service::ChangeConfigResultType::ChangeConfigSuccess as i32; + } + ConfChangeResponseResult::Error(e) => { + reply.result_type = + raft_service::ChangeConfigResultType::ChangeConfigUnknownError + as i32; + reply.error = e.to_string().as_bytes().to_vec(); + } + ConfChangeResponseResult::WrongLeader { leader_addr, .. } => { + reply.result_type = + raft_service::ChangeConfigResultType::ChangeConfigWrongLeader + as i32; + + let mut client = create_client(leader_addr).await.unwrap(); + reply = client.change_config(request_args).await?.into_inner(); + } + }, + _ => unreachable!(), + } reply.result_type = raft_service::ChangeConfigResultType::ChangeConfigSuccess as i32; - reply.data = vec![]; } - Ok(_) => (), - Err(_e) => { + Ok(Err(e)) => { + reply.result_type = + raft_service::ChangeConfigResultType::ChangeConfigUnknownError as i32; + reply.error = e.to_string().as_bytes().to_vec(); + } + Err(e) => { reply.result_type = raft_service::ChangeConfigResultType::ChangeConfigTimeoutError as i32; - reply.data = vec![]; + reply.error = e.to_string().as_bytes().to_vec(); self.logger.error("timeout waiting for reply"); } } @@ -201,7 +241,7 @@ impl RaftService for RaftServer { let (tx, rx) = oneshot::channel(); match sender .send(ServerRequestMsg::Propose { - proposal: request_args.msg, + proposal: request_args.msg.clone(), chan: tx, }) .await @@ -212,8 +252,22 @@ impl RaftService for RaftServer { let response = rx.await.unwrap(); match response { - ServerResponseMsg::Propose { result: _result } => { - Ok(Response::new(raft_service::Empty {})) + ServerResponseMsg::Propose { result } => { + match result { + ResponseResult::Success => Ok(Response::new(raft_service::Empty {})), + ResponseResult::Error(_) => Ok(Response::new(raft_service::Empty {})), + ResponseResult::WrongLeader { leader_addr, .. } => { + // TODO: Handle this kind of errors + let mut client = create_client(leader_addr).await.unwrap(); + let _ = client + .propose(ProposeArgs { + msg: request_args.msg, + }) + .await?; + + Ok(Response::new(raft_service::Empty {})) + } + } } _ => unreachable!(), } @@ -243,49 +297,6 @@ impl RaftService for RaftServer { } } - async fn member_bootstrap_ready( - &self, - request: Request, - ) -> Result, Status> { - let request_args = request.into_inner(); - let (tx, rx) = oneshot::channel(); - let sender = self.snd.clone(); - match sender - .send(ServerRequestMsg::MemberBootstrapReady { - node_id: request_args.node_id, - chan: tx, - }) - .await - { - Ok(_) => (), - Err(_) => self.print_send_error(function_name!()), - } - let _response = rx.await.unwrap(); - Ok(Response::new(raft_service::MemberBootstrapReadyResponse { - code: raft_service::ResultCode::Ok as i32, - })) - } - - async fn cluster_bootstrap_ready( - &self, - request: Request, - ) -> Result, Status> { - let _request_args = request.into_inner(); - let (tx, rx) = oneshot::channel(); - let sender = self.snd.clone(); - match sender - .send(ServerRequestMsg::ClusterBootstrapReady { chan: tx }) - .await - { - Ok(_) => (), - Err(_) => self.print_send_error(function_name!()), - } - let _response = rx.await.unwrap(); - Ok(Response::new(raft_service::ClusterBootstrapReadyResponse { - code: raft_service::ResultCode::Ok as i32, - })) - } - async fn get_peers( &self, request: Request, diff --git a/raftify/src/request_message.rs b/raftify/src/request_message.rs index 8cba721a..ec1c9b8b 100644 --- a/raftify/src/request_message.rs +++ b/raftify/src/request_message.rs @@ -10,13 +10,6 @@ use crate::raft::eraftpb::{ConfChangeV2, Message as RaftMessage}; /// Request type processed through network calls (gRPC) pub enum ServerRequestMsg { - MemberBootstrapReady { - node_id: u64, - chan: Sender, - }, - ClusterBootstrapReady { - chan: Sender, - }, RequestId { raft_addr: String, chan: Sender, @@ -43,7 +36,7 @@ pub enum ServerRequestMsg { }, } -/// Request type used for communication (method calls) between RaftFacade and RaftNode +/// Request type used for communication (method calls) between user side and RaftNode pub enum LocalRequestMsg { IsLeader { chan: Sender>, @@ -105,9 +98,6 @@ pub enum LocalRequestMsg ticket: ClusterJoinTicket, chan: Sender>, }, - SetBootstrapDone { - chan: Sender>, - }, } /// Request type sent from a RaftNode to itself (RaftNode). @@ -148,6 +138,5 @@ impl_debug_for_enum!( Propose, ChangeConfig, SendMessage, - JoinCluster, - SetBootstrapDone + JoinCluster ); diff --git a/raftify/src/response_message.rs b/raftify/src/response_message.rs index 574611d1..99b880d8 100644 --- a/raftify/src/response_message.rs +++ b/raftify/src/response_message.rs @@ -55,16 +55,16 @@ pub enum RequestIdResponseResult { #[derive(Debug)] pub enum ServerResponseMsg { - MemberBootstrapReady { result: ResponseResult }, - ClusterBootstrapReady { result: ResponseResult }, - Propose { result: ResponseResult }, - ConfigChange { result: ConfChangeResponseResult }, - RequestId { result: RequestIdResponseResult }, ReportUnreachable { result: ResponseResult }, DebugNode { result_json: String }, GetPeers { peers: Peers }, SendMessage { result: ResponseResult }, CreateSnapshot {}, + + // Rerouting available + Propose { result: ResponseResult }, + ConfigChange { result: ConfChangeResponseResult }, + RequestId { result: RequestIdResponseResult }, } pub enum LocalResponseMsg { @@ -77,15 +77,16 @@ pub enum LocalResponseMsg Store { store: FSM }, Storage { storage: HeedStorage }, GetClusterSize { size: usize }, - ChangeConfig { result: ConfChangeResponseResult }, Quit {}, MakeSnapshot {}, - Propose {}, - DebugNode { result_json: String }, JoinCluster {}, SendMessage {}, - SetBootstrapDone {}, + DebugNode { result_json: String }, _Phantom(PhantomData), + + // Rerouting available + Propose { result: ResponseResult }, + ConfigChange { result: ConfChangeResponseResult }, } impl fmt::Debug diff --git a/raftify/src/storage/utils.rs b/raftify/src/storage/utils.rs index ee44a70e..7e9b8044 100644 --- a/raftify/src/storage/utils.rs +++ b/raftify/src/storage/utils.rs @@ -35,7 +35,7 @@ pub fn clear_storage_path(log_dir_path: &str) -> Result<()> { pub fn ensure_directory_exist(dir_pth: &str) -> Result<()> { let dir_pth: &Path = Path::new(&dir_pth); - if !fs::metadata(dir_pth).is_ok() { + if fs::metadata(dir_pth).is_err() { fs::create_dir_all(dir_pth)?; } Ok(())