diff --git a/benchmark/streams/readable-bigread.js b/benchmark/streams/readable-bigread.js index 0d963c6803299e..8b36f49d7ffa53 100644 --- a/benchmark/streams/readable-bigread.js +++ b/benchmark/streams/readable-bigread.js @@ -15,7 +15,7 @@ function main({ n }) { bench.start(); for (let k = 0; k < n; ++k) { - for (let i = 0; i < 1e4; ++i) + for (let i = 0; i < 1e3; ++i) s.push(b); while (s.read(128)); } diff --git a/benchmark/streams/readable-readall.js b/benchmark/streams/readable-readall.js index b62b1d05b13876..c1a015a4d02716 100644 --- a/benchmark/streams/readable-readall.js +++ b/benchmark/streams/readable-readall.js @@ -15,7 +15,7 @@ function main({ n }) { bench.start(); for (let k = 0; k < n; ++k) { - for (let i = 0; i < 1e4; ++i) + for (let i = 0; i < 1e3; ++i) s.push(b); while (s.read()); } diff --git a/lib/internal/streams/buffer_list.js b/lib/internal/streams/buffer_list.js deleted file mode 100644 index 4629bdbf165e4f..00000000000000 --- a/lib/internal/streams/buffer_list.js +++ /dev/null @@ -1,181 +0,0 @@ -'use strict'; - -const { - StringPrototypeSlice, - SymbolIterator, - TypedArrayPrototypeSet, - Uint8Array, -} = primordials; - -const { Buffer } = require('buffer'); -const { inspect } = require('internal/util/inspect'); - -module.exports = class BufferList { - constructor() { - this.head = null; - this.tail = null; - this.length = 0; - } - - push(v) { - const entry = { data: v, next: null }; - if (this.length > 0) - this.tail.next = entry; - else - this.head = entry; - this.tail = entry; - ++this.length; - } - - unshift(v) { - const entry = { data: v, next: this.head }; - if (this.length === 0) - this.tail = entry; - this.head = entry; - ++this.length; - } - - shift() { - if (this.length === 0) - return; - const ret = this.head.data; - if (this.length === 1) - this.head = this.tail = null; - else - this.head = this.head.next; - --this.length; - return ret; - } - - clear() { - this.head = this.tail = null; - this.length = 0; - } - - join(s) { - if (this.length === 0) - return ''; - let p = this.head; - let ret = '' + p.data; - while ((p = p.next) !== null) - ret += s + p.data; - return ret; - } - - concat(n) { - if (this.length === 0) - return Buffer.alloc(0); - const ret = Buffer.allocUnsafe(n >>> 0); - let p = this.head; - let i = 0; - while (p) { - TypedArrayPrototypeSet(ret, p.data, i); - i += p.data.length; - p = p.next; - } - return ret; - } - - // Consumes a specified amount of bytes or characters from the buffered data. - consume(n, hasStrings) { - const data = this.head.data; - if (n < data.length) { - // `slice` is the same for buffers and strings. - const slice = data.slice(0, n); - this.head.data = data.slice(n); - return slice; - } - if (n === data.length) { - // First chunk is a perfect match. - return this.shift(); - } - // Result spans more than one buffer. - return hasStrings ? this._getString(n) : this._getBuffer(n); - } - - first() { - return this.head.data; - } - - *[SymbolIterator]() { - for (let p = this.head; p; p = p.next) { - yield p.data; - } - } - - // Consumes a specified amount of characters from the buffered data. - _getString(n) { - let ret = ''; - let p = this.head; - let c = 0; - do { - const str = p.data; - if (n > str.length) { - ret += str; - n -= str.length; - } else { - if (n === str.length) { - ret += str; - ++c; - if (p.next) - this.head = p.next; - else - this.head = this.tail = null; - } else { - ret += StringPrototypeSlice(str, 0, n); - this.head = p; - p.data = StringPrototypeSlice(str, n); - } - break; - } - ++c; - } while ((p = p.next) !== null); - this.length -= c; - return ret; - } - - // Consumes a specified amount of bytes from the buffered data. - _getBuffer(n) { - const ret = Buffer.allocUnsafe(n); - const retLen = n; - let p = this.head; - let c = 0; - do { - const buf = p.data; - if (n > buf.length) { - TypedArrayPrototypeSet(ret, buf, retLen - n); - n -= buf.length; - } else { - if (n === buf.length) { - TypedArrayPrototypeSet(ret, buf, retLen - n); - ++c; - if (p.next) - this.head = p.next; - else - this.head = this.tail = null; - } else { - TypedArrayPrototypeSet(ret, - new Uint8Array(buf.buffer, buf.byteOffset, n), - retLen - n); - this.head = p; - p.data = buf.slice(n); - } - break; - } - ++c; - } while ((p = p.next) !== null); - this.length -= c; - return ret; - } - - // Make sure the linked list only shows the minimal necessary information. - [inspect.custom](_, options) { - return inspect(this, { - ...options, - // Only inspect one level. - depth: 0, - // It should not recurse. - customInspect: false, - }); - } -}; diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index a129b1b6f4b75d..3735f264302652 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -31,9 +31,11 @@ const { ObjectSetPrototypeOf, Promise, SafeSet, + Symbol, SymbolAsyncDispose, SymbolAsyncIterator, - Symbol, + SymbolSpecies, + TypedArrayPrototypeSet, } = primordials; module.exports = Readable; @@ -51,7 +53,6 @@ const eos = require('internal/streams/end-of-stream'); let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { debug = fn; }); -const BufferList = require('internal/streams/buffer_list'); const destroyImpl = require('internal/streams/destroy'); const { getHighWaterMark, @@ -73,6 +74,7 @@ const { const { validateObject } = require('internal/validators'); const kState = Symbol('kState'); +const FastBuffer = Buffer[SymbolSpecies]; const { StringDecoder } = require('string_decoder'); const from = require('internal/streams/from'); @@ -275,10 +277,8 @@ function ReadableState(options, stream, isDuplex) { getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) : getDefaultHighWaterMark(false); - // A linked list is used to store data chunks instead of an array because the - // linked list can remove elements from the beginning faster than - // array.shift(). - this.buffer = new BufferList(); + this.buffer = []; + this.bufferIndex = 0; this.length = 0; this.pipes = []; @@ -546,10 +546,15 @@ function addChunk(stream, state, chunk, addToFront) { } else { // Update the buffer info. state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length; - if (addToFront) - state.buffer.unshift(chunk); - else + if (addToFront) { + if (state.bufferIndex > 0) { + state.buffer[--state.bufferIndex] = chunk; + } else { + state.buffer.unshift(chunk); // Slow path + } + } else { state.buffer.push(chunk); + } if ((state[kState] & kNeedReadable) !== 0) emitReadable(stream); @@ -564,21 +569,24 @@ Readable.prototype.isPaused = function() { // Backwards compatibility. Readable.prototype.setEncoding = function(enc) { + const state = this._readableState; + const decoder = new StringDecoder(enc); - this._readableState.decoder = decoder; + state.decoder = decoder; // If setEncoding(null), decoder.encoding equals utf8. - this._readableState.encoding = this._readableState.decoder.encoding; + state.encoding = state.decoder.encoding; - const buffer = this._readableState.buffer; // Iterate over current buffer to convert already stored Buffers: let content = ''; - for (const data of buffer) { + for (const data of state.buffer.slice(state.bufferIndex)) { content += decoder.write(data); } - buffer.clear(); + state.buffer.length = 0; + state.bufferIndex = 0; + if (content !== '') - buffer.push(content); - this._readableState.length = content.length; + state.buffer.push(content); + state.length = content.length; return this; }; @@ -611,7 +619,7 @@ function howMuchToRead(n, state) { if (NumberIsNaN(n)) { // Only flow one buffer at a time. if ((state[kState] & kFlowing) !== 0 && state.length) - return state.buffer.first().length; + return state.buffer[state.bufferIndex].length; return state.length; } if (n <= state.length) @@ -1549,21 +1557,96 @@ function fromList(n, state) { if (state.length === 0) return null; + let idx = state.bufferIndex; let ret; - if (state.objectMode) - ret = state.buffer.shift(); - else if (!n || n >= state.length) { + + const buf = state.buffer; + const len = buf.length; + + if ((state[kState] & kObjectMode) !== 0) { + ret = buf[idx]; + buf[idx++] = null; + } else if (!n || n >= state.length) { // Read it all, truncate the list. - if (state.decoder) - ret = state.buffer.join(''); - else if (state.buffer.length === 1) - ret = state.buffer.first(); - else - ret = state.buffer.concat(state.length); - state.buffer.clear(); + if ((state[kState] & kDecoder) !== 0) { + ret = ''; + while (idx < len) { + ret += buf[idx]; + buf[idx++] = null; + } + } else if (len - idx === 0) { + ret = Buffer.alloc(0); + } else if (len - idx === 1) { + ret = buf[idx]; + buf[idx++] = null; + } else { + ret = Buffer.allocUnsafe(state.length); + + let i = 0; + while (idx < len) { + TypedArrayPrototypeSet(ret, buf[idx], i); + i += buf[idx].length; + buf[idx++] = null; + } + } + } else if (n < buf[idx].length) { + // `slice` is the same for buffers and strings. + ret = buf[idx].slice(0, n); + buf[idx] = buf[idx].slice(n); + } else if (n === buf[idx].length) { + // First chunk is a perfect match. + ret = buf[idx]; + buf[idx++] = null; + } else if ((state[kState] & kDecoder) !== 0) { + ret = ''; + while (idx < len) { + const str = buf[idx]; + if (n > str.length) { + ret += str; + n -= str.length; + buf[idx++] = null; + } else { + if (n === buf.length) { + ret += str; + buf[idx++] = null; + } else { + ret += str.slice(0, n); + buf[idx] = str.slice(n); + } + break; + } + } + } else { + ret = Buffer.allocUnsafe(n); + + const retLen = n; + while (idx < len) { + const data = buf[idx]; + if (n > data.length) { + TypedArrayPrototypeSet(ret, data, retLen - n); + n -= data.length; + buf[idx++] = null; + } else { + if (n === data.length) { + TypedArrayPrototypeSet(ret, data, retLen - n); + buf[idx++] = null; + } else { + TypedArrayPrototypeSet(ret, new FastBuffer(data.buffer, data.byteOffset, n), retLen - n); + buf[idx] = new FastBuffer(data.buffer, data.byteOffset + n, data.length - n); + } + break; + } + } + } + + if (idx === len) { + state.buffer.length = 0; + state.bufferIndex = 0; + } else if (idx > 1024) { + state.buffer.splice(0, idx); + state.bufferIndex = 0; } else { - // read part of list. - ret = state.buffer.consume(n, state.decoder); + state.bufferIndex = idx; } return ret; diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index 78db466a95b38e..e4561d746606ed 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -107,7 +107,6 @@ if (common.isMainThread) { 'NativeModule internal/perf/event_loop_utilization', 'NativeModule internal/process/worker_thread_only', 'NativeModule internal/streams/add-abort-signal', - 'NativeModule internal/streams/buffer_list', 'NativeModule internal/streams/compose', 'NativeModule internal/streams/destroy', 'NativeModule internal/streams/duplex', diff --git a/test/parallel/test-stream-buffer-list.js b/test/parallel/test-stream-buffer-list.js deleted file mode 100644 index d9d0405f4d5a45..00000000000000 --- a/test/parallel/test-stream-buffer-list.js +++ /dev/null @@ -1,84 +0,0 @@ -// Flags: --expose-internals -'use strict'; -require('../common'); -const assert = require('assert'); -const BufferList = require('internal/streams/buffer_list'); - -// Test empty buffer list. -const emptyList = new BufferList(); - -emptyList.shift(); -assert.deepStrictEqual(emptyList, new BufferList()); - -assert.strictEqual(emptyList.join(','), ''); - -assert.deepStrictEqual(emptyList.concat(0), Buffer.alloc(0)); - -const buf = Buffer.from('foo'); - -function testIterator(list, count) { - // test iterator - let len = 0; - // eslint-disable-next-line no-unused-vars - for (const x of list) { - len++; - } - assert.strictEqual(len, count); -} - -// Test buffer list with one element. -const list = new BufferList(); -testIterator(list, 0); - -list.push(buf); -testIterator(list, 1); -for (const x of list) { - assert.strictEqual(x, buf); -} - -const copy = list.concat(3); -testIterator(copy, 3); - -assert.notStrictEqual(copy, buf); -assert.deepStrictEqual(copy, buf); - -assert.strictEqual(list.join(','), 'foo'); - -const shifted = list.shift(); -testIterator(list, 0); -assert.strictEqual(shifted, buf); -assert.deepStrictEqual(list, new BufferList()); - -{ - const list = new BufferList(); - list.push('foo'); - list.push('bar'); - list.push('foo'); - list.push('bar'); - assert.strictEqual(list.consume(6, true), 'foobar'); - assert.strictEqual(list.consume(6, true), 'foobar'); -} - -{ - const list = new BufferList(); - list.push('foo'); - list.push('bar'); - assert.strictEqual(list.consume(5, true), 'fooba'); -} - -{ - const list = new BufferList(); - list.push(buf); - list.push(buf); - list.push(buf); - list.push(buf); - assert.strictEqual(list.consume(6).toString(), 'foofoo'); - assert.strictEqual(list.consume(6).toString(), 'foofoo'); -} - -{ - const list = new BufferList(); - list.push(buf); - list.push(buf); - assert.strictEqual(list.consume(5).toString(), 'foofo'); -} diff --git a/test/parallel/test-stream2-readable-from-list.js b/test/parallel/test-stream2-readable-from-list.js deleted file mode 100644 index d5d113304e4925..00000000000000 --- a/test/parallel/test-stream2-readable-from-list.js +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright Joyent, Inc. and other Node contributors. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the -// "Software"), to deal in the Software without restriction, including -// without limitation the rights to use, copy, modify, merge, publish, -// distribute, sublicense, and/or sell copies of the Software, and to permit -// persons to whom the Software is furnished to do so, subject to the -// following conditions: -// -// The above copyright notice and this permission notice shall be included -// in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN -// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE -// USE OR OTHER DEALINGS IN THE SOFTWARE. - -// Flags: --expose-internals -'use strict'; -require('../common'); -const assert = require('assert'); -const fromList = require('stream').Readable._fromList; -const BufferList = require('internal/streams/buffer_list'); -const util = require('util'); - -function bufferListFromArray(arr) { - const bl = new BufferList(); - for (let i = 0; i < arr.length; ++i) - bl.push(arr[i]); - return bl; -} - -{ - // Verify behavior with buffers - let list = [ Buffer.from('foog'), - Buffer.from('bark'), - Buffer.from('bazy'), - Buffer.from('kuel') ]; - list = bufferListFromArray(list); - - assert.strictEqual( - util.inspect([ list ], { compact: false }), - `[ - BufferList { - head: [Object], - tail: [Object], - length: 4 - } -]`); - - // Read more than the first element. - let ret = fromList(6, { buffer: list, length: 16 }); - assert.strictEqual(ret.toString(), 'foogba'); - - // Read exactly the first element. - ret = fromList(2, { buffer: list, length: 10 }); - assert.strictEqual(ret.toString(), 'rk'); - - // Read less than the first element. - ret = fromList(2, { buffer: list, length: 8 }); - assert.strictEqual(ret.toString(), 'ba'); - - // Read more than we have. - ret = fromList(100, { buffer: list, length: 6 }); - assert.strictEqual(ret.toString(), 'zykuel'); - - // all consumed. - assert.deepStrictEqual(list, new BufferList()); -} - -{ - // Verify behavior with strings - let list = [ 'foog', - 'bark', - 'bazy', - 'kuel' ]; - list = bufferListFromArray(list); - - // Read more than the first element. - let ret = fromList(6, { buffer: list, length: 16, decoder: true }); - assert.strictEqual(ret, 'foogba'); - - // Read exactly the first element. - ret = fromList(2, { buffer: list, length: 10, decoder: true }); - assert.strictEqual(ret, 'rk'); - - // Read less than the first element. - ret = fromList(2, { buffer: list, length: 8, decoder: true }); - assert.strictEqual(ret, 'ba'); - - // Read more than we have. - ret = fromList(100, { buffer: list, length: 6, decoder: true }); - assert.strictEqual(ret, 'zykuel'); - - // all consumed. - assert.deepStrictEqual(list, new BufferList()); -}