Skip to content

Commit

Permalink
fix(workers): Don't panic when a worker's parent thread stops running (denoland#12156)
Browse files Browse the repository at this point in the history

This panic could happen in the following cases:

- A non-fatal error is thrown from a worker: it doesn't terminate the
  worker's execution, but it propagates to the main thread without being
  handled and makes the main thread terminate.
- A nested worker being alive while its parent worker gets terminated.
- A race condition if the main event loop terminates the worker as part
  of its last task, but the worker doesn't fully terminate before the
  main event loop stops running.

This panic happens because a worker's event loop should have pending ops
as long as the worker isn't closed or terminated – but if an event loop
finishes running while it has living workers, its associated
`WorkerThread` structs will be dropped, closing the channels that keep
those ops pending.

This change adds a `Drop` implementation to `WorkerThread`, which
terminates the worker without waiting for a response. This fixes the
panic, and makes it so nested workers are automatically terminated once
any of their ancestors is closed or terminated.

This change also refactors a worker's termination code into a
`WorkerThread::terminate()` method.

Closes denoland#11342.

Co-authored-by: Bartek Iwańczuk <[email protected]>
  • Loading branch information
Andreu Botella and bartlomieju committed Sep 22, 2021
1 parent eddae41 commit 5c5f4ea
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 18 deletions.
6 changes: 6 additions & 0 deletions cli/tests/integration/run_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,12 @@ itest!(worker_close_race {
output: "worker_close_race.js.out",
});

// Regression test for https://github.com/denoland/deno/issues/11342: the main
// thread's event loop finishes while the worker's event loop is still running.
// Exit code 1 is expected because the worker's uncaught error reaches the main
// thread (see worker_drop_handle_race.js.out).
itest!(worker_drop_handle_race {
  args: "run --quiet --reload --allow-read worker_drop_handle_race.js",
  output: "worker_drop_handle_race.js.out",
  exit_code: 1,
});

itest!(worker_message_before_close {
args: "run --quiet --reload --allow-read worker_message_before_close.js",
output: "worker_message_before_close.js.out",
Expand Down
12 changes: 12 additions & 0 deletions cli/tests/testdata/worker_drop_handle_race.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.

// https://github.com/denoland/deno/issues/11342
// Test for a panic that happens when the main thread's event loop finishes
// running while the worker's event loop is still spinning.

// The worker throws an exception that does not terminate the worker itself;
// instead it propagates to the main thread and makes the main thread exit.
const workerUrl = new URL("./workers/drop_handle_race.js", import.meta.url);
new Worker(workerUrl.href, { type: "module" });
8 changes: 8 additions & 0 deletions cli/tests/testdata/worker_drop_handle_race.js.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
error: Uncaught (in worker "") Error
throw new Error();
^
at [WILDCARD]/workers/drop_handle_race.js:2:9
at fire (deno:ext/timers/[WILDCARD])
at handleTimerMacrotask (deno:ext/timers/[WILDCARD])
error: Uncaught (in promise) Error: Unhandled error event reached main worker.
at Worker.#pollControl (deno:runtime/js/11_workers.js:[WILDCARD])
3 changes: 3 additions & 0 deletions cli/tests/testdata/workers/drop_handle_race.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
setTimeout(() => {
  throw new Error();
}, 1000);
// Throws an uncaught error from a timer callback after a 1000 ms delay.
// NOTE: worker_drop_handle_race.js.out expects the throw at position 2:9 of
// this file, so keep comments below the code and do not change the layout of
// the lines above.
48 changes: 33 additions & 15 deletions runtime/ops/worker_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, Sendable
pub struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>);

pub struct WorkerThread {
join_handle: JoinHandle<Result<(), AnyError>>,
// It's an Option so we can take the value before dropping the WorkerThread.
join_handle: Option<JoinHandle<Result<(), AnyError>>>,
worker_handle: WebWorkerHandle,

// A WorkerThread that hasn't been explicitly terminated can only be removed
Expand All @@ -75,6 +76,34 @@ pub struct WorkerThread {
message_closed: bool,
}

impl WorkerThread {
  /// Terminates the worker and blocks until its thread has been joined.
  ///
  /// Consumes the `WorkerThread`; its `Drop` impl still runs when this
  /// method returns.
  ///
  /// # Panics
  ///
  /// Panics if the join handle has already been taken, if the worker thread
  /// panicked, or if the worker thread returned an error.
  fn terminate(mut self) {
    // `WebWorkerHandle::terminate` takes the handle by value, so terminate a
    // clone and leave the field in place.
    let handle = self.worker_handle.clone();
    handle.terminate();

    let join_handle = self.join_handle.take().unwrap();
    let thread_result = join_handle.join().expect("Worker thread panicked");
    thread_result.expect("Panic in worker event loop");

    // Mark both channels as closed so the Drop impl skips its own terminate
    // call (an optimization: the worker has already been terminated above).
    self.ctrl_closed = true;
    self.message_closed = true;
  }
}

impl Drop for WorkerThread {
  /// Terminates the worker when its `WorkerThread` is dropped, without
  /// joining the worker's thread.
  fn drop(&mut self) {
    // Once either channel is closed, the worker thread has at least started
    // closing and its event loop won't start another run, so there is
    // nothing left to terminate.
    let already_closing = self.ctrl_closed || self.message_closed;
    if already_closing {
      return;
    }

    self.worker_handle.clone().terminate();
  }
}

pub type WorkersTable = HashMap<WorkerId, WorkerThread>;

pub fn init(create_web_worker_cb: Arc<CreateWebWorkerCb>) -> Extension {
Expand Down Expand Up @@ -557,7 +586,7 @@ fn op_create_worker(
let worker_handle = handle_receiver.recv().unwrap()?;

let worker_thread = WorkerThread {
join_handle,
join_handle: Some(join_handle),
worker_handle: worker_handle.into(),
ctrl_closed: false,
message_closed: false,
Expand All @@ -578,12 +607,7 @@ fn op_host_terminate_worker(
_: (),
) -> Result<(), AnyError> {
if let Some(worker_thread) = state.borrow_mut::<WorkersTable>().remove(&id) {
worker_thread.worker_handle.terminate();
worker_thread
.join_handle
.join()
.expect("Panic in worker thread")
.expect("Panic in worker event loop");
worker_thread.terminate();
} else {
debug!("tried to terminate non-existent worker {}", id);
}
Expand Down Expand Up @@ -625,13 +649,7 @@ fn close_channel(
};

if terminate {
let worker_thread = entry.remove();
worker_thread.worker_handle.terminate();
worker_thread
.join_handle
.join()
.expect("Worker thread panicked")
.expect("Panic in worker event loop");
entry.remove().terminate();
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions runtime/web_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ impl WebWorkerHandle {
/// Terminate the worker
/// This function will set terminated to true, terminate the isolate and close the message channel
pub fn terminate(self) {
// This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so
// we need a guard here.
// A WebWorkerHandle can be terminated / dropped after `self.close()` has
// been called inside the worker, but only a single "termination" can occur,
// so we need a guard here.
let already_terminated = self.terminated.swap(true, Ordering::SeqCst);

if !already_terminated {
Expand Down

0 comments on commit 5c5f4ea

Please sign in to comment.