Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Sep 30, 2024
1 parent bad8c80 commit 077d01f
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 15 deletions.
5 changes: 3 additions & 2 deletions examples/memstore/dynamic-members/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let options = Options::from_args();
let store = HashStore::new();

let mut cfg = build_config();

let (raft, raft_handle) = match options.peer_addr {
Some(peer_addr) => {
log::info!("Running in Follower mode");
Expand All @@ -76,6 +74,8 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
.await
.unwrap();
let node_id = ticket.reserved_id;
let mut cfg = build_config(node_id);

cfg.initial_peers = Some(ticket.peers.clone().into());

let storage_pth = get_storage_path(cfg.log_dir.as_str(), node_id);
Expand Down Expand Up @@ -106,6 +106,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
None => {
log::info!("Bootstrap a Raft Cluster");

let cfg: raftify::Config = build_config(1);
let storage_pth = get_storage_path(cfg.log_dir.as_str(), 1);
ensure_directory_exist(storage_pth.as_str())?;

Expand Down
5 changes: 4 additions & 1 deletion examples/memstore/src/web_server_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ async fn snapshot(data: web::Data<(HashStore, Raft)>) -> impl Responder {
.hard_state()
.expect("Failed to get hard state");

raft.1.make_snapshot(last_index, hard_state.term).await;
raft.1
.make_snapshot(last_index, hard_state.term)
.await
.expect("Failed to make snapshot");
"OK".to_string()
}
}
Expand Down
7 changes: 4 additions & 3 deletions examples/memstore/static-members/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct Options {
raft_addr: String,
#[structopt(long)]
web_server: Option<String>,
// TODO: Make "bootstrap_from_snapshot" option here
}

#[actix_rt::main]
Expand Down Expand Up @@ -66,13 +67,13 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let store = HashStore::new();
let initial_peers = load_peers("cluster_config.toml").await?;

let mut cfg = build_config();
cfg.initial_peers = Some(initial_peers.clone());

let node_id = initial_peers
.get_node_id_by_addr(options.raft_addr.clone())
.unwrap();

let mut cfg = build_config(node_id);
cfg.initial_peers = Some(initial_peers.clone());

let storage_pth = get_storage_path(cfg.log_dir.as_str(), node_id);
ensure_directory_exist(storage_pth.as_str())?;

Expand Down
3 changes: 2 additions & 1 deletion examples/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use raftify::{Config, RaftConfig};

pub fn build_config() -> Config {
pub fn build_config(node_id: u64) -> Config {
let raft_config = RaftConfig {
id: node_id,
election_tick: 10,
heartbeat_tick: 3,
..Default::default()
Expand Down
3 changes: 3 additions & 0 deletions raftify/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub enum Error {
#[error("Shut down by Ctrl+C signal")]
CtrlC,

#[error("RaftNode LocalRequestMsg Receive error: {0}")]
RecvError(#[from] tokio::sync::oneshot::error::RecvError),

#[error("Encoding error")]
EncodingError(String),
#[error("Decoding error")]
Expand Down
31 changes: 23 additions & 8 deletions raftify/src/raft_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ impl<
}
}

pub async fn make_snapshot(&self, index: u64, term: u64) {
pub async fn make_snapshot(&self, index: u64, term: u64) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.tx_local
.send(LocalRequestMsg::MakeSnapshot {
Expand All @@ -424,9 +424,9 @@ impl<
})
.await
.unwrap();
let resp = rx.await.unwrap();
let resp = rx.await?;
match resp {
LocalResponseMsg::MakeSnapshot {} => (),
LocalResponseMsg::MakeSnapshot {} => Ok(()),
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -516,15 +516,17 @@ impl<
should_be_leader: bool,
mut log_storage: LogStorage,
fsm: FSM,
mut config: Config,
config: Config,
raft_addr: SocketAddr,
logger: Arc<dyn Logger>,
tx_server: mpsc::Sender<ServerRequestMsg<LogEntry, LogStorage, FSM>>,
rx_server: mpsc::Receiver<ServerRequestMsg<LogEntry, LogStorage, FSM>>,
tx_local: mpsc::Sender<LocalRequestMsg<LogEntry, LogStorage, FSM>>,
rx_local: mpsc::Receiver<LocalRequestMsg<LogEntry, LogStorage, FSM>>,
) -> Result<Self> {
config.raft_config.id = node_id;
assert_eq!(config.raft_config.id, node_id);
println!("node_id: {:?}", node_id);
println!("config: {:?}", config.raft_config.id);
config.validate()?;

let mut snapshot = log_storage.snapshot(0, log_storage.last_index()?)?;
Expand Down Expand Up @@ -559,8 +561,14 @@ impl<
conf_state.set_learners(learners);
}

if last_idx == 0 || config.bootstrap_from_snapshot {
if last_idx == 0 {
logger.info("Bootstrapping cluster init...");
log_storage.apply_snapshot(snapshot)?;
} else if config.bootstrap_from_snapshot {
logger.info("Bootstrapping from snapshot...");
log_storage.apply_snapshot(snapshot)?;
} else {
logger.info("Bootstrapping from existing logs...");
}

let mut raw_node = RawNode::new(&config.raft_config, log_storage.clone(), logger.clone())?;
Expand Down Expand Up @@ -1103,8 +1111,8 @@ impl<
})
.unwrap();

#[cfg(feature = "inmemory_storage")]
todo!("Implement this for inmemory storage");
// #[cfg(feature = "inmemory_storage")]
// todo!("Implement LocalRequestMsg::GetStorage request handler for inmemory storage");
}
LocalRequestMsg::DebugNode { tx_msg } => {
tx_msg
Expand Down Expand Up @@ -1174,7 +1182,14 @@ impl<
term,
tx_msg,
} => {
// println!("Make snapshot!! 2 2");
// let r = self.make_snapshot(index, term).await;
// println!("Make snapshot!! 2 3 r: {:?}", r);
// if let Err(e) = r {
// return Err(e);
// }
self.make_snapshot(index, term).await?;
println!("Make snapshot!! 2 3");
tx_msg.send(LocalResponseMsg::MakeSnapshot {}).unwrap();
}
LocalRequestMsg::JoinCluster { tickets, tx_msg } => {
Expand Down

0 comments on commit 077d01f

Please sign in to comment.