Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Primordials #2

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
309 changes: 169 additions & 140 deletions extensions/websocket/02_websocketstream.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
"use strict";

/// <reference path="../../core/internal.d.ts" />

((window) => {
const core = window.Deno.core;
const webidl = window.__bootstrap.webidl;
const { writableStreamClose, Deferred } = window.__bootstrap.streams;
const {
StringPrototypeEndsWith,
StringPrototypeToLowerCase,
Symbol,
SymbolFor,
Set,
ArrayPrototypeMap,
ArrayPrototypeJoin,
PromiseResolve,
PromiseReject,
PromisePrototypeThen,
Uint8Array,
TypeError,
} = window.__bootstrap.primordials;

webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter(
Expand Down Expand Up @@ -86,7 +98,7 @@
);
}

if (wsURL.hash !== "" || wsURL.href.endsWith("#")) {
if (wsURL.hash !== "" || StringPrototypeEndsWith(wsURL.href, "#")) {
throw new DOMException(
"Fragments are not allowed in a WebSocket URL.",
"SyntaxError",
Expand Down Expand Up @@ -121,148 +133,161 @@
"This operation was aborted",
"AbortError",
);
this[_connection].reject(err);
this[_closed].reject(err);
PromiseReject(this[_connection], err);
PromiseReject(this[_closed], err);
} else {
const abort = () => {
core.close(cancelRid);
};
options.signal?.addEventListener("abort", abort);
core.opAsync("op_ws_create", {
url: this[_url],
protocols: options.protocols?.join(", ") ?? "",
cancelHandle: cancelRid,
}).then((create) => {
options.signal?.removeEventListener("abort", abort);
if (this[_earlyClose]) {
core.opAsync("op_ws_close", {
rid: create.rid,
}).then(() => {
const err = new DOMException(
"Closed while connecting",
"NetworkError",
);
this[_connection].reject(err);
this[_closed].reject(err);
}).catch(() => {
const err = new DOMException(
"Closed while connecting",
"NetworkError",
PromisePrototypeThen(
core.opAsync("op_ws_create", {
url: this[_url],
protocols: options.protocol
? ArrayPrototypeJoin(options.protocols, ", ")
: "",
cancelHandle: cancelRid,
}),
(create) => {
options.signal?.removeEventListener("abort", abort);
if (this[_earlyClose]) {
PromisePrototypeThen(
core.opAsync("op_ws_close", {
rid: create.rid,
}),
() => {
const err = new DOMException(
"Closed while connecting",
"NetworkError",
);
PromiseReject(this[_connection], err);
PromiseReject(this[_closed], err);
},
() => {
const err = new DOMException(
"Closed while connecting",
"NetworkError",
);
PromiseReject(this[_connection], err);
PromiseReject(this[_closed], err);
},
);
this[_connection].reject(err);
this[_closed].reject(err);
});
} else {
this[_rid] = create.rid;
} else {
this[_rid] = create.rid;

const writable = new WritableStream({
write: async (chunk) => {
if (typeof chunk === "string") {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "text",
text: chunk,
});
} else if (chunk instanceof Uint8Array) {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "binary",
}, chunk);
} else {
throw new TypeError(
"A chunk may only be either a string or an Uint8Array",
);
}
},
close: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
this.close();
}
await this.closed;
},
abort: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
this.close();
}
await this.closed;
},
});
const readable = new ReadableStream({
start: (controller) => {
this.closed.then(() => {
const writable = new WritableStream({
write: async (chunk) => {
if (typeof chunk === "string") {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "text",
text: chunk,
});
} else if (chunk instanceof Uint8Array) {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "binary",
}, chunk);
} else {
throw new TypeError(
"A chunk may only be either a string or an Uint8Array",
);
}
},
close: async (reason) => {
try {
controller.close();
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
// needed to ignore warnings & assertions
this.close();
}
await this.closed;
},
abort: async (reason) => {
try {
writableStreamClose(writable).catch(() => {});
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
// needed to ignore warnings & assertions
this.close();
}
});
},
pull: async (controller) => {
const { kind, value } = await core.opAsync(
"op_ws_next_event",
this[_rid],
);
await this.closed;
},
});
const readable = new ReadableStream({
start: (controller) => {
PromisePrototypeThen(this.closed, () => {
try {
controller.close();
} catch (_) {
// needed to ignore warnings & assertions
}
try {
PromisePrototypeCatch(
writableStreamClose(writable),
() => {},
);
} catch (_) {
// needed to ignore warnings & assertions
}
});
},
pull: async (controller) => {
const { kind, value } = await core.opAsync(
"op_ws_next_event",
this[_rid],
);

switch (kind) {
case "string": {
controller.enqueue(value);
break;
}
case "binary": {
controller.enqueue(value);
break;
}
case "ping": {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "pong",
});
break;
}
case "close": {
this[_closed].resolve(value);
tryClose(this[_rid]);
break;
switch (kind) {
case "string": {
controller.enqueue(value);
break;
}
case "binary": {
controller.enqueue(value);
break;
}
case "ping": {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "pong",
});
break;
}
case "close": {
PromiseResolve(this[_closed], value);
tryClose(this[_rid]);
break;
}
case "error": {
const err = new Error(value);
PromiseReject(this[_closed], err);
controller.error(err);
tryClose(this[_rid]);
break;
}
}
case "error": {
const err = new Error(value);
this[_closed].reject(err);
controller.error(err);
tryClose(this[_rid]);
break;
},
cancel: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
this.close();
}
}
},
cancel: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
this.close();
}
await this.closed;
},
});
await this.closed;
},
});

this[_connection].resolve({
readable,
writable,
extensions: create.extensions ?? "",
protocol: create.protocol ?? "",
});
}
}).catch((err) => {
this[_connection].reject(err);
this[_closed].reject(err);
});
PromiseResolve(this[_connection], {
readable,
writable,
extensions: create.extensions ?? "",
protocol: create.protocol ?? "",
});
}
},
(err) => {
PromiseReject(this[_connection], err);
PromiseReject(this[_closed], err);
},
);
}
}

Expand Down Expand Up @@ -312,24 +337,28 @@
if (this[_connection].state === "pending") {
this[_earlyClose] = true;
} else if (this[_closed].state === "pending") {
core.opAsync("op_ws_close", {
rid: this[_rid],
code,
reason: closeInfo.reason,
}).then(() => {
tryClose(this[_rid]);
this[_closed].resolve({
code: code ?? 1005,
PromisePrototypeThen(
core.opAsync("op_ws_close", {
rid: this[_rid],
code,
reason: closeInfo.reason,
});
}).catch((err) => {
this[_rid] && tryClose(this[_rid]);
this[_closed].reject(err);
});
}),
() => {
tryClose(this[_rid]);
PromiseResolve(this[_closed], {
code: code ?? 1005,
reason: closeInfo.reason,
});
},
(err) => {
this[_rid] && tryClose(this[_rid]);
PromiseReject(this[_closed], err);
},
);
}
}

[Symbol.for("Deno.customInspect")](inspect) {
[SymbolFor("Deno.customInspect")](inspect) {
return `${this.constructor.name} ${
inspect({
url: this.url,
Expand Down