Skip to content

Commit

Permalink
fix: WebTransport stream now extends abstract stream (#2514)
Browse files Browse the repository at this point in the history
The PR pulls all of the non-`@fails/webtransport` parts out of #2422

There's a lot of work that's been done to re-use existing libp2p
code such as the abstract stream class which handles a lot more
closing scenarios than the existing implementation so it would be
good to get that in.
  • Loading branch information
achingbrain authored May 1, 2024
1 parent c824323 commit de3f7ae
Show file tree
Hide file tree
Showing 13 changed files with 372 additions and 311 deletions.
14 changes: 9 additions & 5 deletions packages/transport-webtransport/.aegir.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
/* eslint-disable no-console */
import { spawn, exec } from 'child_process'
import { existsSync } from 'fs'
import { existsSync } from 'node:fs'
import os from 'node:os'
import defer from 'p-defer'

/** @type {import('aegir/types').PartialOptions} */
export default {
test: {
async before() {
async before () {
const main = os.platform() === 'win32' ? 'main.exe' : 'main'

if (!existsSync('./go-libp2p-webtransport-server/main')) {
await new Promise((resolve, reject) => {
exec('go build -o main main.go',
exec(`go build -o ${main} main.go`,
{ cwd: './go-libp2p-webtransport-server' },
(error, stdout, stderr) => {
if (error) {
Expand All @@ -21,7 +25,7 @@ export default {
})
}

const server = spawn('./main', [], { cwd: './go-libp2p-webtransport-server', killSignal: 'SIGINT' })
const server = spawn(`./${main}`, [], { cwd: './go-libp2p-webtransport-server', killSignal: 'SIGINT' })
server.stderr.on('data', (data) => {
console.log('stderr:', data.toString())
})
Expand Down Expand Up @@ -53,7 +57,7 @@ export default {
}
}
},
async after(_, { server }) {
async after (_, { server }) {
server.kill('SIGINT')
}
},
Expand Down
15 changes: 12 additions & 3 deletions packages/transport-webtransport/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,34 @@
"@chainsafe/libp2p-noise": "^15.0.0",
"@libp2p/interface": "^1.3.0",
"@libp2p/peer-id": "^4.1.0",
"@libp2p/utils": "^5.3.2",
"@multiformats/multiaddr": "^12.2.1",
"@multiformats/multiaddr-matcher": "^1.2.0",
"it-stream-types": "^2.0.1",
"multiformats": "^13.1.0",
"race-signal": "^1.0.2",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.0.3"
},
"devDependencies": {
"@libp2p/logger": "^4.0.11",
"@libp2p/peer-id-factory": "^4.1.0",
"@noble/hashes": "^1.4.0",
"aegir": "^42.2.5",
"it-map": "^3.1.0",
"it-to-buffer": "^4.0.7",
"libp2p": "^1.4.3",
"p-defer": "^4.0.1"
"p-defer": "^4.0.1",
"p-wait-for": "^5.0.2"
},
"browser": {
"./dist/src/listener.js": "./dist/src/listener.browser.js"
"./dist/src/listener.js": "./dist/src/listener.browser.js",
"./dist/src/webtransport.js": "./dist/src/webtransport.browser.js"
},
"react-native": {
"./dist/src/listener.js": "./dist/src/listener.browser.js"
"./dist/src/listener.js": "./dist/src/listener.browser.js",
"./dist/src/webtransport.js": "./dist/src/webtransport.browser.js",
"./dist/src/utils/generate-certificates.js": "./dist/src/utils/generate-certificates.browser.js"
},
"sideEffects": false
}
177 changes: 57 additions & 120 deletions packages/transport-webtransport/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,38 @@
*/

import { noise } from '@chainsafe/libp2p-noise'
import { type Transport, transportSymbol, type CreateListenerOptions, type DialOptions, type Listener, type ComponentLogger, type Logger, type Connection, type MultiaddrConnection, type Stream, type CounterGroup, type Metrics, type PeerId, type StreamMuxerFactory, type StreamMuxerInit, type StreamMuxer } from '@libp2p/interface'
import { type Multiaddr, type AbortOptions } from '@multiformats/multiaddr'
import { AbortError, CodeError, transportSymbol } from '@libp2p/interface'
import { WebTransport as WebTransportMatcher } from '@multiformats/multiaddr-matcher'
import { webtransportBiDiStreamToStream } from './stream.js'
import { raceSignal } from 'race-signal'
import createListener from './listener.js'
import { webtransportMuxer } from './muxer.js'
import { inertDuplex } from './utils/inert-duplex.js'
import { isSubset } from './utils/is-subset.js'
import { parseMultiaddr } from './utils/parse-multiaddr.js'
import WebTransport from './webtransport.js'
import type { Transport, CreateListenerOptions, DialOptions, Listener, ComponentLogger, Logger, Connection, MultiaddrConnection, CounterGroup, Metrics, PeerId } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Source } from 'it-stream-types'
import type { MultihashDigest } from 'multiformats/hashes/interface'
import type { Uint8ArrayList } from 'uint8arraylist'

/**
* PEM format server certificate and private key
*/
export interface WebTransportCertificate {
privateKey: string
pem: string
hash: MultihashDigest<number>
secret: string
}

interface WebTransportSessionCleanup {
(metric: string): void
}

export interface WebTransportInit {
maxInboundStreams?: number
certificates?: WebTransportCertificate[]
}

export interface WebTransportComponents {
Expand All @@ -69,7 +84,9 @@ class WebTransportTransport implements Transport {
this.log = components.logger.forComponent('libp2p:webtransport')
this.components = components
this.config = {
maxInboundStreams: init.maxInboundStreams ?? 1000
...init,
maxInboundStreams: init.maxInboundStreams ?? 1000,
certificates: init.certificates ?? []
}

if (components.metrics != null) {
Expand All @@ -87,24 +104,26 @@ class WebTransportTransport implements Transport {
readonly [transportSymbol] = true

async dial (ma: Multiaddr, options: DialOptions): Promise<Connection> {
options?.signal?.throwIfAborted()
if (options?.signal?.aborted === true) {
throw new AbortError()
}

this.log('dialing %s', ma)
const localPeer = this.components.peerId
if (localPeer === undefined) {
throw new Error('Need a local peerid')
throw new CodeError('Need a local peerid', 'ERR_INVALID_PARAMETERS')
}

options = options ?? {}

const { url, certhashes, remotePeer } = parseMultiaddr(ma)

if (remotePeer == null) {
throw new Error('Need a target peerid')
throw new CodeError('Need a target peerid', 'ERR_INVALID_PARAMETERS')
}

if (certhashes.length === 0) {
throw new Error('Expected multiaddr to contain certhashes')
throw new CodeError('Expected multiaddr to contain certhashes', 'ERR_INVALID_PARAMETERS')
}

let abortListener: (() => void) | undefined
Expand Down Expand Up @@ -159,10 +178,12 @@ class WebTransportTransport implements Transport {
once: true
})

this.log('wait for session to be ready')
await Promise.race([
wt.closed,
wt.ready
])
this.log('session became ready')

ready = true
this.metrics?.dialerEvents.increment({ ready: true })
Expand All @@ -175,15 +196,17 @@ class WebTransportTransport implements Transport {
cleanUpWTSession('remote_close')
})

if (!await this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes)) {
throw new Error('Failed to authenticate webtransport')
authenticated = await raceSignal(this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes), options.signal)

if (!authenticated) {
throw new CodeError('Failed to authenticate webtransport', 'ERR_AUTHENTICATION_FAILED')
}

this.metrics?.dialerEvents.increment({ open: true })

maConn = {
close: async () => {
this.log('Closing webtransport')
this.log('closing webtransport')
cleanUpWTSession('close')
},
abort: (err: Error) => {
Expand All @@ -199,9 +222,11 @@ class WebTransportTransport implements Transport {
...inertDuplex()
}

authenticated = true

return await options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: this.webtransportMuxer(wt), skipProtection: true })
return await options.upgrader.upgradeOutbound(maConn, {
skipEncryption: true,
muxerFactory: webtransportMuxer(wt, wt.incomingBidirectionalStreams.getReader(), this.components.logger, this.config),
skipProtection: true
})
} catch (err: any) {
this.log.error('caught wt session err', err)

Expand All @@ -221,11 +246,14 @@ class WebTransportTransport implements Transport {
}
}

async authenticateWebTransport (wt: InstanceType<typeof WebTransport>, localPeer: PeerId, remotePeer: PeerId, certhashes: Array<MultihashDigest<number>>): Promise<boolean> {
async authenticateWebTransport (wt: WebTransport, localPeer: PeerId, remotePeer: PeerId, certhashes: Array<MultihashDigest<number>>, signal?: AbortSignal): Promise<boolean> {
if (signal?.aborted === true) {
throw new AbortError()
}

const stream = await wt.createBidirectionalStream()
const writer = stream.writable.getWriter()
const reader = stream.readable.getReader()
await writer.ready

const duplex = {
source: (async function * () {
Expand All @@ -241,13 +269,15 @@ class WebTransportTransport implements Transport {
}
}
})(),
sink: async function (source: Source<Uint8Array | Uint8ArrayList>) {
sink: async (source: Source<Uint8Array | Uint8ArrayList>) => {
for await (const chunk of source) {
if (chunk instanceof Uint8Array) {
await writer.write(chunk)
} else {
await writer.write(chunk.subarray())
}
await raceSignal(writer.ready, signal)

const buf = chunk instanceof Uint8Array ? chunk : chunk.subarray()

writer.write(buf).catch(err => {
this.log.error('could not write chunk during authentication of WebTransport stream', err)
})
}
}
}
Expand All @@ -273,105 +303,12 @@ class WebTransportTransport implements Transport {
return true
}

webtransportMuxer (wt: WebTransport): StreamMuxerFactory {
let streamIDCounter = 0
const config = this.config
const self = this
return {
protocol: 'webtransport',
createStreamMuxer: (init?: StreamMuxerInit): StreamMuxer => {
// !TODO handle abort signal when WebTransport supports this.

if (typeof init === 'function') {
// The api docs say that init may be a function
init = { onIncomingStream: init }
}

const activeStreams: Stream[] = [];

(async function () {
//! TODO unclear how to add backpressure here?

const reader = wt.incomingBidirectionalStreams.getReader()
while (true) {
const { done, value: wtStream } = await reader.read()

if (done) {
break
}

if (activeStreams.length >= config.maxInboundStreams) {
// We've reached our limit, close this stream.
wtStream.writable.close().catch((err: Error) => {
self.log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`)
})
wtStream.readable.cancel().catch((err: Error) => {
self.log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`)
})
} else {
const stream = await webtransportBiDiStreamToStream(
wtStream,
String(streamIDCounter++),
'inbound',
activeStreams,
init?.onStreamEnd,
self.components.logger
)
activeStreams.push(stream)
init?.onIncomingStream?.(stream)
}
}
})().catch(() => {
this.log.error('WebTransport failed to receive incoming stream')
})

const muxer: StreamMuxer = {
protocol: 'webtransport',
streams: activeStreams,
newStream: async (name?: string): Promise<Stream> => {
const wtStream = await wt.createBidirectionalStream()

const stream = await webtransportBiDiStreamToStream(
wtStream,
String(streamIDCounter++),
init?.direction ?? 'outbound',
activeStreams,
init?.onStreamEnd,
self.components.logger
)
activeStreams.push(stream)

return stream
},

/**
* Close or abort all tracked streams and stop the muxer
*/
close: async (options?: AbortOptions) => {
this.log('Closing webtransport muxer')

await Promise.all(
activeStreams.map(async s => s.close(options))
)
},
abort: (err: Error) => {
this.log('Aborting webtransport muxer with err:', err)

for (const stream of activeStreams) {
stream.abort(err)
}
},
// This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex.
...inertDuplex()
}

return muxer
}
}
}

createListener (options: CreateListenerOptions): Listener {
throw new Error('Webtransport servers are not supported in Node or the browser')
return createListener(this.components, {
...options,
certificates: this.config.certificates,
maxInboundStreams: this.config.maxInboundStreams
})
}

/**
Expand Down
5 changes: 5 additions & 0 deletions packages/transport-webtransport/src/listener.browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import type { CreateListenerOptions, Listener } from '@libp2p/interface'

export default function createListener (options: CreateListenerOptions): Listener {
throw new Error('Not implemented')
}
19 changes: 19 additions & 0 deletions packages/transport-webtransport/src/listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { WebTransportCertificate } from './index.js'
import type { Connection, Upgrader, Listener, CreateListenerOptions, PeerId, ComponentLogger, Metrics } from '@libp2p/interface'

export interface WebTransportListenerComponents {
peerId: PeerId
logger: ComponentLogger
metrics?: Metrics
}

export interface WebTransportListenerInit extends CreateListenerOptions {
handler?(conn: Connection): void
upgrader: Upgrader
certificates?: WebTransportCertificate[]
maxInboundStreams?: number
}

export default function createListener (components: WebTransportListenerComponents, options: WebTransportListenerInit): Listener {
throw new Error('Only supported in browsers')
}
Loading

0 comments on commit de3f7ae

Please sign in to comment.