Skip to content

Commit

Permalink
feat(ext/ffi): Thread safe callbacks (denoland#14942)
Browse files Browse the repository at this point in the history
  • Loading branch information
aapoalas committed Jun 28, 2022
1 parent e1c9096 commit 00f4521
Show file tree
Hide file tree
Showing 11 changed files with 400 additions and 28 deletions.
27 changes: 27 additions & 0 deletions cli/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,9 @@ declare namespace Deno {
* as C function pointers to ffi calls.
*
* The function pointer remains valid until the `close()` method is called.
*
* The callback can be explicitly ref'ed and deref'ed to stop Deno's
* process from exiting.
*/
export class UnsafeCallback<
Definition extends UnsafeCallbackDefinition = UnsafeCallbackDefinition,
Expand All @@ -584,6 +587,30 @@ declare namespace Deno {
Definition["result"]
>;

/**
* Adds one to this callback's reference counting.
*
* If the callback's reference count becomes non-zero, it will keep
* Deno's process from exiting.
*/
ref(): void;

/**
* Removes one from this callback's reference counting.
*
* If the callback's reference counter becomes zero, it will no longer
* keep Deno's process from exiting.
*/
unref(): void;

/**
* Removes the C function pointer associated with the UnsafeCallback.
* Continuing to use the instance after calling this object will lead to errors
* and crashes.
*
* Calling this method will also immediately set the callback's reference
* counting to zero and it will no longer keep Deno's process from exiting.
*/
close(): void;
}

Expand Down
3 changes: 2 additions & 1 deletion core/examples/schedule_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ type Task = Box<dyn FnOnce()>;
fn main() {
let my_ext = Extension::builder()
.ops(vec![op_schedule_task::decl()])
.event_loop_middleware(|state, cx| {
.event_loop_middleware(|state_rc, cx| {
let mut state = state_rc.borrow_mut();
let recv = state.borrow_mut::<mpsc::UnboundedReceiver<Task>>();
let mut ref_loop = false;
while let Poll::Ready(Some(call)) = recv.poll_next_unpin(cx) {
Expand Down
10 changes: 5 additions & 5 deletions core/extensions.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use crate::OpState;
use anyhow::Error;
use std::task::Context;
use std::{cell::RefCell, rc::Rc, task::Context};

pub type SourcePair = (&'static str, &'static str);
pub type OpFnRef = v8::FunctionCallback;
pub type OpMiddlewareFn = dyn Fn(OpDecl) -> OpDecl;
pub type OpStateFn = dyn Fn(&mut OpState) -> Result<(), Error>;
pub type OpEventLoopFn = dyn Fn(&mut OpState, &mut Context) -> bool;
pub type OpEventLoopFn = dyn Fn(Rc<RefCell<OpState>>, &mut Context) -> bool;

#[derive(Clone, Copy)]
pub struct OpDecl {
Expand Down Expand Up @@ -90,13 +90,13 @@ impl Extension {

pub fn run_event_loop_middleware(
&self,
op_state: &mut OpState,
op_state_rc: Rc<RefCell<OpState>>,
cx: &mut Context,
) -> bool {
self
.event_loop_middleware
.as_ref()
.map(|f| f(op_state, cx))
.map(|f| f(op_state_rc, cx))
.unwrap_or(false)
}

Expand Down Expand Up @@ -148,7 +148,7 @@ impl ExtensionBuilder {

pub fn event_loop_middleware<F>(&mut self, middleware_fn: F) -> &mut Self
where
F: Fn(&mut OpState, &mut Context) -> bool + 'static,
F: Fn(Rc<RefCell<OpState>>, &mut Context) -> bool + 'static,
{
self.event_loop_middleware = Some(Box::new(middleware_fn));
self
Expand Down
2 changes: 1 addition & 1 deletion core/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ impl JsRuntime {
let state = state_rc.borrow();
let op_state = state.op_state.clone();
for f in &self.event_loop_middlewares {
if f(&mut op_state.borrow_mut(), cx) {
if f(op_state.clone(), cx) {
maybe_scheduling = true;
}
}
Expand Down
18 changes: 18 additions & 0 deletions ext/ffi/00_ffi.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@
}

class UnsafeCallback {
#refcount;
#rid;
definition;
callback;
Expand All @@ -217,13 +218,30 @@
definition,
callback,
);
this.#refcount = 0;
this.#rid = rid;
this.pointer = pointer;
this.definition = definition;
this.callback = callback;
}

ref() {
if (this.#refcount++ === 0) {
core.opSync("op_ffi_unsafe_callback_ref", true);
}
}

unref() {
if (--this.#refcount === 0) {
core.opSync("op_ffi_unsafe_callback_ref", false);
}
}

close() {
if (this.#refcount) {
this.#refcount = 0;
core.opSync("op_ffi_unsafe_callback_ref", false);
}
core.close(this.#rid);
}
}
Expand Down
132 changes: 118 additions & 14 deletions ext/ffi/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use deno_core::error::generic_error;
use deno_core::error::range_error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::Future;
use deno_core::include_js_files;
use deno_core::op;
use std::sync::mpsc::sync_channel;

use deno_core::serde_json::json;
use deno_core::serde_json::Value;
Expand Down Expand Up @@ -37,7 +39,7 @@ use std::ptr;
use std::rc::Rc;

thread_local! {
static IS_ISOLATE_THREAD: RefCell<bool> = RefCell::new(false);
static LOCAL_ISOLATE_POINTER: RefCell<*const v8::Isolate> = RefCell::new(ptr::null());
}

pub struct Unstable(pub bool);
Expand Down Expand Up @@ -122,7 +124,6 @@ impl DynamicLibraryResource {
name: String,
foreign_fn: ForeignFunction,
) -> Result<(), AnyError> {
IS_ISOLATE_THREAD.with(|s| s.replace(true));
let symbol = match &foreign_fn.name {
Some(symbol) => symbol,
None => &name,
Expand Down Expand Up @@ -178,6 +179,14 @@ impl DynamicLibraryResource {
}
}

type PendingFfiAsyncWork = Box<dyn FnOnce()>;

struct FfiState {
async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
async_work_receiver: mpsc::UnboundedReceiver<PendingFfiAsyncWork>,
active_refed_functions: usize,
}

pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
Extension::builder()
.js(include_js_files!(
Expand All @@ -204,10 +213,51 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
op_ffi_read_f32::decl::<P>(),
op_ffi_read_f64::decl::<P>(),
op_ffi_unsafe_callback_create::decl::<P>(),
op_ffi_unsafe_callback_ref::decl(),
])
.event_loop_middleware(|op_state_rc, _cx| {
// FFI callbacks coming in from other threads will call in and get queued.
let mut maybe_scheduling = false;

let mut work_items: Vec<PendingFfiAsyncWork> = vec![];

{
let mut op_state = op_state_rc.borrow_mut();
let ffi_state = op_state.borrow_mut::<FfiState>();

while let Ok(Some(async_work_fut)) =
ffi_state.async_work_receiver.try_next()
{
// Move received items to a temporary vector so that we can drop the `op_state` borrow before we do the work.
work_items.push(async_work_fut);
maybe_scheduling = true;
}

if ffi_state.active_refed_functions > 0 {
maybe_scheduling = true;
}

drop(op_state);
}
while let Some(async_work_fut) = work_items.pop() {
async_work_fut();
}

maybe_scheduling
})
.state(move |state| {
// Stolen from deno_webgpu, is there a better option?
state.put(Unstable(unstable));

let (async_work_sender, async_work_receiver) =
mpsc::unbounded::<PendingFfiAsyncWork>();

state.put(FfiState {
active_refed_functions: 0,
async_work_receiver,
async_work_sender,
});

Ok(())
})
.build()
Expand Down Expand Up @@ -831,6 +881,7 @@ impl Resource for UnsafeCallbackResource {
}

struct CallbackInfo {
pub async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
pub callback: NonNull<v8::Function>,
pub context: NonNull<v8::Context>,
pub isolate: *mut v8::Isolate,
Expand All @@ -842,21 +893,55 @@ unsafe extern "C" fn deno_ffi_callback(
args: *const *const c_void,
info: &CallbackInfo,
) {
let isolate = &mut *info.isolate;
let callback = v8::Global::from_raw(isolate, info.callback);
let context = std::mem::transmute::<
NonNull<v8::Context>,
v8::Local<v8::Context>,
>(info.context);
IS_ISOLATE_THREAD.with(|is_event_loop_thread| {
if !(*is_event_loop_thread.borrow()) {
// Call from another thread, not yet supported.
eprintln!(
"Calling Deno FFI's callbacks from other threads is not supported"
LOCAL_ISOLATE_POINTER.with(|s| {
if ptr::eq(*s.borrow(), info.isolate) {
// Own isolate thread, okay to call directly
do_ffi_callback(
cif,
result,
args,
info.callback,
info.context,
info.isolate,
);
std::process::exit(1);
} else {
let async_work_sender = &info.async_work_sender;
// SAFETY: Safe as this function blocks until `do_ffi_callback` completes and a response message is received.
let cif: &'static libffi::low::ffi_cif = std::mem::transmute(cif);
let result: &'static mut c_void = std::mem::transmute(result);
let info: &'static CallbackInfo = std::mem::transmute(info);
let (response_sender, response_receiver) = sync_channel::<()>(0);
let fut = Box::new(move || {
do_ffi_callback(
cif,
result,
args,
info.callback,
info.context,
info.isolate,
);
response_sender.send(()).unwrap();
});
async_work_sender.unbounded_send(fut).unwrap();
response_receiver.recv().unwrap();
}
});
}

unsafe fn do_ffi_callback(
cif: &libffi::low::ffi_cif,
result: &mut c_void,
args: *const *const c_void,
callback: NonNull<v8::Function>,
context: NonNull<v8::Context>,
isolate: *mut v8::Isolate,
) {
let isolate = &mut *isolate;
let callback = v8::Global::from_raw(isolate, callback);
let context = std::mem::transmute::<
NonNull<v8::Context>,
v8::Local<v8::Context>,
>(context);
// Call from main thread. If this callback is being triggered due to a
// function call coming from Deno itself, then this callback will build
// ontop of that stack.
Expand Down Expand Up @@ -1096,11 +1181,20 @@ where
let cb = v8::Local::<v8::Function>::try_from(v8_value)?;

let isolate: *mut v8::Isolate = &mut *scope as &mut v8::Isolate;
LOCAL_ISOLATE_POINTER.with(|s| {
if s.borrow().is_null() {
s.replace(isolate);
}
});

let async_work_sender =
state.borrow_mut::<FfiState>().async_work_sender.clone();
let callback = v8::Global::new(scope, cb).into_raw();
let current_context = scope.get_current_context();
let context = v8::Global::new(scope, current_context).into_raw();

let info = Box::leak(Box::new(CallbackInfo {
async_work_sender,
callback,
context,
isolate,
Expand Down Expand Up @@ -1158,6 +1252,16 @@ where
Ok(result)
}

#[op]
fn op_ffi_unsafe_callback_ref(state: &mut deno_core::OpState, inc_dec: bool) {
let ffi_state = state.borrow_mut::<FfiState>();
if inc_dec {
ffi_state.active_refed_functions += 1;
} else {
ffi_state.active_refed_functions -= 1;
}
}

#[op(v8)]
fn op_ffi_call_ptr_nonblocking<'scope, FP>(
scope: &mut v8::HandleScope<'scope>,
Expand Down
13 changes: 13 additions & 0 deletions test_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,19 @@ pub extern "C" fn call_stored_function_2(arg: u8) {
}
}

#[no_mangle]
pub extern "C" fn call_stored_function_thread_safe() {
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(1500));
unsafe {
if STORED_FUNCTION.is_none() {
return;
}
STORED_FUNCTION.unwrap()();
}
});
}

// FFI performance helper functions
#[no_mangle]
pub extern "C" fn nop() {}
Expand Down
Loading

0 comments on commit 00f4521

Please sign in to comment.