Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement log storage polymorphism #82

Merged
merged 19 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ default-members = [
"examples/memstore/dynamic-members",
"examples/memstore/static-members",
]
exclude = ["raft-rs"]
exclude = ["raft-rs", "raftify-cli"]

[workspace.package]
version = "0.1.69"
version = "0.1.78"
authors = ["Lablup Inc."]
edition = "2021"
description = "Experimental High level Raft framework"
Expand All @@ -35,4 +35,5 @@ example-harness = { path = "examples" }
memstore-example-harness = { path = "examples/memstore" }

[patch.crates-io]
jopemachine-raft = { path = "./raft-rs" }
jopemachine-raft = { path = "./raft-rs" }
raftify = { path = "./raftify" }
23 changes: 20 additions & 3 deletions binding/python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions binding/python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "raftify-py"
version = "0.1.67"
version = "0.1.78"
authors = ["Lablup Inc."]
license = "Apache-2.0"
repository = "https://github.com/lablup/raftify"
Expand All @@ -19,7 +19,8 @@ pyo3-asyncio = { version = "0.20.0", features = ["tokio-runtime"] }
pythonize = "0.20.0"
tokio = { version = "1.4", features = ["full"] }
async-trait = "0.1.48"
raftify = { version = "=0.1.69", default-features = false }
raftify = { version = "=0.1.78", features = ["heed_storage"] , default-features = false }
raftify_cli = { version = "=0.1.1" }
slog = { version = "2.2", features = ["max_level_trace", "release_max_level_trace"] }
slog-envlogger = "2.1.0"
slog-term = "2.9.0"
Expand Down
9 changes: 6 additions & 3 deletions binding/python/src/bindings/cli.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use pyo3::prelude::*;
use pyo3_asyncio::tokio::future_into_py;
use raftify::cli::cli_handler;
use raftify::HeedStorage;
use raftify_cli::cli_handler;

use super::state_machine::{PyFSM, PyLogEntry};
use super::abstract_types::{PyFSM, PyLogEntry};

// When args is None, std::env::args is automatically used.
#[pyfunction]
pub fn cli_main<'a>(args: Option<Vec<String>>, py: Python<'a>) -> PyResult<&'a PyAny> {
future_into_py(py, async move {
cli_handler::<PyLogEntry, PyFSM>(args).await.unwrap();
cli_handler::<PyLogEntry, HeedStorage, PyFSM>(args)
.await
.unwrap();
Ok(())
})
}
16 changes: 6 additions & 10 deletions binding/python/src/bindings/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ pub struct PyConfig {
pub lmdb_map_size: u64,
pub cluster_id: String,
pub conf_change_request_timeout: f32,
pub bootstrap_from_snapshot: bool,
pub initial_peers: Option<PyPeers>,
pub snapshot_interval: Option<f32>,
pub restore_wal_from: Option<u64>,
pub restore_wal_snapshot_from: Option<u64>,
}

#[pymethods]
Expand All @@ -36,8 +35,7 @@ impl PyConfig {
conf_change_request_timeout: Option<f32>,
initial_peers: Option<PyPeers>,
snapshot_interval: Option<f32>,
restore_wal_from: Option<u64>,
restore_wal_snapshot_from: Option<u64>,
bootstrap_from_snapshot: Option<bool>,
) -> Self {
let cfg = Config::default();

Expand All @@ -56,8 +54,8 @@ impl PyConfig {
conf_change_request_timeout.unwrap_or(cfg.conf_change_request_timeout);
let initial_peers = initial_peers;
let snapshot_interval = snapshot_interval;
let restore_wal_from = restore_wal_from;
let restore_wal_snapshot_from = restore_wal_snapshot_from;
let bootstrap_from_snapshot =
bootstrap_from_snapshot.unwrap_or(cfg.bootstrap_from_snapshot);

Self {
raft_config,
Expand All @@ -71,8 +69,7 @@ impl PyConfig {
conf_change_request_timeout,
initial_peers,
snapshot_interval,
restore_wal_from,
restore_wal_snapshot_from,
bootstrap_from_snapshot,
}
}
}
Expand All @@ -91,8 +88,7 @@ impl From<PyConfig> for Config {
conf_change_request_timeout: config.conf_change_request_timeout,
initial_peers: config.initial_peers.map(|peers| peers.inner),
raft_config: config.raft_config.inner,
restore_wal_from: config.restore_wal_from,
restore_wal_snapshot_from: config.restore_wal_snapshot_from,
bootstrap_from_snapshot: config.bootstrap_from_snapshot,
}
}
}
4 changes: 2 additions & 2 deletions binding/python/src/bindings/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
pub mod abstract_types;
pub mod cli;
pub mod cluster_join_ticket;
pub mod confchange_request;
pub mod config;
pub mod errors;
pub mod formatter;
pub mod initial_role;
pub mod logger;
pub mod peer;
pub mod peers;
pub mod raft_bootstrapper;
pub mod raft_client;
pub mod raft_node;
pub mod raft_rs;
pub mod role;
pub mod slogger;
pub mod state_machine;
pub mod utils;
2 changes: 1 addition & 1 deletion binding/python/src/bindings/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use pyo3::prelude::*;
use pyo3_asyncio::tokio::future_into_py;
use raftify::Peer;

use super::role::PyInitialRole;
use super::initial_role::PyInitialRole;

#[derive(Clone)]
#[pyclass(name = "Peer")]
Expand Down
2 changes: 1 addition & 1 deletion binding/python/src/bindings/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use raftify::Peers;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, hash::BuildHasherDefault};

use super::{peer::PyPeer, role::PyInitialRole};
use super::{initial_role::PyInitialRole, peer::PyPeer};

#[derive(Serialize, Deserialize, Clone)]
#[pyclass(dict, name = "Peers")]
Expand Down
21 changes: 15 additions & 6 deletions binding/python/src/bindings/raft_bootstrapper.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use pyo3::{exceptions::PyException, prelude::*, types::PyString};
use pyo3_asyncio::tokio::future_into_py;
use raftify::Raft;
use raftify::{HeedStorage, Raft};
use std::sync::Arc;

use super::{
abstract_types::{PyFSM, PyLogEntry},
cluster_join_ticket::PyClusterJoinTicket,
config::PyConfig,
logger::PyLogger,
raft_node::PyRaftNode,
state_machine::{PyFSM, PyLogEntry},
};

#[derive(Clone)]
#[pyclass(name = "Raft")]
pub struct PyRaftFacade {
inner: Raft<PyLogEntry, PyFSM>,
inner: Raft<PyLogEntry, HeedStorage, PyFSM>,
}

#[pymethods]
Expand All @@ -30,9 +30,17 @@ impl PyRaftFacade {
let fsm = PyFSM::new(fsm);
let addr = addr.to_string();

let storage = HeedStorage::create(
&config.log_dir.clone(),
&config.clone().into(),
Arc::new(PyLogger::new(logger.clone())),
)
.expect("Failed to create heed storage");

let raft = Raft::bootstrap(
node_id,
addr,
storage,
fsm,
config.into(),
Arc::new(PyLogger::new(logger)),
Expand All @@ -49,9 +57,10 @@ impl PyRaftFacade {
py: Python<'a>,
) -> PyResult<&'a PyAny> {
future_into_py(py, async move {
let ticket = Raft::<PyLogEntry, PyFSM>::request_id(raft_addr, peer_addr.to_owned())
.await
.unwrap();
let ticket =
Raft::<PyLogEntry, HeedStorage, PyFSM>::request_id(raft_addr, peer_addr.to_owned())
.await
.unwrap();
Ok(PyClusterJoinTicket { inner: ticket })
})
}
Expand Down
10 changes: 5 additions & 5 deletions binding/python/src/bindings/raft_node.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
use pyo3::{prelude::*, types::PyString};
use pyo3_asyncio::tokio::future_into_py;
use raftify::RaftNode;
use raftify::{HeedStorage, RaftNode};

use super::{
abstract_types::{PyFSM, PyLogEntry},
cluster_join_ticket::PyClusterJoinTicket,
initial_role::PyInitialRole,
peers::PyPeers,
raft_rs::eraftpb::{conf_change_v2::PyConfChangeV2, message::PyMessage},
role::PyInitialRole,
state_machine::{PyFSM, PyLogEntry},
};

#[derive(Clone)]
#[pyclass(name = "RaftNode")]
pub struct PyRaftNode {
pub inner: RaftNode<PyLogEntry, PyFSM>,
pub inner: RaftNode<PyLogEntry, HeedStorage, PyFSM>,
}

impl PyRaftNode {
pub fn new(inner: RaftNode<PyLogEntry, PyFSM>) -> Self {
pub fn new(inner: RaftNode<PyLogEntry, HeedStorage, PyFSM>) -> Self {
PyRaftNode { inner }
}
}
Expand Down
Loading
Loading