Skip to content

Commit

Permalink
Implement propose request rerouting
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Feb 1, 2024
1 parent ac1c72a commit 38ca8a3
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 22 deletions.
56 changes: 50 additions & 6 deletions raftify/src/raft_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use response_sender::ResponseSender;
use utils::inspect_raftnode;

use crate::{
create_client,
error::{Result, SendMessageError},
raft::{
eraftpb::{
Expand All @@ -34,7 +35,7 @@ use crate::{
formatter::{format_confchangev2, format_message},
raw_node::RawNode,
},
raft_service::ChangeConfigResultType,
raft_service::{self, ChangeConfigResultType, ProposeArgs},
request_message::{LocalRequestMsg, SelfMessage, ServerRequestMsg},
response_message::{
ConfChangeResponseResult, LocalResponseMsg, RequestIdResponseResult, ResponseMessage,
Expand Down Expand Up @@ -225,17 +226,33 @@ impl<
}
}

pub async fn propose(&self, proposal: Vec<u8>) {
pub async fn propose(&self, proposal: Vec<u8>) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.local_sender
.send(LocalRequestMsg::Propose { proposal, chan: tx })
.send(LocalRequestMsg::Propose {
proposal: proposal.clone(),
chan: tx,
})
.await
.unwrap();
let resp = rx.await.unwrap();
match resp {
LocalResponseMsg::Propose {} => (),
LocalResponseMsg::Propose { result } => match result {
ResponseResult::Success => (),
ResponseResult::Error(_) => (),
ResponseResult::WrongLeader {
leader_id,
leader_addr,
} => {
let mut client = create_client(leader_addr).await?;
client
.propose(Request::new(ProposeArgs { msg: proposal }))
.await?;
}
},
_ => unreachable!(),
}
Ok(())
}

pub async fn get_cluster_size(&self) -> usize {
Expand Down Expand Up @@ -620,7 +637,11 @@ impl<
if let Some(sender) = self.response_senders.remove(&response_seq) {
match sender {
ResponseSender::Local(sender) => {
sender.send(LocalResponseMsg::Propose {}).unwrap();
sender
.send(LocalResponseMsg::Propose {
result: ResponseResult::Success,
})
.unwrap();
}
ResponseSender::Server(sender) => {
sender
Expand Down Expand Up @@ -781,7 +802,13 @@ impl<
.to_string();

let raft_response: ResponseMessage<LogEntry, FSM> = match chan {
ResponseSender::Local(_) => LocalResponseMsg::Propose {}.into(),
ResponseSender::Local(_) => LocalResponseMsg::Propose {
result: ResponseResult::WrongLeader {
leader_id,
leader_addr,
},
}
.into(),
ResponseSender::Server(_) => ServerResponseMsg::Propose {
result: ResponseResult::WrongLeader {
leader_id,
Expand Down Expand Up @@ -982,6 +1009,23 @@ impl<
Ok(())
}

pub async fn handle_rerouting_msg(&mut self, message: ServerRequestMsg) -> Result<()> {
match message {
ServerRequestMsg::ChangeConfig { conf_change, chan } => {
self.handle_confchange_request(conf_change, ResponseSender::Server(chan))
.await?;
}
ServerRequestMsg::Propose { proposal, chan } => {
self.handle_propose_request(proposal, ResponseSender::Server(chan))
.await?;
}
_ => {
unimplemented!()
}
}
Ok(())
}

pub async fn handle_server_request_msg(&mut self, message: ServerRequestMsg) -> Result<()> {
match message {
ServerRequestMsg::ChangeConfig { conf_change, chan } => {
Expand Down
27 changes: 23 additions & 4 deletions raftify/src/raft_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ use super::{
response_message::{RequestIdResponseResult, ServerResponseMsg},
Config, Error,
};
use crate::raft::eraftpb::{ConfChangeV2, Message as RaftMessage};
use crate::{
create_client,
raft::eraftpb::{ConfChangeV2, Message as RaftMessage},
raft_service::{ProposeArgs},
response_message::ResponseResult,
RaftServiceClient,
};

#[derive(Clone)]
pub struct RaftServer {
Expand Down Expand Up @@ -201,7 +207,7 @@ impl RaftService for RaftServer {
let (tx, rx) = oneshot::channel();
match sender
.send(ServerRequestMsg::Propose {
proposal: request_args.msg,
proposal: request_args.msg.clone(),
chan: tx,
})
.await
Expand All @@ -212,8 +218,21 @@ impl RaftService for RaftServer {

let response = rx.await.unwrap();
match response {
ServerResponseMsg::Propose { result: _result } => {
Ok(Response::new(raft_service::Empty {}))
ServerResponseMsg::Propose { result } => {
match result {
ResponseResult::Success => Ok(Response::new(raft_service::Empty {})),
ResponseResult::Error(_) => Ok(Response::new(raft_service::Empty {})),
ResponseResult::WrongLeader {
leader_id,
leader_addr,
} => {
// TODO: Handle this kind of errors
let mut client = create_client(leader_addr).await.unwrap();
let _ = client.propose(ProposeArgs {msg: request_args.msg}).await?;

Ok(Response::new(raft_service::Empty {}))
}
}
}
_ => unreachable!(),
}
Expand Down
49 changes: 37 additions & 12 deletions raftify/src/response_message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::{fmt, marker::PhantomData};

use jopemachine_raft::eraftpb::ConfChangeV2;

use crate::raft_service;

use super::{AbstractLogEntry, AbstractStateMachine, Error, HeedStorage, Peers};

pub enum ResponseMessage<LogEntry: AbstractLogEntry, FSM: AbstractStateMachine> {
Expand Down Expand Up @@ -55,16 +59,35 @@ pub enum RequestIdResponseResult {

#[derive(Debug)]
pub enum ServerResponseMsg {
MemberBootstrapReady { result: ResponseResult },
ClusterBootstrapReady { result: ResponseResult },
Propose { result: ResponseResult },
ConfigChange { result: ConfChangeResponseResult },
RequestId { result: RequestIdResponseResult },
ReportUnreachable { result: ResponseResult },
DebugNode { result_json: String },
GetPeers { peers: Peers },
SendMessage { result: ResponseResult },
ReportUnreachable {
result: ResponseResult,
},
DebugNode {
result_json: String,
},
GetPeers {
peers: Peers,
},
SendMessage {
result: ResponseResult,
},
CreateSnapshot {},

RerouteMessageResponse {
typ: i32,
propose_response: raft_service::Empty,
conf_change_response: Option<raft_service::ChangeConfigResponse>,
},
// Rerouting available
Propose {
result: ResponseResult,
},
ConfigChange {
result: ConfChangeResponseResult,
},
RequestId {
result: RequestIdResponseResult,
},
}

pub enum LocalResponseMsg<LogEntry: AbstractLogEntry, FSM: AbstractStateMachine> {
Expand All @@ -77,13 +100,15 @@ pub enum LocalResponseMsg<LogEntry: AbstractLogEntry, FSM: AbstractStateMachine>
Store { store: FSM },
Storage { storage: HeedStorage },
GetClusterSize { size: usize },
ChangeConfig { result: ConfChangeResponseResult },
Quit {},
MakeSnapshot {},
Propose {},
DebugNode { result_json: String },
JoinCluster {},
SendMessage {},
DebugNode { result_json: String },

// Rerouting available
Propose { result: ResponseResult },
ChangeConfig { result: ConfChangeResponseResult },
_Phantom(PhantomData<LogEntry>),
}

Expand Down

0 comments on commit 38ca8a3

Please sign in to comment.