From 147faa3af2e8ed05ebcd3322565df9ee329f9df4 Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Thu, 8 Jun 2023 17:46:01 +0200 Subject: [PATCH] TransformStream cleanup using "Transformer.cancel" This commit adds a "cancel" hook to "Transformer". This allows users to perform resource cleanup when the readable side of the TransformStream is cancelled, or the writable side is aborted. To preserve existing behavior, when the readable side is cancelled with a reason, the writable side is always immediately aborted with that same reason. The same is true in the reverse case. This means that the status of both sides is always either "closed", "erroring", or "erroring" when the "cancel" hook is called. "flush" and "cancel" are never both called. As per existing behaviour, when the writable side is closed the "flush" hook is called. If the readable side is cancelled while a promise returned from "flush" is still pending, "cancel" is not called. In this scenario the readable side ends up in the "errored" state, while the writable side ends up in the "closed" state. --- index.bs | 101 +++++++++++++++--- .../lib/Transformer.webidl | 2 + .../lib/abstract-ops/transform-streams.js | 53 +++++++-- 3 files changed, 134 insertions(+), 22 deletions(-) diff --git a/index.bs b/index.bs index 764d152f5..9d5d51946 100644 --- a/index.bs +++ b/index.bs @@ -5489,6 +5489,7 @@ dictionary Transformer { TransformerStartCallback start; TransformerTransformCallback transform; TransformerFlushCallback flush; + TransformerCancelCallback cancel; any readableType; any writableType; }; @@ -5496,6 +5497,7 @@ dictionary Transformer { callback TransformerStartCallback = any (TransformStreamDefaultController controller); callback TransformerFlushCallback = Promise (TransformStreamDefaultController controller); callback TransformerTransformCallback = Promise (any chunk, TransformStreamDefaultController controller); +callback TransformerCancelCallback = Promise (any reason, TransformStreamDefaultController controller);
@@ -5558,6 +5560,25 @@ callback TransformerTransformCallback = Promise (any chunk, Transform {{Transformer/flush|flush()}}; the stream is already in the process of successfully closing down, and terminating it would be counterproductive.) +
cancel(reason, controller)
+
+

A function called when the [=writable side=] is aborted, or when the [=readable side=] is + cancelled. + +

Typically this is used to clean up underlying transformer resources when the stream is + aborted or cancelled. + +

If the cancellation process is asynchronous, the function can return a promise to signal + success or failure; the result will be communicated to the caller of + {{WritableStream/abort()|stream.writable.abort()}} or + {{ReadableStream/cancel()|stream.readable.cancel()}}. Throwing an exception is treated the same + as returning a rejected promise. + +

(Note that there is no need to call + {{TransformStreamDefaultController/terminate()|controller.terminate()}} inside + {{Transformer/cancel|cancel()}}; the stream is already in the process of cancelling/aborting, and + terminating it would be counterproductive.) +

readableType

This property is reserved for future use, so any attempts to supply a value will throw an @@ -5570,9 +5591,9 @@ callback TransformerTransformCallback = Promise (any chunk, Transform

The controller object passed to {{Transformer/start|start()}}, -{{Transformer/transform|transform()}}, and {{Transformer/flush|flush()}} is an instance of -{{TransformStreamDefaultController}}, and has the ability to enqueue [=chunks=] to the [=readable -side=], or to terminate or error the stream. +{{Transformer/transform|transform()}}, {{Transformer/flush|flush()}}, and +{{Transformer/cancel|cancel()}} is an instance of {{TransformStreamDefaultController}}, and has the +ability to enqueue [=chunks=] to the [=readable side=], or to terminate or error the stream.

Constructor and properties

@@ -5726,6 +5747,10 @@ the following table: Internal Slot Description (non-normative) + + \[[cancelAlgorithm]] + A promise-returning algorithm, taking one argument (the [=reason=] for + cancellation), which communicates a requested cancellation to the [=transformer=] \[[flushAlgorithm]] A promise-returning algorithm which communicates a requested close to @@ -5819,8 +5844,7 @@ The following abstract operations operate on {{TransformStream}} instances at a 1. Let |pullAlgorithm| be the following steps: 1. Return ! [$TransformStreamDefaultSourcePullAlgorithm$](|stream|). 1. Let |cancelAlgorithm| be the following steps, taking a |reason| argument: - 1. Perform ! [$TransformStreamErrorWritableAndUnblockWrite$](|stream|, |reason|). - 1. Return [=a promise resolved with=] undefined. + 1. Return ! [$TransformStreamDefaultSourceCancelAlgorithm$](|stream|, |reason|). 1. Set |stream|.[=TransformStream/[[readable]]=] to ! [$CreateReadableStream$](|startAlgorithm|, |pullAlgorithm|, |cancelAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|). 1. Set |stream|.[=TransformStream/[[backpressure]]=] and @@ -5854,6 +5878,14 @@ The following abstract operations operate on {{TransformStream}} instances at a 1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|stream|.[=TransformStream/[[controller]]=]). 1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|stream|.[=TransformStream/[[writable]]=].[=WritableStream/[[controller]]=], |e|). + 1. Perform ! [$TransformStreamUnblockWrite$](|stream|). + + +
+ TransformStreamUnblockWrite(|stream|) performs the + following steps: + 1. If |stream|.[=TransformStream/[[backpressure]]=] is true, perform ! [$TransformStreamSetBackpressure$](|stream|, false). @@ -5882,7 +5914,8 @@ The following abstract operations support the implementaiton of the
SetUpTransformStreamDefaultController(|stream|, - |controller|, |transformAlgorithm|, |flushAlgorithm|) performs the following steps: + |controller|, |transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|) performs the + following steps: 1. Assert: |stream| [=implements=] {{TransformStream}}. 1. Assert: |stream|.[=TransformStream/[[controller]]=] is undefined. @@ -5891,6 +5924,7 @@ The following abstract operations support the implementaiton of the 1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to |transformAlgorithm|. 1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to |flushAlgorithm|. + 1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to |cancelAlgorithm|.
@@ -5904,6 +5938,7 @@ The following abstract operations support the implementaiton of the 1. If |result| is an abrupt completion, return [=a promise rejected with=] |result|.\[[Value]]. 1. Otherwise, return [=a promise resolved with=] undefined. 1. Let |flushAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined. + 1. Let |cancelAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined. 1. If |transformerDict|["{{Transformer/transform}}"] [=map/exists=], set |transformAlgorithm| to an algorithm which takes an argument |chunk| and returns the result of [=invoking=] |transformerDict|["{{Transformer/transform}}"] with argument list « |chunk|, @@ -5911,8 +5946,12 @@ The following abstract operations support the implementaiton of the 1. If |transformerDict|["{{Transformer/flush}}"] [=map/exists=], set |flushAlgorithm| to an algorithm which returns the result of [=invoking=] |transformerDict|["{{Transformer/flush}}"] with argument list « |controller| » and [=callback this value=] |transformer|. + 1. If |transformerDict|["{{Transformer/cancel}}"] [=map/exists=], set |cancelAlgorithm| to an + algorithm which takes an argument |reason| and returns the result of [=invoking=] + |transformerDict|["{{Transformer/cancel}}"] with argument list « |reason|, + |controller| » and [=callback this value=] |transformer|. 1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|, - |transformAlgorithm|, |flushAlgorithm|). + |transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|).
@@ -5931,6 +5970,7 @@ The following abstract operations support the implementaiton of the 1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to undefined. 1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to undefined. + 1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to undefined.
@@ -6021,8 +6061,13 @@ side=] of [=transform streams=]. id="transform-stream-default-sink-abort-algorithm">TransformStreamDefaultSinkAbortAlgorithm(|stream|, |reason|) performs the following steps: - 1. Perform ! [$TransformStreamError$](|stream|, |reason|). - 1. Return [=a promise resolved with=] undefined. + 1. Let |readable| be |stream|.[=TransformStream/[[readable]]=]. + 1. [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |reason|). + 1. Let |controller| be |stream|.[=TransformStream/[[controller]]=]. + 1. Let |cancelPromise| be the result of performing + |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|. + 1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|). + 1. Return |cancelPromise|.
@@ -6062,6 +6107,30 @@ side=] of [=transform streams=]. 1. Return |stream|.[=TransformStream/[[backpressureChangePromise]]=].
+ +
+ TransformStreamDefaultSourceCancelAlgorithm(|reason|, + |stream|) performs the following steps: + + 1. Let |writable| be |stream|.[=TransformStream/[[writable]]=]. + 1. If |writable|.[=WritableStream/[[state]]=] is not "`writable`", return + [=a promise resolved with=] undefined. + 1. Perform ! + [$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |reason|). + 1. Perform ! [$TransformStreamUnblockWrite$](|stream|). + 1. Let |controller| be |stream|.[=TransformStream/[[controller]]=]. + 1. Let |cancelPromise| be the result of performing + |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|. + 1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|). + 1. Return |cancelPromise|. + +

The early return prevents the cancellation algorithm from being called if the + writable side is already (in the process of being) closed. This is important, because the + cancellation algorithm must not run if the flush algorithm has + already been run. +

+

Queuing strategies

The queuing strategy API

@@ -7106,9 +7175,11 @@ reason.
To set up a newly-[=new|created-via-Web IDL=] {{TransformStream}} |stream| given an algorithm transformAlgorithm and an optional algorithm flushAlgorithm, perform the following steps. - |transformAlgorithm| and, if given, |flushAlgorithm|, may return a promise. + for="TransformStream/set up">transformAlgorithm, an optional algorithm flushAlgorithm, and an optional algorithm cancelAlgorithm, perform the following steps. + |transformAlgorithm|, if given, |flushAlgorithm|, and, if given, |cancelAlgorithm|, may return a + promise. 1. Let |writableHighWaterMark| be 1. 1. Let |writableSizeAlgorithm| be an algorithm that returns 1. @@ -7124,12 +7195,16 @@ reason. null otherwise. If this throws an exception |e|, return [=a promise rejected with=] |e|. 1. If |result| is a {{Promise}}, then return |result|. 1. Return [=a promise resolved with=] undefined. + 1. Let |cancelAlgorithmWrapper| be an algorithm that runs these steps given a value |reason|: + 1. Let |result| be the result of running |cancelAlgorithm| given |reason|, if |cancelAlgorithm| + was given, or null otherwise. If this throws an exception |e|, return + [=a promise rejected with=] |e|. 1. Let |startPromise| be [=a promise resolved with=] undefined. 1. Perform ! [$InitializeTransformStream$](|stream|, |startPromise|, |writableHighWaterMark|, |writableSizeAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|). 1. Let |controller| be a [=new=] {{TransformStreamDefaultController}}. 1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|, - |transformAlgorithmWrapper|, |flushAlgorithmWrapper|). + |transformAlgorithmWrapper|, |flushAlgorithmWrapper|, |cancelAlgorithmWrapper|). Other specifications should be careful when constructing their [=TransformStream/set up/transformAlgorithm=] to avoid [=in parallel=] reads from the given diff --git a/reference-implementation/lib/Transformer.webidl b/reference-implementation/lib/Transformer.webidl index eefea2b0d..2444bfcb3 100644 --- a/reference-implementation/lib/Transformer.webidl +++ b/reference-implementation/lib/Transformer.webidl @@ -2,6 +2,7 @@ dictionary Transformer { TransformerStartCallback start; TransformerTransformCallback transform; TransformerFlushCallback flush; + TransformerCancelCallback cancel; any readableType; any writableType; }; @@ -9,3 +10,4 @@ dictionary Transformer { callback TransformerStartCallback = any (TransformStreamDefaultController controller); callback TransformerFlushCallback = Promise (TransformStreamDefaultController controller); callback TransformerTransformCallback = Promise (any chunk, TransformStreamDefaultController controller); +callback TransformerCancelCallback = Promise (any reason, TransformStreamDefaultController controller); diff --git a/reference-implementation/lib/abstract-ops/transform-streams.js b/reference-implementation/lib/abstract-ops/transform-streams.js index 8e3f5fcc3..8e1eac73f 100644 --- a/reference-implementation/lib/abstract-ops/transform-streams.js +++ b/reference-implementation/lib/abstract-ops/transform-streams.js @@ -51,8 +51,7 @@ function InitializeTransformStream( } function cancelAlgorithm(reason) { - TransformStreamErrorWritableAndUnblockWrite(stream, reason); - return promiseResolvedWith(undefined); + return TransformStreamDefaultSourceCancelAlgorithm(stream, reason); } stream._readable = CreateReadableStream( @@ -77,6 +76,10 @@ function TransformStreamError(stream, e) { function TransformStreamErrorWritableAndUnblockWrite(stream, e) { TransformStreamDefaultControllerClearAlgorithms(stream._controller); WritableStreamDefaultControllerErrorIfNeeded(stream._writable._controller, e); + TransformStreamUnblockWrite(stream); +} + +function TransformStreamUnblockWrite(stream) { if (stream._backpressure === true) { // Pretend that pull() was called to permit any pending write() calls to complete. TransformStreamSetBackpressure() // cannot be called from enqueue() or pull() once the ReadableStream is errored, so this will will be the final time @@ -102,7 +105,8 @@ function TransformStreamSetBackpressure(stream, backpressure) { // Default controllers -function SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm) { +function SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm, + cancelAlgorithm) { assert(TransformStream.isImpl(stream)); assert(stream._controller === undefined); @@ -111,6 +115,7 @@ function SetUpTransformStreamDefaultController(stream, controller, transformAlgo controller._transformAlgorithm = transformAlgorithm; controller._flushAlgorithm = flushAlgorithm; + controller._cancelAlgorithm = cancelAlgorithm; } function SetUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) { @@ -125,7 +130,8 @@ function SetUpTransformStreamDefaultControllerFromTransformer(stream, transforme } }; - let flushAlgorithm = () => promiseResolvedWith(undefined); + let flushAlgorithm = () => promiseResolvedWith(undefined); + let cancelAlgorithm = () => promiseResolvedWith(undefined); if ('transform' in transformerDict) { transformAlgorithm = chunk => transformerDict.transform.call(transformer, chunk, controller); @@ -133,13 +139,17 @@ function SetUpTransformStreamDefaultControllerFromTransformer(stream, transforme if ('flush' in transformerDict) { flushAlgorithm = () => transformerDict.flush.call(transformer, controller); } + if ('cancel' in transformerDict) { + cancelAlgorithm = reason => transformerDict.cancel.call(transformer, reason, controller); + } - SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm); + SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm, cancelAlgorithm); } function TransformStreamDefaultControllerClearAlgorithms(controller) { controller._transformAlgorithm = undefined; controller._flushAlgorithm = undefined; + controller._cancelAlgorithm = undefined; } function TransformStreamDefaultControllerEnqueue(controller, chunk) { @@ -221,10 +231,17 @@ function TransformStreamDefaultSinkWriteAlgorithm(stream, chunk) { } function TransformStreamDefaultSinkAbortAlgorithm(stream, reason) { - // abort() is not called synchronously, so it is possible for abort() to be called when the stream is already - // errored. - TransformStreamError(stream, reason); - return promiseResolvedWith(undefined); + verbose('TransformStreamDefaultSinkAbortAlgorithm()'); + + // stream._readable cannot change after construction, so caching it across a call to user code is safe. + const readable = stream._readable; + ReadableStreamDefaultControllerError(readable._controller, reason); + + const controller = stream._controller; + const cancelPromise = controller._cancelAlgorithm(reason); + TransformStreamDefaultControllerClearAlgorithms(controller); + + return cancelPromise; } function TransformStreamDefaultSinkCloseAlgorithm(stream) { @@ -264,3 +281,21 @@ function TransformStreamDefaultSourcePullAlgorithm(stream) { // Prevent the next pull() call until there is backpressure. return stream._backpressureChangePromise; } + +function TransformStreamDefaultSourceCancelAlgorithm(stream, reason) { + verbose('TransformStreamDefaultSourceCancelAlgorithm()'); + + // stream._writable cannot change after construction, so caching it across a call to user code is safe. + const writable = stream._writable; + if (writable._state !== 'writable') { + return promiseResolvedWith(undefined); + } + WritableStreamDefaultControllerErrorIfNeeded(writable._controller, reason); + TransformStreamUnblockWrite(stream); + + const controller = stream._controller; + const cancelPromise = controller._cancelAlgorithm(reason); + TransformStreamDefaultControllerClearAlgorithms(controller); + + return cancelPromise; +}