Skip to content

Commit

Permalink
validating fastwebsockets lib for improving performance
Browse files Browse the repository at this point in the history
  • Loading branch information
maggie98choy committed Oct 10, 2024
1 parent 97b5e6b commit faafffb
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 260 deletions.
8 changes: 7 additions & 1 deletion core/main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ local_dev = []
sysd = ["sd-notify"]
pre_prod = []
contract_tests = []
unstable-split = []

[dependencies]
base64.workspace = true
Expand All @@ -41,6 +42,8 @@ jsonrpsee = { workspace = true, features = ["macros", "ws-server"] }
futures-channel.workspace = true
futures.workspace = true
tokio-tungstenite = { workspace = true, features = ["handshake"] }
fastwebsockets = { version = "0.8.0", features = ["upgrade", "unstable-split"]}
hyper-util = { version = "0.1.0", features = ["tokio"] }
querystring.workspace = true
serde.workspace = true
regex.workspace = true
Expand All @@ -53,7 +56,10 @@ exitcode = "1.1.2"
rand = { version = "0.8", default-features = false }
url.workspace = true
futures-util = { version = "0.3.28", features = ["sink", "std"], default-features = false}
hyper = { version = "=0.14.27", features = ["client", "http1", "tcp"], default-features = false }
hyper = { version = "=1.1.0", features = ["client", "http1", "server"], default-features = false }
http-body-util = { version = "0.1.0" }
http = { version = "1.1.0"}
reqwest = { version = "0.11", features = ["json"] }
jaq-interpret = { version = "1.5.0", default-features = false }
jaq-parse = { version = "1.0.2", default-features = false }
jaq-core = "1.5.0"
Expand Down
164 changes: 91 additions & 73 deletions core/main/src/broker/http_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

use std::vec;

use hyper::{client::HttpConnector, Body, Client, Method, Request, Response, Uri};
use http_body_util::StreamBody;

Check failure on line 20 in core/main/src/broker/http_broker.rs

View workflow job for this annotation

GitHub Actions / Run Clippy

unused import: `http_body_util::StreamBody`

Check failure on line 20 in core/main/src/broker/http_broker.rs

View workflow job for this annotation

GitHub Actions / Run Unit Tests

unused import: `http_body_util::StreamBody`

Check failure on line 20 in core/main/src/broker/http_broker.rs

View workflow job for this annotation

GitHub Actions / Generate Code Coverage

unused import: `http_body_util::StreamBody`
// use hyper::{client::HttpConnector, Body, Client, Method, Request, Response};
use reqwest::{Body, Client, Method, Request, Response};

Check failure on line 22 in core/main/src/broker/http_broker.rs

View workflow job for this annotation

GitHub Actions / Run Clippy

unused imports: `Body`, `Request`

Check failure on line 22 in core/main/src/broker/http_broker.rs

View workflow job for this annotation

GitHub Actions / Run Unit Tests

unused imports: `Body`, `Request`

Check failure on line 22 in core/main/src/broker/http_broker.rs

View workflow job for this annotation

GitHub Actions / Generate Code Coverage

unused imports: `Body`, `Request`
// use http::{Method, Request, Uri};
use ripple_sdk::{
log::{debug, error, trace},
tokio::{self, sync::mpsc},
Expand All @@ -40,44 +43,53 @@ pub struct HttpBroker {
*/

async fn send_http_request(
client: &Client<HttpConnector>,
client: &Client,
method: Method,

Check failure on line 47 in core/main/src/broker/http_broker.rs

View workflow job for this annotation

GitHub Actions / Run Clippy

unused variable: `method`

Check failure on line 47 in core/main/src/broker/http_broker.rs

View workflow job for this annotation

GitHub Actions / Run Unit Tests

unused variable: `method`

Check failure on line 47 in core/main/src/broker/http_broker.rs

View workflow job for this annotation

GitHub Actions / Generate Code Coverage

unused variable: `method`
uri: &Uri,
uri: String,
path: &str,
) -> Result<Response<Body>, RippleError> {
) -> Result<Response, RippleError> {
/*
TODO? we may need to support body for POST request in the future
*/
let http_request = Request::new(Body::empty());
let (mut parts, _) = http_request.into_parts();
//TODO, need to refactor to support other methods
parts.method = method.clone();
// let http_request = Request::new(Body::empty());
// let (mut parts, _) = http_request.into_parts();
// //TODO, need to refactor to support other methods
// parts.method = method.clone();
/*
mix endpoint url with method
*/
/*
TODONT: unwraps are bad, need to handle errors
*/

let uri: Uri = format!("{}{}", uri, path)
.parse()
.map_err(|e: InvalidUri| RippleError::BrokerError(e.to_string()))?;
let new_request = Request::builder()
.uri(uri)
.body(Body::empty())
.map_err(|e| RippleError::BrokerError(e.to_string()))?;
let (uri_parts, _) = new_request.into_parts();

parts.uri = uri_parts.uri;

let http_request = Request::from_parts(parts, Body::empty());

debug!(
"http_broker sending {} request={}",
method,
http_request.uri(),
);
match client.request(http_request).await {
// let uri: Uri = format!("{}{}", uri, path)
// .parse()
// .map_err(|e: InvalidUri| RippleError::BrokerError(e.to_string()))?;
let url = format!("{}{}", uri, path);
let response = client.post(url).body("").send().await;
// let new_request = Request::builder()
// .uri(uri)
// .body(Body::empty())
// .map_err(|e| RippleError::BrokerError(e.to_string()))?;
// let (uri_parts, _) = new_request.into_parts();

// parts.uri = uri_parts.uri;

// let http_request = Request::from_parts(parts, Body::empty());

// debug!(
// "http_broker sending {} request={}",
// method,
// http_request.uri(),
// );
// match client.request(http_request).await {
// Ok(v) => Ok(v),
// Err(e) => {
// error!("Error in server");
// Err(RippleError::BrokerError(e.to_string()))
// }
// }
match response {
Ok(v) => Ok(v),
Err(e) => {
error!("Error in server");
Expand All @@ -102,66 +114,72 @@ fn error_string_to_json(msg: &str) -> serde_json::Value {
"error": msg
})
}
async fn body_to_bytes(body: Body) -> Vec<u8> {
match hyper::body::to_bytes(body).await {
Ok(bytes) => {
let value: Vec<u8> = bytes.into();
value.as_slice().to_vec()
}
Err(e) => format!("error in http broker transforming body to bytes {}", e)
.to_string()
.as_bytes()
.to_vec(),
}
}
// async fn body_to_bytes(body: Body) -> Vec<u8> {
// match hyper::body::to_bytes(body).await {
// Ok(bytes) => {
// let value: Vec<u8> = bytes.into();
// value.as_slice().to_vec()
// }
// Err(e) => format!("error in http broker transforming body to bytes {}", e)
// .to_string()
// .as_bytes()
// .to_vec(),
// }
// }

impl EndpointBroker for HttpBroker {
fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self {
let endpoint = request.endpoint.clone();
let (tx, mut tr) = mpsc::channel(10);
let broker = BrokerSender { sender: tx };

Check warning on line 134 in core/main/src/broker/http_broker.rs

View workflow job for this annotation

GitHub Actions / Format checker

Diff in /home/runner/work/Ripple/Ripple/core/main/src/broker/http_broker.rs
let client = Client::new();
let _ = endpoint.get_url().parse().map_err(|e| error!("broker url {:?} in endpoint is invalid, cannot start http broker. error={}",endpoint,e) ).map(|uri| tokio::spawn(async move {
while let Some(request) = tr.recv().await {
debug!("http broker received request={:?}", request);
match send_http_request(&client, Method::GET, &uri, &request.clone().rule.alias)
.await
{
Ok(response) => {
let (parts, body) = response.into_parts();
let body = body_to_bytes(body).await;
let mut request = request;
if let Ok(json_str) = serde_json::from_slice::<serde_json::Value>(&body).map(|v| vec![v])
.and_then(|v| serde_json::to_string(&v))
{
request.rpc.params_json = json_str;
let response = Self::update_request(&request);
trace!(
"http broker response={:?} to request: {:?} using rule={:?}",
response, request, request.rule
);

send_broker_response(&callback, &request, &body).await;
if !parts.status.is_success() {
error!(
"http error {} returned from http service in http broker {:?}",
parts.status, body
// let _ = endpoint.get_url().parse().map_err(|e| error!("broker url {:?} in endpoint is invalid, cannot start http broker. error={}",endpoint,e) ).map(|uri| tokio::spawn(async move {
let url = endpoint.get_url();
tokio::spawn(async move {
while let Some(request) = tr.recv().await {
debug!("http broker received request={:?}", request);
match send_http_request(&client, Method::GET, url.clone(), &request.clone().rule.alias)
.await
{
Ok(response) => {
// let (parts, body) = response.into_parts();

// let body = body_to_bytes(body).await;
let parts = response.status();
let body = response.bytes().await.unwrap().to_vec();

let mut request = request;
if let Ok(json_str) = serde_json::from_slice::<serde_json::Value>(&body).map(|v| vec![v])
.and_then(|v| serde_json::to_string(&v))
{
request.rpc.params_json = json_str;
let response = Self::update_request(&request);
trace!(
"http broker response={:?} to request: {:?} using rule={:?}",
response, request, request.rule
);

send_broker_response(&callback, &request, &body).await;
if !parts.is_success() {
error!(
"http error {} returned from http service in http broker {:?}",
parts.as_str(), body
);
}
} else {
let msg = format!("Error in http broker parsing response from http service at {}. status={:?}",url, parts);
error!("{}",msg);
send_broker_response(&callback, &request, error_string_to_json(msg.as_str()).to_string().as_bytes()).await;
}
} else {
let msg = format!("Error in http broker parsing response from http service at {}. status={:?}",uri, parts.status);
}
Err(err) => {
let msg = format!("An error message from calling the downstream http service={} in http broker {:?}", url, err);
error!("{}",msg);
send_broker_response(&callback, &request, error_string_to_json(msg.as_str()).to_string().as_bytes()).await;
}
}
Err(err) => {
let msg = format!("An error message from calling the downstream http service={} in http broker {:?}", uri, err);
error!("{}",msg);
send_broker_response(&callback, &request, error_string_to_json(msg.as_str()).to_string().as_bytes()).await;
}
}
}
}));
});

Self {
sender: broker,
Expand Down
Loading

0 comments on commit faafffb

Please sign in to comment.