Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TransformStream cleanup using "Transformer.cancel" #1283

Merged
merged 15 commits into from
Sep 30, 2023
Prev Previous commit
Next Next commit
Fix controller.error() race
  • Loading branch information
lucacasonato committed Jul 17, 2023
commit 40eadec8a733451e3819a16c31934fcb2a7987e9
26 changes: 19 additions & 7 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -6077,8 +6077,12 @@ side=] of [=transform streams=].
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. [=React=] to |cancelPromise|:
1. If |cancelPromise| was fulfilled, then:
1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |reason|).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |readable|.[=ReadableStream/[[state]]=] is "`errored`", [=reject=]
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with
|readable|.[=ReadableStream/[[storedError]]=].
1. Otherwise:
1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |reason|).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |cancelPromise| was rejected with reason |r|, then:
1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |r|).
1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|.
Expand All @@ -6100,8 +6104,12 @@ side=] of [=transform streams=].
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. [=React=] to |flushPromise|:
1. If |flushPromise| was fulfilled, then:
1. Perform ! [$ReadableStreamDefaultControllerClose$](|readable|.[=ReadableStream/[[controller]]=]).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |readable|.[=ReadableStream/[[state]]=] is "`errored`", [=reject=]
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with
|readable|.[=ReadableStream/[[storedError]]=].
1. Otherwise:
1. Perform ! [$ReadableStreamDefaultControllerClose$](|readable|.[=ReadableStream/[[controller]]=]).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |flushPromise| was rejected with reason |r|, then:
1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |r|).
1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|.
Expand Down Expand Up @@ -6140,9 +6148,13 @@ side=] of [=transform streams=].
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. [=React=] to |cancelPromise|:
1. If |cancelPromise| was fulfilled, then:
1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |reason|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |writable|.[=WritableStream/[[state]]=] is "`errored`", [=reject=]
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with
|writable|.[=WritableStream/[[storedError]]=].
1. Otherwise:
1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |reason|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |cancelPromise| was rejected with reason |r|, then:
1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |r|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
Expand Down
26 changes: 19 additions & 7 deletions reference-implementation/lib/abstract-ops/transform-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,12 @@
TransformStreamDefaultControllerClearAlgorithms(controller);

uponPromise(cancelPromise, () => {
ReadableStreamDefaultControllerError(readable._controller, reason);
resolvePromise(controller._finishPromise);
if (readable._state === 'errored') {
rejectPromise(controller._finishPromise, readable._storedError);
} else {
ReadableStreamDefaultControllerError(readable._controller, reason);
resolvePromise(controller._finishPromise);
}
}, r => {
ReadableStreamDefaultControllerError(readable._controller, r);
rejectPromise(controller._finishPromise, r);
Expand Down Expand Up @@ -278,8 +282,12 @@
TransformStreamDefaultControllerClearAlgorithms(controller);

uponPromise(flushPromise, () => {
ReadableStreamDefaultControllerClose(readable._controller);
resolvePromise(controller._finishPromise);
if (readable._state === 'errored') {
rejectPromise(controller._finishPromise, readable._storedError);
} else {
ReadableStreamDefaultControllerClose(readable._controller);
resolvePromise(controller._finishPromise);

Check failure on line 289 in reference-implementation/lib/abstract-ops/transform-streams.js

View workflow job for this annotation

GitHub Actions / Test

Trailing spaces not allowed
}
}, r => {
ReadableStreamDefaultControllerError(readable._controller, r);
rejectPromise(controller._finishPromise, r);
Expand Down Expand Up @@ -324,9 +332,13 @@
TransformStreamDefaultControllerClearAlgorithms(controller);

uponPromise(cancelPromise, () => {
WritableStreamDefaultControllerErrorIfNeeded(writable._controller, reason);
TransformStreamUnblockWrite(stream);
resolvePromise(controller._finishPromise);
if (writable._state === 'errored') {
rejectPromise(controller._finishPromise, writable._storedError);
} else {
WritableStreamDefaultControllerErrorIfNeeded(writable._controller, reason);
TransformStreamUnblockWrite(stream);
resolvePromise(controller._finishPromise);
}
}, r => {
WritableStreamDefaultControllerErrorIfNeeded(writable._controller, r);
TransformStreamUnblockWrite(stream);
Expand Down
Loading