Skip to content

Commit

Permalink
primordials
Browse files Browse the repository at this point in the history
  • Loading branch information
littledivy committed Jul 10, 2021
1 parent b0c9c41 commit c0b5a48
Showing 1 changed file with 169 additions and 140 deletions.
309 changes: 169 additions & 140 deletions extensions/websocket/02_websocketstream.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
"use strict";

/// <reference path="../../core/internal.d.ts" />

((window) => {
const core = window.Deno.core;
const webidl = window.__bootstrap.webidl;
const { writableStreamClose, Deferred } = window.__bootstrap.streams;
const {
StringPrototypeEndsWith,
StringPrototypeToLowerCase,
Symbol,
SymbolFor,
Set,
ArrayPrototypeMap,
ArrayPrototypeJoin,
PromiseResolve,
PromiseReject,
PromisePrototypeThen,
Uint8Array,
TypeError,
} = window.__bootstrap.primordials;

webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter(
Expand Down Expand Up @@ -86,7 +98,7 @@
);
}

if (wsURL.hash !== "" || wsURL.href.endsWith("#")) {
if (wsURL.hash !== "" || StringPrototypeEndsWith(wsURL.href, "#")) {
throw new DOMException(
"Fragments are not allowed in a WebSocket URL.",
"SyntaxError",
Expand Down Expand Up @@ -121,148 +133,161 @@
"This operation was aborted",
"AbortError",
);
this[_connection].reject(err);
this[_closed].reject(err);
PromiseReject(this[_connection], err);
PromiseReject(this[_closed], err);
} else {
const abort = () => {
core.close(cancelRid);
};
options.signal?.addEventListener("abort", abort);
core.opAsync("op_ws_create", {
url: this[_url],
protocols: options.protocols?.join(", ") ?? "",
cancelHandle: cancelRid,
}).then((create) => {
options.signal?.removeEventListener("abort", abort);
if (this[_earlyClose]) {
core.opAsync("op_ws_close", {
rid: create.rid,
}).then(() => {
const err = new DOMException(
"Closed while connecting",
"NetworkError",
);
this[_connection].reject(err);
this[_closed].reject(err);
}).catch(() => {
const err = new DOMException(
"Closed while connecting",
"NetworkError",
PromisePrototypeThen(
core.opAsync("op_ws_create", {
url: this[_url],
protocols: options.protocol
? ArrayPrototypeJoin(options.protocols, ", ")
: "",
cancelHandle: cancelRid,
}),
(create) => {
options.signal?.removeEventListener("abort", abort);
if (this[_earlyClose]) {
PromisePrototypeThen(
core.opAsync("op_ws_close", {
rid: create.rid,
}),
() => {
const err = new DOMException(
"Closed while connecting",
"NetworkError",
);
PromiseReject(this[_connection], err);
PromiseReject(this[_closed], err);
},
() => {
const err = new DOMException(
"Closed while connecting",
"NetworkError",
);
PromiseReject(this[_connection], err);
PromiseReject(this[_closed], err);
},
);
this[_connection].reject(err);
this[_closed].reject(err);
});
} else {
this[_rid] = create.rid;
} else {
this[_rid] = create.rid;

const writable = new WritableStream({
write: async (chunk) => {
if (typeof chunk === "string") {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "text",
text: chunk,
});
} else if (chunk instanceof Uint8Array) {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "binary",
}, chunk);
} else {
throw new TypeError(
"A chunk may only be either a string or an Uint8Array",
);
}
},
close: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
this.close();
}
await this.closed;
},
abort: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
this.close();
}
await this.closed;
},
});
const readable = new ReadableStream({
start: (controller) => {
this.closed.then(() => {
const writable = new WritableStream({
write: async (chunk) => {
if (typeof chunk === "string") {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "text",
text: chunk,
});
} else if (chunk instanceof Uint8Array) {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "binary",
}, chunk);
} else {
throw new TypeError(
"A chunk may only be either a string or an Uint8Array",
);
}
},
close: async (reason) => {
try {
controller.close();
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
// needed to ignore warnings & assertions
this.close();
}
await this.closed;
},
abort: async (reason) => {
try {
writableStreamClose(writable).catch(() => {});
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
// needed to ignore warnings & assertions
this.close();
}
});
},
pull: async (controller) => {
const { kind, value } = await core.opAsync(
"op_ws_next_event",
this[_rid],
);
await this.closed;
},
});
const readable = new ReadableStream({
start: (controller) => {
PromisePrototypeThen(this.closed, () => {
try {
controller.close();
} catch (_) {
// needed to ignore warnings & assertions
}
try {
PromisePrototypeCatch(
writableStreamClose(writable),
() => {},
);
} catch (_) {
// needed to ignore warnings & assertions
}
});
},
pull: async (controller) => {
const { kind, value } = await core.opAsync(
"op_ws_next_event",
this[_rid],
);

switch (kind) {
case "string": {
controller.enqueue(value);
break;
}
case "binary": {
controller.enqueue(value);
break;
}
case "ping": {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "pong",
});
break;
}
case "close": {
this[_closed].resolve(value);
tryClose(this[_rid]);
break;
switch (kind) {
case "string": {
controller.enqueue(value);
break;
}
case "binary": {
controller.enqueue(value);
break;
}
case "ping": {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "pong",
});
break;
}
case "close": {
PromiseResolve(this[_closed], value);
tryClose(this[_rid]);
break;
}
case "error": {
const err = new Error(value);
PromiseReject(this[_closed], err);
controller.error(err);
tryClose(this[_rid]);
break;
}
}
case "error": {
const err = new Error(value);
this[_closed].reject(err);
controller.error(err);
tryClose(this[_rid]);
break;
},
cancel: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
this.close();
}
}
},
cancel: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
this.close();
}
await this.closed;
},
});
await this.closed;
},
});

this[_connection].resolve({
readable,
writable,
extensions: create.extensions ?? "",
protocol: create.protocol ?? "",
});
}
}).catch((err) => {
this[_connection].reject(err);
this[_closed].reject(err);
});
PromiseResolve(this[_connection], {
readable,
writable,
extensions: create.extensions ?? "",
protocol: create.protocol ?? "",
});
}
},
(err) => {
PromiseReject(this[_connection], err);
PromiseReject(this[_closed], err);
},
);
}
}

Expand Down Expand Up @@ -312,24 +337,28 @@
if (this[_connection].state === "pending") {
this[_earlyClose] = true;
} else if (this[_closed].state === "pending") {
core.opAsync("op_ws_close", {
rid: this[_rid],
code,
reason: closeInfo.reason,
}).then(() => {
tryClose(this[_rid]);
this[_closed].resolve({
code: code ?? 1005,
PromisePrototypeThen(
core.opAsync("op_ws_close", {
rid: this[_rid],
code,
reason: closeInfo.reason,
});
}).catch((err) => {
this[_rid] && tryClose(this[_rid]);
this[_closed].reject(err);
});
}),
() => {
tryClose(this[_rid]);
PromiseResolve(this[_closed], {
code: code ?? 1005,
reason: closeInfo.reason,
});
},
(err) => {
this[_rid] && tryClose(this[_rid]);
PromiseReject(this[_closed], err);
},
);
}
}

[Symbol.for("Deno.customInspect")](inspect) {
[SymbolFor("Deno.customInspect")](inspect) {
return `${this.constructor.name} ${
inspect({
url: this.url,
Expand Down

0 comments on commit c0b5a48

Please sign in to comment.