Commit: WIP
jopemachine committed Feb 1, 2024
1 parent 38ca8a3 commit ac86d8d
Showing 7 changed files with 160 additions and 88 deletions.
1 change: 1 addition & 0 deletions harness/src/raft.rs
@@ -79,6 +79,7 @@ pub async fn build_raft_cluster(peers: Peers) -> Result<()> {
Ok(())
}

// Note that this function locks RAFTS, so it should not be called while the RAFTS lock is already held.
pub async fn spawn_extra_node(raft_addr: &str, peer_addr: &str) -> Result<JoinHandle<Result<()>>> {
let logger = Arc::new(Slogger {
slog: build_logger(),
10 changes: 8 additions & 2 deletions harness/src/utils.rs
@@ -65,7 +65,10 @@ pub async fn load_peers(example_filename: &str) -> Result<Peers, Box<dyn std::er
}

pub async fn wait_for_until_cluster_size_increase(raft: Raft, target: usize) {
println!("Waiting for cluster size to increase to {}...", target);
raft.logger.debug(&format!(
"Waiting for cluster size to increase to... {}",
target
));

loop {
let size = raft.cluster_size().await;
@@ -80,7 +83,10 @@ pub async fn wait_for_until_cluster_size_increase(raft: Raft, target: usize) {
}

pub async fn wait_for_until_cluster_size_decrease(raft: Raft, target: usize) {
println!("Waiting for cluster size to decrease to {}...", target);
raft.logger.debug(&format!(
"Waiting for cluster size to decrease to {}...",
target
));

loop {
let size = raft.cluster_size().await;
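The change in harness/src/utils.rs routes the wait helpers' progress messages through raft.logger.debug instead of println!. The rest of each helper is elided by the diff view; below is a minimal, self-contained sketch of the same polling shape, using an atomic counter as a stand-in for raft.cluster_size().await. The names, timings, and polling interval are illustrative assumptions, not the harness API; tokio with the usual rt/macros/time features is assumed.

use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::Duration,
};

use tokio::time::sleep;

// Illustrative stand-in for the harness helper: poll a shared counter until
// it reaches `target`, sleeping between checks (interval is assumed).
async fn wait_until_size(size: Arc<AtomicUsize>, target: usize) {
    println!("Waiting for size to reach {}...", target);
    loop {
        if size.load(Ordering::SeqCst) >= target {
            break;
        }
        sleep(Duration::from_millis(100)).await;
    }
}

#[tokio::main]
async fn main() {
    let size = Arc::new(AtomicUsize::new(0));

    // Simulate nodes joining the cluster in the background.
    let grower = size.clone();
    tokio::spawn(async move {
        for _ in 0..3 {
            sleep(Duration::from_millis(200)).await;
            grower.fetch_add(1, Ordering::SeqCst);
        }
    });

    wait_until_size(size, 3).await;
    println!("target size reached");
}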
109 changes: 57 additions & 52 deletions harness/tests/data_replication.rs
@@ -15,68 +15,73 @@ pub async fn test_data_replication() {
let _raft_tasks = tokio::spawn(build_raft_cluster(peers.clone()));
sleep(Duration::from_secs(5)).await;

let mut rafts = RAFTS.lock().unwrap();
let raft_1 = rafts.get(&1).unwrap();

wait_for_until_cluster_size_increase(raft_1.clone(), 3).await;

let entry = LogEntry::Insert {
key: 1,
value: "test".to_string(),
}
.encode()
.unwrap();

raft_1.raft_node.propose(entry).await;

sleep(Duration::from_secs(1)).await;

// Data should be replicated to all nodes.
for (_, raft) in rafts.iter_mut() {
let store = raft.raft_node.store().await;
let store_lk = store.0.read().unwrap();
assert_eq!(store_lk.get(&1).unwrap(), "test");
{
let mut rafts = RAFTS.lock().unwrap();
let raft_1 = rafts.get(&1).unwrap();
wait_for_until_cluster_size_increase(raft_1.clone(), 3).await;

let entry = LogEntry::Insert {
key: 1,
value: "test".to_string(),
}
.encode()
.unwrap();

raft_1.raft_node.propose(entry).await.unwrap();

sleep(Duration::from_secs(1)).await;
// Data should be replicated to all nodes.
for (_, raft) in rafts.iter_mut() {
let store = raft.raft_node.store().await;
let store_lk = store.0.read().unwrap();
assert_eq!(store_lk.get(&1).unwrap(), "test");
}

sleep(Duration::from_secs(1)).await;
}

sleep(Duration::from_secs(1)).await;

// TODO: This assumes that the leader is node 1, which is no longer guaranteed, so we need to implement rerouting logic.
tokio::spawn(spawn_extra_node("127.0.0.1:60064", RAFT_ADDRS[0]));
sleep(Duration::from_secs(1)).await;

let rafts = RAFTS.lock().unwrap();
let raft_1 = rafts.get(&1).unwrap();
wait_for_until_cluster_size_increase(raft_1.clone(), 4).await;
let raft_4 = rafts.get(&4).unwrap();
let store = raft_4.raft_node.store().await;
let store_lk = store.0.read().unwrap();

// Data should be replicated to the newly joined node.
assert_eq!(store_lk.get(&1).unwrap(), "test");

let mut rafts = RAFTS.lock().unwrap();
let raft_1 = rafts.get(&1).unwrap();
{
let rafts = RAFTS.lock().unwrap();
let raft_1 = rafts.get(&1).unwrap();
wait_for_until_cluster_size_increase(raft_1.clone(), 4).await;
let raft_4 = rafts.get(&4).unwrap();
let store = raft_4.raft_node.store().await;
let store_lk = store.0.read().unwrap();

let new_entry = LogEntry::Insert {
key: 2,
value: "test2".to_string(),
// Data should be replicated to the newly joined node.
assert_eq!(store_lk.get(&1).unwrap(), "test");
}
.encode()
.unwrap();

raft_1.raft_node.propose(new_entry).await;

sleep(Duration::from_secs(1)).await;

// New entry data should be replicated to all nodes, including the newly joined node.
for (_, raft) in rafts.iter_mut() {
let store = raft.raft_node.store().await;
let store_lk = store.0.read().unwrap();
assert_eq!(store_lk.get(&2).unwrap(), "test2");
{
let mut rafts = RAFTS.lock().unwrap();
let raft_1 = rafts.get(&1).unwrap();

let new_entry = LogEntry::Insert {
key: 2,
value: "test2".to_string(),
}
.encode()
.unwrap();

raft_1.raft_node.propose(new_entry).await.unwrap();
sleep(Duration::from_secs(1)).await;

// New entry data should be replicated to all nodes, including the newly joined node.
for (_, raft) in rafts.iter_mut() {
let store = raft.raft_node.store().await;
let store_lk = store.0.read().unwrap();
assert_eq!(store_lk.get(&2).unwrap(), "test2");
}
}

let mut rafts = RAFTS.lock().unwrap();
for (_, raft) in rafts.iter_mut() {
raft.raft_node.quit().await;
{
let mut rafts = RAFTS.lock().unwrap();
for (_, raft) in rafts.iter_mut() {
raft.raft_node.quit().await;
}
}
}
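The rewrite of this test wraps each use of the global RAFTS map in its own block, so the std::sync::Mutex guard is dropped before the test sleeps, spawns the extra node, or locks the map again, and it also unwraps the propose results. Below is a minimal, self-contained sketch of that guard-scoping pattern, with a local Mutex<HashMap<..>> standing in for the RAFTS static (illustrative only; tokio assumed).

use std::{collections::HashMap, sync::Mutex, time::Duration};

use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // Local stand-in for the global RAFTS map used by the tests.
    let nodes: Mutex<HashMap<u64, String>> = Mutex::new(HashMap::new());
    nodes.lock().unwrap().insert(1, "node-1".to_string());

    // Scope the guard so it is dropped before the .await below. Holding a
    // std::sync::MutexGuard across an await keeps the map locked for the whole
    // sleep, and the guard is not Send, so tokio::spawn would reject such a future.
    {
        let guard = nodes.lock().unwrap();
        assert_eq!(guard.get(&1).unwrap(), "node-1");
    } // guard dropped here

    sleep(Duration::from_millis(100)).await;

    // Re-locking afterwards is fine; locking again while the first guard was
    // still alive would deadlock or panic on a std::sync::Mutex.
    let mut guard = nodes.lock().unwrap();
    guard.insert(2, "node-2".to_string());
    assert_eq!(guard.len(), 2);
}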
3 changes: 1 addition & 2 deletions harness/tests/leader_election.rs
@@ -13,8 +13,7 @@ use harness::{
pub async fn test_leader_election_in_three_node_example() {
let peers = load_peers(THREE_NODE_EXAMPLE).await.unwrap();
let _raft_tasks = tokio::spawn(build_raft_cluster(peers.clone()));

sleep(Duration::from_secs(1)).await;
sleep(Duration::from_secs(5)).await;

let mut rafts = RAFTS.lock().unwrap();
let raft_1 = rafts.get_mut(&1).unwrap();
4 changes: 3 additions & 1 deletion raftify/proto/raft_service.proto
@@ -59,7 +59,9 @@ enum ChangeConfigResultType {

message ChangeConfigResponse {
ChangeConfigResultType result_type = 1;
bytes data = 2;
uint64 assigned_id = 2; // Used in JoinSuccess
bytes peers = 3; // Used in JoinSuccess
bytes error = 4; // Used when handling errors
}

message DebugNodeResponse {
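In the new ChangeConfigResponse, peers stays an opaque bytes field: the Rust code in this commit fills it with serialize(&peers) on the server and reads it back with deserialize(..) on the client. Below is a small round-trip sketch of that idea, assuming bincode 1.x-style serialize/deserialize and a stand-in Peers struct with serde derive; the actual raftify Peers type and codec may differ.

use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// Stand-in for the Peers value carried in `bytes peers = 3;` (illustrative only).
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Peers {
    inner: HashMap<u64, String>, // node id -> address
}

fn main() {
    let peers = Peers {
        inner: HashMap::from([(4, "127.0.0.1:60064".to_string())]),
    };

    // Server side: pack the peers into the bytes field of ChangeConfigResponse.
    let raw: Vec<u8> = bincode::serialize(&peers).unwrap();

    // Client side: unpack it when result_type is ChangeConfigSuccess.
    let decoded: Peers = bincode::deserialize(raw.as_slice()).unwrap();
    assert_eq!(decoded, peers);
    println!("peers after join: {:?}", decoded.inner);
}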
63 changes: 42 additions & 21 deletions raftify/src/raft_node/mod.rs
@@ -239,7 +239,7 @@ impl<
match resp {
LocalResponseMsg::Propose { result } => match result {
ResponseResult::Success => (),
ResponseResult::Error(_) => (),
ResponseResult::Error(e) => return Err(e),
ResponseResult::WrongLeader {
leader_id,
leader_addr,
@@ -255,57 +255,79 @@
Ok(())
}

pub async fn get_cluster_size(&self) -> usize {
pub async fn change_config(&self, conf_change: ConfChangeV2) -> ConfChangeResponseResult {
let (tx, rx) = oneshot::channel();
self.local_sender
.send(LocalRequestMsg::GetClusterSize { chan: tx })
.send(LocalRequestMsg::ChangeConfig {
conf_change: conf_change.clone(),
chan: tx,
})
.await
.unwrap();
let resp = rx.await.unwrap();
match resp {
LocalResponseMsg::GetClusterSize { size } => size,
LocalResponseMsg::ChangeConfig { result } => match result {
ConfChangeResponseResult::WrongLeader {
leader_id,
leader_addr,
} => {
let mut client = create_client(leader_addr).await.unwrap();
let res = client.change_config(conf_change.clone()).await.unwrap();

let result = res.into_inner();

if result.result_type
== raft_service::ChangeConfigResultType::ChangeConfigSuccess as i32
{
ConfChangeResponseResult::JoinSuccess {
assigned_id: result.assigned_id,
peers: deserialize(result.peers.as_slice()).unwrap(),
}
} else {
ConfChangeResponseResult::Error(Error::Unknown)
}
}
_ => result,
},
_ => unreachable!(),
}
}

pub async fn quit(&self) {
pub async fn get_cluster_size(&self) -> usize {
let (tx, rx) = oneshot::channel();
self.local_sender
.send(LocalRequestMsg::Quit { chan: tx })
.send(LocalRequestMsg::GetClusterSize { chan: tx })
.await
.unwrap();
let resp = rx.await.unwrap();
match resp {
LocalResponseMsg::Quit {} => (),
LocalResponseMsg::GetClusterSize { size } => size,
_ => unreachable!(),
}
}

pub async fn leave(&self) {
pub async fn quit(&self) {
let (tx, rx) = oneshot::channel();
self.local_sender
.send(LocalRequestMsg::Leave { chan: tx })
.send(LocalRequestMsg::Quit { chan: tx })
.await
.unwrap();
let resp = rx.await.unwrap();
match resp {
LocalResponseMsg::ChangeConfig { result: _result } => (),
LocalResponseMsg::Quit {} => (),
_ => unreachable!(),
}
}

pub async fn change_config(&self, conf_change: ConfChangeV2) -> ConfChangeResponseResult {
pub async fn leave(&self) {
let (tx, rx) = oneshot::channel();
self.local_sender
.send(LocalRequestMsg::ChangeConfig {
conf_change,
chan: tx,
})
.send(LocalRequestMsg::Leave { chan: tx })
.await
.unwrap();
let resp = rx.await.unwrap();
match resp {
LocalResponseMsg::ChangeConfig { result } => result,
LocalResponseMsg::ChangeConfig { result: _result } => (),
_ => unreachable!(),
}
}
@@ -613,11 +635,6 @@ impl<
.into_inner();

match response.result_type() {
ChangeConfigResultType::ChangeConfigWrongLeader => {
// TODO: Handle this
// response.data();
continue;
}
ChangeConfigResultType::ChangeConfigSuccess => break Ok(()),
ChangeConfigResultType::ChangeConfigUnknownError => return Err(Error::JoinError),
ChangeConfigResultType::ChangeConfigRejected => {
@@ -626,6 +643,10 @@
ChangeConfigResultType::ChangeConfigTimeoutError => {
return Err(Error::Timeout);
}
ChangeConfigResultType::ChangeConfigWrongLeader => {
// Should be handled in RaftServiceClient
unreachable!()
}
}
}
}
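Each RaftNode method in this hunk (propose, change_config, get_cluster_size, quit, leave) follows the same shape: build a request variant carrying a oneshot sender, push it over local_sender, then await the oneshot receiver and match on the response variant. Below is a minimal, self-contained sketch of that command-plus-oneshot-reply pattern; the Request/Response enums and the single piece of state are simplified stand-ins, not raftify's LocalRequestMsg/LocalResponseMsg.

use tokio::sync::{mpsc, oneshot};

// Simplified stand-ins for raftify's LocalRequestMsg / LocalResponseMsg.
enum Request {
    GetClusterSize { chan: oneshot::Sender<Response> },
}

enum Response {
    GetClusterSize { size: usize },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Request>(8);

    // The node task owns the state and answers the commands it receives.
    tokio::spawn(async move {
        let cluster_size = 3usize;
        while let Some(req) = rx.recv().await {
            match req {
                Request::GetClusterSize { chan } => {
                    let _ = chan.send(Response::GetClusterSize { size: cluster_size });
                }
            }
        }
    });

    // The caller side mirrors RaftNode::get_cluster_size: send, then await the reply.
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Request::GetClusterSize { chan: reply_tx })
        .await
        .unwrap();

    match reply_rx.await.unwrap() {
        Response::GetClusterSize { size } => println!("cluster size: {}", size),
    }
}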
58 changes: 48 additions & 10 deletions raftify/src/raft_server.rs
@@ -28,9 +28,8 @@ use super::{
use crate::{
create_client,
raft::eraftpb::{ConfChangeV2, Message as RaftMessage},
raft_service::{ProposeArgs},
response_message::ResponseResult,
RaftServiceClient,
raft_service::ProposeArgs,
response_message::{ConfChangeResponseResult, ResponseResult},
};

#[derive(Clone)]
@@ -143,7 +142,7 @@ impl RaftService for RaftServer {
let (tx, rx) = oneshot::channel();

let message = ServerRequestMsg::ChangeConfig {
conf_change: request_args,
conf_change: request_args.clone(),
chan: tx,
};

@@ -161,16 +160,51 @@
)
.await
{
Ok(Ok(_raft_response)) => {
Ok(Ok(raft_response)) => {
match raft_response {
ServerResponseMsg::ConfigChange { result } => match result {
ConfChangeResponseResult::JoinSuccess { assigned_id, peers } => {
reply.result_type =
raft_service::ChangeConfigResultType::ChangeConfigSuccess as i32;
reply.assigned_id = assigned_id;
reply.peers = serialize(&peers).unwrap();
}
ConfChangeResponseResult::RemoveSuccess {} => {
reply.result_type =
raft_service::ChangeConfigResultType::ChangeConfigSuccess as i32;
}
ConfChangeResponseResult::Error(e) => {
reply.result_type =
raft_service::ChangeConfigResultType::ChangeConfigUnknownError
as i32;
reply.error = e.to_string().as_bytes().to_vec();
}
ConfChangeResponseResult::WrongLeader {
leader_id,
leader_addr,
} => {
reply.result_type =
raft_service::ChangeConfigResultType::ChangeConfigWrongLeader
as i32;

let mut client = create_client(leader_addr).await.unwrap();
reply = client.change_config(request_args).await?.into_inner();
}
},
_ => unreachable!(),
}
reply.result_type =
raft_service::ChangeConfigResultType::ChangeConfigSuccess as i32;
reply.data = vec![];
}
Ok(_) => (),
Err(_e) => {
Ok(Err(e)) => {
reply.result_type =
raft_service::ChangeConfigResultType::ChangeConfigUnknownError as i32;
reply.error = e.to_string().as_bytes().to_vec();
}
Err(e) => {
reply.result_type =
raft_service::ChangeConfigResultType::ChangeConfigTimeoutError as i32;
reply.data = vec![];
reply.error = e.to_string().as_bytes().to_vec();
self.logger.error("timeout waiting for reply");
}
}
@@ -228,7 +262,11 @@
} => {
// TODO: Handle this kind of error
let mut client = create_client(leader_addr).await.unwrap();
let _ = client.propose(ProposeArgs {msg: request_args.msg}).await?;
let _ = client
.propose(ProposeArgs {
msg: request_args.msg,
})
.await?;

Ok(Response::new(raft_service::Empty {}))
}
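The three outer match arms in change_config above line up with the three outcomes of awaiting the reply under a deadline, presumably tokio::time::timeout wrapped around the oneshot receiver (the nested Result layers and the "timeout waiting for reply" log suggest as much): the reply arrived, the sender was dropped, or the deadline elapsed. Below is a compact sketch of that shape; the durations and payload are illustrative, not the server's real values.

use std::time::Duration;

use tokio::{sync::oneshot, time::timeout};

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<&'static str>();

    // Simulate the Raft node task answering shortly before the deadline.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        let _ = tx.send("config change applied");
    });

    match timeout(Duration::from_secs(2), rx).await {
        // Ok(Ok(..)): a reply arrived in time; map it onto the gRPC reply fields.
        Ok(Ok(msg)) => println!("success: {}", msg),
        // Ok(Err(..)): the sender was dropped; report an unknown error.
        Ok(Err(recv_err)) => eprintln!("reply channel closed: {}", recv_err),
        // Err(..): the deadline elapsed; report ChangeConfigTimeoutError.
        Err(elapsed) => eprintln!("timeout waiting for reply: {}", elapsed),
    }
}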
