Skip to content

Commit

Permalink
process: use pidfd on Linux when available (#6152)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiahao XU <[email protected]>
  • Loading branch information
NobodyXu committed Jan 11, 2024
1 parent 8463af9 commit e4f9bcb
Show file tree
Hide file tree
Showing 4 changed files with 414 additions and 13 deletions.
19 changes: 19 additions & 0 deletions tokio/src/io/poll_evented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,25 @@ impl<E: Source> PollEvented<E> {
self.registration.deregister(&mut inner)?;
Ok(inner)
}

#[cfg(all(feature = "process", target_os = "linux"))]
pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.registration
.poll_read_ready(cx)
.map_err(io::Error::from)
.map_ok(|_| ())
}

/// Re-register under new runtime with `interest`.
#[cfg(all(feature = "process", target_os = "linux"))]
pub(crate) fn reregister(&mut self, interest: Interest) -> io::Result<()> {
let io = self.io.as_mut().unwrap(); // As io shouldn't ever be None, just unwrap here.
let _ = self.registration.deregister(io);
self.registration =
Registration::new_with_interest_and_handle(io, interest, scheduler::Handle::current())?;

Ok(())
}
}

feature! {
Expand Down
57 changes: 44 additions & 13 deletions tokio/src/process/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
mod reap;
use reap::Reaper;

#[cfg(all(target_os = "linux", feature = "rt"))]
mod pidfd_reaper;

use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
use crate::process::kill::Kill;
use crate::process::SpawnedChild;
Expand Down Expand Up @@ -100,15 +103,15 @@ impl OrphanQueue<StdChild> for GlobalOrphanQueue {
}

#[must_use = "futures do nothing unless polled"]
pub(crate) struct Child {
inner: Reaper<StdChild, GlobalOrphanQueue, Signal>,
pub(crate) enum Child {
SignalReaper(Reaper<StdChild, GlobalOrphanQueue, Signal>),
#[cfg(all(target_os = "linux", feature = "rt"))]
PidfdReaper(pidfd_reaper::PidfdReaper<StdChild, GlobalOrphanQueue>),
}

impl fmt::Debug for Child {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Child")
.field("pid", &self.inner.id())
.finish()
fmt.debug_struct("Child").field("pid", &self.id()).finish()
}
}

Expand All @@ -118,12 +121,24 @@ pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<Spawned
let stdout = child.stdout.take().map(stdio).transpose()?;
let stderr = child.stderr.take().map(stdio).transpose()?;

#[cfg(all(target_os = "linux", feature = "rt"))]
match pidfd_reaper::PidfdReaper::new(child, GlobalOrphanQueue) {
Ok(pidfd_reaper) => {
return Ok(SpawnedChild {
child: Child::PidfdReaper(pidfd_reaper),
stdin,
stdout,
stderr,
})
}
Err((Some(err), _child)) => return Err(err),
Err((None, child_returned)) => child = child_returned,
}

let signal = signal(SignalKind::child())?;

Ok(SpawnedChild {
child: Child {
inner: Reaper::new(child, GlobalOrphanQueue, signal),
},
child: Child::SignalReaper(Reaper::new(child, GlobalOrphanQueue, signal)),
stdin,
stdout,
stderr,
Expand All @@ -132,25 +147,41 @@ pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<Spawned

impl Child {
pub(crate) fn id(&self) -> u32 {
self.inner.id()
match self {
Self::SignalReaper(signal_reaper) => signal_reaper.id(),
#[cfg(all(target_os = "linux", feature = "rt"))]
Self::PidfdReaper(pidfd_reaper) => pidfd_reaper.id(),
}
}

fn std_child(&mut self) -> &mut StdChild {
match self {
Self::SignalReaper(signal_reaper) => signal_reaper.inner_mut(),
#[cfg(all(target_os = "linux", feature = "rt"))]
Self::PidfdReaper(pidfd_reaper) => pidfd_reaper.inner_mut(),
}
}

pub(crate) fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
self.inner.inner_mut().try_wait()
self.std_child().try_wait()
}
}

impl Kill for Child {
fn kill(&mut self) -> io::Result<()> {
self.inner.kill()
self.std_child().kill()
}
}

impl Future for Child {
type Output = io::Result<ExitStatus>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.inner).poll(cx)
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::into_inner(self) {
Self::SignalReaper(signal_reaper) => Pin::new(signal_reaper).poll(cx),
#[cfg(all(target_os = "linux", feature = "rt"))]
Self::PidfdReaper(pidfd_reaper) => Pin::new(pidfd_reaper).poll(cx),
}
}
}

Expand Down
Loading

0 comments on commit e4f9bcb

Please sign in to comment.