Skip to content

Commit

Permalink
fix: attempt to only allow one deno process to update the node_module…
Browse files Browse the repository at this point in the history
…s folder at a time (denoland#18058)

This is implemented in such a way that it should still allow processes
to go through when a file lock wasn't properly cleaned up and the OS
hasn't released it yet (but with a 200ms-ish delay).

Closes denoland#18039
  • Loading branch information
dsherret committed Mar 8, 2023
1 parent 72fe9bb commit 88b5fd9
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ data-url = "=0.2.0"
dlopen = "0.1.8"
encoding_rs = "=0.8.31"
flate2 = "=1.0.24"
fs3 = "0.5.0"
futures = "0.3.21"
http = "=0.2.8"
hyper = "0.14.18"
Expand Down
1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ env_logger = "=0.9.0"
eszip = "=0.37.0"
fancy-regex = "=0.10.0"
flate2.workspace = true
fs3.workspace = true
http.workspace = true
import_map = "=0.15.0"
indexmap.workspace = true
Expand Down
10 changes: 10 additions & 0 deletions cli/npm/resolvers/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::path::Path;
use std::path::PathBuf;

use crate::util::fs::symlink_dir;
use crate::util::fs::LaxSingleProcessFsFlag;
use async_trait::async_trait;
use deno_ast::ModuleSpecifier;
use deno_core::anyhow::bail;
Expand Down Expand Up @@ -236,6 +237,13 @@ async fn sync_resolution_with_fs(
format!("Creating '{}'", deno_local_registry_dir.display())
})?;

let single_process_lock = LaxSingleProcessFsFlag::lock(
deno_local_registry_dir.join(".deno.lock"),
// similar message used by cargo build
"waiting for file lock on node_modules directory",
)
.await;

// 1. Write all the packages out the .deno directory.
//
// Copy (hardlink in future) <global_registry_cache>/<package_id>/ to
Expand Down Expand Up @@ -394,6 +402,8 @@ async fn sync_resolution_with_fs(
}
}

drop(single_process_lock);

Ok(())
}

Expand Down
246 changes: 246 additions & 0 deletions cli/util/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ use std::io::ErrorKind;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use walkdir::WalkDir;

use crate::args::FilesConfig;
use crate::util::progress_bar::ProgressBar;
use crate::util::progress_bar::ProgressBarStyle;
use crate::util::progress_bar::ProgressMessagePrompt;

use super::path::specifier_to_file_path;

Expand Down Expand Up @@ -471,11 +475,167 @@ pub fn dir_size(path: &Path) -> std::io::Result<u64> {
Ok(total)
}

struct LaxSingleProcessFsFlagInner {
file_path: PathBuf,
fs_file: std::fs::File,
finished_token: Arc<tokio_util::sync::CancellationToken>,
}

impl Drop for LaxSingleProcessFsFlagInner {
fn drop(&mut self) {
use fs3::FileExt;
// kill the poll thread
self.finished_token.cancel();
// release the file lock
if let Err(err) = self.fs_file.unlock() {
log::debug!(
"Failed releasing lock for {}. {:#}",
self.file_path.display(),
err
);
}
}
}

/// A file system based flag that will attempt to synchronize multiple
/// processes so they go one after the other. In scenarios where
/// synchronization cannot be achieved, it will allow the current process
/// to proceed.
///
/// This should only be used in places where it's ideal for multiple
/// processes to not update something on the file system at the same time,
/// but it's not that big of a deal.
pub struct LaxSingleProcessFsFlag(Option<LaxSingleProcessFsFlagInner>);

impl LaxSingleProcessFsFlag {
pub async fn lock(file_path: PathBuf, long_wait_message: &str) -> Self {
log::debug!("Acquiring file lock at {}", file_path.display());
use fs3::FileExt;
let last_updated_path = file_path.with_extension("lock.poll");
let start_instant = std::time::Instant::now();
let open_result = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&file_path);

match open_result {
Ok(fs_file) => {
let mut pb_update_guard = None;
let mut error_count = 0;
while error_count < 10 {
let lock_result = fs_file.try_lock_exclusive();
let poll_file_update_ms = 100;
match lock_result {
Ok(_) => {
log::debug!("Acquired file lock at {}", file_path.display());
let _ignore = std::fs::write(&last_updated_path, "");
let token = Arc::new(tokio_util::sync::CancellationToken::new());

// Spawn a blocking task that will continually update a file
// signalling the lock is alive. This is a fail safe for when
// a file lock is never released. For example, on some operating
// systems, if a process does not release the lock (say it's
// killed), then the OS may release it at an indeterminate time
//
// This uses a blocking task because we use a single threaded
// runtime and this is time sensitive so we don't want it to update
// at the whims of of whatever is occurring on the runtime thread.
tokio::task::spawn_blocking({
let token = token.clone();
let last_updated_path = last_updated_path.clone();
move || {
let mut i = 0;
while !token.is_cancelled() {
i += 1;
let _ignore =
std::fs::write(&last_updated_path, i.to_string());
std::thread::sleep(Duration::from_millis(
poll_file_update_ms,
));
}
}
});

return Self(Some(LaxSingleProcessFsFlagInner {
file_path,
fs_file,
finished_token: token,
}));
}
Err(_) => {
// show a message if it's been a while
if pb_update_guard.is_none()
&& start_instant.elapsed().as_millis() > 1_000
{
let pb = ProgressBar::new(ProgressBarStyle::TextOnly);
let guard = pb.update_with_prompt(
ProgressMessagePrompt::Blocking,
long_wait_message,
);
pb_update_guard = Some((guard, pb));
}

// sleep for a little bit
tokio::time::sleep(Duration::from_millis(20)).await;

// Poll the last updated path to check if it's stopped updating,
// which is an indication that the file lock is claimed, but
// was never properly released.
match std::fs::metadata(&last_updated_path)
.and_then(|p| p.modified())
{
Ok(last_updated_time) => {
let current_time = std::time::SystemTime::now();
match current_time.duration_since(last_updated_time) {
Ok(duration) => {
if duration.as_millis()
> (poll_file_update_ms * 2) as u128
{
// the other process hasn't updated this file in a long time
// so maybe it was killed and the operating system hasn't
// released the file lock yet
return Self(None);
} else {
error_count = 0; // reset
}
}
Err(_) => {
error_count += 1;
}
}
}
Err(_) => {
error_count += 1;
}
}
}
}
}

drop(pb_update_guard); // explicit for clarity
Self(None)
}
Err(err) => {
log::debug!(
"Failed to open file lock at {}. {:#}",
file_path.display(),
err
);
Self(None) // let the process through
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use deno_core::futures;
use deno_core::parking_lot::Mutex;
use pretty_assertions::assert_eq;
use test_util::TempDir;
use tokio::sync::Notify;

#[test]
fn resolve_from_cwd_child() {
Expand Down Expand Up @@ -793,4 +953,90 @@ mod tests {
);
}
}

#[tokio::test]
async fn lax_fs_lock() {
let temp_dir = TempDir::new();
let lock_path = temp_dir.path().join("file.lock");
let signal1 = Arc::new(Notify::new());
let signal2 = Arc::new(Notify::new());
let signal3 = Arc::new(Notify::new());
let signal4 = Arc::new(Notify::new());
tokio::spawn({
let lock_path = lock_path.clone();
let signal1 = signal1.clone();
let signal2 = signal2.clone();
let signal3 = signal3.clone();
let signal4 = signal4.clone();
let temp_dir = temp_dir.clone();
async move {
let flag =
LaxSingleProcessFsFlag::lock(lock_path.clone(), "waiting").await;
signal1.notify_one();
signal2.notified().await;
tokio::time::sleep(Duration::from_millis(10)).await; // give the other thread time to acquire the lock
temp_dir.write("file.txt", "update1");
signal3.notify_one();
signal4.notified().await;
drop(flag);
}
});
let signal5 = Arc::new(Notify::new());
tokio::spawn({
let temp_dir = temp_dir.clone();
let signal5 = signal5.clone();
async move {
signal1.notified().await;
signal2.notify_one();
let flag = LaxSingleProcessFsFlag::lock(lock_path, "waiting").await;
temp_dir.write("file.txt", "update2");
signal5.notify_one();
drop(flag);
}
});

signal3.notified().await;
assert_eq!(temp_dir.read_to_string("file.txt"), "update1");
signal4.notify_one();
signal5.notified().await;
assert_eq!(temp_dir.read_to_string("file.txt"), "update2");
}

#[tokio::test]
async fn lax_fs_lock_ordered() {
let temp_dir = TempDir::new();
let lock_path = temp_dir.path().join("file.lock");
let output_path = temp_dir.path().join("output");
let expected_order = Arc::new(Mutex::new(Vec::new()));
let count = 10;
let mut tasks = Vec::with_capacity(count);

std::fs::write(&output_path, "").unwrap();

for i in 0..count {
let lock_path = lock_path.clone();
let output_path = output_path.clone();
let expected_order = expected_order.clone();
tasks.push(tokio::spawn(async move {
let flag =
LaxSingleProcessFsFlag::lock(lock_path.clone(), "waiting").await;
expected_order.lock().push(i.to_string());
// be extremely racy
let mut output = std::fs::read_to_string(&output_path).unwrap();
if !output.is_empty() {
output.push('\n');
}
output.push_str(&i.to_string());
std::fs::write(&output_path, output).unwrap();
drop(flag);
}));
}

futures::future::join_all(tasks).await;
let expected_output = expected_order.lock().join("\n");
assert_eq!(
std::fs::read_to_string(output_path).unwrap(),
expected_output
);
}
}
Loading

0 comments on commit 88b5fd9

Please sign in to comment.