Skip to content

Commit

Permalink
refactor: move TaskQueue from deno_runtime to deno_core (denoland#18016)
Browse files Browse the repository at this point in the history
This utility is useful in several contexts so it seems reasonable to
have it in `deno_core`.
  • Loading branch information
bartlomieju committed Mar 5, 2023
1 parent 19bb23b commit 4894e50
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 149 deletions.
2 changes: 2 additions & 0 deletions core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod resources;
mod runtime;
pub mod snapshot_util;
mod source_map;
mod task_queue;

// Re-exports
pub use anyhow;
Expand Down Expand Up @@ -116,6 +117,7 @@ pub use crate::runtime::Snapshot;
pub use crate::runtime::V8_WRAPPER_OBJECT_INDEX;
pub use crate::runtime::V8_WRAPPER_TYPE_INDEX;
pub use crate::source_map::SourceMapGetter;
pub use crate::task_queue::TaskQueue;

pub fn v8_version() -> &'static str {
v8::V8::get_version()
Expand Down
148 changes: 148 additions & 0 deletions core/task_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

use futures::task::AtomicWaker;
use futures::Future;
use parking_lot::Mutex;
use std::collections::LinkedList;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;

#[derive(Default)]
struct TaskQueueTaskWaker {
is_ready: AtomicBool,
waker: AtomicWaker,
}

#[derive(Default)]
struct TaskQueueTasks {
is_running: bool,
wakers: LinkedList<Arc<TaskQueueTaskWaker>>,
}

/// A queue that executes tasks sequentially one after the other
/// ensuring order and that no task runs at the same time as another.
///
/// Note that tokio's semaphore doesn't seem to maintain order
/// and so we can't use that in the code that uses this or use
/// that here.
#[derive(Clone, Default)]
pub struct TaskQueue {
tasks: Arc<Mutex<TaskQueueTasks>>,
}

impl TaskQueue {
/// Alternate API that acquires a permit internally
/// for the duration of the future.
#[cfg(test)]
pub async fn queue<R>(&self, future: impl Future<Output = R>) -> R {
let _permit = self.acquire().await;
future.await
}

/// Acquires a permit where the tasks are executed one at a time
/// and in the order that they were acquired.
pub async fn acquire(&self) -> TaskQueuePermit {
let acquire = TaskQueuePermitAcquire::new(self.tasks.clone());
acquire.await;
TaskQueuePermit {
tasks: self.tasks.clone(),
}
}
}

/// A permit that when dropped will allow another task to proceed.
pub struct TaskQueuePermit {
tasks: Arc<Mutex<TaskQueueTasks>>,
}

impl Drop for TaskQueuePermit {
fn drop(&mut self) {
let next_item = {
let mut tasks = self.tasks.lock();
let next_item = tasks.wakers.pop_front();
tasks.is_running = next_item.is_some();
next_item
};
if let Some(next_item) = next_item {
next_item.is_ready.store(true, Ordering::SeqCst);
next_item.waker.wake();
}
}
}

struct TaskQueuePermitAcquire {
tasks: Arc<Mutex<TaskQueueTasks>>,
initialized: AtomicBool,
waker: Arc<TaskQueueTaskWaker>,
}

impl TaskQueuePermitAcquire {
pub fn new(tasks: Arc<Mutex<TaskQueueTasks>>) -> Self {
Self {
tasks,
initialized: Default::default(),
waker: Default::default(),
}
}
}

impl Future for TaskQueuePermitAcquire {
type Output = ();

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
// update with the latest waker
self.waker.waker.register(cx.waker());

// ensure this is initialized
if !self.initialized.swap(true, Ordering::SeqCst) {
let mut tasks = self.tasks.lock();
if !tasks.is_running {
tasks.is_running = true;
return std::task::Poll::Ready(());
}
tasks.wakers.push_back(self.waker.clone());
return std::task::Poll::Pending;
}

// check if we're ready to run
if self.waker.is_ready.load(Ordering::SeqCst) {
std::task::Poll::Ready(())
} else {
std::task::Poll::Pending
}
}
}

#[cfg(test)]
mod tests {
use parking_lot::Mutex;
use std::sync::Arc;

use super::TaskQueue;

#[tokio::test]
async fn task_queue_runs_one_after_other() {
let task_queue = TaskQueue::default();
let mut tasks = Vec::new();
let data = Arc::new(Mutex::new(0));
for i in 0..100 {
let data = data.clone();
tasks.push(task_queue.queue(async move {
tokio::task::spawn_blocking(move || {
let mut data = data.lock();
if *data != i {
panic!("Value was not equal.");
}
*data = i + 1;
})
.await
.unwrap();
}));
}
futures::future::join_all(tasks).await;
}
}
3 changes: 1 addition & 2 deletions runtime/ops/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::TaskQueue;
use once_cell::sync::Lazy;
use std::borrow::Cow;
use std::cell::RefCell;
Expand Down Expand Up @@ -170,8 +171,6 @@ pub fn init_stdio(stdio: Stdio) -> Extension {
#[cfg(unix)]
use nix::sys::termios;

use super::utils::TaskQueue;

#[derive(Default)]
pub struct TtyMetadata {
#[cfg(unix)]
Expand Down
147 changes: 0 additions & 147 deletions runtime/ops/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@

use deno_core::error::custom_error;
use deno_core::error::AnyError;
use deno_core::futures::task::AtomicWaker;
use deno_core::futures::Future;
use deno_core::parking_lot::Mutex;
use std::collections::LinkedList;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;

/// A utility function to map OsStrings to Strings
pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
Expand All @@ -17,143 +10,3 @@ pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
custom_error("InvalidData", message)
})
}

#[derive(Default)]
struct TaskQueueTaskWaker {
is_ready: AtomicBool,
waker: AtomicWaker,
}

#[derive(Default)]
struct TaskQueueTasks {
is_running: bool,
wakers: LinkedList<Arc<TaskQueueTaskWaker>>,
}

/// A queue that executes tasks sequentially one after the other
/// ensuring order and that no task runs at the same time as another.
///
/// Note that tokio's semaphore doesn't seem to maintain order
/// and so we can't use that in the code that uses this or use
/// that here.
#[derive(Clone, Default)]
pub struct TaskQueue {
tasks: Arc<Mutex<TaskQueueTasks>>,
}

impl TaskQueue {
/// Alternate API that acquires a permit internally
/// for the duration of the future.
#[cfg(test)]
pub async fn queue<R>(&self, future: impl Future<Output = R>) -> R {
let _permit = self.acquire().await;
future.await
}

/// Acquires a permit where the tasks are executed one at a time
/// and in the order that they were acquired.
pub async fn acquire(&self) -> TaskQueuePermit {
let acquire = TaskQueuePermitAcquire::new(self.tasks.clone());
acquire.await;
TaskQueuePermit {
tasks: self.tasks.clone(),
}
}
}

/// A permit that when dropped will allow another task to proceed.
pub struct TaskQueuePermit {
tasks: Arc<Mutex<TaskQueueTasks>>,
}

impl Drop for TaskQueuePermit {
fn drop(&mut self) {
let next_item = {
let mut tasks = self.tasks.lock();
let next_item = tasks.wakers.pop_front();
tasks.is_running = next_item.is_some();
next_item
};
if let Some(next_item) = next_item {
next_item.is_ready.store(true, Ordering::SeqCst);
next_item.waker.wake();
}
}
}

struct TaskQueuePermitAcquire {
tasks: Arc<Mutex<TaskQueueTasks>>,
initialized: AtomicBool,
waker: Arc<TaskQueueTaskWaker>,
}

impl TaskQueuePermitAcquire {
pub fn new(tasks: Arc<Mutex<TaskQueueTasks>>) -> Self {
Self {
tasks,
initialized: Default::default(),
waker: Default::default(),
}
}
}

impl Future for TaskQueuePermitAcquire {
type Output = ();

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
// update with the latest waker
self.waker.waker.register(cx.waker());

// ensure this is initialized
if !self.initialized.swap(true, Ordering::SeqCst) {
let mut tasks = self.tasks.lock();
if !tasks.is_running {
tasks.is_running = true;
return std::task::Poll::Ready(());
}
tasks.wakers.push_back(self.waker.clone());
return std::task::Poll::Pending;
}

// check if we're ready to run
if self.waker.is_ready.load(Ordering::SeqCst) {
std::task::Poll::Ready(())
} else {
std::task::Poll::Pending
}
}
}

#[cfg(test)]
mod tests {
use deno_core::futures;
use deno_core::parking_lot::Mutex;
use std::sync::Arc;

use super::TaskQueue;

#[tokio::test]
async fn task_queue_runs_one_after_other() {
let task_queue = TaskQueue::default();
let mut tasks = Vec::new();
let data = Arc::new(Mutex::new(0));
for i in 0..100 {
let data = data.clone();
tasks.push(task_queue.queue(async move {
tokio::task::spawn_blocking(move || {
let mut data = data.lock();
if *data != i {
panic!("Value was not equal.");
}
*data = i + 1;
})
.await
.unwrap();
}));
}
futures::future::join_all(tasks).await;
}
}

0 comments on commit 4894e50

Please sign in to comment.