diff --git a/src/client/daemon.rs b/src/client/daemon.rs index 152d25b..91409e9 100644 --- a/src/client/daemon.rs +++ b/src/client/daemon.rs @@ -14,7 +14,9 @@ use log::{error, info, warn}; use crate::{ common::{ - errors::CONNECTION_ERROR, sender::REQUEST_TIMEOUT, serialization::MountVolumeSendMetaData, + errors::{status_to_string, CONNECTION_ERROR}, + sender::REQUEST_TIMEOUT, + serialization::MountVolumeSendMetaData, }, rpc::{ client::{RpcClient, UnixStreamCreator}, @@ -85,7 +87,7 @@ impl SealfsFused { } } Err(e) => { - error!("mount error: {}", e); + error!("mount error: {}", status_to_string(e)); Err(e) } } @@ -198,10 +200,7 @@ impl Handler for SealfsFused { self.sync_index_file(); Ok((0, 0, 0, 0, vec![], vec![])) } - Err(e) => { - error!("mount error: {}", e); - Ok((e, 0, 0, 0, vec![], vec![])) - } + Err(e) => Ok((e, 0, 0, 0, vec![], vec![])), } } UMOUNT => { diff --git a/src/rpc/client.rs b/src/rpc/client.rs index b17fe93..f36b250 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -151,33 +151,31 @@ impl< async fn reconnect(&self, server_address: &str) -> Result<(), String> { match self.connections.get(server_address) { Some(connection) => { - if connection.value().reconnect() { - match S::create_stream(server_address).await { - Ok((read_stream, write_stream)) => { - tokio::spawn(parse_response( - read_stream, - connection.clone(), - self.pool.clone(), - )); - connection.value().reset_connection(write_stream).await; - info!("reconnect to {} success", server_address); - return Ok(()); - } - Err(e) => { - warn!( - "reconnect to {} failed: {}, wait for a while", - server_address, e - ); - tokio::time::sleep(Duration::from_secs(1)).await; - connection.value().reconnect_failed(); - } + if connection.is_connected() { + return Ok(()); + } + match S::create_stream(server_address).await { + Ok((read_stream, write_stream)) => { + tokio::spawn(parse_response( + read_stream, + connection.clone(), + self.pool.clone(), + )); + connection.value().reset_connection(write_stream).await; + info!("reconnect to {} success", server_address); + Ok(()) + } + Err(e) => { + warn!( + "reconnect to {} failed: {}, wait for a while", + server_address, e + ); + tokio::time::sleep(Duration::from_secs(1)).await; + Ok(()) } } - Ok(()) - } - None => { - panic!("connection not exists: {}", server_address); } + None => Err(format!("connection not exists: {}", server_address)), } } @@ -213,7 +211,7 @@ impl< let (batch, id) = self .pool .register_callback(recv_meta_data, recv_data) - .await?; + .await?; // TODO: unregister callback when error if let Err(e) = connection .send_request( @@ -228,18 +226,32 @@ impl< .await { error!("send request to {} failed: {}", server_address, e); - if connection.disconnect() { - self.reconnect(server_address).await?; + connection.disconnect(); + let _lock = connection.get_reconnecting_lock().await; + warn!("connection to {} disconnected", server_address); + match self.reconnect(server_address).await { + Ok(_) => { + continue; + } + Err(e) => { + error!("reconnect to {} failed: {}", server_address, e); + return Err(format!("reconnect to {} failed: {}", server_address, e)); + } + } + } + match self.pool.wait_for_callback(id, timeout).await { + Ok((s, f, meta_data_length, data_length)) => { + *status = s; + *rsp_flags = f; + *recv_meta_data_length = meta_data_length; + *recv_data_length = data_length; + return Ok(()); + } + Err(e) => { + error!("wait for callback failed: {}", e); + continue; } - continue; } - let (s, f, meta_data_length, data_length) = - self.pool.wait_for_callback(id, timeout).await?; // TODO: retry the request - *status = s; - *rsp_flags = f; - *recv_meta_data_length = meta_data_length; - *recv_data_length = data_length; - return Ok(()); } Err(format!( "send request to {} error: send retry times exceed", diff --git a/src/rpc/connection.rs b/src/rpc/connection.rs index 84261fc..fc45644 100644 --- a/src/rpc/connection.rs +++ b/src/rpc/connection.rs @@ -8,7 +8,7 @@ use super::protocol::{ RequestHeader, ResponseHeader, MAX_DATA_LENGTH, MAX_FILENAME_LENGTH, MAX_METADATA_LENGTH, REQUEST_HEADER_SIZE, RESPONSE_HEADER_SIZE, }; -use log::{error, info}; +use log::error; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, sync::Mutex, @@ -16,12 +16,12 @@ use tokio::{ const CONNECTED: u32 = 0; const DISCONNECTED: u32 = 1; -const RECONNECTING: u32 = 2; pub struct ClientConnection { pub server_address: String, write_stream: Mutex>, status: AtomicU32, + reconneting_lock: Mutex<()>, phantom_data: PhantomData, @@ -39,6 +39,7 @@ impl ClientConnection { server_address: server_address.to_string(), write_stream: Mutex::new(Some(write_stream)), status: AtomicU32::new(CONNECTED), + reconneting_lock: Mutex::new(()), phantom_data: PhantomData, _send_lock: Mutex::new(()), } @@ -59,15 +60,8 @@ impl ClientConnection { self.status.load(std::sync::atomic::Ordering::Acquire) == CONNECTED } - pub fn reconnect(&self) -> bool { - self.status - .compare_exchange( - DISCONNECTED, - RECONNECTING, - std::sync::atomic::Ordering::SeqCst, - std::sync::atomic::Ordering::SeqCst, - ) - .is_ok() + pub async fn get_reconnecting_lock(&self) -> tokio::sync::MutexGuard<'_, ()> { + self.reconneting_lock.lock().await } pub async fn reset_connection(&self, write_stream: W) { @@ -76,20 +70,6 @@ impl ClientConnection { .store(CONNECTED, std::sync::atomic::Ordering::SeqCst); } - pub fn reconnect_failed(&self) { - match self.status.compare_exchange( - RECONNECTING, - DISCONNECTED, - std::sync::atomic::Ordering::SeqCst, - std::sync::atomic::Ordering::SeqCst, - ) { - Ok(_) => {} - Err(_) => { - error!("wrong status while roll back connection status") - } - } - } - // request // | batch | id | type | flags | total_length | file_path_length | meta_data_length | data_length | filename | meta_data | data | // | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 1~4kB | 0~ | 0~ | @@ -336,15 +316,17 @@ impl ServerConnection { read_stream: &mut R, header: &RequestHeader, ) -> Result<(Vec, Vec, Vec), String> { - if header.file_path_length as usize > MAX_FILENAME_LENGTH - || header.data_length as usize > MAX_DATA_LENGTH - || header.meta_data_length as usize > MAX_METADATA_LENGTH - { - info!( - "path length or data length or meta data length is too long: {} {} {}", - header.file_path_length, header.meta_data_length, header.data_length - ); - return Err("path length or data length or meta data length is too long".into()); + if header.file_path_length as usize > MAX_FILENAME_LENGTH { + error!("path length is too long: {}", header.file_path_length); + return Err("path length is too long".into()); + } + if header.data_length as usize > MAX_DATA_LENGTH { + error!("data length is too long: {}", header.data_length); + return Err("data length is too long".into()); + } + if header.meta_data_length as usize > MAX_METADATA_LENGTH { + error!("meta data length is too long: {}", header.meta_data_length); + return Err("meta data length is too long".into()); } let mut path = vec![0u8; header.file_path_length as usize]; let mut data = vec![0u8; header.data_length as usize]; diff --git a/src/rpc/protocol.rs b/src/rpc/protocol.rs index 328630c..aa742bf 100644 --- a/src/rpc/protocol.rs +++ b/src/rpc/protocol.rs @@ -3,12 +3,12 @@ // SPDX-License-Identifier: Apache-2.0 pub const MAX_FILENAME_LENGTH: usize = 4096; -pub const MAX_DATA_LENGTH: usize = 65536 * 128; -pub const MAX_METADATA_LENGTH: usize = 4096; +pub const MAX_DATA_LENGTH: usize = 65536 * 65536; +pub const MAX_METADATA_LENGTH: usize = 65536; pub const MAX_COPY_LENGTH: usize = 1024 * 8; pub const CONNECTION_RETRY_TIMES: i32 = 100; -pub const SEND_RETRY_TIMES: i32 = 3; +pub const SEND_RETRY_TIMES: i32 = 5; // request // | batch | id | type | flags | total_length | file_path_length | meta_data_length | data_length | filename | meta_data | data |