Skip to content

Commit

Permalink
feat: add AsyncUnref ops (denoland#3721)
Browse files Browse the repository at this point in the history
This is in order to support features like signal handlers, which
shouldn't prevent the program from exiting.
  • Loading branch information
kt3k authored and ry committed Jan 21, 2020
1 parent ecd1d3a commit 9de8178
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 1 deletion.
10 changes: 10 additions & 0 deletions cli/ops/dispatch_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub type AsyncJsonOp =
pub enum JsonOp {
Sync(Value),
Async(AsyncJsonOp),
/// AsyncUnref is the variation of Async, which doesn't block the program
/// exiting.
AsyncUnref(AsyncJsonOp),
}

fn json_err(err: ErrBox) -> Value {
Expand Down Expand Up @@ -77,6 +80,13 @@ where
});
CoreOp::Async(fut2.boxed())
}
Ok(JsonOp::AsyncUnref(fut)) => {
assert!(promise_id.is_some());
let fut2 = fut.then(move |result| {
futures::future::ok(serialize_result(promise_id, result))
});
CoreOp::AsyncUnref(fut2.boxed())
}
Err(sync_err) => {
let buf = serialize_result(promise_id, Err(sync_err));
if is_sync {
Expand Down
8 changes: 8 additions & 0 deletions cli/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ impl ThreadSafeState {
});
Op::Async(result_fut.boxed())
}
Op::AsyncUnref(fut) => {
let state = state.clone();
let result_fut = fut.map_ok(move |buf: Buf| {
state.metrics_op_completed(buf.len());
buf
});
Op::AsyncUnref(result_fut.boxed())
}
}
}
}
Expand Down
50 changes: 49 additions & 1 deletion core/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::stream::select;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::task::AtomicWaker;
Expand Down Expand Up @@ -178,6 +179,7 @@ pub struct Isolate {
needs_init: bool,
pub(crate) shared: SharedQueue,
pending_ops: FuturesUnordered<PendingOpFuture>,
pending_unref_ops: FuturesUnordered<PendingOpFuture>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
pub op_registry: Arc<OpRegistry>,
Expand Down Expand Up @@ -340,6 +342,7 @@ impl Isolate {
shared,
needs_init,
pending_ops: FuturesUnordered::new(),
pending_unref_ops: FuturesUnordered::new(),
have_unpolled_ops: false,
startup_script,
op_registry: Arc::new(OpRegistry::new()),
Expand Down Expand Up @@ -519,6 +522,12 @@ impl Isolate {
self.have_unpolled_ops = true;
None
}
Op::AsyncUnref(fut) => {
let fut2 = fut.map_ok(move |buf| (op_id, buf));
self.pending_unref_ops.push(fut2.boxed());
self.have_unpolled_ops = true;
None
}
}
}

Expand Down Expand Up @@ -713,7 +722,9 @@ impl Future for Isolate {
// Now handle actual ops.
inner.have_unpolled_ops = false;
#[allow(clippy::match_wild_err_arm)]
match inner.pending_ops.poll_next_unpin(cx) {
match select(&mut inner.pending_ops, &mut inner.pending_unref_ops)
.poll_next_unpin(cx)
{
Poll::Ready(Some(Err(_))) => panic!("unexpected op error"),
Poll::Ready(None) => break,
Poll::Pending => break,
Expand Down Expand Up @@ -816,6 +827,7 @@ pub mod tests {

pub enum Mode {
Async,
AsyncUnref,
OverflowReqSync,
OverflowResSync,
OverflowReqAsync,
Expand All @@ -838,6 +850,17 @@ pub mod tests {
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
}
Mode::AsyncUnref => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Ok(buf)
};
Op::AsyncUnref(fut.boxed())
}
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Expand Down Expand Up @@ -955,6 +978,31 @@ pub mod tests {
});
}

#[test]
fn test_poll_async_optional_ops() {
run_in_task(|cx| {
let (mut isolate, dispatch_count) = setup(Mode::AsyncUnref);
js_check(isolate.execute(
"check1.js",
r#"
Deno.core.setAsyncHandler(1, (buf) => {
// This handler will never be called
assert(false);
});
let control = new Uint8Array([42]);
Deno.core.send(1, control);
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
// The above op never finish, but isolate can finish
// because the op is an unreffed async op.
assert!(match isolate.poll_unpin(cx) {
Poll::Ready(Ok(_)) => true,
_ => false,
});
})
}

#[test]
fn terminate_execution() {
let (tx, rx) = std::sync::mpsc::channel::<bool>();
Expand Down
3 changes: 3 additions & 0 deletions core/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ pub type OpResult<E> = Result<Op<E>, E>;
pub enum Op<E> {
Sync(Buf),
Async(OpAsyncFuture<E>),
/// AsyncUnref is the variation of Async, which doesn't block the program
/// exiting.
AsyncUnref(OpAsyncFuture<E>),
}

pub type CoreError = ();
Expand Down

0 comments on commit 9de8178

Please sign in to comment.