From c255e526eccc50beb592fe1188fdb034f062663a Mon Sep 17 00:00:00 2001 From: Gyubong Lee Date: Mon, 24 Jun 2024 17:33:31 +0900 Subject: [PATCH] refactor: Wrap `lease_keepalive_abort` with defer (#10) --- Cargo.lock | 1 + Cargo.toml | 1 + src/lock_manager.rs | 34 +++++++++++++++++++--------------- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6ebe1b7..21c9ced 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -215,6 +215,7 @@ dependencies = [ "etcd-client", "pyo3", "pyo3-asyncio", + "scopeguard", "tokio", "tokio-stream", "tonic", diff --git a/Cargo.toml b/Cargo.toml index c5005a1..044f8fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ crate-type = ["cdylib"] etcd-client = "0.12.4" pyo3 = { version = "0.20.2", features = ["extension-module", "multiple-pymethods"] } pyo3-asyncio = { version = "0.20.0", features = ["tokio-runtime"] } +scopeguard = "1.2.0" tokio = { version = "1.32.0", features = ["sync"] } tokio-stream = "0.1.14" tonic = "0.10.2" diff --git a/src/lock_manager.rs b/src/lock_manager.rs index 9243a61..b3e18d8 100644 --- a/src/lock_manager.rs +++ b/src/lock_manager.rs @@ -80,14 +80,20 @@ impl EtcdLockManager { .await .map_err(PyClientError)?; - self.lease_id = match self.ttl { + let mut self_ = scopeguard::guard(self, |self_| { + if let Some(ref lease_keepalive_task) = self_.lease_keepalive_task { + lease_keepalive_task.abort(); + } + }); + + self_.lease_id = match self_.ttl { Some(ttl) => { let lease_grant_res = client.lease_grant(ttl, None).await.map_err(PyClientError)?; let lease_id = lease_grant_res.id(); let mut client_to_move = client.clone(); - self.lease_keepalive_task = Some(tokio::spawn(async move { + self_.lease_keepalive_task = Some(tokio::spawn(async move { let (mut lease_keeper, _lease_stream) = client_to_move .lease_keep_alive(lease_id) .await @@ -105,22 +111,22 @@ impl EtcdLockManager { }; let timeout_result: Result, tokio::time::error::Elapsed> = - match self.timeout_seconds { + match self_.timeout_seconds { Some(seconds) => { - timeout(Duration::from_secs_f64(seconds), self.try_lock(&mut client)).await + timeout( + Duration::from_secs_f64(seconds), + self_.try_lock(&mut client), + ) + .await } - None => ready(Ok(self.try_lock(&mut client).await)).await, + None => ready(Ok(self_.try_lock(&mut client).await)).await, }; - if let Some(ref lease_keepalive_task) = self.lease_keepalive_task { - lease_keepalive_task.abort(); - } - match timeout_result { - Ok(Ok(_)) => {} - Ok(Err(e)) => return Err(e.into()), + Ok(Ok(_)) => Ok(PyCommunicator::new(client)), + Ok(Err(try_lock_err)) => Err(try_lock_err.into()), Err(timedout_err) => { - if let Some(lease_id) = self.lease_id { + if let Some(lease_id) = self_.lease_id { if let Err(etcd_client::Error::GRpcStatus(status)) = client.lease_revoke(lease_id).await { @@ -129,11 +135,9 @@ impl EtcdLockManager { } } } - return Err(LockError::new_err(timedout_err.to_string())); + Err(LockError::new_err(timedout_err.to_string())) } } - - Ok(PyCommunicator::new(client)) } pub async fn handle_aexit(&mut self) -> PyResult<()> {