Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TransformStream cleanup using "Transformer.cancel" #1283

Merged
merged 15 commits into from
Sep 30, 2023
133 changes: 113 additions & 20 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -5489,13 +5489,15 @@ dictionary Transformer {
TransformerStartCallback start;
TransformerTransformCallback transform;
TransformerFlushCallback flush;
TransformerCancelCallback cancel;
any readableType;
any writableType;
};

callback TransformerStartCallback = any (TransformStreamDefaultController controller);
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
callback TransformerCancelCallback = Promise<undefined> (any reason);
</xmp>

<dl>
Expand Down Expand Up @@ -5558,6 +5560,25 @@ callback TransformerTransformCallback = Promise<undefined> (any chunk, Transform
{{Transformer/flush|flush()}}; the stream is already in the process of successfully closing down,
and terminating it would be counterproductive.)

<dt><dfn dict-member for="Transformer" lt="cancel">cancel(<var ignore>reason</var>)</dfn></dt>
<dd>
<p>A function called when the [=readable side=] is cancelled, or when the [=writable side=] is
aborted.

<p>Typically this is used to clean up underlying transformer resources when the stream is aborted
or cancelled.

<p>If the cancellation process is asynchronous, the function can return a promise to signal
success or failure; the result will be communicated to the caller of
{{WritableStream/abort()|stream.writable.abort()}} or
{{ReadableStream/cancel()|stream.readable.cancel()}}. Throwing an exception is treated the same
as returning a rejected promise.

<p>(Note that there is no need to call
{{TransformStreamDefaultController/terminate()|controller.terminate()}} inside
{{Transformer/cancel|cancel()}}; the stream is already in the process of cancelling/aborting, and
terminating it would be counterproductive.)

Comment on lines +5577 to +5581
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
<p>(Note that there is no need to call
{{TransformStreamDefaultController/terminate()|controller.terminate()}} inside
{{Transformer/cancel|cancel()}}; the stream is already in the process of cancelling/aborting, and
terminating it would be counterproductive.)

This part is redundant after 1c65d61.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My train of thought is that you could extract the controller in start and assign it to a local variable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, hmm, okay then. Maybe also worth having tests for that btw.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously it was impossible because of the immediate close semantics. I just pushed the new semantics, and will add tests for this case.

<dt><dfn dict-member for="Transformer">readableType</dfn></dt>
<dd>
<p>This property is reserved for future use, so any attempts to supply a value will throw an
Expand All @@ -5571,8 +5592,8 @@ callback TransformerTransformCallback = Promise<undefined> (any chunk, Transform

The <code>controller</code> object passed to {{Transformer/start|start()}},
{{Transformer/transform|transform()}}, and {{Transformer/flush|flush()}} is an instance of
{{TransformStreamDefaultController}}, and has the ability to enqueue [=chunks=] to the [=readable
side=], or to terminate or error the stream.
{{TransformStreamDefaultController}}, and has the ability to enqueue [=chunks=] to the
[=readable side=], or to terminate or error the stream.

<h4 id="ts-prototype">Constructor and properties</h4>

Expand Down Expand Up @@ -5726,6 +5747,16 @@ the following table:
<th>Internal Slot</th>
<th>Description (<em>non-normative</em>)</th>
<tbody>
<tr>
<td><dfn>\[[cancelAlgorithm]]</dfn>
<td class="non-normative">A promise-returning algorithm, taking one argument (the [=reason=] for
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
cancellation), which communicates a requested cancellation to the [=transformer=]
<tr>
<td><dfn>\[[finishPromise]]</dfn>
<td class="non-normative">A promise which resolves on completion of either the
[=TransformStreamDefaultController/[[cancelAlgorithm]]=] or the
[=TransformStreamDefaultController/[[flushAlgorithm]]=]. If this field is unpopulated (that is,
undefined), then neither of those algorithms have been [=invoked=] yet
<tr>
<td><dfn>\[[flushAlgorithm]]</dfn>
<td class="non-normative">A promise-returning algorithm which communicates a requested close to
Expand Down Expand Up @@ -5819,8 +5850,7 @@ The following abstract operations operate on {{TransformStream}} instances at a
1. Let |pullAlgorithm| be the following steps:
1. Return ! [$TransformStreamDefaultSourcePullAlgorithm$](|stream|).
1. Let |cancelAlgorithm| be the following steps, taking a |reason| argument:
1. Perform ! [$TransformStreamErrorWritableAndUnblockWrite$](|stream|, |reason|).
1. Return [=a promise resolved with=] undefined.
1. Return ! [$TransformStreamDefaultSourceCancelAlgorithm$](|stream|, |reason|).
1. Set |stream|.[=TransformStream/[[readable]]=] to ! [$CreateReadableStream$](|startAlgorithm|,
|pullAlgorithm|, |cancelAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|).
1. Set |stream|.[=TransformStream/[[backpressure]]=] and
Expand Down Expand Up @@ -5854,6 +5884,14 @@ The following abstract operations operate on {{TransformStream}} instances at a
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|stream|.[=TransformStream/[[controller]]=]).
1. Perform !
[$WritableStreamDefaultControllerErrorIfNeeded$](|stream|.[=TransformStream/[[writable]]=].[=WritableStream/[[controller]]=], |e|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
</div>

<div algorithm>
<dfn abstract-op lt="TransformStreamUnblockWrite"
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
id="transform-stream-unblock-write">TransformStreamUnblockWrite(|stream|)</dfn> performs the
following steps:

1. If |stream|.[=TransformStream/[[backpressure]]=] is true, perform ! [$TransformStreamSetBackpressure$](|stream|,
false).

Expand Down Expand Up @@ -5882,7 +5920,8 @@ The following abstract operations support the implementaiton of the
<div algorithm>
<dfn abstract-op lt="SetUpTransformStreamDefaultController"
id="set-up-transform-stream-default-controller">SetUpTransformStreamDefaultController(|stream|,
|controller|, |transformAlgorithm|, |flushAlgorithm|)</dfn> performs the following steps:
|controller|, |transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|)</dfn> performs the
following steps:

1. Assert: |stream| [=implements=] {{TransformStream}}.
1. Assert: |stream|.[=TransformStream/[[controller]]=] is undefined.
Expand All @@ -5891,6 +5930,7 @@ The following abstract operations support the implementaiton of the
1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to
|transformAlgorithm|.
1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to |flushAlgorithm|.
1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to |cancelAlgorithm|.
</div>

<div algorithm>
Expand All @@ -5904,15 +5944,20 @@ The following abstract operations support the implementaiton of the
1. If |result| is an abrupt completion, return [=a promise rejected with=] |result|.\[[Value]].
1. Otherwise, return [=a promise resolved with=] undefined.
1. Let |flushAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined.
1. Let |cancelAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined.
1. If |transformerDict|["{{Transformer/transform}}"] [=map/exists=], set |transformAlgorithm| to an
algorithm which takes an argument |chunk| and returns the result of [=invoking=]
|transformerDict|["{{Transformer/transform}}"] with argument list «&nbsp;|chunk|,
|controller|&nbsp;» and [=callback this value=] |transformer|.
1. If |transformerDict|["{{Transformer/flush}}"] [=map/exists=], set |flushAlgorithm| to an
algorithm which returns the result of [=invoking=] |transformerDict|["{{Transformer/flush}}"]
with argument list «&nbsp;|controller|&nbsp;» and [=callback this value=] |transformer|.
1. If |transformerDict|["{{Transformer/cancel}}"] [=map/exists=], set |cancelAlgorithm| to an
algorithm which takes an argument |reason| and returns the result of [=invoking=]
|transformerDict|["{{Transformer/cancel}}"] with argument list «&nbsp;|reason|&nbsp;» and
[=callback this value=] |transformer|.
1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|,
|transformAlgorithm|, |flushAlgorithm|).
|transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|).
</div>

<div algorithm>
Expand All @@ -5931,6 +5976,7 @@ The following abstract operations support the implementaiton of the

1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to undefined.
1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to undefined.
1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to undefined.
</div>

<div algorithm>
Expand Down Expand Up @@ -6021,29 +6067,45 @@ side=] of [=transform streams=].
id="transform-stream-default-sink-abort-algorithm">TransformStreamDefaultSinkAbortAlgorithm(|stream|,
|reason|)</dfn> performs the following steps:

1. Perform ! [$TransformStreamError$](|stream|, |reason|).
1. Return [=a promise resolved with=] undefined.
1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
1. If |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] is not undefined, return
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
1. Let |readable| be |stream|.[=TransformStream/[[readable]]=].
1. Let |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] be a new promise.
1. Let |cancelPromise| be the result of performing
|controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|.
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. [=React=] to |cancelPromise|:
1. If |cancelPromise| was fulfilled, then:
1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |reason|).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |cancelPromise| was rejected with reason |r|, then:
1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |r|).
1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|.
1. Return |controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
</div>

<div algorithm>
<dfn abstract-op lt="TransformStreamDefaultSinkCloseAlgorithm"
id="transform-stream-default-sink-close-algorithm">TransformStreamDefaultSinkCloseAlgorithm(|stream|)</dfn>
performs the following steps:

1. Let |readable| be |stream|.[=TransformStream/[[readable]]=].
1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
1. If |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] is not undefined, return
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
1. Let |readable| be |stream|.[=TransformStream/[[readable]]=].
1. Let |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] be a new promise.
1. Let |flushPromise| be the result of performing
|controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=].
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. Return the result of [=reacting=] to |flushPromise|:
1. [=React=] to |flushPromise|:
1. If |flushPromise| was fulfilled, then:
1. If |readable|.[=ReadableStream/[[state]]=] is "`errored`", throw
|readable|.[=ReadableStream/[[storedError]]=].
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
1. Perform !
[$ReadableStreamDefaultControllerClose$](|readable|.[=ReadableStream/[[controller]]=]).
1. Perform ! [$ReadableStreamDefaultControllerClose$](|readable|.[=ReadableStream/[[controller]]=]).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(At the first glance it looked like it was ignoring the error, but in that case it would have finishPromise and thus would return at the above check.)

1. If |flushPromise| was rejected with reason |r|, then:
1. Perform ! [$TransformStreamError$](|stream|, |r|).
1. Throw |readable|.[=ReadableStream/[[storedError]]=].
1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |r|).
1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|.
1. Return |controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
</div>

<h4 id="ts-default-source-abstract-ops">Default sources</h4>
Expand All @@ -6062,6 +6124,32 @@ side=] of [=transform streams=].
1. Return |stream|.[=TransformStream/[[backpressureChangePromise]]=].
</div>


<div algorithm>
<dfn abstract-op lt="TransformStreamDefaultSourceCancelAlgorithm"
id="transform-stream-default-source-cancel">TransformStreamDefaultSourceCancelAlgorithm(|reason|,
|stream|)</dfn> performs the following steps:

1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
1. If |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] is not undefined, return
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
1. Let |writable| be |stream|.[=TransformStream/[[writable]]=].
1. Let |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] be a new promise.
1. Let |cancelPromise| be the result of performing
|controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|.
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. [=React=] to |cancelPromise|:
1. If |cancelPromise| was fulfilled, then:
1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |reason|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |cancelPromise| was rejected with reason |r|, then:
1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |r|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|.
1. Return |controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
</div>

<h2 id="qs">Queuing strategies</h2>

<h3 id="qs-api">The queuing strategy API</h3>
Expand Down Expand Up @@ -7106,9 +7194,10 @@ reason.
<div algorithm="create a TransformStream">
To <dfn export for="TransformStream" lt="set up|setting up">set up</dfn> a
newly-[=new|created-via-Web IDL=] {{TransformStream}} |stream| given an algorithm <dfn export
for="TransformStream/set up"><var>transformAlgorithm</var></dfn> and an optional algorithm <dfn
export for="TransformStream/set up"><var>flushAlgorithm</var></dfn>, perform the following steps.
|transformAlgorithm| and, if given, |flushAlgorithm|, may return a promise.
for="TransformStream/set up"><var>transformAlgorithm</var></dfn>, an optional algorithm <dfn
export for="TransformStream/set up"><var>flushAlgorithm</var></dfn>, and an optional algorithm <dfn
export for="TransformStream/set up"><var>cancelAlgorithm</var></dfn>, perform the following steps.
|transformAlgorithm|, if given, |flushAlgorithm| and |cancelAlgorithm|, may return a promise.
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved

1. Let |writableHighWaterMark| be 1.
1. Let |writableSizeAlgorithm| be an algorithm that returns 1.
Expand All @@ -7124,12 +7213,16 @@ reason.
null otherwise. If this throws an exception |e|, return [=a promise rejected with=] |e|.
1. If |result| is a {{Promise}}, then return |result|.
1. Return [=a promise resolved with=] undefined.
1. Let |cancelAlgorithmWrapper| be an algorithm that runs these steps given a value |reason|:
1. Let |result| be the result of running |cancelAlgorithm| given |reason|, if |cancelAlgorithm|
was given, or null otherwise. If this throws an exception |e|, return
[=a promise rejected with=] |e|.
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
1. Let |startPromise| be [=a promise resolved with=] undefined.
1. Perform ! [$InitializeTransformStream$](|stream|, |startPromise|, |writableHighWaterMark|,
|writableSizeAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|).
1. Let |controller| be a [=new=] {{TransformStreamDefaultController}}.
1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|,
|transformAlgorithmWrapper|, |flushAlgorithmWrapper|).
|transformAlgorithmWrapper|, |flushAlgorithmWrapper|, |cancelAlgorithmWrapper|).

Other specifications should be careful when constructing their
<i>[=TransformStream/set up/transformAlgorithm=]</i> to avoid [=in parallel=] reads from the given
Expand Down
2 changes: 2 additions & 0 deletions reference-implementation/lib/Transformer.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ dictionary Transformer {
TransformerStartCallback start;
TransformerTransformCallback transform;
TransformerFlushCallback flush;
TransformerCancelCallback cancel;
any readableType;
any writableType;
};

callback TransformerStartCallback = any (TransformStreamDefaultController controller);
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
callback TransformerCancelCallback = Promise<undefined> (any reason);