Skip to content

Commit

Permalink
refactor(ext/http): refer to HttpRecord directly using v8::External (d…
Browse files Browse the repository at this point in the history
…enoland#20770)

Makes the JavaScript Request use a v8:External opaque pointer to
directly refer to the Rust HttpRecord.

The HttpRecord is now reference counted. To avoid leaks the strong count
is checked at request completion.

Performance seems unchanged on the minimal benchmark. 118614 req/s this
branch vs 118564 req/s on main, but variance between runs on my laptop
is pretty high.

---------

Co-authored-by: Matt Mastracci <[email protected]>
  • Loading branch information
lrowe and mmastrac committed Nov 13, 2023
1 parent 1ef617e commit 542314a
Show file tree
Hide file tree
Showing 8 changed files with 602 additions and 485 deletions.
15 changes: 14 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 30 additions & 29 deletions ext/http/00_serve.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,26 +117,26 @@ function upgradeHttpRaw(req, conn) {

function addTrailers(resp, headerList) {
const inner = toInnerResponse(resp);
op_http_set_response_trailers(inner.slabId, headerList);
op_http_set_response_trailers(inner.external, headerList);
}

class InnerRequest {
#slabId;
#external;
#context;
#methodAndUri;
#streamRid;
#body;
#upgraded;
#urlValue;

constructor(slabId, context) {
this.#slabId = slabId;
constructor(external, context) {
this.#external = external;
this.#context = context;
this.#upgraded = false;
}

close() {
this.#slabId = undefined;
this.#external = null;
}

get [_upgraded]() {
Expand All @@ -147,7 +147,7 @@ class InnerRequest {
if (this.#upgraded) {
throw new Deno.errors.Http("already upgraded");
}
if (this.#slabId === undefined) {
if (this.#external === null) {
throw new Deno.errors.Http("already closed");
}

Expand All @@ -159,7 +159,7 @@ class InnerRequest {

// upgradeHttpRaw is sync
if (upgradeType == "upgradeHttpRaw") {
const slabId = this.#slabId;
const external = this.#external;
const underlyingConn = originalArgs[0];

this.url();
Expand All @@ -168,7 +168,7 @@ class InnerRequest {

this.#upgraded = () => {};

const upgradeRid = op_http_upgrade_raw(slabId);
const upgradeRid = op_http_upgrade_raw(external);

const conn = new TcpConn(
upgradeRid,
Expand All @@ -184,7 +184,7 @@ class InnerRequest {
const response = originalArgs[0];
const ws = originalArgs[1];

const slabId = this.#slabId;
const external = this.#external;

this.url();
this.headerList;
Expand All @@ -194,15 +194,16 @@ class InnerRequest {
this.#upgraded = () => {
goAhead.resolve();
};
const wsPromise = op_http_upgrade_websocket_next(
external,
response.headerList,
);

// Start the upgrade in the background.
(async () => {
try {
// Returns the upgraded websocket connection
const wsRid = await op_http_upgrade_websocket_next(
slabId,
response.headerList,
);
const wsRid = await wsPromise;

// We have to wait for the go-ahead signal
await goAhead;
Expand Down Expand Up @@ -236,12 +237,12 @@ class InnerRequest {
}

if (this.#methodAndUri === undefined) {
if (this.#slabId === undefined) {
if (this.#external === null) {
throw new TypeError("request closed");
}
// TODO(mmastrac): This is quite slow as we're serializing a large number of values. We may want to consider
// splitting this up into multiple ops.
this.#methodAndUri = op_http_get_request_method_and_url(this.#slabId);
this.#methodAndUri = op_http_get_request_method_and_url(this.#external);
}

const path = this.#methodAndUri[2];
Expand Down Expand Up @@ -281,10 +282,10 @@ class InnerRequest {
};
}
if (this.#methodAndUri === undefined) {
if (this.#slabId === undefined) {
if (this.#external === null) {
throw new TypeError("request closed");
}
this.#methodAndUri = op_http_get_request_method_and_url(this.#slabId);
this.#methodAndUri = op_http_get_request_method_and_url(this.#external);
}
return {
transport: "tcp",
Expand All @@ -295,16 +296,16 @@ class InnerRequest {

get method() {
if (this.#methodAndUri === undefined) {
if (this.#slabId === undefined) {
if (this.#external === null) {
throw new TypeError("request closed");
}
this.#methodAndUri = op_http_get_request_method_and_url(this.#slabId);
this.#methodAndUri = op_http_get_request_method_and_url(this.#external);
}
return this.#methodAndUri[0];
}

get body() {
if (this.#slabId === undefined) {
if (this.#external === null) {
throw new TypeError("request closed");
}
if (this.#body !== undefined) {
Expand All @@ -316,25 +317,25 @@ class InnerRequest {
this.#body = null;
return null;
}
this.#streamRid = op_http_read_request_body(this.#slabId);
this.#streamRid = op_http_read_request_body(this.#external);
this.#body = new InnerBody(readableStreamForRid(this.#streamRid, false));
return this.#body;
}

get headerList() {
if (this.#slabId === undefined) {
if (this.#external === null) {
throw new TypeError("request closed");
}
const headers = [];
const reqHeaders = op_http_get_request_headers(this.#slabId);
const reqHeaders = op_http_get_request_headers(this.#external);
for (let i = 0; i < reqHeaders.length; i += 2) {
ArrayPrototypePush(headers, [reqHeaders[i], reqHeaders[i + 1]]);
}
return headers;
}

get slabId() {
return this.#slabId;
get external() {
return this.#external;
}
}

Expand Down Expand Up @@ -483,8 +484,8 @@ function mapToCallback(context, callback, onError) {
// Did everything shut down while we were waiting?
if (context.closed) {
// We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate
op_http_set_promise_complete(req, 503);
innerRequest?.close();
op_http_set_promise_complete(req, 503);
return;
}

Expand All @@ -498,8 +499,8 @@ function mapToCallback(context, callback, onError) {
}
}

fastSyncResponseOrStream(req, inner.body, status);
innerRequest?.close();
fastSyncResponseOrStream(req, inner.body, status);
};
}

Expand Down Expand Up @@ -659,7 +660,7 @@ function serveHttpOn(context, callback) {
try {
// Attempt to pull as many requests out of the queue as possible before awaiting. This API is
// a synchronous, non-blocking API that returns u32::MAX if anything goes wrong.
while ((req = op_http_try_wait(rid)) !== -1) {
while ((req = op_http_try_wait(rid)) !== null) {
PromisePrototypeCatch(callback(req), promiseErrorHandler);
}
currentPromise = op_http_wait(rid);
Expand All @@ -677,7 +678,7 @@ function serveHttpOn(context, callback) {
}
throw new Deno.errors.Http(error);
}
if (req === -1) {
if (req === null) {
break;
}
PromisePrototypeCatch(callback(req), promiseErrorHandler);
Expand Down
3 changes: 1 addition & 2 deletions ext/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ repository.workspace = true
description = "HTTP server implementation for Deno"

[features]
"__zombie_http_tracking" = []
"__http_tracing" = []

[lib]
Expand Down Expand Up @@ -46,12 +45,12 @@ pin-project.workspace = true
ring.workspace = true
scopeguard.workspace = true
serde.workspace = true
slab.workspace = true
smallvec.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util = { workspace = true, features = ["io"] }

[dev-dependencies]
bencher.workspace = true
http-body-util = "=0.1.0-rc.3"
rand.workspace = true

0 comments on commit 542314a

Please sign in to comment.