Skip to content

Commit

Permalink
Fix PidfdReaper::drop: do not spawn any task there
Browse files Browse the repository at this point in the history
To avoid potential headaches such as:

 - The drop being run outside of a Tokio runtime context
 - The runtime being present but not actually running

Since this is the initial implementation, I chose to use the
`OrphanQueue` also used by the signal-driven `Reaper` impl.

It's simple and easy to implement without having to add runtime driver
and guaranteed to work.

Further improvement can be done on the basis of this PR.

Signed-off-by: Jiahao XU <[email protected]>
  • Loading branch information
NobodyXu committed Dec 31, 2023
1 parent c38faca commit 3b89981
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 43 deletions.
5 changes: 0 additions & 5 deletions tokio/src/io/poll_evented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,6 @@ impl<E: Source> PollEvented<E> {
.map_err(io::Error::from)
.map_ok(|_| ())
}

#[cfg(all(feature = "process", target_os = "linux"))]
pub(crate) fn scheduler_handle(&self) -> &scheduler::Handle {
self.registration.scheduler_handle()
}
}

feature! {
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/process/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl OrphanQueue<StdChild> for GlobalOrphanQueue {
pub(crate) enum Child {
SignalReaper(Reaper<StdChild, GlobalOrphanQueue, Signal>),
#[cfg(target_os = "linux")]
PidfdReaper(pidfd_reaper::PidfdReaper<StdChild>),
PidfdReaper(pidfd_reaper::PidfdReaper<StdChild, GlobalOrphanQueue>),
}

impl fmt::Debug for Child {
Expand All @@ -122,7 +122,7 @@ pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<Spawned
let stderr = child.stderr.take().map(stdio).transpose()?;

#[cfg(target_os = "linux")]
match pidfd_reaper::PidfdReaper::new(child) {
match pidfd_reaper::PidfdReaper::new(child, GlobalOrphanQueue) {
Ok(pidfd_reaper) => {
return Ok(SpawnedChild {
child: Child::PidfdReaper(pidfd_reaper),
Expand Down
89 changes: 58 additions & 31 deletions tokio/src/process/unix/pidfd_reaper.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::{
io::{interest::Interest, PollEvented},
process::{imp::orphan::Wait, kill::Kill},
runtime::Handle,
process::{
imp::{orphan::Wait, OrphanQueue},
kill::Kill,
},
};

use libc::{syscall, SYS_pidfd_open, __errno_location, ENOSYS, PIDFD_NONBLOCK};
Expand Down Expand Up @@ -109,27 +111,39 @@ where
}

#[derive(Debug)]
pub(crate) struct PidfdReaper<W: Wait + Send + Sync + Unpin + 'static>(Option<PidfdReaperInner<W>>);
pub(crate) struct PidfdReaper<W, Q>
where
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
{
inner: Option<PidfdReaperInner<W>>,
orphan_queue: Q,
}

impl<W> Deref for PidfdReaper<W>
impl<W, Q> Deref for PidfdReaper<W, Q>
where
W: Wait + Send + Sync + Unpin + 'static,
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
{
type Target = W;

fn deref(&self) -> &Self::Target {
&self.0.as_ref().expect("inner has gone away").inner
&self.inner.as_ref().expect("inner has gone away").inner
}
}

impl<W> PidfdReaper<W>
impl<W, Q> PidfdReaper<W, Q>
where
W: Wait + Send + Sync + Unpin + 'static,
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
{
pub(crate) fn new(inner: W) -> Result<Self, (Option<io::Error>, W)> {
pub(crate) fn new(inner: W, orphan_queue: Q) -> Result<Self, (Option<io::Error>, W)> {
if let Some(pidfd) = Pidfd::open(inner.id()) {
match PollEvented::new_with_interest(pidfd, Interest::READABLE) {
Ok(pidfd) => Ok(Self(Some(PidfdReaperInner { pidfd, inner }))),
Ok(pidfd) => Ok(Self {
inner: Some(PidfdReaperInner { pidfd, inner }),
orphan_queue,
}),
Err(io_error) => Err((Some(io_error), inner)),
}
} else {
Expand All @@ -138,59 +152,60 @@ where
}

pub(crate) fn inner_mut(&mut self) -> &mut W {
&mut self.0.as_mut().expect("inner has gone away").inner
&mut self.inner.as_mut().expect("inner has gone away").inner
}
}

impl<W> Future for PidfdReaper<W>
impl<W, Q> Future for PidfdReaper<W, Q>
where
W: Wait + Send + Sync + Unpin + 'static,
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
{
type Output = io::Result<ExitStatus>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(
Pin::into_inner(self)
.0
.inner
.as_mut()
.expect("inner has gone away"),
)
.poll(cx)
}
}

impl<W> Kill for PidfdReaper<W>
impl<W, Q> Kill for PidfdReaper<W, Q>
where
W: Wait + Send + Sync + Unpin + Kill + 'static,
W: Wait + Unpin + Kill,
Q: OrphanQueue<W> + Unpin,
{
fn kill(&mut self) -> io::Result<()> {
self.inner_mut().kill()
}
}

impl<W> Drop for PidfdReaper<W>
impl<W, Q> Drop for PidfdReaper<W, Q>
where
W: Wait + Send + Sync + Unpin + 'static,
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
{
fn drop(&mut self) {
let mut reaper_inner = self.0.take().expect("inner has gone away");
if let Ok(Some(_)) = reaper_inner.inner.try_wait() {
let mut orphan = self.inner.take().expect("inner has gone away").inner;
if let Ok(Some(_)) = orphan.try_wait() {
return;
}

Handle {
inner: reaper_inner.pidfd.scheduler_handle().clone(),
}
.spawn(async move {
let _ = reaper_inner.await;
});
self.orphan_queue.push_orphan(orphan);
}
}

#[cfg(all(test, not(loom), not(miri)))]
mod test {
use super::*;
use crate::runtime::{Builder as RuntimeBuilder, Runtime};
use crate::{
process::unix::orphan::test::MockQueue,
runtime::{Builder as RuntimeBuilder, Runtime},
};
use std::process::{Command, Output};

fn create_runtime() -> Runtime {
Expand Down Expand Up @@ -223,13 +238,17 @@ mod test {
return;
}

let queue = MockQueue::new();

run_test(async {
let child = Command::new("true").spawn().unwrap();
let pidfd_reaper = PidfdReaper::new(child).unwrap();
let pidfd_reaper = PidfdReaper::new(child, &queue).unwrap();

let exit_status = pidfd_reaper.await.unwrap();
assert!(exit_status.success());
});

assert!(queue.all_enqueued.borrow().is_empty());
}

#[test]
Expand All @@ -239,15 +258,19 @@ mod test {
return;
}

let queue = MockQueue::new();

run_test(async {
let child = Command::new("sleep").arg("1800").spawn().unwrap();
let mut pidfd_reaper = PidfdReaper::new(child).unwrap();
let mut pidfd_reaper = PidfdReaper::new(child, &queue).unwrap();

pidfd_reaper.kill().unwrap();

let exit_status = pidfd_reaper.await.unwrap();
assert!(!exit_status.success());
});

assert!(queue.all_enqueued.borrow().is_empty());
}

#[test]
Expand All @@ -257,9 +280,13 @@ mod test {
return;
}

let queue = MockQueue::new();

run_test(async {
let child = Command::new("true").spawn().unwrap();
let _pidfd_reaper = PidfdReaper::new(child).unwrap();
let child = Command::new("sleep").arg("1800").spawn().unwrap();
let _pidfd_reaper = PidfdReaper::new(child, &queue).unwrap();
});

assert_eq!(queue.all_enqueued.borrow().len(), 1);
}
}
5 changes: 0 additions & 5 deletions tokio/src/runtime/io/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,6 @@ impl Registration {
fn handle(&self) -> &Handle {
self.handle.driver().io()
}

#[cfg(all(feature = "process", target_os = "linux"))]
pub(crate) fn scheduler_handle(&self) -> &scheduler::Handle {
&self.handle
}
}

impl Drop for Registration {
Expand Down

0 comments on commit 3b89981

Please sign in to comment.