From 09069320f0f923085f0df1f92dc60d4d8193b94c Mon Sep 17 00:00:00 2001 From: Gyubong Date: Thu, 1 Feb 2024 15:32:26 +0900 Subject: [PATCH] WIP --- harness/tests/data_replication.rs | 13 +++++------ raftify/proto/raft_service.proto | 1 + raftify/src/raft_node/mod.rs | 37 +++++++++---------------------- raftify/src/raft_server.rs | 30 +++++++++++++++---------- raftify/src/response_message.rs | 2 +- 5 files changed, 37 insertions(+), 46 deletions(-) diff --git a/harness/tests/data_replication.rs b/harness/tests/data_replication.rs index 377a1efa..d42340c3 100644 --- a/harness/tests/data_replication.rs +++ b/harness/tests/data_replication.rs @@ -19,16 +19,16 @@ pub async fn test_data_replication() { let mut rafts = RAFTS.lock().unwrap(); let raft_1 = rafts.get(&1).unwrap(); wait_for_until_cluster_size_increase(raft_1.clone(), 3).await; - + let entry = LogEntry::Insert { key: 1, value: "test".to_string(), } .encode() .unwrap(); - + raft_1.raft_node.propose(entry).await.unwrap(); - + sleep(Duration::from_secs(1)).await; // Data should be replicated to all nodes. for (_, raft) in rafts.iter_mut() { @@ -40,7 +40,6 @@ pub async fn test_data_replication() { sleep(Duration::from_secs(1)).await; } - // TODO: This assumes that the leader is node 1, and it is not true anymore, so we requires to implement rerouting logic. tokio::spawn(spawn_extra_node("127.0.0.1:60064", RAFT_ADDRS[0])); sleep(Duration::from_secs(1)).await; @@ -59,17 +58,17 @@ pub async fn test_data_replication() { { let mut rafts = RAFTS.lock().unwrap(); let raft_1 = rafts.get(&1).unwrap(); - + let new_entry = LogEntry::Insert { key: 2, value: "test2".to_string(), } .encode() .unwrap(); - + raft_1.raft_node.propose(new_entry).await.unwrap(); sleep(Duration::from_secs(1)).await; - + // New entry data should be replicated to all nodes including new joined node. for (_, raft) in rafts.iter_mut() { let store = raft.raft_node.store().await; diff --git a/raftify/proto/raft_service.proto b/raftify/proto/raft_service.proto index 9c583833..cf02c30a 100644 --- a/raftify/proto/raft_service.proto +++ b/raftify/proto/raft_service.proto @@ -45,6 +45,7 @@ message RequestIdResponse{ string leader_addr = 3; uint64 reserved_id = 4; bytes peers = 5; + bytes error = 6; } // Config Change diff --git a/raftify/src/raft_node/mod.rs b/raftify/src/raft_node/mod.rs index 5691f657..8ae96870 100644 --- a/raftify/src/raft_node/mod.rs +++ b/raftify/src/raft_node/mod.rs @@ -235,14 +235,15 @@ impl< }) .await .unwrap(); + let resp = rx.await.unwrap(); match resp { LocalResponseMsg::Propose { result } => match result { ResponseResult::Success => (), ResponseResult::Error(e) => return Err(e), ResponseResult::WrongLeader { - leader_id, leader_addr, + .. } => { let mut client = create_client(leader_addr).await?; client @@ -264,12 +265,13 @@ impl< }) .await .unwrap(); + let resp = rx.await.unwrap(); match resp { - LocalResponseMsg::ChangeConfig { result } => match result { + LocalResponseMsg::ConfigChange { result } => match result { ConfChangeResponseResult::WrongLeader { - leader_id, leader_addr, + .. } => { let mut client = create_client(leader_addr).await.unwrap(); let res = client.change_config(conf_change.clone()).await.unwrap(); @@ -299,6 +301,7 @@ impl< .send(LocalRequestMsg::GetClusterSize { chan: tx }) .await .unwrap(); + let resp = rx.await.unwrap(); match resp { LocalResponseMsg::GetClusterSize { size } => size, @@ -327,7 +330,7 @@ impl< .unwrap(); let resp = rx.await.unwrap(); match resp { - LocalResponseMsg::ChangeConfig { result: _result } => (), + LocalResponseMsg::ConfigChange { result: _result } => (), _ => unreachable!(), } } @@ -786,7 +789,7 @@ impl< match sender { ResponseSender::Local(sender) => { sender - .send(LocalResponseMsg::ChangeConfig { result: response }) + .send(LocalResponseMsg::ConfigChange { result: response }) .unwrap(); } ResponseSender::Server(sender) => { @@ -876,17 +879,17 @@ impl< let peers = self.peers.lock().await; let leader_addr = peers.get(&leader_id).unwrap().addr.to_string(); - let result = ConfChangeResponseResult::WrongLeader { + let wrong_leader_result = ConfChangeResponseResult::WrongLeader { leader_id, leader_addr, }; match chan { ResponseSender::Local(chan) => chan - .send(LocalResponseMsg::ChangeConfig { result }) + .send(LocalResponseMsg::ConfigChange { result: wrong_leader_result }) .unwrap(), ResponseSender::Server(chan) => chan - .send(ServerResponseMsg::ConfigChange { result }) + .send(ServerResponseMsg::ConfigChange { result: wrong_leader_result }) .unwrap(), } } else { @@ -1030,23 +1033,6 @@ impl< Ok(()) } - pub async fn handle_rerouting_msg(&mut self, message: ServerRequestMsg) -> Result<()> { - match message { - ServerRequestMsg::ChangeConfig { conf_change, chan } => { - self.handle_confchange_request(conf_change, ResponseSender::Server(chan)) - .await?; - } - ServerRequestMsg::Propose { proposal, chan } => { - self.handle_propose_request(proposal, ResponseSender::Server(chan)) - .await?; - } - _ => { - unimplemented!() - } - } - Ok(()) - } - pub async fn handle_server_request_msg(&mut self, message: ServerRequestMsg) -> Result<()> { match message { ServerRequestMsg::ChangeConfig { conf_change, chan } => { @@ -1068,7 +1054,6 @@ impl< } ServerRequestMsg::RequestId { raft_addr, chan } => { if !self.is_leader() { - // TODO: retry strategy in case of failure let leader_id = self.get_leader_id(); let peers = self.peers.lock().await; let leader_addr = peers.get(&leader_id).unwrap().addr.to_string(); diff --git a/raftify/src/raft_server.rs b/raftify/src/raft_server.rs index 7a4f1ee8..457e4382 100644 --- a/raftify/src/raft_server.rs +++ b/raftify/src/raft_server.rs @@ -97,7 +97,7 @@ impl RaftService for RaftServer { let (tx, rx) = oneshot::channel(); sender .send(ServerRequestMsg::RequestId { - raft_addr: request_args.raft_addr, + raft_addr: request_args.raft_addr.clone(), chan: tx, }) .await @@ -116,18 +116,24 @@ impl RaftService for RaftServer { reserved_id, leader_addr: self.addr.to_string(), peers: serialize(&peers).unwrap(), + ..Default::default() })), + RequestIdResponseResult::Error(e) => { + Ok(Response::new(raft_service::RequestIdResponse { + code: raft_service::ResultCode::Error as i32, + error: e.to_string().as_bytes().to_vec(), + ..Default::default() + })) + } RequestIdResponseResult::WrongLeader { - leader_id, leader_addr, - } => Ok(Response::new(raft_service::RequestIdResponse { - code: raft_service::ResultCode::WrongLeader as i32, - leader_id, - leader_addr, - reserved_id: 0, - peers: vec![], - })), - _ => unreachable!(), + .. + } => { + let mut client = create_client(leader_addr).await.unwrap(); + let reply = client.request_id(request_args).await?.into_inner(); + + Ok(Response::new(reply)) + }, }, _ => unreachable!(), } @@ -180,8 +186,8 @@ impl RaftService for RaftServer { reply.error = e.to_string().as_bytes().to_vec(); } ConfChangeResponseResult::WrongLeader { - leader_id, leader_addr, + .. } => { reply.result_type = raft_service::ChangeConfigResultType::ChangeConfigWrongLeader @@ -257,8 +263,8 @@ impl RaftService for RaftServer { ResponseResult::Success => Ok(Response::new(raft_service::Empty {})), ResponseResult::Error(_) => Ok(Response::new(raft_service::Empty {})), ResponseResult::WrongLeader { - leader_id, leader_addr, + .. } => { // TODO: Handle this kind of errors let mut client = create_client(leader_addr).await.unwrap(); diff --git a/raftify/src/response_message.rs b/raftify/src/response_message.rs index 7a8c4541..40ed0253 100644 --- a/raftify/src/response_message.rs +++ b/raftify/src/response_message.rs @@ -108,7 +108,7 @@ pub enum LocalResponseMsg // Rerouting available Propose { result: ResponseResult }, - ChangeConfig { result: ConfChangeResponseResult }, + ConfigChange { result: ConfChangeResponseResult }, _Phantom(PhantomData), }