Skip to content

Commit

Permalink
refactor(core): Allow multiple overflown responses in single poll (de…
Browse files Browse the repository at this point in the history
…noland#9433)

This commit rewrites "JsRuntime::poll" function to fix a corner case that
might caused "overflown_response" to be overwritten by other overflown response.

The logic has been changed to allow returning multiple overflown response
alongside responses from shared queue.
  • Loading branch information
inteon committed Feb 23, 2021
1 parent 2e24af2 commit dccf5e0
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 48 deletions.
9 changes: 9 additions & 0 deletions cli/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ const EXEC_TIME_BENCHMARKS: &[(&str, &[&str], Option<i32>)] = &[
],
None,
),
(
"workers_large_message",
&[
"run",
"--allow-read",
"cli/tests/workers_large_message_bench.ts",
],
None,
),
(
"text_decoder",
&["run", "cli/tests/text_decoder_perf.js"],
Expand Down
14 changes: 14 additions & 0 deletions cli/tests/workers/large_message_worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2020 the Deno authors. All rights reserved. MIT license.

const dataSmall = "";
const dataLarge = "x".repeat(10 * 1024);

onmessage = function (e) {
for (let i = 0; i <= 10; i++) {
if (i % 2 == 0) {
postMessage(dataLarge);
} else {
postMessage(dataSmall);
}
}
};
35 changes: 35 additions & 0 deletions cli/tests/workers_large_message_bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2020 the Deno authors. All rights reserved. MIT license.

// deno-lint-ignore-file

import { deferred } from "../../test_util/std/async/deferred.ts";

function oneWorker(i: any): Promise<void> {
return new Promise<void>((resolve) => {
let countDown = 10;
const worker = new Worker(
new URL("workers/large_message_worker.js", import.meta.url).href,
{ type: "module" },
);
worker.onmessage = (e): void => {
if (countDown > 0) {
countDown--;
return;
}
worker.terminate();
resolve();
};
worker.postMessage("hi " + i);
});
}

function bench(): Promise<any> {
let promises = [];
for (let i = 0; i < 50; i++) {
promises.push(oneWorker(i));
}

return Promise.all(promises);
}

bench();
6 changes: 3 additions & 3 deletions core/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ fn send<'s>(
mut rv: v8::ReturnValue,
) {
let state_rc = JsRuntime::state(scope);
let state = state_rc.borrow_mut();
let mut state = state_rc.borrow_mut();

let op_id = match v8::Local::<v8::Integer>::try_from(args.get(0))
.map_err(AnyError::from)
Expand Down Expand Up @@ -412,12 +412,12 @@ fn send<'s>(
Op::Async(fut) => {
let fut2 = fut.map(move |buf| (op_id, buf));
state.pending_ops.push(fut2.boxed_local());
state.have_unpolled_ops.set(true);
state.have_unpolled_ops = true;
}
Op::AsyncUnref(fut) => {
let fut2 = fut.map(move |buf| (op_id, buf));
state.pending_unref_ops.push(fut2.boxed_local());
state.have_unpolled_ops.set(true);
state.have_unpolled_ops = true;
}
Op::NotFound => {
let msg = format!("Unknown op id: {}", op_id);
Expand Down
11 changes: 5 additions & 6 deletions core/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,7 @@ SharedQueue Binary Layout
asyncHandlers[opId] = cb;
}

function handleAsyncMsgFromRust(opId, buf) {
if (buf) {
// This is the overflow_response case of deno::JsRuntime::poll().
asyncHandlers[opId](buf);
return;
}
function handleAsyncMsgFromRust() {
while (true) {
const opIdBuf = shift();
if (opIdBuf == null) {
Expand All @@ -169,6 +164,10 @@ SharedQueue Binary Layout
assert(asyncHandlers[opIdBuf[0]] != null);
asyncHandlers[opIdBuf[0]](opIdBuf[1]);
}

for (let i = 0; i < arguments.length; i += 2) {
asyncHandlers[arguments[i]](arguments[i + 1]);
}
}

function dispatch(opName, control, ...zeroCopy) {
Expand Down
129 changes: 90 additions & 39 deletions core/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use futures::stream::StreamFuture;
use futures::task::AtomicWaker;
use futures::Future;
use std::any::Any;
use std::cell::Cell;
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::TryFrom;
Expand Down Expand Up @@ -110,8 +109,7 @@ pub(crate) struct JsRuntimeState {
pub(crate) shared: SharedQueue,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) have_unpolled_ops: Cell<bool>,
//pub(crate) op_table: OpTable,
pub(crate) have_unpolled_ops: bool,
pub(crate) op_state: Rc<RefCell<OpState>>,
pub loader: Rc<dyn ModuleLoader>,
pub modules: Modules,
Expand Down Expand Up @@ -287,7 +285,7 @@ impl JsRuntime {
pending_ops: FuturesUnordered::new(),
pending_unref_ops: FuturesUnordered::new(),
op_state: Rc::new(RefCell::new(op_state)),
have_unpolled_ops: Cell::new(false),
have_unpolled_ops: false,
modules: Modules::new(),
loader,
dyn_import_map: HashMap::new(),
Expand Down Expand Up @@ -562,7 +560,7 @@ impl JsRuntime {

// Check if more async ops have been dispatched
// during this turn of event loop.
if state.have_unpolled_ops.get() {
if state.have_unpolled_ops {
state.waker.wake();
}

Expand Down Expand Up @@ -1346,50 +1344,38 @@ impl JsRuntime {
self.mod_instantiate(root_id, None).map(|_| root_id)
}

fn poll_pending_ops(
&mut self,
cx: &mut Context,
) -> Option<(OpId, Box<[u8]>)> {
fn poll_pending_ops(&mut self, cx: &mut Context) -> Vec<(OpId, Box<[u8]>)> {
let state_rc = Self::state(self.v8_isolate());
let mut overflow_response: Option<(OpId, Box<[u8]>)> = None;
let mut overflow_response: Vec<(OpId, Box<[u8]>)> = Vec::new();

loop {
let mut state = state_rc.borrow_mut();
// Now handle actual ops.
state.have_unpolled_ops.set(false);
let mut state = state_rc.borrow_mut();

// Now handle actual ops.
state.have_unpolled_ops = false;

loop {
let pending_r = state.pending_ops.poll_next_unpin(cx);
match pending_r {
Poll::Ready(None) => break,
Poll::Pending => break,
Poll::Ready(Some((op_id, buf))) => {
let successful_push = state.shared.push(op_id, &buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
overflow_response = Some((op_id, buf));
break;
overflow_response.push((op_id, buf));
}
}
};
}

loop {
let mut state = state_rc.borrow_mut();
let unref_r = state.pending_unref_ops.poll_next_unpin(cx);
#[allow(clippy::match_wild_err_arm)]
match unref_r {
Poll::Ready(None) => break,
Poll::Pending => break,
Poll::Ready(Some((op_id, buf))) => {
let successful_push = state.shared.push(op_id, &buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
overflow_response = Some((op_id, buf));
break;
overflow_response.push((op_id, buf));
}
}
};
Expand Down Expand Up @@ -1427,13 +1413,14 @@ impl JsRuntime {
// Respond using shared queue and optionally overflown response
fn async_op_response(
&mut self,
maybe_overflown_response: Option<(OpId, Box<[u8]>)>,
overflown_responses: Vec<(OpId, Box<[u8]>)>,
) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());

let shared_queue_size = state_rc.borrow().shared.size();
let overflown_responses_size = overflown_responses.len();

if shared_queue_size == 0 && maybe_overflown_response.is_none() {
if shared_queue_size == 0 && overflown_responses_size == 0 {
return Ok(());
}

Expand All @@ -1454,22 +1441,21 @@ impl JsRuntime {

let tc_scope = &mut v8::TryCatch::new(scope);

if shared_queue_size > 0 {
js_recv_cb.call(tc_scope, global, &[]);
let mut args: Vec<v8::Local<v8::Value>> =
Vec::with_capacity(2 * overflown_responses_size);
for overflown_response in overflown_responses {
let (op_id, buf) = overflown_response;
args.push(v8::Integer::new(tc_scope, op_id as i32).into());
args.push(bindings::boxed_slice_to_uint8array(tc_scope, buf).into());
}

if shared_queue_size > 0 || overflown_responses_size > 0 {
js_recv_cb.call(tc_scope, global, args.as_slice());
// The other side should have shifted off all the messages.
let shared_queue_size = state_rc.borrow().shared.size();
assert_eq!(shared_queue_size, 0);
}

if let Some(overflown_response) = maybe_overflown_response {
let (op_id, buf) = overflown_response;
let op_id: v8::Local<v8::Value> =
v8::Integer::new(tc_scope, op_id as i32).into();
let ui8: v8::Local<v8::Value> =
bindings::boxed_slice_to_uint8array(tc_scope, buf).into();
js_recv_cb.call(tc_scope, global, &[op_id, ui8]);
}

match tc_scope.exception() {
None => Ok(()),
Some(exception) => exception_to_err_result(tc_scope, exception, false),
Expand Down Expand Up @@ -1924,6 +1910,71 @@ pub mod tests {
});
}

#[test]
fn overflow_res_async_combined_with_unref() {
run_in_task(|cx| {
let mut runtime = JsRuntime::new(Default::default());

runtime.register_op(
"test1",
|_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op {
let mut vec = vec![0u8; 100 * 1024 * 1024];
vec[0] = 4;
let buf = vec.into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
},
);

runtime.register_op(
"test2",
|_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op {
let mut vec = vec![0u8; 100 * 1024 * 1024];
vec[0] = 4;
let buf = vec.into_boxed_slice();
Op::AsyncUnref(futures::future::ready(buf).boxed())
},
);

runtime
.execute(
"overflow_res_async_combined_with_unref.js",
r#"
function assert(cond) {
if (!cond) {
throw Error("assert");
}
}
let asyncRecv = 0;
Deno.core.setAsyncHandler(1, (buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
});
Deno.core.setAsyncHandler(2, (buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
});
let control = new Uint8Array(1);
let response1 = Deno.core.dispatch(1, control);
// Async messages always have null response.
assert(response1 == null);
assert(asyncRecv == 0);
let response2 = Deno.core.dispatch(2, control);
// Async messages always have null response.
assert(response2 == null);
assert(asyncRecv == 0);
"#,
)
.unwrap();
assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
runtime
.execute("check.js", "assert(asyncRecv == 2);")
.unwrap();
});
}

#[test]
fn overflow_res_async() {
run_in_task(|_cx| {
Expand Down

0 comments on commit dccf5e0

Please sign in to comment.