Skip to content

Commit

Permalink
Fix Op definitions (denoland#4814)
Browse files Browse the repository at this point in the history
  • Loading branch information
ry authored Apr 19, 2020
1 parent e2fd729 commit 4d2b9cd
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 102 deletions.
20 changes: 10 additions & 10 deletions cli/ops/dispatch_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct AsyncArgs {
promise_id: Option<u64>,
}

pub fn json_op<D>(d: D) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp
pub fn json_op<D>(d: D) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op
where
D: Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>,
{
Expand All @@ -50,7 +50,7 @@ where
Ok(args) => args,
Err(e) => {
let buf = serialize_result(None, Err(OpError::from(e)));
return CoreOp::Sync(buf);
return Op::Sync(buf);
}
};
let promise_id = async_args.promise_id;
Expand All @@ -60,32 +60,32 @@ where
.map_err(OpError::from)
.and_then(|args| d(args, zero_copy));

// Convert to CoreOp
// Convert to Op
match result {
Ok(JsonOp::Sync(sync_value)) => {
assert!(promise_id.is_none());
CoreOp::Sync(serialize_result(promise_id, Ok(sync_value)))
Op::Sync(serialize_result(promise_id, Ok(sync_value)))
}
Ok(JsonOp::Async(fut)) => {
assert!(promise_id.is_some());
let fut2 = fut.then(move |result| {
futures::future::ok(serialize_result(promise_id, result))
futures::future::ready(serialize_result(promise_id, result))
});
CoreOp::Async(fut2.boxed_local())
Op::Async(fut2.boxed_local())
}
Ok(JsonOp::AsyncUnref(fut)) => {
assert!(promise_id.is_some());
let fut2 = fut.then(move |result| {
futures::future::ok(serialize_result(promise_id, result))
futures::future::ready(serialize_result(promise_id, result))
});
CoreOp::AsyncUnref(fut2.boxed_local())
Op::AsyncUnref(fut2.boxed_local())
}
Err(sync_err) => {
let buf = serialize_result(promise_id, Err(sync_err));
if is_sync {
CoreOp::Sync(buf)
Op::Sync(buf)
} else {
CoreOp::Async(futures::future::ok(buf).boxed_local())
Op::Async(futures::future::ready(buf).boxed_local())
}
}
}
Expand Down
12 changes: 5 additions & 7 deletions cli/ops/dispatch_minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use crate::op_error::OpError;
use byteorder::{LittleEndian, WriteBytesExt};
use deno_core::Buf;
use deno_core::CoreOp;
use deno_core::Op;
use deno_core::ZeroCopyBuf;
use futures::future::FutureExt;
Expand Down Expand Up @@ -114,7 +113,7 @@ fn test_parse_min_record() {
assert_eq!(parse_min_record(&buf), None);
}

pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp
pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op
where
D: Fn(bool, i32, Option<ZeroCopyBuf>) -> MinimalOp,
{
Expand Down Expand Up @@ -153,12 +152,11 @@ where
}
}),
MinimalOp::Async(min_fut) => {
// Convert to CoreOp
let core_fut = async move {
let fut = async move {
match min_fut.await {
Ok(r) => {
record.result = r;
Ok(record.into())
record.into()
}
Err(err) => {
let error_record = ErrorRecord {
Expand All @@ -167,11 +165,11 @@ where
error_code: err.kind as i32,
error_message: err.msg.as_bytes().to_owned(),
};
Ok(error_record.into())
error_record.into()
}
}
};
Op::Async(core_fut.boxed_local())
Op::Async(fut.boxed_local())
}
}
}
Expand Down
14 changes: 6 additions & 8 deletions cli/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ use crate::ops::MinimalOp;
use crate::permissions::DenoPermissions;
use crate::web_worker::WebWorkerHandle;
use deno_core::Buf;
use deno_core::CoreOp;
use deno_core::ErrBox;
use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier;
use deno_core::Op;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use rand::rngs::StdRng;
use rand::SeedableRng;
use serde_json::Value;
Expand Down Expand Up @@ -75,7 +73,7 @@ impl State {
pub fn stateful_json_op<D>(
&self,
dispatcher: D,
) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp
) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op
where
D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>,
{
Expand All @@ -87,13 +85,13 @@ impl State {
pub fn core_op<D>(
&self,
dispatcher: D,
) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp
) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op
where
D: Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp,
D: Fn(&[u8], Option<ZeroCopyBuf>) -> Op,
{
let state = self.clone();

move |control: &[u8], zero_copy: Option<ZeroCopyBuf>| -> CoreOp {
move |control: &[u8], zero_copy: Option<ZeroCopyBuf>| -> Op {
let bytes_sent_control = control.len() as u64;
let bytes_sent_zero_copy =
zero_copy.as_ref().map(|b| b.len()).unwrap_or(0) as u64;
Expand All @@ -116,7 +114,7 @@ impl State {
.metrics
.op_dispatched_async(bytes_sent_control, bytes_sent_zero_copy);
let state = state.clone();
let result_fut = fut.map_ok(move |buf: Buf| {
let result_fut = fut.map(move |buf: Buf| {
let mut state_ = state.borrow_mut();
state_.metrics.op_completed_async(buf.len() as u64);
buf
Expand All @@ -130,7 +128,7 @@ impl State {
bytes_sent_zero_copy,
);
let state = state.clone();
let result_fut = fut.map_ok(move |buf: Buf| {
let result_fut = fut.map(move |buf: Buf| {
let mut state_ = state.borrow_mut();
state_.metrics.op_completed_async_unref(buf.len() as u64);
buf
Expand Down
4 changes: 2 additions & 2 deletions core/es_isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,12 +581,12 @@ pub mod tests {
let mut isolate = EsIsolate::new(loader, StartupData::None, false);

let dispatcher =
move |control: &[u8], _zero_copy: Option<ZeroCopyBuf>| -> CoreOp {
move |control: &[u8], _zero_copy: Option<ZeroCopyBuf>| -> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
Op::Async(futures::future::ready(buf).boxed())
};

isolate.register_op("test", dispatcher);
Expand Down
69 changes: 42 additions & 27 deletions core/examples/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,38 @@ impl Isolate {
state: Default::default(),
};

isolate.register_op("listen", op_listen);
isolate.register_sync_op("listen", op_listen);
isolate.register_op("accept", op_accept);
isolate.register_op("read", op_read);
isolate.register_op("write", op_write);
isolate.register_op("close", op_close);
isolate.register_sync_op("close", op_close);

isolate
}

fn register_sync_op<F>(&mut self, name: &'static str, handler: F)
where
F: 'static + Fn(State, u32, Option<ZeroCopyBuf>) -> Result<u32, Error>,
{
let state = self.state.clone();
let core_handler =
move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op {
let state = state.clone();
let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
assert!(is_sync);

let result: i32 = match handler(state, record.rid, zero_copy_buf) {
Ok(r) => r as i32,
Err(_) => -1,
};
let buf = RecordBuf::from(Record { result, ..record })[..].into();
Op::Sync(buf)
};

self.core_isolate.register_op(name, core_handler);
}

fn register_op<F>(
&mut self,
name: &'static str,
Expand All @@ -117,25 +140,22 @@ impl Isolate {
{
let state = self.state.clone();
let core_handler =
move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> CoreOp {
move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op {
let state = state.clone();
let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
assert!(!is_sync);

let fut = async move {
let op = handler(state, record.rid, zero_copy_buf);
let result = op
.map_ok(|r| r.try_into().expect("op result does not fit in i32"))
.unwrap_or_else(|_| -1)
.await;
Ok(RecordBuf::from(Record { result, ..record })[..].into())
RecordBuf::from(Record { result, ..record })[..].into()
};

if is_sync {
Op::Sync(futures::executor::block_on(fut).unwrap())
} else {
Op::Async(fut.boxed_local())
}
Op::Async(fut.boxed_local())
};

self.core_isolate.register_op(name, core_handler);
Expand All @@ -154,32 +174,27 @@ fn op_close(
state: State,
rid: u32,
_buf: Option<ZeroCopyBuf>,
) -> impl TryFuture<Ok = u32, Error = Error> {
) -> Result<u32, Error> {
debug!("close rid={}", rid);

async move {
let resource_table = &mut state.borrow_mut().resource_table;
resource_table
.close(rid)
.map(|_| 0)
.ok_or_else(bad_resource)
}
let resource_table = &mut state.borrow_mut().resource_table;
resource_table
.close(rid)
.map(|_| 0)
.ok_or_else(bad_resource)
}

fn op_listen(
state: State,
_rid: u32,
_buf: Option<ZeroCopyBuf>,
) -> impl TryFuture<Ok = u32, Error = Error> {
) -> Result<u32, Error> {
debug!("listen");

async move {
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let listener = tokio::net::TcpListener::bind(&addr).await?;
let resource_table = &mut state.borrow_mut().resource_table;
let rid = resource_table.add("tcpListener", Box::new(listener));
Ok(rid)
}
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let std_listener = std::net::TcpListener::bind(&addr)?;
let listener = TcpListener::from_std(std_listener)?;
let resource_table = &mut state.borrow_mut().resource_table;
let rid = resource_table.add("tcpListener", Box::new(listener));
Ok(rid)
}

fn op_accept(
Expand Down
23 changes: 11 additions & 12 deletions core/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::ops::*;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::stream::select;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
Expand All @@ -34,6 +33,8 @@ use std::sync::{Arc, Mutex, Once};
use std::task::Context;
use std::task::Poll;

type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Buf)>>>;

/// A ZeroCopyBuf encapsulates a slice that's been borrowed from a JavaScript
/// ArrayBuffer object. JavaScript objects can normally be garbage collected,
/// but the existence of a ZeroCopyBuf inhibits this until it is dropped. It
Expand Down Expand Up @@ -344,7 +345,7 @@ impl Isolate {
/// Requires runtime to explicitly ask for op ids before using any of the ops.
pub fn register_op<F>(&self, name: &str, op: F) -> OpId
where
F: Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + 'static,
F: Fn(&[u8], Option<ZeroCopyBuf>) -> Op + 'static,
{
self.op_registry.register(name, op)
}
Expand Down Expand Up @@ -402,13 +403,13 @@ impl Isolate {
Some((op_id, buf))
}
Op::Async(fut) => {
let fut2 = fut.map_ok(move |buf| (op_id, buf));
let fut2 = fut.map(move |buf| (op_id, buf));
self.pending_ops.push(fut2.boxed_local());
self.have_unpolled_ops = true;
None
}
Op::AsyncUnref(fut) => {
let fut2 = fut.map_ok(move |buf| (op_id, buf));
let fut2 = fut.map(move |buf| (op_id, buf));
self.pending_unref_ops.push(fut2.boxed_local());
self.have_unpolled_ops = true;
None
Expand Down Expand Up @@ -528,10 +529,9 @@ impl Future for Isolate {
match select(&mut inner.pending_ops, &mut inner.pending_unref_ops)
.poll_next_unpin(cx)
{
Poll::Ready(Some(Err(_))) => panic!("unexpected op error"),
Poll::Ready(None) => break,
Poll::Pending => break,
Poll::Ready(Some(Ok((op_id, buf)))) => {
Poll::Ready(Some((op_id, buf))) => {
let successful_push = inner.shared.push(op_id, &buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
Expand Down Expand Up @@ -769,23 +769,22 @@ pub mod tests {
let mut isolate = Isolate::new(StartupData::None, false);

let dispatcher =
move |control: &[u8], _zero_copy: Option<ZeroCopyBuf>| -> CoreOp {
move |control: &[u8], _zero_copy: Option<ZeroCopyBuf>| -> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
match mode {
Mode::Async => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8].into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
Op::Async(futures::future::ready(buf).boxed())
}
Mode::AsyncUnref => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
let buf = vec![43u8].into_boxed_slice();
Ok(buf)
vec![43u8].into_boxed_slice()
};
Op::AsyncUnref(fut.boxed())
}
Expand All @@ -806,7 +805,7 @@ pub mod tests {
Mode::OverflowReqAsync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
Op::Async(futures::future::ready(buf).boxed())
}
Mode::OverflowResAsync => {
assert_eq!(control.len(), 1);
Expand All @@ -815,7 +814,7 @@ pub mod tests {
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
let buf = vec.into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
Op::Async(futures::future::ready(buf).boxed())
}
}
};
Expand Down
Loading

0 comments on commit 4d2b9cd

Please sign in to comment.