Skip to content

Commit

Permalink
fix(workers): Make worker.terminate() not immediately kill the isol…
Browse files Browse the repository at this point in the history
…ate (denoland#12831)

Due to a bug in V8, terminating an isolate while a module with top-level
await is being evaluated would crash the process. This change makes it
so calling `worker.terminate()` will signal the worker to terminate at
the next iteration of the event loop, and it schedules a proper
termination of the worker's isolate after 2 seconds.
  • Loading branch information
Andreu Botella committed Nov 29, 2021
1 parent 5178e09 commit 4a13c32
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 21 deletions.
5 changes: 5 additions & 0 deletions cli/tests/integration/worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,8 @@ itest!(worker_permissions_blob_local {
http_server: true,
exit_code: 1,
});

itest!(worker_terminate_tla_crash {
args: "run --quiet --reload workers/terminate_tla_crash.js",
output: "workers/terminate_tla_crash.js.out",
});
21 changes: 21 additions & 0 deletions cli/tests/testdata/workers/terminate_tla_crash.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Test for https://github.com/denoland/deno/issues/12658
//
// If a worker is terminated immediately after construction, and the worker's
// main module uses top-level await, V8 has a chance to crash.
//
// These crashes are so rare in debug mode that I've only seen them once. They
// happen a lot more often in release mode.

const workerModule = `
await new Promise(resolve => setTimeout(resolve, 1000));
`;

// Iterating 10 times to increase the likelihood of triggering the crash, at
// least in release mode.
for (let i = 0; i < 10; i++) {
const worker = new Worker(
`data:application/javascript;base64,${btoa(workerModule)}`,
{ type: "module" },
);
worker.terminate();
}
Empty file.
91 changes: 70 additions & 21 deletions runtime/web_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use deno_core::error::JsError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::future::poll_fn;
use deno_core::futures::stream::StreamExt;
use deno_core::futures::task::AtomicWaker;
use deno_core::located_script_name;
use deno_core::serde::Deserialize;
use deno_core::serde::Serialize;
Expand Down Expand Up @@ -113,7 +114,9 @@ pub struct WebWorkerInternalHandle {
sender: mpsc::Sender<WorkerControlEvent>,
pub port: Rc<MessagePort>,
pub cancel: Rc<CancelHandle>,
terminated: Arc<AtomicBool>,
termination_signal: Arc<AtomicBool>,
has_terminated: Arc<AtomicBool>,
terminate_waker: Arc<AtomicWaker>,
isolate_handle: v8::IsolateHandle,
pub worker_type: WebWorkerType,
}
Expand All @@ -127,7 +130,7 @@ impl WebWorkerInternalHandle {
//
// Therefore just treat it as if the worker has terminated and return.
if sender.is_closed() {
self.terminated.store(true, Ordering::SeqCst);
self.has_terminated.store(true, Ordering::SeqCst);
return Ok(());
}
sender.try_send(event)?;
Expand All @@ -136,7 +139,21 @@ impl WebWorkerInternalHandle {

/// Check if this worker is terminated or being terminated
pub fn is_terminated(&self) -> bool {
self.terminated.load(Ordering::SeqCst)
self.has_terminated.load(Ordering::SeqCst)
}

/// Check if this worker must terminate (because the termination signal is
/// set), and terminates it if so. Returns whether the worker is terminated or
/// being terminated, as with [`Self::is_terminated()`].
pub fn terminate_if_needed(&mut self) -> bool {
let has_terminated = self.is_terminated();

if !has_terminated && self.termination_signal.load(Ordering::SeqCst) {
self.terminate();
return true;
}

has_terminated
}

/// Terminate the worker
Expand All @@ -147,7 +164,7 @@ impl WebWorkerInternalHandle {
// This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so
// we need a guard here.
let already_terminated = self.terminated.swap(true, Ordering::SeqCst);
let already_terminated = self.has_terminated.swap(true, Ordering::SeqCst);

if !already_terminated {
// Stop javascript execution
Expand All @@ -162,7 +179,9 @@ impl WebWorkerInternalHandle {
pub struct SendableWebWorkerHandle {
port: MessagePort,
receiver: mpsc::Receiver<WorkerControlEvent>,
terminated: Arc<AtomicBool>,
termination_signal: Arc<AtomicBool>,
has_terminated: Arc<AtomicBool>,
terminate_waker: Arc<AtomicWaker>,
isolate_handle: v8::IsolateHandle,
}

Expand All @@ -171,7 +190,9 @@ impl From<SendableWebWorkerHandle> for WebWorkerHandle {
WebWorkerHandle {
receiver: Rc::new(RefCell::new(handle.receiver)),
port: Rc::new(handle.port),
terminated: handle.terminated,
termination_signal: handle.termination_signal,
has_terminated: handle.has_terminated,
terminate_waker: handle.terminate_waker,
isolate_handle: handle.isolate_handle,
}
}
Expand All @@ -188,7 +209,9 @@ impl From<SendableWebWorkerHandle> for WebWorkerHandle {
pub struct WebWorkerHandle {
pub port: Rc<MessagePort>,
receiver: Rc<RefCell<mpsc::Receiver<WorkerControlEvent>>>,
terminated: Arc<AtomicBool>,
termination_signal: Arc<AtomicBool>,
has_terminated: Arc<AtomicBool>,
terminate_waker: Arc<AtomicWaker>,
isolate_handle: v8::IsolateHandle,
}

Expand All @@ -204,19 +227,37 @@ impl WebWorkerHandle {
}

/// Terminate the worker
/// This function will set terminated to true, terminate the isolate and close the message channel
/// This function will set the termination signal, close the message channel,
/// and schedule to terminate the isolate after two seconds.
pub fn terminate(self) {
// A WebWorkerHandle can be terminated / dropped after `self.close()` has
// been called inside the worker, but only a single "termination" can occur,
// so we need a guard here.
let already_terminated = self.terminated.swap(true, Ordering::SeqCst);
use std::thread::{sleep, spawn};
use std::time::Duration;

if !already_terminated {
// Stop javascript execution
self.isolate_handle.terminate_execution();
}
let schedule_termination =
!self.termination_signal.swap(true, Ordering::SeqCst);

self.port.disentangle();

if schedule_termination && !self.has_terminated.load(Ordering::SeqCst) {
// Wake up the worker's event loop so it can terminate.
self.terminate_waker.wake();

let has_terminated = self.has_terminated.clone();

// Schedule to terminate the isolate's execution.
spawn(move || {
sleep(Duration::from_secs(2));

// A worker's isolate can only be terminated once, so we need a guard
// here.
let already_terminated = has_terminated.swap(true, Ordering::SeqCst);

if !already_terminated {
// Stop javascript execution
self.isolate_handle.terminate_execution();
}
});
}
}
}

Expand All @@ -226,19 +267,25 @@ fn create_handles(
) -> (WebWorkerInternalHandle, SendableWebWorkerHandle) {
let (parent_port, worker_port) = create_entangled_message_port();
let (ctrl_tx, ctrl_rx) = mpsc::channel::<WorkerControlEvent>(1);
let terminated = Arc::new(AtomicBool::new(false));
let termination_signal = Arc::new(AtomicBool::new(false));
let has_terminated = Arc::new(AtomicBool::new(false));
let terminate_waker = Arc::new(AtomicWaker::new());
let internal_handle = WebWorkerInternalHandle {
sender: ctrl_tx,
port: Rc::new(parent_port),
terminated: terminated.clone(),
termination_signal: termination_signal.clone(),
has_terminated: has_terminated.clone(),
terminate_waker: terminate_waker.clone(),
isolate_handle: isolate_handle.clone(),
cancel: CancelHandle::new_rc(),
worker_type,
};
let external_handle = SendableWebWorkerHandle {
receiver: ctrl_rx,
port: worker_port,
terminated,
termination_signal,
has_terminated,
terminate_waker,
isolate_handle,
};
(internal_handle, external_handle)
Expand Down Expand Up @@ -496,14 +543,16 @@ impl WebWorker {
wait_for_inspector: bool,
) -> Poll<Result<(), AnyError>> {
// If awakened because we are terminating, just return Ok
if self.internal_handle.is_terminated() {
if self.internal_handle.terminate_if_needed() {
return Poll::Ready(Ok(()));
}

self.internal_handle.terminate_waker.register(cx.waker());

match self.js_runtime.poll_event_loop(cx, wait_for_inspector) {
Poll::Ready(r) => {
// If js ended because we are terminating, just return Ok
if self.internal_handle.is_terminated() {
if self.internal_handle.terminate_if_needed() {
return Poll::Ready(Ok(()));
}

Expand Down

0 comments on commit 4a13c32

Please sign in to comment.