diff --git a/index.bs b/index.bs index 764d152f..9d5d5194 100644 --- a/index.bs +++ b/index.bs @@ -5489,6 +5489,7 @@ dictionary Transformer { TransformerStartCallback start; TransformerTransformCallback transform; TransformerFlushCallback flush; + TransformerCancelCallback cancel; any readableType; any writableType; }; @@ -5496,6 +5497,7 @@ dictionary Transformer { callback TransformerStartCallback = any (TransformStreamDefaultController controller); callback TransformerFlushCallback = Promise (TransformStreamDefaultController controller); callback TransformerTransformCallback = Promise (any chunk, TransformStreamDefaultController controller); +callback TransformerCancelCallback = Promise (any reason, TransformStreamDefaultController controller);
@@ -5558,6 +5560,25 @@ callback TransformerTransformCallback = Promise (any chunk, Transform {{Transformer/flush|flush()}}; the stream is already in the process of successfully closing down, and terminating it would be counterproductive.) +
cancel(reason, controller)
+
+

A function called when the [=writable side=] is aborted, or when the [=readable side=] is + cancelled. + +

Typically this is used to clean up underlying transformer resources when the stream is + aborted or cancelled. + +

If the cancellation process is asynchronous, the function can return a promise to signal + success or failure; the result will be communicated to the caller of + {{WritableStream/abort()|stream.writable.abort()}} or + {{ReadableStream/cancel()|stream.readable.cancel()}}. Throwing an exception is treated the same + as returning a rejected promise. + +

(Note that there is no need to call + {{TransformStreamDefaultController/terminate()|controller.terminate()}} inside + {{Transformer/cancel|cancel()}}; the stream is already in the process of cancelling/aborting, and + terminating it would be counterproductive.) +

readableType

This property is reserved for future use, so any attempts to supply a value will throw an @@ -5570,9 +5591,9 @@ callback TransformerTransformCallback = Promise (any chunk, Transform

The controller object passed to {{Transformer/start|start()}}, -{{Transformer/transform|transform()}}, and {{Transformer/flush|flush()}} is an instance of -{{TransformStreamDefaultController}}, and has the ability to enqueue [=chunks=] to the [=readable -side=], or to terminate or error the stream. +{{Transformer/transform|transform()}}, {{Transformer/flush|flush()}}, and +{{Transformer/cancel|cancel()}} is an instance of {{TransformStreamDefaultController}}, and has the +ability to enqueue [=chunks=] to the [=readable side=], or to terminate or error the stream.

Constructor and properties

@@ -5726,6 +5747,10 @@ the following table: Internal Slot Description (non-normative) + + \[[cancelAlgorithm]] + A promise-returning algorithm, taking one argument (the [=reason=] for + cancellation), which communicates a requested cancellation to the [=transformer=] \[[flushAlgorithm]] A promise-returning algorithm which communicates a requested close to @@ -5819,8 +5844,7 @@ The following abstract operations operate on {{TransformStream}} instances at a 1. Let |pullAlgorithm| be the following steps: 1. Return ! [$TransformStreamDefaultSourcePullAlgorithm$](|stream|). 1. Let |cancelAlgorithm| be the following steps, taking a |reason| argument: - 1. Perform ! [$TransformStreamErrorWritableAndUnblockWrite$](|stream|, |reason|). - 1. Return [=a promise resolved with=] undefined. + 1. Return ! [$TransformStreamDefaultSourceCancelAlgorithm$](|stream|, |reason|). 1. Set |stream|.[=TransformStream/[[readable]]=] to ! [$CreateReadableStream$](|startAlgorithm|, |pullAlgorithm|, |cancelAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|). 1. Set |stream|.[=TransformStream/[[backpressure]]=] and @@ -5854,6 +5878,14 @@ The following abstract operations operate on {{TransformStream}} instances at a 1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|stream|.[=TransformStream/[[controller]]=]). 1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|stream|.[=TransformStream/[[writable]]=].[=WritableStream/[[controller]]=], |e|). + 1. Perform ! [$TransformStreamUnblockWrite$](|stream|). + + +
+ TransformStreamUnblockWrite(|stream|) performs the + following steps: + 1. If |stream|.[=TransformStream/[[backpressure]]=] is true, perform ! [$TransformStreamSetBackpressure$](|stream|, false). @@ -5882,7 +5914,8 @@ The following abstract operations support the implementaiton of the
SetUpTransformStreamDefaultController(|stream|, - |controller|, |transformAlgorithm|, |flushAlgorithm|) performs the following steps: + |controller|, |transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|) performs the + following steps: 1. Assert: |stream| [=implements=] {{TransformStream}}. 1. Assert: |stream|.[=TransformStream/[[controller]]=] is undefined. @@ -5891,6 +5924,7 @@ The following abstract operations support the implementaiton of the 1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to |transformAlgorithm|. 1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to |flushAlgorithm|. + 1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to |cancelAlgorithm|.
@@ -5904,6 +5938,7 @@ The following abstract operations support the implementaiton of the 1. If |result| is an abrupt completion, return [=a promise rejected with=] |result|.\[[Value]]. 1. Otherwise, return [=a promise resolved with=] undefined. 1. Let |flushAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined. + 1. Let |cancelAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined. 1. If |transformerDict|["{{Transformer/transform}}"] [=map/exists=], set |transformAlgorithm| to an algorithm which takes an argument |chunk| and returns the result of [=invoking=] |transformerDict|["{{Transformer/transform}}"] with argument list « |chunk|, @@ -5911,8 +5946,12 @@ The following abstract operations support the implementaiton of the 1. If |transformerDict|["{{Transformer/flush}}"] [=map/exists=], set |flushAlgorithm| to an algorithm which returns the result of [=invoking=] |transformerDict|["{{Transformer/flush}}"] with argument list « |controller| » and [=callback this value=] |transformer|. + 1. If |transformerDict|["{{Transformer/cancel}}"] [=map/exists=], set |cancelAlgorithm| to an + algorithm which takes an argument |reason| and returns the result of [=invoking=] + |transformerDict|["{{Transformer/cancel}}"] with argument list « |reason|, + |controller| » and [=callback this value=] |transformer|. 1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|, - |transformAlgorithm|, |flushAlgorithm|). + |transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|).
@@ -5931,6 +5970,7 @@ The following abstract operations support the implementaiton of the 1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to undefined. 1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to undefined. + 1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to undefined.
@@ -6021,8 +6061,13 @@ side=] of [=transform streams=]. id="transform-stream-default-sink-abort-algorithm">TransformStreamDefaultSinkAbortAlgorithm(|stream|, |reason|) performs the following steps: - 1. Perform ! [$TransformStreamError$](|stream|, |reason|). - 1. Return [=a promise resolved with=] undefined. + 1. Let |readable| be |stream|.[=TransformStream/[[readable]]=]. + 1. [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |reason|). + 1. Let |controller| be |stream|.[=TransformStream/[[controller]]=]. + 1. Let |cancelPromise| be the result of performing + |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|. + 1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|). + 1. Return |cancelPromise|.
@@ -6062,6 +6107,30 @@ side=] of [=transform streams=]. 1. Return |stream|.[=TransformStream/[[backpressureChangePromise]]=].
+ +
+ TransformStreamDefaultSourceCancelAlgorithm(|reason|, + |stream|) performs the following steps: + + 1. Let |writable| be |stream|.[=TransformStream/[[writable]]=]. + 1. If |writable|.[=WritableStream/[[state]]=] is not "`writable`", return + [=a promise resolved with=] undefined. + 1. Perform ! + [$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |reason|). + 1. Perform ! [$TransformStreamUnblockWrite$](|stream|). + 1. Let |controller| be |stream|.[=TransformStream/[[controller]]=]. + 1. Let |cancelPromise| be the result of performing + |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|. + 1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|). + 1. Return |cancelPromise|. + +

The early return prevents the cancellation algorithm from being called if the + writable side is already (in the process of being) closed. This is important, because the + cancellation algorithm must not run if the flush algorithm has + already been run. +

+

Queuing strategies

The queuing strategy API

@@ -7106,9 +7175,11 @@ reason.
To set up a newly-[=new|created-via-Web IDL=] {{TransformStream}} |stream| given an algorithm transformAlgorithm and an optional algorithm flushAlgorithm, perform the following steps. - |transformAlgorithm| and, if given, |flushAlgorithm|, may return a promise. + for="TransformStream/set up">transformAlgorithm, an optional algorithm flushAlgorithm, and an optional algorithm cancelAlgorithm, perform the following steps. + |transformAlgorithm|, if given, |flushAlgorithm|, and, if given, |cancelAlgorithm|, may return a + promise. 1. Let |writableHighWaterMark| be 1. 1. Let |writableSizeAlgorithm| be an algorithm that returns 1. @@ -7124,12 +7195,16 @@ reason. null otherwise. If this throws an exception |e|, return [=a promise rejected with=] |e|. 1. If |result| is a {{Promise}}, then return |result|. 1. Return [=a promise resolved with=] undefined. + 1. Let |cancelAlgorithmWrapper| be an algorithm that runs these steps given a value |reason|: + 1. Let |result| be the result of running |cancelAlgorithm| given |reason|, if |cancelAlgorithm| + was given, or null otherwise. If this throws an exception |e|, return + [=a promise rejected with=] |e|. 1. Let |startPromise| be [=a promise resolved with=] undefined. 1. Perform ! [$InitializeTransformStream$](|stream|, |startPromise|, |writableHighWaterMark|, |writableSizeAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|). 1. Let |controller| be a [=new=] {{TransformStreamDefaultController}}. 1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|, - |transformAlgorithmWrapper|, |flushAlgorithmWrapper|). + |transformAlgorithmWrapper|, |flushAlgorithmWrapper|, |cancelAlgorithmWrapper|). Other specifications should be careful when constructing their [=TransformStream/set up/transformAlgorithm=] to avoid [=in parallel=] reads from the given diff --git a/reference-implementation/lib/Transformer.webidl b/reference-implementation/lib/Transformer.webidl index eefea2b0..2444bfcb 100644 --- a/reference-implementation/lib/Transformer.webidl +++ b/reference-implementation/lib/Transformer.webidl @@ -2,6 +2,7 @@ dictionary Transformer { TransformerStartCallback start; TransformerTransformCallback transform; TransformerFlushCallback flush; + TransformerCancelCallback cancel; any readableType; any writableType; }; @@ -9,3 +10,4 @@ dictionary Transformer { callback TransformerStartCallback = any (TransformStreamDefaultController controller); callback TransformerFlushCallback = Promise (TransformStreamDefaultController controller); callback TransformerTransformCallback = Promise (any chunk, TransformStreamDefaultController controller); +callback TransformerCancelCallback = Promise (any reason, TransformStreamDefaultController controller); diff --git a/reference-implementation/lib/abstract-ops/transform-streams.js b/reference-implementation/lib/abstract-ops/transform-streams.js index 8e3f5fcc..eb9c4d5d 100644 --- a/reference-implementation/lib/abstract-ops/transform-streams.js +++ b/reference-implementation/lib/abstract-ops/transform-streams.js @@ -51,8 +51,7 @@ function InitializeTransformStream( } function cancelAlgorithm(reason) { - TransformStreamErrorWritableAndUnblockWrite(stream, reason); - return promiseResolvedWith(undefined); + return TransformStreamDefaultSourceCancelAlgorithm(stream, reason); } stream._readable = CreateReadableStream( @@ -77,6 +76,10 @@ function TransformStreamError(stream, e) { function TransformStreamErrorWritableAndUnblockWrite(stream, e) { TransformStreamDefaultControllerClearAlgorithms(stream._controller); WritableStreamDefaultControllerErrorIfNeeded(stream._writable._controller, e); + TransformStreamUnblockWrite(stream); +} + +function TransformStreamUnblockWrite(stream) { if (stream._backpressure === true) { // Pretend that pull() was called to permit any pending write() calls to complete. TransformStreamSetBackpressure() // cannot be called from enqueue() or pull() once the ReadableStream is errored, so this will will be the final time @@ -102,7 +105,8 @@ function TransformStreamSetBackpressure(stream, backpressure) { // Default controllers -function SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm) { +function SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm, + cancelAlgorithm) { assert(TransformStream.isImpl(stream)); assert(stream._controller === undefined); @@ -111,6 +115,7 @@ function SetUpTransformStreamDefaultController(stream, controller, transformAlgo controller._transformAlgorithm = transformAlgorithm; controller._flushAlgorithm = flushAlgorithm; + controller._cancelAlgorithm = cancelAlgorithm; } function SetUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) { @@ -126,6 +131,7 @@ function SetUpTransformStreamDefaultControllerFromTransformer(stream, transforme }; let flushAlgorithm = () => promiseResolvedWith(undefined); + let cancelAlgorithm = () => promiseResolvedWith(undefined); if ('transform' in transformerDict) { transformAlgorithm = chunk => transformerDict.transform.call(transformer, chunk, controller); @@ -133,13 +139,17 @@ function SetUpTransformStreamDefaultControllerFromTransformer(stream, transforme if ('flush' in transformerDict) { flushAlgorithm = () => transformerDict.flush.call(transformer, controller); } + if ('cancel' in transformerDict) { + cancelAlgorithm = reason => transformerDict.cancel.call(transformer, reason, controller); + } - SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm); + SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm, cancelAlgorithm); } function TransformStreamDefaultControllerClearAlgorithms(controller) { controller._transformAlgorithm = undefined; controller._flushAlgorithm = undefined; + controller._cancelAlgorithm = undefined; } function TransformStreamDefaultControllerEnqueue(controller, chunk) { @@ -221,10 +231,17 @@ function TransformStreamDefaultSinkWriteAlgorithm(stream, chunk) { } function TransformStreamDefaultSinkAbortAlgorithm(stream, reason) { - // abort() is not called synchronously, so it is possible for abort() to be called when the stream is already - // errored. - TransformStreamError(stream, reason); - return promiseResolvedWith(undefined); + verbose('TransformStreamDefaultSinkAbortAlgorithm()'); + + // stream._readable cannot change after construction, so caching it across a call to user code is safe. + const readable = stream._readable; + ReadableStreamDefaultControllerError(readable._controller, reason); + + const controller = stream._controller; + const cancelPromise = controller._cancelAlgorithm(reason); + TransformStreamDefaultControllerClearAlgorithms(controller); + + return cancelPromise; } function TransformStreamDefaultSinkCloseAlgorithm(stream) { @@ -264,3 +281,21 @@ function TransformStreamDefaultSourcePullAlgorithm(stream) { // Prevent the next pull() call until there is backpressure. return stream._backpressureChangePromise; } + +function TransformStreamDefaultSourceCancelAlgorithm(stream, reason) { + verbose('TransformStreamDefaultSourceCancelAlgorithm()'); + + // stream._writable cannot change after construction, so caching it across a call to user code is safe. + const writable = stream._writable; + if (writable._state !== 'writable') { + return promiseResolvedWith(undefined); + } + WritableStreamDefaultControllerErrorIfNeeded(writable._controller, reason); + TransformStreamUnblockWrite(stream); + + const controller = stream._controller; + const cancelPromise = controller._cancelAlgorithm(reason); + TransformStreamDefaultControllerClearAlgorithms(controller); + + return cancelPromise; +}