diff --git a/.github/assets/logo_v0.svg b/.github/assets/logo_v0.svg new file mode 100644 index 0000000..4af091e --- /dev/null +++ b/.github/assets/logo_v0.svg @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 02941fd..176ee38 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -1,6 +1,3 @@ -# This workflow will build a golang project -# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go - name: "Go" on: @@ -12,11 +9,12 @@ on: jobs: build: strategy: - fail-fast: false + fail-fast: true matrix: # os: [ "ubuntu-latest", "windows-latest", "macos-latest" ] # Windows is not supported until net library implements Fd() for Windows os: [ "ubuntu-latest", "macos-latest" ] go: [ "1.20.x", "1.21.x" ] + name: "build and test" runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v3 @@ -28,3 +26,40 @@ jobs: run: go build -v ./... - name: Test run: go test -failfast ./... + + golangci-lint: + strategy: + matrix: + go: [ "1.20.x", "1.21.x" ] + os: [ "ubuntu-latest", "macos-latest" ] + name: lint + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-go@v4 + with: + go-version: ${{ matrix.go }} + cache: false + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + # Require: The version of golangci-lint to use. + # When `install-mode` is `binary` (default) the value can be v1.2 or v1.2.3 or `latest` to use the latest version. + # When `install-mode` is `goinstall` the value can be v1.2.3, `latest`, or the hash of a commit. + version: v1.54 + args: -v --disable structcheck,govet --timeout 5m + + # Optional: working directory, useful for monorepos + # working-directory: somedir + + # Optional: golangci-lint command line arguments. + # + # Note: by default the `.golangci.yml` file should be at the root of the repository. + # The location of the configuration file can be changed by using `--config=` + # args: --timeout=30m --config=/my/path/.golangci.yml --issues-exit-code=0 + + # Optional: show only new issues if it's a pull request. The default value is `false`. + # only-new-issues: true + + # Optional:The mode to install golangci-lint. It can be 'binary' or 'goinstall'. + # install-mode: "goinstall" diff --git a/.gitignore b/.gitignore index 35a80ed..a11bec3 100644 --- a/.gitignore +++ b/.gitignore @@ -35,4 +35,7 @@ Cargo.lock # vendor/ # Go workspace file -go.work \ No newline at end of file +go.work + +# Benchmark file +bench.txt \ No newline at end of file diff --git a/README.md b/README.md index 6f31c75..7d4fd6a 100644 --- a/README.md +++ b/README.md @@ -1,25 +1,110 @@ -# W.A.T.E.R.: WebAssembly Transport Executable Reactor -[![License](https://img.shields.io/badge/License-Apache_2.0-yellowgreen.svg)](https://opensource.org/licenses/Apache-2.0) [![Build Status](https://github.com/gaukas/water/actions/workflows/go.yml/badge.svg?branch=master)](https://github.com/gaukas/water/actions/workflows/go.yml) +# W.A.T.E.R.: WebAssembly Transport Executables Runtime +[![License](https://img.shields.io/badge/License-Apache_2.0-yellowgreen.svg)](https://opensource.org/licenses/Apache-2.0) +[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fgaukas%2Fwater.svg?type=shield&issueType=license)](https://app.fossa.com/projects/git%2Bgithub.com%2Fgaukas%2Fwater?ref=badge_shield&issueType=license) +[![Build Status](https://github.com/gaukas/water/actions/workflows/go.yml/badge.svg?branch=master)](https://github.com/gaukas/water/actions/workflows/go.yml) +[![Go Reference](https://pkg.go.dev/badge/github.com/gaukas/water.svg)](https://pkg.go.dev/github.com/gaukas/water) +[![DeepSource](https://app.deepsource.com/gh/gaukas/water.svg/?label=resolved+issues&show_trend=true&token=SonUOOtyjJHnPuIdEBGZp4zx)](https://app.deepsource.com/gh/gaukas/water/) -W.A.T.E.R. provides a runtime environment for WebAssembly modules to run in and work as a application-layer transport protocol. It is designed to be highly portable and lightweight, and can be used as a replacement for pluggable transports. +
+
+ WATER-go provides a golang runtime for WebAssembly Transport Modules(WATM) as a pluggable + application-layer transport protocol provider. It is designed to be highly portable and + lightweight, allowing for rapidly deployable pluggable transports. While other pluggable + transport implementations require a fresh client deployment (and app-store review) to update + their protocol WATER allows dynamic delivery of new transports in real time + over the network.
+
+
+
+ WATER wasm transport +
+
-## API +Information about writing, buiding, and sharing WebAssembly Transport Modules can be found in the +[water-rs](https://github.com/erikziyunchi/water-rs/tree/main/crates/wasm) library. -Currently, W.A.T.E.R. provides a set of APIs based on **WASI Preview 1 (wasip1)** snapshot. +# Usage -### Config -A `Config` is a struct that contains the configuration for a WASI instance. It is used to configure the WASI reactor before starting it. + +Based on **WASI Preview 1 (wasip1)** snapshot, currently W.A.T.E.R. provides a set of +`net`-like APIs, including `Dialer`, `Listener` and `Relay`. -### Dialer +### Dialer -A `Dialer` could be used to dial a remote address upon `Dial()` and return a `net.Conn` back to the caller once the connection is established. Caller could use the `net.Conn` to read and write data to the remote address and the data will be processed by a WebAssembly instance. +A `Dialer` connects to a remote address and returns a `net.Conn` to the caller if the connection can +be successfully established. The `net.Conn` then provides tunnelled read and write to the remote +endpoint with the WebAssembly module wrapping / encrypting / transforming the traffic. + +`Dialer` is used to encapsulate the caller's connection into an outbound, transport-wrapped +connection. + +```go + wasm, err := os.ReadFile("./examples/v0plus/plain/plain.wasm") + + config := &water.Config{ + TMBin: wasm, + } + + dialer, err := water.NewDialer(config) + conn, err := dialer.Dial("tcp", remoteAddr) + // ... +``` ### Listener -A `Listener` could be used to listen on a local address. Upon `Accept()`, it returns a `net.Conn` back once an incoming connection is accepted from the wrapped listener. Caller could use the `net.Conn` to read and write data to the remote address and the data will be processed by a WebAssembly instance. +A `Listener` listens on a local address for incoming connections which it `Accept()`s, returning +a `net.Conn` for the caller to handle. The WebAssembly Module is responsible for the initial +accpt allowing it to implement both the server side handshake as well as any unwrap / decrypt +/reform operations required to validate and re-assemble the stream. The `net.Conn` returned provides +the normalized stream, and allows the caller to write back to the client with the WebAssembly module +encoding / encrypting / transforming traffic to be obfuscated on the wire on the way to the remote +client. + + +`Listener` is used to decapsulate incoming transport-wrapped connections for the caller to handle, +managing the tunnel obfuscation once a connection is established. + +```go + wasm, err := os.ReadFile("./examples/v0plus/plain/plain.wasm") + + config := &water.Config{ + TMBin: wasm, + } + + lis, err := config.Listen("tcp", localAddr) + defer lis.Close() + log.Infof("Listening on %s", lis.Addr().String()) + + clientCntr := 0 + for { + conn, err := lis.Accept() + // ... + } +``` + +### Relay + +A `Relay` combines the role of `Dialer` and `Listener`. It listens on a local address `Accept()`-ing +incoming connections and `Dial()`-ing the remote endpoints on establishment. The connecions are +tunneled through the WebAssembly Transport Module allowing the module to handshake, encode, +transform, pad, etc. without any caller interaction. The internal `Relay` manages the incoming +connections as well as the associated outgoing connectons. + +`Relay` is a managed many-to-many handler for incoming connections that uses the WebAssemble module +to tunnel traffic. + +```go + wasm, err := os.ReadFile("./examples/v0plus/plain/plain.wasm") + + config := &water.Config{ + TMBin: wasm, + } + + relay, err := water.NewRelay(config) -### Server + err = relay.ListenAndRelayTo("tcp", localAddr, "tcp", remoteAddr) +``` -A `Server` somewhat combines the role of `Dialer` and `Listener`. It could be used to listen on a local address and dial a remote address and automatically `Accept()` the incoming connections, feed them into the WebAssembly instance and `Dial()` the pre-defined remote address. Without any caller interaction, the `Server` will automatically* handle the data transmission between the two ends. +## Example -***TODO: Server could not be realistic until WASI multi-threading or blocking mainloop is supported** \ No newline at end of file +See [examples](./examples) for example usecase of W.A.T.E.R. API, including `Dialer`, `Listener` and `Relay`. \ No newline at end of file diff --git a/config.go b/config.go index 94be183..9f4dfa7 100644 --- a/config.go +++ b/config.go @@ -2,45 +2,34 @@ package water import ( "net" - "os" - "github.com/gaukas/water/internal/log" "github.com/gaukas/water/internal/wasm" ) +// Config defines the configuration for the WATER Dialer/Config interface. type Config struct { - // WATMBin contains the binary format of the WebAssembly Transport Module. + // TMBin contains the binary format of the WebAssembly Transport Module. // In a typical use case, this mandatory field is populated by loading // from a .wasm file, downloaded from a remote target, or generated from // a .wat (WebAssembly Text Format) file. - WATMBin []byte + TMBin []byte - // DialerFunc specifies a func that dials the specified address on the + // NetworkDialerFunc specifies a func that dials the specified address on the // named network. This optional field can be set to override the Go // default dialer func: // net.Dial(network, address) - DialerFunc func(network, address string) (net.Conn, error) + NetworkDialerFunc func(network, address string) (net.Conn, error) // NetworkListener specifies a net.listener implementation that listens // on the specified address on the named network. This optional field // will be used to provide (incoming) network connections from a // presumably remote source to the WASM instance. Required by - // ListenConfig(). + // water.WrapListener(). NetworkListener net.Listener - // Feature specifies a series of experimental features for the WASM - // runtime. - // - // Each feature flag is bit-masked and version-dependent, and flags - // are independent of each other. This means that a particular - // feature flag may be supported in one version of the runtime but - // not in another. If a feature flag is not supported or not recognized - // by the runtime, it will be silently ignored. - Feature Feature - - // WATMConfig optionally provides a configuration file to be pushed into + // TMConfig optionally provides a configuration file to be pushed into // the WASM Transport Module. - WATMConfig WATMConfig + TMConfig TMConfig // wasiConfigFactory is used to replicate the WASI config for each WASM // instance created. This field is for advanced use cases and/or debugging @@ -52,32 +41,36 @@ type Config struct { wasiConfigFactory *wasm.WASIConfigFactory } +// Clone creates a deep copy of the Config. func (c *Config) Clone() *Config { if c == nil { return nil } - wasmClone := make([]byte, len(c.WATMBin)) - copy(wasmClone, c.WATMBin) + wasmClone := make([]byte, len(c.TMBin)) + copy(wasmClone, c.TMBin) return &Config{ - WATMBin: c.WATMBin, - DialerFunc: c.DialerFunc, + TMBin: c.TMBin, + NetworkDialerFunc: c.NetworkDialerFunc, NetworkListener: c.NetworkListener, - Feature: c.Feature, - WATMConfig: c.WATMConfig, + TMConfig: c.TMConfig, wasiConfigFactory: c.wasiConfigFactory.Clone(), } } -func (c *Config) DialerFuncOrDefault() func(network, address string) (net.Conn, error) { - if c.DialerFunc == nil { +// NetworkDialerFuncOrDefault returns the DialerFunc if it is not nil, otherwise +// returns the default net.Dial function. +func (c *Config) NetworkDialerFuncOrDefault() func(network, address string) (net.Conn, error) { + if c.NetworkDialerFunc == nil { return net.Dial } - return c.DialerFunc + return c.NetworkDialerFunc } +// NetworkListenerOrDefault returns the NetworkListener if it is not nil, +// otherwise it panics. func (c *Config) NetworkListenerOrPanic() net.Listener { if c.NetworkListener == nil { panic("water: network listener is not provided in config") @@ -86,14 +79,17 @@ func (c *Config) NetworkListenerOrPanic() net.Listener { return c.NetworkListener } +// WATMBinOrDefault returns the WATMBin if it is not nil, otherwise it panics. func (c *Config) WATMBinOrPanic() []byte { - if len(c.WATMBin) == 0 { + if len(c.TMBin) == 0 { panic("water: WebAssembly Transport Module binary is not provided in config") } - return c.WATMBin + return c.TMBin } +// WASIConfig returns the WASIConfigFactory. If the pointer is +// nil, a new WASIConfigFactory will be created and returned. func (c *Config) WASIConfig() *wasm.WASIConfigFactory { if c.wasiConfigFactory == nil { c.wasiConfigFactory = wasm.NewWasiConfigFactory() @@ -102,23 +98,14 @@ func (c *Config) WASIConfig() *wasm.WASIConfigFactory { return c.wasiConfigFactory } -// WATMConfig defines the configuration file used by the WebAssembly Transport Module. -type WATMConfig struct { - FilePath string // Path to the config file. -} - -// File opens the config file and returns the file descriptor. -func (c *WATMConfig) File() *os.File { - if c.FilePath == "" { - log.Errorf("water: WASM config file path is not provided in config") - return nil - } - - f, err := os.Open(c.FilePath) +func (c *Config) Listen(network, address string) (net.Listener, error) { + lis, err := net.Listen(network, address) if err != nil { - log.Errorf("water: failed to open WATM config file: %v", err) - return nil + return nil, err } - return f + config := c.Clone() + config.NetworkListener = lis + + return NewListener(config) } diff --git a/config_test.go b/config_test.go new file mode 100644 index 0000000..074b936 --- /dev/null +++ b/config_test.go @@ -0,0 +1,32 @@ +package water_test + +import ( + "crypto/rand" + "net" + "reflect" + "testing" + + "github.com/gaukas/water" +) + +func TestConfigClone(t *testing.T) { + c := &water.Config{ + TMBin: make([]byte, 256), + NetworkDialerFunc: nil, // functions aren't deeply equal unless nil + NetworkListener: &net.TCPListener{}, + TMConfig: water.TMConfig{ + FilePath: "/tmp/watm.toml", + }, + } + + _, err := rand.Read(c.TMBin) + if err != nil { + t.Fatalf("rand.Read error: %v", err) + } + + ccloned := c.Clone() + + if !reflect.DeepEqual(c, ccloned) { + t.Errorf("Clone() = %v, want %v", ccloned, c) + } +} diff --git a/conn_generic.go b/conn.go similarity index 65% rename from conn_generic.go rename to conn.go index d25c575..181517b 100644 --- a/conn_generic.go +++ b/conn.go @@ -6,13 +6,8 @@ import ( "time" ) -var ( - mapCoreDialContext = make(map[string]func(core *core, network, address string) (Conn, error)) - mapCoreAccept = make(map[string]func(*core) (Conn, error)) -) - -// Conn is an abstracted connection interface which encapsulates -// a WASM runtime core. +// Conn is an abstracted connection interface which is expected +// to encapsulate a Core. type Conn interface { net.Conn @@ -23,22 +18,6 @@ type Conn interface { mustEmbedUnimplementedConn() } -func RegisterDial(version string, dialContext func(core *core, network, address string) (Conn, error)) error { - if _, ok := mapCoreDialContext[version]; ok { - return fmt.Errorf("water: core dial context already registered for version %s", version) - } - mapCoreDialContext[version] = dialContext - return nil -} - -func RegisterAccept(version string, accept func(*core) (Conn, error)) error { - if _, ok := mapCoreAccept[version]; ok { - return fmt.Errorf("water: core accept already registered for version %s", version) - } - mapCoreAccept[version] = accept - return nil -} - // UnimplementedConn is used to provide forward compatibility for // implementations of Conn, such that if new methods are added // to the interface, old implementations will not be required to implement diff --git a/conn_v0.go b/conn_v0.go deleted file mode 100644 index 6f9430c..0000000 --- a/conn_v0.go +++ /dev/null @@ -1,248 +0,0 @@ -//go:build !nov0 - -package water - -import ( - "errors" - "fmt" - "io" - "net" - "time" - - "github.com/gaukas/water/internal/log" - "github.com/gaukas/water/internal/socket" - v0 "github.com/gaukas/water/internal/v0" - "github.com/gaukas/water/internal/wasm" -) - -func init() { - RegisterDial("_v0", DialV0) - RegisterAccept("_v0", AcceptV0) -} - -// ConnV0 is the first version of RuntimeConn. -type ConnV0 struct { - networkConn net.Conn // network-facing net.Conn, data written to this connection will be sent on the wire - uoConn net.Conn // user-oriented net.Conn, user Read()/Write() to this connection - - wasm *WASMv0 - - UnimplementedConn // embedded to ensure forward compatibility -} - -// DialV0 dials the network address using through the WASM module -// while using the dialerFunc specified in core.config. -func DialV0(core *core, network, address string) (c Conn, err error) { - wasm := NewWASMv0(core) - conn := &ConnV0{ - wasm: wasm, - } - - dialer := v0.MakeWASIDialer(network, address, core.Config().DialerFuncOrDefault()) - - if err = conn.wasm.LinkNetworkInterface(dialer, nil); err != nil { - return nil, err - } - - if err = conn.wasm.Initialize(); err != nil { - return nil, err - } - - if conn.wasm._dial == nil { - return nil, fmt.Errorf("water: WASM module does not export _dial") - } - - // Initialize WASM module as ReadWriter - if err = conn.wasm.InitializeReadWriter(); err != nil { - return nil, err - } - - var wasmCallerConn net.Conn - wasmCallerConn, conn.uoConn, err = socket.UnixConnPair("") - if err != nil { - if wasmCallerConn == nil || conn.uoConn == nil { - return nil, fmt.Errorf("water: socket.UnixConnPair returned error: %w", err) - } else { // likely due to Close() call errored - log.Errorf("water: socket.UnixConnPair returned error: %v", err) - } - } - - wasmNetworkConn, err := conn.wasm.DialFrom(wasmCallerConn) - if err != nil { - return nil, err - } - - conn.networkConn = wasmNetworkConn - - return conn, nil -} - -// AcceptV0 accepts the network connection using through the WASM module -// while using the net.Listener specified in core.config. -func AcceptV0(core *core) (c Conn, err error) { - wasm := NewWASMv0(core) - conn := &ConnV0{ - wasm: wasm, - } - - listener := v0.MakeWASIListener(core.Config().NetworkListenerOrPanic()) - - if err = conn.wasm.LinkNetworkInterface(nil, listener); err != nil { - return nil, err - } - - if err = conn.wasm.Initialize(); err != nil { - return nil, err - } - - if conn.wasm._accept == nil { - return nil, fmt.Errorf("water: WASM module does not export _accept") - } - - // Initialize WASM module as ReadWriter - if err = conn.wasm.InitializeReadWriter(); err != nil { - return nil, err - } - - var wasmCallerConn net.Conn - wasmCallerConn, conn.uoConn, err = socket.UnixConnPair("") - if err != nil { - if wasmCallerConn == nil || conn.uoConn == nil { - return nil, fmt.Errorf("water: socket.UnixConnPair returned error: %w", err) - } else { // likely due to Close() call errored - log.Errorf("water: socket.UnixConnPair returned error: %v", err) - } - } - - wasmNetworkConn, err := conn.wasm.AcceptFor(wasmCallerConn) - if err != nil { - return nil, err - } - - conn.networkConn = wasmNetworkConn - - return conn, nil -} - -// Read implements the net.Conn interface. -// -// It calls to the underlying user-oriented net.Conn's Read() method. -func (c *ConnV0) Read(b []byte) (n int, err error) { - if c.uoConn == nil { - return 0, errors.New("water: cannot read, (*RuntimeConnV0).uoConn is nil") - } - - // call _read - ret, err := c.wasm._read.Call(c.wasm.Store()) - if err != nil { - return 0, fmt.Errorf("water: (*wasmtime.Func).Call returned error: %w", err) - } - - if ret32, ok := ret.(int32); !ok { - return 0, fmt.Errorf("water: (*wasmtime.Func).Call returned non-int32 value") - } else { - if ret32 != 0 { - return 0, wasm.WASMErr(ret32) - } - } - - return c.uoConn.Read(b) -} - -// Write implements the net.Conn interface. -// -// It calls to the underlying user-oriented net.Conn's Write() method. -func (c *ConnV0) Write(b []byte) (n int, err error) { - if c.uoConn == nil { - return 0, errors.New("water: cannot write, (*RuntimeConnV0).uoConn is nil") - } - - n, err = c.uoConn.Write(b) - if err != nil { - return n, fmt.Errorf("uoConn.Write: %w", err) - } - - if n < len(b) { - return n, io.ErrShortWrite - } - - if n > len(b) { - return n, errors.New("invalid write result") // io.errInvalidWrite - } - - // call _write to notify WASM - ret, err := c.wasm._write.Call(c.wasm.Store()) - if err != nil { - return 0, fmt.Errorf("water: (*wasmtime.Func).Call returned error: %w", err) - } - - if ret32, ok := ret.(int32); !ok { - return 0, fmt.Errorf("water: (*wasmtime.Func).Call returned non-int32 value") - } else { - return n, wasm.WASMErr(ret32) - } -} - -// Close implements the net.Conn interface. -// -// It will close both the network connection AND the WASM module, then -// the user-facing net.Conn will be closed. -func (c *ConnV0) Close() error { - err := c.networkConn.Close() - if err != nil { - return fmt.Errorf("water: (*RuntimeConnV0).netConn.Close returned error: %w", err) - } - - _, err = c.wasm._close.Call(c.wasm.Store()) - if err != nil { - return fmt.Errorf("water: (*RuntimeConnV0)._close.Call returned error: %w", err) - } - - c.wasm.DeferAll() - c.wasm.Cleanup() - - if c.uoConn != nil { - c.uoConn.Close() - } - - return nil -} - -// LocalAddr implements the net.Conn interface. -// -// It calls to the underlying network connection's LocalAddr() method. -func (c *ConnV0) LocalAddr() net.Addr { - return c.networkConn.LocalAddr() -} - -// RemoteAddr implements the net.Conn interface. -// -// It calls to the underlying network connection's RemoteAddr() method. -func (c *ConnV0) RemoteAddr() net.Addr { - return c.networkConn.RemoteAddr() -} - -// SetDeadline implements the net.Conn interface. -// -// It calls to the underlying user-oriented connection's SetDeadline() method. -func (c *ConnV0) SetDeadline(t time.Time) error { - return c.uoConn.SetDeadline(t) -} - -// SetReadDeadline implements the net.Conn interface. -// -// It calls to the underlying user-oriented connection's SetReadDeadline() method. -// -// Note: in practice this method should actively be used by the caller. Otherwise -// it is possible for a silently failed network connection to cause the WASM module -// to hang forever on Read(). -func (c *ConnV0) SetReadDeadline(t time.Time) error { - return c.uoConn.SetReadDeadline(t) -} - -// SetWriteDeadline implements the net.Conn interface. -// -// It calls to the underlying user-oriented connection's SetWriteDeadline() method. -func (c *ConnV0) SetWriteDeadline(t time.Time) error { - return c.uoConn.SetWriteDeadline(t) -} diff --git a/conn_v0_test.go b/conn_v0_test.go deleted file mode 100644 index 1e7251f..0000000 --- a/conn_v0_test.go +++ /dev/null @@ -1,483 +0,0 @@ -//go:build unix && !windows && !nov0 - -package water_test - -import ( - "crypto/rand" - "encoding/hex" - "fmt" - "net" - "os" - "runtime" - "sync" - "testing" - "time" - - "github.com/gaukas/water" -) - -var ( - hexencoder_v0 []byte - plain_v0 []byte -) - -func TestConnV0(t *testing.T) { - // read file into hexencoder_v0 - var err error - hexencoder_v0, err = os.ReadFile("./testdata/hexencoder_v0.wasm") - if err != nil { - t.Fatal(err) - } - t.Run("DialerV0", testDialerV0) - t.Run("ListenerV0", testListenerV0) -} - -func testDialerV0(t *testing.T) { - // t.Parallel() - - // create random TCP listener listening on localhost - tcpLis, err := net.ListenTCP("tcp", nil) - if err != nil { - t.Fatal(err) - } - defer tcpLis.Close() - - // goroutine to accept incoming connections - var lisConn net.Conn - var goroutineErr error - var wg *sync.WaitGroup = new(sync.WaitGroup) - wg.Add(1) - go func() { - defer wg.Done() - lisConn, goroutineErr = tcpLis.Accept() - }() - - // Dial - dialer := &water.Dialer{ - Config: &water.Config{ - WATMBin: hexencoder_v0, - WATMConfig: water.WATMConfig{ - FilePath: "./testdata/hexencoder_v0.dialer.json", - }, - }, - } - dialer.Config.WASIConfig().InheritStdout() - - rConn, err := dialer.Dial("tcp", tcpLis.Addr().String()) - if err != nil { - t.Fatal(err) - } - - // wait for listener to accept connection - wg.Wait() - if goroutineErr != nil { - t.Fatal(goroutineErr) - } - - runtime.GC() - time.Sleep(10 * time.Millisecond) - - if err = testUppercaseHexencoderConn(rConn, lisConn, []byte("hello"), []byte("world")); err != nil { - t.Error(err) - } - - runtime.GC() - time.Sleep(10 * time.Millisecond) - - if err = testUppercaseHexencoderConn(rConn, lisConn, []byte("i'm dialer"), []byte("hello dialer")); err != nil { - t.Error(err) - } - - runtime.GC() - time.Sleep(10 * time.Millisecond) - - if err = testUppercaseHexencoderConn(rConn, lisConn, []byte("who are you?"), []byte("I'm listener")); err != nil { - t.Error(err) - } - - if err = rConn.Close(); err != nil { - t.Error(err) - } -} - -func testListenerV0(t *testing.T) { - // t.Parallel() - - // prepare for listener - config := &water.Config{ - WATMBin: hexencoder_v0, - WATMConfig: water.WATMConfig{ - FilePath: "./testdata/hexencoder_v0.listener.json", - }, - // WASIConfigFactory: wasm.NewWasiConfigFactory(), - } - config.WASIConfig().InheritStdout() - - lis, err := config.Listen("tcp", "localhost:0") - if err != nil { - t.Fatal(err) - } - - // goroutine to dial listener - var dialConn net.Conn - var goroutineErr error - var wg *sync.WaitGroup = new(sync.WaitGroup) - wg.Add(1) - go func() { - defer wg.Done() - dialConn, goroutineErr = net.Dial("tcp", lis.Addr().String()) - }() - - // Accept - rConn, err := lis.Accept() - if err != nil { - t.Fatal(err) - } - - // wait for dialer to dial - wg.Wait() - if goroutineErr != nil { - t.Fatal(goroutineErr) - } - - runtime.GC() - time.Sleep(100 * time.Millisecond) - - if err = testLowercaseHexencoderConn(rConn, dialConn, []byte("hello"), []byte("world")); err != nil { - t.Error(err) - } - - runtime.GC() - time.Sleep(100 * time.Millisecond) - - if err = testLowercaseHexencoderConn(rConn, dialConn, []byte("i'm listener"), []byte("hello listener")); err != nil { - t.Error(err) - } - - runtime.GC() - time.Sleep(100 * time.Millisecond) - - if err = testLowercaseHexencoderConn(rConn, dialConn, []byte("who are you?"), []byte("I'm dialer")); err != nil { - t.Error(err) - } - - if err = rConn.Close(); err != nil { - t.Error(err) - } -} - -func testUppercaseHexencoderConn(encoderConn, plainConn net.Conn, dMsg, lMsg []byte) error { - // dConn -> lConn - _, err := encoderConn.Write(dMsg) - if err != nil { - return err - } - - // receive data - buf := make([]byte, 1024) - n, err := plainConn.Read(buf) - if err != nil { - return err - } - - // decode hex - var decoded []byte = make([]byte, 1024) - n, err = hex.Decode(decoded, buf[:n]) - if err != nil { - return err - } - - // compare received bytes with expected bytes - if string(decoded[:n]) != string(dMsg) { - return fmt.Errorf("expected: %s, got: %s", dMsg, decoded[:n]) - } - - // encode hex - var encoded []byte = make([]byte, 1024) - n = hex.Encode(encoded, lMsg) - - // lConn -> dConn - _, err = plainConn.Write(encoded[:n]) - if err != nil { - return err - } - - // receive data - n, err = encoderConn.Read(buf) - if err != nil { - return err - } - - // compare received bytes with expected bytes - var upperLMsg []byte = make([]byte, len(lMsg)) - for i, b := range lMsg { - if b >= 'a' && b <= 'z' { // to uppercase - upperLMsg[i] = b - 32 - } else { - upperLMsg[i] = b - } - } - - if string(buf[:n]) != string(upperLMsg) { - return fmt.Errorf("expected: %s, got: %s", upperLMsg, decoded[:n]) - } - - return nil -} - -func testLowercaseHexencoderConn(encoderConn, plainConn net.Conn, dMsg, lMsg []byte) error { - // dConn -> lConn - _, err := encoderConn.Write(dMsg) - if err != nil { - return err - } - - // receive data - buf := make([]byte, 1024) - n, err := plainConn.Read(buf) - if err != nil { - return err - } - - // decode hex - var decoded []byte = make([]byte, 1024) - n, err = hex.Decode(decoded, buf[:n]) - if err != nil { - return err - } - - // compare received bytes with expected bytes - if string(decoded[:n]) != string(dMsg) { - return fmt.Errorf("expected: %s, got: %s", dMsg, decoded[:n]) - } - - // encode hex - var encoded []byte = make([]byte, 1024) - n = hex.Encode(encoded, lMsg) - - // lConn -> dConn - _, err = plainConn.Write(encoded[:n]) - if err != nil { - return err - } - - // receive data - n, err = encoderConn.Read(buf) - if err != nil { - return err - } - - // compare received bytes with expected bytes - var upperLMsg []byte = make([]byte, len(lMsg)) - for i, b := range lMsg { - if b >= 'A' && b <= 'Z' { // to lowercase - upperLMsg[i] = b + 32 - } else { - upperLMsg[i] = b - } - } - - if string(buf[:n]) != string(upperLMsg) { - return fmt.Errorf("expected: %s, got: %s", upperLMsg, decoded[:n]) - } - - return nil -} - -func BenchmarkConnV0(b *testing.B) { - // read file into plain_v0 - var err error - plain_v0, err = os.ReadFile("./testdata/plain_v0.wasm") - if err != nil { - b.Fatal(err) - } - b.Run("PlainV0-Dialer", benchmarkPlainV0Dialer) - b.Run("PlainV0-Listener", benchmarkPlainV0Listener) - b.Run("RefTCP", benchmarkReferenceTCP) -} - -func benchmarkPlainV0Dialer(b *testing.B) { - // create random TCP listener listening on localhost - tcpLis, err := net.ListenTCP("tcp", nil) - if err != nil { - b.Fatal(err) - } - defer tcpLis.Close() - - // goroutine to accept incoming connections - var lisConn net.Conn - var goroutineErr error - var wg *sync.WaitGroup = new(sync.WaitGroup) - wg.Add(1) - go func() { - defer wg.Done() - lisConn, goroutineErr = tcpLis.Accept() - }() - - // Dial - dialer := &water.Dialer{ - Config: &water.Config{ - WATMBin: plain_v0, - }, - } - - rConn, err := dialer.Dial("tcp", tcpLis.Addr().String()) - if err != nil { - b.Fatal(err) - } - defer rConn.Close() - - // wait for listener to accept connection - wg.Wait() - if goroutineErr != nil { - b.Fatal(goroutineErr) - } - - var sendMsg []byte = make([]byte, 1024) - rand.Read(sendMsg) - - runtime.GC() - time.Sleep(10 * time.Millisecond) - - b.SetBytes(1024) - b.ResetTimer() - start := time.Now() - for i := 0; i < b.N; i++ { - _, err = rConn.Write(sendMsg) - if err != nil { - b.Logf("Write error, cntr: %d, N: %d", i, b.N) - b.Fatal(err) - } - - buf := make([]byte, 1024+128) - _, err = lisConn.Read(buf) - if err != nil { - b.Logf("Read error, cntr: %d, N: %d", i, b.N) - b.Fatal(err) - } - } - b.StopTimer() - b.Logf("avg bandwidth: %f MB/s (N=%d)", float64(b.N*1024)/time.Since(start).Seconds()/1024/1024, b.N) -} - -func benchmarkPlainV0Listener(b *testing.B) { - // prepare for listener - config := &water.Config{ - WATMBin: plain_v0, - } - - lis, err := config.Listen("tcp", "localhost:0") - if err != nil { - b.Fatal(err) - } - - // goroutine to dial listener - var dialConn net.Conn - var goroutineErr error - var wg *sync.WaitGroup = new(sync.WaitGroup) - wg.Add(1) - go func() { - defer wg.Done() - dialConn, goroutineErr = net.Dial("tcp", lis.Addr().String()) - }() - - // Accept - rConn, err := lis.Accept() - if err != nil { - b.Fatal(err) - } - - // wait for dialer to dial - wg.Wait() - if goroutineErr != nil { - b.Fatal(goroutineErr) - } - - var sendMsg []byte = make([]byte, 512) - rand.Read(sendMsg) - - b.SetBytes(1024) // we will send 512-byte data and 128-byte will be transmitted on wire due to hex encoding - b.ResetTimer() - start := time.Now() - for i := 0; i < b.N; i++ { - _, err = rConn.Write(sendMsg) - if err != nil { - b.Logf("Write error, cntr: %d, N: %d", i, b.N) - b.Fatal(err) - } - - // receive data - buf := make([]byte, 1024) - _, err = dialConn.Read(buf) - if err != nil { - b.Logf("Read error, cntr: %d, N: %d", i, b.N) - b.Fatal(err) - } - } - b.StopTimer() - b.Logf("avg bandwidth: %f MB/s (N=%d)", float64(b.N*1024)/time.Since(start).Seconds()/1024/1024, b.N) - - if err = rConn.Close(); err != nil { - b.Fatal(err) - } -} - -func benchmarkReferenceTCP(b *testing.B) { - // create random TCP listener listening on localhost - tcpLis, err := net.ListenTCP("tcp", nil) - if err != nil { - b.Fatal(err) - } - defer tcpLis.Close() - - // goroutine to accept incoming connections - var lisConn net.Conn - var goroutineErr error - var wg *sync.WaitGroup = new(sync.WaitGroup) - wg.Add(1) - go func() { - defer wg.Done() - lisConn, goroutineErr = tcpLis.Accept() - }() - - nConn, err := net.Dial("tcp", tcpLis.Addr().String()) - if err != nil { - b.Fatal(err) - } - - // wait for listener to accept connection - wg.Wait() - if goroutineErr != nil { - b.Fatal(goroutineErr) - } - - var sendMsg []byte = make([]byte, 1024) - rand.Read(sendMsg) - - b.SetBytes(1024) - b.ResetTimer() - start := time.Now() - for i := 0; i < b.N; i++ { - _, err = nConn.Write(sendMsg) - if err != nil { - b.Logf("Write error, cntr: %d, N: %d", i, b.N) - b.Fatal(err) - } - - // receive data - buf := make([]byte, 1024) - _, err = lisConn.Read(buf) - if err != nil { - b.Logf("Read error, cntr: %d, N: %d", i, b.N) - b.Fatal(err) - } - - // time.Sleep(10 * time.Microsecond) - } - b.StopTimer() - b.Logf("avg bandwidth: %f MB/s (N=%d)", float64(b.N*1024)/time.Since(start).Seconds()/1024/1024, b.N) - - if err = nConn.Close(); err != nil { - b.Fatal(err) - } -} diff --git a/core.go b/core.go index b1fd23e..b43211c 100644 --- a/core.go +++ b/core.go @@ -6,11 +6,32 @@ import ( "github.com/bytecodealliance/wasmtime-go/v13" ) -// Core provides the WASM runtime base and is an internal struct +// Core provides the low-level access to the WebAssembly runtime +// environment. +// +// Currently it depends on the wasmtime-go API, but it is subject to +// change in the future. +type Core interface { + Config() *Config + Engine() *wasmtime.Engine + Instance() *wasmtime.Instance + Linker() *wasmtime.Linker + Module() *wasmtime.Module + Store() *wasmtime.Store + Instantiate() error +} + +// core provides the WASM runtime base and is an internal struct // that every RuntimeXxx implementation will embed. // -// Core is not versioned and is not subject to breaking changes -// unless a severe bug needs to be fixed in a breaking way. +// core is designed to be unmanaged and version-independent, +// which means it does not provide any functionalities other +// than simply collecting all the WASM runtime-related objects +// without overwriting access on them. And core is not subject +// to breaking changes unless a severe bug needs to be fixed +// in such a breaking manner inevitably. +// +// Implements Core. type core struct { // config config *Config @@ -23,25 +44,26 @@ type core struct { instance *wasmtime.Instance } -// Core creates a new Core, which is the base of all -// WASM runtime functionalities. -func Core(config *Config) (c *core, err error) { - c = &core{ +// NewCore creates a new Core with the given config. +// +// It uses the default implementation of interface.Core as +// defined in this file. +func NewCore(config *Config) (Core, error) { + c := &core{ config: config, } - var wasiConfig *wasmtime.WasiConfig - wasiConfig, err = c.config.WASIConfig().GetConfig() + wasiConfig, err := c.config.WASIConfig().GetConfig() if err != nil { err = fmt.Errorf("water: (*WasiConfigFactory).GetConfig returned error: %w", err) - return + return nil, err } c.engine = wasmtime.NewEngine() c.module, err = wasmtime.NewModule(c.engine, c.config.WATMBinOrPanic()) if err != nil { err = fmt.Errorf("water: wasmtime.NewModule returned error: %w", err) - return + return nil, err } c.store = wasmtime.NewStore(c.engine) c.store.SetWasiConfig(wasiConfig) @@ -49,28 +71,10 @@ func Core(config *Config) (c *core, err error) { err = c.linker.DefineWasi() if err != nil { err = fmt.Errorf("water: (*wasmtime.Linker).DefineWasi returned error: %w", err) - return + return nil, err } - return -} - -func (c *core) DialVersion(network, address string) (Conn, error) { - for _, export := range c.module.Exports() { - if f, ok := mapCoreDialContext[export.Name()]; ok { - return f(c, network, address) - } - } - return nil, fmt.Errorf("water: core loaded a WASM module that does not implement any known version") -} - -func (c *core) AcceptVersion() (Conn, error) { - for _, export := range c.module.Exports() { - if f, ok := mapCoreAccept[export.Name()]; ok { - return f(c) - } - } - return nil, fmt.Errorf("water: core loaded a WASM module that does not implement any known version") + return c, nil } // Config returns the Config used to create the Core. diff --git a/dialer.go b/dialer.go index f6264b0..a44234c 100644 --- a/dialer.go +++ b/dialer.go @@ -2,70 +2,89 @@ package water import ( "context" - "fmt" + "errors" ) -// Dialer dials the given network address upon caller calling -// Dial() and returns a net.Conn which is connected to the -// WASM module. +// Dialer dials a remote network address upon caller calling +// Dial() and returns a net.Conn which is upgraded by the +// WebAssembly Transport Module. // // The structure of a Dialer is as follows: // // dial +----------------+ dial -// ----->| Decode |------> -// Caller | WASM Runtime | Remote -// <-----| Decode/Encode |<------ +// ----->| Upgrade |------> +// Caller | WebAssembly | Remote +// <-----| Downgrade |<------ // +----------------+ // Dialer -type Dialer struct { - // Config is the configuration for the core. - Config *Config -} +type Dialer interface { + Dial(network, address string) (Conn, error) + DialContext(ctx context.Context, network, address string) (Conn, error) -func (c *Config) Dialer() *Dialer { - return &Dialer{ - Config: c.Clone(), - } + mustEmbedUnimplementedDialer() } -// Dialer dials the given network address using the specified dialer -// in the config. The returned RuntimeConn implements net.Conn and -// could be seen as the outbound connection with a wrapping transport -// protocol handled by the WASM module. +type newDialerFunc func(*Config) (Dialer, error) + +var ( + knownDialerVersions = make(map[string]newDialerFunc) + + ErrDialerAlreadyRegistered = errors.New("water: dialer already registered") + ErrDialerVersionNotFound = errors.New("water: dialer version not found") + ErrUnimplementedDialer = errors.New("water: unimplemented dialer") +) + +// UnimplementedDialer is a Dialer that always returns errors. // -// Internally, DialContext() is called with a background context. -func (d *Dialer) Dial(network, address string) (Conn, error) { - return d.DialContext(context.Background(), network, address) +// It is used to ensure forward compatibility of the Dialer interface. +type UnimplementedDialer struct{} + +// Dial implements Dialer.Dial(). +func (*UnimplementedDialer) Dial(_, _ string) (Conn, error) { + return nil, ErrUnimplementedDialer +} + +// DialContext implements Dialer.DialContext(). +func (*UnimplementedDialer) DialContext(_ context.Context, _, _ string) (Conn, error) { + return nil, ErrUnimplementedDialer +} + +// mustEmbedUnimplementedDialer is a function that developers cannot +// manually implement. It is used to ensure forward compatibility of +// the Dialer interface. +func (*UnimplementedDialer) mustEmbedUnimplementedDialer() {} + +// RegisterDialer registers a dialer function for the given version to +// the global registry. Only registered versions can be recognized and +// used by NewDialer(). +func RegisterDialer(version string, dialer newDialerFunc) error { + if _, ok := knownDialerVersions[version]; ok { + return ErrDialerAlreadyRegistered + } + knownDialerVersions[version] = dialer + return nil } -// DialContext dials the given network address using the specified dialer -// in the config. The returned RuntimeConn implements net.Conn and -// could be seen as the outbound connection with a wrapping transport -// protocol handled by the WASM module. +// NewDialer creates a new Dialer from the config. // -// If the context expires before the connection is complete, an error is -// returned. -func (d *Dialer) DialContext(ctx context.Context, network, address string) (conn Conn, err error) { - if d.Config == nil { - return nil, fmt.Errorf("water: dialing with nil config is not allowed") +// It automatically detects the version of the WebAssembly Transport +// Module specified in the config. +func NewDialer(c *Config) (Dialer, error) { + core, err := NewCore(c) + if err != nil { + return nil, err } - ctxReady, dialReady := context.WithCancel(context.Background()) - go func() { - defer dialReady() - var core *core - core, err = Core(d.Config) - if err != nil { - return + // Search through all exported names and match them to potential + // Dialer versions. + // + // TODO: detect the version of the WebAssembly Transport Module + // in a more organized way. + for _, export := range core.Module().Exports() { + if f, ok := knownDialerVersions[export.Name()]; ok { + return f(c) } - - conn, err = core.DialVersion(network, address) - }() - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-ctxReady.Done(): - return conn, err } + + return nil, ErrDialerVersionNotFound } diff --git a/examples/hexencoder/client/client.go b/examples/hexencoder/client/client.go deleted file mode 100644 index 164035b..0000000 --- a/examples/hexencoder/client/client.go +++ /dev/null @@ -1,75 +0,0 @@ -package main - -import ( - "fmt" - "os" - "runtime" - "time" - - "github.com/gaukas/water" - "github.com/gaukas/water/internal/log" -) - -func main() { - if len(os.Args) != 2 { - panic("usage: client ") - } - remoteAddr := os.Args[1] - - // read file into hexencoder_v0 - hexencoder_v0, err := os.ReadFile("./examples/hexencoder/hexencoder_v0.wasm") - if err != nil { - panic(err) - } - - // Dial - dialer := &water.Dialer{ - Config: &water.Config{ - WATMBin: hexencoder_v0, - WATMConfig: water.WATMConfig{ - FilePath: "./examples/hexencoder/hexencoder_v0.dialer.json", - }, - }, - } - dialer.Config.WASIConfig().InheritStdout() - - rConn, err := dialer.Dial("tcp", remoteAddr) - if err != nil { - panic(err) - } - defer rConn.Close() - - // simulate Go GC - runtime.GC() - time.Sleep(100 * time.Millisecond) - - // spin up two goroutines to read and write - // go func() { - // for { - // buf := make([]byte, 1024) - // n, err := rConn.Read(buf) - // if err != nil { - // panic(err) - // } - // log.Infof("Server: %s", buf[:n]) - // // time.Sleep(5 * time.Second) - // } - // }() - - go func() { - var cntr int - for { - msg := fmt.Sprintf("hello world (%d)", cntr) - _, err := rConn.Write([]byte(msg)) - if err != nil { - panic(err) - } - cntr++ - log.Infof("Client: %s", msg) - time.Sleep(5 * time.Second) - } - }() - - // wait forever - select {} -} diff --git a/examples/hexencoder/hexencoder_v0.dialer.json b/examples/hexencoder/hexencoder_v0.dialer.json deleted file mode 100644 index b1ee3f0..0000000 --- a/examples/hexencoder/hexencoder_v0.dialer.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "role": "dialer", - "mode": "uppercase" -} \ No newline at end of file diff --git a/examples/hexencoder/hexencoder_v0.listener.json b/examples/hexencoder/hexencoder_v0.listener.json deleted file mode 100644 index c7b49c2..0000000 --- a/examples/hexencoder/hexencoder_v0.listener.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "role": "listener", - "mode": "lowercase" -} \ No newline at end of file diff --git a/examples/hexencoder/server/server.go b/examples/hexencoder/server/server.go deleted file mode 100644 index bf6d3b7..0000000 --- a/examples/hexencoder/server/server.go +++ /dev/null @@ -1,77 +0,0 @@ -package main - -import ( - "os" - "runtime" - "time" - - "github.com/gaukas/water" - "github.com/gaukas/water/internal/log" -) - -func main() { - if len(os.Args) != 2 { - panic("usage: server ") - } - localAddr := os.Args[1] - - // read file into hexencoder_v0 - hexencoder_v0, err := os.ReadFile("./examples/hexencoder/hexencoder_v0.wasm") - if err != nil { - panic(err) - } - - // Listen - config := &water.Config{ - WATMBin: hexencoder_v0, - WATMConfig: water.WATMConfig{ - FilePath: "./examples/hexencoder/hexencoder_v0.listener.json", - }, - } - config.WASIConfig().InheritStdout() - lis, err := config.Listen("tcp", localAddr) - if err != nil { - panic(err) - } - - // Accept - rConn, err := lis.Accept() - if err != nil { - panic(err) - } - defer rConn.Close() - - // simulate Go GC - runtime.GC() - time.Sleep(100 * time.Millisecond) - - // spin up two goroutines to read and write - go func() { - for { - buf := make([]byte, 1024) - n, err := rConn.Read(buf) - if err != nil { - panic(err) - } - log.Infof("Client: %s", buf[:n]) - // time.Sleep(5 * time.Second) - } - }() - - // go func() { - // var cntr int - // for { - // msg := fmt.Sprintf("hello world (%d)", cntr) - // _, err := rConn.Write([]byte(msg)) - // if err != nil { - // panic(err) - // } - // cntr++ - // log.Infof("Server: %s", msg) - // time.Sleep(5 * time.Second) - // } - // }() - - // wait forever - select {} -} diff --git a/examples/v0plus/plain/README.md b/examples/v0plus/plain/README.md new file mode 100644 index 0000000..126ae4f --- /dev/null +++ b/examples/v0plus/plain/README.md @@ -0,0 +1,3 @@ +# Plain + +This is just an example of using `water` with the `plain` transport, which does nothing to the bytestream but transmit it as-is. \ No newline at end of file diff --git a/examples/v0plus/plain/dialer/main.go b/examples/v0plus/plain/dialer/main.go new file mode 100644 index 0000000..77dd3c1 --- /dev/null +++ b/examples/v0plus/plain/dialer/main.go @@ -0,0 +1,102 @@ +package main + +import ( + "crypto/rand" + "fmt" + "net" + "os" + "time" + + "github.com/gaukas/water" + "github.com/gaukas/water/internal/log" + _ "github.com/gaukas/water/transport/v0" +) + +func main() { + if len(os.Args) != 2 { + panic("usage: dialer ") + } + var remoteAddr string = os.Args[1] + + wasm, err := os.ReadFile("./examples/v0plus/plain/plain.wasm") + if err != nil { + panic(fmt.Sprintf("failed to read wasm file: %v", err)) + } + + // start using W.A.T.E.R. API below this line, have fun! + config := &water.Config{ + TMBin: wasm, + NetworkDialerFunc: net.Dial, // optional field, defaults to net.Dial + } + // configuring the standard out of the WebAssembly instance to inherit + // from the parent process + config.WASIConfig().InheritStdout() + config.WASIConfig().InheritStderr() + + dialer, err := water.NewDialer(config) + if err != nil { + panic(fmt.Sprintf("failed to create dialer: %v", err)) + } + + conn, err := dialer.Dial("tcp", remoteAddr) + if err != nil { + panic(fmt.Sprintf("failed to dial: %v", err)) + } + defer conn.Close() + // conn is a net.Conn that you are familiar with. + // So effectively, W.A.T.E.R. API ends here and everything below + // this line is just how you treat a net.Conn. + + handleConn("server", conn) +} + +func handleConn(peer string, conn net.Conn) { + defer conn.Close() + + log.Infof("handling connection from/to %s(%s)", peer, conn.RemoteAddr()) + chanMsgRecv := make(chan []byte, 4) // up to 4 messages in the buffer + // start a goroutine to read data from the connection + go func() { + defer close(chanMsgRecv) + buf := make([]byte, 1024) // 1 KiB + for { + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + n, err := conn.Read(buf) + if err != nil { + log.Warnf("read %s: error %v, tearing down connection...", peer, err) + conn.Close() + return + } + chanMsgRecv <- buf[:n] + } + }() + + // start a ticker for sending data every 2 seconds + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + var sendBuf []byte = make([]byte, 4) // 4 bytes per message + for { + select { + case msg := <-chanMsgRecv: + if msg == nil { + return // connection closed + } + log.Infof("%s: %x\n", peer, msg) + case <-ticker.C: + n, err := rand.Read(sendBuf) + if err != nil { + log.Warnf("rand.Read: error %v, tearing down connection...", err) + return + } + // print the bytes sending as hex string + log.Infof("sending: %x\n", sendBuf[:n]) + + _, err = conn.Write(sendBuf[:n]) + if err != nil { + log.Warnf("write %s: error %v, tearing down connection...", peer, err) + return + } + } + } +} diff --git a/examples/v0plus/plain/listener/main.go b/examples/v0plus/plain/listener/main.go new file mode 100644 index 0000000..a042ff4 --- /dev/null +++ b/examples/v0plus/plain/listener/main.go @@ -0,0 +1,109 @@ +package main + +import ( + "crypto/rand" + "fmt" + "net" + "os" + "time" + + "github.com/gaukas/water" + "github.com/gaukas/water/internal/log" + _ "github.com/gaukas/water/transport/v0" +) + +func main() { + if len(os.Args) != 2 { + panic("usage: listener ") + } + var localAddr string = os.Args[1] + + wasm, err := os.ReadFile("./examples/v0plus/plain/plain.wasm") + if err != nil { + panic(fmt.Sprintf("failed to read wasm file: %v", err)) + } + + // start using W.A.T.E.R. API below this line, have fun! + config := &water.Config{ + TMBin: wasm, + // NetworkDialerFunc: net.Dial, // optional field, defaults to net.Dial + } + // configuring the standard out of the WebAssembly instance to inherit + // from the parent process + config.WASIConfig().InheritStdout() + config.WASIConfig().InheritStderr() + + lis, err := config.Listen("tcp", localAddr) + if err != nil { + panic(fmt.Sprintf("failed to listen: %v", err)) + } + defer lis.Close() + log.Infof("Listening on %s", lis.Addr().String()) + // lis is a net.Listener that you are familiar with. + // So effectively, W.A.T.E.R. API ends here and everything below + // this line is just how you treat a net.Listener. + + clientCntr := 0 + for { + conn, err := lis.Accept() + if err != nil { + panic(fmt.Sprintf("failed to accept: %v", err)) + } + + // start a goroutine to handle the connection + go handleConn(fmt.Sprintf("client#%d", clientCntr), conn) + clientCntr++ + } +} + +func handleConn(peer string, conn net.Conn) { + defer conn.Close() + + log.Infof("handling connection from/to %s(%s)", peer, conn.RemoteAddr()) + chanMsgRecv := make(chan []byte, 4) // up to 4 messages in the buffer + // start a goroutine to read data from the connection + go func() { + defer close(chanMsgRecv) + buf := make([]byte, 1024) // 1 KiB + for { + // conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + n, err := conn.Read(buf) + if err != nil { + log.Warnf("read %s: error %v, tearing down connection...", peer, err) + conn.Close() + return + } + chanMsgRecv <- buf[:n] + } + }() + + // start a ticker for sending data every 2 seconds + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + var sendBuf []byte = make([]byte, 4) // 4 bytes per message + for { + select { + case msg := <-chanMsgRecv: + if msg == nil { + log.Warnf("read %s: connection closed, tearing down connection...", peer) + return // connection closed + } + log.Infof("%s: %x\n", peer, msg) + case <-ticker.C: + n, err := rand.Read(sendBuf) + if err != nil { + log.Warnf("rand.Read: error %v, tearing down connection...", err) + return + } + // print the bytes sending as hex string + log.Infof("sending: %x\n", sendBuf[:n]) + + _, err = conn.Write(sendBuf[:n]) + if err != nil { + log.Warnf("write %s: error %v, tearing down connection...", peer, err) + return + } + } + } +} diff --git a/examples/hexencoder/hexencoder_v0.wasm b/examples/v0plus/plain/plain.wasm similarity index 51% rename from examples/hexencoder/hexencoder_v0.wasm rename to examples/v0plus/plain/plain.wasm index 755133d..ebeeb6c 100644 Binary files a/examples/hexencoder/hexencoder_v0.wasm and b/examples/v0plus/plain/plain.wasm differ diff --git a/examples/v0plus/plain/relay/main.go b/examples/v0plus/plain/relay/main.go new file mode 100644 index 0000000..992f7b2 --- /dev/null +++ b/examples/v0plus/plain/relay/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "fmt" + "net" + "os" + + "github.com/gaukas/water" + _ "github.com/gaukas/water/transport/v0" +) + +func main() { + if len(os.Args) != 3 { + panic("usage: relay ") + } + var localAddr string = os.Args[1] + var remoteAddr string = os.Args[2] + + wasm, err := os.ReadFile("./examples/v0plus/plain/plain.wasm") + if err != nil { + panic(fmt.Sprintf("failed to read wasm file: %v", err)) + } + + // start using W.A.T.E.R. API below this line, have fun! + config := &water.Config{ + TMBin: wasm, + NetworkDialerFunc: net.Dial, // optional field, defaults to net.Dial + } + // configuring the standard out of the WebAssembly instance to inherit + // from the parent process + config.WASIConfig().InheritStdout() + config.WASIConfig().InheritStderr() + + relay, err := water.NewRelay(config) + if err != nil { + panic(fmt.Sprintf("failed to create dialer: %v", err)) + } + + err = relay.ListenAndRelayTo("tcp", localAddr, "tcp", remoteAddr) + if err != nil { + panic(fmt.Sprintf("failed to dial: %v", err)) + } +} diff --git a/feature.go b/feature.go deleted file mode 100644 index 5ad5030..0000000 --- a/feature.go +++ /dev/null @@ -1,14 +0,0 @@ -package water - -type Feature uint64 - -// Feature is a bit mask of experimental features of WATER. -// -// TODO: implement Feature. -const ( - FEATURE_DUMMY Feature = 1 << iota // a dummy feature that does nothing. - FEATURE_RESERVED // reserved for future use - // ... - FEATURE_CWAL Feature = 0xFFFFFFFFFFFFFFFF // CWAL = Can't Wait Any Longer - FEATURE_NONE Feature = 0 // NONE = No Experimental Features -) diff --git a/gcfix.go b/gcfix.go deleted file mode 100644 index bdbe98a..0000000 --- a/gcfix.go +++ /dev/null @@ -1,12 +0,0 @@ -//go:build !nogcfix - -package water - -// GCFIX is a workaround to prevent Go GC from incorrectly garbage -// collecting the cloned `*os.File` pushed to WASM with `PushFile()`. -// -// BUG: There is an undocumented GC issue in Go 1.20 and Go 1.21. -// The first `*os.File` pushed to WASM with `PushFile()` from wasmtime -// will be incorrectly garbage collected by Go GC even if it is still -// accessible from Go. -const GCFIX bool = true diff --git a/go.sum b/go.sum index 160c503..f7b1fca 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,12 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/refraction-networking/wasmtime-go/v13 v13.0.0 h1:5Asz7xwxaRW59P9hTwxKjn5gKjf7BziCX0+Y9CIZJPs= github.com/refraction-networking/wasmtime-go/v13 v13.0.0/go.mod h1:KmsZLdjjzNH/E5wbfoRehqP70tHzKlfNOi730VCAR4E= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/log/slog.go b/internal/log/slog.go index eec406c..c12f3ba 100644 --- a/internal/log/slog.go +++ b/internal/log/slog.go @@ -21,7 +21,7 @@ func Infof(format string, args ...any) { slogger.Info(fmt.Sprintf(format, args...)) } -func Warningf(format string, args ...any) { +func Warnf(format string, args ...any) { slogger.Warn(fmt.Sprintf(format, args...)) } diff --git a/internal/log/slog_old.go b/internal/log/slog_old.go index 933e1e8..1825d95 100644 --- a/internal/log/slog_old.go +++ b/internal/log/slog_old.go @@ -22,7 +22,7 @@ func Infof(format string, args ...any) { slogger.Info(fmt.Sprintf(format, args...)) } -func Warningf(format string, args ...any) { +func Warnf(format string, args ...any) { slogger.Warn(fmt.Sprintf(format, args...)) } diff --git a/internal/socket/file.go b/internal/socket/file.go index 22cc842..d9ba54f 100644 --- a/internal/socket/file.go +++ b/internal/socket/file.go @@ -9,12 +9,15 @@ import ( "github.com/gaukas/water/internal/log" ) +// ErrNoKnownConversion is returned when the given object cannot be converted to *os.File var ErrNoKnownConversion = errors.New("no known conversion to *os.File") +// EmbedFile is an interface for objects that can be converted to *os.File type EmbedFile interface { File() (*os.File, error) } +// AsFile converts the given object to *os.File func AsFile(f any) (*os.File, error) { switch f := f.(type) { case *os.File: diff --git a/internal/socket/tcpconn.go b/internal/socket/tcpconn.go index c1402ba..541294f 100644 --- a/internal/socket/tcpconn.go +++ b/internal/socket/tcpconn.go @@ -6,8 +6,13 @@ import ( "sync" ) -func TCPConnPair(address string) (c1, c2 net.Conn, err error) { - l, err := net.Listen("tcp", address) +func TCPConnPair(address ...string) (c1, c2 net.Conn, err error) { + var tcpAddr string = ":0" + if len(address) > 0 && address[0] != "" { + tcpAddr = address[0] + } + + l, err := net.Listen("tcp", tcpAddr) // skipcq: GSC-G102 if err != nil { return nil, nil, fmt.Errorf("net.Listen returned error: %w", err) } diff --git a/internal/socket/unixconn.go b/internal/socket/unixconn.go index 667acff..1bd970a 100644 --- a/internal/socket/unixconn.go +++ b/internal/socket/unixconn.go @@ -2,13 +2,14 @@ package socket import ( "crypto/rand" - "encoding/hex" "fmt" "io" "net" "os" "sync" "time" + + "github.com/gaukas/water/internal/log" ) // UnixConnWrap wraps an io.Reader/io.Writer/io.ReadWriteCloser @@ -25,83 +26,109 @@ import ( // Once this function is invoked, the caller should not perform I/O // operations on the ReadWriteCloser anymore. func UnixConnWrap(obj any) (*net.UnixConn, error) { - // randomize the name of the socket - var randName []byte = make([]byte, 8) // 8-byte so 16-char hex string, 64-bit randomness is good enough - if _, err := rand.Read(randName); err != nil { + // get a pair of connected UnixConn + unixConn, reverseUnixConn, err := UnixConnPair() + if err != nil && (unixConn == nil || reverseUnixConn == nil) { return nil, err } - socketName := hex.EncodeToString(randName) - // listen on the socket - unixAddr, err := net.ResolveUnixAddr("unix", os.TempDir()+"/"+string(socketName)) - if err != nil { - return nil, err + wg := new(sync.WaitGroup) + + // if the object implements io.Reader: read from the object and write to the reverseUnixConn + if reader, ok := obj.(io.Reader); ok { + wg.Add(1) + go func() { + _, _ = io.Copy(reverseUnixConn, reader) + + // when the src is closed, we will close the dst + time.Sleep(1 * time.Millisecond) + log.Debugf("closing reverseUnixConn and unixConn") + err = reverseUnixConn.Close() + err = unixConn.Close() + wg.Done() + }() } - unixListener, err := net.ListenUnix("unix", unixAddr) - if err != nil { - return nil, err + + // if the object implements io.Writer: read from the reverseUnixConn and write to the object + if writer, ok := obj.(io.Writer); ok { + wg.Add(1) + go func() { + _, _ = io.Copy(writer, reverseUnixConn) + // when the src is closed, we will close the dst + if closer, ok := obj.(io.Closer); ok { + time.Sleep(1 * time.Millisecond) + log.Debugf("closing obj") + _ = closer.Close() + } + wg.Done() + }() } - defer unixListener.Close() // we will no longer need this listener since the name is not recorded anywhere - // spin up a goroutine to wait for listening - var unixConn *net.UnixConn - var acceptErr error - acceptWg := &sync.WaitGroup{} - acceptWg.Add(1) - go func() { - defer acceptWg.Done() - unixConn, acceptErr = unixListener.AcceptUnix() // so caller will have the accepted connection - if acceptErr != nil { - return - } - }() + wg.Wait() + return unixConn, nil +} - // reverseUnixConn is used to access the unixConn's read/write buffer: - // - writing to reverseUnixConn = save to unixConn's read buffer - // - reading from reverseUnixConn = read from unixConn's write buffer - reverseUnixConn, err := net.DialUnix("unix", nil, unixAddr) - if err != nil { +func UnixConnFileWrap(obj any) (*os.File, error) { + // get a pair of connected UnixConn + unixConn, reverseUnixConn, err := UnixConnPair() + if err != nil && (unixConn == nil || reverseUnixConn == nil) { return nil, err } - acceptWg.Wait() // wait for the goroutine to accept the connection - if acceptErr != nil { - return nil, acceptErr + + unixConnFile, err := unixConn.File() + if err != nil { + return nil, err } + wg := new(sync.WaitGroup) + // if the object implements io.Reader: read from the object and write to the reverseUnixConn if reader, ok := obj.(io.Reader); ok { + wg.Add(1) go func() { - io.Copy(reverseUnixConn, reader) + _, _ = io.Copy(reverseUnixConn, reader) // when the src is closed, we will close the dst time.Sleep(1 * time.Millisecond) + log.Debugf("closing reverseUnixConn and unixConn") reverseUnixConn.Close() + _ = unixConn.Close() + _ = unixConnFile.Close() + wg.Done() }() } // if the object implements io.Writer: read from the reverseUnixConn and write to the object if writer, ok := obj.(io.Writer); ok { + wg.Add(1) go func() { - io.Copy(writer, reverseUnixConn) + _, _ = io.Copy(writer, reverseUnixConn) // when the src is closed, we will close the dst if closer, ok := obj.(io.Closer); ok { time.Sleep(1 * time.Millisecond) - closer.Close() + log.Debugf("closing obj") + _ = closer.Close() } + wg.Done() }() } - return unixConn, nil + wg.Wait() + return unixConnFile, nil } -func UnixConnPair(path string) (c1, c2 net.Conn, err error) { - unixPath := path - if path == "" { +func UnixConnPair(path ...string) (*net.UnixConn, *net.UnixConn, error) { + var c1, c2 net.Conn + + unixPath := "" + if len(path) == 0 || path[0] == "" { // randomize a socket name randBytes := make([]byte, 16) if _, err := rand.Read(randBytes); err != nil { return nil, nil, fmt.Errorf("crypto/rand.Read returned error: %w", err) } unixPath = os.TempDir() + string(os.PathSeparator) + fmt.Sprintf("%x", randBytes) + } else { + unixPath = path[0] } // create a one-time use UnixListener @@ -133,5 +160,14 @@ func UnixConnPair(path string) (c1, c2 net.Conn, err error) { return nil, nil, fmt.Errorf("c1 or c2 is nil") } - return c1, c2, ul.Close() + // type assertion + if uc1, ok := c1.(*net.UnixConn); ok { + if uc2, ok := c2.(*net.UnixConn); ok { + return uc1, uc2, ul.Close() + } else { + return nil, nil, fmt.Errorf("c2 is not *net.UnixConn") + } + } else { + return nil, nil, fmt.Errorf("c1 is not *net.UnixConn") + } } diff --git a/internal/socket/unixconn_test.go b/internal/socket/unixconn_test.go index 3ffee47..82f9d31 100644 --- a/internal/socket/unixconn_test.go +++ b/internal/socket/unixconn_test.go @@ -12,7 +12,7 @@ import ( ) func TestUnixConnPair(t *testing.T) { - c1, c2, err := socket.UnixConnPair("") + c1, c2, err := socket.UnixConnPair() if err != nil { if c1 == nil || c2 == nil { t.Fatal(err) @@ -42,9 +42,11 @@ func TestUnixConnPair(t *testing.T) { func testIO(wrConn, rdConn net.Conn, N int, sz int, sleep time.Duration) error { var sendMsg []byte = make([]byte, sz) - rand.Read(sendMsg) + _, err := rand.Read(sendMsg) + if err != nil { + return fmt.Errorf("rand.Read error: %w", err) + } - var err error for i := 0; i < N; i++ { _, err = wrConn.Write(sendMsg) if err != nil { diff --git a/internal/system/connhalt_fix_disabled.go b/internal/system/connhalt_fix_disabled.go new file mode 100644 index 0000000..e65308c --- /dev/null +++ b/internal/system/connhalt_fix_disabled.go @@ -0,0 +1,20 @@ +//go:build !connhaltbug + +package system + +// CONN_HALT_BUG is a boolean flag indicating whether the connection halt bug is present. +// +// Connection Halting Bug: for some reason when a TCP Connection is closed by the remote by +// a FIN packet, even tho the Go will be able to detect that and return EOF on the next Read(), +// the async read in WebAssembly may not be able to correctly handle that. In rare cases, the +// non-blocking async read becomes blocking and does not return EOF immediately, and meanwhile +// preventing other async operations from making progress, thus halting the whole system. +// +// This bug is not a runtime-side bug, but it is non-trivial to fix or work around inside the +// WebAssembly environment. Therefore from the runtime-side we work around it by using a +// unix socket to relay between the network socket and the WebAssembly environment. When the +// runtime detects that the network socket is closed, it will close the unix socket, which +// will immediately unblock the async read in WebAssembly. +// +// First reported in a finalist v0 standard (2023 Oct). +const CONN_HALT_BUG = false diff --git a/internal/system/connhalt_fix_enabled.go b/internal/system/connhalt_fix_enabled.go new file mode 100644 index 0000000..c19fc60 --- /dev/null +++ b/internal/system/connhalt_fix_enabled.go @@ -0,0 +1,20 @@ +//go:build connhaltbug + +package system + +// CONN_HALT_BUG is a boolean flag indicating whether the connection halt bug is present. +// +// Connection Halting Bug: for some reason when a TCP Connection is closed by the remote by +// a FIN packet, even tho the Go will be able to detect that and return EOF on the next Read(), +// the async read in WebAssembly may not be able to correctly handle that. In rare cases, the +// non-blocking async read becomes blocking and does not return EOF immediately, and meanwhile +// preventing other async operations from making progress, thus halting the whole system. +// +// This bug is not a runtime-side bug, but it is non-trivial to fix or work around inside the +// WebAssembly environment. Therefore from the runtime-side we work around it by using a +// unix socket to relay between the network socket and the WebAssembly environment. When the +// runtime detects that the network socket is closed, it will close the unix socket, which +// will immediately unblock the async read in WebAssembly. +// +// First reported in a finalist v0 standard (2023 Oct). +const CONN_HALT_BUG = true diff --git a/internal/system/gcfix_disabled.go b/internal/system/gcfix_disabled.go new file mode 100644 index 0000000..eabfadc --- /dev/null +++ b/internal/system/gcfix_disabled.go @@ -0,0 +1,13 @@ +//go:build !gcbug + +package system + +// GC_BUG is a boolean flag indicating whether the garbage collector bug is present. +// +// Garbage Collection bug: Go GC will garbage collect the first file we pushed +// into the WASM module no matter what. Seemingly no one admits this is a bug +// on their side, so we work around it by pushing a dummy file first. +// +// First reported in a v0 draft (2023 Sep). It is no longer observed later in the +// finalist v0 standard. +const GC_BUG = false diff --git a/internal/system/gcfix_enabled.go b/internal/system/gcfix_enabled.go new file mode 100644 index 0000000..3cc978c --- /dev/null +++ b/internal/system/gcfix_enabled.go @@ -0,0 +1,13 @@ +//go:build gcbug + +package system + +// GC_BUG is a boolean flag indicating whether the garbage collector bug is present. +// +// Garbage Collection bug: Go GC will garbage collect the first file we pushed +// into the WASM module no matter what. Seemingly no one admits this is a bug +// on their side, so we work around it by pushing a dummy file first. +// +// First reported in a v0 draft (2023 Sep). It is no longer observed later in the +// finalist v0 standard. +const GC_BUG = true diff --git a/internal/v0/managed_dialer.go b/internal/v0/managed_dialer.go new file mode 100644 index 0000000..471b1cf --- /dev/null +++ b/internal/v0/managed_dialer.go @@ -0,0 +1,28 @@ +package v0 + +import ( + "net" +) + +// ManagedDialer restricts the network and address to be +// used by the dialerFunc. +type ManagedDialer struct { + network string + address string + dialerFunc func(network, address string) (net.Conn, error) + // mapFdConn map[int32]net.Conn // saves all the connections created by this WasiDialer by their file descriptors! (So we could close them when needed) + // mapFdClonedFile map[int32]*os.File // saves all files so GC won't close them +} + +func NewManagedDialer(network, address string, dialerFunc func(network, address string) (net.Conn, error)) *ManagedDialer { + return &ManagedDialer{ + network: network, + address: address, + dialerFunc: dialerFunc, + } +} + +// dial(apw i32) -> fd i32 +func (md *ManagedDialer) Dial() (net.Conn, error) { + return md.dialerFunc(md.network, md.address) +} diff --git a/internal/v0/transport_module.go b/internal/v0/transport_module.go new file mode 100644 index 0000000..d5ec2ee --- /dev/null +++ b/internal/v0/transport_module.go @@ -0,0 +1,628 @@ +//go:build !nov0 + +package v0 + +import ( + "fmt" + "net" + "os" + "runtime" + "sync" + + "github.com/bytecodealliance/wasmtime-go/v13" + "github.com/gaukas/water" + "github.com/gaukas/water/internal/log" + "github.com/gaukas/water/internal/socket" + "github.com/gaukas/water/internal/system" + "github.com/gaukas/water/internal/wasm" +) + +// TransportModule acts like a "managed core". It was build to provide WebAssembly +// Transport Module API-facing functions and utilities that are exclusive to +// version 0. +type TransportModule struct { + core water.Core + + _init *wasmtime.Func // _init() -> i32 + + // _dial: + // - Calls to `env.host_dial() -> fd: i32` to dial a network connection and bind it to one + // of its file descriptors, record the fd for `remoteConnFd`. This will be the fd it used + // to read/write data from/to the remote destination. + // - Records the `callerConnFd`. This will be the fd it used to read/write data from/to + // the caller. + // - Returns `remoteConnFd` to the caller. + _dial *wasmtime.Func // _dial(callerConnFd i32) -> (remoteConnFd i32) + + // _accept: + // - Calls to `env.host_accept() -> fd: i32` to accept a network connection and bind it + // to one of its file descriptors, record the fd for `sourceConnFd`. This will be the fd + // it used to read/write data from/to the source address. + // - Records the `callerConnFd`. This will be the fd it used to read/write data from/to + // the caller. + // - Returns `sourceConnFd` to the caller. + _accept *wasmtime.Func // _accept(callerConnFd i32) -> (sourceConnFd i32) + + // _associate: + // - Calls to `env.host_accept() -> fd: i32` to accept a network connection and bind it + // to one of its file descriptors, record the fd for `sourceConnFd`. This will be the fd + // it used to read/write data from/to the source address. + // - Calls to `env.host_dial() -> fd: i32` to dial a network connection and bind it to one + // of its file descriptors, record the fd for `remoteConnFd`. This will be the fd it used + // to read/write data from/to the remote destination. + // - Returns 0 to the caller or an error code if any of the above steps failed. + _associate *wasmtime.Func // _associate() -> (err i32) + + // backgroundWorker is used to replace the deprecated read-write-close model. + // We put it in a inlined struct for better code styling. + backgroundWorker *struct { + // _cancel_with: + // - Provides a socket to the WASM module for it to listen to cancellation events. + // - on Cancel() call, the pipe will be written to by the host (caller). + // - WebAssembly instance should select on the socket and handle cancellation ASAP. + // + // This is a workaround for not being able to call another WASM function until the + // current one returns. And apparently this function needs to be called BEFORE the + // blocking _worker() function. + _cancel_with *wasmtime.Func // _cancel_with(fd int32) (err int32) + + // _worker provides a blocking function for the WASM module to run a worker thread. + // In the worker thread, WASM module should select on all previously pushed sockets + // (typically, two among callerConnFd, remoteConnFd, and sourceConnFd) and handle + // data bi-directionally. The exact behavior is up to the WebAssembly module and + // overall it drives data flow below based on the identity of the module: + // - Dialer: callerConn <==> remoteConn + // - Listener: callerConn <==> sourceConn + // - Relay: sourceConn <==> remoteConn + // + // The worker thread should exit and return when the cancellation pipe is available + // for reading. For the current design, the content read from the pipe does not + // include meaningful data. + _worker *wasmtime.Func // _worker() (err int32) + + // a channel used to send errors from the worker thread to the host in a non-blocking + // manner. When the worker thread exits, this channel will be closed. + // + // Read-only to the caller. Write-only to the worker thread. + chanWorkerErr chan error + + // a socket used to cancel the worker thread. When the host calls Cancel(), it should + // write to this socket. + cancelSocket net.Conn + } + + gcfixOnce *sync.Once + pushedConn map[int32]*struct { + groundTruthConn net.Conn // the conn we want to keep alive + pushedFile *os.File // the file we actually pushed to WebAssembly world also needs to be kept alive + } + pushedConnMutex *sync.RWMutex + + deferOnce *sync.Once + deferredFuncs []func() +} + +func Core2TransportModule(core water.Core) *TransportModule { + wasm := &TransportModule{ + core: core, + gcfixOnce: new(sync.Once), + pushedConn: make(map[int32]*struct { + groundTruthConn net.Conn + pushedFile *os.File + }), + pushedConnMutex: new(sync.RWMutex), + deferOnce: new(sync.Once), + deferredFuncs: make([]func(), 0), + } + + // SetFinalizer, so Go GC automatically cleans up the WASM runtime + // and all opened file descriptors (if any) associated with it + // when the TransportModule is garbage collected. + runtime.SetFinalizer(wasm, func(tm *TransportModule) { + _ = tm.Cancel() // tm cannot be nil here as we just set it above + tm.DeferAll() + tm.Cleanup() + }) + + return wasm +} + +func (tm *TransportModule) LinkNetworkInterface(dialer *ManagedDialer, listener net.Listener) error { + if tm.core.Linker() == nil { + return fmt.Errorf("water: linker not set, is Core initialized?") + } + + // import host_dial + if dialer != nil { + if err := tm.core.Linker().FuncNew( + "env", "host_dial", WASIConnectFuncType, + WrapConnectFunc( + func(caller *wasmtime.Caller) (fd int32, err error) { + conn, err := dialer.Dial() + if err != nil { + return wasm.GENERAL_ERROR, err + } + + return tm.PushConn(conn, caller) + }, + ), + ); err != nil { + return fmt.Errorf("water: linking WASI dialer, (*wasmtime.Linker).FuncNew: %w", err) + } + } else { + if err := tm.core.Linker().FuncNew( + "env", "host_dial", WASIConnectFuncType, + WrappedUnimplementedWASIConnectFunc(), + ); err != nil { + return fmt.Errorf("water: linking NOP dialer, (*wasmtime.Linker).FuncNew: %w", err) + } + } + + // import host_accept + if listener != nil { + if err := tm.core.Linker().FuncNew( + "env", "host_accept", WASIConnectFuncType, + WrapConnectFunc( + func(caller *wasmtime.Caller) (fd int32, err error) { + conn, err := listener.Accept() + if err != nil { + return wasm.GENERAL_ERROR, err + } + + return tm.PushConn(conn, caller) + }, + ), + ); err != nil { + return fmt.Errorf("water: linking WASI listener, (*wasmtime.Linker).FuncNew: %w", err) + } + } else { + if err := tm.core.Linker().FuncNew( + "env", "host_accept", WASIConnectFuncType, + WrappedUnimplementedWASIConnectFunc(), + ); err != nil { + return fmt.Errorf("water: linking NOP listener, (*wasmtime.Linker).FuncNew: %w", err) + } + } + + return nil +} + +// Initialize initializes the WASMv0 runtime by getting all the exported functions from +// the WASM module. +// +// All imports must be set before calling this function. +func (tm *TransportModule) Initialize() error { + if tm.core == nil { + return fmt.Errorf("water: no core loaded") + } + + var err error + // import host_defer function + if err = tm.core.Linker().FuncWrap("env", "host_defer", func() { + tm.DeferAll() + }); err != nil { + return fmt.Errorf("water: linking deferh function, (*wasmtime.Linker).FuncWrap: %w", err) + } + + // import pull_config function (it is called pushConfig here in the host) + if err := tm.core.Linker().FuncNew("env", "pull_config", WASIConnectFuncType, WrapConnectFunc(tm.pushConfig)); err != nil { + return fmt.Errorf("water: linking pull_config function, (*wasmtime.Linker).FuncNew: %w", err) + } + + // instantiate the WASM module + if err = tm.core.Instantiate(); err != nil { + return err + } + + // _init + tm._init = tm.core.Instance().GetFunc(tm.core.Store(), "_water_init") + if tm._init == nil { + return fmt.Errorf("water: WASM module does not export _water_init") + } + + // _dial + tm._dial = tm.core.Instance().GetFunc(tm.core.Store(), "_water_dial") + // if tm._dial == nil { + // return fmt.Errorf("water: WASM module does not export _dial") + // } + + // _accept + tm._accept = tm.core.Instance().GetFunc(tm.core.Store(), "_water_accept") + // if tm._accept == nil { + // return fmt.Errorf("water: WASM module does not export _accept") + // } + + // _associate + tm._associate = tm.core.Instance().GetFunc(tm.core.Store(), "_water_associate") + // if tm._associate == nil { + // return fmt.Errorf("water: WASM module does not export _associate") + // } + + // _cancel_with + _cancel_with := tm.core.Instance().GetFunc(tm.core.Store(), "_water_cancel_with") + if _cancel_with == nil { + return fmt.Errorf("water: WASM module does not export _water_cancel_with") + } + + // _worker + _worker := tm.core.Instance().GetFunc(tm.core.Store(), "_water_worker") + if _worker == nil { + return fmt.Errorf("water: WASM module does not export _water_worker") + } + + // wrap _cancel_with and _worker + tm.backgroundWorker = &struct { + _cancel_with *wasmtime.Func + _worker *wasmtime.Func + chanWorkerErr chan error + cancelSocket net.Conn + }{ + _cancel_with: _cancel_with, + _worker: _worker, + chanWorkerErr: make(chan error, 8), // at max 1 error would occur, but we will write multiple copies + // cancelSocket: nil, + } + + // call _init + ret, err := tm._init.Call(tm.core.Store()) + if err != nil { + return fmt.Errorf("water: calling _water_init function returned error: %w", err) + } + + return wasm.WASMErr(ret.(int32)) +} + +// DialFrom is used to make the Transport Module act as a dialer and +// dial a network connection. +// +// Takes the reverse caller connection as an argument, which is used +// to communicate with the caller. +func (tm *TransportModule) DialFrom(reverseCallerConn net.Conn) (destConn net.Conn, err error) { + // check if _dial is exported + if tm._dial == nil { + return nil, fmt.Errorf("water: WASM module does not export _water_dial") + } + + callerFd, err := tm.PushConn(reverseCallerConn) + if err != nil { + return nil, fmt.Errorf("water: pushing caller conn to store failed: %w", err) + } + + ret, err := tm._dial.Call(tm.core.Store(), callerFd) + if err != nil { + return nil, fmt.Errorf("water: calling _dial function returned error: %w", err) + } + + if remoteFd, ok := ret.(int32); !ok { + return nil, fmt.Errorf("water: invalid _dial function signature") + } else { + if remoteFd < 0 { + return nil, wasm.WASMErr(remoteFd) + } else { + destConn := tm.GetPushedConn(remoteFd) + if destConn == nil { + return nil, fmt.Errorf("water: failed to look up network connection by fd") + } + return destConn, nil + } + } +} + +// AcceptFor is used to make the Transport Module act as a listener and +// accept a network connection. +func (tm *TransportModule) AcceptFor(reverseCallerConn net.Conn) (sourceConn net.Conn, err error) { + // check if _accept is exported + if tm._accept == nil { + return nil, fmt.Errorf("water: WASM module does not export _water_accept") + } + + callerFd, err := tm.PushConn(reverseCallerConn) + if err != nil { + return nil, fmt.Errorf("water: pushing caller conn to store failed: %w", err) + } + + ret, err := tm._accept.Call(tm.core.Store(), callerFd) + if err != nil { + return nil, fmt.Errorf("water: calling _accept function returned error: %w", err) + } + + if sourceFd, ok := ret.(int32); !ok { + return nil, fmt.Errorf("water: invalid _accept function signature") + } else { + if sourceFd < 0 { + return nil, wasm.WASMErr(sourceFd) + } else { + sourceConn := tm.GetPushedConn(sourceFd) + if sourceConn == nil { + return nil, fmt.Errorf("water: failed to look up network connection by fd") + } + return sourceConn, nil + } + } +} + +// Associate is used to make the Transport Module act as a relay and +// associate two network connections, where one is from a source via +// a listener, and the other is to a destination via a dialer. +func (tm *TransportModule) Associate() error { + // check if _associate is exported + if tm._associate == nil { + return fmt.Errorf("water: WASM module does not export _water_associate") + } + + ret, err := tm._associate.Call(tm.core.Store()) + if err != nil { + return fmt.Errorf("water: calling _associate function returned error: %w", err) + } + + return wasm.WASMErr(ret.(int32)) +} + +// Worker spins up a worker thread for the WASM module to run a blocking function. +// +// This function is non-blocking UNLESS the error occurred before entering the worker +// thread. In that case, the error will be returned immediately. +func (tm *TransportModule) Worker() error { + // check if _worker is exported + if tm.backgroundWorker == nil { + return fmt.Errorf("water: Transport Module is not cancellable") + } + + // create cancel pipe + cancelConnR, cancelConnW, err := socket.UnixConnPair() + if err != nil { + return fmt.Errorf("water: creating cancel pipe failed: %w", err) + } + tm.backgroundWorker.cancelSocket = cancelConnW // host will Write to this pipe to cancel the worker + + // push cancel pipe to store + cancelPipeFd, err := tm.PushConn(cancelConnR) + if err != nil { + return fmt.Errorf("water: pushing cancel pipe to store failed: %w", err) + } + + // pass the fd to the WASM module + ret, err := tm.backgroundWorker._cancel_with.Call(tm.core.Store(), cancelPipeFd) + if err != nil { + return fmt.Errorf("water: calling _water_cancel_with function returned error: %w", err) + } + if ret.(int32) != 0 { + return fmt.Errorf("water: _water_cancel_with returned error: %w", wasm.WASMErr(ret.(int32))) + } + + log.Debugf("water: starting worker thread") + + // in a goroutine, call _worker + go func() { + defer close(tm.backgroundWorker.chanWorkerErr) + ret, err := tm.backgroundWorker._worker.Call(tm.core.Store()) + if err != nil { + // multiple copies in case of multiple receivers on the channel + tm.backgroundWorker.chanWorkerErr <- err + tm.backgroundWorker.chanWorkerErr <- err + tm.backgroundWorker.chanWorkerErr <- err + tm.backgroundWorker.chanWorkerErr <- err + return + } + if ret.(int32) != 0 { + // multiple copies in case of multiple receivers on the channel + tm.backgroundWorker.chanWorkerErr <- wasm.WASMErr(ret.(int32)) + tm.backgroundWorker.chanWorkerErr <- wasm.WASMErr(ret.(int32)) + tm.backgroundWorker.chanWorkerErr <- wasm.WASMErr(ret.(int32)) + tm.backgroundWorker.chanWorkerErr <- wasm.WASMErr(ret.(int32)) + log.Warnf("water: worker thread exited with code %d", ret.(int32)) + } else { + log.Debugf("water: worker thread exited with code 0") + } + }() + + log.Debugf("water: worker thread started") + + // last sanity check if the worker thread crashed immediately even before we return + select { + case err := <-tm.backgroundWorker.chanWorkerErr: // if already returned, basically it failed to start + return fmt.Errorf("water: worker thread returned error: %w", err) + default: + log.Debugf("water: Worker (func, not the worker thread) returning") + return nil + } +} + +// Cancel cancels the worker thread if it is running and returns +// the error returned by the worker thread. This call is designed +// to block until the worker thread exits. +func (tm *TransportModule) Cancel() error { + if tm.backgroundWorker == nil { + return fmt.Errorf("water: Transport Module is not initialized") + } + + if tm.backgroundWorker.cancelSocket == nil { + return fmt.Errorf("water: Transport Module is cancelled") + } + + select { + case err := <-tm.backgroundWorker.chanWorkerErr: // if already returned, we don't need to cancel + if err != nil { + return fmt.Errorf("water: worker thread returned error: %w", err) + } + return nil + default: // otherwise we will need to cancel the worker thread + break + } + + // write to the cancel pipe + if _, err := tm.backgroundWorker.cancelSocket.Write([]byte{0}); err != nil { + return fmt.Errorf("water: writing to cancel pipe failed: %w", err) + } + + // wait for the worker thread to exit + if err := <-tm.backgroundWorker.chanWorkerErr; err != nil { + return fmt.Errorf("water: worker thread returned error: %w", err) + } + + if err := tm.backgroundWorker.cancelSocket.Close(); err != nil { + return fmt.Errorf("water: closing cancel pipe failed: %w", err) + } + + tm.backgroundWorker.cancelSocket = nil + + return nil +} + +// WorkerErrored returns a channel that will be closed when the worker thread exits. +func (tm *TransportModule) WorkerErrored() <-chan error { + if tm.backgroundWorker == nil { + return nil + } + return tm.backgroundWorker.chanWorkerErr +} + +// wasiCtx is an interface used to push files to WASI store. +// +// In (the patched) package wasmtime, WasiCtx, Caller, and Store +// all implement this interface. +type wasiCtx interface { + PushFile(file *os.File, accessMode wasmtime.WasiFileAccessMode) (uint32, error) +} + +func (tm *TransportModule) PushConn(conn net.Conn, wasiCtxOverride ...wasiCtx) (fd int32, err error) { + var wasiCtx wasiCtx = nil + if len(wasiCtxOverride) > 0 { + wasiCtx = wasiCtxOverride[0] + } else { + wasiCtx = tm.core.Store() + } + + tm.gcfixOnce.Do(func() { + if system.GC_BUG { + // create temp file + var f *os.File + f, err = os.CreateTemp("", "water-gcfix") + if err != nil { + return + } + + // push dummy file + fd, err := wasiCtx.PushFile(f, wasmtime.READ_ONLY) + if err != nil { + return + } + + // save dummy file to map + tm.pushedConnMutex.Lock() + tm.pushedConn[int32(fd)] = &struct { + groundTruthConn net.Conn + pushedFile *os.File + }{ + groundTruthConn: nil, + pushedFile: f, + } + tm.pushedConnMutex.Unlock() + + log.Debugf("water: GC fix: pushed dummy file to WASM store with fd %d", fd) + } + }) + + if err != nil { + return wasm.INVALID_FD, fmt.Errorf("water: creating temp file for GC fix: %w", err) + } + + var connFile *os.File + if system.CONN_HALT_BUG { // if the bug needs to be worked around, we use a unix socket to forward all tcp streams + switch conn.(type) { + case *net.TCPConn: + connFile, err = socket.UnixConnFileWrap(conn) + default: + connFile, err = socket.AsFile(conn) + } + } else { + connFile, err = socket.AsFile(conn) + } + if err != nil { + return wasm.INVALID_FD, fmt.Errorf("water: converting conn to file failed: %w", err) + } + + fdu32, err := wasiCtx.PushFile(connFile, wasmtime.READ_WRITE) + if err != nil { + return wasm.INVALID_FD, fmt.Errorf("water: pushing conn file to store failed: %w", err) + } + fd = int32(fdu32) + + tm.pushedConnMutex.Lock() + tm.pushedConn[fd] = &struct { + groundTruthConn net.Conn + pushedFile *os.File + }{ + groundTruthConn: conn, + pushedFile: connFile, + } + tm.pushedConnMutex.Unlock() + + return fd, nil +} + +func (tm *TransportModule) DeferAll() { + tm.deferOnce.Do(func() { // execute all deferred functions if not yet executed + for _, f := range tm.deferredFuncs { + f() + } + }) +} + +func (tm *TransportModule) Defer(f func()) { + tm.deferredFuncs = append(tm.deferredFuncs, f) +} + +func (tm *TransportModule) Cleanup() { + // clean up pushed files + var keyList []int32 + tm.pushedConnMutex.Lock() + for k, v := range tm.pushedConn { + if v != nil { + if v.pushedFile != nil { + v.pushedFile.Close() + v.pushedFile = nil + } + if v.groundTruthConn != nil { + v.groundTruthConn.Close() + v.groundTruthConn = nil + } + } + keyList = append(keyList, k) + } + for _, k := range keyList { + delete(tm.pushedConn, k) + } + tm.pushedConnMutex.Unlock() + + // clean up deferred functions + tm.deferredFuncs = nil +} + +func (tm *TransportModule) pushConfig(caller *wasmtime.Caller) (int32, error) { + // get config file + configFile := tm.core.Config().TMConfig.File() + if configFile == nil { + return wasm.INVALID_FD, nil // we don't return error here so no trap is triggered + } + + // push file to WASM + configFd, err := caller.PushFile(configFile, wasmtime.READ_ONLY) + if err != nil { + return wasm.INVALID_FD, err + } + + return int32(configFd), nil +} + +func (tm *TransportModule) GetPushedConn(fd int32) net.Conn { + tm.pushedConnMutex.RLock() + defer tm.pushedConnMutex.RUnlock() + if tm.pushedConn == nil { + return nil + } + if v, ok := tm.pushedConn[fd]; ok { + return v.groundTruthConn + } + return nil +} diff --git a/internal/v0/wasi_dialer.go b/internal/v0/wasi_dialer.go deleted file mode 100644 index a99daf4..0000000 --- a/internal/v0/wasi_dialer.go +++ /dev/null @@ -1,101 +0,0 @@ -package v0 - -import ( - "fmt" - "net" - "os" - - "github.com/bytecodealliance/wasmtime-go/v13" - "github.com/gaukas/water/internal/socket" - "github.com/gaukas/water/internal/wasm" -) - -// WASIDialer is a convenient wrapper around net.Dialer which -// restricts the dialer to only dialing to a single address on -// a single network. -// -// WASM module will (through WASI) call to the dialer to dial -// for network connections. -type WASIDialer struct { - network string - address string - dialerFunc func(network, address string) (net.Conn, error) - mapFdConn map[int32]net.Conn // saves all the connections created by this WasiDialer by their file descriptors! (So we could close them when needed) - mapFdClonedFile map[int32]*os.File // saves all files so GC won't close them -} - -func MakeWASIDialer( - network, address string, - dialerFunc func(network, address string) (net.Conn, error), -) *WASIDialer { - return &WASIDialer{ - network: network, - address: address, - dialerFunc: dialerFunc, - mapFdConn: make(map[int32]net.Conn), - mapFdClonedFile: make(map[int32]*os.File), - } -} - -func (wd *WASIDialer) WrappedDial() wasm.WASMTIMEStoreIndependentFunction { - return WrapConnectFunc(wd.dial) -} - -// dial(apw i32) -> fd i32 -func (wd *WASIDialer) dial(caller *wasmtime.Caller) (fd int32, err error) { - conn, err := wd.dialerFunc(wd.network, wd.address) - if err != nil { - return wasm.GENERAL_ERROR, fmt.Errorf("dialerFunc: %w", err) - } - - connFile, err := socket.AsFile(conn) - if err != nil { - return wasm.GENERAL_ERROR, fmt.Errorf("socket.AsFile: %w", err) - } - - uintfd, err := caller.PushFile(connFile, wasmtime.READ_WRITE) - if err != nil { - return wasm.WASICTX_ERR, fmt.Errorf("(*wasmtime.Caller).PushFile: %w", err) - } - - wd.mapFdConn[int32(uintfd)] = conn // save the connection by its file descriptor - - // fix: Go GC will close the file descriptor (clone) created by (*net.XxxConn).File() - wd.mapFdClonedFile[int32(uintfd)] = connFile - - return int32(uintfd), nil -} - -func (wd *WASIDialer) GetConnByFd(fd int32) net.Conn { - if wd.mapFdConn == nil { - return nil - } - return wd.mapFdConn[fd] -} - -func (wd *WASIDialer) GetFileByFd(fd int32) *os.File { - if wd.mapFdClonedFile == nil { - return nil - } - return wd.mapFdClonedFile[fd] -} - -func (wd *WASIDialer) CloseAllConn() { - if wd == nil { - return - } - - if wd.mapFdConn != nil { - for k, conn := range wd.mapFdConn { - conn.Close() - delete(wd.mapFdConn, k) - } - } - - if wd.mapFdClonedFile != nil { - for k, file := range wd.mapFdClonedFile { - file.Close() - delete(wd.mapFdClonedFile, k) - } - } -} diff --git a/internal/v0/wasi_listener.go b/internal/v0/wasi_listener.go deleted file mode 100644 index 74f7146..0000000 --- a/internal/v0/wasi_listener.go +++ /dev/null @@ -1,95 +0,0 @@ -package v0 - -import ( - "fmt" - "net" - "os" - - "github.com/bytecodealliance/wasmtime-go/v13" - "github.com/gaukas/water/internal/socket" - "github.com/gaukas/water/internal/wasm" -) - -type WASIListener struct { - listener net.Listener - mapFdConn map[int32]net.Conn // saves all the connections accepted by this WASIListener by their file descriptors! - mapFdClonedFile map[int32]*os.File // saves all files so GC won't close them -} - -func MakeWASIListener(listener net.Listener) *WASIListener { - if listener == nil { - panic("water: NewWASIListener: listener is nil") - } - - return &WASIListener{ - listener: listener, - mapFdConn: make(map[int32]net.Conn), - mapFdClonedFile: make(map[int32]*os.File), - } -} - -func (wl *WASIListener) WrappedAccept() wasm.WASMTIMEStoreIndependentFunction { - return WrapConnectFunc(wl.accept) -} - -func (wl *WASIListener) accept(caller *wasmtime.Caller) (fd int32, err error) { - conn, err := wl.listener.Accept() - if err != nil { - return -1, fmt.Errorf("listener.Accept: %w", err) - } - - connFile, err := socket.AsFile(conn) - if err != nil { - return -1, fmt.Errorf("socket.AsFile: %w", err) - } - - uintfd, err := caller.PushFile(connFile, wasmtime.READ_WRITE) - if err != nil { - return -1, fmt.Errorf("(*wasmtime.Caller).PushFile: %w", err) - } - - wl.mapFdConn[int32(uintfd)] = conn // save the connection by its file descriptor - - // fix: Go GC will close the file descriptor clone created by (*net.XxxConn).File() - wl.mapFdClonedFile[int32(uintfd)] = connFile - - return int32(uintfd), nil -} - -// Close should not be called if the embedded listener is shared across -// multiple WASM instances or WASIListeners. -func (wl *WASIListener) Close() error { - return wl.listener.Close() -} - -func (wl *WASIListener) GetConnByFd(fd int32) net.Conn { - if wl.mapFdConn == nil { - return nil - } - return wl.mapFdConn[fd] -} - -func (wl *WASIListener) GetFileByFd(fd int32) *os.File { - if wl.mapFdClonedFile == nil { - return nil - } - return wl.mapFdClonedFile[fd] -} - -func (wl *WASIListener) CloseAllConn() { - if wl == nil { - return - } - - if wl.mapFdConn != nil { - for _, conn := range wl.mapFdConn { - conn.Close() - } - } - - if wl.mapFdClonedFile != nil { - for _, file := range wl.mapFdClonedFile { - file.Close() - } - } -} diff --git a/internal/v0/wasi_net.go b/internal/v0/wasi_net.go index f773ddb..9a65688 100644 --- a/internal/v0/wasi_net.go +++ b/internal/v0/wasi_net.go @@ -7,8 +7,10 @@ import ( "github.com/gaukas/water/internal/wasm" ) +// WASIConnectFunc is a function that creates a connection. type WASIConnectFunc = func(caller *wasmtime.Caller) (fd int32, err error) +// WASIConnectFuncType is the signature of WASIConnectFunc. var WASIConnectFuncType *wasmtime.FuncType = wasmtime.NewFuncType( []*wasmtime.ValType{}, []*wasmtime.ValType{ @@ -16,6 +18,7 @@ var WASIConnectFuncType *wasmtime.FuncType = wasmtime.NewFuncType( }, ) +// WrapConnectFunc wraps a WASIConnectFunc into a WASM function. func WrapConnectFunc(f WASIConnectFunc) wasm.WASMTIMEStoreIndependentFunction { return func(caller *wasmtime.Caller, vals []wasmtime.Val) ([]wasmtime.Val, *wasmtime.Trap) { if len(vals) != 0 { @@ -31,11 +34,13 @@ func WrapConnectFunc(f WASIConnectFunc) wasm.WASMTIMEStoreIndependentFunction { } } -func WrappedNopWASIConnectFunc() wasm.WASMTIMEStoreIndependentFunction { - return WrapConnectFunc(nopWASIConnectFunc) +// UnimplementedWASIConnectFunc wraps unimplementedWASIConnectFunc into a +// wasmtime-compliant function. +func WrappedUnimplementedWASIConnectFunc() wasm.WASMTIMEStoreIndependentFunction { + return WrapConnectFunc(unimplementedWASIConnectFunc) } -// nopWASIConnectFunc is a WASIConnectFunc that does nothing. -func nopWASIConnectFunc(caller *wasmtime.Caller) (fd int32, err error) { +// unimplementedWASIConnectFunc is a WASIConnectFunc that does nothing. +func unimplementedWASIConnectFunc(_ *wasmtime.Caller) (fd int32, err error) { return wasm.INVALID_FUNCTION, fmt.Errorf("NOP WASIConnectFunc is called") } diff --git a/internal/wasm/net.go b/internal/wasm/net.go index 0919f8a..8226d4d 100644 --- a/internal/wasm/net.go +++ b/internal/wasm/net.go @@ -2,4 +2,6 @@ package wasm import "github.com/bytecodealliance/wasmtime-go/v13" +// WASMTIMEStoreIndependentFunction is a function that takes a store at +// runtime to work with. type WASMTIMEStoreIndependentFunction = func(*wasmtime.Caller, []wasmtime.Val) ([]wasmtime.Val, *wasmtime.Trap) diff --git a/internal/wasm/wasi_config.go b/internal/wasm/wasi_config.go index dad336f..5723eaf 100644 --- a/internal/wasm/wasi_config.go +++ b/internal/wasm/wasi_config.go @@ -10,6 +10,7 @@ type WASIConfigFactory struct { setupFuncs []func(*wasmtime.WasiConfig) error // if any of these functions returns an error, the whole setup will fail. } +// NewWasiConfigFactory creates a new WASIConfigFactory. func NewWasiConfigFactory() *WASIConfigFactory { return &WASIConfigFactory{ setupFuncs: make([]func(*wasmtime.WasiConfig) error, 0), @@ -17,7 +18,11 @@ func NewWasiConfigFactory() *WASIConfigFactory { } func (wcf *WASIConfigFactory) Clone() *WASIConfigFactory { - if wcf == nil || wcf.setupFuncs == nil { + if wcf == nil { + return nil + } + + if wcf.setupFuncs == nil { return NewWasiConfigFactory() } @@ -113,7 +118,6 @@ func (wcf *WASIConfigFactory) InheritStderr() { func (wcf *WASIConfigFactory) SetPreopenDir(path string, guestPath string) { wcf.setupFuncs = append(wcf.setupFuncs, func(wasiConfig *wasmtime.WasiConfig) error { - wasiConfig.PreopenDir(path, guestPath) - return nil + return wasiConfig.PreopenDir(path, guestPath) }) } diff --git a/listener.go b/listener.go index 66d7f2c..747566a 100644 --- a/listener.go +++ b/listener.go @@ -1,110 +1,99 @@ package water import ( - "fmt" + "errors" "net" - "sync/atomic" ) // Listener listens on a local network address and upon caller // calling Accept(), it accepts an incoming connection and -// passes it to the WASM module, which returns a net.Conn to -// caller. +// returns the net.Conn upgraded by the WebAssembly Transport +// Module. // // The structure of a Listener is as follows: // // +---------------+ accept +---------------+ accept -// ---->| |------->| Decode |-------> -// Source | net.Listener | | WASM Runtime | Caller -// <----| |<-------| Decode/Encode |<------- +// ---->| |------->| Downgrade |-------> +// Source | net.Listener | | WebAssembly | Caller +// <----| |<-------| Upgrade |<------- // +---------------+ +---------------+ // \ / // \------Listener------/ // // As shown above, a Listener consists of a net.Listener to accept -// incoming connections and a WASM runtime to handle the incoming -// connections from an external source. The WASM runtime will return -// a net.Conn that caller can Read() from or Write() to. -// -// The WASM module used by a Listener must implement a WASMListener. -type Listener struct { - Config *Config - closed *atomic.Bool +// incoming connections and a WATM to handle the incoming connections +// from an external source. Accept() returns a net.Conn that caller +// can Read()-from or Write()-to. +type Listener interface { + net.Listener + + mustEmbedUnimplementedListener() } -// ListenConfig listens on the network address and returns a Listener -// configured with the given Config. +type newListenerFunc func(*Config) (Listener, error) + +var ( + knownListenerVersions = make(map[string]newListenerFunc) + + ErrListenerAlreadyRegistered = errors.New("water: listener already registered") + ErrListenerVersionNotFound = errors.New("water: listener version not found") + ErrUnimplementedListener = errors.New("water: unimplemented Listener") +) + +// UnimplementedListener is a Listener that always returns errors. // -// This is the recommended way to create a Listener, unless there are -// other requirements such as supplying a custom net.Listener. In that -// case, a Listener could be created with WrapListener() with a Config -// specifying a custom net.Listener. -func (c *Config) Listen(network, address string) (net.Listener, error) { - lis, err := net.Listen(network, address) - if err != nil { - return nil, err - } +// It is used to ensure forward compatibility of the Listener interface. +type UnimplementedListener struct{} - config := c.Clone() - config.NetworkListener = lis +// Accept implements Listener.Accept(). +func (*UnimplementedListener) Accept() (net.Conn, error) { + return nil, ErrUnimplementedListener +} - return &Listener{ - Config: config, - closed: new(atomic.Bool), - }, nil +// Close implements Listener.Close(). +func (*UnimplementedListener) Close() error { + return ErrUnimplementedListener } -// WrapListener creates a Listener with the given Config. -// -// The Config must specify a custom net.Listener, otherwise the -// Accept() method will fail. -func WrapListener(config *Config) *Listener { - return &Listener{ - Config: config, - closed: new(atomic.Bool), - } +// Addr implements Listener.Addr(). +func (*UnimplementedListener) Addr() net.Addr { + return nil } -// Accept waits for and returns the next connection after processing -// the data with the WASM module. -// -// The returned net.Conn implements net.Conn and could be seen as -// the inbound connection with a wrapping transport protocol handled -// by the WASM module. -// -// Implements net.Listener. -func (l *Listener) Accept() (net.Conn, error) { - if l.closed.Load() { - return nil, fmt.Errorf("water: listener is closed") - } +// mustEmbedUnimplementedListener is a function that developers cannot +func (*UnimplementedListener) mustEmbedUnimplementedListener() {} - if l.Config == nil { - return nil, fmt.Errorf("water: dialing with nil config is not allowed") +// RegisterListener registers a Listener function for the given version to +// the global registry. Only registered versions can be recognized and +// used by NewListener(). +func RegisterListener(version string, listener newListenerFunc) error { + if _, ok := knownListenerVersions[version]; ok { + return ErrListenerAlreadyRegistered } + knownListenerVersions[version] = listener + return nil +} - var core *core - var err error - core, err = Core(l.Config) +// NewListener creates a new Listener from the config. +// +// It automatically detects the version of the WebAssembly Transport +// Module specified in the config. +func NewListener(c *Config) (Listener, error) { + core, err := NewCore(c) if err != nil { return nil, err } - return core.AcceptVersion() -} - -// Close closes the listener. -// -// Implements net.Listener. -func (l *Listener) Close() error { - if l.closed.CompareAndSwap(false, true) { - return l.Config.NetworkListener.Close() + // Search through all exported names and match them to potential + // Listener versions. + // + // TODO: detect the version of the WebAssembly Transport Module + // in a more organized way. + for _, export := range core.Module().Exports() { + if f, ok := knownListenerVersions[export.Name()]; ok { + return f(c) + } } - return nil -} -// Addr returns the listener's network address. -// -// Implements net.Listener. -func (l *Listener) Addr() net.Addr { - return l.Config.NetworkListener.Addr() + return nil, ErrListenerVersionNotFound } diff --git a/nogcfix.go b/nogcfix.go deleted file mode 100644 index f925e89..0000000 --- a/nogcfix.go +++ /dev/null @@ -1,8 +0,0 @@ -//go:build nogcfix - -package water - -// If the program is compiled with `go build -tags nogcfix`, the -// GC fix mentioned in gcfix.go will not be applied. Unexpected -// GC behavior is expected ;) -const GCFIX bool = false diff --git a/relay.go b/relay.go index 13783a0..a69853e 100644 --- a/relay.go +++ b/relay.go @@ -1,29 +1,108 @@ -//go:build v1 - package water +import "errors" + // Relay listens on a local network address and handles requests // on incoming connections by passing the incoming connection to -// the WASM module and dial corresponding outbound connections -// to the pre-defined destination address, which can either be a -// remote TCP/UDP address or a unix socket. +// the WebAssembly Transport Module and dial corresponding +// outbound connections to a pre-defined destination address. +// By doing so, WATM upgrades the incoming connection. // // The structure of a Relay is as follows: // // accept +---------------+ +---------------+ dial -// ------->| |----->| Decode |-----> -// Source | net.Listener | | WASM Runtime | Remote -// <-------| |<-----| Decode/Encode |<----- +// ------->| |----->| Upgrade |-----> +// Source | net.Listener | | WebAssembly | Remote +// <-------| |<-----| Downgrade |<----- // +---------------+ +---------------+ // \ / // \------Relay-------/ +type Relay interface { + // RelayTo relays the incoming connection to the address specified + // by network and address. + RelayTo(network, address string) error + + // ListenAndRelayTo listens on the local network address and relays + // the incoming connection to the address specified by rnetwork + // and raddress. + ListenAndRelayTo(lnetwork, laddress, rnetwork, raddress string) error + + // Close closes the relay. No further incoming connections will be + // accepted and no further outbound connections will be dialed. It + // does not close the established connections. + Close() error + + mustEmbedUnimplementedRelay() +} + +type newRelayFunc func(*Config) (Relay, error) + +var ( + knownRelayVersions = make(map[string]newRelayFunc) + + ErrRelayAlreadyRegistered = errors.New("water: relay already registered") + ErrRelayVersionNotFound = errors.New("water: relay version not found") + ErrUnimplementedRelay = errors.New("water: unimplemented relay") + + ErrRelayAlreadyStarted = errors.New("water: relay already started") // RelayTo and ListenAndRelayTo may return this error if a relay was reused. +) + +// UnimplementedRelay is a Relay that always returns errors. // -// As shown above, a Relay consists of a net.Listener to accept -// incoming connections and a WASM runtime to handle the incoming -// connections from an external source. The WASM runtime will dial -// the corresponding outbound connections to a pre-defined -// destination address. It requires no further caller interaction -// once it is started. +// It is used to ensure forward compatibility of the Relay interface. +type UnimplementedRelay struct{} + +// RelayTo implements Relay.RelayTo(). +func (*UnimplementedRelay) RelayTo(_, _ string) error { + return ErrUnimplementedRelay +} + +// ListenAndRelayTo implements Relay.ListenAndRelayTo(). +func (*UnimplementedRelay) ListenAndRelayTo(_, _, _, _ string) error { + return ErrUnimplementedRelay +} + +// Close implements Relay.Close(). +func (*UnimplementedRelay) Close() error { + return ErrUnimplementedRelay +} + +// mustEmbedUnimplementedRelay is a function that developers cannot +// manually implement. It is used to ensure forward compatibility of +// the Relay interface. +func (*UnimplementedRelay) mustEmbedUnimplementedRelay() {} + +// RegisterRelay registers a relay function for the given version to +// the global registry. Only registered versions can be recognized and +// used by NewRelay(). +func RegisterRelay(version string, relay newRelayFunc) error { + if _, ok := knownRelayVersions[version]; ok { + return ErrRelayAlreadyRegistered + } + knownRelayVersions[version] = relay + return nil +} + +// NewRelay creates a new Relay from the config. // -// The WASM module used by a Relay must implement a WASMDialer. -type Relay struct{} +// It automatically detects the version of the WebAssembly Transport +// Module specified in the config. +func NewRelay(c *Config) (Relay, error) { + core, err := NewCore(c) + if err != nil { + return nil, err + } + + // Search through all exported names and match them to potential + // Listener versions. + // + // TODO: detect the version of the WebAssembly Transport Module + // in a more organized way. + for _, export := range core.Module().Exports() { + if f, ok := knownRelayVersions[export.Name()]; ok { + return f(c) + } + } + + return nil, ErrRelayVersionNotFound +} diff --git a/testdata/hexencoder_v0.dialer.json b/testdata/hexencoder_v0.dialer.json deleted file mode 100644 index b1ee3f0..0000000 --- a/testdata/hexencoder_v0.dialer.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "role": "dialer", - "mode": "uppercase" -} \ No newline at end of file diff --git a/testdata/hexencoder_v0.listener.json b/testdata/hexencoder_v0.listener.json deleted file mode 100644 index c7b49c2..0000000 --- a/testdata/hexencoder_v0.listener.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "role": "listener", - "mode": "lowercase" -} \ No newline at end of file diff --git a/testdata/hexencoder_v0.wasm b/testdata/hexencoder_v0.wasm deleted file mode 100644 index 755133d..0000000 Binary files a/testdata/hexencoder_v0.wasm and /dev/null differ diff --git a/testdata/plain_v0.wasm b/testdata/plain_v0.wasm index 011021a..ebeeb6c 100644 Binary files a/testdata/plain_v0.wasm and b/testdata/plain_v0.wasm differ diff --git a/transport/v0/conn.go b/transport/v0/conn.go new file mode 100644 index 0000000..71d296a --- /dev/null +++ b/transport/v0/conn.go @@ -0,0 +1,314 @@ +//go:build !exclude_v0 + +package v0 + +import ( + "errors" + "fmt" + "io" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/gaukas/water" + "github.com/gaukas/water/internal/log" + "github.com/gaukas/water/internal/socket" + v0 "github.com/gaukas/water/internal/v0" +) + +// Conn is the first experimental version of Conn implementation. +type Conn struct { + // callerConn is used by DialV0() and AcceptV0(). It is used to talk to + // the caller of water API by allowing the caller to Read() and Write() to it. + callerConn net.Conn // the connection from the caller, usually a *net.UnixConn + + // srcConn is used by AcceptV0() and RelayV0(). It is used + // to talk to a remote source by accepting a connection from it. + srcConn net.Conn // the connection from the remote source, usually a *net.TCPConn + + // dstConn is used by DialV0() and RelayV0(). It is used + // to talk to a remote destination by actively dialing to it. + dstConn net.Conn // the connection to the remote destination, usually a *net.TCPConn + + tm *v0.TransportModule + + closeOnce *sync.Once + closed atomic.Bool + + water.UnimplementedConn // embedded to ensure forward compatibility +} + +// dial dials the network address using through the WASM module +// while using the dialerFunc specified in core.config. +func dial(core water.Core, network, address string) (c water.Conn, err error) { + tm := v0.Core2TransportModule(core) + conn := &Conn{ + tm: tm, + closeOnce: &sync.Once{}, + } + + dialer := v0.NewManagedDialer(network, address, core.Config().NetworkDialerFuncOrDefault()) + + if err = conn.tm.LinkNetworkInterface(dialer, nil); err != nil { + return nil, err + } + + if err = conn.tm.Initialize(); err != nil { + return nil, err + } + + reverseCallerConn, callerConn, err := socket.UnixConnPair() + // wasmCallerConn, conn.uoConn, err = socket.TCPConnPair() + if err != nil { + if reverseCallerConn == nil || callerConn == nil { + return nil, fmt.Errorf("water: socket.UnixConnPair returned error: %w", err) + } else { // likely due to Close() call errored + log.Errorf("water: socket.UnixConnPair returned error: %v", err) + } + } + conn.callerConn = callerConn + + conn.dstConn, err = conn.tm.DialFrom(reverseCallerConn) + if err != nil { + return nil, err + } + + if err := conn.tm.Worker(); err != nil { + return nil, err + } + + log.Debugf("water: DialV0: conn.tm.Worker() returned") + + // safety: we need to watch for the blocking worker thread's status. + // If it returns, no further data can be processed by the WASM module + // and we need to close this connection in that case. + go func() { + <-conn.tm.WorkerErrored() + log.Debugf("water: DialV0: worker thread returned") + conn.Close() + }() + + return conn, nil +} + +// accept accepts the network connection using through the WASM module +// while using the net.Listener specified in core.config. +func accept(core water.Core) (c water.Conn, err error) { + tm := v0.Core2TransportModule(core) + conn := &Conn{ + tm: tm, + closeOnce: &sync.Once{}, + } + + if err = conn.tm.LinkNetworkInterface(nil, core.Config().NetworkListenerOrPanic()); err != nil { + return nil, err + } + + if err = conn.tm.Initialize(); err != nil { + return nil, err + } + + reverseCallerConn, callerConn, err := socket.UnixConnPair() + if err != nil { + if reverseCallerConn == nil || callerConn == nil { + return nil, fmt.Errorf("water: socket.UnixConnPair returned error: %w", err) + } else { // likely due to Close() call errored + log.Errorf("water: socket.UnixConnPair returned error: %v", err) + } + } else if reverseCallerConn == nil || callerConn == nil { + return nil, errors.New("water: socket.UnixConnPair returned nil") + } + + conn.callerConn = callerConn + + conn.srcConn, err = conn.tm.AcceptFor(reverseCallerConn) + if err != nil { + return nil, err + } + + if err := conn.tm.Worker(); err != nil { + return nil, err + } + + // safety: we need to watch for the blocking worker thread's status. + // If it returns, no further data can be processed by the WASM module + // and we need to close this connection in that case. + go func() { + <-conn.tm.WorkerErrored() + conn.Close() + }() + + return conn, nil +} + +func relay(core water.Core, network, address string) (c water.Conn, err error) { + tm := v0.Core2TransportModule(core) + conn := &Conn{ + tm: tm, + closeOnce: &sync.Once{}, + } + + dialer := v0.NewManagedDialer(network, address, core.Config().NetworkDialerFuncOrDefault()) + + if err = conn.tm.LinkNetworkInterface(dialer, core.Config().NetworkListenerOrPanic()); err != nil { + return nil, err + } + + if err = conn.tm.Initialize(); err != nil { + return nil, err + } + + if err := conn.tm.Associate(); err != nil { + return nil, err + } + + if err := conn.tm.Worker(); err != nil { + return nil, err + } + + // safety: we need to watch for the blocking worker thread's status. + // If it returns, no further data can be processed by the WASM module + // and we need to close this connection in that case. + go func() { + <-conn.tm.WorkerErrored() + conn.Close() + }() + + return conn, nil +} + +// Read implements the net.Conn interface. +// +// It calls to the underlying user-oriented net.Conn's Read() method. +func (c *Conn) Read(b []byte) (n int, err error) { + if c.callerConn == nil { + return 0, errors.New("water: cannot read, (*RuntimeConn).uoConn is nil") + } + + return c.callerConn.Read(b) +} + +// Write implements the net.Conn interface. +// +// It calls to the underlying user-oriented net.Conn's Write() method. +func (c *Conn) Write(b []byte) (n int, err error) { + if c.callerConn == nil { + return 0, errors.New("water: cannot write, (*RuntimeConn).uoConn is nil") + } + + n, err = c.callerConn.Write(b) + if err != nil { + return n, fmt.Errorf("uoConn.Write: %w", err) + } + + if n == len(b) { + return n, nil + } else if n < len(b) { + return n, io.ErrShortWrite + } else { + return n, errors.New("invalid write result") // io.errInvalidWrite + } +} + +// Close implements the net.Conn interface. +// +// It will close both the network connection AND the WASM module, then +// the user-facing net.Conn will be closed. +func (c *Conn) Close() (err error) { + if !c.closed.CompareAndSwap(false, true) { + err = errors.New("water: already closed") + return err + } + + c.closeOnce.Do(func() { + log.Debugf("Defering TM") + c.tm.DeferAll() + log.Debugf("Cleaning TM") + c.tm.Cleanup() + log.Debugf("Canceling TM") + err = c.tm.Cancel() + log.Debugf("TM canceled") + }) + + return err +} + +// LocalAddr implements the net.Conn interface. +// +// It calls to the underlying network connection's LocalAddr() method. +// For Listener and Relay, the network connection of interest is the srcConn. +// And for Dialer, the network connection of interest is the dstConn. +func (c *Conn) LocalAddr() net.Addr { + // for Listener and Relay, the srcConn is of interest + if c.srcConn != nil { + return c.srcConn.LocalAddr() + } + return c.dstConn.LocalAddr() // for dialer +} + +// RemoteAddr implements the net.Conn interface. +// +// It calls to the underlying network connection's RemoteAddr() method. +// For Listener and Relay, the network connection of interest is the srcConn. +// And for Dialer, the network connection of interest is the dstConn. +func (c *Conn) RemoteAddr() net.Addr { + // for Listener and Relay, the srcConn is of interest + if c.srcConn != nil { + return c.srcConn.RemoteAddr() + } + return c.dstConn.RemoteAddr() // for dialer +} + +// SetDeadline implements the net.Conn interface. +// +// It calls to the underlying user-oriented connection's SetDeadline() method. +func (c *Conn) SetDeadline(t time.Time) (err error) { + // SetDeadline is only available to Dialer/Listener. But not Relay. + if c.callerConn == nil { + return errors.New("water: cannot set deadline, (*RuntimeConn).callerConn is nil") + } + + // note: the deadline needs to be set on the actually pushed connection, + // which is not necessarily the networkConn. (there would be middleware conns) + + if c.dstConn != nil { + err = c.dstConn.SetDeadline(t) + if err != nil { + return err + } + } + + if c.srcConn != nil { + err = c.srcConn.SetDeadline(t) + if err != nil { + return err + } + } + + return c.callerConn.SetDeadline(t) +} + +// SetReadDeadline implements the net.Conn interface. +// +// It calls to the underlying user-oriented connection's SetReadDeadline() method. +func (c *Conn) SetReadDeadline(t time.Time) error { + // SetReadDeadline is only available to Dialer/Listener. But not Relay. + if c.callerConn == nil { + return errors.New("water: cannot set deadline, (*RuntimeConn).callerConn is nil") + } + + return c.callerConn.SetReadDeadline(t) +} + +// SetWriteDeadline implements the net.Conn interface. +// +// It calls to the underlying user-oriented connection's SetWriteDeadline() method. +func (c *Conn) SetWriteDeadline(t time.Time) error { + // SetWriteDeadline is only available to Dialer/Listener. But not Relay. + if c.callerConn == nil { + return errors.New("water: cannot set deadline, (*RuntimeConn).callerConn is nil") + } + + return c.callerConn.SetWriteDeadline(t) +} diff --git a/transport/v0/conn_test.go b/transport/v0/conn_test.go new file mode 100644 index 0000000..d00f37b --- /dev/null +++ b/transport/v0/conn_test.go @@ -0,0 +1,237 @@ +// a //go:build unix && !windows && !exclude_v0 + +package v0_test + +import ( + "crypto/rand" + "net" + "os" + "runtime" + "sync" + "testing" + "time" + + "github.com/gaukas/water" + _ "github.com/gaukas/water/transport/v0" +) + +// hexencoder_v0 []byte +var plain_v0 []byte + +func BenchmarkConnV0(b *testing.B) { + // read file into plain_v0 + var err error + plain_v0, err = os.ReadFile("../../testdata/plain_v0.wasm") + if err != nil { + b.Fatal(err) + } + b.Run("PlainV0-Dialer", benchmarkPlainV0Dialer) + b.Run("PlainV0-Listener", benchmarkPlainV0Listener) + b.Run("RefTCP", benchmarkReferenceTCP) +} + +func benchmarkPlainV0Dialer(b *testing.B) { + // create random TCP listener listening on localhost + tcpLis, err := net.ListenTCP("tcp", nil) + if err != nil { + b.Fatal(err) + } + defer tcpLis.Close() + + // b.Logf("listener: %s", tcpLis.Addr()) + + // goroutine to accept incoming connections + var lisConn net.Conn + var goroutineErr error + var wg *sync.WaitGroup = new(sync.WaitGroup) + wg.Add(1) + go func() { + defer wg.Done() + lisConn, goroutineErr = tcpLis.Accept() + }() + + // Dial + config := &water.Config{ + TMBin: plain_v0, + } + // config.WASIConfig().InheritStdout() + dialer, err := water.NewDialer(config) + if err != nil { + b.Fatal(err) + } + + rConn, err := dialer.Dial("tcp", tcpLis.Addr().String()) + if err != nil { + b.Fatal(err) + } + defer rConn.Close() + + // b.Logf("dialer: %s, listener: %s", rConn.LocalAddr(), rConn.RemoteAddr()) + + // wait for listener to accept connection + wg.Wait() + if goroutineErr != nil { + b.Fatal(goroutineErr) + } + + var sendMsg []byte = make([]byte, 1024) + _, err = rand.Read(sendMsg) + if err != nil { + b.Fatalf("rand.Read error: %s", err) + } + + // b.Logf("sendMsg: %s", sendMsg) + + runtime.GC() + time.Sleep(10 * time.Millisecond) + + b.SetBytes(1024) + b.ResetTimer() + start := time.Now() + for i := 0; i < b.N; i++ { + // b.Logf("writing...") + _, err = rConn.Write(sendMsg) + if err != nil { + b.Logf("Write error, cntr: %d, N: %d", i, b.N) + b.Fatal(err) + } + + // b.Logf("reading...") + buf := make([]byte, 1024+128) + _, err = lisConn.Read(buf) + if err != nil { + b.Logf("Read error, cntr: %d, N: %d", i, b.N) + b.Fatal(err) + } + } + b.StopTimer() + b.Logf("avg bandwidth: %f MB/s (N=%d)", float64(b.N*1024)/time.Since(start).Seconds()/1024/1024, b.N) +} + +func benchmarkPlainV0Listener(b *testing.B) { + // prepare for listener + config := &water.Config{ + TMBin: plain_v0, + } + + lis, err := config.Listen("tcp", "localhost:0") + if err != nil { + b.Fatal(err) + } + + // goroutine to dial listener + var dialConn net.Conn + var goroutineErr error + var wg *sync.WaitGroup = new(sync.WaitGroup) + wg.Add(1) + go func() { + defer wg.Done() + dialConn, goroutineErr = net.Dial("tcp", lis.Addr().String()) + }() + + // Accept + rConn, err := lis.Accept() + if err != nil { + b.Fatal(err) + } + + // wait for dialer to dial + wg.Wait() + if goroutineErr != nil { + b.Fatal(goroutineErr) + } + + var sendMsg []byte = make([]byte, 512) + _, err = rand.Read(sendMsg) + if err != nil { + b.Fatalf("rand.Read error: %s", err) + } + + b.SetBytes(1024) // we will send 512-byte data and 128-byte will be transmitted on wire due to hex encoding + b.ResetTimer() + start := time.Now() + for i := 0; i < b.N; i++ { + _, err = rConn.Write(sendMsg) + if err != nil { + b.Logf("Write error, cntr: %d, N: %d", i, b.N) + b.Fatal(err) + } + + // receive data + buf := make([]byte, 1024) + _, err = dialConn.Read(buf) + if err != nil { + b.Logf("Read error, cntr: %d, N: %d", i, b.N) + b.Fatal(err) + } + } + b.StopTimer() + b.Logf("avg bandwidth: %f MB/s (N=%d)", float64(b.N*1024)/time.Since(start).Seconds()/1024/1024, b.N) + + if err = rConn.Close(); err != nil { + b.Fatal(err) + } +} + +func benchmarkReferenceTCP(b *testing.B) { + // create random TCP listener listening on localhost + tcpLis, err := net.ListenTCP("tcp", nil) + if err != nil { + b.Fatal(err) + } + defer tcpLis.Close() + + // goroutine to accept incoming connections + var lisConn net.Conn + var goroutineErr error + var wg *sync.WaitGroup = new(sync.WaitGroup) + wg.Add(1) + go func() { + defer wg.Done() + lisConn, goroutineErr = tcpLis.Accept() + }() + + nConn, err := net.Dial("tcp", tcpLis.Addr().String()) + if err != nil { + b.Fatal(err) + } + + // wait for listener to accept connection + wg.Wait() + if goroutineErr != nil { + b.Fatal(goroutineErr) + } + + var sendMsg []byte = make([]byte, 1024) + _, err = rand.Read(sendMsg) + if err != nil { + b.Fatalf("rand.Read error: %s", err) + } + + b.SetBytes(1024) + b.ResetTimer() + start := time.Now() + for i := 0; i < b.N; i++ { + _, err = nConn.Write(sendMsg) + if err != nil { + b.Logf("Write error, cntr: %d, N: %d", i, b.N) + b.Fatal(err) + } + + // receive data + buf := make([]byte, 1024) + _, err = lisConn.Read(buf) + if err != nil { + b.Logf("Read error, cntr: %d, N: %d", i, b.N) + b.Fatal(err) + } + + // time.Sleep(10 * time.Microsecond) + } + b.StopTimer() + b.Logf("avg bandwidth: %f MB/s (N=%d)", float64(b.N*1024)/time.Since(start).Seconds()/1024/1024, b.N) + + if err = nConn.Close(); err != nil { + b.Fatal(err) + } +} diff --git a/transport/v0/dialer.go b/transport/v0/dialer.go new file mode 100644 index 0000000..6b74f5f --- /dev/null +++ b/transport/v0/dialer.go @@ -0,0 +1,63 @@ +//go:build !exclude_v0 + +package v0 + +import ( + "context" + "fmt" + + "github.com/gaukas/water" +) + +func init() { + err := water.RegisterDialer("_water_v0", NewDialer) + if err != nil { + panic(err) + } +} + +// Dialer implements water.Dialer utilizing Water WATM API v0. +type Dialer struct { + config *water.Config + + water.UnimplementedDialer // embedded to ensure forward compatibility +} + +// NewDialer creates a new Dialer. +func NewDialer(c *water.Config) (water.Dialer, error) { + return &Dialer{ + config: c.Clone(), + }, nil +} + +// Dial dials the network address using the dialerFunc specified in config. +// +// Dial interfaces water.Dialer. +func (d *Dialer) Dial(network, address string) (conn water.Conn, err error) { + return d.DialContext(context.Background(), network, address) +} + +func (d *Dialer) DialContext(ctx context.Context, network, address string) (conn water.Conn, err error) { + if d.config == nil { + return nil, fmt.Errorf("water: dialing with nil config is not allowed") + } + + ctxReady, dialReady := context.WithCancel(context.Background()) + go func() { + defer dialReady() + var core water.Core + core, err = water.NewCore(d.config) + if err != nil { + return + } + + conn, err = dial(core, network, address) + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ctxReady.Done(): + return conn, err + } +} diff --git a/transport/v0/listener.go b/transport/v0/listener.go new file mode 100644 index 0000000..9cab4ad --- /dev/null +++ b/transport/v0/listener.go @@ -0,0 +1,78 @@ +//go:build !exclude_v0 + +package v0 + +import ( + "fmt" + "net" + "sync/atomic" + + "github.com/gaukas/water" +) + +func init() { + err := water.RegisterListener("_water_v0", NewListener) + if err != nil { + panic(err) + } +} + +// Listener implements water.Listener utilizing Water WATM API v0. +type Listener struct { + config *water.Config + closed *atomic.Bool + + water.UnimplementedListener // embedded to ensure forward compatibility +} + +// NewListener creates a new Listener. +func NewListener(c *water.Config) (water.Listener, error) { + return &Listener{ + config: c.Clone(), + closed: new(atomic.Bool), + }, nil +} + +// Accept waits for and returns the next connection after processing +// the data with the WASM module. +// +// The returned net.Conn implements net.Conn and could be seen as +// the inbound connection with a wrapping transport protocol handled +// by the WASM module. +// +// Implements net.Listener. +func (l *Listener) Accept() (net.Conn, error) { + if l.closed.Load() { + return nil, fmt.Errorf("water: listener is closed") + } + + if l.config == nil { + return nil, fmt.Errorf("water: accept with nil config is not allowed") + } + + var core water.Core + var err error + core, err = water.NewCore(l.config) + if err != nil { + return nil, err + } + + return accept(core) +} + +// Close closes the listener. +// +// Implements net.Listener. +func (l *Listener) Close() error { + if l.closed.CompareAndSwap(false, true) { + return l.config.NetworkListener.Close() + } + return nil +} + +// Addr returns the listener's network address. +// +// Implements net.Listener. +func (l *Listener) Addr() net.Addr { + return l.config.NetworkListener.Addr() +} diff --git a/transport/v0/relay.go b/transport/v0/relay.go new file mode 100644 index 0000000..75d9038 --- /dev/null +++ b/transport/v0/relay.go @@ -0,0 +1,125 @@ +//go:build !exclude_v0 + +package v0 + +import ( + "fmt" + "net" + "sync/atomic" + + "github.com/gaukas/water" +) + +func init() { + err := water.RegisterRelay("_water_v0", NewRelay) + if err != nil { + panic(err) + } +} + +// Relay implements water.Relay utilizing Water WATM API v0. +type Relay struct { + config *water.Config + started *atomic.Bool + closed *atomic.Bool + + dialNetwork, dialAddress string + + water.UnimplementedRelay // embedded to ensure forward compatibility +} + +// NewRelay creates a relay with the given Config without starting +// it. To start the relay, call Start(). +func NewRelay(c *water.Config) (water.Relay, error) { + return &Relay{ + config: c.Clone(), + started: new(atomic.Bool), + closed: new(atomic.Bool), + }, nil +} + +// RelayTo implements Relay.RelayTo(). +func (r *Relay) RelayTo(network, address string) error { + if r.started.Load() { + return water.ErrRelayAlreadyStarted + } + + if r.config == nil { + return fmt.Errorf("water: relaying with nil config is not allowed") + } + + r.dialNetwork = network + r.dialAddress = address + + var core water.Core + var err error + for !r.closed.Load() { + core, err = water.NewCore(r.config) + if err != nil { + return err + } + + _, err = relay(core, network, address) + if err != nil { + if !r.closed.Load() { // errored before closing + return err + } + break + } + } + + return nil +} + +// ListenAndRelayTo implements Relay.ListenAndRelayTo(). +func (r *Relay) ListenAndRelayTo(lnetwork, laddress, rnetwork, raddress string) error { + if r.started.Load() { + return water.ErrRelayAlreadyStarted + } + + lis, err := net.Listen(lnetwork, laddress) + if err != nil { + return err + } + + config := r.config.Clone() + config.NetworkListener = lis + r.config = config + + if r.config == nil { + return fmt.Errorf("water: relaying with nil config is not allowed") + } + + r.dialNetwork = rnetwork + r.dialAddress = raddress + + var core water.Core + for !r.closed.Load() { + core, err = water.NewCore(r.config) + if err != nil { + return err + } + + _, err = relay(core, rnetwork, raddress) + if err != nil { + if !r.closed.Load() { // errored before closing + return err + } + break + } + } + + return nil +} + +func (r *Relay) Close() error { + if !r.closed.CompareAndSwap(false, true) { + return nil + } + + if r.config != nil { + r.config.NetworkListener.Close() + } + + return nil +} diff --git a/transport_module_config.go b/transport_module_config.go new file mode 100644 index 0000000..504d316 --- /dev/null +++ b/transport_module_config.go @@ -0,0 +1,28 @@ +package water + +import ( + "os" + + "github.com/gaukas/water/internal/log" +) + +// TMConfig defines the configuration file used by the WebAssembly Transport Module. +type TMConfig struct { + FilePath string // Path to the config file. +} + +// File opens the config file and returns the file descriptor. +func (c *TMConfig) File() *os.File { + if c.FilePath == "" { + log.Errorf("water: WASM config file path is not provided in config") + return nil + } + + f, err := os.Open(c.FilePath) + if err != nil { + log.Errorf("water: failed to open WATM config file: %v", err) + return nil + } + + return f +} diff --git a/wasm_api_v0.go b/wasm_api_v0.go deleted file mode 100644 index d1a3490..0000000 --- a/wasm_api_v0.go +++ /dev/null @@ -1,364 +0,0 @@ -//go:build !nov0 - -package water - -import ( - "fmt" - "net" - "os" - "runtime" - "sync" - - "github.com/bytecodealliance/wasmtime-go/v13" - "github.com/gaukas/water/internal/socket" - v0 "github.com/gaukas/water/internal/v0" - "github.com/gaukas/water/internal/wasm" -) - -// WASMv0 is a wrapper around core which provides extended functionalities -// for WASM runtime in V0 spec. -type WASMv0 struct { - *core - - _init *wasmtime.Func // _init() -> i32 - - // _dial: - // - Calls to `env.host_dial() -> fd: i32` to dial a network connection (wrapped with the - // application protocol) and bind it to one of its file descriptors, record the fd as - // `remoteConnFd`. This will be the fd it used to read/write data from/to the remote - // destination. - // - Records the `callerConnFd`. This will be the fd it used to read/write data from/to - // the caller. - // - Returns `remoteConnFd` to the caller to be kept track of. - _dial *wasmtime.Func // _dial(callerConnFd i32) (remoteConnFd i32) - - // _accept: - // - Calls to `env.host_accept() -> fd: i32` to accept a network connection (wrapped with the - // application protocol) and bind it to one of its file descriptors, record the fd as - // `sourceConnFd`. This will be the fd it used to read/write data from/to the source - // address. - // - Records the `callerConnFd`. This will be the fd it used to read/write data from/to - // the caller. - // - Returns `sourceConnFd` to the caller to be kept track of. - _accept *wasmtime.Func // _accept(callerConnFd i32) (sourceConnFd i32) - - // _read: - // - if `callerConnFd` is invalid, this will return an error. - // - if `sourceConnFd` is valid, this will read from `sourceConnFd` and write to `callerConnFd`. - // - if `remoteConnFd` is valid, this will read from `remoteConnFd` and write to `callerConnFd`. - _read *wasmtime.Func // _read() (err int32) - - // _write: - // - if `callerConnFd` is invalid, this will return an error. - // - if `sourceConnFd` is valid, this will read from `callerConnFd` and write to `sourceConnFd`. - // - if `remoteConnFd` is valid, this will read from `callerConnFd` and write to `remoteConnFd`. - _write *wasmtime.Func // _write() (err int32) - - // _close: - // - Closes the all the file descriptors it owns. - // - Cleans up any other resouce it allocated within the WASM module. - // - Calls back to runtime by calling `env.host_defer` for the runtime to self-clean. - _close *wasmtime.Func - - dialer *v0.WASIDialer - listener *v0.WASIListener - - gcfixOnce *sync.Once - pushedConn map[int32]*struct { - conn net.Conn - file *os.File - } - - deferOnce *sync.Once - deferredFuncs []func() -} - -func NewWASMv0(core *core) *WASMv0 { - wasm := &WASMv0{ - core: core, - gcfixOnce: new(sync.Once), - pushedConn: make(map[int32]*struct { - conn net.Conn - file *os.File - }), - deferOnce: new(sync.Once), - deferredFuncs: make([]func(), 0), - } - - runtime.SetFinalizer(wasm, func(w *WASMv0) { - w.DeferAll() - w.Cleanup() - }) - - return wasm -} - -func (w *WASMv0) LinkNetworkInterface(dialer *v0.WASIDialer, listener *v0.WASIListener) error { - if w.Linker() == nil { - return fmt.Errorf("water: linker not set, is Core initialized?") - } - - // import host_dial - if dialer != nil { - if err := w.Linker().FuncNew("env", "host_dial", v0.WASIConnectFuncType, dialer.WrappedDial()); err != nil { - return fmt.Errorf("water: linking WASI dialer, (*wasmtime.Linker).FuncNew: %w", err) - } - } else { - if err := w.Linker().FuncNew("env", "host_dial", v0.WASIConnectFuncType, v0.WrappedNopWASIConnectFunc()); err != nil { - return fmt.Errorf("water: linking NOP dialer, (*wasmtime.Linker).FuncNew: %w", err) - } - } - w.dialer = dialer - - // import host_accept - if listener != nil { - if err := w.Linker().FuncNew("env", "host_accept", v0.WASIConnectFuncType, listener.WrappedAccept()); err != nil { - return fmt.Errorf("water: linking WASI listener, (*wasmtime.Linker).FuncNew: %w", err) - } - } else { - if err := w.Linker().FuncNew("env", "host_accept", v0.WASIConnectFuncType, v0.WrappedNopWASIConnectFunc()); err != nil { - return fmt.Errorf("water: linking NOP listener, (*wasmtime.Linker).FuncNew: %w", err) - } - } - w.listener = listener - - return nil -} - -// Initialize initializes the WASMv0 runtime by getting all the exported functions from -// the WASM module. -// -// All imports must be set before calling this function. -func (w *WASMv0) Initialize() error { - if w.core == nil { - return fmt.Errorf("water: no core loaded") - } - - var err error - // import host_defer function - if err = w.Linker().FuncWrap("env", "host_defer", func() { - w.DeferAll() - }); err != nil { - return fmt.Errorf("water: linking deferh function, (*wasmtime.Linker).FuncWrap: %w", err) - } - - // import pull_config function (it is called pushConfig here in the host) - if err := w.Linker().FuncNew("env", "pull_config", v0.WASIConnectFuncType, v0.WrapConnectFunc(w.pushConfig)); err != nil { - return fmt.Errorf("water: linking pull_config function, (*wasmtime.Linker).FuncNew: %w", err) - } - - // instantiate the WASM module - if err = w.Instantiate(); err != nil { - return err - } - - // _init - w._init = w.Instance().GetFunc(w.Store(), "_init") - if w._init == nil { - return fmt.Errorf("water: WASM module does not export _init") - } - - // _dial - w._dial = w.Instance().GetFunc(w.Store(), "_dial") - // if w._dial == nil { - // return fmt.Errorf("water: WASM module does not export _dial") - // } - - // _accept - w._accept = w.Instance().GetFunc(w.Store(), "_accept") - // if w._accept == nil { - // return fmt.Errorf("water: WASM module does not export _accept") - // } - - // _close - w._close = w.Instance().GetFunc(w.Store(), "_close") - if w._close == nil { - return fmt.Errorf("water: WASM module does not export _close") - } - - // call _init - ret, err := w._init.Call(w.Store()) - if err != nil { - return fmt.Errorf("water: calling _init function returned error: %w", err) - } - - return wasm.WASMErr(ret.(int32)) -} - -// Caller need to make sure anything caller writes to the WASM module is -// readable on the callerConn. -func (w *WASMv0) InitializeReadWriter() error { - // _read - w._read = w.Instance().GetFunc(w.Store(), "_read") - if w._read == nil { - return fmt.Errorf("water: WASM module does not export _read") - } - - // _write - w._write = w.Instance().GetFunc(w.Store(), "_write") - if w._write == nil { - return fmt.Errorf("water: WASM module does not export _write") - } - - return nil -} - -func (w *WASMv0) DialFrom(callerConn net.Conn) (destConn net.Conn, err error) { - callerFd, err := w.PushConn(callerConn) - if err != nil { - return nil, fmt.Errorf("water: pushing caller conn to store failed: %w", err) - } - - ret, err := w._dial.Call(w.Store(), callerFd) - if err != nil { - return nil, fmt.Errorf("water: calling _dial function returned error: %w", err) - } - - if remoteFd, ok := ret.(int32); !ok { - return nil, fmt.Errorf("water: invalid _dial function signature") - } else { - if remoteFd < 0 { - return nil, wasm.WASMErr(remoteFd) - } else { - destConn := w.dialer.GetConnByFd(remoteFd) - if destConn == nil { - return nil, fmt.Errorf("water: failed to look up network connection by fd") - } - return destConn, nil - } - } -} - -func (w *WASMv0) AcceptFor(callerConn net.Conn) (sourceConn net.Conn, err error) { - callerFd, err := w.PushConn(callerConn) - if err != nil { - return nil, fmt.Errorf("water: pushing caller conn to store failed: %w", err) - } - - ret, err := w._accept.Call(w.Store(), callerFd) - if err != nil { - return nil, fmt.Errorf("water: calling _accept function returned error: %w", err) - } - - if sourceFd, ok := ret.(int32); !ok { - return nil, fmt.Errorf("water: invalid _accept function signature") - } else { - if sourceFd < 0 { - return nil, wasm.WASMErr(sourceFd) - } else { - sourceConn := w.listener.GetConnByFd(sourceFd) - if sourceConn == nil { - return nil, fmt.Errorf("water: failed to look up network connection by fd") - } - return sourceConn, nil - } - } -} - -func (w *WASMv0) PushConn(conn net.Conn) (fd int32, err error) { - w.gcfixOnce.Do(func() { - if GCFIX { - // create temp file - var f *os.File - f, err = os.CreateTemp("", "water-gcfix") - if err != nil { - return - } - - // push dummy file - fd, err := w.Store().PushFile(f, wasmtime.READ_ONLY) - if err != nil { - return - } - - // save dummy file to map - w.pushedConn[int32(fd)] = &struct { - conn net.Conn - file *os.File - }{ - conn: nil, - file: f, - } - } - }) - - if err != nil { - return 0, fmt.Errorf("water: creating temp file for GC fix: %w", err) - } - - connFile, err := socket.AsFile(conn) - if err != nil { - return 0, fmt.Errorf("water: converting conn to file failed: %w", err) - } - - fdu32, err := w.store.PushFile(connFile, wasmtime.READ_WRITE) - if err != nil { - return 0, fmt.Errorf("water: pushing conn file to store failed: %w", err) - } - fd = int32(fdu32) - - w.pushedConn[fd] = &struct { - conn net.Conn - file *os.File - }{ - conn: conn, - file: connFile, - } - - return fd, nil -} - -func (w *WASMv0) DeferAll() { - w.deferOnce.Do(func() { // execute all deferred functions if not yet executed - for _, f := range w.deferredFuncs { - f() - } - }) -} - -func (w *WASMv0) Defer(f func()) { - w.deferredFuncs = append(w.deferredFuncs, f) -} - -func (w *WASMv0) Cleanup() { - // clean up pushed files - var keyList []int32 - for k, v := range w.pushedConn { - if v != nil { - if v.file != nil { - v.file.Close() - v.file = nil - } - if v.conn != nil { - v.conn.Close() - v.conn = nil - } - } - keyList = append(keyList, k) - } - for _, k := range keyList { - delete(w.pushedConn, k) - } - - // clean up deferred functions - w.deferredFuncs = nil - - w.dialer.CloseAllConn() - w.listener.CloseAllConn() -} - -func (w *WASMv0) pushConfig(caller *wasmtime.Caller) (int32, error) { - // get config file - configFile := w.Config().WATMConfig.File() - if configFile == nil { - return wasm.INVALID_FD, nil // we don't return error here so no trap is triggered - } - - // push file to WASM - configFd, err := caller.PushFile(configFile, wasmtime.READ_ONLY) - if err != nil { - return wasm.INVALID_FD, err - } - - return int32(configFd), nil -}