Skip to content

Commit

Permalink
```
Browse files Browse the repository at this point in the history
fix: update transaction methods to return Cow<'static, str> and change depth type to u32
```
  • Loading branch information
LucianBuzzo committed Sep 30, 2024
1 parent a220cf5 commit f787095
Show file tree
Hide file tree
Showing 15 changed files with 133 additions and 159 deletions.
39 changes: 16 additions & 23 deletions quaint/src/connector/mssql/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
};
use async_trait::async_trait;
use futures::lock::Mutex;
use std::borrow::Cow;
use std::{
convert::TryFrom,
future::Future,
Expand Down Expand Up @@ -242,40 +243,32 @@ impl Queryable for Mssql {
}

/// Statement to begin a transaction
async fn begin_statement(&self, depth: i32) -> String {
let savepoint_stmt = format!("SAVE TRANSACTION savepoint{}", depth);
let ret = if depth > 1 {
savepoint_stmt
fn begin_statement(&self, depth: u32) -> Cow<'static, str> {
if depth > 1 {
Cow::Owned(format!("SAVEPOINT TRANSACTION savepoint{depth}"))
} else {
"BEGIN TRAN".to_string()
};

return ret;
Cow::Borrowed("BEGIN TRAN")
}
}

/// Statement to commit a transaction
async fn commit_statement(&self, depth: i32) -> String {
fn commit_statement(&self, depth: u32) -> Cow<'static, str> {
// MSSQL doesn't have a "RELEASE SAVEPOINT" equivalent, so in a nested
// transaction we just continue onwards
let ret = if depth > 1 {
" ".to_string()
if depth > 1 {
Cow::Owned("".to_string())
} else {
"COMMIT".to_string()
};

return ret;
Cow::Borrowed("COMMIT")
}
}

/// Statement to rollback a transaction
async fn rollback_statement(&self, depth: i32) -> String {
let savepoint_stmt = format!("ROLLBACK TRANSACTION savepoint{}", depth);
let ret = if depth > 1 {
savepoint_stmt
fn rollback_statement(&self, depth: u32) -> Cow<'static, str> {
if depth > 1 {
Cow::Owned(format!("ROLLBACK TRANSACTION savepoint{depth}"))
} else {
"ROLLBACK".to_string()
};

return ret;
Cow::Borrowed("ROLLBACK")
}
}

fn requires_isolation_first(&self) -> bool {
Expand Down
38 changes: 17 additions & 21 deletions quaint/src/connector/mysql/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use mysql_async::{
self as my,
prelude::{Query as _, Queryable as _},
};
use std::borrow::Cow;
use std::{
future::Future,
sync::atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -347,34 +348,29 @@ impl Queryable for Mysql {
}

/// Statement to begin a transaction
async fn begin_statement(&self, depth: i32) -> String {
let savepoint_stmt = format!("SAVEPOINT savepoint{}", depth);
let ret = if depth > 1 { savepoint_stmt } else { "BEGIN".to_string() };

return ret;
fn begin_statement(&self, depth: u32) -> Cow<'static, str> {
if depth > 1 {
Cow::Owned(format!("SAVEPOINT savepoint{depth}"))
} else {
Cow::Borrowed("BEGIN")
}
}

/// Statement to commit a transaction
async fn commit_statement(&self, depth: i32) -> String {
let savepoint_stmt = format!("RELEASE SAVEPOINT savepoint{}", depth);
let ret = if depth > 1 {
savepoint_stmt
fn commit_statement(&self, depth: u32) -> Cow<'static, str> {
if depth > 1 {
Cow::Owned(format!("RELEASE SAVEPOINT savepoint{depth}"))
} else {
"COMMIT".to_string()
};

return ret;
Cow::Borrowed("COMMIT")
}
}

/// Statement to rollback a transaction
async fn rollback_statement(&self, depth: i32) -> String {
let savepoint_stmt = format!("ROLLBACK TO savepoint{}", depth);
let ret = if depth > 1 {
savepoint_stmt
fn rollback_statement(&self, depth: u32) -> Cow<'static, str> {
if depth > 1 {
Cow::Owned(format!("ROLLBACK TO savepoint{depth}"))
} else {
"ROLLBACK".to_string()
};

return ret;
Cow::Borrowed("ROLLBACK")
}
}
}
39 changes: 17 additions & 22 deletions quaint/src/connector/postgres/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use lru_cache::LruCache;
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use postgres_types::{Kind as PostgresKind, Type as PostgresType};
use std::borrow::Cow;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::{
borrow::Borrow,
Expand Down Expand Up @@ -765,36 +766,30 @@ impl Queryable for PostgreSql {
}

/// Statement to begin a transaction
async fn begin_statement(&self, depth: i32) -> String {
println!("pg connector: Transaction depth: {}", depth);
let savepoint_stmt = format!("SAVEPOINT savepoint{}", depth);
let ret = if depth > 1 { savepoint_stmt } else { "BEGIN".to_string() };

return ret;
fn begin_statement(&self, depth: u32) -> Cow<'static, str> {
if depth > 1 {
Cow::Owned(format!("SAVEPOINT savepoint{depth}"))
} else {
Cow::Borrowed("BEGIN")
}
}

/// Statement to commit a transaction
async fn commit_statement(&self, depth: i32) -> String {
let savepoint_stmt = format!("RELEASE SAVEPOINT savepoint{}", depth);
let ret = if depth > 1 {
savepoint_stmt
fn commit_statement(&self, depth: u32) -> Cow<'static, str> {
if depth > 1 {
Cow::Owned(format!("RELEASE SAVEPOINT savepoint{depth}"))
} else {
"COMMIT".to_string()
};

return ret;
Cow::Borrowed("COMMIT")
}
}

/// Statement to rollback a transaction
async fn rollback_statement(&self, depth: i32) -> String {
let savepoint_stmt = format!("ROLLBACK TO SAVEPOINT savepoint{}", depth);
let ret = if depth > 1 {
savepoint_stmt
fn rollback_statement(&self, depth: u32) -> Cow<'static, str> {
if depth > 1 {
Cow::Owned(format!("ROLLBACK TO SAVEPOINT savepoint{depth}"))
} else {
"ROLLBACK".to_string()
};

return ret;
Cow::Borrowed("ROLLBACK")
}
}
}

Expand Down
40 changes: 18 additions & 22 deletions quaint/src/connector/queryable.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::borrow::Cow;

use super::{DescribedQuery, IsolationLevel, ResultSet, Transaction};
use crate::ast::*;
use async_trait::async_trait;
Expand Down Expand Up @@ -90,36 +92,30 @@ pub trait Queryable: Send + Sync {
}

/// Statement to begin a transaction
async fn begin_statement(&self, depth: i32) -> String {
println!("connector: Transaction depth: {}", depth);
let savepoint_stmt = format!("SAVEPOINT savepoint{}", depth);
let ret = if depth > 1 { savepoint_stmt } else { "BEGIN".to_string() };

return ret;
fn begin_statement(&self, depth: u32) -> Cow<'static, str> {
if depth > 1 {
Cow::Owned(format!("SAVEPOINT savepoint{depth}"))
} else {
Cow::Borrowed("BEGIN")
}
}

/// Statement to commit a transaction
async fn commit_statement(&self, depth: i32) -> String {
let savepoint_stmt = format!("RELEASE SAVEPOINT savepoint{}", depth);
let ret = if depth > 1 {
savepoint_stmt
fn commit_statement(&self, depth: u32) -> Cow<'static, str> {
if depth > 1 {
Cow::Owned(format!("RELEASE SAVEPOINT savepoint{depth}"))
} else {
"COMMIT".to_string()
};

return ret;
Cow::Borrowed("COMMIT")
}
}

/// Statement to rollback a transaction
async fn rollback_statement(&self, depth: i32) -> String {
let savepoint_stmt = format!("ROLLBACK TO SAVEPOINT savepoint{}", depth);
let ret = if depth > 1 {
savepoint_stmt
fn rollback_statement(&self, depth: u32) -> Cow<'static, str> {
if depth > 1 {
Cow::Owned(format!("ROLLBACK TO SAVEPOINT savepoint{depth}"))
} else {
"ROLLBACK".to_string()
};

return ret;
Cow::Borrowed("ROLLBACK")
}
}

/// Sets the transaction isolation level to given value.
Expand Down
40 changes: 16 additions & 24 deletions quaint/src/connector/sqlite/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
visitor::{self, Visitor},
};
use async_trait::async_trait;
use std::borrow::Cow;
use std::convert::TryFrom;
use tokio::sync::Mutex;

Expand Down Expand Up @@ -182,43 +183,34 @@ impl Queryable for Sqlite {
}

/// Statement to begin a transaction
async fn begin_statement(&self, depth: i32) -> String {
let savepoint_stmt = format!("SAVEPOINT savepoint{}", depth);
fn begin_statement(&self, depth: u32) -> Cow<'static, str> {
// From https://sqlite.org/isolation.html:
// `BEGIN IMMEDIATE` avoids possible `SQLITE_BUSY_SNAPSHOT` that arise when another connection jumps ahead in line.
// The BEGIN IMMEDIATE command goes ahead and starts a write transaction, and thus blocks all other writers.
// If the BEGIN IMMEDIATE operation succeeds, then no subsequent operations in that transaction will ever fail with an SQLITE_BUSY error.
let ret = if depth > 1 {
savepoint_stmt
if depth > 1 {
Cow::Owned(format!("SAVEPOINT savepoint{depth}"))
} else {
"BEGIN IMMEDIATE".to_string()
};

return ret;
Cow::Borrowed("BEGIN IMMEDIATE")
}
}

/// Statement to commit a transaction
async fn commit_statement(&self, depth: i32) -> String {
let savepoint_stmt = format!("RELEASE SAVEPOINT savepoint{}", depth);
let ret = if depth > 1 {
savepoint_stmt
fn commit_statement(&self, depth: u32) -> Cow<'static, str> {
if depth > 1 {
Cow::Owned(format!("RELEASE SAVEPOINT savepoint{depth}"))
} else {
"COMMIT".to_string()
};

return ret;
Cow::Borrowed("COMMIT")
}
}

/// Statement to rollback a transaction
async fn rollback_statement(&self, depth: i32) -> String {
let savepoint_stmt = format!("ROLLBACK TO savepoint{}", depth);
let ret = if depth > 1 {
savepoint_stmt
fn rollback_statement(&self, depth: u32) -> Cow<'static, str> {
if depth > 1 {
Cow::Owned(format!("ROLLBACK TO savepoint{depth}"))
} else {
"ROLLBACK".to_string()
};

return ret;
Cow::Borrowed("ROLLBACK")
}
}
}

Expand Down
16 changes: 8 additions & 8 deletions quaint/src/connector/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ pub trait Transaction: Queryable {
async fn begin(&mut self) -> crate::Result<()>;

/// Commit the changes to the database and consume the transaction.
async fn commit(&mut self) -> crate::Result<i32>;
async fn commit(&mut self) -> crate::Result<u32>;

/// Rolls back the changes to the database.
async fn rollback(&mut self) -> crate::Result<i32>;
async fn rollback(&mut self) -> crate::Result<u32>;

/// workaround for lack of upcasting between traits https://github.com/rust-lang/rust/issues/65991
fn as_queryable(&self) -> &dyn Queryable;
Expand All @@ -43,7 +43,7 @@ pub(crate) struct TransactionOptions {
/// transaction object will panic.
pub struct DefaultTransaction<'a> {
pub inner: &'a dyn Queryable,
pub depth: Arc<Mutex<i32>>,
pub depth: Arc<Mutex<u32>>,
}

impl<'a> DefaultTransaction<'a> {
Expand Down Expand Up @@ -87,15 +87,15 @@ impl<'a> Transaction for DefaultTransaction<'a> {
*depth
};

let begin_statement = self.inner.begin_statement(current_depth).await;
let begin_statement = self.inner.begin_statement(current_depth);

self.inner.raw_cmd(&begin_statement).await?;

Ok(())
}

/// Commit the changes to the database and consume the transaction.
async fn commit(&mut self) -> crate::Result<i32> {
async fn commit(&mut self) -> crate::Result<u32> {
decrement_gauge!("prisma_client_queries_active", 1.0);

// Lock the mutex and get the depth value
Expand All @@ -105,7 +105,7 @@ impl<'a> Transaction for DefaultTransaction<'a> {
};

// Perform the asynchronous operation without holding the lock
let commit_statement = self.inner.commit_statement(depth_val).await;
let commit_statement = self.inner.commit_statement(depth_val);
self.inner.raw_cmd(&commit_statement).await?;

// Lock the mutex again to modify the depth
Expand All @@ -119,7 +119,7 @@ impl<'a> Transaction for DefaultTransaction<'a> {
}

/// Rolls back the changes to the database.
async fn rollback(&mut self) -> crate::Result<i32> {
async fn rollback(&mut self) -> crate::Result<u32> {
decrement_gauge!("prisma_client_queries_active", 1.0);

// Lock the mutex and get the depth value
Expand All @@ -129,7 +129,7 @@ impl<'a> Transaction for DefaultTransaction<'a> {
};

// Perform the asynchronous operation without holding the lock
let rollback_statement = self.inner.rollback_statement(depth_val).await;
let rollback_statement = self.inner.rollback_statement(depth_val);

self.inner.raw_cmd(&rollback_statement).await?;

Expand Down
Loading

0 comments on commit f787095

Please sign in to comment.