Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch back to the old sync version of HostOutputStream. #2738

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/factor-wasi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = { workspace = true }

[dependencies]
async-trait = "0.1"
bytes = "1.0"
cap-primitives = "3.0.0"
spin-common = { path = "../common" }
spin-factors = { path = "../factors" }
Expand Down
126 changes: 126 additions & 0 deletions crates/factor-wasi/src/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::io::{Read, Write};
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use spin_factors::anyhow;
use wasmtime_wasi::{
HostInputStream, HostOutputStream, StdinStream, StdoutStream, StreamError, Subscribe,
};

/// A [`HostOutputStream`] that writes to a `Write` type.
///
/// `StdinStream::stream` and `StdoutStream::new` can be called more than once in components
/// which are composed of multiple subcomponents, since each subcomponent will potentially want
/// its own handle. This means the streams need to be shareable. The easiest way to do that is
/// provide cloneable implementations of streams which operate synchronously.
///
/// Note that this amounts to doing synchronous I/O in an asynchronous context, which we'd normally
/// prefer to avoid, but the properly asynchronous implementations Host{In|Out}putStream based on
/// `AsyncRead`/`AsyncWrite`` are quite hairy and probably not worth it for "normal" stdio streams in
/// Spin. If this does prove to be a performance bottleneck, though, we can certainly revisit it.
pub struct PipedWriteStream<T>(Arc<Mutex<T>>);

impl<T> PipedWriteStream<T> {
pub fn new(inner: T) -> Self {
Self(Arc::new(Mutex::new(inner)))
}
}

impl<T> Clone for PipedWriteStream<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl<T: Write + Send + Sync + 'static> HostOutputStream for PipedWriteStream<T> {
fn write(&mut self, bytes: bytes::Bytes) -> Result<(), StreamError> {
self.0
.lock()
.unwrap()
.write_all(&bytes)
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
}

fn flush(&mut self) -> Result<(), StreamError> {
self.0
.lock()
.unwrap()
.flush()
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
}

fn check_write(&mut self) -> Result<usize, StreamError> {
Ok(1024 * 1024)
}
}

impl<T: Write + Send + Sync + 'static> StdoutStream for PipedWriteStream<T> {
fn stream(&self) -> Box<dyn HostOutputStream> {
Box::new(self.clone())
}

fn isatty(&self) -> bool {
false
}
}

#[async_trait]
impl<T: Write + Send + Sync + 'static> Subscribe for PipedWriteStream<T> {
async fn ready(&mut self) {}
}

/// A [`HostInputStream`] that reads to a `Read` type.
///
/// See [`PipedWriteStream`] for more information on why this is synchronous.
pub struct PipeReadStream<T> {
buffer: Vec<u8>,
inner: Arc<Mutex<T>>,
}

impl<T> PipeReadStream<T> {
pub fn new(inner: T) -> Self {
Self {
buffer: vec![0_u8; 64 * 1024],
inner: Arc::new(Mutex::new(inner)),
}
}
}

impl<T> Clone for PipeReadStream<T> {
fn clone(&self) -> Self {
Self {
buffer: vec![0_u8; 64 * 1024],
inner: self.inner.clone(),
}
}
}

impl<T: Read + Send + Sync + 'static> HostInputStream for PipeReadStream<T> {
fn read(&mut self, size: usize) -> wasmtime_wasi::StreamResult<bytes::Bytes> {
let size = size.min(self.buffer.len());

let count = self
.inner
.lock()
.unwrap()
.read(&mut self.buffer[..size])
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?;

Ok(bytes::Bytes::copy_from_slice(&self.buffer[..count]))
}
}

#[async_trait]
impl<T: Read + Send + Sync + 'static> Subscribe for PipeReadStream<T> {
async fn ready(&mut self) {}
}

impl<T: Read + Send + Sync + 'static> StdinStream for PipeReadStream<T> {
fn stream(&self) -> Box<dyn HostInputStream> {
Box::new(self.clone())
}

fn isatty(&self) -> bool {
false
}
}
39 changes: 19 additions & 20 deletions crates/factor-wasi/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
mod io;
pub mod spin;
mod wasi_2023_10_18;
mod wasi_2023_11_10;

use std::{future::Future, net::SocketAddr, path::Path};
use std::{
future::Future,
io::{Read, Write},
net::SocketAddr,
path::Path,
};

use io::{PipeReadStream, PipedWriteStream};
use spin_factors::{
anyhow, AppComponent, Factor, FactorInstanceBuilder, InitContext, InstanceBuilders,
PrepareContext, RuntimeFactors, RuntimeFactorsInstanceState,
};
use tokio::io::{AsyncRead, AsyncWrite};
use wasmtime_wasi::{
pipe::{AsyncReadStream, AsyncWriteStream},
AsyncStdinStream, AsyncStdoutStream, DirPerms, FilePerms, ResourceTable, StdinStream,
StdoutStream, WasiCtx, WasiCtxBuilder, WasiImpl, WasiView,
DirPerms, FilePerms, ResourceTable, StdinStream, StdoutStream, WasiCtx, WasiCtxBuilder,
WasiImpl, WasiView,
};

pub use wasmtime_wasi::SocketAddrUse;
Expand Down Expand Up @@ -179,35 +184,29 @@ impl InstanceBuilder {
self.ctx.stdin(stdin);
}

/// Sets the WASI `stdin` descriptor to the given [`AsyncRead`]er.
pub fn stdin_pipe(&mut self, r: impl AsyncRead + Send + Unpin + 'static) {
self.stdin(AsyncStdinStream::new(AsyncReadStream::new(r)));
/// Sets the WASI `stdin` descriptor to the given [`Read`]er.
pub fn stdin_pipe(&mut self, r: impl Read + Send + Sync + Unpin + 'static) {
self.stdin(PipeReadStream::new(r));
}

/// Sets the WASI `stdout` descriptor to the given [`StdoutStream`].
pub fn stdout(&mut self, stdout: impl StdoutStream + 'static) {
self.ctx.stdout(stdout);
}

/// Sets the WASI `stdout` descriptor to the given [`AsyncWrite`]r.
pub fn stdout_pipe(&mut self, w: impl AsyncWrite + Send + Unpin + 'static) {
self.stdout(AsyncStdoutStream::new(AsyncWriteStream::new(
1024 * 1024,
w,
)));
/// Sets the WASI `stdout` descriptor to the given [`Write`]r.
pub fn stdout_pipe(&mut self, w: impl Write + Send + Sync + Unpin + 'static) {
rylev marked this conversation as resolved.
Show resolved Hide resolved
self.stdout(PipedWriteStream::new(w));
}

/// Sets the WASI `stderr` descriptor to the given [`StdoutStream`].
pub fn stderr(&mut self, stderr: impl StdoutStream + 'static) {
self.ctx.stderr(stderr);
}

/// Sets the WASI `stderr` descriptor to the given [`AsyncWrite`]r.
pub fn stderr_pipe(&mut self, w: impl AsyncWrite + Send + Unpin + 'static) {
self.stderr(AsyncStdoutStream::new(AsyncWriteStream::new(
1024 * 1024,
w,
)));
/// Sets the WASI `stderr` descriptor to the given [`Write`]r.
pub fn stderr_pipe(&mut self, w: impl Write + Send + Sync + Unpin + 'static) {
self.stderr(PipedWriteStream::new(w));
}

/// Appends the given strings to the WASI 'args'.
Expand Down
Loading