Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(ext/streams): optimize streams #20649

Merged
merged 25 commits into from
Oct 13, 2023
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
perf: use Queue
  • Loading branch information
marcosc90 committed Sep 23, 2023
commit fe9cc021d13ec48694ae8f6f9f75dd52d02c2ee1
68 changes: 36 additions & 32 deletions ext/web/06_streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class Queue {

enqueue(value) {
const node = { value, next: null };
if (!this.head) {
if (this.head === null) {
this.head = node;
this.tail = node;
} else {
Expand All @@ -227,10 +227,11 @@ class Queue {
}

dequeue() {
if (!this.head) {
const node = this.head;
if (node === null) {
return null;
}
const node = this.head;

if (this.head === this.tail) {
this.tail = null;
}
Expand All @@ -240,7 +241,7 @@ class Queue {
}

peek() {
if (!this.head) {
if (this.head === null) {
return null;
}

Expand Down Expand Up @@ -1580,7 +1581,7 @@ function readableByteStreamControllerShouldCallPull(controller) {
function readableStreamAddReadRequest(stream, readRequest) {
assert(isReadableStreamDefaultReader(stream[_reader]));
assert(stream[_state] === "readable");
ArrayPrototypePush(stream[_reader][_readRequests], readRequest);
stream[_reader][_readRequests].enqueue(readRequest);
}

/**
Expand Down Expand Up @@ -1639,9 +1640,9 @@ function readableStreamClose(stream) {
if (isReadableStreamDefaultReader(reader)) {
/** @type {Array<ReadRequest<R>>} */
const readRequests = reader[_readRequests];
reader[_readRequests] = [];
for (let i = 0; i < readRequests.length; ++i) {
const readRequest = readRequests[i];
// reader[_readRequests] = new Queue();
while (readRequests.size !== 0) {
const readRequest = readRequests.dequeue();
readRequest.closeSteps();
}
}
Expand Down Expand Up @@ -1797,7 +1798,6 @@ function readableStreamDefaultcontrollerHasBackpressure(controller) {
* @returns {boolean}
*/
function readableStreamDefaultcontrollerShouldCallPull(controller) {
const stream = controller[_stream];
if (
readableStreamDefaultControllerCanCloseOrEnqueue(controller) === false
) {
Expand All @@ -1806,6 +1806,7 @@ function readableStreamDefaultcontrollerShouldCallPull(controller) {
if (controller[_started] === false) {
return false;
}
const stream = controller[_stream];
if (
isReadableStreamLocked(stream) &&
readableStreamGetNumReadRequests(stream) > 0
Expand All @@ -1815,10 +1816,11 @@ function readableStreamDefaultcontrollerShouldCallPull(controller) {
const desiredSize = readableStreamDefaultControllerGetDesiredSize(
controller,
);
assert(desiredSize !== null);

if (desiredSize > 0) {
return true;
}
assert(desiredSize !== null);
return false;
}

Expand Down Expand Up @@ -1858,9 +1860,9 @@ function readableStreamBYOBReaderRelease(reader) {
*/
function readableStreamDefaultReaderErrorReadRequests(reader, e) {
const readRequests = reader[_readRequests];
reader[_readRequests] = [];
for (let i = 0; i < readRequests.length; ++i) {
const readRequest = readRequests[i];
reader[_readRequests] = new Queue();
while (readRequests.size !== 0) {
const readRequest = readRequests.dequeue();
readRequest.errorSteps(e);
}
}
Expand Down Expand Up @@ -1899,11 +1901,11 @@ function readableByteStreamControllerProcessReadRequestsUsingQueue(
) {
const reader = controller[_stream][_reader];
assert(isReadableStreamDefaultReader(reader));
while (reader[_readRequests].length !== 0) {
while (reader[_readRequests].size !== 0) {
if (controller[_queueTotalSize] === 0) {
return;
}
const readRequest = ArrayPrototypeShift(reader[_readRequests]);
const readRequest = reader[_readRequests].dequeue();
readableByteStreamControllerFillReadRequestFromQueue(
controller,
readRequest,
Expand Down Expand Up @@ -2538,9 +2540,9 @@ function readableStreamFulfillReadRequest(stream, chunk, done) {
assert(readableStreamHasDefaultReader(stream) === true);
/** @type {ReadableStreamDefaultReader<R>} */
const reader = stream[_reader];
assert(reader[_readRequests].length);
assert(reader[_readRequests].size);
/** @type {ReadRequest<R>} */
const readRequest = ArrayPrototypeShift(reader[_readRequests]);
const readRequest = reader[_readRequests].dequeue();
if (done) {
readRequest.closeSteps();
} else {
Expand All @@ -2563,7 +2565,7 @@ function readableStreamGetNumReadIntoRequests(stream) {
*/
function readableStreamGetNumReadRequests(stream) {
assert(readableStreamHasDefaultReader(stream) === true);
return stream[_reader][_readRequests].length;
return stream[_reader][_readRequests].size;
}

/**
Expand Down Expand Up @@ -3261,7 +3263,7 @@ function readableByteStreamTee(stream) {

function pullWithBYOBReader(view, forBranch2) {
if (isReadableStreamDefaultReader(reader)) {
assert(reader[_readRequests].length === 0);
assert(reader[_readRequests].size === 0);
readableStreamDefaultReaderRelease(reader);
reader = acquireReadableStreamBYOBReader(stream);
forwardReaderError(reader);
Expand Down Expand Up @@ -3693,7 +3695,7 @@ function setUpReadableStreamDefaultReader(reader, stream) {
throw new TypeError("ReadableStream is locked.");
}
readableStreamReaderGenericInitialize(reader, stream);
reader[_readRequests] = [];
reader[_readRequests] = new Queue();
}

/**
Expand Down Expand Up @@ -4802,11 +4804,9 @@ const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
return PromisePrototypeThen(promise.promise);
}

reader[_iteratorNext] = reader[_iteratorNext]
return reader[_iteratorNext] = reader[_iteratorNext]
? PromisePrototypeThen(reader[_iteratorNext], nextSteps, nextSteps)
: nextSteps();

return reader[_iteratorNext];
},
/**
* @param {unknown} arg
Expand All @@ -4824,7 +4824,7 @@ const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
if (reader[_stream] === undefined) {
return PromiseResolve({ value: undefined, done: true });
}
assert(reader[_readRequests].length === 0);
assert(reader[_readRequests].size === 0);
if (this[_preventCancel] === false) {
const result = readableStreamReaderGenericCancel(reader, arg);
readableStreamDefaultReaderRelease(reader);
Expand Down Expand Up @@ -5247,19 +5247,23 @@ class ReadableStream {
* @param {ReadableStreamIteratorOptions=} options
* @returns {AsyncIterableIterator<R>}
*/
values(options = {}) {
values(options = undefined) {
webidl.assertBranded(this, ReadableStreamPrototype);
const prefix = "Failed to execute 'values' on 'ReadableStream'";
options = webidl.converters.ReadableStreamIteratorOptions(
options,
prefix,
"Argument 1",
);
let preventCancel = false;
if (options !== undefined) {
const prefix = "Failed to execute 'values' on 'ReadableStream'";
options = webidl.converters.ReadableStreamIteratorOptions(
options,
prefix,
"Argument 1",
);
preventCancel = options.preventCancel;
}
/** @type {AsyncIterableIterator<R>} */
const iterator = ObjectCreate(readableStreamAsyncIteratorPrototype);
const reader = acquireReadableStreamDefaultReader(this);
iterator[_reader] = reader;
iterator[_preventCancel] = options.preventCancel;
iterator[_preventCancel] = preventCancel;
return iterator;
}

Expand Down