Skip to content

Commit

Permalink
fix(node/http2): fixes to support grpc (denoland#20712)
Browse files Browse the repository at this point in the history
This commit improves "node:http2" module implementation, by enabling
to use "options.createConnection" callback when starting an HTTP2
session.
This change enables to pass basic client-side test with "grpc-js/grpc"
package.
Smaller fixes like "Http2Session.unref()" and "Http2Session.setTimeout()"
were handled as well.

Fixes denoland#16647
  • Loading branch information
bartlomieju committed Oct 12, 2023
1 parent 2fb9ddd commit cee2211
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 31 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions cli/tests/unit_node/http2_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,52 @@ for (const url of ["http:https://127.0.0.1:4246", "https://127.0.0.1:4247"]) {
});
}

Deno.test(`[node/http2 client createConnection]`, {
ignore: Deno.build.os === "windows",
}, async () => {
const url = "http:https://127.0.0.1:4246";
const createConnPromise = deferred();
// Create a server to respond to the HTTP2 requests
const client = http2.connect(url, {
createConnection() {
const socket = net.connect({ host: "127.0.0.1", port: 4246 });

socket.on("connect", () => {
createConnPromise.resolve();
});

return socket;
},
});
client.on("error", (err) => console.error(err));

const req = client.request({ ":method": "POST", ":path": "/" });

let receivedData = "";

req.write("hello");
req.setEncoding("utf8");

req.on("data", (chunk) => {
receivedData += chunk;
});
req.end();

const endPromise = deferred();
setTimeout(() => {
try {
client.close();
} catch (_) {
// pass
}
endPromise.resolve();
}, 2000);

await createConnPromise;
await endPromise;
assertEquals(receivedData, "hello world\n");
});

// TODO(bartlomieju): reenable sanitizers
Deno.test("[node/http2 server]", { sanitizeOps: false }, async () => {
const server = http2.createServer();
Expand Down
144 changes: 115 additions & 29 deletions ext/node/polyfills/http2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ const core = globalThis.Deno.core;
import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter } from "node:events";
import { Buffer } from "node:buffer";
import { Server, Socket, TCP } from "node:net";
import { connect as netConnect, Server, Socket, TCP } from "node:net";
import { connect as tlsConnect } from "node:tls";
import { TypedArray } from "ext:deno_node/internal/util/types.ts";
import {
kHandle,
kMaybeDestroy,
kUpdateTimer,
setStreamTimeout,
Expand All @@ -36,11 +38,11 @@ import {
ERR_HTTP2_STREAM_ERROR,
ERR_HTTP2_TRAILERS_ALREADY_SENT,
ERR_HTTP2_TRAILERS_NOT_READY,
ERR_HTTP2_UNSUPPORTED_PROTOCOL,
ERR_INVALID_HTTP_TOKEN,
ERR_SOCKET_CLOSED,
} from "ext:deno_node/internal/errors.ts";
import { _checkIsHttpToken } from "ext:deno_node/_http_common.ts";
import { TcpConn } from "ext:deno_net/01_net.js";
import { TlsConn } from "ext:deno_net/02_tls.js";

const {
op_http2_connect,
Expand All @@ -66,6 +68,7 @@ const kDenoResponse = Symbol("kDenoResponse");
const kDenoRid = Symbol("kDenoRid");
const kDenoClientRid = Symbol("kDenoClientRid");
const kDenoConnRid = Symbol("kDenoConnRid");
const kPollConnPromiseId = Symbol("kPollConnPromiseId");

const STREAM_FLAGS_PENDING = 0x0;
const STREAM_FLAGS_READY = 0x1;
Expand Down Expand Up @@ -205,8 +208,12 @@ export class Http2Session extends EventEmitter {
_opaqueData: Buffer | TypedArray | DataView,
) {
warnNotImplemented("Http2Session.goaway");
core.tryClose(this[kDenoConnRid]);
core.tryClose(this[kDenoClientRid]);
if (this[kDenoConnRid]) {
core.tryClose(this[kDenoConnRid]);
}
if (this[kDenoClientRid]) {
core.tryClose(this[kDenoClientRid]);
}
}

destroy(error = constants.NGHTTP2_NO_ERROR, code?: number) {
Expand Down Expand Up @@ -264,7 +271,7 @@ export class Http2Session extends EventEmitter {
}

setTimeout(msecs: number, callback?: () => void) {
setStreamTimeout(this, msecs, callback);
setStreamTimeout.call(this, msecs, callback);
}
}

Expand Down Expand Up @@ -302,8 +309,13 @@ function closeSession(session: Http2Session, code?: number, error?: Error) {
session[kDenoConnRid],
session[kDenoClientRid],
);
core.tryClose(session[kDenoConnRid]);
core.tryClose(session[kDenoClientRid]);
console.table(Deno.resources());
if (session[kDenoConnRid]) {
core.tryClose(session[kDenoConnRid]);
}
if (session[kDenoClientRid]) {
core.tryClose(session[kDenoClientRid]);
}

finishSessionClose(session, error);
}
Expand Down Expand Up @@ -340,32 +352,54 @@ function assertValidPseudoHeader(header: string) {

export class ClientHttp2Session extends Http2Session {
#connectPromise: Promise<void>;
#refed = true;

constructor(
connPromise: Promise<TcpConn> | Promise<TlsConn>,
// deno-lint-ignore no-explicit-any
socket: any,
url: string,
options: Record<string, unknown>,
) {
super(constants.NGHTTP2_SESSION_CLIENT, options);
this[kPendingRequestCalls] = null;
this[kDenoClientRid] = undefined;
this[kDenoConnRid] = undefined;
this[kPollConnPromiseId] = undefined;

socket.on("error", socketOnError);
socket.on("close", socketOnClose);
const connPromise = new Promise((resolve) => {
const eventName = url.startsWith("https") ? "secureConnect" : "connect";
socket.once(eventName, () => {
const rid = socket[kHandle][kStreamBaseField].rid;
nextTick(() => {
resolve(rid);
});
});
});
socket[kSession] = this;

// TODO(bartlomieju): cleanup
this.#connectPromise = (async () => {
debugHttp2(">>> before connect");
const conn = await connPromise;
const [clientRid, connRid] = await op_http2_connect(conn.rid, url);
debugHttp2(">>> after connect");
const connRid_ = await connPromise;
// console.log(">>>> awaited connRid", connRid_, url);
const [clientRid, connRid] = await op_http2_connect(connRid_, url);
debugHttp2(">>> after connect", clientRid, connRid);
this[kDenoClientRid] = clientRid;
this[kDenoConnRid] = connRid;
// TODO(bartlomieju): save this promise, so the session can be unrefed
(async () => {
try {
await core.opAsync(
const promise = core.opAsync(
"op_http2_poll_client_connection",
this[kDenoConnRid],
);
this[kPollConnPromiseId] =
promise[Symbol.for("Deno.core.internalPromiseId")];
if (!this.#refed) {
this.unref();
}
await promise;
} catch (e) {
this.emit("error", e);
}
Expand All @@ -374,6 +408,20 @@ export class ClientHttp2Session extends Http2Session {
})();
}

ref() {
this.#refed = true;
if (this[kPollConnPromiseId]) {
core.refOp(this[kPollConnPromiseId]);
}
}

unref() {
this.#refed = false;
if (this[kPollConnPromiseId]) {
core.unrefOp(this[kPollConnPromiseId]);
}
}

request(
headers: Http2Headers,
options?: Record<string, unknown>,
Expand Down Expand Up @@ -1190,7 +1238,9 @@ function finishCloseStream(stream, code) {
);
core.tryClose(stream[kDenoRid]);
core.tryClose(stream[kDenoResponse].bodyRid);
stream.emit("close");
nextTick(() => {
stream.emit("close");
});
}).catch(() => {
debugHttp2(
">>> finishCloseStream close2 catch",
Expand All @@ -1199,7 +1249,9 @@ function finishCloseStream(stream, code) {
);
core.tryClose(stream[kDenoRid]);
core.tryClose(stream[kDenoResponse].bodyRid);
stream.emit("close");
nextTick(() => {
stream.emit("close");
});
});
}
}
Expand Down Expand Up @@ -1488,24 +1540,32 @@ export function connect(
host = authority.host;
}

// TODO(bartlomieju): handle defaults
if (typeof options.createConnection === "function") {
console.error("Not implemented: http2.connect.options.createConnection");
// notImplemented("http2.connect.options.createConnection");
}
let url, socket;

let conn, url;
if (protocol == "http:") {
conn = Deno.connect({ port, hostname: host });
if (typeof options.createConnection === "function") {
url = `http:https://${host}${port == 80 ? "" : (":" + port)}`;
} else if (protocol == "https:") {
conn = Deno.connectTls({ port, hostname: host, alpnProtocols: ["h2"] });
url = `http:https://${host}${port == 443 ? "" : (":" + port)}`;
socket = options.createConnection(host, options);
} else {
throw new TypeError("Unexpected URL protocol");
switch (protocol) {
case "http:":
url = `http:https://${host}${port == 80 ? "" : (":" + port)}`;
socket = netConnect({ port, host, ...options, pauseOnCreate: true });
break;
case "https:":
// TODO(bartlomieju): handle `initializeTLSOptions` here
url = `https://${host}${port == 443 ? "" : (":" + port)}`;
socket = tlsConnect(port, host, { manualStart: true });
break;
default:
throw new ERR_HTTP2_UNSUPPORTED_PROTOCOL(protocol);
}
}

const session = new ClientHttp2Session(conn, url, options);
// Pause so no "socket.read()" starts in the background that would
// prevent us from taking ownership of the socket in `ClientHttp2Session`
socket.pause();
const session = new ClientHttp2Session(socket, url, options);

session[kAuthority] = `${options.servername || host}:${port}`;
session[kProtocol] = protocol;

Expand All @@ -1515,6 +1575,32 @@ export function connect(
return session;
}

function socketOnError(error) {
const session = this[kSession];
if (session !== undefined) {
if (error.code === "ECONNRESET" && session[kState].goawayCode !== null) {
return session.destroy();
}
debugHttp2(">>>> socket error", error);
session.destroy(error);
}
}

function socketOnClose() {
const session = this[kSession];
if (session !== undefined) {
debugHttp2(">>>> socket closed");
const err = session.connecting ? new ERR_SOCKET_CLOSED() : null;
const state = session[kState];
state.streams.forEach((stream) => stream.close(constants.NGHTTP2_CANCEL));
state.pendingStreams.forEach((stream) =>
stream.close(constants.NGHTTP2_CANCEL)
);
session.close();
session[kMaybeDestroy](err);
}
}

export const constants = {
NGHTTP2_ERR_FRAME_SIZE_ERROR: -522,
NGHTTP2_NV_FLAG_NONE: 0,
Expand Down

0 comments on commit cee2211

Please sign in to comment.