Skip to content

Commit

Permalink
feat(streams): ReadableStream.read min option (denoland#20849)
Browse files Browse the repository at this point in the history
  • Loading branch information
crowlKats authored Nov 24, 2023
1 parent 998d906 commit 6f02fa1
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 15 deletions.
76 changes: 62 additions & 14 deletions ext/web/06_streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const {
TypedArrayPrototypeGetBuffer,
TypedArrayPrototypeGetByteLength,
TypedArrayPrototypeGetByteOffset,
TypedArrayPrototypeGetLength,
TypedArrayPrototypeGetSymbolToStringTag,
TypedArrayPrototypeSet,
TypedArrayPrototypeSlice,
Expand Down Expand Up @@ -1303,7 +1304,9 @@ function readableByteStreamControllerClose(controller) {
}
if (controller[_pendingPullIntos].length !== 0) {
const firstPendingPullInto = controller[_pendingPullIntos][0];
if (firstPendingPullInto.bytesFilled > 0) {
if (
firstPendingPullInto.bytesFilled % firstPendingPullInto.elementSize !== 0
) {
const e = new TypeError(
"Insufficient bytes to fill elements in the given buffer",
);
Expand Down Expand Up @@ -1847,10 +1850,11 @@ function readableStreamDefaultcontrollerShouldCallPull(controller) {
/**
* @param {ReadableStreamBYOBReader} reader
* @param {ArrayBufferView} view
* @param {number} min
* @param {ReadIntoRequest} readIntoRequest
* @returns {void}
*/
function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
function readableStreamBYOBReaderRead(reader, view, min, readIntoRequest) {
const stream = reader[_stream];
assert(stream);
stream[_disturbed] = true;
Expand All @@ -1860,6 +1864,7 @@ function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
readableByteStreamControllerPullInto(
stream[_controller],
view,
min,
readIntoRequest,
);
}
Expand Down Expand Up @@ -1935,12 +1940,14 @@ function readableByteStreamControllerProcessReadRequestsUsingQueue(
/**
* @param {ReadableByteStreamController} controller
* @param {ArrayBufferView} view
* @param {number} min
* @param {ReadIntoRequest} readIntoRequest
* @returns {void}
*/
function readableByteStreamControllerPullInto(
controller,
view,
min,
readIntoRequest,
) {
const stream = controller[_stream];
Expand Down Expand Up @@ -2010,6 +2017,10 @@ function readableByteStreamControllerPullInto(
);
}

const minimumFill = min * elementSize;
assert(minimumFill >= 0 && minimumFill <= byteLength);
assert(minimumFill % elementSize === 0);

try {
buffer = transferArrayBuffer(buffer);
} catch (e) {
Expand All @@ -2024,6 +2035,7 @@ function readableByteStreamControllerPullInto(
byteOffset,
byteLength,
bytesFilled: 0,
minimumFill,
elementSize,
viewConstructor: ctor,
readerType: "byob",
Expand Down Expand Up @@ -2139,7 +2151,7 @@ function readableByteStreamControllerRespondInReadableState(
);
return;
}
if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.minimumFill) {
return;
}
readableByteStreamControllerShiftPendingPullInto(controller);
Expand Down Expand Up @@ -2219,7 +2231,7 @@ function readableByteStreamControllerRespondInClosedState(
controller,
firstDescriptor,
) {
assert(firstDescriptor.bytesFilled === 0);
assert(firstDescriptor.bytesFilled % firstDescriptor.elementSize === 0);
if (firstDescriptor.readerType === "none") {
readableByteStreamControllerShiftPendingPullInto(controller);
}
Expand Down Expand Up @@ -2249,7 +2261,9 @@ function readableByteStreamControllerCommitPullIntoDescriptor(
assert(pullIntoDescriptor.readerType !== "none");
let done = false;
if (stream[_state] === "closed") {
assert(pullIntoDescriptor.bytesFilled === 0);
assert(
pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize === 0,
);
done = true;
}
const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
Expand Down Expand Up @@ -2340,19 +2354,18 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
controller,
pullIntoDescriptor,
) {
const elementSize = pullIntoDescriptor.elementSize;
const currentAlignedBytes = pullIntoDescriptor.bytesFilled -
(pullIntoDescriptor.bytesFilled % elementSize);
const maxBytesToCopy = MathMin(
controller[_queueTotalSize],
// deno-lint-ignore prefer-primordials
pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled,
);
const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
let totalBytesToCopyRemaining = maxBytesToCopy;
let ready = false;
if (maxAlignedBytes > currentAlignedBytes) {
assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.minimumFill);
const maxAlignedBytes = maxBytesFilled -
(maxBytesFilled % pullIntoDescriptor.elementSize);
if (maxAlignedBytes >= pullIntoDescriptor.minimumFill) {
totalBytesToCopyRemaining = maxAlignedBytes -
pullIntoDescriptor.bytesFilled;
ready = true;
Expand Down Expand Up @@ -2402,7 +2415,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
if (!ready) {
assert(controller[_queueTotalSize] === 0);
assert(pullIntoDescriptor.bytesFilled > 0);
assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize);
assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.minimumFill);
}
return ready;
}
Expand Down Expand Up @@ -3375,7 +3388,7 @@ function readableByteStreamTee(stream) {
reading = false;
},
};
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
readableStreamBYOBReaderRead(reader, view, 1, readIntoRequest);
}

function pull1Algorithm() {
Expand Down Expand Up @@ -5543,13 +5556,19 @@ class ReadableStreamBYOBReader {

/**
* @param {ArrayBufferView} view
* @param {ReadableStreamBYOBReaderReadOptions} options
* @returns {Promise<ReadableStreamBYOBReadResult>}
*/
read(view) {
read(view, options = {}) {
try {
webidl.assertBranded(this, ReadableStreamBYOBReaderPrototype);
const prefix = "Failed to execute 'read' on 'ReadableStreamBYOBReader'";
view = webidl.converters.ArrayBufferView(view, prefix, "Argument 1");
options = webidl.converters.ReadableStreamBYOBReaderReadOptions(
options,
prefix,
"Argument 2",
);
} catch (err) {
return PromiseReject(err);
}
Expand Down Expand Up @@ -5584,6 +5603,23 @@ class ReadableStreamBYOBReader {
);
}

if (options.min === 0) {
return PromiseReject(new TypeError("options.min must be non-zero"));
}
if (TypedArrayPrototypeGetSymbolToStringTag(view) !== undefined) {
if (options.min > TypedArrayPrototypeGetLength(view)) {
return PromiseReject(
new RangeError("options.min must be smaller or equal to view's size"),
);
}
} else {
if (options.min > DataViewPrototypeGetByteLength(view)) {
return PromiseReject(
new RangeError("options.min must be smaller or equal to view's size"),
);
}
}

if (this[_stream] === undefined) {
return PromiseReject(
new TypeError("Reader has no associated stream."),
Expand All @@ -5603,7 +5639,7 @@ class ReadableStreamBYOBReader {
promise.reject(e);
},
};
readableStreamBYOBReaderRead(this, view, readIntoRequest);
readableStreamBYOBReaderRead(this, view, options.min, readIntoRequest);
return promise.promise;
}

Expand Down Expand Up @@ -5929,6 +5965,7 @@ class ReadableByteStreamController {
byteLength: autoAllocateChunkSize,
bytesFilled: 0,
elementSize: 1,
minimumFill: 1,
viewConstructor: Uint8Array,
readerType: "default",
};
Expand Down Expand Up @@ -6799,6 +6836,17 @@ webidl.converters.ReadableStreamGetReaderOptions = webidl
converter: webidl.converters.ReadableStreamReaderMode,
}]);

webidl.converters.ReadableStreamBYOBReaderReadOptions = webidl
.createDictionaryConverter("ReadableStreamBYOBReaderReadOptions", [{
key: "min",
converter: (V, prefix, context, opts) =>
webidl.converters["unsigned long long"](V, prefix, context, {
...opts,
enforceRange: true,
}),
defaultValue: 1,
}]);

webidl.converters.ReadableWritablePair = webidl
.createDictionaryConverter("ReadableWritablePair", [
{
Expand Down
1 change: 1 addition & 0 deletions ext/web/06_streams_types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ interface PullIntoDescriptor {
byteOffset: number;
byteLength: number;
bytesFilled: number;
minimumFill: number;
elementSize: number;
// deno-lint-ignore no-explicit-any
viewConstructor: any;
Expand Down
6 changes: 6 additions & 0 deletions ext/web/lib.deno_web.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -623,12 +623,18 @@ declare type ReadableStreamBYOBReadResult<V extends ArrayBufferView> =
| ReadableStreamBYOBReadDoneResult<V>
| ReadableStreamBYOBReadValueResult<V>;

/** @category Streams API */
declare interface ReadableStreamBYOBReaderReadOptions {
min?: number;
}

/** @category Streams API */
declare interface ReadableStreamBYOBReader {
readonly closed: Promise<void>;
cancel(reason?: any): Promise<void>;
read<V extends ArrayBufferView>(
view: V,
options?: ReadableStreamBYOBReaderReadOptions,
): Promise<ReadableStreamBYOBReadResult<V>>;
releaseLock(): void;
}
Expand Down
4 changes: 3 additions & 1 deletion tools/wpt/expectation.json
Original file line number Diff line number Diff line change
Expand Up @@ -3204,7 +3204,9 @@
"respond-after-enqueue.any.html": true,
"respond-after-enqueue.any.worker.html": true,
"enqueue-with-detached-buffer.any.html": true,
"enqueue-with-detached-buffer.any.worker.html": true
"enqueue-with-detached-buffer.any.worker.html": true,
"read-min.any.html": true,
"read-min.any.worker.html": true
},
"readable-streams": {
"async-iterator.any.html": true,
Expand Down

0 comments on commit 6f02fa1

Please sign in to comment.