Skip to content

Commit

Permalink
core: avoid async op future reboxing to bundle PromiseId (denoland#10123
Browse files Browse the repository at this point in the history
)
  • Loading branch information
AaronO committed Apr 11, 2021
1 parent 8aa0d5f commit 29eca72
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 16 deletions.
9 changes: 3 additions & 6 deletions core/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::OpResponse;
use crate::OpTable;
use crate::PromiseId;
use crate::ZeroCopyBuf;
use futures::future::FutureExt;
use rusty_v8 as v8;
use serde::Serialize;
use serde_v8::to_v8;
Expand Down Expand Up @@ -433,7 +432,7 @@ fn send<'s>(
}
};

let payload = OpPayload::new(scope, v);
let payload = OpPayload::new(scope, v, promise_id);
let op = OpTable::route_op(op_id, state.op_state.clone(), payload, buf);
match op {
Op::Sync(resp) => match resp {
Expand All @@ -445,13 +444,11 @@ fn send<'s>(
}
},
Op::Async(fut) => {
let fut2 = fut.map(move |resp| (promise_id, resp));
state.pending_ops.push(fut2.boxed_local());
state.pending_ops.push(fut);
state.have_unpolled_ops = true;
}
Op::AsyncUnref(fut) => {
let fut2 = fut.map(move |resp| (promise_id, resp));
state.pending_unref_ops.push(fut2.boxed_local());
state.pending_unref_ops.push(fut);
state.have_unpolled_ops = true;
}
Op::NotFound => {
Expand Down
6 changes: 5 additions & 1 deletion core/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,35 @@ use std::pin::Pin;
use std::rc::Rc;

pub type PromiseId = u64;
pub type OpAsyncFuture = Pin<Box<dyn Future<Output = OpResponse>>>;
pub type OpAsyncFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResponse)>>>;
pub type OpFn =
dyn Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static;
pub type OpId = usize;

pub struct OpPayload<'a, 'b, 'c> {
pub(crate) scope: Option<&'a mut v8::HandleScope<'b>>,
pub(crate) value: Option<v8::Local<'c, v8::Value>>,
pub(crate) promise_id: PromiseId,
}

impl<'a, 'b, 'c> OpPayload<'a, 'b, 'c> {
pub fn new(
scope: &'a mut v8::HandleScope<'b>,
value: v8::Local<'c, v8::Value>,
promise_id: PromiseId,
) -> Self {
Self {
scope: Some(scope),
value: Some(value),
promise_id,
}
}

pub fn empty() -> Self {
Self {
scope: None,
value: None,
promise_id: 0,
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/ops_bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ where
p: OpPayload,
b: Option<ZeroCopyBuf>|
-> Op {
let pid = p.promise_id;
let min_arg: u32 = p.deserialize().unwrap();
let fut = op_fn(state.clone(), min_arg, b)
.map(move |result| serialize_bin_result(result, state));
let temp = Box::pin(fut);
Op::Async(temp)
.map(move |result| (pid, serialize_bin_result(result, state)));
Op::Async(Box::pin(fut))
},
)
}
3 changes: 2 additions & 1 deletion core/ops_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ where
p: OpPayload,
buf: Option<ZeroCopyBuf>|
-> Result<Op, AnyError> {
let pid = p.promise_id;
// Parse args
let args = p.deserialize()?;

use crate::futures::FutureExt;
let fut = op_fn(state.clone(), args, buf)
.map(move |result| serialize_op_result(result, state));
.map(move |result| (pid, serialize_op_result(result, state)));
Ok(Op::Async(Box::pin(fut)))
};

Expand Down
8 changes: 4 additions & 4 deletions core/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1506,7 +1506,7 @@ pub mod tests {
Mode::Async => {
let control: u8 = payload.deserialize().unwrap();
assert_eq!(control, 42);
let resp = OpResponse::Value(Box::new(43));
let resp = (0, OpResponse::Value(Box::new(43)));
Op::Async(Box::pin(futures::future::ready(resp)))
}
Mode::AsyncUnref => {
Expand All @@ -1515,7 +1515,7 @@ pub mod tests {
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
OpResponse::Value(Box::new(43))
(0, OpResponse::Value(Box::new(43)))
};
Op::AsyncUnref(Box::pin(fut))
}
Expand All @@ -1526,7 +1526,7 @@ pub mod tests {
}

let resp = OpResponse::Value(Box::new(43));
Op::Async(Box::pin(futures::future::ready(resp)))
Op::Async(Box::pin(futures::future::ready((0, resp))))
}
}
}
Expand Down Expand Up @@ -1970,7 +1970,7 @@ pub mod tests {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
let control: u8 = payload.deserialize().unwrap();
assert_eq!(control, 42);
let resp = OpResponse::Value(Box::new(43));
let resp = (0, OpResponse::Value(Box::new(43)));
Op::Async(Box::pin(futures::future::ready(resp)))
};

Expand Down
2 changes: 1 addition & 1 deletion test_plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn op_test_async(
assert!(rx.await.is_ok());
let result = b"test";
let result_box: Box<[u8]> = Box::new(*result);
OpResponse::Buffer(result_box)
(0, OpResponse::Buffer(result_box))
};

Op::Async(fut.boxed())
Expand Down

0 comments on commit 29eca72

Please sign in to comment.