From 472636dd6fde04f9ec126cf4f0751e92b93eaadc Mon Sep 17 00:00:00 2001 From: Gyubong Date: Mon, 30 Sep 2024 11:35:13 +0900 Subject: [PATCH] feat: Improve error handling --- examples/memstore/src/web_server_api.rs | 9 +- harness/src/utils.rs | 4 +- harness/tests/data_replication.rs | 6 +- harness/tests/leader_election.rs | 8 +- raftify/src/raft_node/mod.rs | 136 ++++++++++++------------ 5 files changed, 85 insertions(+), 78 deletions(-) diff --git a/examples/memstore/src/web_server_api.rs b/examples/memstore/src/web_server_api.rs index c8bd7655..f43a13ad 100644 --- a/examples/memstore/src/web_server_api.rs +++ b/examples/memstore/src/web_server_api.rs @@ -28,7 +28,12 @@ async fn get(data: web::Data<(HashStore, Raft)>, path: web::Path) -> impl R #[get("/leader")] async fn leader_id(data: web::Data<(HashStore, Raft)>) -> impl Responder { let raft = data.clone(); - let leader_id = raft.1.get_leader_id().await.to_string(); + let leader_id = raft + .1 + .get_leader_id() + .await + .expect("Failed to get leader id") + .to_string(); format!("{:?}", leader_id) } @@ -68,6 +73,7 @@ async fn snapshot(data: web::Data<(HashStore, Raft)>) -> impl Responder { .1 .storage() .await + .expect("Failed to get storage") .last_index() .expect("Failed to get last index"); @@ -75,6 +81,7 @@ async fn snapshot(data: web::Data<(HashStore, Raft)>) -> impl Responder { .1 .storage() .await + .expect("Failed to get storage") .hard_state() .expect("Failed to get hard state"); diff --git a/harness/src/utils.rs b/harness/src/utils.rs index 679c1f9e..912b0495 100644 --- a/harness/src/utils.rs +++ b/harness/src/utils.rs @@ -89,7 +89,7 @@ pub async fn wait_for_until_cluster_size_increase(raft: Raft, target: usize) { )); loop { - let size = raft.get_cluster_size().await; + let size = raft.get_cluster_size().await.unwrap(); if size >= target { break; } @@ -107,7 +107,7 @@ pub async fn wait_for_until_cluster_size_decrease(raft: Raft, target: usize) { )); loop { - let size = raft.get_cluster_size().await; + let size = raft.get_cluster_size().await.unwrap(); if size <= target { break; } diff --git a/harness/tests/data_replication.rs b/harness/tests/data_replication.rs index 73699cbb..cad05036 100644 --- a/harness/tests/data_replication.rs +++ b/harness/tests/data_replication.rs @@ -37,7 +37,7 @@ pub async fn test_data_replication() { // Data should be replicated to all nodes. for (_, raft) in rafts.iter_mut() { - let store = raft.state_machine().await; + let store = raft.state_machine().await.unwrap(); let store_lk = store.0.read().unwrap(); assert_eq!(store_lk.get(&1).unwrap(), "test"); } @@ -59,7 +59,7 @@ pub async fn test_data_replication() { wait_for_until_cluster_size_increase(raft_1.clone(), 4).await; let raft_4 = rafts.get(&4).unwrap(); - let store = raft_4.state_machine().await; + let store = raft_4.state_machine().await.unwrap(); let store_lk = store.0.read().unwrap(); // Data should be replicated to new joined node. @@ -80,7 +80,7 @@ pub async fn test_data_replication() { // New entry data should be replicated to all nodes including new joined node. for (_, raft) in rafts.iter() { // stop - let store = raft.state_machine().await; + let store = raft.state_machine().await.unwrap(); let store_lk = store.0.read().unwrap(); assert_eq!(store_lk.get(&2).unwrap(), "test2"); } diff --git a/harness/tests/leader_election.rs b/harness/tests/leader_election.rs index d56adec4..77dc8b4f 100644 --- a/harness/tests/leader_election.rs +++ b/harness/tests/leader_election.rs @@ -36,7 +36,7 @@ pub async fn test_leader_election_in_three_node_example() { wait_for_until_cluster_size_decrease(raft_2.clone(), 2).await; - let leader_id = raft_2.get_leader_id().await; + let leader_id = raft_2.get_leader_id().await.unwrap(); let timer = timeout(Duration::from_secs(5), async { while leader_id == 0 { @@ -85,7 +85,7 @@ pub async fn test_leader_election_in_five_node_example() { sleep(Duration::from_secs(2)).await; - let leader_id = raft_2.get_leader_id().await; + let leader_id = raft_2.get_leader_id().await.unwrap(); assert!( [2, 3, 4, 5].contains(&leader_id), @@ -106,10 +106,10 @@ pub async fn test_leader_election_in_five_node_example() { wait_for_until_cluster_size_decrease(raft_k.clone(), 3).await; sleep(Duration::from_secs(2)).await; - let leader_id = raft_k.get_leader_id().await; + let leader_id = raft_k.get_leader_id().await.unwrap(); assert!(leader_id != 0); - assert_eq!(raft_k.get_cluster_size().await, 3); + assert_eq!(raft_k.get_cluster_size().await.unwrap(), 3); sleep(Duration::from_secs(2)).await; diff --git a/raftify/src/raft_node/mod.rs b/raftify/src/raft_node/mod.rs index 9c78c8ef..03aa4c3d 100644 --- a/raftify/src/raft_node/mod.rs +++ b/raftify/src/raft_node/mod.rs @@ -106,63 +106,68 @@ impl< }) } - pub async fn is_leader(&self) -> bool { + pub async fn is_leader(&self) -> Result { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::IsLeader { tx_msg: tx }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::IsLeader { is_leader } => is_leader, + LocalResponseMsg::IsLeader { is_leader } => Ok(is_leader), _ => unreachable!(), } } - pub async fn get_id(&self) -> u64 { + pub async fn get_id(&self) -> Result { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::GetId { tx_msg: tx }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::GetId { id } => id, + LocalResponseMsg::GetId { id } => Ok(id), _ => unreachable!(), } } - pub async fn get_leader_id(&self) -> u64 { + pub async fn get_leader_id(&self) -> Result { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::GetLeaderId { tx_msg: tx }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::GetLeaderId { leader_id } => leader_id, + LocalResponseMsg::GetLeaderId { leader_id } => Ok(leader_id), _ => unreachable!(), } } - pub async fn get_peers(&self) -> Peers { + pub async fn get_peers(&self) -> Result { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::GetPeers { tx_msg: tx }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::GetPeers { peers } => peers, + LocalResponseMsg::GetPeers { peers } => Ok(peers), _ => unreachable!(), } } - pub async fn add_peer(&self, id: u64, addr: A, role: Option) { + pub async fn add_peer( + &self, + id: u64, + addr: A, + role: Option, + ) -> Result<()> { let addr = addr.to_socket_addrs().unwrap().next().unwrap().to_string(); let (tx, rx) = oneshot::channel(); self.tx_local @@ -174,24 +179,24 @@ impl< }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::AddPeer {} => (), + LocalResponseMsg::AddPeer {} => Ok(()), _ => unreachable!(), } } - pub async fn add_peers(&self, peers: HashMap) { + pub async fn add_peers(&self, peers: HashMap) -> Result<()> { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::AddPeers { peers, tx_msg: tx }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::AddPeers {} => (), + LocalResponseMsg::AddPeers {} => Ok(()), _ => unreachable!(), } } @@ -202,7 +207,7 @@ impl< .send(LocalRequestMsg::DebugNode { tx_msg: tx }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { LocalResponseMsg::DebugNode { result_json } => Ok(result_json), @@ -210,30 +215,30 @@ impl< } } - pub async fn state_machine(&self) -> FSM { + pub async fn state_machine(&self) -> Result { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::GetStateMachine { tx_msg: tx }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::GetStateMachine { store } => store, + LocalResponseMsg::GetStateMachine { store } => Ok(store), _ => unreachable!(), } } - pub async fn storage(&self) -> LogStorage { + pub async fn storage(&self) -> Result { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::GetStorage { tx_msg: tx }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::GetStorage { storage } => storage, + LocalResponseMsg::GetStorage { storage } => Ok(storage), _ => unreachable!(), } } @@ -248,7 +253,7 @@ impl< .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { LocalResponseMsg::Propose { result } => match result { ResponseResult::Success => (), @@ -265,7 +270,10 @@ impl< Ok(()) } - pub async fn change_config(&self, conf_change: ConfChangeV2) -> ConfChangeResponseResult { + pub async fn change_config( + &self, + conf_change: ConfChangeV2, + ) -> Result { let (tx, rx) = oneshot::channel(); let conf_change: ConfChangeRequest = conf_change.into(); self.tx_local @@ -276,7 +284,7 @@ impl< .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { LocalResponseMsg::ConfigChange { result } => match result { ConfChangeResponseResult::WrongLeader { leader_addr, .. } => { @@ -291,48 +299,48 @@ impl< if result.result_type == raft_service::ChangeConfigResultType::ChangeConfigSuccess as i32 { - ConfChangeResponseResult::JoinSuccess { + Ok(ConfChangeResponseResult::JoinSuccess { assigned_ids: result.assigned_ids, peers: deserialize(result.peers.as_slice()).unwrap(), - } + }) } else { - ConfChangeResponseResult::Error(Error::Unknown) + Ok(ConfChangeResponseResult::Error(Error::Unknown)) } } - _ => result, + _ => Ok(result), }, _ => unreachable!(), } } - pub async fn get_cluster_size(&self) -> usize { + pub async fn get_cluster_size(&self) -> Result { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::GetClusterSize { tx_msg: tx }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::GetClusterSize { size } => size, + LocalResponseMsg::GetClusterSize { size } => Ok(size), _ => unreachable!(), } } - pub async fn quit(&self) { + pub async fn quit(&self) -> Result<()> { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::Quit { tx_msg: tx }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::Quit {} => (), + LocalResponseMsg::Quit {} => Ok(()), _ => unreachable!(), } } - pub async fn transfer_leader(&self, node_id: u64) { + pub async fn transfer_leader(&self, node_id: u64) -> Result<()> { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::TransferLeader { @@ -341,27 +349,27 @@ impl< }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::TransferLeader {} => (), + LocalResponseMsg::TransferLeader {} => Ok(()), _ => unreachable!(), } } - pub async fn campaign(&self) { + pub async fn campaign(&self) -> Result<()> { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::Campaign { tx_msg: tx }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::Campaign {} => (), + LocalResponseMsg::Campaign {} => Ok(()), _ => unreachable!(), } } - pub async fn demote(&self, term: u64, leader_id: u64) { + pub async fn demote(&self, term: u64, leader_id: u64) -> Result<()> { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::Demote { @@ -371,22 +379,22 @@ impl< }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::Demote {} => (), + LocalResponseMsg::Demote {} => Ok(()), _ => unreachable!(), } } - pub async fn leave(&self) { + pub async fn leave(&self) -> Result<()> { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::Leave { tx_msg: tx }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::ConfigChange { result: _result } => (), + LocalResponseMsg::ConfigChange { result: _result } => Ok(()), _ => unreachable!(), } } @@ -398,7 +406,7 @@ impl< .unwrap(); } - pub async fn send_message(&self, message: RaftMessage) { + pub async fn send_message(&self, message: RaftMessage) -> Result<()> { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::SendMessage { @@ -407,9 +415,9 @@ impl< }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::SendMessage {} => (), + LocalResponseMsg::SendMessage {} => Ok(()), _ => unreachable!(), } } @@ -431,7 +439,7 @@ impl< } } - pub async fn join_cluster(&self, tickets: Vec) { + pub async fn join_cluster(&self, tickets: Vec) -> Result<()> { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::JoinCluster { @@ -440,25 +448,25 @@ impl< }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::JoinCluster {} => (), + LocalResponseMsg::JoinCluster {} => Ok(()), _ => unreachable!(), } } /// # Safety /// TODO: Write this. - pub async unsafe fn get_raw_node(&self) -> Arc>> { + pub async unsafe fn get_raw_node(&self) -> Result>>> { let (tx, rx) = oneshot::channel(); self.tx_local .send(LocalRequestMsg::GetRawNode { tx_msg: tx }) .await .unwrap(); - let resp = rx.await.unwrap(); + let resp = rx.await?; match resp { - LocalResponseMsg::GetRawNode { raw_node } => raw_node, + LocalResponseMsg::GetRawNode { raw_node } => Ok(raw_node), _ => unreachable!(), } } @@ -479,7 +487,7 @@ pub struct RaftNodeCore< FSM: AbstractStateMachine + Clone + 'static, > { pub raw_node: RawNode, - // pub log_storage: LogStorage, + // pub log_storage: LogStorage, # Since there is no particular reason to store it, we do not save the log_storage. pub fsm: FSM, pub peers: Arc>, response_seq: AtomicU64, @@ -600,7 +608,6 @@ impl< tx_self, rx_self, _phantom_log_entry_typ: PhantomData, - // log_storage, }) } @@ -1182,14 +1189,7 @@ impl< term, tx_msg, } => { - // println!("Make snapshot!! 2 2"); - // let r = self.make_snapshot(index, term).await; - // println!("Make snapshot!! 2 3 r: {:?}", r); - // if let Err(e) = r { - // return Err(e); - // } self.make_snapshot(index, term).await?; - println!("Make snapshot!! 2 3"); tx_msg.send(LocalResponseMsg::MakeSnapshot {}).unwrap(); } LocalRequestMsg::JoinCluster { tickets, tx_msg } => {