Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Feb 1, 2024
1 parent 4038a03 commit 0906932
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 46 deletions.
13 changes: 6 additions & 7 deletions harness/tests/data_replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ pub async fn test_data_replication() {
let mut rafts = RAFTS.lock().unwrap();
let raft_1 = rafts.get(&1).unwrap();
wait_for_until_cluster_size_increase(raft_1.clone(), 3).await;

let entry = LogEntry::Insert {
key: 1,
value: "test".to_string(),
}
.encode()
.unwrap();

raft_1.raft_node.propose(entry).await.unwrap();

sleep(Duration::from_secs(1)).await;
// Data should be replicated to all nodes.
for (_, raft) in rafts.iter_mut() {
Expand All @@ -40,7 +40,6 @@ pub async fn test_data_replication() {
sleep(Duration::from_secs(1)).await;
}

// TODO: This assumes that the leader is node 1, and it is not true anymore, so we requires to implement rerouting logic.
tokio::spawn(spawn_extra_node("127.0.0.1:60064", RAFT_ADDRS[0]));
sleep(Duration::from_secs(1)).await;

Expand All @@ -59,17 +58,17 @@ pub async fn test_data_replication() {
{
let mut rafts = RAFTS.lock().unwrap();
let raft_1 = rafts.get(&1).unwrap();

let new_entry = LogEntry::Insert {
key: 2,
value: "test2".to_string(),
}
.encode()
.unwrap();

raft_1.raft_node.propose(new_entry).await.unwrap();
sleep(Duration::from_secs(1)).await;

// New entry data should be replicated to all nodes including new joined node.
for (_, raft) in rafts.iter_mut() {
let store = raft.raft_node.store().await;
Expand Down
1 change: 1 addition & 0 deletions raftify/proto/raft_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ message RequestIdResponse{
string leader_addr = 3;
uint64 reserved_id = 4;
bytes peers = 5;
bytes error = 6;
}

// Config Change
Expand Down
37 changes: 11 additions & 26 deletions raftify/src/raft_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,15 @@ impl<
})
.await
.unwrap();

let resp = rx.await.unwrap();
match resp {
LocalResponseMsg::Propose { result } => match result {
ResponseResult::Success => (),
ResponseResult::Error(e) => return Err(e),
ResponseResult::WrongLeader {
leader_id,
leader_addr,
..
} => {
let mut client = create_client(leader_addr).await?;
client
Expand All @@ -264,12 +265,13 @@ impl<
})
.await
.unwrap();

let resp = rx.await.unwrap();
match resp {
LocalResponseMsg::ChangeConfig { result } => match result {
LocalResponseMsg::ConfigChange { result } => match result {
ConfChangeResponseResult::WrongLeader {
leader_id,
leader_addr,
..
} => {
let mut client = create_client(leader_addr).await.unwrap();
let res = client.change_config(conf_change.clone()).await.unwrap();
Expand Down Expand Up @@ -299,6 +301,7 @@ impl<
.send(LocalRequestMsg::GetClusterSize { chan: tx })
.await
.unwrap();

let resp = rx.await.unwrap();
match resp {
LocalResponseMsg::GetClusterSize { size } => size,
Expand Down Expand Up @@ -327,7 +330,7 @@ impl<
.unwrap();
let resp = rx.await.unwrap();
match resp {
LocalResponseMsg::ChangeConfig { result: _result } => (),
LocalResponseMsg::ConfigChange { result: _result } => (),
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -786,7 +789,7 @@ impl<
match sender {
ResponseSender::Local(sender) => {
sender
.send(LocalResponseMsg::ChangeConfig { result: response })
.send(LocalResponseMsg::ConfigChange { result: response })
.unwrap();
}
ResponseSender::Server(sender) => {
Expand Down Expand Up @@ -876,17 +879,17 @@ impl<
let peers = self.peers.lock().await;
let leader_addr = peers.get(&leader_id).unwrap().addr.to_string();

let result = ConfChangeResponseResult::WrongLeader {
let wrong_leader_result = ConfChangeResponseResult::WrongLeader {
leader_id,
leader_addr,
};

match chan {
ResponseSender::Local(chan) => chan
.send(LocalResponseMsg::ChangeConfig { result })
.send(LocalResponseMsg::ConfigChange { result: wrong_leader_result })
.unwrap(),
ResponseSender::Server(chan) => chan
.send(ServerResponseMsg::ConfigChange { result })
.send(ServerResponseMsg::ConfigChange { result: wrong_leader_result })
.unwrap(),
}
} else {
Expand Down Expand Up @@ -1030,23 +1033,6 @@ impl<
Ok(())
}

pub async fn handle_rerouting_msg(&mut self, message: ServerRequestMsg) -> Result<()> {
match message {
ServerRequestMsg::ChangeConfig { conf_change, chan } => {
self.handle_confchange_request(conf_change, ResponseSender::Server(chan))
.await?;
}
ServerRequestMsg::Propose { proposal, chan } => {
self.handle_propose_request(proposal, ResponseSender::Server(chan))
.await?;
}
_ => {
unimplemented!()
}
}
Ok(())
}

pub async fn handle_server_request_msg(&mut self, message: ServerRequestMsg) -> Result<()> {
match message {
ServerRequestMsg::ChangeConfig { conf_change, chan } => {
Expand All @@ -1068,7 +1054,6 @@ impl<
}
ServerRequestMsg::RequestId { raft_addr, chan } => {
if !self.is_leader() {
// TODO: retry strategy in case of failure
let leader_id = self.get_leader_id();
let peers = self.peers.lock().await;
let leader_addr = peers.get(&leader_id).unwrap().addr.to_string();
Expand Down
30 changes: 18 additions & 12 deletions raftify/src/raft_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl RaftService for RaftServer {
let (tx, rx) = oneshot::channel();
sender
.send(ServerRequestMsg::RequestId {
raft_addr: request_args.raft_addr,
raft_addr: request_args.raft_addr.clone(),
chan: tx,
})
.await
Expand All @@ -116,18 +116,24 @@ impl RaftService for RaftServer {
reserved_id,
leader_addr: self.addr.to_string(),
peers: serialize(&peers).unwrap(),
..Default::default()
})),
RequestIdResponseResult::Error(e) => {
Ok(Response::new(raft_service::RequestIdResponse {
code: raft_service::ResultCode::Error as i32,
error: e.to_string().as_bytes().to_vec(),
..Default::default()
}))
}
RequestIdResponseResult::WrongLeader {
leader_id,
leader_addr,
} => Ok(Response::new(raft_service::RequestIdResponse {
code: raft_service::ResultCode::WrongLeader as i32,
leader_id,
leader_addr,
reserved_id: 0,
peers: vec![],
})),
_ => unreachable!(),
..
} => {
let mut client = create_client(leader_addr).await.unwrap();
let reply = client.request_id(request_args).await?.into_inner();

Ok(Response::new(reply))
},
},
_ => unreachable!(),
}
Expand Down Expand Up @@ -180,8 +186,8 @@ impl RaftService for RaftServer {
reply.error = e.to_string().as_bytes().to_vec();
}
ConfChangeResponseResult::WrongLeader {
leader_id,
leader_addr,
..
} => {
reply.result_type =
raft_service::ChangeConfigResultType::ChangeConfigWrongLeader
Expand Down Expand Up @@ -257,8 +263,8 @@ impl RaftService for RaftServer {
ResponseResult::Success => Ok(Response::new(raft_service::Empty {})),
ResponseResult::Error(_) => Ok(Response::new(raft_service::Empty {})),
ResponseResult::WrongLeader {
leader_id,
leader_addr,
..
} => {
// TODO: Handle this kind of errors
let mut client = create_client(leader_addr).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion raftify/src/response_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub enum LocalResponseMsg<LogEntry: AbstractLogEntry, FSM: AbstractStateMachine>

// Rerouting available
Propose { result: ResponseResult },
ChangeConfig { result: ConfChangeResponseResult },
ConfigChange { result: ConfChangeResponseResult },
_Phantom(PhantomData<LogEntry>),
}

Expand Down

0 comments on commit 0906932

Please sign in to comment.