diff --git a/core/main/src/broker/http_broker.rs b/core/main/src/broker/http_broker.rs index 5430bc3df..80ba78a3a 100644 --- a/core/main/src/broker/http_broker.rs +++ b/core/main/src/broker/http_broker.rs @@ -134,52 +134,71 @@ impl EndpointBroker for HttpBroker { let broker = BrokerSender { sender: tx }; let client = Client::new(); // let _ = endpoint.get_url().parse().map_err(|e| error!("broker url {:?} in endpoint is invalid, cannot start http broker. error={}",endpoint,e) ).map(|uri| tokio::spawn(async move { - let url = endpoint.get_url(); - tokio::spawn(async move { - while let Some(request) = tr.recv().await { - debug!("http broker received request={:?}", request); - match send_http_request(&client, Method::GET, url.clone(), &request.clone().rule.alias) - .await - { - Ok(response) => { - // let (parts, body) = response.into_parts(); - - // let body = body_to_bytes(body).await; - let parts = response.status(); - let body = response.bytes().await.unwrap().to_vec(); - - let mut request = request; - if let Ok(json_str) = serde_json::from_slice::(&body).map(|v| vec![v]) - .and_then(|v| serde_json::to_string(&v)) - { - request.rpc.params_json = json_str; - let response = Self::update_request(&request); - trace!( - "http broker response={:?} to request: {:?} using rule={:?}", - response, request, request.rule + let url = endpoint.get_url(); + tokio::spawn(async move { + while let Some(request) = tr.recv().await { + debug!("http broker received request={:?}", request); + match send_http_request( + &client, + Method::GET, + url.clone(), + &request.clone().rule.alias, + ) + .await + { + Ok(response) => { + // let (parts, body) = response.into_parts(); + + // let body = body_to_bytes(body).await; + let parts = response.status(); + let body = response.bytes().await.unwrap().to_vec(); + + let mut request = request; + if let Ok(json_str) = serde_json::from_slice::(&body) + .map(|v| vec![v]) + .and_then(|v| serde_json::to_string(&v)) + { + request.rpc.params_json = json_str; + let response = Self::update_request(&request); + trace!( + "http broker response={:?} to request: {:?} using rule={:?}", + response, + request, + request.rule + ); + + send_broker_response(&callback, &request, &body).await; + if !parts.is_success() { + error!( + "http error {} returned from http service in http broker {:?}", + parts.as_str(), + body ); - - send_broker_response(&callback, &request, &body).await; - if !parts.is_success() { - error!( - "http error {} returned from http service in http broker {:?}", - parts.as_str(), body - ); - } - } else { - let msg = format!("Error in http broker parsing response from http service at {}. status={:?}",url, parts); - error!("{}",msg); - send_broker_response(&callback, &request, error_string_to_json(msg.as_str()).to_string().as_bytes()).await; } + } else { + let msg = format!("Error in http broker parsing response from http service at {}. status={:?}",url, parts); + error!("{}", msg); + send_broker_response( + &callback, + &request, + error_string_to_json(msg.as_str()).to_string().as_bytes(), + ) + .await; } - Err(err) => { - let msg = format!("An error message from calling the downstream http service={} in http broker {:?}", url, err); - error!("{}",msg); - send_broker_response(&callback, &request, error_string_to_json(msg.as_str()).to_string().as_bytes()).await; - } + } + Err(err) => { + let msg = format!("An error message from calling the downstream http service={} in http broker {:?}", url, err); + error!("{}", msg); + send_broker_response( + &callback, + &request, + error_string_to_json(msg.as_str()).to_string().as_bytes(), + ) + .await; } } - }); + } + }); Self { sender: broker, diff --git a/core/main/src/firebolt/firebolt_ws.rs b/core/main/src/firebolt/firebolt_ws.rs index d4728c772..60a9b7cce 100644 --- a/core/main/src/firebolt/firebolt_ws.rs +++ b/core/main/src/firebolt/firebolt_ws.rs @@ -25,8 +25,17 @@ use crate::{ session_state::Session, }, }; +use fastwebsockets::WebSocketError; +use fastwebsockets::{upgrade, FragmentCollectorRead}; use futures::SinkExt; use futures::StreamExt; +use http_body_util::Empty; +use hyper::body::Bytes; +use hyper::body::Incoming; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::Request; +use hyper::Response; use jsonrpsee::types::{error::ErrorCode, ErrorResponse, Id}; use ripple_sdk::{ api::gateway::rpc_gateway_api::{ApiMessage, ApiProtocol, ClientContext, RpcRequest}, @@ -39,15 +48,6 @@ use ripple_sdk::{ uuid::Uuid, }; use ripple_sdk::{log::debug, tokio}; -use fastwebsockets::{upgrade, FragmentCollectorRead}; -use fastwebsockets::WebSocketError; -use hyper::service::service_fn; -use hyper::Request; -use hyper::Response; -use http_body_util::Empty; -use hyper::body::Bytes; -use hyper::body::Incoming; -use hyper::server::conn::http1; #[allow(dead_code)] pub struct FireboltWs {} @@ -92,10 +92,21 @@ impl FireboltWs { let io = hyper_util::rt::TokioIo::new(stream); // let conn_fut = Http::new().serve_connection(io, service_fn(Self::server_upgrade)); let conn_fut = http1::Builder::new() - .serve_connection(io, service_fn(move |req| Self::handle_connection(req, state_for_connection_c.clone(), secure.clone(), internal_app_id_c.clone()))).with_upgrades(); - if let Err(e) = conn_fut.await { - println!("An error occurred: {:?}", e); - } + .serve_connection( + io, + service_fn(move |req| { + Self::handle_connection( + req, + state_for_connection_c.clone(), + secure.clone(), + internal_app_id_c.clone(), + ) + }), + ) + .with_upgrades(); + if let Err(e) = conn_fut.await { + println!("An error occurred: {:?}", e); + } }); } } @@ -104,8 +115,8 @@ impl FireboltWs { mut req: Request, state: PlatformState, gateway_secure: bool, - internal_app_id: Option - ) -> Result>, WebSocketError> { + internal_app_id: Option, + ) -> Result>, WebSocketError> { let (response, fut) = upgrade::upgrade(&mut req)?; let uri = req.uri(); let state_for_connection = state.clone(); @@ -124,9 +135,15 @@ impl FireboltWs { // let app_id = tags.get("app_id").unwrap_or(&"".to_string()).to_string(); let ctx = ClientContext { - session_id: query_map.get("sessionId").unwrap_or(&(Uuid::new_v4().to_string()).to_string()).to_string(), - app_id: query_map.get("appId").unwrap_or(&(Uuid::new_v4().to_string()).to_string()).to_string(), - gateway_secure: gateway_secure + session_id: query_map + .get("sessionId") + .unwrap_or(&(Uuid::new_v4().to_string()).to_string()) + .to_string(), + app_id: query_map + .get("appId") + .unwrap_or(&(Uuid::new_v4().to_string()).to_string()) + .to_string(), + gateway_secure: gateway_secure, }; let session = Session::new( ctx.app_id.clone(), @@ -162,100 +179,117 @@ impl FireboltWs { } //end of register connection session - - tokio::task::spawn(async move { - if let Err(e) = tokio::task::unconstrained(Self::handle_request(fut, state_for_connection, gateway_secure, resp_rx, ctx, connection_id_c)).await { - eprintln!("Error in websocket connection: {}", e); - } + if let Err(e) = tokio::task::unconstrained(Self::handle_request( + fut, + state_for_connection, + gateway_secure, + resp_rx, + ctx, + connection_id_c, + )) + .await + { + eprintln!("Error in websocket connection: {}", e); + } }); - + Ok(response) - } + } - async fn handle_request(fut: upgrade::UpgradeFut, state: PlatformState, secure: bool, mut resp_rx: mpsc::Receiver, ctx: ClientContext, connection_id: String) -> Result<(), WebSocketError> { - // let mut ws = fastwebsockets::FragmentCollector::new(fut.await?); - let ws = fut.await?; - let (sender, mut receiver) = ws.split(tokio::io::split); - let mut sender = FragmentCollectorRead::new(sender); + async fn handle_request( + fut: upgrade::UpgradeFut, + state: PlatformState, + secure: bool, + mut resp_rx: mpsc::Receiver, + ctx: ClientContext, + connection_id: String, + ) -> Result<(), WebSocketError> { + // let mut ws = fastwebsockets::FragmentCollector::new(fut.await?); + let ws = fut.await?; + let (sender, mut receiver) = ws.split(tokio::io::split); + let mut sender = FragmentCollectorRead::new(sender); - tokio::spawn(async move { - while let Some(rs) = resp_rx.recv().await { - let send_result = receiver.write_frame(fastwebsockets::Frame::text(rs.jsonrpc_msg.into_bytes().to_vec().into())).await; - match send_result { - Ok(_) => { - debug!( - "Sent Firebolt response", - ); + tokio::spawn(async move { + while let Some(rs) = resp_rx.recv().await { + let send_result = receiver + .write_frame(fastwebsockets::Frame::text( + rs.jsonrpc_msg.into_bytes().to_vec().into(), + )) + .await; + match send_result { + Ok(_) => { + debug!("Sent Firebolt response",); + } + Err(err) => error!("{:?}", err), } - Err(err) => error!("{:?}", err), } - } - // debug!( - // "api msg rx closed {} {} {}", - // app_id_c, session_id_c, connection_id_c - // ); - }); + // debug!( + // "api msg rx closed {} {} {}", + // app_id_c, session_id_c, connection_id_c + // ); + }); - loop { - let frame = sender.read_frame::<_, WebSocketError>(&mut move |_| async { - unreachable!(); - }) - .await?; - let msg = String::from_utf8(frame.payload.to_vec()); - match msg { - Ok(msg) => { - if !msg.is_empty() { - //let req_text = String::from(msg.to_text().unwrap()); - let req_id = Uuid::new_v4().to_string(); - if let Ok(request) = RpcRequest::parse( - msg.clone(), - ctx.app_id.clone(), - ctx.session_id.clone(), - req_id.clone(), - Some(connection_id.clone()), - ctx.gateway_secure, - ) { - debug!( - "firebolt_ws Received Firebolt request {} {} {}", - connection_id, request.ctx.request_id, request.method - ); - let msg = FireboltGatewayCommand::HandleRpc { request }; - let client = state.get_client(); - let res = client.clone().send_gateway_command(msg); - if let Err(e) = res { - error!("failed to send request {:?}", e); - } - } else { - if let Some(session) = &state - .session_state - .get_session_for_connection_id(&connection_id) - { - use ripple_sdk::api::apps::EffectiveTransport; - let err = - ErrorResponse::new(ErrorCode::InvalidRequest.into(), Id::Null); - let msg = serde_json::to_string(&err).unwrap(); - let api_msg = - ApiMessage::new(ApiProtocol::JsonRpc, msg, req_id.clone()); - match session.get_transport() { - EffectiveTransport::Bridge(id) => { - let _ = state.send_to_bridge(id, api_msg).await; - } - EffectiveTransport::Websocket => { - let _ = session.send_json_rpc(api_msg).await; + loop { + let frame = sender + .read_frame::<_, WebSocketError>(&mut move |_| async { + unreachable!(); + }) + .await?; + let msg = String::from_utf8(frame.payload.to_vec()); + match msg { + Ok(msg) => { + if !msg.is_empty() { + //let req_text = String::from(msg.to_text().unwrap()); + let req_id = Uuid::new_v4().to_string(); + if let Ok(request) = RpcRequest::parse( + msg.clone(), + ctx.app_id.clone(), + ctx.session_id.clone(), + req_id.clone(), + Some(connection_id.clone()), + ctx.gateway_secure, + ) { + debug!( + "firebolt_ws Received Firebolt request {} {} {}", + connection_id, request.ctx.request_id, request.method + ); + let msg = FireboltGatewayCommand::HandleRpc { request }; + let client = state.get_client(); + let res = client.clone().send_gateway_command(msg); + if let Err(e) = res { + error!("failed to send request {:?}", e); + } + } else { + if let Some(session) = &state + .session_state + .get_session_for_connection_id(&connection_id) + { + use ripple_sdk::api::apps::EffectiveTransport; + let err = + ErrorResponse::new(ErrorCode::InvalidRequest.into(), Id::Null); + let msg = serde_json::to_string(&err).unwrap(); + let api_msg = + ApiMessage::new(ApiProtocol::JsonRpc, msg, req_id.clone()); + match session.get_transport() { + EffectiveTransport::Bridge(id) => { + let _ = state.send_to_bridge(id, api_msg).await; + } + EffectiveTransport::Websocket => { + let _ = session.send_json_rpc(api_msg).await; + } } } + // error!("invalid message {}", req_text) } - // error!("invalid message {}", req_text) } } - } - - Err(e) => { - // error!("ws error cid={} error={:?}", connection_id, e); + + Err(e) => { + // error!("ws error cid={} error={:?}", connection_id, e); + } } } - } - Ok(()) + Ok(()) } }