Skip to content

Commit

Permalink
add lock for reconnecting
Browse files Browse the repository at this point in the history
  • Loading branch information
luanshaotong committed Aug 7, 2023
1 parent 6d92f55 commit 0a4bf99
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 66 deletions.
11 changes: 5 additions & 6 deletions src/client/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use log::{error, info, warn};

use crate::{
common::{
errors::CONNECTION_ERROR, sender::REQUEST_TIMEOUT, serialization::MountVolumeSendMetaData,
errors::{status_to_string, CONNECTION_ERROR},
sender::REQUEST_TIMEOUT,
serialization::MountVolumeSendMetaData,
},
rpc::{
client::{RpcClient, UnixStreamCreator},
Expand Down Expand Up @@ -85,7 +87,7 @@ impl SealfsFused {
}
}
Err(e) => {
error!("mount error: {}", e);
error!("mount error: {}", status_to_string(e));
Err(e)
}
}
Expand Down Expand Up @@ -198,10 +200,7 @@ impl Handler for SealfsFused {
self.sync_index_file();
Ok((0, 0, 0, 0, vec![], vec![]))
}
Err(e) => {
error!("mount error: {}", e);
Ok((e, 0, 0, 0, vec![], vec![]))
}
Err(e) => Ok((e, 0, 0, 0, vec![], vec![])),
}
}
UMOUNT => {
Expand Down
68 changes: 32 additions & 36 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,33 +151,31 @@ impl<
async fn reconnect(&self, server_address: &str) -> Result<(), String> {
match self.connections.get(server_address) {
Some(connection) => {
if connection.value().reconnect() {
match S::create_stream(server_address).await {
Ok((read_stream, write_stream)) => {
tokio::spawn(parse_response(
read_stream,
connection.clone(),
self.pool.clone(),
));
connection.value().reset_connection(write_stream).await;
info!("reconnect to {} success", server_address);
return Ok(());
}
Err(e) => {
warn!(
"reconnect to {} failed: {}, wait for a while",
server_address, e
);
tokio::time::sleep(Duration::from_secs(1)).await;
connection.value().reconnect_failed();
}
if connection.is_connected() {
return Ok(());
}
match S::create_stream(server_address).await {
Ok((read_stream, write_stream)) => {
tokio::spawn(parse_response(
read_stream,
connection.clone(),
self.pool.clone(),
));
connection.value().reset_connection(write_stream).await;
info!("reconnect to {} success", server_address);
Ok(())
}
Err(e) => {
warn!(
"reconnect to {} failed: {}, wait for a while",
server_address, e
);
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
}
}
Ok(())
}
None => {
panic!("connection not exists: {}", server_address);
}
None => Err(format!("connection not exists: {}", server_address)),
}
}

Expand Down Expand Up @@ -228,20 +226,18 @@ impl<
.await
{
error!("send request to {} failed: {}", server_address, e);
if connection.disconnect() {
warn!("connection to {} disconnected", server_address);
match self.reconnect(server_address).await {
Ok(_) => {
warn!("reconnect to {} success", server_address);
continue;
}
Err(e) => {
error!("reconnect to {} failed: {}", server_address, e);
return Err(format!("reconnect to {} failed: {}", server_address, e));
}
connection.disconnect();
let _lock = connection.get_reconnecting_lock().await;
warn!("connection to {} disconnected", server_address);
match self.reconnect(server_address).await {
Ok(_) => {
continue;
}
Err(e) => {
error!("reconnect to {} failed: {}", server_address, e);
return Err(format!("reconnect to {} failed: {}", server_address, e));
}
}
continue;
}
match self.pool.wait_for_callback(id, timeout).await {
Ok((s, f, meta_data_length, data_length)) => {
Expand Down
28 changes: 4 additions & 24 deletions src/rpc/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ use tokio::{

const CONNECTED: u32 = 0;
const DISCONNECTED: u32 = 1;
const RECONNECTING: u32 = 2;

pub struct ClientConnection<W: AsyncWriteExt + Unpin, R: AsyncReadExt + Unpin> {
pub server_address: String,
write_stream: Mutex<Option<W>>,
status: AtomicU32,
reconneting_lock: Mutex<()>,

phantom_data: PhantomData<R>,

Expand All @@ -39,6 +39,7 @@ impl<W: AsyncWriteExt + Unpin, R: AsyncReadExt + Unpin> ClientConnection<W, R> {
server_address: server_address.to_string(),
write_stream: Mutex::new(Some(write_stream)),
status: AtomicU32::new(CONNECTED),
reconneting_lock: Mutex::new(()),
phantom_data: PhantomData,
_send_lock: Mutex::new(()),
}
Expand All @@ -59,15 +60,8 @@ impl<W: AsyncWriteExt + Unpin, R: AsyncReadExt + Unpin> ClientConnection<W, R> {
self.status.load(std::sync::atomic::Ordering::Acquire) == CONNECTED
}

pub fn reconnect(&self) -> bool {
self.status
.compare_exchange(
DISCONNECTED,
RECONNECTING,
std::sync::atomic::Ordering::SeqCst,
std::sync::atomic::Ordering::SeqCst,
)
.is_ok()
pub async fn get_reconnecting_lock(&self) -> tokio::sync::MutexGuard<'_, ()> {
self.reconneting_lock.lock().await
}

pub async fn reset_connection(&self, write_stream: W) {
Expand All @@ -76,20 +70,6 @@ impl<W: AsyncWriteExt + Unpin, R: AsyncReadExt + Unpin> ClientConnection<W, R> {
.store(CONNECTED, std::sync::atomic::Ordering::SeqCst);
}

pub fn reconnect_failed(&self) {
match self.status.compare_exchange(
RECONNECTING,
DISCONNECTED,
std::sync::atomic::Ordering::SeqCst,
std::sync::atomic::Ordering::SeqCst,
) {
Ok(_) => {}
Err(_) => {
error!("wrong status while roll back connection status")
}
}
}

// request
// | batch | id | type | flags | total_length | file_path_length | meta_data_length | data_length | filename | meta_data | data |
// | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 1~4kB | 0~ | 0~ |
Expand Down

0 comments on commit 0a4bf99

Please sign in to comment.