diff --git a/extensions/websocket/02_websocketstream.js b/extensions/websocket/02_websocketstream.js index 035875210c977..fa287ccc72c61 100644 --- a/extensions/websocket/02_websocketstream.js +++ b/extensions/websocket/02_websocketstream.js @@ -1,13 +1,25 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. "use strict"; +/// + ((window) => { const core = window.Deno.core; const webidl = window.__bootstrap.webidl; const { writableStreamClose, Deferred } = window.__bootstrap.streams; const { + StringPrototypeEndsWith, StringPrototypeToLowerCase, + Symbol, + SymbolFor, + Set, ArrayPrototypeMap, + ArrayPrototypeJoin, + PromiseResolve, + PromiseReject, + PromisePrototypeThen, + Uint8Array, + TypeError, } = window.__bootstrap.primordials; webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter( @@ -86,7 +98,7 @@ ); } - if (wsURL.hash !== "" || wsURL.href.endsWith("#")) { + if (wsURL.hash !== "" || StringPrototypeEndsWith(wsURL.href, "#")) { throw new DOMException( "Fragments are not allowed in a WebSocket URL.", "SyntaxError", @@ -121,148 +133,161 @@ "This operation was aborted", "AbortError", ); - this[_connection].reject(err); - this[_closed].reject(err); + PromiseReject(this[_connection], err); + PromiseReject(this[_closed], err); } else { const abort = () => { core.close(cancelRid); }; options.signal?.addEventListener("abort", abort); - core.opAsync("op_ws_create", { - url: this[_url], - protocols: options.protocols?.join(", ") ?? "", - cancelHandle: cancelRid, - }).then((create) => { - options.signal?.removeEventListener("abort", abort); - if (this[_earlyClose]) { - core.opAsync("op_ws_close", { - rid: create.rid, - }).then(() => { - const err = new DOMException( - "Closed while connecting", - "NetworkError", - ); - this[_connection].reject(err); - this[_closed].reject(err); - }).catch(() => { - const err = new DOMException( - "Closed while connecting", - "NetworkError", + PromisePrototypeThen( + core.opAsync("op_ws_create", { + url: this[_url], + protocols: options.protocol + ? ArrayPrototypeJoin(options.protocols, ", ") + : "", + cancelHandle: cancelRid, + }), + (create) => { + options.signal?.removeEventListener("abort", abort); + if (this[_earlyClose]) { + PromisePrototypeThen( + core.opAsync("op_ws_close", { + rid: create.rid, + }), + () => { + const err = new DOMException( + "Closed while connecting", + "NetworkError", + ); + PromiseReject(this[_connection], err); + PromiseReject(this[_closed], err); + }, + () => { + const err = new DOMException( + "Closed while connecting", + "NetworkError", + ); + PromiseReject(this[_connection], err); + PromiseReject(this[_closed], err); + }, ); - this[_connection].reject(err); - this[_closed].reject(err); - }); - } else { - this[_rid] = create.rid; + } else { + this[_rid] = create.rid; - const writable = new WritableStream({ - write: async (chunk) => { - if (typeof chunk === "string") { - await core.opAsync("op_ws_send", { - rid: this[_rid], - kind: "text", - text: chunk, - }); - } else if (chunk instanceof Uint8Array) { - await core.opAsync("op_ws_send", { - rid: this[_rid], - kind: "binary", - }, chunk); - } else { - throw new TypeError( - "A chunk may only be either a string or an Uint8Array", - ); - } - }, - close: async (reason) => { - try { - this.close(reason?.code !== undefined ? reason : {}); - } catch (_) { - this.close(); - } - await this.closed; - }, - abort: async (reason) => { - try { - this.close(reason?.code !== undefined ? reason : {}); - } catch (_) { - this.close(); - } - await this.closed; - }, - }); - const readable = new ReadableStream({ - start: (controller) => { - this.closed.then(() => { + const writable = new WritableStream({ + write: async (chunk) => { + if (typeof chunk === "string") { + await core.opAsync("op_ws_send", { + rid: this[_rid], + kind: "text", + text: chunk, + }); + } else if (chunk instanceof Uint8Array) { + await core.opAsync("op_ws_send", { + rid: this[_rid], + kind: "binary", + }, chunk); + } else { + throw new TypeError( + "A chunk may only be either a string or an Uint8Array", + ); + } + }, + close: async (reason) => { try { - controller.close(); + this.close(reason?.code !== undefined ? reason : {}); } catch (_) { - // needed to ignore warnings & assertions + this.close(); } + await this.closed; + }, + abort: async (reason) => { try { - writableStreamClose(writable).catch(() => {}); + this.close(reason?.code !== undefined ? reason : {}); } catch (_) { - // needed to ignore warnings & assertions + this.close(); } - }); - }, - pull: async (controller) => { - const { kind, value } = await core.opAsync( - "op_ws_next_event", - this[_rid], - ); + await this.closed; + }, + }); + const readable = new ReadableStream({ + start: (controller) => { + PromisePrototypeThen(this.closed, () => { + try { + controller.close(); + } catch (_) { + // needed to ignore warnings & assertions + } + try { + PromisePrototypeCatch( + writableStreamClose(writable), + () => {}, + ); + } catch (_) { + // needed to ignore warnings & assertions + } + }); + }, + pull: async (controller) => { + const { kind, value } = await core.opAsync( + "op_ws_next_event", + this[_rid], + ); - switch (kind) { - case "string": { - controller.enqueue(value); - break; - } - case "binary": { - controller.enqueue(value); - break; - } - case "ping": { - await core.opAsync("op_ws_send", { - rid: this[_rid], - kind: "pong", - }); - break; - } - case "close": { - this[_closed].resolve(value); - tryClose(this[_rid]); - break; + switch (kind) { + case "string": { + controller.enqueue(value); + break; + } + case "binary": { + controller.enqueue(value); + break; + } + case "ping": { + await core.opAsync("op_ws_send", { + rid: this[_rid], + kind: "pong", + }); + break; + } + case "close": { + PromiseResolve(this[_closed], value); + tryClose(this[_rid]); + break; + } + case "error": { + const err = new Error(value); + PromiseReject(this[_closed], err); + controller.error(err); + tryClose(this[_rid]); + break; + } } - case "error": { - const err = new Error(value); - this[_closed].reject(err); - controller.error(err); - tryClose(this[_rid]); - break; + }, + cancel: async (reason) => { + try { + this.close(reason?.code !== undefined ? reason : {}); + } catch (_) { + this.close(); } - } - }, - cancel: async (reason) => { - try { - this.close(reason?.code !== undefined ? reason : {}); - } catch (_) { - this.close(); - } - await this.closed; - }, - }); + await this.closed; + }, + }); - this[_connection].resolve({ - readable, - writable, - extensions: create.extensions ?? "", - protocol: create.protocol ?? "", - }); - } - }).catch((err) => { - this[_connection].reject(err); - this[_closed].reject(err); - }); + PromiseResolve(this[_connection], { + readable, + writable, + extensions: create.extensions ?? "", + protocol: create.protocol ?? "", + }); + } + }, + (err) => { + PromiseReject(this[_connection], err); + PromiseReject(this[_closed], err); + }, + ); } } @@ -312,24 +337,28 @@ if (this[_connection].state === "pending") { this[_earlyClose] = true; } else if (this[_closed].state === "pending") { - core.opAsync("op_ws_close", { - rid: this[_rid], - code, - reason: closeInfo.reason, - }).then(() => { - tryClose(this[_rid]); - this[_closed].resolve({ - code: code ?? 1005, + PromisePrototypeThen( + core.opAsync("op_ws_close", { + rid: this[_rid], + code, reason: closeInfo.reason, - }); - }).catch((err) => { - this[_rid] && tryClose(this[_rid]); - this[_closed].reject(err); - }); + }), + () => { + tryClose(this[_rid]); + PromiseResolve(this[_closed], { + code: code ?? 1005, + reason: closeInfo.reason, + }); + }, + (err) => { + this[_rid] && tryClose(this[_rid]); + PromiseReject(this[_closed], err); + }, + ); } } - [Symbol.for("Deno.customInspect")](inspect) { + [SymbolFor("Deno.customInspect")](inspect) { return `${this.constructor.name} ${ inspect({ url: this.url,