diff --git a/raftify/src/raft_node/mod.rs b/raftify/src/raft_node/mod.rs index 986b30bf..da872cdb 100644 --- a/raftify/src/raft_node/mod.rs +++ b/raftify/src/raft_node/mod.rs @@ -25,6 +25,7 @@ use response_sender::ResponseSender; use utils::inspect_raftnode; use crate::{ + create_client, error::{Result, SendMessageError}, raft::{ eraftpb::{ @@ -34,7 +35,7 @@ use crate::{ formatter::{format_confchangev2, format_message}, raw_node::RawNode, }, - raft_service::ChangeConfigResultType, + raft_service::{self, ChangeConfigResultType, ProposeArgs}, request_message::{LocalRequestMsg, SelfMessage, ServerRequestMsg}, response_message::{ ConfChangeResponseResult, LocalResponseMsg, RequestIdResponseResult, ResponseMessage, @@ -225,17 +226,33 @@ impl< } } - pub async fn propose(&self, proposal: Vec) { + pub async fn propose(&self, proposal: Vec) -> Result<()> { let (tx, rx) = oneshot::channel(); self.local_sender - .send(LocalRequestMsg::Propose { proposal, chan: tx }) + .send(LocalRequestMsg::Propose { + proposal: proposal.clone(), + chan: tx, + }) .await .unwrap(); let resp = rx.await.unwrap(); match resp { - LocalResponseMsg::Propose {} => (), + LocalResponseMsg::Propose { result } => match result { + ResponseResult::Success => (), + ResponseResult::Error(_) => (), + ResponseResult::WrongLeader { + leader_id, + leader_addr, + } => { + let mut client = create_client(leader_addr).await?; + client + .propose(Request::new(ProposeArgs { msg: proposal })) + .await?; + } + }, _ => unreachable!(), } + Ok(()) } pub async fn get_cluster_size(&self) -> usize { @@ -620,7 +637,11 @@ impl< if let Some(sender) = self.response_senders.remove(&response_seq) { match sender { ResponseSender::Local(sender) => { - sender.send(LocalResponseMsg::Propose {}).unwrap(); + sender + .send(LocalResponseMsg::Propose { + result: ResponseResult::Success, + }) + .unwrap(); } ResponseSender::Server(sender) => { sender @@ -781,7 +802,13 @@ impl< .to_string(); let raft_response: ResponseMessage = match chan { - ResponseSender::Local(_) => LocalResponseMsg::Propose {}.into(), + ResponseSender::Local(_) => LocalResponseMsg::Propose { + result: ResponseResult::WrongLeader { + leader_id, + leader_addr, + }, + } + .into(), ResponseSender::Server(_) => ServerResponseMsg::Propose { result: ResponseResult::WrongLeader { leader_id, @@ -982,6 +1009,23 @@ impl< Ok(()) } + pub async fn handle_rerouting_msg(&mut self, message: ServerRequestMsg) -> Result<()> { + match message { + ServerRequestMsg::ChangeConfig { conf_change, chan } => { + self.handle_confchange_request(conf_change, ResponseSender::Server(chan)) + .await?; + } + ServerRequestMsg::Propose { proposal, chan } => { + self.handle_propose_request(proposal, ResponseSender::Server(chan)) + .await?; + } + _ => { + unimplemented!() + } + } + Ok(()) + } + pub async fn handle_server_request_msg(&mut self, message: ServerRequestMsg) -> Result<()> { match message { ServerRequestMsg::ChangeConfig { conf_change, chan } => { diff --git a/raftify/src/raft_server.rs b/raftify/src/raft_server.rs index ac14f356..5804f37c 100644 --- a/raftify/src/raft_server.rs +++ b/raftify/src/raft_server.rs @@ -25,7 +25,13 @@ use super::{ response_message::{RequestIdResponseResult, ServerResponseMsg}, Config, Error, }; -use crate::raft::eraftpb::{ConfChangeV2, Message as RaftMessage}; +use crate::{ + create_client, + raft::eraftpb::{ConfChangeV2, Message as RaftMessage}, + raft_service::{ProposeArgs}, + response_message::ResponseResult, + RaftServiceClient, +}; #[derive(Clone)] pub struct RaftServer { @@ -201,7 +207,7 @@ impl RaftService for RaftServer { let (tx, rx) = oneshot::channel(); match sender .send(ServerRequestMsg::Propose { - proposal: request_args.msg, + proposal: request_args.msg.clone(), chan: tx, }) .await @@ -212,8 +218,21 @@ impl RaftService for RaftServer { let response = rx.await.unwrap(); match response { - ServerResponseMsg::Propose { result: _result } => { - Ok(Response::new(raft_service::Empty {})) + ServerResponseMsg::Propose { result } => { + match result { + ResponseResult::Success => Ok(Response::new(raft_service::Empty {})), + ResponseResult::Error(_) => Ok(Response::new(raft_service::Empty {})), + ResponseResult::WrongLeader { + leader_id, + leader_addr, + } => { + // TODO: Handle this kind of errors + let mut client = create_client(leader_addr).await.unwrap(); + let _ = client.propose(ProposeArgs {msg: request_args.msg}).await?; + + Ok(Response::new(raft_service::Empty {})) + } + } } _ => unreachable!(), } diff --git a/raftify/src/response_message.rs b/raftify/src/response_message.rs index 5b62a22b..7a8c4541 100644 --- a/raftify/src/response_message.rs +++ b/raftify/src/response_message.rs @@ -1,5 +1,9 @@ use std::{fmt, marker::PhantomData}; +use jopemachine_raft::eraftpb::ConfChangeV2; + +use crate::raft_service; + use super::{AbstractLogEntry, AbstractStateMachine, Error, HeedStorage, Peers}; pub enum ResponseMessage { @@ -55,16 +59,35 @@ pub enum RequestIdResponseResult { #[derive(Debug)] pub enum ServerResponseMsg { - MemberBootstrapReady { result: ResponseResult }, - ClusterBootstrapReady { result: ResponseResult }, - Propose { result: ResponseResult }, - ConfigChange { result: ConfChangeResponseResult }, - RequestId { result: RequestIdResponseResult }, - ReportUnreachable { result: ResponseResult }, - DebugNode { result_json: String }, - GetPeers { peers: Peers }, - SendMessage { result: ResponseResult }, + ReportUnreachable { + result: ResponseResult, + }, + DebugNode { + result_json: String, + }, + GetPeers { + peers: Peers, + }, + SendMessage { + result: ResponseResult, + }, CreateSnapshot {}, + + RerouteMessageResponse { + typ: i32, + propose_response: raft_service::Empty, + conf_change_response: Option, + }, + // Rerouting available + Propose { + result: ResponseResult, + }, + ConfigChange { + result: ConfChangeResponseResult, + }, + RequestId { + result: RequestIdResponseResult, + }, } pub enum LocalResponseMsg { @@ -77,13 +100,15 @@ pub enum LocalResponseMsg Store { store: FSM }, Storage { storage: HeedStorage }, GetClusterSize { size: usize }, - ChangeConfig { result: ConfChangeResponseResult }, Quit {}, MakeSnapshot {}, - Propose {}, - DebugNode { result_json: String }, JoinCluster {}, SendMessage {}, + DebugNode { result_json: String }, + + // Rerouting available + Propose { result: ResponseResult }, + ChangeConfig { result: ConfChangeResponseResult }, _Phantom(PhantomData), }