diff --git a/src/node/utils/Stream.js b/src/node/utils/Stream.js new file mode 100644 index 000000000..611b83b33 --- /dev/null +++ b/src/node/utils/Stream.js @@ -0,0 +1,134 @@ +'use strict'; + +/** + * Wrapper around any iterable that adds convenience methods that standard JavaScript iterable + * objects lack. + */ +class Stream { + /** + * @returns {Stream} A Stream that yields values in the half-open range [start, end). + */ + static range(start, end) { + return new Stream((function* () { for (let i = start; i < end; ++i) yield i; })()); + } + + /** + * @param {Iterable} values - Any iterable of values. + */ + constructor(values) { + this._iter = values[Symbol.iterator](); + this._next = null; + } + + /** + * Read values a chunk at a time from the underlying iterable. Once a full batch is read (or there + * aren't enough values to make a full batch), all of the batch's values are yielded before the + * next batch is read. + * + * This is useful for triggering groups of asynchronous tasks via Promises yielded from a + * synchronous generator. A for-await-of (or for-of with an await) loop consumes those Promises + * and automatically triggers the next batch of tasks when needed. For example: + * + * const resources = (function* () { + * for (let i = 0; i < 100; ++i) yield fetchResource(i); + * }).call(this); + * + * // Fetch 10 items at a time so that the fetch engine can bundle multiple requests into a + * // single query message. + * for await (const r of new Stream(resources).batch(10)) { + * processResource(r); + * } + * + * Chaining .buffer() after .batch() like stream.batch(n).buffer(m) will fetch in batches of n as + * needed to ensure that at least m are in flight at all times. + * + * Any Promise yielded by the underlying iterable has its rejection suppressed to prevent + * unhandled rejection errors while the Promise is sitting in the batch waiting to be yielded. It + * is assumed that the consumer of any yielded Promises will await the Promise (or call .catch() + * or .then()) to prevent the rejection from going unnoticed. If iteration is aborted early, any + * Promises read from the underlying iterable that have not yet been yielded will have their + * rejections un-suppressed to trigger unhandled rejection errors. + * + * @param {number} size - The number of values to read at a time. + * @returns {Stream} A new Stream that gets its values from this Stream. + */ + batch(size) { + return new Stream((function* () { + const b = []; + try { + for (const v of this) { + Promise.resolve(v).catch(() => {}); // Suppress unhandled rejection errors. + b.push(v); + if (b.length < size) continue; + while (b.length) yield b.shift(); + } + while (b.length) yield b.shift(); + } finally { + for (const v of b) Promise.resolve(v).then(() => {}); // Un-suppress unhandled rejections. + } + }).call(this)); + } + + /** + * Pre-fetch a certain number of values from the underlying iterable before yielding the first + * value. Each time a value is yielded (consumed from the buffer), another value is read from the + * underlying iterable and added to the buffer. + * + * This is useful for maintaining a constant number of in-flight asynchronous tasks via Promises + * yielded from a synchronous generator. A for-await-of (or for-of with an await) loop should be + * used to control the scheduling of the next task. For example: + * + * const resources = (function* () { + * for (let i = 0; i < 100; ++i) yield fetchResource(i); + * }).call(this); + * + * // Fetching a resource is high latency, so keep multiple in flight at all times until done. + * for await (const r of new Stream(resources).buffer(10)) { + * processResource(r); + * } + * + * Chaining after .batch() like stream.batch(n).buffer(m) will fetch in batches of n as needed to + * ensure that at least m are in flight at all times. + * + * Any Promise yielded by the underlying iterable has its rejection suppressed to prevent + * unhandled rejection errors while the Promise is sitting in the batch waiting to be yielded. It + * is assumed that the consumer of any yielded Promises will await the Promise (or call .catch() + * or .then()) to prevent the rejection from going unnoticed. If iteration is aborted early, any + * Promises read from the underlying iterable that have not yet been yielded will have their + * rejections un-suppressed to trigger unhandled rejection errors. + * + * @param {number} capacity - The number of values to keep buffered. + * @returns {Stream} A new Stream that gets its values from this Stream. + */ + buffer(capacity) { + return new Stream((function* () { + const b = []; + try { + for (const v of this) { + Promise.resolve(v).catch(() => {}); // Suppress unhandled rejection errors. + // Note: V8 has good Array push+shift optimization. + while (b.length >= capacity) yield b.shift(); + b.push(v); + } + while (b.length) yield b.shift(); + } finally { + for (const v of b) Promise.resolve(v).then(() => {}); // Un-suppress unhandled rejections. + } + }).call(this)); + } + + /** + * Like Array.map(). + * + * @param {(v: any) => any} fn - Value transformation function. + * @returns {Stream} A new Stream that yields this Stream's values, transformed by `fn`. + */ + map(fn) { return new Stream((function* () { for (const v of this) yield fn(v); }).call(this)); } + + /** + * Implements the JavaScript iterable protocol. + */ + [Symbol.iterator]() { return this._iter; } +} + +module.exports = Stream; diff --git a/src/tests/backend/specs/Stream.js b/src/tests/backend/specs/Stream.js new file mode 100644 index 000000000..b98a5f3b4 --- /dev/null +++ b/src/tests/backend/specs/Stream.js @@ -0,0 +1,358 @@ +'use strict'; + +const Stream = require('../../../node/utils/Stream'); +const assert = require('assert').strict; + +class DemoIterable { + constructor() { + this.value = 0; + this.errs = []; + this.rets = []; + } + + completed() { return this.errs.length > 0 || this.rets.length > 0; } + + next() { + if (this.completed()) return {value: undefined, done: true}; // Mimic standard generators. + return {value: this.value++, done: false}; + } + + throw(err) { + const alreadyCompleted = this.completed(); + this.errs.push(err); + if (alreadyCompleted) throw err; // Mimic standard generator objects. + throw err; + } + + return(ret) { + const alreadyCompleted = this.completed(); + this.rets.push(ret); + if (alreadyCompleted) return {value: ret, done: true}; // Mimic standard generator objects. + return {value: ret, done: true}; + } + + [Symbol.iterator]() { return this; } +} + +const assertUnhandledRejection = async (action, want) => { + // Temporarily remove unhandled Promise rejection listeners so that the unhandled rejections we + // expect to see don't trigger a test failure (or terminate node). + const event = 'unhandledRejection'; + const listenersBackup = process.rawListeners(event); + process.removeAllListeners(event); + let tempListener; + let asyncErr; + try { + const seenErrPromise = new Promise((resolve) => { + tempListener = (err) => { + assert.equal(asyncErr, undefined); + asyncErr = err; + resolve(); + }; + }); + process.on(event, tempListener); + await action(); + await seenErrPromise; + } finally { + // Restore the original listeners. + process.off(event, tempListener); + for (const listener of listenersBackup) process.on(event, listener); + } + await assert.rejects(Promise.reject(asyncErr), want); +}; + +describe(__filename, function () { + describe('basic behavior', function () { + it('takes a generator', async function () { + assert.deepEqual([...new Stream((function* () { yield 0; yield 1; yield 2; })())], [0, 1, 2]); + }); + + it('takes an array', async function () { + assert.deepEqual([...new Stream([0, 1, 2])], [0, 1, 2]); + }); + + it('takes an iterator', async function () { + assert.deepEqual([...new Stream([0, 1, 2][Symbol.iterator]())], [0, 1, 2]); + }); + + it('supports empty iterators', async function () { + assert.deepEqual([...new Stream([])], []); + }); + + it('is resumable', async function () { + const s = new Stream((function* () { yield 0; yield 1; yield 2; })()); + let iter = s[Symbol.iterator](); + assert.deepEqual(iter.next(), {value: 0, done: false}); + iter = s[Symbol.iterator](); + assert.deepEqual(iter.next(), {value: 1, done: false}); + assert.deepEqual([...s], [2]); + }); + + it('supports return value', async function () { + const s = new Stream((function* () { yield 0; return 1; })()); + const iter = s[Symbol.iterator](); + assert.deepEqual(iter.next(), {value: 0, done: false}); + assert.deepEqual(iter.next(), {value: 1, done: true}); + }); + + it('does not start until needed', async function () { + let lastYield = null; + new Stream((function* () { yield lastYield = 0; })()); + // Fetching from the underlying iterator should not start until the first value is fetched + // from the stream. + assert.equal(lastYield, null); + }); + + it('throw is propagated', async function () { + const underlying = new DemoIterable(); + const s = new Stream(underlying); + const iter = s[Symbol.iterator](); + assert.deepEqual(iter.next(), {value: 0, done: false}); + const err = new Error('injected'); + assert.throws(() => iter.throw(err), err); + assert.equal(underlying.errs[0], err); + }); + + it('return is propagated', async function () { + const underlying = new DemoIterable(); + const s = new Stream(underlying); + const iter = s[Symbol.iterator](); + assert.deepEqual(iter.next(), {value: 0, done: false}); + assert.deepEqual(iter.return(42), {value: 42, done: true}); + assert.equal(underlying.rets[0], 42); + }); + }); + + describe('range', function () { + it('basic', async function () { + assert.deepEqual([...Stream.range(0, 3)], [0, 1, 2]); + }); + + it('empty', async function () { + assert.deepEqual([...Stream.range(0, 0)], []); + }); + + it('positive start', async function () { + assert.deepEqual([...Stream.range(3, 5)], [3, 4]); + }); + + it('negative start', async function () { + assert.deepEqual([...Stream.range(-3, 0)], [-3, -2, -1]); + }); + + it('end before start', async function () { + assert.deepEqual([...Stream.range(3, 0)], []); + }); + }); + + describe('batch', function () { + it('empty', async function () { + assert.deepEqual([...new Stream([]).batch(10)], []); + }); + + it('does not start until needed', async function () { + let lastYield = null; + new Stream((function* () { yield lastYield = 0; })()).batch(10); + assert.equal(lastYield, null); + }); + + it('fewer than batch size', async function () { + let lastYield = null; + const values = (function* () { + for (let i = 0; i < 5; i++) yield lastYield = i; + })(); + const s = new Stream(values).batch(10); + assert.equal(lastYield, null); + assert.deepEqual(s[Symbol.iterator]().next(), {value: 0, done: false}); + assert.equal(lastYield, 4); + assert.deepEqual([...s], [1, 2, 3, 4]); + assert.equal(lastYield, 4); + }); + + it('exactly batch size', async function () { + let lastYield = null; + const values = (function* () { + for (let i = 0; i < 5; i++) yield lastYield = i; + })(); + const s = new Stream(values).batch(5); + assert.equal(lastYield, null); + assert.deepEqual(s[Symbol.iterator]().next(), {value: 0, done: false}); + assert.equal(lastYield, 4); + assert.deepEqual([...s], [1, 2, 3, 4]); + assert.equal(lastYield, 4); + }); + + it('multiple batches, last batch is not full', async function () { + let lastYield = null; + const values = (function* () { + for (let i = 0; i < 10; i++) yield lastYield = i; + })(); + const s = new Stream(values).batch(3); + assert.equal(lastYield, null); + const iter = s[Symbol.iterator](); + assert.deepEqual(iter.next(), {value: 0, done: false}); + assert.equal(lastYield, 2); + assert.deepEqual(iter.next(), {value: 1, done: false}); + assert.deepEqual(iter.next(), {value: 2, done: false}); + assert.equal(lastYield, 2); + assert.deepEqual(iter.next(), {value: 3, done: false}); + assert.equal(lastYield, 5); + assert.deepEqual([...s], [4, 5, 6, 7, 8, 9]); + assert.equal(lastYield, 9); + }); + + it('batched Promise rejections are suppressed while iterating', async function () { + let lastYield = null; + const err = new Error('injected'); + const values = (function* () { + lastYield = 'promise of 0'; + yield new Promise((resolve) => setTimeout(() => resolve(0), 100)); + lastYield = 'rejected Promise'; + yield Promise.reject(err); + lastYield = 'promise of 2'; + yield Promise.resolve(2); + })(); + const s = new Stream(values).batch(3); + const iter = s[Symbol.iterator](); + const nextp = iter.next().value; + assert.equal(lastYield, 'promise of 2'); + assert.equal(await nextp, 0); + await assert.rejects(iter.next().value, err); + iter.return(); + }); + + it('batched Promise rejections are unsuppressed when iteration completes', async function () { + let lastYield = null; + const err = new Error('injected'); + const values = (function* () { + lastYield = 'promise of 0'; + yield new Promise((resolve) => setTimeout(() => resolve(0), 100)); + lastYield = 'rejected Promise'; + yield Promise.reject(err); + lastYield = 'promise of 2'; + yield Promise.resolve(2); + })(); + const s = new Stream(values).batch(3); + const iter = s[Symbol.iterator](); + assert.equal(await iter.next().value, 0); + assert.equal(lastYield, 'promise of 2'); + await assertUnhandledRejection(() => iter.return(), err); + }); + }); + + describe('buffer', function () { + it('empty', async function () { + assert.deepEqual([...new Stream([]).buffer(10)], []); + }); + + it('does not start until needed', async function () { + let lastYield = null; + new Stream((function* () { yield lastYield = 0; })()).buffer(10); + assert.equal(lastYield, null); + }); + + it('fewer than buffer size', async function () { + let lastYield = null; + const values = (function* () { + for (let i = 0; i < 5; i++) yield lastYield = i; + })(); + const s = new Stream(values).buffer(10); + assert.equal(lastYield, null); + assert.deepEqual(s[Symbol.iterator]().next(), {value: 0, done: false}); + assert.equal(lastYield, 4); + assert.deepEqual([...s], [1, 2, 3, 4]); + assert.equal(lastYield, 4); + }); + + it('exactly buffer size', async function () { + let lastYield = null; + const values = (function* () { + for (let i = 0; i < 5; i++) yield lastYield = i; + })(); + const s = new Stream(values).buffer(5); + assert.equal(lastYield, null); + assert.deepEqual(s[Symbol.iterator]().next(), {value: 0, done: false}); + assert.equal(lastYield, 4); + assert.deepEqual([...s], [1, 2, 3, 4]); + assert.equal(lastYield, 4); + }); + + it('more than buffer size', async function () { + let lastYield = null; + const values = (function* () { + for (let i = 0; i < 10; i++) yield lastYield = i; + })(); + const s = new Stream(values).buffer(3); + assert.equal(lastYield, null); + const iter = s[Symbol.iterator](); + assert.deepEqual(iter.next(), {value: 0, done: false}); + assert.equal(lastYield, 3); + assert.deepEqual(iter.next(), {value: 1, done: false}); + assert.equal(lastYield, 4); + assert.deepEqual(iter.next(), {value: 2, done: false}); + assert.equal(lastYield, 5); + assert.deepEqual([...s], [3, 4, 5, 6, 7, 8, 9]); + assert.equal(lastYield, 9); + }); + + it('buffered Promise rejections are suppressed while iterating', async function () { + let lastYield = null; + const err = new Error('injected'); + const values = (function* () { + lastYield = 'promise of 0'; + yield new Promise((resolve) => setTimeout(() => resolve(0), 100)); + lastYield = 'rejected Promise'; + yield Promise.reject(err); + lastYield = 'promise of 2'; + yield Promise.resolve(2); + })(); + const s = new Stream(values).buffer(3); + const iter = s[Symbol.iterator](); + const nextp = iter.next().value; + assert.equal(lastYield, 'promise of 2'); + assert.equal(await nextp, 0); + await assert.rejects(iter.next().value, err); + iter.return(); + }); + + it('buffered Promise rejections are unsuppressed when iteration completes', async function () { + let lastYield = null; + const err = new Error('injected'); + const values = (function* () { + lastYield = 'promise of 0'; + yield new Promise((resolve) => setTimeout(() => resolve(0), 100)); + lastYield = 'rejected Promise'; + yield Promise.reject(err); + lastYield = 'promise of 2'; + yield Promise.resolve(2); + })(); + const s = new Stream(values).buffer(3); + const iter = s[Symbol.iterator](); + assert.equal(await iter.next().value, 0); + assert.equal(lastYield, 'promise of 2'); + await assertUnhandledRejection(() => iter.return(), err); + }); + }); + + describe('map', function () { + it('empty', async function () { + let called = false; + assert.deepEqual([...new Stream([]).map((v) => called = true)], []); + assert.equal(called, false); + }); + + it('does not start until needed', async function () { + let called = false; + assert.deepEqual([...new Stream([]).map((v) => called = true)], []); + new Stream((function* () { yield 0; })()).map((v) => called = true); + assert.equal(called, false); + }); + + it('works', async function () { + const calls = []; + assert.deepEqual( + [...new Stream([0, 1, 2]).map((v) => { calls.push(v); return 2 * v; })], [0, 2, 4]); + assert.deepEqual(calls, [0, 1, 2]); + }); + }); +});