Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Jan 18, 2024
1 parent 9f07e9b commit d54a723
Show file tree
Hide file tree
Showing 18 changed files with 351 additions and 99 deletions.
33 changes: 20 additions & 13 deletions examples/memstore/dynamic-members/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;
use example_harness::config::build_config;
use memstore_example_harness::{
state_machine::{HashStore, LogEntry},
web_server_api::{debug, get, leader_id, leave, put},
web_server_api::{debug, get, leader_id, leave, peers, put},
};
use raftify::{
raft::{formatter::set_custom_formatter, logger::Slogger},
Expand All @@ -32,6 +32,8 @@ struct Options {
peer_addr: Option<String>,
#[structopt(long)]
web_server: Option<String>,
#[structopt(long)]
restore_wal_from: Option<u64>,
}

#[actix_rt::main]
Expand Down Expand Up @@ -62,40 +64,44 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let options = Options::from_args();
let store = HashStore::new();

let cfg = build_config();
let mut cfg = build_config();
cfg.restore_wal_from = options.restore_wal_from;

let (raft, raft_handle) = match options.peer_addr {
Some(peer_addr) => {
log::info!("Running in Follower mode");

let ticket = Raft::request_id(options.raft_addr.clone(), peer_addr.clone(), logger.clone())
.await
.unwrap();
let ticket =
Raft::request_id(options.raft_addr.clone(), peer_addr.clone(), logger.clone())
.await
.unwrap();
let node_id = ticket.reserved_id;

let raft = Raft::build(
let raft = Raft::new_follower(
node_id,
options.raft_addr,
store.clone(),
cfg,
logger.clone(),
cfg.clone(),
None,
logger.clone(),
)?;

let handle = tokio::spawn(raft.clone().run());
raft.join(ticket).await;
raft.raft_node.add_peers(ticket.peers.clone()).await;

if cfg.restore_wal_from.is_none() {
raft.join(ticket).await;
}
(raft, handle)
}
None => {
log::info!("Bootstrap a Raft Cluster");
let node_id = 1;
let raft = Raft::build(
node_id,
let raft = Raft::bootstrap_cluster(
options.raft_addr,
store.clone(),
cfg,
logger.clone(),
None,
logger.clone(),
)?;
let handle = tokio::spawn(raft.clone().run());
(raft, handle)
Expand All @@ -111,6 +117,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
.service(get)
.service(leave)
.service(debug)
.service(peers)
.service(leader_id)
})
.bind(addr)
Expand Down
7 changes: 7 additions & 0 deletions examples/memstore/src/web_server_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,10 @@ async fn debug(data: web::Data<(HashStore, Raft)>) -> impl Responder {
let parsed: HashMap<String, Value> = serde_json::from_str(&json).unwrap();
format!("{:?}", parsed)
}

#[get("/peers")]
async fn peers(data: web::Data<(HashStore, Raft)>) -> impl Responder {
let raft = data.clone();
let peers = raft.1.raft_node.clone().get_peers().await;
format!("{:?}", peers)
}
19 changes: 9 additions & 10 deletions examples/memstore/static-members/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use structopt::StructOpt;
use example_harness::config::build_config;
use memstore_example_harness::{
state_machine::{HashStore, LogEntry},
web_server_api::{debug, get, leader_id, leave, put},
web_server_api::{debug, get, leader_id, leave, peers, put},
};
use memstore_static_members::utils::load_peers;

Expand Down Expand Up @@ -62,44 +62,42 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {

let options = Options::from_args();
let store = HashStore::new();
let peers = load_peers().await?;
let initial_peers = load_peers().await?;

let cfg = build_config();

let (raft, raft_handle) = match options.peer_addr {
Some(_peer_addr) => {
log::info!("Running in Follower mode");

let node_id = peers
let node_id = initial_peers
.get_node_id_by_addr(options.raft_addr.clone())
.unwrap();

let raft = Raft::build(
let raft = Raft::new_follower(
node_id,
options.raft_addr,
store.clone(),
cfg,
Some(initial_peers.clone()),
logger.clone(),
Some(peers.clone()),
)?;

let handle = tokio::spawn(raft.clone().run());

let leader_addr = peers.get(&1).unwrap().addr;
let leader_addr = initial_peers.get(&1).unwrap().addr;
Raft::member_bootstrap_ready(leader_addr, node_id, logger).await?;

(raft, handle)
}
None => {
log::info!("Bootstrap a Raft Cluster");
let node_id = 1;
let raft = Raft::build(
node_id,
let raft = Raft::bootstrap_cluster(
options.raft_addr,
store.clone(),
cfg,
Some(initial_peers),
logger.clone(),
Some(peers),
)?;
let handle = tokio::spawn(raft.clone().run());
(raft, handle)
Expand All @@ -115,6 +113,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
.service(get)
.service(leave)
.service(debug)
.service(peers)
.service(leader_id)
})
.bind(addr)
Expand Down
35 changes: 23 additions & 12 deletions harness/src/raft_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,27 @@ fn run_raft(node_id: &u64, peers: Peers) -> Result<JoinHandle<Result<()>>> {
let store = HashStore::new();
let logger = build_logger();

let raft = Raft::build(
*node_id,
peer.addr,
store,
cfg,
Arc::new(Slogger {
slog: logger.clone(),
}),
Some(peers.clone()),
)
let raft = match node_id {
1 => Raft::bootstrap_cluster(
peer.addr,
store,
cfg,
Some(peers.clone()),
Arc::new(Slogger {
slog: logger.clone(),
}),
),
_ => Raft::new_follower(
*node_id,
peer.addr,
store,
cfg,
Some(peers.clone()),
Arc::new(Slogger {
slog: logger.clone(),
}),
),
}
.expect("Raft build failed!");

RAFTS.lock().unwrap().insert(*node_id, raft.clone());
Expand Down Expand Up @@ -99,8 +110,8 @@ pub async fn spawn_extra_node(peer_addr: &str, raft_addr: &str) -> Result<JoinHa
let cfg = build_config();
let store = HashStore::new();

let raft =
Raft::build(node_id, raft_addr, store, cfg, logger, None).expect("Raft build failed!");
let raft = Raft::new_follower(node_id, raft_addr, store, cfg, None, logger)
.expect("Raft build failed!");

RAFTS.lock().unwrap().insert(node_id, raft.clone());

Expand Down
7 changes: 3 additions & 4 deletions raftify/src/cli/commands/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ pub fn debug_persisted(path: &str, logger: slog::Logger) -> Result<()> {
Arc::new(Slogger {
slog: logger.clone(),
}),
)
.unwrap();
)?;

let entries = storage.all_entries()?;

Expand All @@ -42,8 +41,8 @@ pub fn debug_persisted(path: &str, logger: slog::Logger) -> Result<()> {
}

pub async fn debug_node(addr: &str) -> Result<()> {
let mut client = create_client(&addr).await.unwrap();
let response = client.debug_node(raft_service::Empty {}).await.unwrap();
let mut client = create_client(&addr).await?;
let response = client.debug_node(raft_service::Empty {}).await?;
let json = response.into_inner().result_json;
let parsed: HashMap<String, Value> = serde_json::from_str(&json).unwrap();

Expand Down
122 changes: 122 additions & 0 deletions raftify/src/cli/commands/dump.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use bincode::{deserialize, serialize};
use jopemachine_raft::{
eraftpb::{ConfChange, ConfChangeSingle, ConfChangeType, ConfChangeV2, Entry, EntryType},
logger::Slogger,
Storage,
};
use prost::Message as _;
use std::{collections::HashMap, net::SocketAddr, sync::Arc};

use crate::{utils::to_confchange_v2, Config, HeedStorage, LogStore, Result};

pub async fn dump_peers(
path: &str,
peers: HashMap<u64, SocketAddr>,
logger: slog::Logger,
) -> Result<()> {
let config = Config {
log_dir: path.to_string(),
..Default::default()
};

let mut storage = HeedStorage::create(
config.log_dir.as_str(),
&config,
Arc::new(Slogger {
slog: logger.clone(),
}),
)?;

let entries = storage.all_entries()?;

let mut persisted_peers_configs: HashMap<u64, SocketAddr> = HashMap::new();

for (_, entry) in entries.iter().enumerate() {
let conf_change_v2 = match entry.get_entry_type() {
EntryType::EntryConfChange => to_confchange_v2(ConfChange::decode(entry.get_data())?),
EntryType::EntryConfChangeV2 => ConfChangeV2::decode(entry.get_data())?,
_ => continue,
};

let conf_changes = conf_change_v2.get_changes();
let addrs: Vec<SocketAddr> = deserialize(conf_change_v2.get_context())?;

for (cc_idx, conf_change) in conf_changes.iter().enumerate() {
let addr = addrs[cc_idx];

let node_id = conf_change.get_node_id();
let change_type = conf_change.get_change_type();

match change_type {
ConfChangeType::AddNode | ConfChangeType::AddLearnerNode => {
persisted_peers_configs.insert(node_id, addr);
}
ConfChangeType::RemoveNode => {
persisted_peers_configs.remove(&node_id);
}
}
}
}

// old - new = to be removed
let diffs_to_remove: HashMap<_, _> = persisted_peers_configs
.iter()
.filter(|(k, v)| peers.get(k) != Some(v))
.collect();

// new - old = to be added
let diffs_to_add: HashMap<_, _> = peers
.iter()
.filter(|(k, v)| persisted_peers_configs.get(k) != Some(v))
.collect();

let mut new_cc_v2 = ConfChangeV2::default();
let mut conf_changes: Vec<ConfChangeSingle> = vec![];
let mut cc_addrs: Vec<SocketAddr> = vec![];

for (k, v) in diffs_to_remove.into_iter() {
let mut cs = ConfChangeSingle::default();
cs.set_node_id(*k as u64);
cs.set_change_type(ConfChangeType::RemoveNode);
conf_changes.push(cs);
cc_addrs.push(*v);
}

// TODO: Support AddLearnerNode
for (k, v) in diffs_to_add.into_iter() {
let mut cs = ConfChangeSingle::default();
cs.set_node_id(*k as u64);
cs.set_change_type(ConfChangeType::AddNode);
conf_changes.push(cs);
cc_addrs.push(*v);
}

if conf_changes.len() >= 1 {
new_cc_v2.set_context(serialize(&cc_addrs)?);
new_cc_v2.set_changes(conf_changes);

let last_idx = LogStore::last_index(&storage)?;
let last_term = storage.term(last_idx)?;

let mut new_entry = Entry::default();
new_entry.set_index(last_idx + 1);
new_entry.set_term(last_term);
new_entry.set_entry_type(EntryType::EntryConfChangeV2);
new_entry.set_data(new_cc_v2.encode_to_vec());
new_entry.set_context(vec![]);

storage.append(vec![new_entry].as_slice())?;

let mut snapshot = LogStore::snapshot(&storage, 0, last_idx)?;
let mut meta = snapshot.get_metadata().clone();
meta.set_index(last_idx + 1);
snapshot.set_metadata(meta);
storage.apply_snapshot(snapshot)?;

println!("Changes applied successfully");
} else {
println!("No changes to be made");
}

Ok(())
}
2 changes: 2 additions & 0 deletions raftify/src/cli/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pub mod debug;
pub mod dump;
pub mod utils;
22 changes: 22 additions & 0 deletions raftify/src/cli/commands/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use serde_json::Value;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;

pub fn parse_peers_json(
peers: &str,
) -> Result<HashMap<u64, SocketAddr>, Box<dyn std::error::Error>> {
let peers: Value = serde_json::from_str(peers)?;

let mut result = HashMap::new();

if let Value::Object(peer) = peers {
for (node_id, addr) in peer {
let key = node_id.parse::<u64>()?;
let value = SocketAddr::from_str(addr.as_str().unwrap())?;
result.insert(key, value);
}
}

Ok(result)
}
Loading

0 comments on commit d54a723

Please sign in to comment.