Skip to content

Commit

Permalink
Merge pull request #2740 from lann/factors-spin-http
Browse files Browse the repository at this point in the history
factors: Implement spin outbound http
  • Loading branch information
rylev authored Aug 22, 2024
2 parents 2491098 + 07a238a commit 8971708
Show file tree
Hide file tree
Showing 17 changed files with 215 additions and 138 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/factor-outbound-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ anyhow = "1.0"
http = "1.1.0"
http-body-util = "0.1"
hyper = "1.4.1"
reqwest = { version = "0.11", features = ["gzip"] }
rustls = "0.23"
spin-factor-outbound-networking = { path = "../factor-outbound-networking" }
spin-factors = { path = "../factors" }
spin-telemetry = { path = "../telemetry" }
spin-world = { path = "../world" }
terminal = { path = "../terminal" }
tokio = { version = "1", features = ["macros", "rt"] }
Expand Down
9 changes: 9 additions & 0 deletions crates/factor-outbound-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl Factor for OutboundHttpFactor {
component_tls_configs,
self_request_origin: None,
request_interceptor: None,
spin_http_client: None,
})
}
}
Expand All @@ -81,6 +82,8 @@ pub struct InstanceState {
component_tls_configs: ComponentTlsConfigs,
self_request_origin: Option<SelfRequestOrigin>,
request_interceptor: Option<Box<dyn OutboundHttpInterceptor>>,
// Connection-pooling client for 'fermyon:spin/http' interface
spin_http_client: Option<reqwest::Client>,
}

impl InstanceState {
Expand Down Expand Up @@ -156,6 +159,12 @@ impl SelfRequestOrigin {
}
}

impl std::fmt::Display for SelfRequestOrigin {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}://{}", self.scheme, self.authority)
}
}

/// An outbound HTTP request interceptor to be used with
/// [`InstanceState::set_request_interceptor`].
pub trait OutboundHttpInterceptor: Send + Sync {
Expand Down
174 changes: 162 additions & 12 deletions crates/factor-outbound-http/src/spin.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,84 @@
use spin_world::{
async_trait,
v1::http,
v1::http_types::{self, HttpError, Request, Response},
v1::{
http as spin_http,
http_types::{self, HttpError, Method, Request, Response},
},
};
use tracing::{field::Empty, instrument, Level, Span};

#[async_trait]
impl http::Host for crate::InstanceState {
impl spin_http::Host for crate::InstanceState {
#[instrument(name = "spin_outbound_http.send_request", skip_all, err(level = Level::INFO),
fields(otel.kind = "client", url.full = Empty, http.request.method = Empty,
http.response.status_code = Empty, otel.name = Empty, server.address = Empty, server.port = Empty))]
async fn send_request(&mut self, req: Request) -> Result<Response, HttpError> {
// FIXME(lann): This is all just a stub to test allowed_outbound_hosts
match self.allowed_hosts.check_url(&req.uri, "https").await {
Ok(true) => (),
_ => {
let span = Span::current();
record_request_fields(&span, &req);

let uri = req.uri;
tracing::trace!("Sending outbound HTTP to {uri:?}");

let abs_url = if !uri.starts_with('/') {
// Absolute URI
let is_allowed = self
.allowed_hosts
.check_url(&uri, "https")
.await
.unwrap_or(false);
if !is_allowed {
return Err(HttpError::DestinationNotAllowed);
}
uri
} else {
// Relative URI ("self" request)
let is_allowed = self
.allowed_hosts
.check_relative_url(&["http", "https"])
.await
.unwrap_or(false);
if !is_allowed {
return Err(HttpError::DestinationNotAllowed);
}

let Some(origin) = &self.self_request_origin else {
tracing::error!(
"Couldn't handle outbound HTTP request to relative URI; no origin set"
);
return Err(HttpError::InvalidUrl);
};
format!("{origin}{uri}")
};
let req_url = reqwest::Url::parse(&abs_url).map_err(|_| HttpError::InvalidUrl)?;

if !req.params.is_empty() {
tracing::warn!("HTTP params field is deprecated");
}
Ok(Response {
status: 200,
headers: None,
body: Some(b"test response".into()),
})

// Allow reuse of Client's internal connection pool for multiple requests
// in a single component execution
let client = self.spin_http_client.get_or_insert_with(Default::default);

let mut req = {
let mut builder = client.request(reqwest_method(req.method), req_url);
for (key, val) in req.headers {
builder = builder.header(key, val);
}
builder
.body(req.body.unwrap_or_default())
.build()
.map_err(|err| {
tracing::error!("Error building outbound request: {err}");
HttpError::RuntimeError
})?
};
spin_telemetry::inject_trace_context(req.headers_mut());

let resp = client.execute(req).await.map_err(log_reqwest_error)?;

tracing::trace!("Returning response from outbound request to {abs_url}");
span.record("http.response.status_code", resp.status().as_u16());
response_from_reqwest(resp).await
}
}

Expand All @@ -27,3 +87,93 @@ impl http_types::Host for crate::InstanceState {
Ok(err)
}
}

fn record_request_fields(span: &Span, req: &Request) {
let method = match req.method {
Method::Get => "GET",
Method::Post => "POST",
Method::Put => "PUT",
Method::Delete => "DELETE",
Method::Patch => "PATCH",
Method::Head => "HEAD",
Method::Options => "OPTIONS",
};
span.record("otel.name", method)
.record("http.request.method", method)
.record("url.full", req.uri.clone());
if let Ok(uri) = req.uri.parse::<http::Uri>() {
if let Some(authority) = uri.authority() {
span.record("server.address", authority.host());
if let Some(port) = authority.port() {
span.record("server.port", port.as_u16());
}
}
}
}

fn reqwest_method(m: Method) -> reqwest::Method {
match m {
Method::Get => reqwest::Method::GET,
Method::Post => reqwest::Method::POST,
Method::Put => reqwest::Method::PUT,
Method::Delete => reqwest::Method::DELETE,
Method::Patch => reqwest::Method::PATCH,
Method::Head => reqwest::Method::HEAD,
Method::Options => reqwest::Method::OPTIONS,
}
}

fn log_reqwest_error(err: reqwest::Error) -> HttpError {
let error_desc = if err.is_timeout() {
"timeout error"
} else if err.is_connect() {
"connection error"
} else if err.is_body() || err.is_decode() {
"message body error"
} else if err.is_request() {
"request error"
} else {
"error"
};
tracing::warn!(
"Outbound HTTP {}: URL {}, error detail {:?}",
error_desc,
err.url()
.map(|u| u.to_string())
.unwrap_or_else(|| "<unknown>".to_owned()),
err
);
HttpError::RuntimeError
}

async fn response_from_reqwest(res: reqwest::Response) -> Result<Response, HttpError> {
let status = res.status().as_u16();

let headers = res
.headers()
.into_iter()
.map(|(key, val)| {
Ok((
key.to_string(),
val.to_str()
.map_err(|_| {
tracing::error!("Non-ascii response header {key} = {val:?}");
HttpError::RuntimeError
})?
.to_string(),
))
})
.collect::<Result<Vec<_>, _>>()?;

let body = res
.bytes()
.await
.map_err(|_| HttpError::RuntimeError)?
.to_vec();

Ok(Response {
status,
headers: Some(headers),
body: Some(body),
})
}
16 changes: 16 additions & 0 deletions crates/factor-outbound-http/src/wasi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,21 @@ impl<'a> WasiHttpView for WasiHttpImplInner<'a> {
mut request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
mut config: wasmtime_wasi_http::types::OutgoingRequestConfig,
) -> wasmtime_wasi_http::HttpResult<wasmtime_wasi_http::types::HostFutureIncomingResponse> {
// wasmtime-wasi-http fills in scheme and authority for relative URLs
// (e.g. https://:443/<path>), which makes them hard to reason about.
// Undo that here.
let uri = request.uri_mut();
if uri
.authority()
.is_some_and(|authority| authority.host().is_empty())
{
let mut builder = http::uri::Builder::new();
if let Some(paq) = uri.path_and_query() {
builder = builder.path_and_query(paq.clone());
}
*uri = builder.build().unwrap();
}

if let Some(interceptor) = &self.state.request_interceptor {
match interceptor.intercept(&mut request, &mut config) {
InterceptOutcome::Continue => (),
Expand Down Expand Up @@ -149,6 +164,7 @@ async fn send_request_impl(
config.use_tls = origin.use_tls();

request.headers_mut().insert(HOST, origin.host_header());
spin_telemetry::inject_trace_context(&mut request);

let path_and_query = request.uri().path_and_query().cloned();
*request.uri_mut() = origin.into_uri(path_and_query);
Expand Down
Loading

0 comments on commit 8971708

Please sign in to comment.