Skip to content

Commit

Permalink
feat: support takeover for fscache
Browse files Browse the repository at this point in the history
refine the UpgradeManager, make it can also support store status for
fscache daemon. And make the takeover feature applies to both fuse and
fscache mode.

Signed-off-by: Xin Yin <[email protected]>
  • Loading branch information
Xin Yin committed Dec 6, 2023
1 parent fc86651 commit db8db96
Show file tree
Hide file tree
Showing 17 changed files with 517 additions and 212 deletions.
31 changes: 28 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ path = "src/lib.rs"
anyhow = "1"
clap = { version = "4.0.18", features = ["derive", "cargo"] }
flexi_logger = { version = "0.25", features = ["compress"] }
fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" }
fuse-backend-rs = "^0.11.0"
hex = "0.4.3"
hyper = "0.14.11"
hyperlocal = "0.8.0"
Expand Down
2 changes: 1 addition & 1 deletion api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,7 @@ pub const BLOB_CACHE_TYPE_META_BLOB: &str = "bootstrap";
pub const BLOB_CACHE_TYPE_DATA_BLOB: &str = "datablob";

/// Configuration information for a cached blob.
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct BlobCacheEntry {
/// Type of blob object, bootstrap or data blob.
#[serde(rename = "type")]
Expand Down
2 changes: 1 addition & 1 deletion clib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ crate-type = ["cdylib", "staticlib"]
[dependencies]
libc = "0.2.137"
log = "0.4.17"
fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" }
fuse-backend-rs = "^0.11.0"
nydus-api = { version = "0.3", path = "../api" }
nydus-rafs = { version = "0.3.1", path = "../rafs" }
nydus-storage = { version = "0.6.3", path = "../storage" }
Expand Down
2 changes: 1 addition & 1 deletion rafs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ nix = "0.24"
serde = { version = "1.0.110", features = ["serde_derive", "rc"] }
serde_json = "1.0.53"
vm-memory = "0.10"
fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" }
fuse-backend-rs = "^0.11.0"
thiserror = "1"

nydus-api = { version = "0.3", path = "../api" }
Expand Down
4 changes: 1 addition & 3 deletions service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ resolver = "2"
[dependencies]
bytes = { version = "1", optional = true }
dbs-allocator = { version = "0.1.1", optional = true }
fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist", features = [
"persist",
] }
fuse-backend-rs = { version = "^0.11.0", features = ["persist"] }
libc = "0.2"
log = "0.4.8"
mio = { version = "0.8", features = ["os-poll", "os-ext"] }
Expand Down
7 changes: 6 additions & 1 deletion service/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::ops::Deref;
use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::{Builder, JoinHandle};

use mio::{Events, Poll, Token, Waker};
Expand All @@ -23,6 +23,7 @@ use rust_fsm::*;
use serde::{self, Serialize};

use crate::fs_service::{FsBackendCollection, FsService};
use crate::upgrade::UpgradeManager;
use crate::{BlobCacheMgr, Error, Result};

/// Nydus daemon working states.
Expand Down Expand Up @@ -170,6 +171,10 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync {
self.on_event(DaemonStateMachineInput::Start)
}

fn upgrade_mgr(&self) -> Option<MutexGuard<UpgradeManager>> {
None
}

// For backward compatibility.
/// Set default filesystem service object.
fn get_default_fs_service(&self) -> Option<Arc<dyn FsService>> {
Expand Down
46 changes: 30 additions & 16 deletions service/src/fs_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,22 +266,26 @@ impl FsCacheHandler {
tag: Option<&str>,
blob_cache_mgr: Arc<BlobCacheMgr>,
threads: usize,
restore_file: Option<&File>,
) -> Result<Self> {
info!(
"fscache: create FsCacheHandler with dir {}, tag {}",
dir,
tag.unwrap_or("<None>")
);

let mut file = OpenOptions::new()
.write(true)
.read(true)
.create(false)
.open(path)
.map_err(|e| {
error!("Failed to open cachefiles device {}. {}", path, e);
e
})?;
let mut file = match restore_file {
None => OpenOptions::new()
.write(true)
.read(true)
.create(false)
.open(path)
.map_err(|e| {
error!("Failed to open cachefiles device {}. {}", path, e);
e
})?,
Some(f) => f.try_clone()?,
};

let poller =
Poll::new().map_err(|_e| eother!("fscache: failed to create poller for service"))?;
Expand All @@ -296,15 +300,21 @@ impl FsCacheHandler {
)
.map_err(|_e| eother!("fscache: failed to register fd for service"))?;

// Initialize the fscache session
file.write_all(format!("dir {}", dir).as_bytes())?;
file.flush()?;
if let Some(tag) = tag {
file.write_all(format!("tag {}", tag).as_bytes())?;
if restore_file.is_none() {
// Initialize the fscache session
file.write_all(format!("dir {}", dir).as_bytes())?;
file.flush()?;
if let Some(tag) = tag {
file.write_all(format!("tag {}", tag).as_bytes())?;
file.flush()?;
}
file.write_all(b"bind ondemand")?;
file.flush()?;
} else {
// send restore cmd, if we are in restore process
file.write_all(b"restore")?;
file.flush()?;
}
file.write_all(b"bind ondemand")?;
file.flush()?;

let state = FsCacheState {
id_to_object_map: Default::default(),
Expand Down Expand Up @@ -379,6 +389,10 @@ impl FsCacheHandler {
}
}

pub fn get_file(&self) -> &File {
&self.file
}

/// Read and process all requests from fscache driver until no data available.
fn handle_requests(&self, buf: &mut [u8]) -> Result<()> {
loop {
Expand Down
27 changes: 5 additions & 22 deletions service/src/fs_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,8 @@ pub trait FsService: Send + Sync {
);
}
if let Some(mut mgr_guard) = self.upgrade_mgr() {
if let Err(e) = mgr_guard.add_mounts_state(cmd, index) {
warn!(
"failed to add filesystem instance to upgrade manager, {}",
e
);
mgr_guard.disable_upgrade();
warn!("disable online upgrade due to inconsistent status!!!");
}
mgr_guard.add_mounts_state(cmd, index);
mgr_guard.save_vfs_stat(self.get_vfs())?;
}

Ok(())
Expand Down Expand Up @@ -161,14 +155,7 @@ pub trait FsService: Send + Sync {
}
// Update mounts opaque from UpgradeManager
if let Some(mut mgr_guard) = self.upgrade_mgr() {
if let Err(e) = mgr_guard.update_mounts_state(cmd) {
warn!(
"failed to update filesystem instance to upgrade manager, {}",
e
);
mgr_guard.disable_upgrade();
warn!("disable online upgrade due to inconsistent status!!!");
}
mgr_guard.update_mounts_state(cmd)?;
}

Ok(())
Expand All @@ -195,12 +182,8 @@ pub trait FsService: Send + Sync {
self.backend_collection().del(&cmd.mountpoint);
if let Some(mut mgr_guard) = self.upgrade_mgr() {
// Remove mount opaque from UpgradeManager
if let Err(e) = mgr_guard.remove_mounts_state(cmd) {
warn!(
"failed to remove filesystem instance from upgrade manager, {}",
e
);
}
mgr_guard.remove_mounts_state(cmd);
mgr_guard.save_vfs_stat(self.get_vfs())?;
}

debug!("try to gc unused blobs");
Expand Down
11 changes: 11 additions & 0 deletions service/src/fusedev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ pub fn create_fuse_daemon(
error!("service session mount error: {}", &e);
eother!(e)
})?;

daemon
.on_event(DaemonStateMachineInput::Mount)
.map_err(|e| eother!(e))?;
Expand All @@ -615,6 +616,16 @@ pub fn create_fuse_daemon(
.service
.conn
.store(calc_fuse_conn(mnt)?, Ordering::Relaxed);

if let Some(f) = daemon.service.session.lock().unwrap().get_fuse_file() {
if let Some(mut m) = daemon.service.upgrade_mgr() {
m.hold_file(f).map_err(|e| {
error!("Failed to hold fusedev fd, {:?}", e);
eother!(e)
})?;
m.save_fuse_cid(daemon.service.conn.load(Ordering::Acquire));
}
}
}

Ok(daemon)
Expand Down
Loading

0 comments on commit db8db96

Please sign in to comment.