From fb84de23473fab349c142c33e1536468c040b439 Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Mon, 16 Oct 2023 21:12:03 +0200 Subject: [PATCH 1/2] Move mysql interface to resources Signed-off-by: Ryan Levick --- Cargo.lock | 1 + crates/outbound-mysql/Cargo.toml | 9 +- crates/outbound-mysql/src/lib.rs | 190 +++++++++++------- examples/rust-outbound-mysql/src/lib.rs | 15 +- examples/spin-timer/Cargo.lock | 1 + sdk/rust/src/mysql.rs | 4 +- .../http-rust-outbound-mysql/src/lib.rs | 14 +- wit/preview2/mysql.wit | 24 +++ wit/preview2/world.wit | 2 +- 9 files changed, 174 insertions(+), 86 deletions(-) create mode 100644 wit/preview2/mysql.wit diff --git a/Cargo.lock b/Cargo.lock index 90eabbae4..ecb761d0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4159,6 +4159,7 @@ dependencies = [ "mysql_common", "spin-core", "spin-world", + "table", "tokio", "tracing", "url", diff --git a/crates/outbound-mysql/Cargo.toml b/crates/outbound-mysql/Cargo.toml index 797050198..a4fd67bfb 100644 --- a/crates/outbound-mysql/Cargo.toml +++ b/crates/outbound-mysql/Cargo.toml @@ -11,11 +11,14 @@ doctest = false anyhow = "1.0" flate2 = "1.0.17" # Removing default features for mysql_async to remove flate2/zlib feature -mysql_async = { version = "0.32.2", default-features = false, features = ["native-tls-tls"] } +mysql_async = { version = "0.32.2", default-features = false, features = [ + "native-tls-tls", +] } # Removing default features for mysql_common to remove flate2/zlib feature mysql_common = { version = "0.30.6", default-features = false } spin-core = { path = "../core" } spin-world = { path = "../world" } -tokio = { version = "1", features = [ "rt-multi-thread" ] } -tracing = { version = "0.1", features = [ "log" ] } +table = { path = "../table" } +tokio = { version = "1", features = ["rt-multi-thread"] } +tracing = { version = "0.1", features = ["log"] } url = "2.3.1" diff --git a/crates/outbound-mysql/src/lib.rs b/crates/outbound-mysql/src/lib.rs index 9cd8322cd..d88dc6c1b 100644 --- a/crates/outbound-mysql/src/lib.rs +++ b/crates/outbound-mysql/src/lib.rs @@ -1,19 +1,28 @@ use anyhow::Result; -pub use mysql::add_to_linker; use mysql_async::{consts::ColumnType, from_value_opt, prelude::*, Opts, OptsBuilder, SslOpts}; +use spin_core::wasmtime::component::Resource; use spin_core::{async_trait, HostComponent}; -use spin_world::v1::{ - mysql::{self, MysqlError}, - rdbms_types::{Column, DbDataType, DbValue, ParameterValue, RowSet}, -}; -use std::collections::HashMap; +use spin_world::v1::mysql as v1; +use spin_world::v1::rdbms_types::{Column, DbDataType, DbValue, ParameterValue, RowSet}; +use spin_world::v2::mysql::{self as v2, Connection}; use std::sync::Arc; use url::Url; /// A simple implementation to support outbound mysql connection #[derive(Default)] pub struct OutboundMysql { - pub connections: HashMap, + pub connections: table::Table, +} + +impl OutboundMysql { + async fn get_conn( + &mut self, + connection: Resource, + ) -> Result<&mut mysql_async::Conn, v2::Error> { + self.connections + .get_mut(connection.rep()) + .ok_or_else(|| v2::Error::ConnectionFailed("no connection found".into())) + } } impl HostComponent for OutboundMysql { @@ -23,7 +32,8 @@ impl HostComponent for OutboundMysql { linker: &mut spin_core::Linker, get: impl Fn(&mut spin_core::Data) -> &mut Self::Data + Send + Sync + Copy + 'static, ) -> anyhow::Result<()> { - mysql::add_to_linker(linker, get) + v2::add_to_linker(linker, get)?; + v1::add_to_linker(linker, get) } fn build_data(&self) -> Self::Data { @@ -31,29 +41,39 @@ impl HostComponent for OutboundMysql { } } +impl v2::Host for OutboundMysql {} + #[async_trait] -impl mysql::Host for OutboundMysql { +impl v2::HostConnection for OutboundMysql { + async fn open(&mut self, address: String) -> Result, v2::Error>> { + Ok(async { + self.connections + .push( + build_conn(&address) + .await + .map_err(|e| v2::Error::ConnectionFailed(format!("{e:?}")))?, + ) + .map_err(|_| v2::Error::Other("too many connections".into())) + .map(Resource::new_own) + } + .await) + } + async fn execute( &mut self, - address: String, + connection: Resource, statement: String, params: Vec, - ) -> Result> { + ) -> Result> { Ok(async { - let db_params = params - .iter() - .map(to_sql_parameter) - .collect::>>() - .map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?; - + let db_params = params.into_iter().map(to_sql_parameter).collect::>(); let parameters = mysql_async::Params::Positional(db_params); - self.get_conn(&address) - .await - .map_err(|e| MysqlError::ConnectionFailed(format!("{:?}", e)))? + self.get_conn(connection) + .await? .exec_batch(&statement, &[parameters]) .await - .map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?; + .map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))?; Ok(()) } @@ -62,38 +82,31 @@ impl mysql::Host for OutboundMysql { async fn query( &mut self, - address: String, + connection: Resource, statement: String, params: Vec, - ) -> Result> { + ) -> Result> { Ok(async { - let db_params = params - .iter() - .map(to_sql_parameter) - .collect::>>() - .map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?; - + let db_params = params.into_iter().map(to_sql_parameter).collect::>(); let parameters = mysql_async::Params::Positional(db_params); let mut query_result = self - .get_conn(&address) - .await - .map_err(|e| MysqlError::ConnectionFailed(format!("{:?}", e)))? + .get_conn(connection) + .await? .exec_iter(&statement, parameters) .await - .map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?; + .map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))?; // We have to get these before collect() destroys them let columns = convert_columns(query_result.columns()); match query_result.collect::().await { - Err(e) => Err(MysqlError::OtherError(format!("{:?}", e))), + Err(e) => Err(v2::Error::Other(e.to_string())), Ok(result_set) => { let rows = result_set .into_iter() .map(|row| convert_row(row, &columns)) - .collect::, _>>() - .map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?; + .collect::, _>>()?; Ok(RowSet { columns, rows }) } @@ -101,24 +114,73 @@ impl mysql::Host for OutboundMysql { } .await) } + + fn drop(&mut self, connection: Resource) -> Result<()> { + self.connections.remove(connection.rep()); + Ok(()) + } +} + +/// Delegate a function call to the v2::HostConnection implementation +macro_rules! delegate { + ($self:ident.$name:ident($address:expr, $($arg:expr),*)) => {{ + let connection = match ::open($self, $address).await? { + Ok(c) => c, + Err(e) => return Ok(Err(to_legacy_error(e))), + }; + Ok(::$name($self, connection, $($arg),*) + .await? + .map_err(|e| to_legacy_error(e))) + }}; +} + +#[async_trait] +impl v1::Host for OutboundMysql { + async fn execute( + &mut self, + address: String, + statement: String, + params: Vec, + ) -> Result> { + delegate!(self.execute(address, statement, params)) + } + + async fn query( + &mut self, + address: String, + statement: String, + params: Vec, + ) -> Result> { + delegate!(self.query(address, statement, params)) + } +} + +fn to_legacy_error(error: v2::Error) -> v1::MysqlError { + match error { + v2::Error::ConnectionFailed(e) => v1::MysqlError::ConnectionFailed(e), + v2::Error::BadParameter(e) => v1::MysqlError::BadParameter(e), + v2::Error::QueryFailed(e) => v1::MysqlError::QueryFailed(e), + v2::Error::ValueConversionFailed(e) => v1::MysqlError::ValueConversionFailed(e), + v2::Error::Other(e) => v1::MysqlError::OtherError(e), + } } -fn to_sql_parameter(value: &ParameterValue) -> anyhow::Result { +fn to_sql_parameter(value: ParameterValue) -> mysql_async::Value { match value { - ParameterValue::Boolean(v) => Ok(mysql_async::Value::from(v)), - ParameterValue::Int32(v) => Ok(mysql_async::Value::from(v)), - ParameterValue::Int64(v) => Ok(mysql_async::Value::from(v)), - ParameterValue::Int8(v) => Ok(mysql_async::Value::from(v)), - ParameterValue::Int16(v) => Ok(mysql_async::Value::from(v)), - ParameterValue::Floating32(v) => Ok(mysql_async::Value::from(v)), - ParameterValue::Floating64(v) => Ok(mysql_async::Value::from(v)), - ParameterValue::Uint8(v) => Ok(mysql_async::Value::from(v)), - ParameterValue::Uint16(v) => Ok(mysql_async::Value::from(v)), - ParameterValue::Uint32(v) => Ok(mysql_async::Value::from(v)), - ParameterValue::Uint64(v) => Ok(mysql_async::Value::from(v)), - ParameterValue::Str(v) => Ok(mysql_async::Value::from(v)), - ParameterValue::Binary(v) => Ok(mysql_async::Value::from(v)), - ParameterValue::DbNull => Ok(mysql_async::Value::NULL), + ParameterValue::Boolean(v) => mysql_async::Value::from(v), + ParameterValue::Int32(v) => mysql_async::Value::from(v), + ParameterValue::Int64(v) => mysql_async::Value::from(v), + ParameterValue::Int8(v) => mysql_async::Value::from(v), + ParameterValue::Int16(v) => mysql_async::Value::from(v), + ParameterValue::Floating32(v) => mysql_async::Value::from(v), + ParameterValue::Floating64(v) => mysql_async::Value::from(v), + ParameterValue::Uint8(v) => mysql_async::Value::from(v), + ParameterValue::Uint16(v) => mysql_async::Value::from(v), + ParameterValue::Uint32(v) => mysql_async::Value::from(v), + ParameterValue::Uint64(v) => mysql_async::Value::from(v), + ParameterValue::Str(v) => mysql_async::Value::from(v), + ParameterValue::Binary(v) => mysql_async::Value::from(v), + ParameterValue::DbNull => mysql_async::Value::NULL, } } @@ -130,7 +192,7 @@ fn convert_columns(columns: Option>) -> Vec { } fn convert_column(column: &mysql_async::Column) -> Column { - let name = column.name_str().to_string(); + let name = column.name_str().into_owned(); let data_type = convert_data_type(column); Column { name, data_type } @@ -192,7 +254,7 @@ fn is_binary(column: &mysql_async::Column) -> bool { .contains(mysql_async::consts::ColumnFlags::BINARY_FLAG) } -fn convert_row(mut row: mysql_async::Row, columns: &[Column]) -> Result, MysqlError> { +fn convert_row(mut row: mysql_async::Row, columns: &[Column]) -> Result, v2::Error> { let mut result = Vec::with_capacity(row.len()); for index in 0..row.len() { result.push(convert_entry(&mut row, index, columns)?); @@ -204,10 +266,10 @@ fn convert_entry( row: &mut mysql_async::Row, index: usize, columns: &[Column], -) -> Result { +) -> Result { match (row.take(index), columns.get(index)) { (None, _) => Ok(DbValue::DbNull), // TODO: is this right or is this an "index out of range" thing - (_, None) => Err(MysqlError::OtherError(format!( + (_, None) => Err(v2::Error::Other(format!( "Can't get column at index {}", index ))), @@ -216,7 +278,7 @@ fn convert_entry( } } -fn convert_value(value: mysql_async::Value, column: &Column) -> Result { +fn convert_value(value: mysql_async::Value, column: &Column) -> Result { match column.data_type { DbDataType::Binary => convert_value_to::>(value).map(DbValue::Binary), DbDataType::Boolean => convert_value_to::(value).map(DbValue::Boolean), @@ -231,23 +293,13 @@ fn convert_value(value: mysql_async::Value, column: &Column) -> Result convert_value_to::(value).map(DbValue::Uint16), DbDataType::Uint32 => convert_value_to::(value).map(DbValue::Uint32), DbDataType::Uint64 => convert_value_to::(value).map(DbValue::Uint64), - DbDataType::Other => Err(MysqlError::ValueConversionFailed(format!( + DbDataType::Other => Err(v2::Error::ValueConversionFailed(format!( "Cannot convert value {:?} in column {} data type {:?}", value, column.name, column.data_type ))), } } -impl OutboundMysql { - async fn get_conn(&mut self, address: &str) -> anyhow::Result<&mut mysql_async::Conn> { - let client = match self.connections.entry(address.to_owned()) { - std::collections::hash_map::Entry::Occupied(o) => o.into_mut(), - std::collections::hash_map::Entry::Vacant(v) => v.insert(build_conn(address).await?), - }; - Ok(client) - } -} - async fn build_conn(address: &str) -> Result { tracing::log::debug!("Build new connection: {}", address); @@ -295,8 +347,8 @@ fn build_opts(address: &str) -> Result { .into()) } -fn convert_value_to(value: mysql_async::Value) -> Result { - from_value_opt::(value).map_err(|e| MysqlError::ValueConversionFailed(format!("{}", e))) +fn convert_value_to(value: mysql_async::Value) -> Result { + from_value_opt::(value).map_err(|e| v2::Error::ValueConversionFailed(format!("{}", e))) } #[cfg(test)] diff --git a/examples/rust-outbound-mysql/src/lib.rs b/examples/rust-outbound-mysql/src/lib.rs index 924b9ba55..9334f00a1 100644 --- a/examples/rust-outbound-mysql/src/lib.rs +++ b/examples/rust-outbound-mysql/src/lib.rs @@ -91,9 +91,10 @@ fn body_json_to_map(req: &Request) -> Result> { fn list() -> Result { let address = std::env::var(DB_URL_ENV)?; + let conn = mysql::Connection::open(&address)?; let sql = "SELECT id, name, prey, is_finicky FROM pets"; - let rowset = mysql::query(&address, sql, &[])?; + let rowset = conn.query(sql, &[])?; let column_summary = rowset .columns @@ -124,10 +125,11 @@ fn list() -> Result { fn get(id: i32) -> Result { let address = std::env::var(DB_URL_ENV)?; + let conn = mysql::Connection::open(&address)?; let sql = "SELECT id, name, prey, is_finicky FROM pets WHERE id = ?"; let params = vec![ParameterValue::Int32(id)]; - let rowset = mysql::query(&address, sql, ¶ms)?; + let rowset = conn.query(sql, ¶ms)?; match rowset.rows.first() { None => Ok(http::Response::builder().status(404).body(None)?), @@ -143,8 +145,9 @@ fn get(id: i32) -> Result { fn create(name: String, prey: Option, is_finicky: bool) -> Result { let address = std::env::var(DB_URL_ENV)?; + let conn = mysql::Connection::open(&address)?; - let id = max_pet_id(&address)? + 1; + let id = max_pet_id(&conn)? + 1; let prey_param = match prey { None => ParameterValue::DbNull, @@ -160,7 +163,7 @@ fn create(name: String, prey: Option, is_finicky: bool) -> Result String { format!("{}: {:?}", column.name, column.data_type) } -fn max_pet_id(address: &str) -> Result { +fn max_pet_id(conn: &mysql::Connection) -> Result { let sql = "SELECT MAX(id) FROM pets"; - let rowset = mysql::query(address, sql, &[])?; + let rowset = conn.query(sql, &[])?; match rowset.rows.first() { None => Ok(0), diff --git a/examples/spin-timer/Cargo.lock b/examples/spin-timer/Cargo.lock index 1bf801c66..1b864dde3 100644 --- a/examples/spin-timer/Cargo.lock +++ b/examples/spin-timer/Cargo.lock @@ -2551,6 +2551,7 @@ dependencies = [ "mysql_common", "spin-core", "spin-world", + "table", "tokio", "tracing", "url", diff --git a/sdk/rust/src/mysql.rs b/sdk/rust/src/mysql.rs index c91a13c56..2a7ac8d28 100644 --- a/sdk/rust/src/mysql.rs +++ b/sdk/rust/src/mysql.rs @@ -18,8 +18,10 @@ //! | `String` | str(string) | VARCHAR, CHAR, TEXT | //! | `Vec` | binary(list\) | VARBINARY, BINARY, BLOB | -pub use super::wit::v1::mysql::{execute, query, MysqlError}; +#[doc(inline)] pub use super::wit::v1::rdbms_types::*; +#[doc(inline)] +pub use super::wit::v2::mysql::{Connection, Error as MysqlError}; /// A MySQL error #[derive(Debug, thiserror::Error)] diff --git a/tests/testcases/http-rust-outbound-mysql/src/lib.rs b/tests/testcases/http-rust-outbound-mysql/src/lib.rs index 474506689..5f3ac0a4c 100644 --- a/tests/testcases/http-rust-outbound-mysql/src/lib.rs +++ b/tests/testcases/http-rust-outbound-mysql/src/lib.rs @@ -51,6 +51,7 @@ fn process(req: Request) -> Result { fn test_numeric_types(_req: Request) -> Result { let address = std::env::var(DB_URL_ENV)?; + let conn = mysql::Connection::open(&address)?; let create_table_sql = r#" CREATE TEMPORARY TABLE test_numeric_types ( @@ -71,7 +72,7 @@ fn test_numeric_types(_req: Request) -> Result { ); "#; - mysql::execute(&address, create_table_sql, &[])?; + conn.execute(create_table_sql, &[])?; let insert_sql = r#" INSERT INTO test_numeric_types @@ -80,7 +81,7 @@ fn test_numeric_types(_req: Request) -> Result { (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1); "#; - mysql::execute(&address, insert_sql, &[])?; + conn.execute(insert_sql, &[])?; let sql = r#" SELECT @@ -101,7 +102,7 @@ fn test_numeric_types(_req: Request) -> Result { FROM test_numeric_types; "#; - let rowset = mysql::query(&address, sql, &[])?; + let rowset = conn.query(sql, &[])?; let column_summary = rowset .columns @@ -162,6 +163,7 @@ fn test_numeric_types(_req: Request) -> Result { fn test_character_types(_req: Request) -> Result { let address = std::env::var(DB_URL_ENV)?; + let conn = mysql::Connection::open(&address)?; let create_table_sql = r#" CREATE TEMPORARY TABLE test_character_types ( @@ -174,7 +176,7 @@ fn test_character_types(_req: Request) -> Result { ); "#; - mysql::execute(&address, create_table_sql, &[]) + conn.execute(create_table_sql, &[]) .map_err(|e| anyhow!("Error executing MySQL command: {:?}", e))?; let insert_sql = r#" @@ -184,7 +186,7 @@ fn test_character_types(_req: Request) -> Result { ('rvarchar', 'rtext', 'rchar', 'a', 'a', 'a'); "#; - mysql::execute(&address, insert_sql, &[])?; + conn.execute(insert_sql, &[])?; let sql = r#" SELECT @@ -192,7 +194,7 @@ fn test_character_types(_req: Request) -> Result { FROM test_character_types; "#; - let rowset = mysql::query(&address, sql, &[])?; + let rowset = conn.query(sql, &[])?; let column_summary = rowset .columns diff --git a/wit/preview2/mysql.wit b/wit/preview2/mysql.wit new file mode 100644 index 000000000..f65a0cf20 --- /dev/null +++ b/wit/preview2/mysql.wit @@ -0,0 +1,24 @@ +interface mysql { + use fermyon:spin/rdbms-types.{parameter-value, row-set} + + /// Errors related to interacting with Mysql. + variant error { + connection-failed(string), + bad-parameter(string), + query-failed(string), + value-conversion-failed(string), + other(string) + } + + /// A connection to a mysql database. + resource connection { + /// Open a connection to the Postgres instance at `address`. + open: static func(address: string) -> result + + /// query the database: select + query: func(statement: string, params: list) -> result + + /// execute command to the database: insert, update, delete + execute: func(statement: string, params: list) -> result<_, error> + } +} diff --git a/wit/preview2/world.wit b/wit/preview2/world.wit index 5aa029475..0a2c9c7c4 100644 --- a/wit/preview2/world.wit +++ b/wit/preview2/world.wit @@ -18,11 +18,11 @@ world http-trigger { world platform { import fermyon:spin/config import fermyon:spin/http - import fermyon:spin/mysql import fermyon:spin/llm import redis import postgres + import mysql import sqlite import key-value } From 8d3005548742bcf7df8ef55427252d8e978c5caf Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Wed, 18 Oct 2023 09:14:23 +0200 Subject: [PATCH 2/2] PR feedback Signed-off-by: Ryan Levick --- crates/outbound-mysql/src/lib.rs | 2 +- crates/outbound-pg/src/lib.rs | 2 +- wit/preview2/mysql.wit | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/outbound-mysql/src/lib.rs b/crates/outbound-mysql/src/lib.rs index d88dc6c1b..8c2cd639a 100644 --- a/crates/outbound-mysql/src/lib.rs +++ b/crates/outbound-mysql/src/lib.rs @@ -53,7 +53,7 @@ impl v2::HostConnection for OutboundMysql { .await .map_err(|e| v2::Error::ConnectionFailed(format!("{e:?}")))?, ) - .map_err(|_| v2::Error::Other("too many connections".into())) + .map_err(|_| v2::Error::ConnectionFailed("too many connections".into())) .map(Resource::new_own) } .await) diff --git a/crates/outbound-pg/src/lib.rs b/crates/outbound-pg/src/lib.rs index 4b6bb8631..980d728e5 100644 --- a/crates/outbound-pg/src/lib.rs +++ b/crates/outbound-pg/src/lib.rs @@ -56,7 +56,7 @@ impl v2::HostConnection for OutboundPg { .await .map_err(|e| v2::Error::ConnectionFailed(format!("{e:?}")))?, ) - .map_err(|_| v2::Error::Other("too many connections".into())) + .map_err(|_| v2::Error::ConnectionFailed("too many connections".into())) .map(Resource::new_own) } .await) diff --git a/wit/preview2/mysql.wit b/wit/preview2/mysql.wit index f65a0cf20..afc436057 100644 --- a/wit/preview2/mysql.wit +++ b/wit/preview2/mysql.wit @@ -1,7 +1,7 @@ interface mysql { use fermyon:spin/rdbms-types.{parameter-value, row-set} - /// Errors related to interacting with Mysql. + /// Errors related to interacting with MySQL. variant error { connection-failed(string), bad-parameter(string), @@ -10,9 +10,9 @@ interface mysql { other(string) } - /// A connection to a mysql database. + /// A connection to a MySQL database. resource connection { - /// Open a connection to the Postgres instance at `address`. + /// Open a connection to the MySQL instance at `address`. open: static func(address: string) -> result /// query the database: select