diff --git a/Cargo.lock b/Cargo.lock
index 186b6583d7b..83915ecb19e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -393,6 +393,19 @@ dependencies = [
  "thiserror",
 ]
 
+[[package]]
+name = "dbs-snapshot"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "baf0c82c41414e93c765f9fb84d5531a836f6a0855c5e1c7fcf6cd2c20c85f6e"
+dependencies = [
+ "displaydoc",
+ "libc",
+ "thiserror",
+ "versionize",
+ "versionize_derive",
+]
+
 [[package]]
 name = "dbs-uhttp"
 version = "0.3.2"
@@ -414,6 +427,17 @@ dependencies = [
  "subtle",
 ]
 
+[[package]]
+name = "displaydoc"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.37",
+]
+
 [[package]]
 name = "encoding_rs"
 version = "0.8.31"
@@ -538,19 +562,20 @@ dependencies = [
 
 [[package]]
 name = "fuse-backend-rs"
-version = "0.10.5"
-source = "git+https://github.com/loheagn/fuse-backend-rs.git?branch=vfs-persist#be366463fa66e2e675e32e79b3a5500e58f153d1"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2e5a63a89f40ec26a0a1434e89de3f4ee939a920eae15d641053ee09ee6ed44b"
 dependencies = [
  "arc-swap",
  "bitflags 1.3.2",
  "caps",
  "core-foundation-sys",
+ "dbs-snapshot",
  "lazy_static",
  "libc",
  "log",
  "mio",
  "nix",
- "snapshot",
  "versionize",
  "versionize_derive",
  "vhost",
diff --git a/Cargo.toml b/Cargo.toml
index ce257ac9474..69de7eae25d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,7 +35,7 @@ path = "src/lib.rs"
 anyhow = "1"
 clap = { version = "4.0.18", features = ["derive", "cargo"] }
 flexi_logger = { version = "0.25", features = ["compress"] }
-fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" }
+fuse-backend-rs = "^0.11.0"
 hex = "0.4.3"
 hyper = "0.14.11"
 hyperlocal = "0.8.0"
#[serde(rename = "type")] diff --git a/clib/Cargo.toml b/clib/Cargo.toml index fe5892f8dd2..cbc3bb73718 100644 --- a/clib/Cargo.toml +++ b/clib/Cargo.toml @@ -15,7 +15,7 @@ crate-type = ["cdylib", "staticlib"] [dependencies] libc = "0.2.137" log = "0.4.17" -fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" } +fuse-backend-rs = "^0.11.0" nydus-api = { version = "0.3", path = "../api" } nydus-rafs = { version = "0.3.1", path = "../rafs" } nydus-storage = { version = "0.6.3", path = "../storage" } diff --git a/rafs/Cargo.toml b/rafs/Cargo.toml index acfcf5fd2b7..2e3d9633a66 100644 --- a/rafs/Cargo.toml +++ b/rafs/Cargo.toml @@ -19,7 +19,7 @@ nix = "0.24" serde = { version = "1.0.110", features = ["serde_derive", "rc"] } serde_json = "1.0.53" vm-memory = "0.10" -fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" } +fuse-backend-rs = "^0.11.0" thiserror = "1" nydus-api = { version = "0.3", path = "../api" } diff --git a/service/Cargo.toml b/service/Cargo.toml index 5714960b029..3695655923f 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -12,9 +12,7 @@ resolver = "2" [dependencies] bytes = { version = "1", optional = true } dbs-allocator = { version = "0.1.1", optional = true } -fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist", features = [ - "persist", -] } +fuse-backend-rs = { version = "^0.11.0", features = ["persist"] } libc = "0.2" log = "0.4.8" mio = { version = "0.8", features = ["os-poll", "os-ext"] } diff --git a/service/src/daemon.rs b/service/src/daemon.rs index 911cb416a9f..3170f41cab9 100644 --- a/service/src/daemon.rs +++ b/service/src/daemon.rs @@ -14,7 +14,7 @@ use std::ops::Deref; use std::process; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, Sender}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use std::thread::{Builder, JoinHandle}; use mio::{Events, Poll, Token, Waker}; @@ -23,6 +23,7 @@ use rust_fsm::*; use serde::{self, Serialize}; use crate::fs_service::{FsBackendCollection, FsService}; +use crate::upgrade::UpgradeManager; use crate::{BlobCacheMgr, Error, Result}; /// Nydus daemon working states. @@ -170,6 +171,10 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync { self.on_event(DaemonStateMachineInput::Start) } + fn upgrade_mgr(&self) -> Option> { + None + } + // For backward compatibility. /// Set default filesystem service object. fn get_default_fs_service(&self) -> Option> { diff --git a/service/src/fs_cache.rs b/service/src/fs_cache.rs index f750c1a649a..8f011733372 100644 --- a/service/src/fs_cache.rs +++ b/service/src/fs_cache.rs @@ -266,6 +266,7 @@ impl FsCacheHandler { tag: Option<&str>, blob_cache_mgr: Arc, threads: usize, + restore_file: Option<&File>, ) -> Result { info!( "fscache: create FsCacheHandler with dir {}, tag {}", @@ -273,15 +274,18 @@ impl FsCacheHandler { tag.unwrap_or("") ); - let mut file = OpenOptions::new() - .write(true) - .read(true) - .create(false) - .open(path) - .map_err(|e| { - error!("Failed to open cachefiles device {}. {}", path, e); - e - })?; + let mut file = match restore_file { + None => OpenOptions::new() + .write(true) + .read(true) + .create(false) + .open(path) + .map_err(|e| { + error!("Failed to open cachefiles device {}. 
{}", path, e); + e + })?, + Some(f) => f.try_clone()?, + }; let poller = Poll::new().map_err(|_e| eother!("fscache: failed to create poller for service"))?; @@ -296,15 +300,21 @@ impl FsCacheHandler { ) .map_err(|_e| eother!("fscache: failed to register fd for service"))?; - // Initialize the fscache session - file.write_all(format!("dir {}", dir).as_bytes())?; - file.flush()?; - if let Some(tag) = tag { - file.write_all(format!("tag {}", tag).as_bytes())?; + if restore_file.is_none() { + // Initialize the fscache session + file.write_all(format!("dir {}", dir).as_bytes())?; + file.flush()?; + if let Some(tag) = tag { + file.write_all(format!("tag {}", tag).as_bytes())?; + file.flush()?; + } + file.write_all(b"bind ondemand")?; + file.flush()?; + } else { + // send restore cmd, if we are in restore process + file.write_all(b"restore")?; file.flush()?; } - file.write_all(b"bind ondemand")?; - file.flush()?; let state = FsCacheState { id_to_object_map: Default::default(), @@ -379,6 +389,10 @@ impl FsCacheHandler { } } + pub fn get_file(&self) -> &File { + &self.file + } + /// Read and process all requests from fscache driver until no data available. fn handle_requests(&self, buf: &mut [u8]) -> Result<()> { loop { diff --git a/service/src/fs_service.rs b/service/src/fs_service.rs index 94c45699cd9..a834f331206 100644 --- a/service/src/fs_service.rs +++ b/service/src/fs_service.rs @@ -120,14 +120,8 @@ pub trait FsService: Send + Sync { ); } if let Some(mut mgr_guard) = self.upgrade_mgr() { - if let Err(e) = mgr_guard.add_mounts_state(cmd, index) { - warn!( - "failed to add filesystem instance to upgrade manager, {}", - e - ); - mgr_guard.disable_upgrade(); - warn!("disable online upgrade due to inconsistent status!!!"); - } + mgr_guard.add_mounts_state(cmd, index); + mgr_guard.save_vfs_stat(self.get_vfs())?; } Ok(()) @@ -161,14 +155,7 @@ pub trait FsService: Send + Sync { } // Update mounts opaque from UpgradeManager if let Some(mut mgr_guard) = self.upgrade_mgr() { - if let Err(e) = mgr_guard.update_mounts_state(cmd) { - warn!( - "failed to update filesystem instance to upgrade manager, {}", - e - ); - mgr_guard.disable_upgrade(); - warn!("disable online upgrade due to inconsistent status!!!"); - } + mgr_guard.update_mounts_state(cmd)?; } Ok(()) @@ -195,12 +182,8 @@ pub trait FsService: Send + Sync { self.backend_collection().del(&cmd.mountpoint); if let Some(mut mgr_guard) = self.upgrade_mgr() { // Remove mount opaque from UpgradeManager - if let Err(e) = mgr_guard.remove_mounts_state(cmd) { - warn!( - "failed to remove filesystem instance from upgrade manager, {}", - e - ); - } + mgr_guard.remove_mounts_state(cmd); + mgr_guard.save_vfs_stat(self.get_vfs())?; } debug!("try to gc unused blobs"); diff --git a/service/src/fusedev.rs b/service/src/fusedev.rs index d36341cb0ef..c6b983bef90 100644 --- a/service/src/fusedev.rs +++ b/service/src/fusedev.rs @@ -605,6 +605,7 @@ pub fn create_fuse_daemon( error!("service session mount error: {}", &e); eother!(e) })?; + daemon .on_event(DaemonStateMachineInput::Mount) .map_err(|e| eother!(e))?; @@ -615,6 +616,16 @@ pub fn create_fuse_daemon( .service .conn .store(calc_fuse_conn(mnt)?, Ordering::Relaxed); + + if let Some(f) = daemon.service.session.lock().unwrap().get_fuse_file() { + if let Some(mut m) = daemon.service.upgrade_mgr() { + m.hold_file(f).map_err(|e| { + error!("Failed to hold fusedev fd, {:?}", e); + eother!(e) + })?; + m.save_fuse_cid(daemon.service.conn.load(Ordering::Acquire)); + } + } } Ok(daemon) diff --git a/service/src/singleton.rs 
diff --git a/service/src/fs_service.rs b/service/src/fs_service.rs
index 94c45699cd9..a834f331206 100644
--- a/service/src/fs_service.rs
+++ b/service/src/fs_service.rs
@@ -120,14 +120,8 @@ pub trait FsService: Send + Sync {
             );
         }
         if let Some(mut mgr_guard) = self.upgrade_mgr() {
-            if let Err(e) = mgr_guard.add_mounts_state(cmd, index) {
-                warn!(
-                    "failed to add filesystem instance to upgrade manager, {}",
-                    e
-                );
-                mgr_guard.disable_upgrade();
-                warn!("disable online upgrade due to inconsistent status!!!");
-            }
+            mgr_guard.add_mounts_state(cmd, index);
+            mgr_guard.save_vfs_stat(self.get_vfs())?;
         }
 
         Ok(())
@@ -161,14 +155,7 @@ pub trait FsService: Send + Sync {
         }
         // Update mounts opaque from UpgradeManager
         if let Some(mut mgr_guard) = self.upgrade_mgr() {
-            if let Err(e) = mgr_guard.update_mounts_state(cmd) {
-                warn!(
-                    "failed to update filesystem instance to upgrade manager, {}",
-                    e
-                );
-                mgr_guard.disable_upgrade();
-                warn!("disable online upgrade due to inconsistent status!!!");
-            }
+            mgr_guard.update_mounts_state(cmd)?;
         }
 
         Ok(())
@@ -195,12 +182,8 @@ pub trait FsService: Send + Sync {
         self.backend_collection().del(&cmd.mountpoint);
         if let Some(mut mgr_guard) = self.upgrade_mgr() {
             // Remove mount opaque from UpgradeManager
-            if let Err(e) = mgr_guard.remove_mounts_state(cmd) {
-                warn!(
-                    "failed to remove filesystem instance from upgrade manager, {}",
-                    e
-                );
-            }
+            mgr_guard.remove_mounts_state(cmd);
+            mgr_guard.save_vfs_stat(self.get_vfs())?;
         }
 
         debug!("try to gc unused blobs");
diff --git a/service/src/fusedev.rs b/service/src/fusedev.rs
index d36341cb0ef..c6b983bef90 100644
--- a/service/src/fusedev.rs
+++ b/service/src/fusedev.rs
@@ -605,6 +605,7 @@ pub fn create_fuse_daemon(
             error!("service session mount error: {}", &e);
             eother!(e)
         })?;
+
         daemon
             .on_event(DaemonStateMachineInput::Mount)
             .map_err(|e| eother!(e))?;
@@ -615,6 +616,16 @@ pub fn create_fuse_daemon(
             .service
             .conn
             .store(calc_fuse_conn(mnt)?, Ordering::Relaxed);
+
+        if let Some(f) = daemon.service.session.lock().unwrap().get_fuse_file() {
+            if let Some(mut m) = daemon.service.upgrade_mgr() {
+                m.hold_file(f).map_err(|e| {
+                    error!("Failed to hold fusedev fd, {:?}", e);
+                    eother!(e)
+                })?;
+                m.save_fuse_cid(daemon.service.conn.load(Ordering::Acquire));
+            }
+        }
     }
 
     Ok(daemon)
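The fusedev change stashes a duplicate of the FUSE device fd in the upgrade manager via `hold_file()`; after an upgrade the new process rebuilds a `File` from a raw fd received over the supervisor socket. A small sketch of the fd-ownership rules involved (`/dev/null` stands in for the FUSE fd):

```rust
use std::fs::File;
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd};

fn main() -> std::io::Result<()> {
    let session_fd = File::open("/dev/null")?; // stand-in for the FUSE device fd

    // hold_file(): try_clone() dup(2)s the descriptor, so the session and the
    // upgrade manager each own an independent fd for the same open file.
    let held = session_fd.try_clone()?;
    assert_ne!(session_fd.as_raw_fd(), held.as_raw_fd());

    // Across an upgrade only the raw fd number survives (sent via SCM_RIGHTS).
    // into_raw_fd() gives up ownership, mirroring the sending side.
    let raw = held.into_raw_fd();

    // The restarted daemon must re-assume ownership exactly once; from_raw_fd()
    // is unsafe precisely because a second owner would close the fd twice.
    let restored = unsafe { File::from_raw_fd(raw) };
    drop(restored);
    Ok(())
}
```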
Try to failover later."); + return Ok(true); + } + warn!("another daemon is running, will exit!"); + return Err(Error::Unsupported); + } + Ok(false) +} + /// Create and start a Nydus daemon to host fscache and fusedev services. #[allow(clippy::too_many_arguments, unused)] pub fn create_daemon( @@ -246,40 +288,67 @@ pub fn create_daemon( config: Option, bti: BuildTimeInfo, waker: Arc, + api_sock: Option>, + upgrade: bool, ) -> std::io::Result> { let (to_sm, from_client) = channel::(); let (to_client, from_sm) = channel::>(); + let upgrade_mgr = supervisor + .as_ref() + .map(|s| Mutex::new(UpgradeManager::new(s.to_string().into()))); + let service_controller = ServiceController { bti, id, request_sender: Arc::new(Mutex::new(to_sm)), result_receiver: Mutex::new(from_sm), - state: Default::default(), + state: AtomicI32::new(DaemonState::INIT as i32), supervisor, waker, blob_cache_mgr: Arc::new(BlobCacheMgr::new()), - + upgrade_mgr, fscache_enabled: AtomicBool::new(false), #[cfg(target_os = "linux")] fscache: Mutex::new(None), }; service_controller.initialize_blob_cache(&config)?; - #[cfg(target_os = "linux")] - if let Some(path) = fscache { - service_controller.initialize_fscache_service(tag, threads, path)?; - } let daemon = Arc::new(service_controller); let machine = DaemonStateMachineContext::new(daemon.clone(), from_client, to_client); machine.kick_state_machine()?; - daemon - .on_event(DaemonStateMachineInput::Mount) - .map_err(|e| eother!(e))?; - daemon - .on_event(DaemonStateMachineInput::Start) - .map_err(|e| eother!(e))?; + + // Without api socket, nydusd can't do neither live-upgrade nor failover, so the helper + // finding a victim is not necessary. + if (api_sock.as_ref().is_some() && !upgrade && !is_crashed(api_sock.as_ref().unwrap())?) + || api_sock.is_none() + { + #[cfg(target_os = "linux")] + if let Some(path) = fscache { + let threads = if let Some(threads_value) = threads { + crate::validate_threads_configuration(threads_value).map_err(|err| einval!(err))? 
+            } else {
+                1usize
+            };
+            daemon.initialize_fscache_service(tag, threads, path, None)?;
+            let f = daemon.get_fscache_file()?;
+            if let Some(mut mgr_guard) = daemon.upgrade_mgr() {
+                mgr_guard.hold_file(&f).map_err(|e| {
+                    error!("Failed to hold fscache fd, {:?}", e);
+                    eother!(e)
+                })?;
+                mgr_guard.save_fscache_states(threads, path.to_string());
+            }
+        }
+
+        daemon
+            .on_event(DaemonStateMachineInput::Mount)
+            .map_err(|e| eother!(e))?;
+        daemon
+            .on_event(DaemonStateMachineInput::Start)
+            .map_err(|e| eother!(e))?;
+    }
 
     Ok(daemon)
 }
@@ -316,6 +385,7 @@ mod tests {
             supervisor: Some(String::from("supervisor")),
             waker: Arc::new(waker),
             blob_cache_mgr: Arc::new(BlobCacheMgr::new()),
+            upgrade_mgr: None,
             fscache_enabled: AtomicBool::new(false),
             fscache: Mutex::new(None),
         }
@@ -326,19 +396,19 @@ mod tests {
         let service_controller = create_service_controller();
 
         assert!(service_controller
-            .initialize_fscache_service(None, None, "some path")
+            .initialize_fscache_service(None, 1, "some path", None)
            .is_err());
 
         let mut p = std::env::current_dir().unwrap();
         p.push("Cargo.toml");
         assert!(service_controller
-            .initialize_fscache_service(None, None, p.to_str().unwrap())
+            .initialize_fscache_service(None, 1, p.to_str().unwrap(), None)
             .is_err());
 
         let tmp_dir = TempDir::new().unwrap();
         let dir = tmp_dir.as_path().to_str().unwrap();
         assert!(service_controller
-            .initialize_fscache_service(None, Some("1"), dir)
+            .initialize_fscache_service(None, 1, dir, None)
             .is_ok());
 
         assert_eq!(service_controller.id(), Some(String::from("id")));
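The `is_crashed()` heuristic added above is easy to exercise in isolation: a UNIX socket path that still exists but refuses connections marks a daemon that died without cleanup. A standalone sketch of that residual-socket test (the socket path is hypothetical):

```rust
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;

// Same residual-socket test as in singleton.rs: the file exists, but nobody accepts.
fn is_sock_residual(sock: impl AsRef<Path>) -> bool {
    std::fs::metadata(&sock).is_ok() && UnixStream::connect(&sock).is_err()
}

fn main() -> std::io::Result<()> {
    let path = "/tmp/nydusd-demo-api.sock"; // hypothetical API socket path
    let _ = std::fs::remove_file(path);

    // While a listener is alive, the socket is not residual.
    let listener = UnixListener::bind(path)?;
    assert!(!is_sock_residual(path));

    // Drop the listener but leave the socket file behind: now it is residual,
    // which is how a crashed predecessor looks to a freshly started nydusd.
    drop(listener);
    assert!(is_sock_residual(path));

    std::fs::remove_file(path)?;
    Ok(())
}
```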
diff --git a/service/src/upgrade.rs b/service/src/upgrade.rs
index afae3506450..e1f9698ecbc 100644
--- a/service/src/upgrade.rs
+++ b/service/src/upgrade.rs
@@ -7,16 +7,20 @@
 use std::any::TypeId;
 use std::collections::HashMap;
 use std::convert::{TryFrom, TryInto};
+use std::fs::File;
 use std::io;
-use std::os::fd::RawFd;
+use std::os::fd::{AsRawFd, FromRawFd, RawFd};
 use std::path::PathBuf;
-use std::sync::atomic::{AtomicBool, Ordering};
 
+use nydus_api::BlobCacheEntry;
 use nydus_upgrade::backend::unix_domain_socket::UdsStorageBackend;
 use nydus_upgrade::backend::{StorageBackend, StorageBackendErr};
 
 use crate::fs_service::{FsBackendMountCmd, FsBackendUmountCmd};
 use crate::{Error, Result};
+use fuse_backend_rs::api::Vfs;
+use versionize::{VersionMap, Versionize, VersionizeResult};
+use versionize_derive::Versionize;
 
 /// Error codes related to upgrade manager.
 #[derive(thiserror::Error, Debug)]
@@ -26,11 +30,14 @@ pub enum UpgradeMgrError {
 
     #[error("failed to save/restore data via the backend, {0}")]
     StorageBackendError(StorageBackendErr),
-
     #[error("failed to serialize, {0}")]
     Serialize(io::Error),
     #[error("failed to deserialize, {0}")]
     Deserialize(io::Error),
+    #[error("failed to clone file, {0}")]
+    CloneFile(io::Error),
+    #[error("failed to initialize fscache driver, {0}")]
+    InitializeFscache(io::Error),
 }
 
 impl From<UpgradeMgrError> for Error {
@@ -70,53 +77,121 @@ impl TryFrom<&String> for FailoverPolicy {
 
 const MAX_STATE_DATA_LENGTH: usize = 1024 * 32;
 
+struct FscacheState {
+    blob_entry_map: HashMap<String, BlobCacheEntry>,
+    threads: usize,
+    path: String,
+}
+
+#[derive(Versionize, Clone)]
+struct MountStateWrapper {
+    cmd: FsBackendMountCmd,
+    vfs_index: u8,
+}
+
+struct FusedevState {
+    fs_mount_cmd_map: HashMap<String, MountStateWrapper>,
+    vfs_state_data: Vec<u8>,
+    fuse_conn_id: u64,
+}
+
 /// Online upgrade manager.
 pub struct UpgradeManager {
-    // backend_mount_cmd_map records the mount command of each backend filesystem.
-    // the structure is: mountpoint -> (FsBackendMountCmd, vfs_index)
-    backend_mount_cmd_map: HashMap<String, (FsBackendMountCmd, u8)>,
-    fds: Vec<RawFd>,
+    fscache_deamon_stat: FscacheState,
+    fuse_deamon_stat: FusedevState,
+    file: Option<File>,
     backend: Box<dyn StorageBackend>,
-
-    disabled: AtomicBool,
 }
 
 impl UpgradeManager {
     /// Create a new instance of [UpgradeManager].
     pub fn new(socket_path: PathBuf) -> Self {
         UpgradeManager {
-            backend_mount_cmd_map: HashMap::new(),
+            fscache_deamon_stat: FscacheState {
+                blob_entry_map: HashMap::new(),
+                threads: 1,
+                path: "".to_string(),
+            },
+            fuse_deamon_stat: FusedevState {
+                fs_mount_cmd_map: HashMap::new(),
+                vfs_state_data: vec![],
+                fuse_conn_id: 0,
+            },
+            file: None,
             backend: Box::new(UdsStorageBackend::new(socket_path)),
-            fds: Vec::new(),
-            disabled: AtomicBool::new(false),
         }
     }
 
+    pub fn add_blob_entry_state(&mut self, entry: BlobCacheEntry) {
+        let mut blob_state_id = entry.domain_id.to_string();
+        blob_state_id.push('/');
+        blob_state_id.push_str(&entry.blob_id);
+
+        self.fscache_deamon_stat
+            .blob_entry_map
+            .insert(blob_state_id, entry);
+    }
 
-    /// Add a filesystem instance into the upgrade manager.
-    pub fn add_mounts_state(&mut self, cmd: FsBackendMountCmd, vfs_index: u8) -> Result<()> {
-        if self.disabled.load(Ordering::Acquire) {
-            return Err(Error::Unsupported);
+    pub fn remove_blob_entry_state(&mut self, domain_id: &str, blob_id: &str) {
+        let mut blob_state_id = domain_id.to_string();
+        blob_state_id.push('/');
+        // In non-shared domain mode, the snapshotter calls unbind without a blob_id.
+        if !blob_id.is_empty() {
+            blob_state_id.push_str(blob_id);
+        } else {
+            blob_state_id.push_str(domain_id);
         }
-        let cmd_map = &mut self.backend_mount_cmd_map;
-        if cmd_map.contains_key(&cmd.mountpoint) {
-            return Err(Error::AlreadyExists);
+        if self
+            .fscache_deamon_stat
+            .blob_entry_map
+            .remove(&blob_state_id)
+            .is_none()
+        {
+            warn!("blob {}: state was not saved before!", blob_state_id)
         }
+    }
+
+    pub fn save_fscache_states(&mut self, threads: usize, path: String) {
+        self.fscache_deamon_stat.path = path;
+        self.fscache_deamon_stat.threads = threads;
+    }
+
+    pub fn save_fuse_cid(&mut self, fuse_conn_id: u64) {
+        self.fuse_deamon_stat.fuse_conn_id = fuse_conn_id;
+    }
 
-        cmd_map.insert(cmd.mountpoint.clone(), (cmd, vfs_index));
+    pub fn save_vfs_stat(&mut self, vfs: &Vfs) -> Result<()> {
+        let vfs_state_data = vfs.save_to_bytes().map_err(|e| {
+            let io_err = io::Error::new(
+                io::ErrorKind::Other,
+                format!("Failed to save vfs state: {:?}", e),
+            );
+            UpgradeMgrError::Serialize(io_err)
+        })?;
+        self.fuse_deamon_stat.vfs_state_data = vfs_state_data;
 
         Ok(())
     }
 
+    /// Add a filesystem instance into the upgrade manager.
+    pub fn add_mounts_state(&mut self, cmd: FsBackendMountCmd, vfs_index: u8) {
+        let cmd_wrapper = MountStateWrapper {
+            cmd: cmd.clone(),
+            vfs_index,
+        };
+        self.fuse_deamon_stat
+            .fs_mount_cmd_map
+            .insert(cmd.mountpoint, cmd_wrapper);
+    }
+
     /// Update a filesystem instance in the upgrade manager.
     pub fn update_mounts_state(&mut self, cmd: FsBackendMountCmd) -> Result<()> {
-        if self.disabled.load(Ordering::Acquire) {
-            return Err(Error::Unsupported);
-        }
-
-        let cmd_map = &mut self.backend_mount_cmd_map;
-        match cmd_map.get_mut(&cmd.mountpoint) {
-            Some(cmd_with_vfs_idx) => {
-                cmd_with_vfs_idx.0 = cmd;
+        match self
+            .fuse_deamon_stat
+            .fs_mount_cmd_map
+            .get_mut(&cmd.mountpoint)
+        {
+            Some(cmd_wrapper) => {
+                cmd_wrapper.cmd = cmd;
                 Ok(())
             }
             None => Err(Error::NotFound),
@@ -124,35 +199,34 @@ impl UpgradeManager {
         }
     }
 
     /// Remove a filesystem instance from the upgrade manager.
-    pub fn remove_mounts_state(&mut self, cmd: FsBackendUmountCmd) -> Result<()> {
-        if self.disabled.load(Ordering::Acquire) {
-            return Err(Error::Unsupported);
-        }
-
-        let cmd_map = &mut self.backend_mount_cmd_map;
-        match cmd_map.get_mut(&cmd.mountpoint) {
-            Some(_) => {
-                cmd_map.remove(&cmd.mountpoint);
-                Ok(())
-            }
-            None => Err(Error::NotFound),
+    pub fn remove_mounts_state(&mut self, cmd: FsBackendUmountCmd) {
+        if self
+            .fuse_deamon_stat
+            .fs_mount_cmd_map
+            .remove(&cmd.mountpoint)
+            .is_none()
+        {
+            warn!(
+                "mount state for {}: state was not saved before!",
+                cmd.mountpoint
+            )
         }
     }
 
-    /// Disable online upgrade capability.
-    pub fn disable_upgrade(&mut self) {
-        self.disabled.store(true, Ordering::Release);
-    }
+    /// Save the fd and daemon state data for online upgrade.
+    fn save(&mut self, data: &[u8]) -> Result<()> {
+        let mut fds = Vec::new();
+        if let Some(ref f) = self.file {
+            fds.push(f.as_raw_fd())
+        }
 
-    /// Save the fuse fds and fuse state data for online upgrade.
-    fn save(&mut self, data: &Vec<u8>) -> Result<()> {
         self.backend
-            .save(&self.fds, data)
+            .save(&fds, data)
             .map_err(UpgradeMgrError::StorageBackendError)?;
 
         Ok(())
     }
 
-    /// Restore the fuse fds and fuse state data for online upgrade.
+    /// Restore the fd and daemon state data for online upgrade.
     fn restore(&mut self) -> Result<Vec<u8>> {
         let mut fds = vec![0 as RawFd; 8];
         let mut state_data = vec![0u8; MAX_STATE_DATA_LENGTH];
@@ -160,104 +234,222 @@ impl UpgradeManager {
         let (fds_len, state_data_len) = self
             .backend
             .restore(&mut fds, &mut state_data)
             .map_err(UpgradeMgrError::StorageBackendError)?;
-
         fds.truncate(fds_len);
         state_data.truncate(state_data_len);
-
-        self.fds = fds;
+        if fds_len != 1 {
+            warn!("expected exactly 1 fd, received {}; this may not be handled correctly", fds_len);
+        }
+        self.file = Some(unsafe { File::from_raw_fd(fds[0]) });
 
         Ok(state_data)
     }
 
-    fn add_fd(&mut self, fd: RawFd) {
-        self.fds.push(fd);
+    pub fn hold_file(&mut self, fd: &File) -> Result<()> {
+        let f = fd.try_clone().map_err(UpgradeMgrError::CloneFile)?;
+        self.file = Some(f);
+
+        Ok(())
     }
-}
 
-/// Online upgrade utilities for FUSE daemon.
-pub mod fusedev_upgrade {
-    use std::fs::File;
-    use std::os::fd::{FromRawFd, RawFd};
-    use std::os::unix::io::AsRawFd;
-    use std::sync::atomic::Ordering;
+    pub fn return_file(&mut self) -> Option<File> {
+        if let Some(ref f) = self.file {
+            // Basically, this can hardly fail.
+            f.try_clone()
+                .map_err(|e| {
+                    error!("Clone file error, {}", e);
+                    e
+                })
+                .ok()
+        } else {
+            warn!("No file can be returned");
+            None
+        }
+    }
+}
+
+#[cfg(target_os = "linux")]
+/// Online upgrade utilities for fscache daemon.
+pub mod fscache_upgrade {
+    use std::convert::TryFrom;
+    use std::str::FromStr;
+
+    use super::*;
+    use crate::daemon::NydusDaemon;
+    use crate::singleton::ServiceController;
     use nydus_upgrade::persist::Snapshotter;
     use versionize::{VersionMap, Versionize, VersionizeResult};
     use versionize_derive::Versionize;
 
+    #[derive(Versionize, Clone)]
+    pub struct BlobCacheEntryState {
+        json_str: String,
+    }
+
+    #[derive(Versionize, Clone, Default)]
+    struct FscacheBackendState {
+        blob_entry_list: Vec<(String, BlobCacheEntryState)>,
+        threads: usize,
+        path: String,
+    }
+
+    impl Snapshotter for FscacheBackendState {
+        fn get_versions() -> Vec<HashMap<TypeId, u16>> {
+            vec![
+                // version 1
+                HashMap::from([(FscacheBackendState::type_id(), 1)]),
+                // more versions for the future
+            ]
+        }
+    }
+
+    impl TryFrom<&FscacheBackendState> for FscacheState {
+        type Error = std::io::Error;
+        fn try_from(backend_stat: &FscacheBackendState) -> std::result::Result<Self, Self::Error> {
+            let mut map = HashMap::new();
+            for (id, entry_stat) in &backend_stat.blob_entry_list {
+                let entry = BlobCacheEntry::from_str(&entry_stat.json_str)?;
+                map.insert(id.to_string(), entry);
+            }
+            Ok(FscacheState {
+                blob_entry_map: map,
+                threads: backend_stat.threads,
+                path: backend_stat.path.clone(),
+            })
+        }
+    }
+
+    impl TryFrom<&FscacheState> for FscacheBackendState {
+        type Error = std::io::Error;
+        fn try_from(stat: &FscacheState) -> std::result::Result<Self, Self::Error> {
+            let mut list = Vec::new();
+            for (id, entry) in &stat.blob_entry_map {
+                let entry_stat = serde_json::to_string(&entry)?;
+                list.push((
+                    id.to_string(),
+                    BlobCacheEntryState {
+                        json_str: entry_stat,
+                    },
+                ));
+            }
+            Ok(FscacheBackendState {
+                blob_entry_list: list,
+                threads: stat.threads,
+                path: stat.path.clone(),
+            })
+        }
+    }
+
+    pub fn save(daemon: &ServiceController) -> Result<()> {
+        if let Some(mut mgr) = daemon.upgrade_mgr() {
+            let backend_stat = FscacheBackendState::try_from(&mgr.fscache_deamon_stat)
+                .map_err(UpgradeMgrError::Serialize)?;
+            let stat = backend_stat.save().map_err(UpgradeMgrError::Serialize)?;
+            mgr.save(&stat)?;
+        }
+        Ok(())
+    }
+
+    pub fn restore(daemon: &ServiceController) -> Result<()> {
+        if let Some(mut mgr) = daemon.upgrade_mgr() {
+            if let Some(blob_mgr) = daemon.get_blob_cache_mgr() {
+                // Restore the mgr state via the backend in the mgr.
+                let mut state_data = mgr.restore()?;
+
+                let backend_stat = FscacheBackendState::restore(&mut state_data)
+                    .map_err(UpgradeMgrError::Deserialize)?;
+
+                let stat =
+                    FscacheState::try_from(&backend_stat).map_err(UpgradeMgrError::Deserialize)?;
+                // Restore the blob entries.
+                stat.blob_entry_map
+                    .iter()
+                    .try_for_each(|(_, entry)| -> Result<()> {
+                        blob_mgr
+                            .add_blob_entry(entry)
+                            .map_err(UpgradeMgrError::Deserialize)?;
+                        Ok(())
+                    })?;
+
+                // Initialize the fscache daemon with the restored fd.
+                if let Some(f) = mgr.return_file() {
+                    daemon
+                        .initialize_fscache_service(None, stat.threads, &stat.path, Some(&f))
+                        .map_err(UpgradeMgrError::InitializeFscache)?;
+                }
+
+                // Restore the fscache state in the upgrade manager.
+                mgr.fscache_deamon_stat = stat;
+                return Ok(());
+            }
+        }
+        Err(UpgradeMgrError::MissingSupervisorPath.into())
+    }
+}
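Both `FscacheBackendState` above and the `FusedevBackendState` below lean on the `versionize` crate (pulled in via dbs-snapshot in the lockfile) for versioned binary state blobs. A minimal round-trip sketch of that layer, independent of the `Snapshotter` wrapper; the struct layout is illustrative and the calls follow the versionize crate's documented API:

```rust
use versionize::{VersionMap, Versionize, VersionizeResult};
use versionize_derive::Versionize;

// Illustrative stand-in for a daemon state blob such as FscacheBackendState.
#[derive(Versionize, Clone, Default, Debug, PartialEq)]
struct DemoState {
    threads: usize,
    path: String,
}

fn main() -> VersionizeResult<()> {
    let vm = VersionMap::new(); // single-version map; real code registers more versions
    let state = DemoState {
        threads: 4,
        path: "/var/cache/fscache".to_string(),
    };

    // Serialize at version 1 into an in-memory buffer...
    let mut buf = Vec::new();
    state.serialize(&mut buf, &vm, 1)?;

    // ...and deserialize it back, as the restarted daemon would.
    let restored = DemoState::deserialize(&mut buf.as_slice(), &vm, 1)?;
    assert_eq!(state, restored);
    Ok(())
}
```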
+
+/// Online upgrade utilities for FUSE daemon.
+pub mod fusedev_upgrade {
+    use std::sync::atomic::Ordering;
+
+    use super::*;
     use crate::daemon::NydusDaemon;
     use crate::fusedev::{FusedevDaemon, FusedevFsService};
+    use nydus_upgrade::persist::Snapshotter;
+    use versionize::{VersionMap, Versionize, VersionizeResult};
+    use versionize_derive::Versionize;
 
     #[derive(Versionize, Clone, Default)]
-    struct FusedevState {
-        backend_fs_mount_cmd_list: Vec<(FsBackendMountCmd, u8)>,
+    struct FusedevBackendState {
+        fs_mount_cmd_list: Vec<(String, MountStateWrapper)>,
         vfs_state_data: Vec<u8>,
         fuse_conn_id: u64,
     }
 
-    impl Snapshotter for FusedevState {
+    impl Snapshotter for FusedevBackendState {
         fn get_versions() -> Vec<HashMap<TypeId, u16>> {
             vec![
                 // version 1
-                HashMap::from([(FusedevState::type_id(), 1)]),
+                HashMap::from([(FusedevBackendState::type_id(), 1)]),
                 // more versions for the future
             ]
         }
     }
 
+    impl From<&FusedevBackendState> for FusedevState {
+        fn from(backend_stat: &FusedevBackendState) -> Self {
+            let mut map = HashMap::new();
+            for (mp, mw) in &backend_stat.fs_mount_cmd_list {
+                map.insert(mp.to_string(), mw.clone());
+            }
+            FusedevState {
+                fs_mount_cmd_map: map,
+                vfs_state_data: backend_stat.vfs_state_data.clone(),
+                fuse_conn_id: backend_stat.fuse_conn_id,
+            }
+        }
+    }
+
+    impl From<&FusedevState> for FusedevBackendState {
+        fn from(stat: &FusedevState) -> Self {
+            let mut list = Vec::new();
+            for (mp, mw) in &stat.fs_mount_cmd_map {
+                list.push((mp.to_string(), mw.clone()));
+            }
+            FusedevBackendState {
+                fs_mount_cmd_list: list,
+                vfs_state_data: stat.vfs_state_data.clone(),
+                fuse_conn_id: stat.fuse_conn_id,
+            }
+        }
+    }
+
     /// Save state information for a FUSE daemon.
     pub fn save(daemon: &FusedevDaemon) -> Result<()> {
         let svc = daemon.get_default_fs_service().ok_or(Error::NotFound)?;
-
         if !svc.get_vfs().initialized() {
             return Err(Error::NotReady);
         }
 
         let mut mgr = svc.upgrade_mgr().unwrap();
+        let backend_stat = FusedevBackendState::from(&mgr.fuse_deamon_stat);
 
-        // set fd
-        let fd = svc
-            .as_ref()
-            .as_any()
-            .downcast_ref::<FusedevFsService>()
-            .unwrap()
-            .session
-            .lock()
-            .unwrap()
-            .get_fuse_file()
-            .unwrap()
-            .as_raw_fd();
-        mgr.add_fd(fd);
-
-        // save vfs state
-        let vfs_state_data = svc.get_vfs().save_to_bytes().map_err(|e| {
-            let io_err = io::Error::new(
-                io::ErrorKind::Other,
-                format!("Failed to save vfs state: {:?}", e),
-            );
-            UpgradeMgrError::Serialize(io_err)
-        })?;
-
-        let backend_fs_mount_cmd_list = mgr
-            .backend_mount_cmd_map
-            .iter()
-            .map(|(_, (cmd, vfs_idx))| (cmd.clone(), *vfs_idx))
-            .collect();
-
-        let fuse_conn_id = svc
-            .as_any()
-            .downcast_ref::<FusedevFsService>()
-            .unwrap()
-            .conn
-            .load(Ordering::Acquire);
-
-        let state = FusedevState {
-            backend_fs_mount_cmd_list,
-            vfs_state_data,
-            fuse_conn_id,
-        };
-        let state = state.save().map_err(UpgradeMgrError::Serialize)?;
-
-        // save the mgr state via the backend in the mgr
+        let state = backend_stat.save().map_err(UpgradeMgrError::Serialize)?;
         mgr.save(&state)?;
 
         Ok(())
@@ -276,17 +468,10 @@ pub mod fusedev_upgrade {
         // restore the mgr state via the backend in the mgr
         let mut state_data = mgr.restore()?;
 
-        let mut state =
-            FusedevState::restore(&mut state_data).map_err(UpgradeMgrError::Deserialize)?;
+        let backend_state =
+            FusedevBackendState::restore(&mut state_data).map_err(UpgradeMgrError::Deserialize)?;
 
-        // restore the backend fs mount cmd map
-        state
-            .backend_fs_mount_cmd_list
-            .iter()
-            .for_each(|(cmd, vfs_idx)| {
-                mgr.backend_mount_cmd_map
-                    .insert(cmd.mountpoint.clone(), (cmd.clone(), *vfs_idx));
-            });
+        let mut state = FusedevState::from(&backend_state);
 
         // restore the fuse daemon
         svc.as_any()
             .downcast_ref::<FusedevFsService>()
             .unwrap()
             .conn
             .store(state.fuse_conn_id, Ordering::Release);
 
         // restore fuse fd
-        svc.as_any()
-            .downcast_ref::<FusedevFsService>()
-            .unwrap()
-            .session
-            .lock()
-            .unwrap()
-            .set_fuse_file(unsafe { File::from_raw_fd(mgr.fds[0] as RawFd) });
+        if let Some(f) = mgr.return_file() {
+            svc.as_any()
+                .downcast_ref::<FusedevFsService>()
+                .unwrap()
+                .session
+                .lock()
+                .unwrap()
+                .set_fuse_file(f);
+        }
 
         // restore vfs
         svc.get_vfs()
             .restore_from_bytes(&mut state.vfs_state_data)?;
 
         state
-            .backend_fs_mount_cmd_list
+            .fs_mount_cmd_map
             .iter()
-            .try_for_each(|(cmd, vfs_idx)| -> Result<()> {
-                svc.restore_mount(cmd, *vfs_idx)?;
-                // as we are in upgrade stage and obtain the lock, `unwrap` is safe here
-                mgr.add_mounts_state(cmd.clone(), *vfs_idx).unwrap();
+            .try_for_each(|(_, mount_wrapper)| -> Result<()> {
+                svc.restore_mount(&mount_wrapper.cmd, mount_wrapper.vfs_index)?;
                 Ok(())
             })?;
 
+        // Restore the fuse state in the upgrade manager.
+        mgr.fuse_deamon_stat = state;
+
         Ok(())
     }
 }
diff --git a/src/bin/nydusd/api_server_glue.rs b/src/bin/nydusd/api_server_glue.rs
index dea9fa7cd92..18da52c6823 100644
--- a/src/bin/nydusd/api_server_glue.rs
+++ b/src/bin/nydusd/api_server_glue.rs
@@ -279,6 +279,10 @@ impl ApiServer {
                 "{}",
                 e
             ))))
+        } else if let Some(mut mgr_guard) = self.get_daemon_object()?.upgrade_mgr() {
+            // If started with a supervisor, save the blob entry state.
+            mgr_guard.add_blob_entry_state(entry.clone());
+            Ok(ApiResponsePayload::Empty)
         } else {
             Ok(ApiResponsePayload::Empty)
         }
@@ -295,6 +299,9 @@ impl ApiServer {
                 "{}",
                 e
             ))))
+        } else if let Some(mut mgr_guard) = self.get_daemon_object()?.upgrade_mgr() {
+            mgr_guard.remove_blob_entry_state(&param.domain_id, &param.blob_id);
+            Ok(ApiResponsePayload::Empty)
         } else {
             Ok(ApiResponsePayload::Empty)
         }
diff --git a/src/bin/nydusd/main.rs b/src/bin/nydusd/main.rs
index 357882d0d7f..e06693e7a55 100644
--- a/src/bin/nydusd/main.rs
+++ b/src/bin/nydusd/main.rs
@@ -545,7 +545,7 @@ fn process_fs_service(
 
 fn process_singleton_arguments(
     subargs: &SubCmdArgs,
-    _apisock: Option<&str>,
+    apisock: Option<&str>,
     bti: BuildTimeInfo,
 ) -> Result<()> {
     let id = subargs.value_of("id").map(|id| id.to_string());
@@ -572,6 +572,8 @@ fn process_singleton_arguments(
         config,
         bti,
         DAEMON_CONTROLLER.alloc_waker(),
+        apisock,
+        subargs.is_present("upgrade"),
     )
     .map_err(|e| {
         error!("Failed to start singleton daemon: {}", e);
diff --git a/storage/Cargo.toml b/storage/Cargo.toml
index 78373874156..84cfd50495f 100644
--- a/storage/Cargo.toml
+++ b/storage/Cargo.toml
@@ -42,7 +42,7 @@ tokio = { version = "1.19.0", features = [
 ] }
 url = { version = "2.1.1", optional = true }
 vm-memory = "0.10"
-fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" }
+fuse-backend-rs = "^0.11.0"
 gpt = { version = "3.1.0", optional = true }
 
 nydus-api = { version = "0.3", path = "../api" }
diff --git a/upgrade/src/backend/mod.rs b/upgrade/src/backend/mod.rs
index 6e85e4469db..435d91126c8 100644
--- a/upgrade/src/backend/mod.rs
+++ b/upgrade/src/backend/mod.rs
@@ -73,7 +73,7 @@ mod test {
         let fds = [5 as RawFd; FDS_LEN];
         let data: [u8; DATA_LEN] = [7, 8, 9, 10, 12];
 
-        let mut backend: Box<dyn StorageBackend> = Box::new(TestStorageBackend::default());
+        let mut backend: Box<dyn StorageBackend> = Box::<TestStorageBackend>::default();
         let saved_data_len = backend.save(&fds, &data).unwrap();
         assert_eq!(saved_data_len, DATA_LEN);
diff --git a/upgrade/src/backend/unix_domain_socket.rs b/upgrade/src/backend/unix_domain_socket.rs
index 18f727b93d1..c107a9254c1 100644
--- a/upgrade/src/backend/unix_domain_socket.rs
+++ b/upgrade/src/backend/unix_domain_socket.rs
@@ -23,7 +23,7 @@ impl UdsStorageBackend {
 
 impl StorageBackend for UdsStorageBackend {
     fn save(&mut self, fds: &[RawFd], data: &[u8]) -> Result<usize> {
-        if fds.len() < 1 {
+        if fds.is_empty() {
             return Err(StorageBackendErr::NoEnoughFds);
         }
 
@@ -43,7 +43,7 @@ impl StorageBackend for UdsStorageBackend {
             .recv_with_fd(data, fds)
             .map_err(StorageBackendErr::RecvFd)?;
 
-        if fds.len() < 1 {
+        if fds.is_empty() {
             return Err(StorageBackendErr::NoEnoughFds);
         }
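Putting the pieces together, `UdsStorageBackend` ships the state bytes in-band and the held fd out-of-band over a UNIX socket. A hedged end-to-end sketch using the `sendfd` crate, which provides the `send_with_fd`/`recv_with_fd` calls seen above (socket path and buffer sizes are illustrative):

```rust
use std::fs::File;
use std::os::fd::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::{UnixListener, UnixStream};

use sendfd::{RecvWithFd, SendWithFd};

fn main() -> std::io::Result<()> {
    let path = "/tmp/nydusd-demo-upgrade.sock"; // hypothetical supervisor socket
    let _ = std::fs::remove_file(path);
    let listener = UnixListener::bind(path)?;

    // "Old daemon": send the state bytes plus a stand-in for the cachefiles/FUSE fd.
    let sender = UnixStream::connect(path)?;
    let held = File::open("/dev/null")?;
    sender.send_with_fd(b"state-blob", &[held.as_raw_fd()])?;

    // "New daemon": receive the state and re-own the fd, as UpgradeManager::restore() does.
    let (receiver, _) = listener.accept()?;
    let mut data = vec![0u8; 1024];
    let mut fds = [0 as RawFd; 8];
    let (data_len, fds_len) = receiver.recv_with_fd(&mut data, &mut fds)?;
    assert_eq!(&data[..data_len], b"state-blob");
    assert_eq!(fds_len, 1);
    let _restored = unsafe { File::from_raw_fd(fds[0]) };

    std::fs::remove_file(path)?;
    Ok(())
}
```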