diff --git a/cli/tests/unit/streams_test.ts b/cli/tests/unit/streams_test.ts index e496b5c4f399ee..7713cb4b5e0510 100644 --- a/cli/tests/unit/streams_test.ts +++ b/cli/tests/unit/streams_test.ts @@ -126,9 +126,7 @@ function emptyChunkStream() { }); } -// Try to blow up any recursive reads. Note that because of the use of Array.shift in -// ReadableStream, this might not actually be able to complete with larger values of -// length. +// Try to blow up any recursive reads. function veryLongTinyPacketStream(length: number) { return new ReadableStream({ start(controller) { @@ -249,9 +247,9 @@ Deno.test(async function readableStreamLongAsyncReadAll() { }); Deno.test(async function readableStreamVeryLongReadAll() { - const rid = resourceForReadableStream(veryLongTinyPacketStream(10000)); + const rid = resourceForReadableStream(veryLongTinyPacketStream(1_000_000)); const buffer = await core.ops.op_read_all(rid); - assertEquals(buffer.length, 10000); + assertEquals(buffer.length, 1_000_000); core.ops.op_close(rid); }); diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 38581ac794ca29..66be90a61f7d09 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -218,6 +218,50 @@ function uponPromise(promise, onFulfilled, onRejected) { ); } +class Queue { + #head = null; + #tail = null; + #size = 0; + + enqueue(value) { + const node = { value, next: null }; + if (this.#head === null) { + this.#head = node; + this.#tail = node; + } else { + this.#tail.next = node; + this.#tail = node; + } + return ++this.#size; + } + + dequeue() { + const node = this.#head; + if (node === null) { + return null; + } + + if (this.#head === this.#tail) { + this.#tail = null; + } + this.#head = this.#head.next; + this.#size--; + return node.value; + } + + peek() { + if (this.#head === null) { + return null; + } + + return this.#head.value; + } + + get size() { + return this.#size; + } +} + /** * @param {ArrayBufferLike} O * @returns {boolean} @@ -351,6 +395,17 @@ const _writable = Symbol("[[writable]]"); const _writeAlgorithm = Symbol("[[writeAlgorithm]]"); const _writer = Symbol("[[writer]]"); const _writeRequests = Symbol("[[writeRequests]]"); +const _brand = webidl.brand; + +function noop() {} +async function noopAsync() {} +const _defaultStartAlgorithm = noop; +const _defaultWriteAlgorithm = noopAsync; +const _defaultCloseAlgorithm = noopAsync; +const _defaultAbortAlgorithm = noopAsync; +const _defaultPullAlgorithm = noopAsync; +const _defaultFlushAlgorithm = noopAsync; +const _defaultCancelAlgorithm = noopAsync; /** * @template R @@ -358,7 +413,9 @@ const _writeRequests = Symbol("[[writeRequests]]"); * @returns {ReadableStreamDefaultReader} */ function acquireReadableStreamDefaultReader(stream) { - return new ReadableStreamDefaultReader(stream); + const reader = new ReadableStreamDefaultReader(_brand); + setUpReadableStreamDefaultReader(reader, stream); + return reader; } /** @@ -367,7 +424,7 @@ function acquireReadableStreamDefaultReader(stream) { * @returns {ReadableStreamBYOBReader} */ function acquireReadableStreamBYOBReader(stream) { - const reader = webidl.createBranded(ReadableStreamBYOBReader); + const reader = new ReadableStreamBYOBReader(_brand); setUpReadableStreamBYOBReader(reader, stream); return reader; } @@ -399,9 +456,9 @@ function createReadableStream( ) { assert(isNonNegativeNumber(highWaterMark)); /** @type {ReadableStream} */ - const stream = webidl.createBranded(ReadableStream); + const stream = new ReadableStream(_brand); initializeReadableStream(stream); - const controller = webidl.createBranded(ReadableStreamDefaultController); + const controller = new ReadableStreamDefaultController(_brand); setUpReadableStreamDefaultController( stream, controller, @@ -433,9 +490,9 @@ function createWritableStream( sizeAlgorithm, ) { assert(isNonNegativeNumber(highWaterMark)); - const stream = webidl.createBranded(WritableStream); + const stream = new WritableStream(_brand); initializeWritableStream(stream); - const controller = webidl.createBranded(WritableStreamDefaultController); + const controller = new WritableStreamDefaultController(_brand); setUpWritableStreamDefaultController( stream, controller, @@ -455,19 +512,15 @@ function createWritableStream( * @returns {T} */ function dequeueValue(container) { - assert( - ReflectHas(container, _queue) && - ReflectHas(container, _queueTotalSize), - ); - assert(container[_queue].length); - const valueWithSize = ArrayPrototypeShift(container[_queue]); + assert(container[_queue] && typeof container[_queueTotalSize] === "number"); + assert(container[_queue].size); + const valueWithSize = container[_queue].dequeue(); container[_queueTotalSize] -= valueWithSize.size; if (container[_queueTotalSize] < 0) { container[_queueTotalSize] = 0; } return valueWithSize.value; } - /** * @template T * @param {{ [_queue]: Array>, [_queueTotalSize]: number }} container @@ -476,17 +529,14 @@ function dequeueValue(container) { * @returns {void} */ function enqueueValueWithSize(container, value, size) { - assert( - ReflectHas(container, _queue) && - ReflectHas(container, _queueTotalSize), - ); + assert(container[_queue] && typeof container[_queueTotalSize] === "number"); if (isNonNegativeNumber(size) === false) { throw RangeError("chunk size isn't a positive number"); } if (size === Infinity) { throw RangeError("chunk size is invalid"); } - ArrayPrototypePush(container[_queue], { value, size }); + container[_queue].enqueue({ value, size }); container[_queueTotalSize] += size; } @@ -537,9 +587,9 @@ function createReadableByteStream( pullAlgorithm, cancelAlgorithm, ) { - const stream = webidl.createBranded(ReadableStream); + const stream = new ReadableStream(_brand); initializeReadableStream(stream); - const controller = webidl.createBranded(ReadableByteStreamController); + const controller = new ReadableByteStreamController(_brand); setUpReadableByteStreamController( stream, controller, @@ -646,16 +696,7 @@ function initializeWritableStream(stream) { * @returns {v is number} */ function isNonNegativeNumber(v) { - if (typeof v !== "number") { - return false; - } - if (NumberIsNaN(v)) { - return false; - } - if (v < 0) { - return false; - } - return true; + return typeof v === "number" && v >= 0; } /** @@ -663,8 +704,7 @@ function isNonNegativeNumber(v) { * @returns {value is ReadableStream} */ function isReadableStream(value) { - return !(typeof value !== "object" || value === null || - !ReflectHas(value, _controller)); + return !(typeof value !== "object" || value === null || !value[_controller]); } /** @@ -672,10 +712,7 @@ function isReadableStream(value) { * @returns {boolean} */ function isReadableStreamLocked(stream) { - if (stream[_reader] === undefined) { - return false; - } - return true; + return stream[_reader] !== undefined; } /** @@ -684,7 +721,7 @@ function isReadableStreamLocked(stream) { */ function isReadableStreamDefaultReader(value) { return !(typeof value !== "object" || value === null || - !ReflectHas(value, _readRequests)); + !value[_readRequests]); } /** @@ -873,7 +910,7 @@ const _original = Symbol("[[original]]"); * @returns {ReadableStream} */ function readableStreamForRid(rid, autoClose = true) { - const stream = webidl.createBranded(ReadableStream); + const stream = new ReadableStream(_brand); stream[_resourceBacking] = { rid, autoClose }; const tryClose = () => { @@ -943,7 +980,7 @@ const _isUnref = Symbol("isUnref"); * @returns {ReadableStream} */ function readableStreamForRidUnrefable(rid) { - const stream = webidl.createBranded(ReadableStream); + const stream = new ReadableStream(_brand); stream[promiseIdSymbol] = undefined; stream[_isUnref] = false; stream[_resourceBackingUnrefable] = { rid, autoClose: true }; @@ -1094,7 +1131,7 @@ async function readableStreamCollectIntoUint8Array(stream) { * @returns {ReadableStream} */ function writableStreamForRid(rid, autoClose = true) { - const stream = webidl.createBranded(WritableStream); + const stream = new WritableStream(_brand); stream[_resourceBacking] = { rid, autoClose }; const tryClose = () => { @@ -1173,11 +1210,11 @@ function isWritableStreamLocked(stream) { */ function peekQueueValue(container) { assert( - ReflectHas(container, _queue) && - ReflectHas(container, _queueTotalSize), + container[_queue] && + typeof container[_queueTotalSize] === "number", ); - assert(container[_queue].length); - const valueWithSize = container[_queue][0]; + assert(container[_queue].size); + const valueWithSize = container[_queue].peek(); return valueWithSize.value; } @@ -1347,7 +1384,7 @@ function readableByteStreamControllerEnqueue(controller, chunk) { byteLength, ); } else { - assert(controller[_queue].length === 0); + assert(controller[_queue].size === 0); if (controller[_pendingPullIntos].length !== 0) { assert(controller[_pendingPullIntos][0].readerType === "default"); readableByteStreamControllerShiftPendingPullInto(controller); @@ -1394,7 +1431,7 @@ function readableByteStreamControllerEnqueueChunkToQueue( byteOffset, byteLength, ) { - ArrayPrototypePush(controller[_queue], { buffer, byteOffset, byteLength }); + controller[_queue].enqueue({ buffer, byteOffset, byteLength }); controller[_queueTotalSize] += byteLength; } @@ -1476,7 +1513,7 @@ function readableByteStreamControllerGetBYOBRequest(controller) { // deno-lint-ignore prefer-primordials firstDescriptor.byteLength - firstDescriptor.bytesFilled, ); - const byobRequest = webidl.createBranded(ReadableStreamBYOBRequest); + const byobRequest = new ReadableStreamBYOBRequest(_brand); byobRequest[_controller] = controller; byobRequest[_view] = view; controller[_byobRequest] = byobRequest; @@ -1504,7 +1541,7 @@ function readableByteStreamControllerGetDesiredSize(controller) { * @returns {void} */ function resetQueue(container) { - container[_queue] = []; + container[_queue] = new Queue(); container[_queueTotalSize] = 0; } @@ -1564,7 +1601,7 @@ function readableByteStreamControllerShouldCallPull(controller) { function readableStreamAddReadRequest(stream, readRequest) { assert(isReadableStreamDefaultReader(stream[_reader])); assert(stream[_state] === "readable"); - ArrayPrototypePush(stream[_reader][_readRequests], readRequest); + stream[_reader][_readRequests].enqueue(readRequest); } /** @@ -1587,7 +1624,7 @@ function readableStreamAddReadIntoRequest(stream, readRequest) { function readableStreamCancel(stream, reason) { stream[_disturbed] = true; if (stream[_state] === "closed") { - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } if (stream[_state] === "errored") { return PromiseReject(stream[_storedError]); @@ -1604,7 +1641,7 @@ function readableStreamCancel(stream, reason) { } /** @type {Promise} */ const sourceCancelPromise = stream[_controller][_cancelSteps](reason); - return PromisePrototypeThen(sourceCancelPromise, () => undefined); + return PromisePrototypeThen(sourceCancelPromise, noop); } /** @@ -1623,9 +1660,8 @@ function readableStreamClose(stream) { if (isReadableStreamDefaultReader(reader)) { /** @type {Array>} */ const readRequests = reader[_readRequests]; - reader[_readRequests] = []; - for (let i = 0; i < readRequests.length; ++i) { - const readRequest = readRequests[i]; + while (readRequests.size !== 0) { + const readRequest = readRequests.dequeue(); readRequest.closeSteps(); } } @@ -1676,11 +1712,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) { */ function readableStreamDefaultControllerCanCloseOrEnqueue(controller) { const state = controller[_stream][_state]; - if (controller[_closeRequested] === false && state === "readable") { - return true; - } else { - return false; - } + return controller[_closeRequested] === false && state === "readable"; } /** @param {ReadableStreamDefaultController} controller */ @@ -1699,7 +1731,7 @@ function readableStreamDefaultControllerClose(controller) { } const stream = controller[_stream]; controller[_closeRequested] = true; - if (controller[_queue].length === 0) { + if (controller[_queue].size === 0) { readableStreamDefaultControllerClearAlgorithms(controller); readableStreamClose(stream); } @@ -1785,7 +1817,6 @@ function readableStreamDefaultcontrollerHasBackpressure(controller) { * @returns {boolean} */ function readableStreamDefaultcontrollerShouldCallPull(controller) { - const stream = controller[_stream]; if ( readableStreamDefaultControllerCanCloseOrEnqueue(controller) === false ) { @@ -1794,6 +1825,7 @@ function readableStreamDefaultcontrollerShouldCallPull(controller) { if (controller[_started] === false) { return false; } + const stream = controller[_stream]; if ( isReadableStreamLocked(stream) && readableStreamGetNumReadRequests(stream) > 0 @@ -1803,10 +1835,11 @@ function readableStreamDefaultcontrollerShouldCallPull(controller) { const desiredSize = readableStreamDefaultControllerGetDesiredSize( controller, ); - assert(desiredSize !== null); + if (desiredSize > 0) { return true; } + assert(desiredSize !== null); return false; } @@ -1846,9 +1879,8 @@ function readableStreamBYOBReaderRelease(reader) { */ function readableStreamDefaultReaderErrorReadRequests(reader, e) { const readRequests = reader[_readRequests]; - reader[_readRequests] = []; - for (let i = 0; i < readRequests.length; ++i) { - const readRequest = readRequests[i]; + while (readRequests.size !== 0) { + const readRequest = readRequests.dequeue(); readRequest.errorSteps(e); } } @@ -1887,11 +1919,11 @@ function readableByteStreamControllerProcessReadRequestsUsingQueue( ) { const reader = controller[_stream][_reader]; assert(isReadableStreamDefaultReader(reader)); - while (reader[_readRequests].length !== 0) { + while (reader[_readRequests].size !== 0) { if (controller[_queueTotalSize] === 0) { return; } - const readRequest = ArrayPrototypeShift(reader[_readRequests]); + const readRequest = reader[_readRequests].dequeue(); readableByteStreamControllerFillReadRequestFromQueue( controller, readRequest, @@ -2326,7 +2358,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue( } const queue = controller[_queue]; while (totalBytesToCopyRemaining > 0) { - const headOfQueue = queue[0]; + const headOfQueue = queue.peek(); const bytesToCopy = MathMin( totalBytesToCopyRemaining, // deno-lint-ignore prefer-primordials @@ -2353,7 +2385,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue( // deno-lint-ignore prefer-primordials if (headOfQueue.byteLength === bytesToCopy) { - ArrayPrototypeShift(queue); + queue.dequeue(); } else { headOfQueue.byteOffset += bytesToCopy; headOfQueue.byteLength -= bytesToCopy; @@ -2384,7 +2416,7 @@ function readableByteStreamControllerFillReadRequestFromQueue( readRequest, ) { assert(controller[_queueTotalSize] > 0); - const entry = ArrayPrototypeShift(controller[_queue]); + const entry = controller[_queue].dequeue(); // deno-lint-ignore prefer-primordials controller[_queueTotalSize] -= entry.byteLength; readableByteStreamControllerHandleQueueDrain(controller); @@ -2526,9 +2558,9 @@ function readableStreamFulfillReadRequest(stream, chunk, done) { assert(readableStreamHasDefaultReader(stream) === true); /** @type {ReadableStreamDefaultReader} */ const reader = stream[_reader]; - assert(reader[_readRequests].length); + assert(reader[_readRequests].size); /** @type {ReadRequest} */ - const readRequest = ArrayPrototypeShift(reader[_readRequests]); + const readRequest = reader[_readRequests].dequeue(); if (done) { readRequest.closeSteps(); } else { @@ -2551,7 +2583,7 @@ function readableStreamGetNumReadIntoRequests(stream) { */ function readableStreamGetNumReadRequests(stream) { assert(readableStreamHasDefaultReader(stream) === true); - return stream[_reader][_readRequests].length; + return stream[_reader][_readRequests].size; } /** @@ -2621,7 +2653,7 @@ function readableStreamPipeTo( const writer = acquireWritableStreamDefaultWriter(dest); source[_disturbed] = true; let shuttingDown = false; - let currentWrite = resolvePromiseWith(undefined); + let currentWrite = PromiseResolve(undefined); /** @type {Deferred} */ const promise = new Deferred(); /** @type {() => void} */ @@ -2636,7 +2668,7 @@ function readableStreamPipeTo( if (dest[_state] === "writable") { return writableStreamAbort(dest, error); } else { - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } }); } @@ -2645,7 +2677,7 @@ function readableStreamPipeTo( if (source[_state] === "readable") { return readableStreamCancel(source, error); } else { - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } }); } @@ -2680,7 +2712,7 @@ function readableStreamPipeTo( /** @returns {Promise} */ function pipeStep() { if (shuttingDown === true) { - return resolvePromiseWith(true); + return PromiseResolve(true); } return transformPromiseWith(writer[_readyPromise].promise, () => { @@ -2997,7 +3029,7 @@ function readableStreamDefaultTee(stream, cloneForBranch2) { function pullAlgorithm() { if (reading === true) { readAgain = true; - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } reading = true; /** @type {ReadRequest} */ @@ -3073,7 +3105,7 @@ function readableStreamDefaultTee(stream, cloneForBranch2) { }, }; readableStreamDefaultReaderRead(reader, readRequest); - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } /** @@ -3249,7 +3281,7 @@ function readableByteStreamTee(stream) { function pullWithBYOBReader(view, forBranch2) { if (isReadableStreamDefaultReader(reader)) { - assert(reader[_readRequests].length === 0); + assert(reader[_readRequests].size === 0); readableStreamDefaultReaderRelease(reader); reader = acquireReadableStreamBYOBReader(stream); forwardReaderError(reader); @@ -3458,7 +3490,7 @@ function setUpReadableByteStreamController( controller[_pendingPullIntos] = []; stream[_controller] = controller; const startResult = startAlgorithm(); - const startPromise = resolvePromiseWith(startResult); + const startPromise = PromiseResolve(startResult); setPromiseIsHandledToTrue( PromisePrototypeThen( startPromise, @@ -3487,13 +3519,13 @@ function setUpReadableByteStreamControllerFromUnderlyingSource( underlyingSourceDict, highWaterMark, ) { - const controller = webidl.createBranded(ReadableByteStreamController); + const controller = new ReadableByteStreamController(_brand); /** @type {() => void} */ - let startAlgorithm = () => undefined; + let startAlgorithm = _defaultStartAlgorithm; /** @type {() => Promise} */ - let pullAlgorithm = () => resolvePromiseWith(undefined); + let pullAlgorithm = _defaultPullAlgorithm; /** @type {(reason: any) => Promise} */ - let cancelAlgorithm = (_reason) => resolvePromiseWith(undefined); + let cancelAlgorithm = _defaultCancelAlgorithm; if (underlyingSourceDict.start !== undefined) { startAlgorithm = () => webidl.invokeCallbackFunction( @@ -3574,7 +3606,7 @@ function setUpReadableStreamDefaultController( controller[_cancelAlgorithm] = cancelAlgorithm; stream[_controller] = controller; const startResult = startAlgorithm(controller); - const startPromise = resolvePromiseWith(startResult); + const startPromise = PromiseResolve(startResult); uponPromise(startPromise, () => { controller[_started] = true; assert(controller[_pulling] === false); @@ -3600,13 +3632,13 @@ function setUpReadableStreamDefaultControllerFromUnderlyingSource( highWaterMark, sizeAlgorithm, ) { - const controller = webidl.createBranded(ReadableStreamDefaultController); + const controller = new ReadableStreamDefaultController(_brand); /** @type {() => Promise} */ - let startAlgorithm = () => undefined; + let startAlgorithm = _defaultStartAlgorithm; /** @type {() => Promise} */ - let pullAlgorithm = () => resolvePromiseWith(undefined); + let pullAlgorithm = _defaultPullAlgorithm; /** @type {(reason?: any) => Promise} */ - let cancelAlgorithm = () => resolvePromiseWith(undefined); + let cancelAlgorithm = _defaultCancelAlgorithm; if (underlyingSourceDict.start !== undefined) { startAlgorithm = () => webidl.invokeCallbackFunction( @@ -3681,7 +3713,7 @@ function setUpReadableStreamDefaultReader(reader, stream) { throw new TypeError("ReadableStream is locked."); } readableStreamReaderGenericInitialize(reader, stream); - reader[_readRequests] = []; + reader[_readRequests] = new Queue(); } /** @@ -3721,7 +3753,7 @@ function setUpTransformStreamDefaultControllerFromTransformer( transformerDict, ) { /** @type {TransformStreamDefaultController} */ - const controller = webidl.createBranded(TransformStreamDefaultController); + const controller = new TransformStreamDefaultController(_brand); /** @type {(chunk: O, controller: TransformStreamDefaultController) => Promise} */ let transformAlgorithm = (chunk) => { try { @@ -3729,12 +3761,11 @@ function setUpTransformStreamDefaultControllerFromTransformer( } catch (e) { return PromiseReject(e); } - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); }; /** @type {(controller: TransformStreamDefaultController) => Promise} */ - let flushAlgorithm = () => resolvePromiseWith(undefined); - /** @type {(reason: any) => Promise} */ - let cancelAlgorithm = () => resolvePromiseWith(undefined); + let flushAlgorithm = _defaultFlushAlgorithm; + let cancelAlgorithm = _defaultCancelAlgorithm; if (transformerDict.transform !== undefined) { transformAlgorithm = (chunk, controller) => webidl.invokeCallbackFunction( @@ -3842,14 +3873,14 @@ function setUpWritableStreamDefaultControllerFromUnderlyingSink( highWaterMark, sizeAlgorithm, ) { - const controller = webidl.createBranded(WritableStreamDefaultController); + const controller = new WritableStreamDefaultController(_brand); /** @type {(controller: WritableStreamDefaultController) => any} */ - let startAlgorithm = () => undefined; + let startAlgorithm = _defaultStartAlgorithm; /** @type {(chunk: W, controller: WritableStreamDefaultController) => Promise} */ - let writeAlgorithm = () => resolvePromiseWith(undefined); - let closeAlgorithm = () => resolvePromiseWith(undefined); + let writeAlgorithm = _defaultWriteAlgorithm; + let closeAlgorithm = _defaultCloseAlgorithm; /** @type {(reason?: any) => Promise} */ - let abortAlgorithm = () => resolvePromiseWith(undefined); + let abortAlgorithm = _defaultAbortAlgorithm; if (underlyingSinkDict.start !== undefined) { startAlgorithm = () => @@ -4215,11 +4246,11 @@ function transformStreamUnblockWrite(stream) { function writableStreamAbort(stream, reason) { const state = stream[_state]; if (state === "closed" || state === "errored") { - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } stream[_controller][_signal][signalAbort](reason); if (state === "closed" || state === "errored") { - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } if (stream[_pendingAbortRequest] !== undefined) { return stream[_pendingAbortRequest].deferred.promise; @@ -4329,7 +4360,7 @@ function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { writableStreamFinishErroring(stream); return; } - if (controller[_queue].length === 0) { + if (controller[_queue].size === 0) { return; } const value = peekQueueValue(controller); @@ -4415,7 +4446,7 @@ function writableStreamDefaultControllerProcessClose(controller) { const stream = controller[_stream]; writableStreamMarkCloseRequestInFlight(stream); dequeueValue(controller); - assert(controller[_queue].length === 0); + assert(controller[_queue].size === 0); const sinkClosePromise = controller[_closeAlgorithm](); writableStreamDefaultControllerClearAlgorithms(controller); uponPromise(sinkClosePromise, () => { @@ -4515,7 +4546,7 @@ function writableStreamDefaultWriterCloseWithErrorPropagation(writer) { if ( writableStreamCloseQueuedOrInFlight(stream) === true || state === "closed" ) { - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } if (state === "errored") { return PromiseReject(stream[_storedError]); @@ -4819,6 +4850,35 @@ const asyncIteratorPrototype = ObjectGetPrototypeOf(AsyncGeneratorPrototype); const _iteratorNext = Symbol("[[iteratorNext]]"); const _iteratorFinished = Symbol("[[iteratorFinished]]"); +class ReadableStreamAsyncIteratorReadRequest { + #reader; + #promise; + + constructor(reader, promise) { + this.#reader = reader; + this.#promise = promise; + } + + chunkSteps(chunk) { + this.#reader[_iteratorNext] = null; + this.#promise.resolve({ value: chunk, done: false }); + } + + closeSteps() { + this.#reader[_iteratorNext] = null; + this.#reader[_iteratorFinished] = true; + readableStreamDefaultReaderRelease(this.#reader); + this.#promise.resolve({ value: undefined, done: true }); + } + + errorSteps(e) { + this.#reader[_iteratorNext] = null; + this.#reader[_iteratorFinished] = true; + readableStreamDefaultReaderRelease(this.#reader); + this.#promise.reject(e); + } +} + /** @type {AsyncIterator} */ const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ /** @returns {Promise>} */ @@ -4840,41 +4900,21 @@ const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ /** @type {Deferred>} */ const promise = new Deferred(); - /** @type {ReadRequest} */ - const readRequest = { - chunkSteps(chunk) { - promise.resolve({ value: chunk, done: false }); - }, - closeSteps() { - readableStreamDefaultReaderRelease(reader); - promise.resolve({ value: undefined, done: true }); - }, - errorSteps(e) { - readableStreamDefaultReaderRelease(reader); - promise.reject(e); - }, - }; + // internal values (_iteratorNext & _iteratorFinished) are modified inside + // ReadableStreamAsyncIteratorReadRequest methods + // see: https://webidl.spec.whatwg.org/#es-default-asynchronous-iterator-object + const readRequest = new ReadableStreamAsyncIteratorReadRequest( + reader, + promise, + ); readableStreamDefaultReaderRead(reader, readRequest); - return PromisePrototypeThen(promise.promise, (result) => { - reader[_iteratorNext] = null; - if (result.done === true) { - reader[_iteratorFinished] = true; - return { value: undefined, done: true }; - } - return result; - }, (reason) => { - reader[_iteratorNext] = null; - reader[_iteratorFinished] = true; - throw reason; - }); + return PromisePrototypeThen(promise.promise); } - reader[_iteratorNext] = reader[_iteratorNext] + return reader[_iteratorNext] = reader[_iteratorNext] ? PromisePrototypeThen(reader[_iteratorNext], nextSteps, nextSteps) : nextSteps(); - - return reader[_iteratorNext]; }, /** * @param {unknown} arg @@ -4892,7 +4932,7 @@ const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ if (reader[_stream] === undefined) { return PromiseResolve({ value: undefined, done: true }); } - assert(reader[_readRequests].length === 0); + assert(reader[_readRequests].size === 0); if (this[_preventCancel] === false) { const result = readableStreamReaderGenericCancel(reader, arg); readableStreamDefaultReaderRelease(reader); @@ -4918,7 +4958,7 @@ class ByteLengthQueuingStrategy { const prefix = "Failed to construct 'ByteLengthQueuingStrategy'"; webidl.requiredArguments(arguments.length, 1, prefix); init = webidl.converters.QueuingStrategyInit(init, prefix, "Argument 1"); - this[webidl.brand] = webidl.brand; + this[_brand] = _brand; this[_globalObject] = globalThis; this[_highWaterMark] = init.highWaterMark; } @@ -4972,7 +5012,7 @@ class CountQueuingStrategy { const prefix = "Failed to construct 'CountQueuingStrategy'"; webidl.requiredArguments(arguments.length, 1, prefix); init = webidl.converters.QueuingStrategyInit(init, prefix, "Argument 1"); - this[webidl.brand] = webidl.brand; + this[_brand] = _brand; this[_globalObject] = globalThis; this[_highWaterMark] = init.highWaterMark; } @@ -5070,34 +5110,36 @@ class ReadableStream { * @param {QueuingStrategy=} strategy */ constructor(underlyingSource = undefined, strategy = undefined) { + if (underlyingSource === _brand) { + this[_brand] = _brand; + return; + } + const prefix = "Failed to construct 'ReadableStream'"; - if (underlyingSource !== undefined) { - underlyingSource = webidl.converters.object( + underlyingSource = underlyingSource !== undefined + ? webidl.converters.object( underlyingSource, prefix, "Argument 1", - ); - } else { - underlyingSource = null; - } - if (strategy !== undefined) { - strategy = webidl.converters.QueuingStrategy( + ) + : null; + strategy = strategy !== undefined + ? webidl.converters.QueuingStrategy( strategy, prefix, "Argument 2", - ); - } else { - strategy = {}; - } - this[webidl.brand] = webidl.brand; - let underlyingSourceDict = {}; - if (underlyingSource !== undefined) { - underlyingSourceDict = webidl.converters.UnderlyingSource( + ) + : {}; + + const underlyingSourceDict = underlyingSource !== undefined + ? webidl.converters.UnderlyingSource( underlyingSource, prefix, "underlyingSource", - ); - } + ) + : {}; + this[_brand] = _brand; + initializeReadableStream(this); if (underlyingSourceDict.type === "bytes") { if (strategy.size !== undefined) { @@ -5114,7 +5156,6 @@ class ReadableStream { highWaterMark, ); } else { - assert(!(ReflectHas(underlyingSourceDict, "type"))); const sizeAlgorithm = extractSizeAlgorithm(strategy); const highWaterMark = extractHighWaterMark(strategy, 1); setUpReadableStreamDefaultControllerFromUnderlyingSource( @@ -5137,7 +5178,7 @@ class ReadableStream { const iterator = getIterator(asyncIterable, true); - const stream = createReadableStream(() => undefined, async () => { + const stream = createReadableStream(noop, async () => { // deno-lint-ignore prefer-primordials const res = await iterator.next(); if (typeof res !== "object") { @@ -5310,19 +5351,23 @@ class ReadableStream { * @param {ReadableStreamIteratorOptions=} options * @returns {AsyncIterableIterator} */ - values(options = {}) { + values(options = undefined) { webidl.assertBranded(this, ReadableStreamPrototype); - const prefix = "Failed to execute 'values' on 'ReadableStream'"; - options = webidl.converters.ReadableStreamIteratorOptions( - options, - prefix, - "Argument 1", - ); + let preventCancel = false; + if (options !== undefined) { + const prefix = "Failed to execute 'values' on 'ReadableStream'"; + options = webidl.converters.ReadableStreamIteratorOptions( + options, + prefix, + "Argument 1", + ); + preventCancel = options.preventCancel; + } /** @type {AsyncIterableIterator} */ const iterator = ObjectCreate(readableStreamAsyncIteratorPrototype); const reader = acquireReadableStreamDefaultReader(this); iterator[_reader] = reader; - iterator[_preventCancel] = options.preventCancel; + iterator[_preventCancel] = preventCancel; return iterator; } @@ -5357,10 +5402,14 @@ class ReadableStreamDefaultReader { /** @param {ReadableStream} stream */ constructor(stream) { + if (stream === _brand) { + this[_brand] = _brand; + return; + } const prefix = "Failed to construct 'ReadableStreamDefaultReader'"; webidl.requiredArguments(arguments.length, 1, prefix); stream = webidl.converters.ReadableStream(stream, prefix, "Argument 1"); - this[webidl.brand] = webidl.brand; + this[_brand] = _brand; setUpReadableStreamDefaultReader(this, stream); } @@ -5454,10 +5503,14 @@ class ReadableStreamBYOBReader { /** @param {ReadableStream} stream */ constructor(stream) { + if (stream === _brand) { + this[_brand] = _brand; + return; + } const prefix = "Failed to construct 'ReadableStreamBYOBReader'"; webidl.requiredArguments(arguments.length, 1, prefix); stream = webidl.converters.ReadableStream(stream, prefix, "Argument 1"); - this[webidl.brand] = webidl.brand; + this[_brand] = _brand; setUpReadableStreamBYOBReader(this, stream); } @@ -5491,16 +5544,19 @@ class ReadableStreamBYOBReader { new TypeError("view must have non-zero byteLength"), ); } + if (getArrayBufferByteLength(buffer) === 0) { + if (isDetachedBuffer(buffer)) { + return PromiseReject( + new TypeError("view's buffer has been detached"), + ); + } + return PromiseReject( new TypeError("view's buffer must have non-zero byteLength"), ); } - if (isDetachedBuffer(buffer)) { - return PromiseReject( - new TypeError("view's buffer has been detached"), - ); - } + if (this[_stream] === undefined) { return PromiseReject( new TypeError("Reader has no associated stream."), @@ -5584,8 +5640,11 @@ class ReadableStreamBYOBRequest { return this[_view]; } - constructor() { - webidl.illegalConstructor(); + constructor(brand = undefined) { + if (brand !== _brand) { + webidl.illegalConstructor(); + } + this[_brand] = _brand; } respond(bytesWritten) { @@ -5680,8 +5739,11 @@ class ReadableByteStreamController { /** @type {ReadableStream} */ [_stream]; - constructor() { - webidl.illegalConstructor(); + constructor(brand = undefined) { + if (brand !== _brand) { + webidl.illegalConstructor(); + } + this[_brand] = _brand; } /** @returns {ReadableStreamBYOBRequest | null} */ @@ -5875,8 +5937,11 @@ class ReadableStreamDefaultController { /** @type {ReadableStream} */ [_stream]; - constructor() { - webidl.illegalConstructor(); + constructor(brand = undefined) { + if (brand !== _brand) { + webidl.illegalConstructor(); + } + this[_brand] = _brand; } /** @returns {number | null} */ @@ -5949,9 +6014,9 @@ class ReadableStreamDefaultController { */ [_pullSteps](readRequest) { const stream = this[_stream]; - if (this[_queue].length) { + if (this[_queue].size) { const chunk = dequeueValue(this); - if (this[_closeRequested] && this[_queue].length === 0) { + if (this[_closeRequested] && this[_queue].size === 0) { readableStreamDefaultControllerClearAlgorithms(this); readableStreamClose(stream); } else { @@ -6015,7 +6080,7 @@ class TransformStream { prefix, "Argument 3", ); - this[webidl.brand] = webidl.brand; + this[_brand] = _brand; if (transformer === undefined) { transformer = null; } @@ -6103,8 +6168,11 @@ class TransformStreamDefaultController { /** @type {(chunk: O, controller: this) => Promise} */ [_transformAlgorithm]; - constructor() { - webidl.illegalConstructor(); + constructor(brand = undefined) { + if (brand !== _brand) { + webidl.illegalConstructor(); + } + this[_brand] = _brand; } /** @returns {number | null} */ @@ -6191,7 +6259,11 @@ class WritableStream { * @param {UnderlyingSink=} underlyingSink * @param {QueuingStrategy=} strategy */ - constructor(underlyingSink = undefined, strategy = {}) { + constructor(underlyingSink = undefined, strategy = undefined) { + if (underlyingSink === _brand) { + this[_brand] = _brand; + return; + } const prefix = "Failed to construct 'WritableStream'"; if (underlyingSink !== undefined) { underlyingSink = webidl.converters.object( @@ -6200,12 +6272,14 @@ class WritableStream { "Argument 1", ); } - strategy = webidl.converters.QueuingStrategy( - strategy, - prefix, - "Argument 2", - ); - this[webidl.brand] = webidl.brand; + strategy = strategy !== undefined + ? webidl.converters.QueuingStrategy( + strategy, + prefix, + "Argument 2", + ) + : {}; + this[_brand] = _brand; if (underlyingSink === undefined) { underlyingSink = null; } @@ -6314,7 +6388,7 @@ class WritableStreamDefaultWriter { const prefix = "Failed to construct 'WritableStreamDefaultWriter'"; webidl.requiredArguments(arguments.length, 1, prefix); stream = webidl.converters.WritableStream(stream, prefix, "Argument 1"); - this[webidl.brand] = webidl.brand; + this[_brand] = _brand; setUpWritableStreamDefaultWriter(this, stream); } @@ -6471,8 +6545,11 @@ class WritableStreamDefaultController { return this[_signal]; } - constructor() { - webidl.illegalConstructor(); + constructor(brand = undefined) { + if (brand !== _brand) { + webidl.illegalConstructor(); + } + this[_brand] = _brand; } /**