From de3f7aeafa6a4ada2f65598aa6d8eeece6ad83d8 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 1 May 2024 06:51:14 +0100 Subject: [PATCH] fix: WebTransport stream now extends abstract stream (#2514) The PR pulls all of the non-`@fails/webtransport` parts out of #2422 There's a lot of work that's been done to re-use existing libp2p code such as the abstract stream class which handles a lot more closing scenarios than the existing implementation so it would be good to get that in. --- packages/transport-webtransport/.aegir.js | 14 +- packages/transport-webtransport/package.json | 15 +- packages/transport-webtransport/src/index.ts | 177 ++++-------- .../src/listener.browser.ts | 5 + .../transport-webtransport/src/listener.ts | 19 ++ packages/transport-webtransport/src/muxer.ts | 95 +++++++ packages/transport-webtransport/src/stream.ts | 254 +++++++----------- .../utils/generate-certificates.browser.ts | 3 + .../src/utils/generate-certificates.ts | 11 + .../src/webtransport.browser.ts | 1 + .../src/webtransport.ts | 17 ++ .../transport-webtransport/test/browser.ts | 60 +++-- .../test/fixtures/random-bytes.ts | 12 + 13 files changed, 372 insertions(+), 311 deletions(-) create mode 100644 packages/transport-webtransport/src/listener.browser.ts create mode 100644 packages/transport-webtransport/src/listener.ts create mode 100644 packages/transport-webtransport/src/muxer.ts create mode 100644 packages/transport-webtransport/src/utils/generate-certificates.browser.ts create mode 100644 packages/transport-webtransport/src/utils/generate-certificates.ts create mode 100644 packages/transport-webtransport/src/webtransport.browser.ts create mode 100644 packages/transport-webtransport/src/webtransport.ts create mode 100644 packages/transport-webtransport/test/fixtures/random-bytes.ts diff --git a/packages/transport-webtransport/.aegir.js b/packages/transport-webtransport/.aegir.js index b211f70e0a..5f7d52f3db 100644 --- a/packages/transport-webtransport/.aegir.js +++ b/packages/transport-webtransport/.aegir.js @@ -1,14 +1,18 @@ +/* eslint-disable no-console */ import { spawn, exec } from 'child_process' -import { existsSync } from 'fs' +import { existsSync } from 'node:fs' +import os from 'node:os' import defer from 'p-defer' /** @type {import('aegir/types').PartialOptions} */ export default { test: { - async before() { + async before () { + const main = os.platform() === 'win32' ? 'main.exe' : 'main' + if (!existsSync('./go-libp2p-webtransport-server/main')) { await new Promise((resolve, reject) => { - exec('go build -o main main.go', + exec(`go build -o ${main} main.go`, { cwd: './go-libp2p-webtransport-server' }, (error, stdout, stderr) => { if (error) { @@ -21,7 +25,7 @@ export default { }) } - const server = spawn('./main', [], { cwd: './go-libp2p-webtransport-server', killSignal: 'SIGINT' }) + const server = spawn(`./${main}`, [], { cwd: './go-libp2p-webtransport-server', killSignal: 'SIGINT' }) server.stderr.on('data', (data) => { console.log('stderr:', data.toString()) }) @@ -53,7 +57,7 @@ export default { } } }, - async after(_, { server }) { + async after (_, { server }) { server.kill('SIGINT') } }, diff --git a/packages/transport-webtransport/package.json b/packages/transport-webtransport/package.json index 7fa64c517c..d1a53a9545 100644 --- a/packages/transport-webtransport/package.json +++ b/packages/transport-webtransport/package.json @@ -53,25 +53,34 @@ "@chainsafe/libp2p-noise": "^15.0.0", "@libp2p/interface": "^1.3.0", "@libp2p/peer-id": "^4.1.0", + "@libp2p/utils": "^5.3.2", "@multiformats/multiaddr": "^12.2.1", "@multiformats/multiaddr-matcher": "^1.2.0", "it-stream-types": "^2.0.1", "multiformats": "^13.1.0", + "race-signal": "^1.0.2", "uint8arraylist": "^2.4.8", "uint8arrays": "^5.0.3" }, "devDependencies": { "@libp2p/logger": "^4.0.11", "@libp2p/peer-id-factory": "^4.1.0", + "@noble/hashes": "^1.4.0", "aegir": "^42.2.5", + "it-map": "^3.1.0", + "it-to-buffer": "^4.0.7", "libp2p": "^1.4.3", - "p-defer": "^4.0.1" + "p-defer": "^4.0.1", + "p-wait-for": "^5.0.2" }, "browser": { - "./dist/src/listener.js": "./dist/src/listener.browser.js" + "./dist/src/listener.js": "./dist/src/listener.browser.js", + "./dist/src/webtransport.js": "./dist/src/webtransport.browser.js" }, "react-native": { - "./dist/src/listener.js": "./dist/src/listener.browser.js" + "./dist/src/listener.js": "./dist/src/listener.browser.js", + "./dist/src/webtransport.js": "./dist/src/webtransport.browser.js", + "./dist/src/utils/generate-certificates.js": "./dist/src/utils/generate-certificates.browser.js" }, "sideEffects": false } diff --git a/packages/transport-webtransport/src/index.ts b/packages/transport-webtransport/src/index.ts index e4ca8aa565..24f7c7a8d6 100644 --- a/packages/transport-webtransport/src/index.ts +++ b/packages/transport-webtransport/src/index.ts @@ -30,23 +30,38 @@ */ import { noise } from '@chainsafe/libp2p-noise' -import { type Transport, transportSymbol, type CreateListenerOptions, type DialOptions, type Listener, type ComponentLogger, type Logger, type Connection, type MultiaddrConnection, type Stream, type CounterGroup, type Metrics, type PeerId, type StreamMuxerFactory, type StreamMuxerInit, type StreamMuxer } from '@libp2p/interface' -import { type Multiaddr, type AbortOptions } from '@multiformats/multiaddr' +import { AbortError, CodeError, transportSymbol } from '@libp2p/interface' import { WebTransport as WebTransportMatcher } from '@multiformats/multiaddr-matcher' -import { webtransportBiDiStreamToStream } from './stream.js' +import { raceSignal } from 'race-signal' +import createListener from './listener.js' +import { webtransportMuxer } from './muxer.js' import { inertDuplex } from './utils/inert-duplex.js' import { isSubset } from './utils/is-subset.js' import { parseMultiaddr } from './utils/parse-multiaddr.js' +import WebTransport from './webtransport.js' +import type { Transport, CreateListenerOptions, DialOptions, Listener, ComponentLogger, Logger, Connection, MultiaddrConnection, CounterGroup, Metrics, PeerId } from '@libp2p/interface' +import type { Multiaddr } from '@multiformats/multiaddr' import type { Source } from 'it-stream-types' import type { MultihashDigest } from 'multiformats/hashes/interface' import type { Uint8ArrayList } from 'uint8arraylist' +/** + * PEM format server certificate and private key + */ +export interface WebTransportCertificate { + privateKey: string + pem: string + hash: MultihashDigest + secret: string +} + interface WebTransportSessionCleanup { (metric: string): void } export interface WebTransportInit { maxInboundStreams?: number + certificates?: WebTransportCertificate[] } export interface WebTransportComponents { @@ -69,7 +84,9 @@ class WebTransportTransport implements Transport { this.log = components.logger.forComponent('libp2p:webtransport') this.components = components this.config = { - maxInboundStreams: init.maxInboundStreams ?? 1000 + ...init, + maxInboundStreams: init.maxInboundStreams ?? 1000, + certificates: init.certificates ?? [] } if (components.metrics != null) { @@ -87,12 +104,14 @@ class WebTransportTransport implements Transport { readonly [transportSymbol] = true async dial (ma: Multiaddr, options: DialOptions): Promise { - options?.signal?.throwIfAborted() + if (options?.signal?.aborted === true) { + throw new AbortError() + } this.log('dialing %s', ma) const localPeer = this.components.peerId if (localPeer === undefined) { - throw new Error('Need a local peerid') + throw new CodeError('Need a local peerid', 'ERR_INVALID_PARAMETERS') } options = options ?? {} @@ -100,11 +119,11 @@ class WebTransportTransport implements Transport { const { url, certhashes, remotePeer } = parseMultiaddr(ma) if (remotePeer == null) { - throw new Error('Need a target peerid') + throw new CodeError('Need a target peerid', 'ERR_INVALID_PARAMETERS') } if (certhashes.length === 0) { - throw new Error('Expected multiaddr to contain certhashes') + throw new CodeError('Expected multiaddr to contain certhashes', 'ERR_INVALID_PARAMETERS') } let abortListener: (() => void) | undefined @@ -159,10 +178,12 @@ class WebTransportTransport implements Transport { once: true }) + this.log('wait for session to be ready') await Promise.race([ wt.closed, wt.ready ]) + this.log('session became ready') ready = true this.metrics?.dialerEvents.increment({ ready: true }) @@ -175,15 +196,17 @@ class WebTransportTransport implements Transport { cleanUpWTSession('remote_close') }) - if (!await this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes)) { - throw new Error('Failed to authenticate webtransport') + authenticated = await raceSignal(this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes), options.signal) + + if (!authenticated) { + throw new CodeError('Failed to authenticate webtransport', 'ERR_AUTHENTICATION_FAILED') } this.metrics?.dialerEvents.increment({ open: true }) maConn = { close: async () => { - this.log('Closing webtransport') + this.log('closing webtransport') cleanUpWTSession('close') }, abort: (err: Error) => { @@ -199,9 +222,11 @@ class WebTransportTransport implements Transport { ...inertDuplex() } - authenticated = true - - return await options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: this.webtransportMuxer(wt), skipProtection: true }) + return await options.upgrader.upgradeOutbound(maConn, { + skipEncryption: true, + muxerFactory: webtransportMuxer(wt, wt.incomingBidirectionalStreams.getReader(), this.components.logger, this.config), + skipProtection: true + }) } catch (err: any) { this.log.error('caught wt session err', err) @@ -221,11 +246,14 @@ class WebTransportTransport implements Transport { } } - async authenticateWebTransport (wt: InstanceType, localPeer: PeerId, remotePeer: PeerId, certhashes: Array>): Promise { + async authenticateWebTransport (wt: WebTransport, localPeer: PeerId, remotePeer: PeerId, certhashes: Array>, signal?: AbortSignal): Promise { + if (signal?.aborted === true) { + throw new AbortError() + } + const stream = await wt.createBidirectionalStream() const writer = stream.writable.getWriter() const reader = stream.readable.getReader() - await writer.ready const duplex = { source: (async function * () { @@ -241,13 +269,15 @@ class WebTransportTransport implements Transport { } } })(), - sink: async function (source: Source) { + sink: async (source: Source) => { for await (const chunk of source) { - if (chunk instanceof Uint8Array) { - await writer.write(chunk) - } else { - await writer.write(chunk.subarray()) - } + await raceSignal(writer.ready, signal) + + const buf = chunk instanceof Uint8Array ? chunk : chunk.subarray() + + writer.write(buf).catch(err => { + this.log.error('could not write chunk during authentication of WebTransport stream', err) + }) } } } @@ -273,105 +303,12 @@ class WebTransportTransport implements Transport { return true } - webtransportMuxer (wt: WebTransport): StreamMuxerFactory { - let streamIDCounter = 0 - const config = this.config - const self = this - return { - protocol: 'webtransport', - createStreamMuxer: (init?: StreamMuxerInit): StreamMuxer => { - // !TODO handle abort signal when WebTransport supports this. - - if (typeof init === 'function') { - // The api docs say that init may be a function - init = { onIncomingStream: init } - } - - const activeStreams: Stream[] = []; - - (async function () { - //! TODO unclear how to add backpressure here? - - const reader = wt.incomingBidirectionalStreams.getReader() - while (true) { - const { done, value: wtStream } = await reader.read() - - if (done) { - break - } - - if (activeStreams.length >= config.maxInboundStreams) { - // We've reached our limit, close this stream. - wtStream.writable.close().catch((err: Error) => { - self.log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) - }) - wtStream.readable.cancel().catch((err: Error) => { - self.log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) - }) - } else { - const stream = await webtransportBiDiStreamToStream( - wtStream, - String(streamIDCounter++), - 'inbound', - activeStreams, - init?.onStreamEnd, - self.components.logger - ) - activeStreams.push(stream) - init?.onIncomingStream?.(stream) - } - } - })().catch(() => { - this.log.error('WebTransport failed to receive incoming stream') - }) - - const muxer: StreamMuxer = { - protocol: 'webtransport', - streams: activeStreams, - newStream: async (name?: string): Promise => { - const wtStream = await wt.createBidirectionalStream() - - const stream = await webtransportBiDiStreamToStream( - wtStream, - String(streamIDCounter++), - init?.direction ?? 'outbound', - activeStreams, - init?.onStreamEnd, - self.components.logger - ) - activeStreams.push(stream) - - return stream - }, - - /** - * Close or abort all tracked streams and stop the muxer - */ - close: async (options?: AbortOptions) => { - this.log('Closing webtransport muxer') - - await Promise.all( - activeStreams.map(async s => s.close(options)) - ) - }, - abort: (err: Error) => { - this.log('Aborting webtransport muxer with err:', err) - - for (const stream of activeStreams) { - stream.abort(err) - } - }, - // This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex. - ...inertDuplex() - } - - return muxer - } - } - } - createListener (options: CreateListenerOptions): Listener { - throw new Error('Webtransport servers are not supported in Node or the browser') + return createListener(this.components, { + ...options, + certificates: this.config.certificates, + maxInboundStreams: this.config.maxInboundStreams + }) } /** diff --git a/packages/transport-webtransport/src/listener.browser.ts b/packages/transport-webtransport/src/listener.browser.ts new file mode 100644 index 0000000000..fc3851379e --- /dev/null +++ b/packages/transport-webtransport/src/listener.browser.ts @@ -0,0 +1,5 @@ +import type { CreateListenerOptions, Listener } from '@libp2p/interface' + +export default function createListener (options: CreateListenerOptions): Listener { + throw new Error('Not implemented') +} diff --git a/packages/transport-webtransport/src/listener.ts b/packages/transport-webtransport/src/listener.ts new file mode 100644 index 0000000000..a0e15ff49d --- /dev/null +++ b/packages/transport-webtransport/src/listener.ts @@ -0,0 +1,19 @@ +import type { WebTransportCertificate } from './index.js' +import type { Connection, Upgrader, Listener, CreateListenerOptions, PeerId, ComponentLogger, Metrics } from '@libp2p/interface' + +export interface WebTransportListenerComponents { + peerId: PeerId + logger: ComponentLogger + metrics?: Metrics +} + +export interface WebTransportListenerInit extends CreateListenerOptions { + handler?(conn: Connection): void + upgrader: Upgrader + certificates?: WebTransportCertificate[] + maxInboundStreams?: number +} + +export default function createListener (components: WebTransportListenerComponents, options: WebTransportListenerInit): Listener { + throw new Error('Only supported in browsers') +} diff --git a/packages/transport-webtransport/src/muxer.ts b/packages/transport-webtransport/src/muxer.ts new file mode 100644 index 0000000000..fa5c1e0492 --- /dev/null +++ b/packages/transport-webtransport/src/muxer.ts @@ -0,0 +1,95 @@ +import { webtransportBiDiStreamToStream } from './stream.js' +import { inertDuplex } from './utils/inert-duplex.js' +import type WebTransport from './webtransport.js' +import type { ComponentLogger, Stream, StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface' + +export interface WebTransportMuxerInit { + maxInboundStreams: number +} + +export function webtransportMuxer (wt: Pick, reader: ReadableStreamDefaultReader, logger: ComponentLogger, config: WebTransportMuxerInit): StreamMuxerFactory { + let streamIDCounter = 0 + const log = logger.forComponent('libp2p:webtransport:muxer') + + return { + protocol: 'webtransport', + createStreamMuxer: (init?: StreamMuxerInit): StreamMuxer => { + // !TODO handle abort signal when WebTransport supports this. + + if (typeof init === 'function') { + // The api docs say that init may be a function + init = { onIncomingStream: init } + } + + const activeStreams: Stream[] = [] + + void Promise.resolve().then(async () => { + //! TODO unclear how to add backpressure here? + while (true) { + const { done, value: wtStream } = await reader.read() + + if (done) { + break + } + + if (activeStreams.length >= config.maxInboundStreams) { + log(`too many inbound streams open - ${activeStreams.length}/${config.maxInboundStreams}, closing new incoming stream`) + // We've reached our limit, close this stream. + wtStream.writable.close().catch((err: Error) => { + log.error(`failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) + }) + wtStream.readable.cancel().catch((err: Error) => { + log.error(`failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) + }) + } else { + const stream = await webtransportBiDiStreamToStream( + wtStream, + String(streamIDCounter++), + 'inbound', + activeStreams, + init?.onStreamEnd, + logger + ) + activeStreams.push(stream) + init?.onIncomingStream?.(stream) + } + } + }) + + const muxer: StreamMuxer = { + protocol: 'webtransport', + streams: activeStreams, + newStream: async (name?: string): Promise => { + log('new outgoing stream', name) + + const wtStream = await wt.createBidirectionalStream() + const stream = await webtransportBiDiStreamToStream(wtStream, String(streamIDCounter++), init?.direction ?? 'outbound', activeStreams, init?.onStreamEnd, logger) + activeStreams.push(stream) + + return stream + }, + + /** + * Close all tracked streams and stop the muxer + */ + close: async () => { + log('closing webtransport muxer gracefully') + wt.close() + }, + + /** + * Abort all tracked streams and stop the muxer + */ + abort: (err: Error) => { + log('closing webtransport muxer with err:', err) + wt.close() + }, + + // This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex. + ...inertDuplex() + } + + return muxer + } + } +} diff --git a/packages/transport-webtransport/src/stream.ts b/packages/transport-webtransport/src/stream.ts index 43672408c9..4730dd60bd 100644 --- a/packages/transport-webtransport/src/stream.ts +++ b/packages/transport-webtransport/src/stream.ts @@ -1,184 +1,110 @@ +import { AbstractStream, type AbstractStreamInit } from '@libp2p/utils/abstract-stream' +import { raceSignal } from 'race-signal' import { Uint8ArrayList } from 'uint8arraylist' import type { AbortOptions, ComponentLogger, Direction, Stream } from '@libp2p/interface' -import type { Source } from 'it-stream-types' -export async function webtransportBiDiStreamToStream (bidiStream: WebTransportBidirectionalStream, streamId: string, direction: Direction, activeStreams: Stream[], onStreamEnd: undefined | ((s: Stream) => void), logger: ComponentLogger): Promise { - const log = logger.forComponent(`libp2p:webtransport:stream:${direction}:${streamId}`) - const writer = bidiStream.writable.getWriter() - const reader = bidiStream.readable.getReader() - await writer.ready - - function cleanupStreamFromActiveStreams (): void { - const index = activeStreams.findIndex(s => s === stream) - if (index !== -1) { - activeStreams.splice(index, 1) - stream.timeline.close = Date.now() - onStreamEnd?.(stream) - } - } +interface WebTransportStreamInit extends AbstractStreamInit { + bidiStream: WebTransportBidirectionalStream +} - let writerClosed = false - let readerClosed = false; - (async function () { - const err: Error | undefined = await writer.closed.catch((err: Error) => err) - if (err != null) { - const msg = err.message - if (!(msg.includes('aborted by the remote server') || msg.includes('STOP_SENDING'))) { - log.error(`WebTransport writer closed unexpectedly: streamId=${streamId} err=${err.message}`) - } - } - writerClosed = true - if (writerClosed && readerClosed) { - cleanupStreamFromActiveStreams() - } - })().catch(() => { - log.error('WebTransport failed to cleanup closed stream') - }); - - (async function () { - const err: Error | undefined = await reader.closed.catch((err: Error) => err) - if (err != null) { - log.error(`WebTransport reader closed unexpectedly: streamId=${streamId} err=${err.message}`) - } - readerClosed = true - if (writerClosed && readerClosed) { - cleanupStreamFromActiveStreams() - } - })().catch(() => { - log.error('WebTransport failed to cleanup closed stream') - }) +class WebTransportStream extends AbstractStream { + private readonly writer: WritableStreamDefaultWriter + private readonly reader: ReadableStreamDefaultReader - let sinkSunk = false - const stream: Stream = { - id: streamId, - status: 'open', - writeStatus: 'ready', - readStatus: 'ready', - abort (err: Error) { - if (!writerClosed) { - writer.abort(err) - .catch(err => { - log.error('could not abort stream', err) - }) - writerClosed = true - } - readerClosed = true - - this.status = 'aborted' - this.writeStatus = 'closed' - this.readStatus = 'closed' - - this.timeline.reset = - this.timeline.close = - this.timeline.closeRead = - this.timeline.closeWrite = Date.now() - - cleanupStreamFromActiveStreams() - }, - async close (options?: AbortOptions) { - this.status = 'closing' - - await Promise.all([ - stream.closeRead(options), - stream.closeWrite(options) - ]) - - cleanupStreamFromActiveStreams() - - this.status = 'closed' - this.timeline.close = Date.now() - }, - - async closeRead (options?: AbortOptions) { - if (!readerClosed) { - this.readStatus = 'closing' - - try { - await reader.cancel() - } catch (err: any) { - if (err.toString().includes('RESET_STREAM') === true) { - writerClosed = true - } - } + constructor (init: WebTransportStreamInit) { + super(init) - this.timeline.closeRead = Date.now() - this.readStatus = 'closed' + this.writer = init.bidiStream.writable.getWriter() + this.reader = init.bidiStream.readable.getReader() - readerClosed = true - } + Promise.resolve().then(async () => { + while (true) { + const result = await this.reader.read() - if (writerClosed) { - cleanupStreamFromActiveStreams() - } - }, + if (result.done) { + init.log('remote closed write') + return + } - async closeWrite (options?: AbortOptions) { - if (!writerClosed) { - writerClosed = true + if (result.value != null) { + this.sourcePush(new Uint8ArrayList(result.value)) + } + } + }) + .catch(err => { + init.log.error('error reading from stream', err) + this.abort(err) + }) + .finally(() => { + this.remoteCloseWrite() + }) + + void this.writer.closed + .then(() => { + init.log('writer closed') + }) + .catch((err) => { + init.log('writer close promise rejected', err) + }) + .finally(() => { + this.remoteCloseRead() + }) + } - this.writeStatus = 'closing' + sendNewStream (options?: AbortOptions | undefined): void { + // this is a no-op + } - try { - await writer.close() - } catch (err: any) { - if (err.toString().includes('RESET_STREAM') === true) { - readerClosed = true - } - } + async sendData (buf: Uint8ArrayList, options?: AbortOptions): Promise { + for await (const chunk of buf) { + this.log('sendData waiting for writer to be ready') + await raceSignal(this.writer.ready, options?.signal) + + // the streams spec recommends not waiting for data to be sent + // https://streams.spec.whatwg.org/#example-manual-write-dont-await + this.writer.write(chunk) + .catch(err => { + this.log.error('error sending stream data', err) + }) + } + } - this.timeline.closeWrite = Date.now() - this.writeStatus = 'closed' - } + async sendReset (options?: AbortOptions): Promise { + this.log('sendReset aborting writer') + await raceSignal(this.writer.abort(), options?.signal) + this.log('sendReset aborted writer') + } - if (readerClosed) { - cleanupStreamFromActiveStreams() - } - }, - direction, - timeline: { open: Date.now() }, - metadata: {}, - source: (async function * () { - while (true) { - const val = await reader.read() - if (val.done) { - readerClosed = true - if (writerClosed) { - cleanupStreamFromActiveStreams() - } - return - } + async sendCloseWrite (options?: AbortOptions): Promise { + this.log('sendCloseWrite closing writer') + await raceSignal(this.writer.close(), options?.signal) + this.log('sendCloseWrite closed writer') + } - yield new Uint8ArrayList(val.value) - } - })(), - sink: async function (source: Source) { - if (sinkSunk) { - throw new Error('sink already called on stream') - } - sinkSunk = true - try { - this.writeStatus = 'writing' - - for await (const chunks of source) { - if (chunks instanceof Uint8Array) { - await writer.write(chunks) - } else { - for (const buf of chunks) { - await writer.write(buf) - } - } - } + async sendCloseRead (options?: AbortOptions): Promise { + this.log('sendCloseRead cancelling reader') + await raceSignal(this.reader.cancel(), options?.signal) + this.log('sendCloseRead cancelled reader') + } +} - this.writeStatus = 'done' - } finally { - this.timeline.closeWrite = Date.now() - this.writeStatus = 'closed' +export async function webtransportBiDiStreamToStream (bidiStream: WebTransportBidirectionalStream, streamId: string, direction: Direction, activeStreams: Stream[], onStreamEnd: undefined | ((s: Stream) => void), logger: ComponentLogger): Promise { + const log = logger.forComponent(`libp2p:webtransport:stream:${direction}:${streamId}`) - await stream.closeWrite() + const stream = new WebTransportStream({ + bidiStream, + id: streamId, + direction, + log, + onEnd: () => { + const index = activeStreams.findIndex(s => s === stream) + if (index !== -1) { + activeStreams.splice(index, 1) } - }, - log - } + + onStreamEnd?.(stream) + } + }) return stream } diff --git a/packages/transport-webtransport/src/utils/generate-certificates.browser.ts b/packages/transport-webtransport/src/utils/generate-certificates.browser.ts new file mode 100644 index 0000000000..ba2c23f0a5 --- /dev/null +++ b/packages/transport-webtransport/src/utils/generate-certificates.browser.ts @@ -0,0 +1,3 @@ +export async function generateWebTransportCertificates (): Promise { + throw new Error('Not implemented') +} diff --git a/packages/transport-webtransport/src/utils/generate-certificates.ts b/packages/transport-webtransport/src/utils/generate-certificates.ts new file mode 100644 index 0000000000..57d3d2692a --- /dev/null +++ b/packages/transport-webtransport/src/utils/generate-certificates.ts @@ -0,0 +1,11 @@ +import type { WebTransportCertificate } from '../../src/index.js' + +export interface GenerateWebTransportCertificateOptions { + days: number + start?: Date + extensions?: any[] +} + +export async function generateWebTransportCertificates (options: GenerateWebTransportCertificateOptions[] = []): Promise { + throw new Error('Not implemented') +} diff --git a/packages/transport-webtransport/src/webtransport.browser.ts b/packages/transport-webtransport/src/webtransport.browser.ts new file mode 100644 index 0000000000..349cb18505 --- /dev/null +++ b/packages/transport-webtransport/src/webtransport.browser.ts @@ -0,0 +1 @@ +export default WebTransport diff --git a/packages/transport-webtransport/src/webtransport.ts b/packages/transport-webtransport/src/webtransport.ts new file mode 100644 index 0000000000..af19fed0e0 --- /dev/null +++ b/packages/transport-webtransport/src/webtransport.ts @@ -0,0 +1,17 @@ +export default class WebTransport { + constructor (url: string | URL, options?: WebTransportOptions) { + throw new Error('Only supported in browsers') + } + + close (): void { + throw new Error('Only supported in browsers') + } + + async createBidirectionalStream (): Promise { + throw new Error('Only supported in browsers') + } + + public closed = Promise.reject(new Error('Only supported in browsers')) + public ready = Promise.reject(new Error('Only supported in browsers')) + public incomingBidirectionalStreams: ReadableStream +} diff --git a/packages/transport-webtransport/test/browser.ts b/packages/transport-webtransport/test/browser.ts index c4a54f6ad5..ae8b3d3a42 100644 --- a/packages/transport-webtransport/test/browser.ts +++ b/packages/transport-webtransport/test/browser.ts @@ -1,11 +1,14 @@ -/* eslint-disable no-console */ /* eslint-env mocha */ import { noise } from '@chainsafe/libp2p-noise' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' +import map from 'it-map' +import toBuffer from 'it-to-buffer' import { createLibp2p, type Libp2p } from 'libp2p' +import pWaitFor from 'p-wait-for' import { webTransport } from '../src/index.js' +import { randomBytes } from './fixtures/random-bytes.js' describe('libp2p-webtransport', () => { let node: Libp2p @@ -16,6 +19,9 @@ describe('libp2p-webtransport', () => { connectionEncryption: [noise()], connectionGater: { denyDialMultiaddr: async () => false + }, + connectionManager: { + minConnections: 0 } }) }) @@ -47,8 +53,7 @@ describe('libp2p-webtransport', () => { // we can use the builtin ping system const stream = await node.dialProtocol(ma, '/ipfs/ping/1.0.0') - const data = new Uint8Array(32) - globalThis.crypto.getRandomValues(data) + const data = randomBytes(32) const pong = new Promise((resolve, reject) => { (async () => { @@ -133,29 +138,46 @@ describe('libp2p-webtransport', () => { const maStr: string = process.env.serverAddr const ma = multiaddr(maStr) - async function * gen (): AsyncGenerator { - yield new Uint8Array([0]) - yield new Uint8Array([1, 2, 3, 4]) - yield new Uint8Array([5, 6, 7]) - yield new Uint8Array([8, 9, 10, 11]) - yield new Uint8Array([12, 13, 14, 15]) + const data = [ + Uint8Array.from([0]), + Uint8Array.from([1, 2, 3, 4]), + Uint8Array.from([5, 6, 7]), + Uint8Array.from([8, 9, 10, 11]), + Uint8Array.from([12, 13, 14, 15]) + ] + + async function * gen (): AsyncGenerator { + yield * data } const stream = await node.dialProtocol(ma, 'echo') - await stream.sink(gen()) + expect(stream.timeline.closeWrite).to.be.undefined() + expect(stream.timeline.closeRead).to.be.undefined() + expect(stream.timeline.close).to.be.undefined() + + // send and receive data + const [, output] = await Promise.all([ + stream.sink(gen()), + toBuffer(map(stream.source, buf => buf.subarray())) + ]) + + // closing takes a little bit of time + await pWaitFor(() => { + return stream.writeStatus === 'closed' + }, { + interval: 100 + }) - let expectedNextNumber = 0 - for await (const chunk of stream.source) { - for (const byte of chunk.subarray()) { - expect(byte).to.equal(expectedNextNumber++) - } - } - expect(expectedNextNumber).to.equal(16) + expect(stream.writeStatus).to.equal('closed') + expect(stream.timeline.closeWrite).to.be.greaterThan(0) - // Close read, we've should have closed the write side during sink - await stream.closeRead() + // should have read all of the bytes + expect(output).to.equalBytes(toBuffer(data)) + // should have set timeline events + expect(stream.timeline.closeWrite).to.be.greaterThan(0) + expect(stream.timeline.closeRead).to.be.greaterThan(0) expect(stream.timeline.close).to.be.greaterThan(0) }) }) diff --git a/packages/transport-webtransport/test/fixtures/random-bytes.ts b/packages/transport-webtransport/test/fixtures/random-bytes.ts new file mode 100644 index 0000000000..c34a09a5b4 --- /dev/null +++ b/packages/transport-webtransport/test/fixtures/random-bytes.ts @@ -0,0 +1,12 @@ +import { CodeError } from '@libp2p/interface' +import { randomBytes as randB } from '@noble/hashes/utils' + +/** + * Generates a Uint8Array with length `number` populated by random bytes + */ +export function randomBytes (length: number): Uint8Array { + if (isNaN(length) || length <= 0) { + throw new CodeError('random bytes length must be a Number bigger than 0', 'ERR_INVALID_LENGTH') + } + return randB(length) +}