From f16fe443038454ef49641b097c0a0161a97bd1c3 Mon Sep 17 00:00:00 2001 From: Andreu Botella Date: Wed, 10 Aug 2022 20:04:20 +0200 Subject: [PATCH] feat(core): Add support for async ops in realms (#14734) Pull request #14019 enabled initial support for realms, but it did not include support for async ops anywhere other than the main realm. The main issue was that the `js_recv_cb` callback, which resolves promises corresponding to async ops, was only set for the main realm, so async ops in other realms would never resolve. Furthermore, promise ID's are specific to each realm, which meant that async ops from other realms would result in a wrong promise from the main realm being resolved. This change creates a `ContextState` struct, similar to `JsRuntimeState` but stored in a slot of each `v8::Context`, which contains a `js_recv_cb` callback for each realm. Combined with a new list of known realms, which stores them as `v8::Weak`, and a change in the `#[op]` macro to pass the current context to `queue_async_op`, this makes it possible to send the results of promises for different realms to their realm, and prevent the ID's from getting mixed up. Additionally, since promise ID's are no longer unique to the isolate, having a single set of unrefed ops doesn't work. This change therefore also moves `unrefed_ops` from `JsRuntimeState` to `ContextState`, and adds the lengths of the unrefed op sets for all known realms to get the total number of unrefed ops to compare in the event loop. Co-authored-by: Luis Malheiro --- core/ops_builtin_v8.rs | 9 +- core/runtime.rs | 348 +++++++++++++++++++++++++++++++++++------ ops/lib.rs | 9 +- 3 files changed, 308 insertions(+), 58 deletions(-) diff --git a/core/ops_builtin_v8.rs b/core/ops_builtin_v8.rs index fbbcdd440cffc..f2d28346e482f 100644 --- a/core/ops_builtin_v8.rs +++ b/core/ops_builtin_v8.rs @@ -8,6 +8,7 @@ use crate::ops_builtin::WasmStreamingResource; use crate::resolve_url_or_path; use crate::serde_v8::from_v8; use crate::source_map::apply_source_map as apply_source_map_; +use crate::JsRealm; use crate::JsRuntime; use crate::OpDecl; use crate::ZeroCopyBuf; @@ -64,14 +65,14 @@ fn to_v8_fn( #[op(v8)] fn op_ref_op(scope: &mut v8::HandleScope, promise_id: i32) { - let state_rc = JsRuntime::state(scope); - state_rc.borrow_mut().unrefed_ops.remove(&promise_id); + let context_state = JsRealm::state_from_scope(scope); + context_state.borrow_mut().unrefed_ops.remove(&promise_id); } #[op(v8)] fn op_unref_op(scope: &mut v8::HandleScope, promise_id: i32) { - let state_rc = JsRuntime::state(scope); - state_rc.borrow_mut().unrefed_ops.insert(promise_id); + let context_state = JsRealm::state_from_scope(scope); + context_state.borrow_mut().unrefed_ops.insert(promise_id); } #[op(v8)] diff --git a/core/runtime.rs b/core/runtime.rs index 0176ff79104cc..b13ed0b7eaa1a 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -46,7 +46,8 @@ use std::sync::Once; use std::task::Context; use std::task::Poll; -type PendingOpFuture = OpCall<(PromiseId, OpId, OpResult)>; +type PendingOpFuture = + OpCall<(v8::Global, PromiseId, OpId, OpResult)>; pub enum Snapshot { Static(&'static [u8]), @@ -146,11 +147,19 @@ pub type SharedArrayBufferStore = pub type CompiledWasmModuleStore = CrossIsolateStore; +#[derive(Default)] +pub(crate) struct ContextState { + js_recv_cb: Option>, + // TODO(andreubotella): Move the rest of Option> fields from + // JsRuntimeState to this struct. + pub(crate) unrefed_ops: HashSet, +} + /// Internal state for JsRuntime which is stored in one of v8::Isolate's /// embedder slots. pub(crate) struct JsRuntimeState { global_realm: Option, - pub(crate) js_recv_cb: Option>, + known_realms: Vec>, pub(crate) js_macrotask_cbs: Vec>, pub(crate) js_nexttick_cbs: Vec>, pub(crate) js_promise_reject_cb: Option>, @@ -167,7 +176,6 @@ pub(crate) struct JsRuntimeState { pub(crate) source_map_getter: Option>, pub(crate) source_map_cache: SourceMapCache, pub(crate) pending_ops: FuturesUnordered, - pub(crate) unrefed_ops: HashSet, pub(crate) have_unpolled_ops: bool, pub(crate) op_state: Rc>, #[allow(dead_code)] @@ -383,13 +391,14 @@ impl JsRuntime { .module_loader .unwrap_or_else(|| Rc::new(NoopModuleLoader)); + let known_realms = vec![v8::Weak::new(&mut isolate, &global_context)]; isolate.set_slot(Rc::new(RefCell::new(JsRuntimeState { - global_realm: Some(JsRealm(global_context)), + global_realm: Some(JsRealm(global_context.clone())), + known_realms, pending_promise_exceptions: HashMap::new(), pending_dyn_mod_evaluate: vec![], pending_mod_evaluate: None, dyn_module_evaluate_idle_counter: 0, - js_recv_cb: None, js_macrotask_cbs: vec![], js_nexttick_cbs: vec![], js_promise_reject_cb: None, @@ -399,7 +408,6 @@ impl JsRuntime { source_map_getter: options.source_map_getter, source_map_cache: Default::default(), pending_ops: FuturesUnordered::new(), - unrefed_ops: HashSet::new(), shared_array_buffer_store: options.shared_array_buffer_store, compiled_wasm_module_store: options.compiled_wasm_module_store, op_state: op_state.clone(), @@ -409,6 +417,10 @@ impl JsRuntime { waker: AtomicWaker::new(), }))); + global_context + .open(&mut isolate) + .set_slot(&mut isolate, Rc::>::default()); + let module_map = ModuleMap::new(loader, op_state); isolate.set_slot(Rc::new(RefCell::new(module_map))); @@ -432,7 +444,8 @@ impl JsRuntime { // Init extension ops js_runtime.init_extension_ops().unwrap(); // Init callbacks (opresolve) - js_runtime.init_cbs(); + let global_realm = js_runtime.global_realm(); + js_runtime.init_cbs(&global_realm); js_runtime } @@ -470,12 +483,22 @@ impl JsRuntime { &Self::state(self.v8_isolate()).borrow().op_ctxs, self.built_from_snapshot, ); + context.set_slot(scope, Rc::>::default()); + + Self::state(scope) + .borrow_mut() + .known_realms + .push(v8::Weak::new(scope, &context)); + JsRealm::new(v8::Global::new(scope, context)) }; if !self.built_from_snapshot { self.init_extension_js(&realm)?; } + + self.init_cbs(&realm); + Ok(realm) } @@ -629,15 +652,17 @@ impl JsRuntime { } /// Grabs a reference to core.js' opresolve & syncOpsCache() - fn init_cbs(&mut self) { - let scope = &mut self.handle_scope(); - let recv_cb = - Self::grab_global::(scope, "Deno.core.opresolve").unwrap(); - let recv_cb = v8::Global::new(scope, recv_cb); - // Put global handles in state - let state_rc = JsRuntime::state(scope); - let mut state = state_rc.borrow_mut(); - state.js_recv_cb.replace(recv_cb); + fn init_cbs(&mut self, realm: &JsRealm) { + let recv_cb = { + let scope = &mut realm.handle_scope(self.v8_isolate()); + let recv_cb = + Self::grab_global::(scope, "Deno.core.opresolve") + .expect("Deno.core.opresolve is undefined in the realm"); + v8::Global::new(scope, recv_cb) + }; + // Put global handle in callback state + let state = realm.state(self.v8_isolate()); + state.borrow_mut().js_recv_cb.replace(recv_cb); } /// Returns the runtime's op state, which can be used to maintain ops @@ -704,8 +729,21 @@ impl JsRuntime { Rc::new(NoopModuleLoader), state.borrow().op_state.clone(), )))); + // Drop other v8::Global handles before snapshotting - std::mem::take(&mut state.borrow_mut().js_recv_cb); + { + for weak_context in &state.borrow().known_realms { + if let Some(context) = weak_context.to_global(self.v8_isolate()) { + let realm = JsRealm::new(context.clone()); + let realm_state = realm.state(self.v8_isolate()); + std::mem::take(&mut realm_state.borrow_mut().js_recv_cb); + context + .open(self.v8_isolate()) + .clear_all_slots(self.v8_isolate()); + } + } + state.borrow_mut().known_realms.clear(); + } let snapshot_creator = self.snapshot_creator.as_mut().unwrap(); let snapshot = snapshot_creator @@ -1023,8 +1061,16 @@ Pending dynamic modules:\n".to_string(); let state = state_rc.borrow_mut(); let module_map = module_map_rc.borrow(); + let mut num_unrefed_ops = 0; + for weak_context in &state.known_realms { + if let Some(context) = weak_context.to_global(isolate) { + let realm = JsRealm::new(context); + num_unrefed_ops += realm.state(isolate).borrow().unrefed_ops.len(); + } + } + EventLoopPendingState { - has_pending_refed_ops: state.pending_ops.len() > state.unrefed_ops.len(), + has_pending_refed_ops: state.pending_ops.len() > num_unrefed_ops, has_pending_dyn_imports: module_map.has_pending_dynamic_imports(), has_pending_dyn_module_evaluation: !state .pending_dyn_mod_evaluate @@ -1792,17 +1838,30 @@ impl JsRuntime { fn resolve_async_ops(&mut self, cx: &mut Context) -> Result<(), Error> { let state_rc = Self::state(self.v8_isolate()); - let js_recv_cb_handle = state_rc.borrow().js_recv_cb.clone().unwrap(); - let scope = &mut self.handle_scope(); + // We keep a list of promise IDs and OpResults per realm. Since v8::Context + // isn't hashable, `results_per_realm` is a vector of (context, list) tuples + type ResultList = Vec<(i32, OpResult)>; + let mut results_per_realm: Vec<(v8::Global, ResultList)> = { + let known_realms = &mut state_rc.borrow_mut().known_realms; + let mut results = Vec::with_capacity(known_realms.len()); + + // Avoid calling the method multiple times + let isolate = self.v8_isolate(); + + // Remove GC'd realms from `known_realms` at the same time as we populate + // `results` with those that are still alive. + known_realms.retain(|weak| { + if !weak.is_empty() { + let context = weak.to_global(isolate).unwrap(); + results.push((context, vec![])); + true + } else { + false + } + }); - // We return async responses to JS in unbounded batches (may change), - // each batch is a flat vector of tuples: - // `[promise_id1, op_result1, promise_id2, op_result2, ...]` - // promise_id is a simple integer, op_result is an ops::OpResult - // which contains a value OR an error, encoded as a tuple. - // This batch is received in JS via the special `arguments` variable - // and then each tuple is used to resolve or reject promises - let mut args: Vec> = vec![]; + results + }; // Now handle actual ops. { @@ -1811,27 +1870,61 @@ impl JsRuntime { while let Poll::Ready(Some(item)) = state.pending_ops.poll_next_unpin(cx) { - let (promise_id, op_id, resp) = item; - state.unrefed_ops.remove(&promise_id); + let (context, promise_id, op_id, resp) = item; state.op_state.borrow().tracker.track_async_completed(op_id); - args.push(v8::Integer::new(scope, promise_id as i32).into()); - args.push(resp.to_v8(scope).unwrap()); + for (context2, results) in results_per_realm.iter_mut() { + if context == *context2 { + results.push((promise_id, resp)); + break; + } + } + JsRealm::new(context) + .state(self.v8_isolate()) + .borrow_mut() + .unrefed_ops + .remove(&promise_id); } } - if args.is_empty() { - return Ok(()); - } + for (context, results) in results_per_realm { + if results.is_empty() { + continue; + } - let tc_scope = &mut v8::TryCatch::new(scope); - let js_recv_cb = js_recv_cb_handle.open(tc_scope); - let this = v8::undefined(tc_scope).into(); - js_recv_cb.call(tc_scope, this, args.as_slice()); + let realm = JsRealm::new(context); + let js_recv_cb_handle = realm + .state(self.v8_isolate()) + .borrow() + .js_recv_cb + .clone() + .unwrap(); + let scope = &mut realm.handle_scope(self.v8_isolate()); + + // We return async responses to JS in unbounded batches (may change), + // each batch is a flat vector of tuples: + // `[promise_id1, op_result1, promise_id2, op_result2, ...]` + // promise_id is a simple integer, op_result is an ops::OpResult + // which contains a value OR an error, encoded as a tuple. + // This batch is received in JS via the special `arguments` variable + // and then each tuple is used to resolve or reject promises + let mut args = vec![]; + + for (promise_id, resp) in results.into_iter() { + args.push(v8::Integer::new(scope, promise_id as i32).into()); + args.push(resp.to_v8(scope).unwrap()); + } - match tc_scope.exception() { - None => Ok(()), - Some(exception) => exception_to_err_result(tc_scope, exception, false), + let tc_scope = &mut v8::TryCatch::new(scope); + let js_recv_cb = js_recv_cb_handle.open(tc_scope); + let this = v8::undefined(tc_scope).into(); + js_recv_cb.call(tc_scope, this, args.as_slice()); + + if let Some(exception) = tc_scope.exception() { + return exception_to_err_result(tc_scope, exception, false); + } } + + Ok(()) } fn drain_macrotasks(&mut self) -> Result<(), Error> { @@ -1937,6 +2030,28 @@ impl JsRealm { &self.0 } + pub(crate) fn state( + &self, + isolate: &mut v8::Isolate, + ) -> Rc> { + self + .context() + .open(isolate) + .get_slot::>>(isolate) + .unwrap() + .clone() + } + + pub(crate) fn state_from_scope( + scope: &mut v8::HandleScope, + ) -> Rc> { + let context = scope.get_current_context(); + context + .get_slot::>>(scope) + .unwrap() + .clone() + } + pub fn handle_scope<'s>( &self, isolate: &'s mut v8::Isolate, @@ -2005,7 +2120,8 @@ impl JsRealm { #[inline] pub fn queue_async_op( scope: &v8::Isolate, - op: impl Future + 'static, + op: impl Future, PromiseId, OpId, OpResult)> + + 'static, ) { let state_rc = JsRuntime::state(scope); let mut state = state_rc.borrow_mut(); @@ -2162,11 +2278,11 @@ pub mod tests { ) .unwrap(); { + let realm = runtime.global_realm(); let isolate = runtime.v8_isolate(); let state_rc = JsRuntime::state(isolate); - let state = state_rc.borrow(); - assert_eq!(state.pending_ops.len(), 2); - assert_eq!(state.unrefed_ops.len(), 0); + assert_eq!(state_rc.borrow().pending_ops.len(), 2); + assert_eq!(realm.state(isolate).borrow().unrefed_ops.len(), 0); } runtime .execute_script( @@ -2178,11 +2294,11 @@ pub mod tests { ) .unwrap(); { + let realm = runtime.global_realm(); let isolate = runtime.v8_isolate(); let state_rc = JsRuntime::state(isolate); - let state = state_rc.borrow(); - assert_eq!(state.pending_ops.len(), 2); - assert_eq!(state.unrefed_ops.len(), 2); + assert_eq!(state_rc.borrow().pending_ops.len(), 2); + assert_eq!(realm.state(isolate).borrow().unrefed_ops.len(), 2); } runtime .execute_script( @@ -2194,11 +2310,11 @@ pub mod tests { ) .unwrap(); { + let realm = runtime.global_realm(); let isolate = runtime.v8_isolate(); let state_rc = JsRuntime::state(isolate); - let state = state_rc.borrow(); - assert_eq!(state.pending_ops.len(), 2); - assert_eq!(state.unrefed_ops.len(), 0); + assert_eq!(state_rc.borrow().pending_ops.len(), 2); + assert_eq!(realm.state(isolate).borrow().unrefed_ops.len(), 0); } } @@ -3635,4 +3751,132 @@ assertEquals(1, notify_return_value); assert!(ret.open(runtime.v8_isolate()).is_true()); } } + + #[tokio::test] + async fn js_realm_async_ops() { + // Test that returning a ZeroCopyBuf and throwing an exception from a async + // op result in objects with prototypes from the right realm. Note that we + // don't test the result of returning structs, because they will be + // serialized to objects with null prototype. + + #[op] + async fn op_test(fail: bool) -> Result { + if !fail { + Ok(ZeroCopyBuf::empty()) + } else { + Err(crate::error::type_error("Test")) + } + } + + let mut runtime = JsRuntime::new(RuntimeOptions { + extensions: vec![Extension::builder().ops(vec![op_test::decl()]).build()], + get_error_class_fn: Some(&|error| { + crate::error::get_custom_error_class(error).unwrap() + }), + ..Default::default() + }); + + let global_realm = runtime.global_realm(); + let new_realm = runtime.create_realm().unwrap(); + + let mut rets = vec![]; + + // Test in both realms + for realm in [global_realm, new_realm].into_iter() { + let ret = realm + .execute_script( + runtime.v8_isolate(), + "", + r#" + (async function () { + const buf = await Deno.core.opAsync("op_test", false); + let err; + try { + await Deno.core.opAsync("op_test", true); + } catch(e) { + err = e; + } + return buf instanceof Uint8Array && buf.byteLength === 0 && + err instanceof TypeError && err.message === "Test" ; + })(); + "#, + ) + .unwrap(); + rets.push((realm, ret)); + } + + runtime.run_event_loop(false).await.unwrap(); + + for ret in rets { + let scope = &mut ret.0.handle_scope(runtime.v8_isolate()); + let value = v8::Local::new(scope, ret.1); + let promise = v8::Local::::try_from(value).unwrap(); + let result = promise.result(scope); + + assert!(result.is_boolean() && result.is_true()); + } + } + + #[tokio::test] + async fn js_realm_ref_unref_ops() { + run_in_task(|cx| { + // Never resolves. + #[op] + async fn op_pending() { + futures::future::pending().await + } + + let mut runtime = JsRuntime::new(RuntimeOptions { + extensions: vec![Extension::builder() + .ops(vec![op_pending::decl()]) + .build()], + ..Default::default() + }); + let main_realm = runtime.global_realm(); + let other_realm = runtime.create_realm().unwrap(); + + main_realm + .execute_script( + runtime.v8_isolate(), + "", + "var promise = Deno.core.opAsync('op_pending');", + ) + .unwrap(); + other_realm + .execute_script( + runtime.v8_isolate(), + "", + "var promise = Deno.core.opAsync('op_pending');", + ) + .unwrap(); + assert!(matches!(runtime.poll_event_loop(cx, false), Poll::Pending)); + + main_realm + .execute_script( + runtime.v8_isolate(), + "", + r#" + let promiseIdSymbol = Symbol.for("Deno.core.internalPromiseId"); + Deno.core.opSync("op_unref_op", promise[promiseIdSymbol]); + "#, + ) + .unwrap(); + assert!(matches!(runtime.poll_event_loop(cx, false), Poll::Pending)); + + other_realm + .execute_script( + runtime.v8_isolate(), + "", + r#" + let promiseIdSymbol = Symbol.for("Deno.core.internalPromiseId"); + Deno.core.opSync("op_unref_op", promise[promiseIdSymbol]); + "#, + ) + .unwrap(); + assert!(matches!( + runtime.poll_event_loop(cx, false), + Poll::Ready(Ok(())) + )); + }); + } } diff --git a/ops/lib.rs b/ops/lib.rs index df8abf61eaeb4..fdafca16597ce 100644 --- a/ops/lib.rs +++ b/ops/lib.rs @@ -190,7 +190,7 @@ fn codegen_v8_async( quote! { let result = match result { Ok(fut) => fut.await, - Err(e) => return (promise_id, op_id, #core::_ops::to_op_result::<()>(get_class, Err(e))), + Err(e) => return (context, promise_id, op_id, #core::_ops::to_op_result::<()>(get_class, Err(e))), }; } } else { @@ -233,11 +233,16 @@ fn codegen_v8_async( state.get_error_class_fn }; + let context = { + let local = scope.get_current_context(); + #core::v8::Global::new(scope, local) + }; + #pre_result #core::_ops::queue_async_op(scope, async move { let result = #result_fut #result_wrapper - (promise_id, op_id, #core::_ops::to_op_result(get_class, result)) + (context, promise_id, op_id, #core::_ops::to_op_result(get_class, result)) }); } }