diff --git a/harness/src/raft.rs b/harness/src/raft.rs index e912bed5..3993a262 100644 --- a/harness/src/raft.rs +++ b/harness/src/raft.rs @@ -79,6 +79,7 @@ pub async fn build_raft_cluster(peers: Peers) -> Result<()> { Ok(()) } +// Note that this function lock RAFTS, so it should not be called while holding RAFTS lock. pub async fn spawn_extra_node(raft_addr: &str, peer_addr: &str) -> Result>> { let logger = Arc::new(Slogger { slog: build_logger(), diff --git a/harness/src/utils.rs b/harness/src/utils.rs index 9e31184b..8fa80c4a 100644 --- a/harness/src/utils.rs +++ b/harness/src/utils.rs @@ -65,7 +65,10 @@ pub async fn load_peers(example_filename: &str) -> Result match result { ResponseResult::Success => (), - ResponseResult::Error(_) => (), + ResponseResult::Error(e) => return Err(e), ResponseResult::WrongLeader { leader_id, leader_addr, @@ -255,57 +255,79 @@ impl< Ok(()) } - pub async fn get_cluster_size(&self) -> usize { + pub async fn change_config(&self, conf_change: ConfChangeV2) -> ConfChangeResponseResult { let (tx, rx) = oneshot::channel(); self.local_sender - .send(LocalRequestMsg::GetClusterSize { chan: tx }) + .send(LocalRequestMsg::ChangeConfig { + conf_change: conf_change.clone(), + chan: tx, + }) .await .unwrap(); let resp = rx.await.unwrap(); match resp { - LocalResponseMsg::GetClusterSize { size } => size, + LocalResponseMsg::ChangeConfig { result } => match result { + ConfChangeResponseResult::WrongLeader { + leader_id, + leader_addr, + } => { + let mut client = create_client(leader_addr).await.unwrap(); + let res = client.change_config(conf_change.clone()).await.unwrap(); + + let result = res.into_inner(); + + if result.result_type + == raft_service::ChangeConfigResultType::ChangeConfigSuccess as i32 + { + ConfChangeResponseResult::JoinSuccess { + assigned_id: result.assigned_id, + peers: deserialize(result.peers.as_slice()).unwrap(), + } + } else { + ConfChangeResponseResult::Error(Error::Unknown) + } + } + _ => result, + }, _ => unreachable!(), } } - pub async fn quit(&self) { + pub async fn get_cluster_size(&self) -> usize { let (tx, rx) = oneshot::channel(); self.local_sender - .send(LocalRequestMsg::Quit { chan: tx }) + .send(LocalRequestMsg::GetClusterSize { chan: tx }) .await .unwrap(); let resp = rx.await.unwrap(); match resp { - LocalResponseMsg::Quit {} => (), + LocalResponseMsg::GetClusterSize { size } => size, _ => unreachable!(), } } - pub async fn leave(&self) { + pub async fn quit(&self) { let (tx, rx) = oneshot::channel(); self.local_sender - .send(LocalRequestMsg::Leave { chan: tx }) + .send(LocalRequestMsg::Quit { chan: tx }) .await .unwrap(); let resp = rx.await.unwrap(); match resp { - LocalResponseMsg::ChangeConfig { result: _result } => (), + LocalResponseMsg::Quit {} => (), _ => unreachable!(), } } - pub async fn change_config(&self, conf_change: ConfChangeV2) -> ConfChangeResponseResult { + pub async fn leave(&self) { let (tx, rx) = oneshot::channel(); self.local_sender - .send(LocalRequestMsg::ChangeConfig { - conf_change, - chan: tx, - }) + .send(LocalRequestMsg::Leave { chan: tx }) .await .unwrap(); let resp = rx.await.unwrap(); match resp { - LocalResponseMsg::ChangeConfig { result } => result, + LocalResponseMsg::ChangeConfig { result: _result } => (), _ => unreachable!(), } } @@ -613,11 +635,6 @@ impl< .into_inner(); match response.result_type() { - ChangeConfigResultType::ChangeConfigWrongLeader => { - // TODO: Handle this - // response.data(); - continue; - } ChangeConfigResultType::ChangeConfigSuccess => break Ok(()), ChangeConfigResultType::ChangeConfigUnknownError => return Err(Error::JoinError), ChangeConfigResultType::ChangeConfigRejected => { @@ -626,6 +643,10 @@ impl< ChangeConfigResultType::ChangeConfigTimeoutError => { return Err(Error::Timeout); } + ChangeConfigResultType::ChangeConfigWrongLeader => { + // Should be handled in RaftServiceClient + unreachable!() + } } } } diff --git a/raftify/src/raft_server.rs b/raftify/src/raft_server.rs index 5804f37c..7a4f1ee8 100644 --- a/raftify/src/raft_server.rs +++ b/raftify/src/raft_server.rs @@ -28,9 +28,8 @@ use super::{ use crate::{ create_client, raft::eraftpb::{ConfChangeV2, Message as RaftMessage}, - raft_service::{ProposeArgs}, - response_message::ResponseResult, - RaftServiceClient, + raft_service::ProposeArgs, + response_message::{ConfChangeResponseResult, ResponseResult}, }; #[derive(Clone)] @@ -143,7 +142,7 @@ impl RaftService for RaftServer { let (tx, rx) = oneshot::channel(); let message = ServerRequestMsg::ChangeConfig { - conf_change: request_args, + conf_change: request_args.clone(), chan: tx, }; @@ -161,16 +160,51 @@ impl RaftService for RaftServer { ) .await { - Ok(Ok(_raft_response)) => { + Ok(Ok(raft_response)) => { + match raft_response { + ServerResponseMsg::ConfigChange { result } => match result { + ConfChangeResponseResult::JoinSuccess { assigned_id, peers } => { + reply.result_type = + raft_service::ChangeConfigResultType::ChangeConfigSuccess as i32; + reply.assigned_id = assigned_id; + reply.peers = serialize(&peers).unwrap(); + } + ConfChangeResponseResult::RemoveSuccess {} => { + reply.result_type = + raft_service::ChangeConfigResultType::ChangeConfigSuccess as i32; + } + ConfChangeResponseResult::Error(e) => { + reply.result_type = + raft_service::ChangeConfigResultType::ChangeConfigUnknownError + as i32; + reply.error = e.to_string().as_bytes().to_vec(); + } + ConfChangeResponseResult::WrongLeader { + leader_id, + leader_addr, + } => { + reply.result_type = + raft_service::ChangeConfigResultType::ChangeConfigWrongLeader + as i32; + + let mut client = create_client(leader_addr).await.unwrap(); + reply = client.change_config(request_args).await?.into_inner(); + } + }, + _ => unreachable!(), + } reply.result_type = raft_service::ChangeConfigResultType::ChangeConfigSuccess as i32; - reply.data = vec![]; } - Ok(_) => (), - Err(_e) => { + Ok(Err(e)) => { + reply.result_type = + raft_service::ChangeConfigResultType::ChangeConfigUnknownError as i32; + reply.error = e.to_string().as_bytes().to_vec(); + } + Err(e) => { reply.result_type = raft_service::ChangeConfigResultType::ChangeConfigTimeoutError as i32; - reply.data = vec![]; + reply.error = e.to_string().as_bytes().to_vec(); self.logger.error("timeout waiting for reply"); } } @@ -228,7 +262,11 @@ impl RaftService for RaftServer { } => { // TODO: Handle this kind of errors let mut client = create_client(leader_addr).await.unwrap(); - let _ = client.propose(ProposeArgs {msg: request_args.msg}).await?; + let _ = client + .propose(ProposeArgs { + msg: request_args.msg, + }) + .await?; Ok(Response::new(raft_service::Empty {})) }