From 542314a0becbba120dbee13b3f410f647b4c9cb7 Mon Sep 17 00:00:00 2001 From: Laurence Rowe Date: Mon, 13 Nov 2023 06:04:49 -0800 Subject: [PATCH] refactor(ext/http): refer to HttpRecord directly using v8::External (#20770) Makes the JavaScript Request use a v8:External opaque pointer to directly refer to the Rust HttpRecord. The HttpRecord is now reference counted. To avoid leaks the strong count is checked at request completion. Performance seems unchanged on the minimal benchmark. 118614 req/s this branch vs 118564 req/s on main, but variance between runs on my laptop is pretty high. --------- Co-authored-by: Matt Mastracci --- Cargo.lock | 15 +- ext/http/00_serve.js | 59 +++--- ext/http/Cargo.toml | 3 +- ext/http/http_next.rs | 230 ++++++++++++++-------- ext/http/lib.rs | 2 +- ext/http/response_body.rs | 3 +- ext/http/service.rs | 401 ++++++++++++++++++++++++++++++++++++++ ext/http/slab.rs | 374 ----------------------------------- 8 files changed, 602 insertions(+), 485 deletions(-) create mode 100644 ext/http/service.rs delete mode 100644 ext/http/slab.rs diff --git a/Cargo.lock b/Cargo.lock index 6b7f42c11bd81..7c9816668561a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1285,6 +1285,7 @@ dependencies = [ "flate2", "fly-accept-encoding", "http", + "http-body-util", "httparse", "hyper 0.14.27", "hyper 1.0.0-rc.4", @@ -1298,7 +1299,6 @@ dependencies = [ "ring", "scopeguard", "serde", - "slab", "smallvec", "thiserror", "tokio", @@ -2716,6 +2716,19 @@ dependencies = [ "http", ] +[[package]] +name = "http-body-util" +version = "0.1.0-rc.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08ef12f041acdd397010e5fb6433270c147d3b8b2d0a840cd7fff8e531dca5c8" +dependencies = [ + "bytes", + "futures-util", + "http", + "http-body 1.0.0-rc.2", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.8.0" diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index fbd2014a7cb56..05e0bb5c33a4e 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -117,11 +117,11 @@ function upgradeHttpRaw(req, conn) { function addTrailers(resp, headerList) { const inner = toInnerResponse(resp); - op_http_set_response_trailers(inner.slabId, headerList); + op_http_set_response_trailers(inner.external, headerList); } class InnerRequest { - #slabId; + #external; #context; #methodAndUri; #streamRid; @@ -129,14 +129,14 @@ class InnerRequest { #upgraded; #urlValue; - constructor(slabId, context) { - this.#slabId = slabId; + constructor(external, context) { + this.#external = external; this.#context = context; this.#upgraded = false; } close() { - this.#slabId = undefined; + this.#external = null; } get [_upgraded]() { @@ -147,7 +147,7 @@ class InnerRequest { if (this.#upgraded) { throw new Deno.errors.Http("already upgraded"); } - if (this.#slabId === undefined) { + if (this.#external === null) { throw new Deno.errors.Http("already closed"); } @@ -159,7 +159,7 @@ class InnerRequest { // upgradeHttpRaw is sync if (upgradeType == "upgradeHttpRaw") { - const slabId = this.#slabId; + const external = this.#external; const underlyingConn = originalArgs[0]; this.url(); @@ -168,7 +168,7 @@ class InnerRequest { this.#upgraded = () => {}; - const upgradeRid = op_http_upgrade_raw(slabId); + const upgradeRid = op_http_upgrade_raw(external); const conn = new TcpConn( upgradeRid, @@ -184,7 +184,7 @@ class InnerRequest { const response = originalArgs[0]; const ws = originalArgs[1]; - const slabId = this.#slabId; + const external = this.#external; this.url(); this.headerList; @@ -194,15 +194,16 @@ class InnerRequest { this.#upgraded = () => { goAhead.resolve(); }; + const wsPromise = op_http_upgrade_websocket_next( + external, + response.headerList, + ); // Start the upgrade in the background. (async () => { try { // Returns the upgraded websocket connection - const wsRid = await op_http_upgrade_websocket_next( - slabId, - response.headerList, - ); + const wsRid = await wsPromise; // We have to wait for the go-ahead signal await goAhead; @@ -236,12 +237,12 @@ class InnerRequest { } if (this.#methodAndUri === undefined) { - if (this.#slabId === undefined) { + if (this.#external === null) { throw new TypeError("request closed"); } // TODO(mmastrac): This is quite slow as we're serializing a large number of values. We may want to consider // splitting this up into multiple ops. - this.#methodAndUri = op_http_get_request_method_and_url(this.#slabId); + this.#methodAndUri = op_http_get_request_method_and_url(this.#external); } const path = this.#methodAndUri[2]; @@ -281,10 +282,10 @@ class InnerRequest { }; } if (this.#methodAndUri === undefined) { - if (this.#slabId === undefined) { + if (this.#external === null) { throw new TypeError("request closed"); } - this.#methodAndUri = op_http_get_request_method_and_url(this.#slabId); + this.#methodAndUri = op_http_get_request_method_and_url(this.#external); } return { transport: "tcp", @@ -295,16 +296,16 @@ class InnerRequest { get method() { if (this.#methodAndUri === undefined) { - if (this.#slabId === undefined) { + if (this.#external === null) { throw new TypeError("request closed"); } - this.#methodAndUri = op_http_get_request_method_and_url(this.#slabId); + this.#methodAndUri = op_http_get_request_method_and_url(this.#external); } return this.#methodAndUri[0]; } get body() { - if (this.#slabId === undefined) { + if (this.#external === null) { throw new TypeError("request closed"); } if (this.#body !== undefined) { @@ -316,25 +317,25 @@ class InnerRequest { this.#body = null; return null; } - this.#streamRid = op_http_read_request_body(this.#slabId); + this.#streamRid = op_http_read_request_body(this.#external); this.#body = new InnerBody(readableStreamForRid(this.#streamRid, false)); return this.#body; } get headerList() { - if (this.#slabId === undefined) { + if (this.#external === null) { throw new TypeError("request closed"); } const headers = []; - const reqHeaders = op_http_get_request_headers(this.#slabId); + const reqHeaders = op_http_get_request_headers(this.#external); for (let i = 0; i < reqHeaders.length; i += 2) { ArrayPrototypePush(headers, [reqHeaders[i], reqHeaders[i + 1]]); } return headers; } - get slabId() { - return this.#slabId; + get external() { + return this.#external; } } @@ -483,8 +484,8 @@ function mapToCallback(context, callback, onError) { // Did everything shut down while we were waiting? if (context.closed) { // We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate - op_http_set_promise_complete(req, 503); innerRequest?.close(); + op_http_set_promise_complete(req, 503); return; } @@ -498,8 +499,8 @@ function mapToCallback(context, callback, onError) { } } - fastSyncResponseOrStream(req, inner.body, status); innerRequest?.close(); + fastSyncResponseOrStream(req, inner.body, status); }; } @@ -659,7 +660,7 @@ function serveHttpOn(context, callback) { try { // Attempt to pull as many requests out of the queue as possible before awaiting. This API is // a synchronous, non-blocking API that returns u32::MAX if anything goes wrong. - while ((req = op_http_try_wait(rid)) !== -1) { + while ((req = op_http_try_wait(rid)) !== null) { PromisePrototypeCatch(callback(req), promiseErrorHandler); } currentPromise = op_http_wait(rid); @@ -677,7 +678,7 @@ function serveHttpOn(context, callback) { } throw new Deno.errors.Http(error); } - if (req === -1) { + if (req === null) { break; } PromisePrototypeCatch(callback(req), promiseErrorHandler); diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml index ff44a699243e3..923d9e2eab55c 100644 --- a/ext/http/Cargo.toml +++ b/ext/http/Cargo.toml @@ -11,7 +11,6 @@ repository.workspace = true description = "HTTP server implementation for Deno" [features] -"__zombie_http_tracking" = [] "__http_tracing" = [] [lib] @@ -46,7 +45,6 @@ pin-project.workspace = true ring.workspace = true scopeguard.workspace = true serde.workspace = true -slab.workspace = true smallvec.workspace = true thiserror.workspace = true tokio.workspace = true @@ -54,4 +52,5 @@ tokio-util = { workspace = true, features = ["io"] } [dev-dependencies] bencher.workspace = true +http-body-util = "=0.1.0-rc.3" rand.workspace = true diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 7fc396b38683c..399515159eb74 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -10,12 +10,11 @@ use crate::request_properties::HttpPropertyExtractor; use crate::response_body::Compression; use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytesInner; -use crate::slab::new_slab_future; -use crate::slab::slab_get; -use crate::slab::slab_init; -use crate::slab::HttpRequestBodyAutocloser; -use crate::slab::RefCount; -use crate::slab::SlabId; +use crate::service::handle_request; +use crate::service::http_trace; +use crate::service::HttpRecord; +use crate::service::HttpRequestBodyAutocloser; +use crate::service::RefCount; use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; use cache_control::CacheControl; @@ -33,6 +32,7 @@ use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; +use deno_core::ExternalPointer; use deno_core::JsBuffer; use deno_core::OpState; use deno_core::RcRef; @@ -62,9 +62,11 @@ use once_cell::sync::Lazy; use smallvec::SmallVec; use std::borrow::Cow; use std::cell::RefCell; +use std::ffi::c_void; use std::future::Future; use std::io; use std::pin::Pin; +use std::ptr::null; use std::rc::Rc; use std::time::Duration; @@ -116,14 +118,66 @@ impl< { } +#[repr(transparent)] +struct RcHttpRecord(Rc); + +// Temp copy +/// Define an external type. +macro_rules! external { + ($type:ident, $name:literal) => { + impl deno_core::Externalizable for $type { + fn external_marker() -> usize { + // Use the address of a static mut as a way to get around lack of usize-sized TypeId. Because it is mutable, the + // compiler cannot collapse multiple definitions into one. + static mut DEFINITION: deno_core::ExternalDefinition = + deno_core::ExternalDefinition::new($name); + // Wash the pointer through black_box so the compiler cannot see what we're going to do with it and needs + // to assume it will be used for valid purposes. + // SAFETY: temporary while waiting on deno core bump + let ptr = std::hint::black_box(unsafe { &mut DEFINITION } as *mut _); + ptr as usize + } + + fn external_name() -> &'static str { + $name + } + } + }; +} + +// Register the [`HttpRecord`] as an external. +external!(RcHttpRecord, "http record"); + +/// Construct Rc from raw external pointer, consuming +/// refcount. You must make sure the external is deleted on the JS side. +macro_rules! take_external { + ($external:expr, $args:tt) => {{ + let ptr = ExternalPointer::::from_raw($external); + let record = ptr.unsafely_take().0; + http_trace!(record, $args); + record + }}; +} + +/// Clone Rc from raw external pointer. +macro_rules! clone_external { + ($external:expr, $args:tt) => {{ + let ptr = ExternalPointer::::from_raw($external); + ptr.unsafely_deref().0.clone() + }}; +} + #[op2(fast)] #[smi] pub fn op_http_upgrade_raw( state: &mut OpState, - #[smi] slab_id: SlabId, + external: *const c_void, ) -> Result { + // SAFETY: external is deleted before calling this op. + let http = unsafe { take_external!(external, "op_http_upgrade_raw") }; + // Stage 1: extract the upgrade future - let upgrade = slab_get(slab_id).upgrade()?; + let upgrade = http.upgrade()?; let (read, write) = tokio::io::duplex(1024); let (read_rx, write_tx) = tokio::io::split(read); let (mut write_rx, mut read_tx) = tokio::io::split(write); @@ -137,7 +191,6 @@ pub fn op_http_upgrade_raw( match upgrade_stream.write(&buf[..read]) { Ok(None) => continue, Ok(Some((response, bytes))) => { - let mut http = slab_get(slab_id); *http.response() = response; http.complete(); let mut upgraded = TokioIo::new(upgrade.await?); @@ -188,20 +241,23 @@ pub fn op_http_upgrade_raw( #[smi] pub async fn op_http_upgrade_websocket_next( state: Rc>, - #[smi] slab_id: SlabId, + external: *const c_void, #[serde] headers: Vec<(ByteString, ByteString)>, ) -> Result { - let mut http = slab_get(slab_id); + let http = + // SAFETY: external is deleted before calling this op. + unsafe { take_external!(external, "op_http_upgrade_websocket_next") }; // Stage 1: set the response to 101 Switching Protocols and send it let upgrade = http.upgrade()?; - - let response = http.response(); - *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; - for (name, value) in headers { - response.headers_mut().append( - HeaderName::from_bytes(&name).unwrap(), - HeaderValue::from_bytes(&value).unwrap(), - ); + { + let mut response = http.response(); + *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; + for (name, value) in headers { + response.headers_mut().append( + HeaderName::from_bytes(&name).unwrap(), + HeaderValue::from_bytes(&value).unwrap(), + ); + } } http.complete(); @@ -214,8 +270,10 @@ pub async fn op_http_upgrade_websocket_next( } #[op2(fast)] -pub fn op_http_set_promise_complete(#[smi] slab_id: SlabId, status: u16) { - let mut http = slab_get(slab_id); +pub fn op_http_set_promise_complete(external: *const c_void, status: u16) { + let http = + // SAFETY: external is deleted before calling this op. + unsafe { take_external!(external, "op_http_set_promise_complete") }; // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we // will quitely ignore invalid values. if let Ok(code) = StatusCode::from_u16(status) { @@ -227,16 +285,18 @@ pub fn op_http_set_promise_complete(#[smi] slab_id: SlabId, status: u16) { #[op2] pub fn op_http_get_request_method_and_url<'scope, HTTP>( scope: &mut v8::HandleScope<'scope>, - #[smi] slab_id: SlabId, + external: *const c_void, ) -> v8::Local<'scope, v8::Array> where HTTP: HttpPropertyExtractor, { - let http = slab_get(slab_id); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_get_request_method_and_url") }; let request_info = http.request_info(); let request_parts = http.request_parts(); let request_properties = HTTP::request_properties( - request_info, + &request_info, &request_parts.uri, &request_parts.headers, ); @@ -291,20 +351,25 @@ where #[op2] #[serde] pub fn op_http_get_request_header( - #[smi] slab_id: SlabId, + external: *const c_void, #[string] name: String, ) -> Option { - let http = slab_get(slab_id); - let value = http.request_parts().headers.get(name); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_get_request_header") }; + let request_parts = http.request_parts(); + let value = request_parts.headers.get(name); value.map(|value| value.as_bytes().into()) } #[op2] pub fn op_http_get_request_headers<'scope>( scope: &mut v8::HandleScope<'scope>, - #[smi] slab_id: SlabId, + external: *const c_void, ) -> v8::Local<'scope, v8::Array> { - let http = slab_get(slab_id); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_get_request_headers") }; let headers = &http.request_parts().headers; // Two slots for each header key/value pair let mut vec: SmallVec<[v8::Local; 32]> = @@ -371,9 +436,11 @@ pub fn op_http_get_request_headers<'scope>( #[smi] pub fn op_http_read_request_body( state: Rc>, - #[smi] slab_id: SlabId, + external: *const c_void, ) -> ResourceId { - let mut http = slab_get(slab_id); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_read_request_body") }; let rid = if let Some(incoming) = http.take_body() { let body_resource = Rc::new(HttpRequestBody::new(incoming)); state.borrow_mut().resource_table.add_rc(body_resource) @@ -388,12 +455,15 @@ pub fn op_http_read_request_body( #[op2(fast)] pub fn op_http_set_response_header( - #[smi] slab_id: SlabId, + external: *const c_void, #[string(onebyte)] name: Cow<[u8]>, #[string(onebyte)] value: Cow<[u8]>, ) { - let mut http = slab_get(slab_id); - let resp_headers = http.response().headers_mut(); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_set_response_header") }; + let mut response = http.response(); + let resp_headers = response.headers_mut(); // These are valid latin-1 strings let name = HeaderName::from_bytes(&name).unwrap(); let value = match value { @@ -409,12 +479,15 @@ pub fn op_http_set_response_header( #[op2] pub fn op_http_set_response_headers( scope: &mut v8::HandleScope, - #[smi] slab_id: SlabId, + external: *const c_void, headers: v8::Local, ) { - let mut http = slab_get(slab_id); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_set_response_headers") }; // TODO(mmastrac): Invalid headers should be handled? - let resp_headers = http.response().headers_mut(); + let mut response = http.response(); + let resp_headers = response.headers_mut(); let len = headers.length(); let header_len = len * 2; @@ -438,10 +511,12 @@ pub fn op_http_set_response_headers( #[op2] pub fn op_http_set_response_trailers( - #[smi] slab_id: SlabId, + external: *const c_void, #[serde] trailers: Vec<(ByteString, ByteString)>, ) { - let mut http = slab_get(slab_id); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_set_response_trailers") }; let mut trailer_map: HeaderMap = HeaderMap::with_capacity(trailers.len()); for (name, value) in trailers { // These are valid latin-1 strings @@ -577,20 +652,21 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) { /// Sets the appropriate response body. Use `force_instantiate_body` if you need /// to ensure that the response is cleaned up correctly (eg: for resources). fn set_response( - slab_id: SlabId, + external: *const c_void, length: Option, status: u16, force_instantiate_body: bool, response_fn: impl FnOnce(Compression) -> ResponseBytesInner, ) { - let mut http = slab_get(slab_id); + // SAFETY: external is deleted before calling this op. + let http = unsafe { take_external!(external, "set_response") }; // The request may have been cancelled by this point and if so, there's no need for us to // do all of this work to send the response. if !http.cancelled() { let resource = http.take_resource(); let compression = is_request_compressible(length, &http.request_parts().headers); - let response = http.response(); + let mut response = http.response(); let compression = modify_compressibility_from_response(compression, response.headers_mut()); response @@ -612,7 +688,7 @@ fn set_response( #[op2(fast)] pub fn op_http_set_response_body_resource( state: Rc>, - #[smi] slab_id: SlabId, + external: *const c_void, #[smi] stream_rid: ResourceId, auto_close: bool, status: u16, @@ -634,7 +710,7 @@ pub fn op_http_set_response_body_resource( }; set_response( - slab_id, + external, resource.size_hint().1.map(|s| s as usize), status, true, @@ -648,41 +724,42 @@ pub fn op_http_set_response_body_resource( #[op2(fast)] pub fn op_http_set_response_body_text( - #[smi] slab_id: SlabId, + external: *const c_void, #[string] text: String, status: u16, ) { if !text.is_empty() { - set_response(slab_id, Some(text.len()), status, false, |compression| { + set_response(external, Some(text.len()), status, false, |compression| { ResponseBytesInner::from_vec(compression, text.into_bytes()) }); } else { - op_http_set_promise_complete::call(slab_id, status); + op_http_set_promise_complete::call(external, status); } } #[op2(fast)] pub fn op_http_set_response_body_bytes( - #[smi] slab_id: SlabId, + external: *const c_void, #[buffer] buffer: JsBuffer, status: u16, ) { if !buffer.is_empty() { - set_response(slab_id, Some(buffer.len()), status, false, |compression| { + set_response(external, Some(buffer.len()), status, false, |compression| { ResponseBytesInner::from_bufview(compression, BufView::from(buffer)) }); } else { - op_http_set_promise_complete::call(slab_id, status); + op_http_set_promise_complete::call(external, status); } } #[op2(async)] pub async fn op_http_track( state: Rc>, - #[smi] slab_id: SlabId, + external: *const c_void, #[smi] server_rid: ResourceId, ) -> Result<(), AnyError> { - let http = slab_get(slab_id); + // SAFETY: op is called with external. + let http = unsafe { clone_external!(external, "op_http_track") }; let handle = http.body_promise(); let join_handle = state @@ -764,7 +841,7 @@ fn serve_https( mut io: TlsStream, request_info: HttpConnectionProperties, lifetime: HttpLifetime, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender>, ) -> JoinHandle> { let HttpLifetime { refcount, @@ -773,7 +850,7 @@ fn serve_https( } = lifetime; let svc = service_fn(move |req: Request| { - new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone()) + handle_request(req, request_info.clone(), refcount.clone(), tx.clone()) }); spawn( async { @@ -801,7 +878,7 @@ fn serve_http( io: impl HttpServeStream, request_info: HttpConnectionProperties, lifetime: HttpLifetime, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender>, ) -> JoinHandle> { let HttpLifetime { refcount, @@ -810,7 +887,7 @@ fn serve_http( } = lifetime; let svc = service_fn(move |req: Request| { - new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone()) + handle_request(req, request_info.clone(), refcount.clone(), tx.clone()) }); spawn( serve_http2_autodetect(io, svc, listen_cancel_handle) @@ -822,7 +899,7 @@ fn serve_http_on( connection: HTTP::Connection, listen_properties: &HttpListenProperties, lifetime: HttpLifetime, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender>, ) -> JoinHandle> where HTTP: HttpPropertyExtractor, @@ -857,12 +934,12 @@ struct HttpJoinHandle { join_handle: AsyncRefCell>>>, connection_cancel_handle: Rc, listen_cancel_handle: Rc, - rx: AsyncRefCell>, + rx: AsyncRefCell>>, refcount: RefCount, } impl HttpJoinHandle { - fn new(rx: tokio::sync::mpsc::Receiver) -> Self { + fn new(rx: tokio::sync::mpsc::Receiver>) -> Self { Self { join_handle: AsyncRefCell::new(None), connection_cancel_handle: CancelHandle::new_rc(), @@ -918,8 +995,6 @@ pub fn op_http_serve( where HTTP: HttpPropertyExtractor, { - slab_init(); - let listener = HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?; @@ -969,8 +1044,6 @@ pub fn op_http_serve_on( where HTTP: HttpPropertyExtractor, { - slab_init(); - let connection = HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?; @@ -1000,36 +1073,38 @@ where } /// Synchronous, non-blocking call to see if there are any further HTTP requests. If anything -/// goes wrong in this method we return [`SlabId::MAX`] and let the async handler pick up the real error. +/// goes wrong in this method we return null and let the async handler pick up the real error. #[op2(fast)] -#[smi] -pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId { +pub fn op_http_try_wait( + state: &mut OpState, + #[smi] rid: ResourceId, +) -> *const c_void { // The resource needs to exist. let Ok(join_handle) = state.resource_table.get::(rid) else { - return SlabId::MAX; + return null(); }; // If join handle is somehow locked, just abort. let Some(mut handle) = RcRef::map(&join_handle, |this| &this.rx).try_borrow_mut() else { - return SlabId::MAX; + return null(); }; // See if there are any requests waiting on this channel. If not, return. - let Ok(id) = handle.try_recv() else { - return SlabId::MAX; + let Ok(record) = handle.try_recv() else { + return null(); }; - id + let ptr = ExternalPointer::new(RcHttpRecord(record)); + ptr.into_raw() } #[op2(async)] -#[smi] pub async fn op_http_wait( state: Rc>, #[smi] rid: ResourceId, -) -> Result { +) -> Result<*const c_void, AnyError> { // We will get the join handle initially, as we might be consuming requests still let join_handle = state .borrow_mut() @@ -1046,8 +1121,9 @@ pub async fn op_http_wait( .await; // Do we have a request? - if let Some(req) = next { - return Ok(req); + if let Some(record) = next { + let ptr = ExternalPointer::new(RcHttpRecord(record)); + return Ok(ptr.into_raw()); } // No - we're shutting down @@ -1063,14 +1139,14 @@ pub async fn op_http_wait( if let Some(err) = err.source() { if let Some(err) = err.downcast_ref::() { if err.kind() == io::ErrorKind::NotConnected { - return Ok(SlabId::MAX); + return Ok(null()); } } } return Err(err); } - Ok(SlabId::MAX) + Ok(null()) } /// Cancels the HTTP handle. diff --git a/ext/http/lib.rs b/ext/http/lib.rs index d470111195063..0460a3707f9d0 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -86,7 +86,7 @@ mod reader_stream; mod request_body; mod request_properties; mod response_body; -mod slab; +mod service; mod websocket_upgrade; pub use request_properties::DefaultHttpPropertyExtractor; diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index 4f7e3b0a5d439..7d91dce6b4e3e 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -23,7 +23,7 @@ use hyper1::body::Frame; use hyper1::body::SizeHint; use pin_project::pin_project; -use crate::slab::HttpRequestBodyAutocloser; +use crate::service::HttpRequestBodyAutocloser; /// Simplification for nested types we use for our streams. We provide a way to convert from /// this type into Hyper's body [`Frame`]. @@ -80,6 +80,7 @@ impl CompletionHandle { } } + #[allow(dead_code)] pub fn is_completed(&self) -> bool { self.inner.borrow().complete } diff --git a/ext/http/service.rs b/ext/http/service.rs new file mode 100644 index 0000000000000..ea67980f3e0e5 --- /dev/null +++ b/ext/http/service.rs @@ -0,0 +1,401 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use crate::request_properties::HttpConnectionProperties; +use crate::response_body::CompletionHandle; +use crate::response_body::ResponseBytes; +use deno_core::error::AnyError; +use deno_core::OpState; +use deno_core::ResourceId; +use http::request::Parts; +use http::HeaderMap; +use hyper1::body::Incoming; +use hyper1::upgrade::OnUpgrade; + +use scopeguard::guard; +use scopeguard::ScopeGuard; +use std::cell::Ref; +use std::cell::RefCell; +use std::cell::RefMut; +use std::future::Future; +use std::rc::Rc; + +pub type Request = hyper1::Request; +pub type Response = hyper1::Response; + +macro_rules! http_trace { + ($record:expr, $args:tt) => { + #[cfg(feature = "__http_tracing")] + { + println!( + "HTTP id={:p} strong={}: {}", + $record, + std::rc::Rc::strong_count(&$record), + format!($args), + ); + } + }; +} + +pub(crate) use http_trace; + +#[repr(transparent)] +#[derive(Clone, Default)] +pub struct RefCount(pub Rc<()>); + +enum RequestBodyState { + Incoming(Incoming), + Resource(HttpRequestBodyAutocloser), +} + +impl From for RequestBodyState { + fn from(value: Incoming) -> Self { + RequestBodyState::Incoming(value) + } +} + +/// Ensures that the request body closes itself when no longer needed. +pub struct HttpRequestBodyAutocloser(ResourceId, Rc>); + +impl HttpRequestBodyAutocloser { + pub fn new(res: ResourceId, op_state: Rc>) -> Self { + Self(res, op_state) + } +} + +impl Drop for HttpRequestBodyAutocloser { + fn drop(&mut self) { + if let Ok(res) = self.1.borrow_mut().resource_table.take_any(self.0) { + res.close(); + } + } +} + +pub async fn handle_request( + request: Request, + request_info: HttpConnectionProperties, + _refcount: RefCount, // Keep server alive for duration of this future. + tx: tokio::sync::mpsc::Sender>, +) -> Result { + // If the underlying TCP connection is closed, this future will be dropped + // and execution could stop at any await point. + // The HttpRecord must live until JavaScript is done processing so is wrapped + // in an Rc. The guard ensures unneeded resources are freed at cancellation. + let guarded_record = + guard(HttpRecord::new(request, request_info), HttpRecord::cancel); + + // Clone HttpRecord and send to JavaScript for processing. + // Safe to unwrap as channel receiver is never closed. + tx.send(guarded_record.clone()).await.unwrap(); + + // Wait for JavaScript handler to return request. + http_trace!(*guarded_record, "handle_request response_ready.await"); + guarded_record.response_ready().await; + + // Defuse the guard. Must not await after the point. + let record = ScopeGuard::into_inner(guarded_record); + http_trace!(record, "handle_request complete"); + assert!( + Rc::strong_count(&record) == 1, + "HTTP state error: Expected to be last strong reference (handle_request)" + ); + let response = record.take_response(); + Ok(response) +} + +struct HttpRecordInner { + request_info: HttpConnectionProperties, + request_parts: Parts, + request_body: Option, + /// The response may get taken before we tear this down + response: Option, + response_ready: bool, + response_waker: Option, + trailers: Rc>>, + been_dropped: bool, +} + +pub struct HttpRecord(RefCell); + +#[cfg(feature = "__http_tracing")] +pub static RECORD_COUNT: std::sync::atomic::AtomicUsize = + std::sync::atomic::AtomicUsize::new(0); + +#[cfg(feature = "__http_tracing")] +impl Drop for HttpRecord { + fn drop(&mut self) { + let count = RECORD_COUNT + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst) + .checked_sub(1) + .expect("Count went below zero"); + println!("HTTP count={count}: HttpRecord::drop"); + } +} + +impl HttpRecord { + fn new(request: Request, request_info: HttpConnectionProperties) -> Rc { + #[cfg(feature = "__http_tracing")] + { + RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + let (request_parts, request_body) = request.into_parts(); + let body = ResponseBytes::default(); + let trailers = body.trailers(); + let request_body = Some(request_body.into()); + let inner = HttpRecordInner { + request_info, + request_parts, + request_body, + response: Some(Response::new(body)), + response_ready: false, + response_waker: None, + trailers, + been_dropped: false, + }; + #[allow(clippy::let_and_return)] + let record = Rc::new(Self(RefCell::new(inner))); + http_trace!(record, "HttpRecord::new"); + record + } + + fn self_ref(&self) -> Ref<'_, HttpRecordInner> { + self.0.borrow() + } + + fn self_mut(&self) -> RefMut<'_, HttpRecordInner> { + self.0.borrow_mut() + } + + /// Perform the Hyper upgrade on this record. + pub fn upgrade(&self) -> Result { + // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit + self + .self_mut() + .request_parts + .extensions + .remove::() + .ok_or_else(|| AnyError::msg("upgrade unavailable")) + } + + /// Take the Hyper body from this record. + pub fn take_body(&self) -> Option { + let body_holder = &mut self.self_mut().request_body; + let body = body_holder.take(); + match body { + Some(RequestBodyState::Incoming(body)) => Some(body), + x => { + *body_holder = x; + None + } + } + } + + pub fn take_resource(&self) -> Option { + let body_holder = &mut self.self_mut().request_body; + let body = body_holder.take(); + match body { + Some(RequestBodyState::Resource(res)) => Some(res), + x => { + *body_holder = x; + None + } + } + } + + /// Replace the request body with a resource ID and the OpState we'll need to shut it down. + /// We cannot keep just the resource itself, as JS code might be reading from the resource ID + /// to generate the response data (requiring us to keep it in the resource table). + pub fn put_resource(&self, res: HttpRequestBodyAutocloser) { + self.self_mut().request_body = Some(RequestBodyState::Resource(res)); + } + + /// Cleanup resources not needed after the future is dropped. + fn cancel(self: Rc) { + http_trace!(self, "HttpRecord::cancel"); + let mut inner = self.0.borrow_mut(); + inner.been_dropped = true; + // The request body might include actual resources. + inner.request_body.take(); + } + + /// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well). + pub fn complete(self: Rc) { + http_trace!(self, "HttpRecord::complete"); + let mut inner = self.self_mut(); + assert!( + !inner.been_dropped || Rc::strong_count(&self) == 1, + "HTTP state error: Expected to be last strong reference (been_dropped)" + ); + assert!( + !inner.response_ready, + "HTTP state error: Entry has already been completed" + ); + inner.response_ready = true; + if let Some(waker) = inner.response_waker.take() { + drop(inner); + waker.wake(); + } + } + + /// Has the future for this record been dropped? ie, has the underlying TCP connection + /// been closed? + pub fn cancelled(&self) -> bool { + self.self_ref().been_dropped + } + + /// Get a mutable reference to the response. + pub fn response(&self) -> RefMut<'_, Response> { + RefMut::map(self.self_mut(), |inner| inner.response.as_mut().unwrap()) + } + + /// Get a mutable reference to the trailers. + pub fn trailers(&self) -> Ref<'_, Rc>>> { + Ref::map(self.self_ref(), |inner| &inner.trailers) + } + + /// Take the response. + fn take_response(&self) -> Response { + self.self_mut().response.take().unwrap() + } + + /// Get a reference to the connection properties. + pub fn request_info(&self) -> Ref<'_, HttpConnectionProperties> { + Ref::map(self.self_ref(), |inner| &inner.request_info) + } + + /// Get a reference to the request parts. + pub fn request_parts(&self) -> Ref<'_, Parts> { + Ref::map(self.self_ref(), |inner| &inner.request_parts) + } + + /// Get a reference to the completion handle. + fn response_ready(&self) -> impl Future + '_ { + struct HttpRecordComplete<'a>(&'a HttpRecord); + + impl<'a> Future for HttpRecordComplete<'a> { + type Output = (); + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let mut mut_self = self.0 .0.borrow_mut(); + if mut_self.response_ready { + return std::task::Poll::Ready(()); + } + mut_self.response_waker = Some(cx.waker().clone()); + std::task::Poll::Pending + } + } + + HttpRecordComplete(self) + } + + /// Get a reference to the response body completion handle. + pub fn body_promise(&self) -> CompletionHandle { + self + .self_ref() + .response + .as_ref() + .unwrap() + .body() + .completion_handle() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::hyper_util_tokioio::TokioIo; + use crate::response_body::Compression; + use crate::response_body::ResponseBytesInner; + use bytes::Buf; + use deno_net::raw::NetworkStreamType; + use hyper1::body::Body; + use hyper1::service::service_fn; + use hyper1::service::HttpService; + use std::error::Error as StdError; + + /// Execute client request on service and concurrently map the response. + async fn serve_request( + req: http::Request, + service: S, + map_response: impl FnOnce(hyper1::Response) -> F, + ) -> hyper1::Result + where + B: Body + Send + 'static, // Send bound due to DuplexStream + B::Data: Send, + B::Error: Into>, + S: HttpService, + S::Error: Into>, + S::ResBody: 'static, + ::Error: Into>, + F: std::future::Future>, + { + use hyper1::client::conn::http1::handshake; + use hyper1::server::conn::http1::Builder; + let (stream_client, stream_server) = tokio::io::duplex(16 * 1024); + let conn_server = + Builder::new().serve_connection(TokioIo::new(stream_server), service); + let (mut sender, conn_client) = + handshake(TokioIo::new(stream_client)).await?; + + let (res, _, _) = tokio::try_join!( + async move { + let res = sender.send_request(req).await?; + map_response(res).await + }, + conn_server, + conn_client, + )?; + Ok(res) + } + + #[tokio::test] + async fn test_handle_request() -> Result<(), AnyError> { + let (tx, mut rx) = tokio::sync::mpsc::channel(10); + let refcount = RefCount::default(); + let refcount_check = refcount.clone(); + let request_info = HttpConnectionProperties { + peer_address: "".into(), + peer_port: None, + local_port: None, + stream_type: NetworkStreamType::Tcp, + }; + let svc = service_fn(move |req: hyper1::Request| { + handle_request(req, request_info.clone(), refcount.clone(), tx.clone()) + }); + + let client_req = http::Request::builder().uri("/").body("".to_string())?; + + // Response produced by concurrent tasks + tokio::try_join!( + async move { + // JavaScript handler produces response + let record = rx.recv().await.unwrap(); + let resource = record.take_resource(); + record.response().body_mut().initialize( + ResponseBytesInner::from_vec( + Compression::None, + b"hello world".to_vec(), + ), + resource, + ); + record.complete(); + Ok(()) + }, + // Server connection executes service + async move { + serve_request(client_req, svc, |res| async { + // Client reads the response + use http_body_util::BodyExt; + assert_eq!(res.status(), 200); + let body = res.collect().await?.to_bytes(); + assert_eq!(body.chunk(), b"hello world"); + Ok(()) + }) + .await + }, + )?; + assert_eq!(Rc::strong_count(&refcount_check.0), 1); + Ok(()) + } +} diff --git a/ext/http/slab.rs b/ext/http/slab.rs deleted file mode 100644 index 790b4649a20cc..0000000000000 --- a/ext/http/slab.rs +++ /dev/null @@ -1,374 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use crate::request_properties::HttpConnectionProperties; -use crate::response_body::CompletionHandle; -use crate::response_body::ResponseBytes; -use deno_core::error::AnyError; -use deno_core::OpState; -use deno_core::ResourceId; -use http::request::Parts; -use http::HeaderMap; -use hyper1::body::Incoming; -use hyper1::upgrade::OnUpgrade; - -use scopeguard::defer; -use slab::Slab; -use std::cell::RefCell; -use std::cell::RefMut; -use std::ptr::NonNull; -use std::rc::Rc; - -pub type Request = hyper1::Request; -pub type Response = hyper1::Response; -pub type SlabId = u32; - -#[repr(transparent)] -#[derive(Clone, Default)] -pub struct RefCount(pub Rc<()>); - -enum RequestBodyState { - Incoming(Incoming), - Resource(HttpRequestBodyAutocloser), -} - -impl From for RequestBodyState { - fn from(value: Incoming) -> Self { - RequestBodyState::Incoming(value) - } -} - -/// Ensures that the request body closes itself when no longer needed. -pub struct HttpRequestBodyAutocloser(ResourceId, Rc>); - -impl HttpRequestBodyAutocloser { - pub fn new(res: ResourceId, op_state: Rc>) -> Self { - Self(res, op_state) - } -} - -impl Drop for HttpRequestBodyAutocloser { - fn drop(&mut self) { - if let Ok(res) = self.1.borrow_mut().resource_table.take_any(self.0) { - res.close(); - } - } -} - -pub async fn new_slab_future( - request: Request, - request_info: HttpConnectionProperties, - refcount: RefCount, - tx: tokio::sync::mpsc::Sender, -) -> Result { - let index = slab_insert(request, request_info, refcount); - defer! { - slab_drop(index); - } - let rx = slab_get(index).promise(); - if tx.send(index).await.is_ok() { - http_trace!(index, "SlabFuture await"); - // We only need to wait for completion if we aren't closed - rx.await; - http_trace!(index, "SlabFuture complete"); - } - let response = slab_get(index).take_response(); - Ok(response) -} - -pub struct HttpSlabRecord { - request_info: HttpConnectionProperties, - request_parts: Parts, - request_body: Option, - /// The response may get taken before we tear this down - response: Option, - promise: CompletionHandle, - trailers: Rc>>, - been_dropped: bool, - /// Use a `Rc` to keep track of outstanding requests. We don't use this, but - /// when it drops, it decrements the refcount of the server itself. - refcount: Option, - #[cfg(feature = "__zombie_http_tracking")] - alive: bool, -} - -thread_local! { - pub(crate) static SLAB: RefCell> = const { RefCell::new(Slab::new()) }; -} - -macro_rules! http_trace { - ($index:expr, $args:tt) => { - #[cfg(feature = "__http_tracing")] - { - let total = $crate::slab::SLAB.with(|x| x.try_borrow().map(|x| x.len())); - if let Ok(total) = total { - println!("HTTP id={} total={}: {}", $index, total, format!($args)); - } else { - println!("HTTP id={} total=?: {}", $index, format!($args)); - } - } - }; -} - -pub(crate) use http_trace; - -/// Hold a lock on the slab table and a reference to one entry in the table. -pub struct SlabEntry( - NonNull, - SlabId, - RefMut<'static, Slab>, -); - -const SLAB_CAPACITY: usize = 1024; - -pub fn slab_init() { - SLAB.with(|slab: &RefCell>| { - // Note that there might already be an active HTTP server, so this may just - // end up adding room for an additional SLAB_CAPACITY items. All HTTP servers - // on a single thread share the same slab. - let mut slab = slab.borrow_mut(); - slab.reserve(SLAB_CAPACITY); - }) -} - -pub fn slab_get(index: SlabId) -> SlabEntry { - http_trace!(index, "slab_get"); - let mut lock: RefMut<'static, Slab> = SLAB.with(|x| { - // SAFETY: We're extracting a lock here and placing it into an object that is thread-local, !Send as a &'static - unsafe { std::mem::transmute(x.borrow_mut()) } - }); - let Some(entry) = lock.get_mut(index as usize) else { - panic!("HTTP state error: Attempted to access invalid request {} ({} in total available)", - index, - lock.len()) - }; - #[cfg(feature = "__zombie_http_tracking")] - { - assert!(entry.alive, "HTTP state error: Entry is not alive"); - } - let entry = NonNull::new(entry as _).unwrap(); - - SlabEntry(entry, index, lock) -} - -#[allow(clippy::let_and_return)] -fn slab_insert_raw( - request_parts: Parts, - request_body: Option, - request_info: HttpConnectionProperties, - refcount: RefCount, -) -> SlabId { - let index = SLAB.with(|slab| { - let mut slab = slab.borrow_mut(); - let body = ResponseBytes::default(); - let trailers = body.trailers(); - let request_body = request_body.map(|r| r.into()); - slab.insert(HttpSlabRecord { - request_info, - request_parts, - request_body, - response: Some(Response::new(body)), - trailers, - been_dropped: false, - promise: CompletionHandle::default(), - refcount: Some(refcount), - #[cfg(feature = "__zombie_http_tracking")] - alive: true, - }) - }) as u32; - http_trace!(index, "slab_insert"); - index -} - -pub fn slab_insert( - request: Request, - request_info: HttpConnectionProperties, - refcount: RefCount, -) -> SlabId { - let (request_parts, request_body) = request.into_parts(); - slab_insert_raw(request_parts, Some(request_body), request_info, refcount) -} - -pub fn slab_drop(index: SlabId) { - http_trace!(index, "slab_drop"); - let mut entry = slab_get(index); - let record = entry.self_mut(); - assert!( - !record.been_dropped, - "HTTP state error: Entry has already been dropped" - ); - - // The logic here is somewhat complicated. A slab record cannot be expunged until it has been dropped by Rust AND - // the promise has been completed (indicating that JavaScript is done processing). However, if Rust has finished - // dealing with this entry, we DO want to clean up some of the associated items -- namely the request body, which - // might include actual resources, and the refcount, which is keeping the server alive. - record.been_dropped = true; - if record.promise.is_completed() { - drop(entry); - slab_expunge(index); - } else { - // Take the request body, as the future has been dropped and this will allow some resources to close - record.request_body.take(); - // Take the refcount keeping the server alive. The future is no longer alive, which means this request - // is toast. - record.refcount.take(); - } -} - -fn slab_expunge(index: SlabId) { - SLAB.with(|slab| { - #[cfg(__zombie_http_tracking)] - { - slab.borrow_mut().get_mut(index as usize).unwrap().alive = false; - } - #[cfg(not(__zombie_http_tracking))] - { - slab.borrow_mut().remove(index as usize); - } - }); - http_trace!(index, "slab_expunge"); -} - -impl SlabEntry { - fn self_ref(&self) -> &HttpSlabRecord { - // SAFETY: We have the lock and we're borrowing lifetime from self - unsafe { self.0.as_ref() } - } - - fn self_mut(&mut self) -> &mut HttpSlabRecord { - // SAFETY: We have the lock and we're borrowing lifetime from self - unsafe { self.0.as_mut() } - } - - /// Perform the Hyper upgrade on this entry. - pub fn upgrade(&mut self) -> Result { - // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit - self - .self_mut() - .request_parts - .extensions - .remove::() - .ok_or_else(|| AnyError::msg("upgrade unavailable")) - } - - /// Take the Hyper body from this entry. - pub fn take_body(&mut self) -> Option { - let body_holder = &mut self.self_mut().request_body; - let body = body_holder.take(); - match body { - Some(RequestBodyState::Incoming(body)) => Some(body), - x => { - *body_holder = x; - None - } - } - } - - pub fn take_resource(&mut self) -> Option { - let body_holder = &mut self.self_mut().request_body; - let body = body_holder.take(); - match body { - Some(RequestBodyState::Resource(res)) => Some(res), - x => { - *body_holder = x; - None - } - } - } - - /// Replace the request body with a resource ID and the OpState we'll need to shut it down. - /// We cannot keep just the resource itself, as JS code might be reading from the resource ID - /// to generate the response data (requiring us to keep it in the resource table). - pub fn put_resource(&mut self, res: HttpRequestBodyAutocloser) { - self.self_mut().request_body = Some(RequestBodyState::Resource(res)); - } - - /// Complete this entry, potentially expunging it if it is fully complete (ie: dropped as well). - pub fn complete(self) { - let promise = &self.self_ref().promise; - assert!( - !promise.is_completed(), - "HTTP state error: Entry has already been completed" - ); - http_trace!(self.1, "SlabEntry::complete"); - promise.complete(true); - // If we're all done, we need to drop ourself to release the lock before we expunge this record - if self.self_ref().been_dropped { - let index = self.1; - drop(self); - slab_expunge(index); - } - } - - /// Has the future for this entry been dropped? ie, has the underlying TCP connection - /// been closed? - pub fn cancelled(&self) -> bool { - self.self_ref().been_dropped - } - - /// Get a mutable reference to the response. - pub fn response(&mut self) -> &mut Response { - self.self_mut().response.as_mut().unwrap() - } - - /// Get a mutable reference to the trailers. - pub fn trailers(&mut self) -> &RefCell> { - &self.self_mut().trailers - } - - /// Take the response. - pub fn take_response(&mut self) -> Response { - self.self_mut().response.take().unwrap() - } - - /// Get a reference to the connection properties. - pub fn request_info(&self) -> &HttpConnectionProperties { - &self.self_ref().request_info - } - - /// Get a reference to the request parts. - pub fn request_parts(&self) -> &Parts { - &self.self_ref().request_parts - } - - /// Get a reference to the completion handle. - pub fn promise(&self) -> CompletionHandle { - self.self_ref().promise.clone() - } - - /// Get a reference to the response body completion handle. - pub fn body_promise(&self) -> CompletionHandle { - self - .self_ref() - .response - .as_ref() - .unwrap() - .body() - .completion_handle() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use deno_net::raw::NetworkStreamType; - use http::Request; - - #[test] - fn test_slab() { - let req = Request::builder().body(()).unwrap(); - let (parts, _) = req.into_parts(); - let id = slab_insert_raw( - parts, - None, - HttpConnectionProperties { - peer_address: "".into(), - peer_port: None, - local_port: None, - stream_type: NetworkStreamType::Tcp, - }, - RefCount::default(), - ); - let entry = slab_get(id); - entry.complete(); - slab_drop(id); - } -}