Skip to content

Commit

Permalink
refactor: Wrap lease_keepalive_abort with defer (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine authored Jun 24, 2024
1 parent 7f2b1b3 commit c255e52
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ crate-type = ["cdylib"]
etcd-client = "0.12.4"
pyo3 = { version = "0.20.2", features = ["extension-module", "multiple-pymethods"] }
pyo3-asyncio = { version = "0.20.0", features = ["tokio-runtime"] }
scopeguard = "1.2.0"
tokio = { version = "1.32.0", features = ["sync"] }
tokio-stream = "0.1.14"
tonic = "0.10.2"
34 changes: 19 additions & 15 deletions src/lock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,20 @@ impl EtcdLockManager {
.await
.map_err(PyClientError)?;

self.lease_id = match self.ttl {
let mut self_ = scopeguard::guard(self, |self_| {
if let Some(ref lease_keepalive_task) = self_.lease_keepalive_task {
lease_keepalive_task.abort();
}
});

self_.lease_id = match self_.ttl {
Some(ttl) => {
let lease_grant_res = client.lease_grant(ttl, None).await.map_err(PyClientError)?;
let lease_id = lease_grant_res.id();

let mut client_to_move = client.clone();

self.lease_keepalive_task = Some(tokio::spawn(async move {
self_.lease_keepalive_task = Some(tokio::spawn(async move {
let (mut lease_keeper, _lease_stream) = client_to_move
.lease_keep_alive(lease_id)
.await
Expand All @@ -105,22 +111,22 @@ impl EtcdLockManager {
};

let timeout_result: Result<Result<(), PyClientError>, tokio::time::error::Elapsed> =
match self.timeout_seconds {
match self_.timeout_seconds {
Some(seconds) => {
timeout(Duration::from_secs_f64(seconds), self.try_lock(&mut client)).await
timeout(
Duration::from_secs_f64(seconds),
self_.try_lock(&mut client),
)
.await
}
None => ready(Ok(self.try_lock(&mut client).await)).await,
None => ready(Ok(self_.try_lock(&mut client).await)).await,
};

if let Some(ref lease_keepalive_task) = self.lease_keepalive_task {
lease_keepalive_task.abort();
}

match timeout_result {
Ok(Ok(_)) => {}
Ok(Err(e)) => return Err(e.into()),
Ok(Ok(_)) => Ok(PyCommunicator::new(client)),
Ok(Err(try_lock_err)) => Err(try_lock_err.into()),
Err(timedout_err) => {
if let Some(lease_id) = self.lease_id {
if let Some(lease_id) = self_.lease_id {
if let Err(etcd_client::Error::GRpcStatus(status)) =
client.lease_revoke(lease_id).await
{
Expand All @@ -129,11 +135,9 @@ impl EtcdLockManager {
}
}
}
return Err(LockError::new_err(timedout_err.to_string()));
Err(LockError::new_err(timedout_err.to_string()))
}
}

Ok(PyCommunicator::new(client))
}

pub async fn handle_aexit(&mut self) -> PyResult<()> {
Expand Down

0 comments on commit c255e52

Please sign in to comment.