Skip to content

Commit

Permalink
fix formatting error
Browse files Browse the repository at this point in the history
  • Loading branch information
maggie98choy committed Oct 10, 2024
1 parent faafffb commit fba4657
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 139 deletions.
101 changes: 60 additions & 41 deletions core/main/src/broker/http_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,52 +134,71 @@ impl EndpointBroker for HttpBroker {
let broker = BrokerSender { sender: tx };
let client = Client::new();
// let _ = endpoint.get_url().parse().map_err(|e| error!("broker url {:?} in endpoint is invalid, cannot start http broker. error={}",endpoint,e) ).map(|uri| tokio::spawn(async move {
let url = endpoint.get_url();
tokio::spawn(async move {
while let Some(request) = tr.recv().await {
debug!("http broker received request={:?}", request);
match send_http_request(&client, Method::GET, url.clone(), &request.clone().rule.alias)
.await
{
Ok(response) => {
// let (parts, body) = response.into_parts();

// let body = body_to_bytes(body).await;
let parts = response.status();
let body = response.bytes().await.unwrap().to_vec();

let mut request = request;
if let Ok(json_str) = serde_json::from_slice::<serde_json::Value>(&body).map(|v| vec![v])
.and_then(|v| serde_json::to_string(&v))
{
request.rpc.params_json = json_str;
let response = Self::update_request(&request);
trace!(
"http broker response={:?} to request: {:?} using rule={:?}",
response, request, request.rule
let url = endpoint.get_url();
tokio::spawn(async move {
while let Some(request) = tr.recv().await {
debug!("http broker received request={:?}", request);
match send_http_request(
&client,
Method::GET,
url.clone(),
&request.clone().rule.alias,
)
.await
{
Ok(response) => {
// let (parts, body) = response.into_parts();

// let body = body_to_bytes(body).await;
let parts = response.status();
let body = response.bytes().await.unwrap().to_vec();

let mut request = request;
if let Ok(json_str) = serde_json::from_slice::<serde_json::Value>(&body)
.map(|v| vec![v])
.and_then(|v| serde_json::to_string(&v))
{
request.rpc.params_json = json_str;
let response = Self::update_request(&request);
trace!(
"http broker response={:?} to request: {:?} using rule={:?}",
response,
request,
request.rule
);

send_broker_response(&callback, &request, &body).await;
if !parts.is_success() {
error!(
"http error {} returned from http service in http broker {:?}",
parts.as_str(),
body
);

send_broker_response(&callback, &request, &body).await;
if !parts.is_success() {
error!(
"http error {} returned from http service in http broker {:?}",
parts.as_str(), body
);
}
} else {
let msg = format!("Error in http broker parsing response from http service at {}. status={:?}",url, parts);
error!("{}",msg);
send_broker_response(&callback, &request, error_string_to_json(msg.as_str()).to_string().as_bytes()).await;
}
} else {
let msg = format!("Error in http broker parsing response from http service at {}. status={:?}",url, parts);
error!("{}", msg);
send_broker_response(
&callback,
&request,
error_string_to_json(msg.as_str()).to_string().as_bytes(),
)
.await;
}
Err(err) => {
let msg = format!("An error message from calling the downstream http service={} in http broker {:?}", url, err);
error!("{}",msg);
send_broker_response(&callback, &request, error_string_to_json(msg.as_str()).to_string().as_bytes()).await;
}
}
Err(err) => {
let msg = format!("An error message from calling the downstream http service={} in http broker {:?}", url, err);
error!("{}", msg);
send_broker_response(
&callback,
&request,
error_string_to_json(msg.as_str()).to_string().as_bytes(),
)
.await;
}
}
});
}
});

Self {
sender: broker,
Expand Down
Loading

0 comments on commit fba4657

Please sign in to comment.