Skip to content

Commit

Permalink
use scopeguard defer to handle drop
Browse files Browse the repository at this point in the history
  • Loading branch information
lrowe committed Sep 25, 2023
1 parent e29379a commit fd031af
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 52 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ rustls-pemfile = "1.0.0"
rustls-webpki = "0.101.4"
rustls-native-certs = "0.6.2"
webpki-roots = "0.25.2"
scopeguard = "1.2.0"
serde = { version = "1.0.149", features = ["derive"] }
serde_bytes = "0.11"
serde_json = "1.0.85"
Expand Down
1 change: 1 addition & 0 deletions ext/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ percent-encoding.workspace = true
phf = { version = "0.10", features = ["macros"] }
pin-project.workspace = true
ring.workspace = true
scopeguard.workspace = true
serde.workspace = true
slab.workspace = true
smallvec.workspace = true
Expand Down
53 changes: 1 addition & 52 deletions ext/http/http_next.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
use crate::slab::http_trace;
use crate::slab::slab_drop;
use crate::slab::new_slab_future;
use crate::slab::slab_get;
use crate::slab::slab_init;
use crate::slab::slab_insert;
use crate::slab::HttpRequestBodyAutocloser;
use crate::slab::RefCount;
use crate::slab::SlabId;
Expand Down Expand Up @@ -61,8 +59,6 @@ use hyper1::service::service_fn;
use hyper1::service::HttpService;
use hyper1::StatusCode;
use once_cell::sync::Lazy;
use pin_project::pin_project;
use pin_project::pinned_drop;
use smallvec::SmallVec;
use std::borrow::Cow;
use std::cell::RefCell;
Expand All @@ -76,7 +72,6 @@ use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;

type Request = hyper1::Request<Incoming>;
type Response = hyper1::Response<ResponseBytes>;

static USE_WRITEV: Lazy<bool> = Lazy::new(|| {
let enable = std::env::var("DENO_USE_WRITEV").ok();
Expand Down Expand Up @@ -706,52 +701,6 @@ pub async fn op_http_track(
}
}

#[pin_project(PinnedDrop)]
pub struct SlabFuture<F: Future<Output = ()>>(SlabId, #[pin] F);

pub fn new_slab_future(
request: Request,
request_info: HttpConnectionProperties,
refcount: RefCount,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> SlabFuture<impl Future<Output = ()>> {
let index = slab_insert(request, request_info, refcount);
let rx = slab_get(index).promise();
SlabFuture(index, async move {
if tx.send(index).await.is_ok() {
http_trace!(index, "SlabFuture await");
// We only need to wait for completion if we aren't closed
rx.await;
http_trace!(index, "SlabFuture complete");
}
})
}

impl<F: Future<Output = ()>> SlabFuture<F> {}

#[pinned_drop]
impl<F: Future<Output = ()>> PinnedDrop for SlabFuture<F> {
fn drop(self: Pin<&mut Self>) {
slab_drop(self.0);
}
}

impl<F: Future<Output = ()>> Future for SlabFuture<F> {
type Output = Result<Response, hyper::Error>;

fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let index = self.0;
self
.project()
.1
.poll(cx)
.map(|_| Ok(slab_get(index).take_response()))
}
}

fn serve_http11_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
Expand Down
22 changes: 22 additions & 0 deletions ext/http/slab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use http::HeaderMap;
use hyper1::body::Incoming;
use hyper1::upgrade::OnUpgrade;

use scopeguard::defer;
use slab::Slab;
use std::cell::RefCell;
use std::cell::RefMut;
Expand Down Expand Up @@ -52,6 +53,27 @@ impl Drop for HttpRequestBodyAutocloser {
}
}

pub async fn new_slab_future(
request: Request,
request_info: HttpConnectionProperties,
refcount: RefCount,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> Result<Response, hyper::Error> {
let index = slab_insert(request, request_info, refcount);
defer! {
slab_drop(index);
}
let rx = slab_get(index).promise();
if tx.send(index).await.is_ok() {
http_trace!(index, "SlabFuture await");
// We only need to wait for completion if we aren't closed
rx.await;
http_trace!(index, "SlabFuture complete");
}
let response = slab_get(index).take_response();
Ok(response)
}

pub struct HttpSlabRecord {
request_info: HttpConnectionProperties,
request_parts: Parts,
Expand Down

0 comments on commit fd031af

Please sign in to comment.