Skip to content

Commit

Permalink
feat(ext/web): cancel support for TransformStream (denoland#20815)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucacasonato committed Oct 10, 2023
1 parent 6bbccb7 commit 6450334
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 34 deletions.
121 changes: 105 additions & 16 deletions ext/web/06_streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ const _controller = Symbol("[[controller]]");
const _detached = Symbol("[[Detached]]");
const _disturbed = Symbol("[[disturbed]]");
const _errorSteps = Symbol("[[ErrorSteps]]");
const _finishPromise = Symbol("[[finishPromise]]");
const _flushAlgorithm = Symbol("[[flushAlgorithm]]");
const _globalObject = Symbol("[[globalObject]]");
const _highWaterMark = Symbol("[[highWaterMark]]");
Expand Down Expand Up @@ -609,8 +610,7 @@ function initializeTransformStream(
}

function cancelAlgorithm(reason) {
transformStreamErrorWritableAndUnblockWrite(stream, reason);
return resolvePromiseWith(undefined);
return transformStreamDefaultSourceCancelAlgorithm(stream, reason);
}

stream[_readable] = createReadableStream(
Expand Down Expand Up @@ -3690,19 +3690,22 @@ function setUpReadableStreamDefaultReader(reader, stream) {
* @param {TransformStreamDefaultController<O>} controller
* @param {(chunk: O, controller: TransformStreamDefaultController<O>) => Promise<void>} transformAlgorithm
* @param {(controller: TransformStreamDefaultController<O>) => Promise<void>} flushAlgorithm
* @param {(reason: any) => Promise<void>} cancelAlgorithm
*/
function setUpTransformStreamDefaultController(
stream,
controller,
transformAlgorithm,
flushAlgorithm,
cancelAlgorithm,
) {
assert(ObjectPrototypeIsPrototypeOf(TransformStreamPrototype, stream));
assert(stream[_controller] === undefined);
controller[_stream] = stream;
stream[_controller] = controller;
controller[_transformAlgorithm] = transformAlgorithm;
controller[_flushAlgorithm] = flushAlgorithm;
controller[_cancelAlgorithm] = cancelAlgorithm;
}

/**
Expand Down Expand Up @@ -3730,6 +3733,8 @@ function setUpTransformStreamDefaultControllerFromTransformer(
};
/** @type {(controller: TransformStreamDefaultController<O>) => Promise<void>} */
let flushAlgorithm = () => resolvePromiseWith(undefined);
/** @type {(reason: any) => Promise<void>} */
let cancelAlgorithm = () => resolvePromiseWith(undefined);
if (transformerDict.transform !== undefined) {
transformAlgorithm = (chunk, controller) =>
webidl.invokeCallbackFunction(
Expand All @@ -3752,11 +3757,23 @@ function setUpTransformStreamDefaultControllerFromTransformer(
true,
);
}
if (transformerDict.cancel !== undefined) {
cancelAlgorithm = (reason) =>
webidl.invokeCallbackFunction(
transformerDict.cancel,
[reason],
transformer,
webidl.converters["Promise<undefined>"],
"Failed to call 'cancelAlgorithm' on 'TransformStreamDefaultController'",
true,
);
}
setUpTransformStreamDefaultController(
stream,
controller,
transformAlgorithm,
flushAlgorithm,
cancelAlgorithm,
);
}

Expand Down Expand Up @@ -3938,6 +3955,7 @@ function setUpWritableStreamDefaultWriter(writer, stream) {
function transformStreamDefaultControllerClearAlgorithms(controller) {
controller[_transformAlgorithm] = undefined;
controller[_flushAlgorithm] = undefined;
controller[_cancelAlgorithm] = undefined;
}

/**
Expand Down Expand Up @@ -4007,13 +4025,33 @@ function transformStreamDefaultControllerTerminate(controller) {
}

/**
* @param {TransformStream} stream
* @template I
* @template O
* @param {TransformStream<I, O>} stream
* @param {any=} reason
* @returns {Promise<void>}
*/
function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
transformStreamError(stream, reason);
return resolvePromiseWith(undefined);
const controller = stream[_controller];
if (controller[_finishPromise] !== undefined) {
return controller[_finishPromise].promise;
}
const readable = stream[_readable];
controller[_finishPromise] = new Deferred();
const cancelPromise = controller[_cancelAlgorithm](reason);
transformStreamDefaultControllerClearAlgorithms(controller);
transformPromiseWith(cancelPromise, () => {
if (readable[_state] === "errored") {
controller[_finishPromise].reject(readable[_storedError]);
} else {
readableStreamDefaultControllerError(readable[_controller], reason);
controller[_finishPromise].resolve(undefined);
}
}, (r) => {
readableStreamDefaultControllerError(readable[_controller], r);
controller[_finishPromise].reject(r);
});
return controller[_finishPromise].promise;
}

/**
Expand All @@ -4023,21 +4061,26 @@ function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
* @returns {Promise<void>}
*/
function transformStreamDefaultSinkCloseAlgorithm(stream) {
const readable = stream[_readable];
const controller = stream[_controller];
if (controller[_finishPromise] !== undefined) {
return controller[_finishPromise].promise;
}
const readable = stream[_readable];
controller[_finishPromise] = new Deferred();
const flushPromise = controller[_flushAlgorithm](controller);
transformStreamDefaultControllerClearAlgorithms(controller);
return transformPromiseWith(flushPromise, () => {
transformPromiseWith(flushPromise, () => {
if (readable[_state] === "errored") {
throw readable[_storedError];
controller[_finishPromise].reject(readable[_storedError]);
} else {
readableStreamDefaultControllerClose(readable[_controller]);
controller[_finishPromise].resolve(undefined);
}
readableStreamDefaultControllerClose(
/** @type {ReadableStreamDefaultController} */ readable[_controller],
);
}, (r) => {
transformStreamError(stream, r);
throw readable[_storedError];
readableStreamDefaultControllerError(readable[_controller], r);
controller[_finishPromise].reject(r);
});
return controller[_finishPromise].promise;
}

/**
Expand Down Expand Up @@ -4069,6 +4112,41 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
return transformStreamDefaultControllerPerformTransform(controller, chunk);
}

/**
* @template I
* @template O
* @param {TransformStream<I, O>} stream
* @param {any=} reason
* @returns {Promise<void>}
*/
function transformStreamDefaultSourceCancelAlgorithm(stream, reason) {
const controller = stream[_controller];
if (controller[_finishPromise] !== undefined) {
return controller[_finishPromise].promise;
}
const writable = stream[_writable];
controller[_finishPromise] = new Deferred();
const cancelPromise = controller[_cancelAlgorithm](reason);
transformStreamDefaultControllerClearAlgorithms(controller);
transformPromiseWith(cancelPromise, () => {
if (writable[_state] === "errored") {
controller[_finishPromise].reject(writable[_storedError]);
} else {
writableStreamDefaultControllerErrorIfNeeded(
writable[_controller],
reason,
);
transformStreamUnblockWrite(stream);
controller[_finishPromise].resolve(undefined);
}
}, (r) => {
writableStreamDefaultControllerErrorIfNeeded(writable[_controller], r);
transformStreamUnblockWrite(stream);
controller[_finishPromise].reject(r);
});
return controller[_finishPromise].promise;
}

/**
* @param {TransformStream} stream
* @returns {Promise<void>}
Expand Down Expand Up @@ -4104,9 +4182,7 @@ function transformStreamErrorWritableAndUnblockWrite(stream, e) {
stream[_writable][_controller],
e,
);
if (stream[_backpressure] === true) {
transformStreamSetBackpressure(stream, false);
}
transformStreamUnblockWrite(stream);
}

/**
Expand All @@ -4122,6 +4198,15 @@ function transformStreamSetBackpressure(stream, backpressure) {
stream[_backpressure] = backpressure;
}

/**
* @param {TransformStream} stream
*/
function transformStreamUnblockWrite(stream) {
if (stream[_backpressure] === true) {
transformStreamSetBackpressure(stream, false);
}
}

/**
* @param {WritableStream} stream
* @param {any=} reason
Expand Down Expand Up @@ -6007,6 +6092,10 @@ const TransformStreamPrototype = TransformStream.prototype;

/** @template O */
class TransformStreamDefaultController {
/** @type {(reason: any) => Promise<void>} */
[_cancelAlgorithm];
/** @type {Promise<void> | undefined} */
[_finishPromise];
/** @type {(controller: this) => Promise<void>} */
[_flushAlgorithm];
/** @type {TransformStream<O>} */
Expand Down
1 change: 1 addition & 0 deletions ext/web/lib.deno_web.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,7 @@ declare interface Transformer<I = any, O = any> {
readableType?: undefined;
start?: TransformStreamDefaultControllerCallback<O>;
transform?: TransformStreamDefaultControllerTransformCallback<I, O>;
cancel?: (reason: any) => Promise<void>;
writableType?: undefined;
}

Expand Down
24 changes: 6 additions & 18 deletions tools/wpt/expectation.json
Original file line number Diff line number Diff line change
Expand Up @@ -3161,32 +3161,20 @@
"transform-streams": {
"backpressure.any.html": true,
"backpressure.any.worker.html": true,
"errors.any.html": [
"controller.error() should close writable immediately after readable.cancel()"
],
"errors.any.worker.html": [
"controller.error() should close writable immediately after readable.cancel()"
],
"errors.any.html": true,
"errors.any.worker.html": true,
"flush.any.html": true,
"flush.any.worker.html": true,
"general.any.html": [
"terminate() should abort writable immediately after readable.cancel()"
],
"general.any.worker.html": [
"terminate() should abort writable immediately after readable.cancel()"
],
"general.any.html": true,
"general.any.worker.html": true,
"lipfuzz.any.html": true,
"lipfuzz.any.worker.html": true,
"patched-global.any.html": true,
"patched-global.any.worker.html": true,
"properties.any.html": true,
"properties.any.worker.html": true,
"reentrant-strategies.any.html": [
"writer.abort() inside size() should work"
],
"reentrant-strategies.any.worker.html": [
"writer.abort() inside size() should work"
],
"reentrant-strategies.any.html": true,
"reentrant-strategies.any.worker.html": true,
"strategies.any.html": true,
"strategies.any.worker.html": true,
"terminate.any.html": true,
Expand Down

0 comments on commit 6450334

Please sign in to comment.