Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

📦 node-stream-test: transfer #42

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"packages/mock/*",
"packages/move-path",
"packages/nextools/*",
"packages/node-stream-test",
"packages/perfa",
"packages/piall",
"packages/pifs",
Expand Down
22 changes: 22 additions & 0 deletions packages/node-stream-test/license.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# The MIT License (MIT)

* Copyright (c) 2018-2020 Alex Feinstein
* Copyright (c) 2020-present NexTools

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
23 changes: 23 additions & 0 deletions packages/node-stream-test/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "node-stream-test",
"version": "0.3.11",
"main": "src/index.ts",
"repository": "nextools/metarepo",
"license": "MIT",
"engines": {
"node": ">=10.13.0"
},
"sideEffects": false,
"author": "psxcode <[email protected]> (https://github.com/psxcode)",
"dependencies": {
"@psxcode/wait": "^0.1.1",
"iterama": "^0.2.0"
},
"devDependencies": {
"@types/debug": "^4.1.5",
"@types/tape": "^4.2.34",
"debug": "^4.1.1",
"tape": "^5.0.0-next.5",
"test-fn": "^0.1.3"
}
}
165 changes: 165 additions & 0 deletions packages/node-stream-test/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Node Stream Test

## Install
```
npm install node-stream-test
```

## Usage

### `readable`
creates test `Readable` stream, simulating `sync`/`async` behaviors
`(options: MakeReadableOptions) => (readableOptions: ReadableOptions) => (iterable: Iterable<any>) => Readable`
```ts
type MakeReadableOptions = {
eager: boolean // lazy or eager stream behavior
log?: typeof console.log // provide debug logger or noop
delayMs?: number // simulate async stream behavior
errorAtStep?: number // emit 'error' event at certain step
continueOnError?: boolean // whether should stream continue on error or break
}
```
> Lazy stream pushes one `chunk` of data on every `read`.
Eager stream pushes all `chunks` in a synchronous loop on `read`.

> `delayMs` is a delay between `read` call and actual `chunk` push.
This simulates asynchronous stream behavior.
If the stream is `eager`, it will push all `chunks` in a loop after first delay
```ts
import { readable } from 'node-stream-test'

// create test-readable stream
const testReadable = readable({
log: console.log, // output debug info to console
delayMs: 10, // delay 10ms
eager: false // eager or lazy stream
})({
objectMode: true // provide Node Readable configuration
})(
[1, 2, 3, 4, 5] // provide data to stream
)

// subscribe to test-readable
testReadable
.on('data', () => {})
.on('end', () => {})
```

### `writable`
creates test `Writable` stream, simulating `sync`/`async` behaviors
`(options: MakeWritableOptions) => (writableOptions: WritableOptions) => (sink: (chunk: any) => void) => Writable`
```ts
type MakeWritableOptions = {
log: typeof console.log, // provide debug logger or noop
delayMs?: number // simulate async
errorAtStep?: number // emit 'error' event at certain step
}
```
> `delayMs` is a delay between `write` call and passing `chunk` to a sink.
This simulates long async writes.
```ts
import { writable } from 'node-stream-test'

// We have the following stream
declare var stream: ReadableStream

const testWritable = writable({
log: console.log, // output debug info to console
delayMs: 10 // delay 10ms
})({
objectMode: true // provide Node Writable configuration
})

// pipe the stream into test-writable
stream.pipe(
stream,
testWritable
).on('data', () => {})
.on('end', () => {})
```

### `producer`
writes `chunks` to a stream
`(options: ProducerOptions) => (iterable: Iterable<any>) => (stream: WritableStream) => () => void`
```ts
type ProducerOptions = {
log: typeof console.log, // provide debug logger or noop
eager: boolean // eager or lazy producer
}
```
> `eager` producer writes `chunks` in a synchronous loop until `highWatermark` reached.
`lazy` producer writes one `chunk` on `drain` event.
```ts
import { producer } from 'node-stream-test'

// We have the following writable stream
declare var stream: WritableStream

// create a producer
const beginProduce = producer({
log: console.log, // output debug info to console
eager: true // eager producer
})(
[1, 2, 3, 4, 5], // data to write
0 // write all data
)(
stream // write to this stream
)
```

### `push-consumer`
simple `on('data')` consumer with logging
`(options: DataConsumerOptions) => (sink: (chunk: any) => void) => (stream: ReadableStream) => () => void`
```ts
type PushConsumerOptions = {
log: typeof console.log // provide debug logger or noop
}
```
```ts
import { pushConsumer } from 'node-stream-test'

// We have the following stream
declare var stream: ReadableStream

pushConsumer({
log: console.log // output debug info to console
})(
(chunk: string) => {} // your callback on every `data` event
)(
stream, // stream to consume
)
```

### `pull-consumer`
simple `on('readable')` consumer with `sync/async` behavior and logging
`(options: ReadableConsumerOptions) => (sink: (chunk: any) => void) => (stream: ReadableStream) => () => void`
```ts
type PullConsumerOptions = {
log: typeof console.log, // provide debug logger or noop
delayMs?: number, // simulate async
eager?: boolean, // eager or lazy behavior
readSize?: number // how much data to read on each 'readable' event
}
```
> `delayMs` is a time between `readable` event and actual `read` call on stream.

> `eager` consumer calls `read` in synchronous loop until `null` returned.
Then waits for the next `readable`.
`lazy` consumer reads one `chunk`, then waits.
```ts
import { pullConsumer } from 'node-stream-test'

// We have the following stream
declare var stream: ReadableStream

pullConsumer({
log: console.log, // print debug info to console
delayMs: 10, // delay 10ms
eager: false, // lazy behavior
readSize: undefined // read all available data
})(
(chunk: string) => {} // your callback on `read` call, after `readable` event
)(
stream, // stream to consume
)
```
5 changes: 5 additions & 0 deletions packages/node-stream-test/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export { readable } from './readable'
export { writable } from './writable'
export { pushConsumer } from './push-consumer'
export { pullConsumer } from './pull-consumer'
export { producer } from './producer'
90 changes: 90 additions & 0 deletions packages/node-stream-test/src/producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { iterate } from 'iterama'
import { noop } from './utils'

export type ProducerOptions = {
log?: typeof console.log,
eager: boolean,
continueOnError?: boolean,
}

export const producer = ({ eager, log = noop, continueOnError = false }: ProducerOptions) =>
(iterable: Iterable<any>) => (stream: NodeJS.WritableStream) => {
const it = iterate(iterable)
let done = false
let i = 0
const writeChunk = (cb?: () => void): boolean => {
let iteratorResult: IteratorResult<any>

if (done || (iteratorResult = it.next()).done) {
log('ending %d', i)
stream.end()

return false
}

log('writing to stream %d', i++)

const isOk = stream.write(iteratorResult.value, cb)

if (!isOk) {
log('backpressure at %d', i)
}

return isOk
}
const eagerWriter = () => {
log('eager writing begin at %d', i)

// eslint-disable-next-line no-empty
while (writeChunk()) {}

log('eager writing done at %d', i)
}
const lazyWriter = (err?: Error) => {
if (err) {
log('callback got error at %d: %s', i, err && err.message)

if (!continueOnError) {
// eslint-disable-next-line no-use-before-define
unsubscribe()
}
}

log('lazy writing begin at %d', i)
writeChunk(lazyWriter)
log('lazy writing done at %d', i)
}
const onDrainEvent = () => {
log('received \'drain\' event %d', i)
eager ? eagerWriter() : lazyWriter()
}
const onErrorEvent = (err?: Error) => {
log('received \'error\' event %d: %s', i, err && err.message)

if (!continueOnError) {
// eslint-disable-next-line no-use-before-define
unsubscribe()
}
}
const unsubscribe = () => {
log('unsubscribe')
done = true
stream.removeListener('drain', onDrainEvent)
stream.removeListener('finish', unsubscribe)
stream.removeListener('error', onErrorEvent)
}

return () => {
/* subscribe */
log('producer subscribe')
stream.on('drain', onDrainEvent)
stream.on('finish', unsubscribe)
stream.on('error', onErrorEvent)
/* drain event could already be emitted, try to write once */

setImmediate(eager ? eagerWriter : lazyWriter)

return unsubscribe
}
}

Loading