Skip to content

Commit

Permalink
fix(core): poll async ops eagerly (denoland#12385)
Browse files Browse the repository at this point in the history
Currently all async ops are polled lazily, which means that op
initialization code is postponed until control is yielded to the event
loop. This has some weird consequences, e.g.

```js
let listener = Deno.listen(...);
let conn_promise = listener.accept();
listener.close();
// `BadResource` is thrown. A reasonable error would be `Interrupted`.
let conn = await conn_promise;
```

JavaScript promises are expected to be eagerly evaluated. This patch
makes ops actually do that.
  • Loading branch information
piscisaureus committed Oct 17, 2021
1 parent ff95fc1 commit ff932b4
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 74 deletions.
47 changes: 15 additions & 32 deletions cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -892,15 +892,13 @@ unitTest(
respondPromise,
]);

httpConn.close();
listener.close();

assert(errors.length >= 1);
for (const error of errors) {
assertEquals(error.name, "Http");
assertEquals(
error.message,
"connection closed before message completed",
);
assert(error.message.includes("connection"));
}
},
);
Expand Down Expand Up @@ -975,44 +973,29 @@ unitTest(
unitTest(
{ permissions: { net: true } },
async function droppedConnSenderNoPanic() {
async function server(listener: Deno.Listener) {
async function server() {
const listener = Deno.listen({ port: 8000 });
const conn = await listener.accept();
const http = Deno.serveHttp(conn);

for (;;) {
const req = await http.nextRequest();
if (req == null) break;

nextloop()
.then(() => {
http.close();
return req.respondWith(new Response("boom"));
})
.catch(() => {});
}

const evt = await http.nextRequest();
http.close();
try {
http.close();
await evt!.respondWith(new Response("boom"));
} catch {
"nop";
// Ignore error.
}

listener.close();
}

async function client() {
const resp = await fetch("http:https://127.0.0.1:8000/");
await resp.body?.cancel();
}

function nextloop() {
return new Promise((resolve) => setTimeout(resolve, 0));
try {
const resp = await fetch("http:https://127.0.0.1:8000/");
await resp.body?.cancel();
} catch {
// Ignore error
}
}

async function main() {
const listener = Deno.listen({ port: 8000 });
await Promise.all([server(listener), client()]);
}
await main();
await Promise.all([server(), client()]);
},
);
37 changes: 16 additions & 21 deletions cli/tests/unit/net_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ unitTest(
const listener = Deno.listen({ port: 4501 });
const p = listener.accept();
listener.close();
// TODO(piscisaureus): the error type should be `Interrupted` here, which
// gets thrown, but then ext/net catches it and rethrows `BadResource`.
await assertRejects(
async () => {
await p;
},
() => p,
Deno.errors.BadResource,
"Listener has been closed",
);
Expand All @@ -141,11 +141,8 @@ unitTest(
const p = listener.accept();
listener.close();
await assertRejects(
async () => {
await p;
},
Deno.errors.BadResource,
"Listener has been closed",
() => p,
Deno.errors.Interrupted,
);
},
);
Expand Down Expand Up @@ -173,27 +170,29 @@ unitTest(
},
);

// TODO(jsouto): Enable when tokio updates mio to v0.7!
unitTest(
{ ignore: true, permissions: { read: true, write: true } },
{
ignore: Deno.build.os === "windows",
permissions: { read: true, write: true },
},
async function netUnixConcurrentAccept() {
const filePath = await Deno.makeTempFile();
const listener = Deno.listen({ transport: "unix", path: filePath });
let acceptErrCount = 0;
const checkErr = (e: Error) => {
if (e.message === "Listener has been closed") {
if (e instanceof Deno.errors.Interrupted) { // "operation canceled"
assertEquals(acceptErrCount, 1);
} else if (e.message === "Another accept task is ongoing") {
} else if (e instanceof Deno.errors.Busy) { // "Listener already in use"
acceptErrCount++;
} else {
throw new Error("Unexpected error message");
throw e;
}
};
const p = listener.accept().catch(checkErr);
const p1 = listener.accept().catch(checkErr);
await Promise.race([p, p1]);
listener.close();
await [p, p1];
await Promise.all([p, p1]);
assertEquals(acceptErrCount, 1);
},
);
Expand Down Expand Up @@ -500,11 +499,7 @@ unitTest(
);

unitTest(
{
// FIXME(bartlomieju)
ignore: true,
permissions: { net: true },
},
{ permissions: { net: true } },
async function netListenAsyncIterator() {
const addr = { hostname: "127.0.0.1", port: 3500 };
const listener = Deno.listen(addr);
Expand Down Expand Up @@ -590,8 +585,8 @@ unitTest(
await conn.write(new Uint8Array([1, 2, 3]));
}
} catch (err) {
assert(!!err);
assert(err instanceof Deno.errors.BadResource);
assert(err);
assert(err instanceof Deno.errors.Interrupted);
}
}

Expand Down
1 change: 1 addition & 0 deletions core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub use crate::normalize_path::normalize_path;
pub use crate::ops::serialize_op_result;
pub use crate::ops::Op;
pub use crate::ops::OpAsyncFuture;
pub use crate::ops::OpCall;
pub use crate::ops::OpFn;
pub use crate::ops::OpId;
pub use crate::ops::OpPayload;
Expand Down
3 changes: 2 additions & 1 deletion core/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ impl ModuleMap {
#[cfg(test)]
mod tests {
use super::*;
use crate::ops::OpCall;
use crate::serialize_op_result;
use crate::JsRuntime;
use crate::Op;
Expand Down Expand Up @@ -1009,7 +1010,7 @@ mod tests {
let (control, _): (u8, ()) = payload.deserialize().unwrap();
assert_eq!(control, 42);
let resp = (0, 1, serialize_op_result(Ok(43), state));
Op::Async(Box::pin(futures::future::ready(resp)))
Op::Async(OpCall::ready(resp))
};

let mut runtime = JsRuntime::new(RuntimeOptions {
Expand Down
66 changes: 64 additions & 2 deletions core/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ use crate::gotham_state::GothamState;
use crate::ops_metrics::OpsTracker;
use crate::resources::ResourceTable;
use crate::runtime::GetErrorClassFn;
use futures::future::maybe_done;
use futures::future::FusedFuture;
use futures::future::MaybeDone;
use futures::ready;
use futures::task::noop_waker;
use futures::Future;
use indexmap::IndexMap;
use rusty_v8 as v8;
Expand All @@ -17,10 +22,67 @@ use std::ops::Deref;
use std::ops::DerefMut;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Context;
use std::task::Poll;

/// Wrapper around a Future, which causes that Future to be polled immediately.
/// (Background: ops are stored in a `FuturesUnordered` structure which polls
/// them, but without the `OpCall` wrapper this doesn't happen until the next
/// turn of the event loop, which is too late for certain ops.)
pub struct OpCall<T>(MaybeDone<Pin<Box<dyn Future<Output = T>>>>);

impl<T> OpCall<T> {
/// Wraps a future, and polls the inner future immediately.
/// This should be the default choice for ops.
pub fn eager(fut: impl Future<Output = T> + 'static) -> Self {
let boxed = Box::pin(fut) as Pin<Box<dyn Future<Output = T>>>;
let mut inner = maybe_done(boxed);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let mut pinned = Pin::new(&mut inner);
let _ = pinned.as_mut().poll(&mut cx);
Self(inner)
}

/// Wraps a future; the inner future is polled the usual way (lazily).
pub fn lazy(fut: impl Future<Output = T> + 'static) -> Self {
let boxed = Box::pin(fut) as Pin<Box<dyn Future<Output = T>>>;
let inner = maybe_done(boxed);
Self(inner)
}

/// Create a future by specifying its output. This is basically the same as
/// `async { value }` or `futures::future::ready(value)`.
pub fn ready(value: T) -> Self {
Self(MaybeDone::Done(value))
}
}

impl<T> Future for OpCall<T> {
type Output = T;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let inner = unsafe { &mut self.get_unchecked_mut().0 };
let mut pinned = Pin::new(inner);
ready!(pinned.as_mut().poll(cx));
Poll::Ready(pinned.as_mut().take_output().unwrap())
}
}

impl<F> FusedFuture for OpCall<F>
where
F: Future,
{
fn is_terminated(&self) -> bool {
self.0.is_terminated()
}
}

pub type PromiseId = u64;
pub type OpAsyncFuture =
Pin<Box<dyn Future<Output = (PromiseId, OpId, OpResult)>>>;
pub type OpAsyncFuture = OpCall<(PromiseId, OpId, OpResult)>;
pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, OpPayload) -> Op + 'static;
pub type OpId = usize;

Expand Down
7 changes: 4 additions & 3 deletions core/ops_json.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.

use crate::error::AnyError;
use crate::ops::OpCall;
use crate::serialize_op_result;
use crate::Op;
use crate::OpFn;
Expand Down Expand Up @@ -35,7 +36,7 @@ pub fn void_op_async() -> Box<OpFn> {
let op_id = payload.op_id;
let pid = payload.promise_id;
let op_result = serialize_op_result(Ok(()), state);
Op::Async(Box::pin(futures::future::ready((pid, op_id, op_result))))
Op::Async(OpCall::ready((pid, op_id, op_result)))
})
}

Expand Down Expand Up @@ -127,7 +128,7 @@ where
use crate::futures::FutureExt;
let fut = op_fn(state.clone(), a, b)
.map(move |result| (pid, op_id, serialize_op_result(result, state)));
Op::Async(Box::pin(fut))
Op::Async(OpCall::eager(fut))
})
}

Expand Down Expand Up @@ -159,7 +160,7 @@ where
use crate::futures::FutureExt;
let fut = op_fn(state.clone(), a, b)
.map(move |result| (pid, op_id, serialize_op_result(result, state)));
Op::AsyncUnref(Box::pin(fut))
Op::AsyncUnref(OpCall::eager(fut))
})
}

Expand Down
13 changes: 5 additions & 8 deletions core/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,21 @@ use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::task::AtomicWaker;
use futures::Future;
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::ffi::c_void;
use std::mem::forget;
use std::option::Option;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Once;
use std::task::Context;
use std::task::Poll;

type PendingOpFuture =
Pin<Box<dyn Future<Output = (PromiseId, OpId, OpResult)>>>;
type PendingOpFuture = OpCall<(PromiseId, OpId, OpResult)>;

pub enum Snapshot {
Static(&'static [u8]),
Expand Down Expand Up @@ -1613,6 +1610,7 @@ pub mod tests {
use crate::ZeroCopyBuf;
use futures::future::lazy;
use std::ops::FnOnce;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -1645,16 +1643,15 @@ pub mod tests {
Mode::Async => {
assert_eq!(control, 42);
let resp = (0, 1, serialize_op_result(Ok(43), rc_op_state));
Op::Async(Box::pin(futures::future::ready(resp)))
Op::Async(OpCall::ready(resp))
}
Mode::AsyncZeroCopy(has_buffer) => {
assert_eq!(buf.is_some(), has_buffer);
if let Some(buf) = buf {
assert_eq!(buf.len(), 1);
}

let resp = serialize_op_result(Ok(43), rc_op_state);
Op::Async(Box::pin(futures::future::ready((0, 1, resp))))
let resp = (0, 1, serialize_op_result(Ok(43), rc_op_state));
Op::Async(OpCall::ready(resp))
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions ext/net/01_net.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

((window) => {
const core = window.Deno.core;
const { BadResource } = core;
const { BadResource, Interrupted } = core;
const {
PromiseResolve,
SymbolAsyncIterator,
Expand Down Expand Up @@ -124,7 +124,7 @@
try {
conn = await this.accept();
} catch (error) {
if (error instanceof BadResource) {
if (error instanceof BadResource || error instanceof Interrupted) {
return { value: undefined, done: true };
}
throw error;
Expand Down Expand Up @@ -191,7 +191,7 @@
try {
yield await this.receive();
} catch (err) {
if (err instanceof BadResource) {
if (err instanceof BadResource || err instanceof Interrupted) {
break;
}
throw err;
Expand Down
Loading

0 comments on commit ff932b4

Please sign in to comment.