Skip to content

Commit

Permalink
Merge pull request #7 from elliptic-kitty/main
Browse files Browse the repository at this point in the history
upgrade nix -> 0.27.1, address kqueue EINTR issue
  • Loading branch information
khng300 committed Oct 10, 2023
2 parents ee68c2f + 958394a commit d79ad05
Show file tree
Hide file tree
Showing 20 changed files with 162 additions and 193 deletions.
2 changes: 1 addition & 1 deletion freebsd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ tokio = ["dep:tokio"]
command-macros = "0.2.9"
ipcidr = { path = "../ipcidr" }
jail = "*"
nix = { version = "0.25.0", features = ["term", "process", "event"] }
nix = { version = "0.27.1", features = ["term", "process", "event", "fs", "signal", "socket", "user"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1"
Expand Down
56 changes: 52 additions & 4 deletions freebsd/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.

use nix::errno::Errno;
use nix::libc::{close, intptr_t, EFD_NONBLOCK};
use nix::sys::event::{EventFilter, EventFlag, FilterFlag, KEvent};
use nix::unistd::dup;
Expand Down Expand Up @@ -105,16 +106,17 @@ impl EventFdNotify {

pub fn notified_sync(&self) {
let kevent = KEvent::from_read(self.fd);
let kq = nix::sys::event::kqueue().unwrap();
let kq = nix::sys::event::Kqueue::new().unwrap();
let out = KEvent::zero();
_ = nix::sys::event::kevent_ts(kq, &[kevent], &mut [out], None);
kq.wait_events(&[kevent], &mut [out]);
}

pub fn notified_sync_take_value(&self) -> std::io::Result<u64> {
let kevent = KEvent::from_read(self.fd);
let kq = nix::sys::event::kqueue().unwrap();
let kq = nix::sys::event::Kqueue::new().unwrap();
let out = KEvent::zero();
_ = nix::sys::event::kevent_ts(kq, &[kevent], &mut [out], None);
kq.wait_events(&[kevent], &mut [out]);

unsafe {
let mut v = 0u64;
if eventfd_read(self.fd, &mut v) != 0 {
Expand Down Expand Up @@ -149,6 +151,52 @@ impl std::future::Future for EventFdNotified<'_> {
}
}

pub trait KqueueExt {
fn wait_events(&self, changelist: &[KEvent], eventlist: &mut [KEvent]) -> nix::Result<usize>;
}

pub fn kevent_classic(
kq: i32,
changelist: &[KEvent],
eventlist: &mut [KEvent],
) -> nix::Result<usize> {
loop {
let res = unsafe {
nix::libc::kevent(
kq,
changelist.as_ptr() as *const nix::libc::kevent,
changelist.len() as i32,
eventlist.as_mut_ptr() as *mut nix::libc::kevent,
eventlist.len() as i32,
std::ptr::null(),
)
};

let errno = nix::errno::errno();

match res {
-1 => {
if errno != nix::libc::EINTR {
break Err(nix::errno::Errno::from_i32(errno))
}
}
size => break Ok(size as usize),
}
}
}

impl KqueueExt for nix::sys::event::Kqueue {
fn wait_events(&self, changelist: &[KEvent], eventlist: &mut [KEvent]) -> nix::Result<usize> {
loop {
match self.kevent(changelist, eventlist, None) {
Ok(size) => break Ok(size),
Err(errno) if errno != Errno::EINTR => break Err(errno),
_ => continue
}
}
}
}

pub trait KEventExt {
fn zero() -> KEvent {
unsafe { std::mem::zeroed() }
Expand Down
11 changes: 6 additions & 5 deletions freebsd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use nix::pty::OpenptyResult;
use nix::sys::stat::Mode;
use nix::unistd::{chdir, close, dup2, setgid, setsid, setuid};
use serde::Deserialize;
use std::os::fd::AsRawFd;
use std::os::raw::{c_int, c_uint};
use std::os::unix::process::CommandExt;
use std::path::Path;
Expand Down Expand Up @@ -398,16 +399,16 @@ impl FreeBSDTokioCommandExt for tokio::process::Command {
}

fn pty(&mut self, pty: &OpenptyResult) -> &mut tokio::process::Command {
let primary = pty.master;
let replica = pty.slave;
let primary = pty.master.as_raw_fd();
let replica = pty.slave.as_raw_fd();
unsafe {
self.pre_exec(move || {
// detach from the controlling terminal
if let Ok(fd) = open("/dev/tty", OFlag::O_RDWR, Mode::empty()) {
ioctl(fd, TIOCNOTTY);
}
setsid().expect("Cannot setsid");
if ioctl(replica, TIOCSCTTY) == -1 {
if ioctl(replica.as_raw_fd(), TIOCSCTTY) == -1 {
Err(std::io::Error::last_os_error())?;
}
close(primary)?;
Expand Down Expand Up @@ -455,8 +456,8 @@ impl FreeBSDCommandExt for std::process::Command {
}

fn pty(&mut self, pty: &OpenptyResult) -> &mut Command {
let primary = pty.master;
let replica = pty.slave;
let primary = pty.master.as_raw_fd();
let replica = pty.slave.as_raw_fd();
unsafe {
self.pre_exec(move || {
// detach from the controlling terminal
Expand Down
8 changes: 4 additions & 4 deletions freebsd/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod ifconfig;
pub mod pf;

use nix::sys::socket::{getsockopt, XuCred};
use std::os::fd::AsRawFd;
use std::os::fd::AsFd;
use std::os::unix::net::UnixStream;

#[derive(Debug, Clone)]
Expand All @@ -35,8 +35,8 @@ pub struct UnixCredential {
}

impl UnixCredential {
pub fn from_socket(fd: &impl AsRawFd) -> Result<UnixCredential, std::io::Error> {
let cred = getsockopt(fd.as_raw_fd(), nix::sys::socket::sockopt::LocalPeerCred)?;
pub fn from_socket(fd: &impl AsFd) -> Result<UnixCredential, std::io::Error> {
let cred = getsockopt(fd, nix::sys::socket::sockopt::LocalPeerCred)?;
Ok(UnixCredential {
uid: cred.uid(),
gids: cred.groups().to_vec(),
Expand All @@ -50,7 +50,7 @@ pub trait UnixStreamExt {

impl UnixStreamExt for UnixStream {
fn xucred(&self) -> Result<XuCred, std::io::Error> {
let cred = getsockopt(self.as_raw_fd(), nix::sys::socket::sockopt::LocalPeerCred)?;
let cred = getsockopt(self, nix::sys::socket::sockopt::LocalPeerCred)?;
Ok(cred)
}
}
37 changes: 4 additions & 33 deletions freebsd/src/procdesc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,10 @@
// OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.

use crate::event::KEventExt;
use crate::event::{KEventExt, KqueueExt};

use nix::poll::{poll, PollFlags};
use nix::sys::event::{kevent_ts, kqueue, KEvent};
use std::future::Future;
use nix::sys::event::{KEvent, Kqueue};
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
use std::task::{Context, Poll};

pub struct ProcFd(i32);

Expand All @@ -40,10 +36,10 @@ extern "C" {
}

pub fn pdwait(fd: i32) -> nix::Result<()> {
let kq = kqueue()?;
let kq = Kqueue::new()?;
let change_list = vec![KEvent::from_wait_pfd(fd)];
let mut event_list = vec![KEvent::zero()];
kevent_ts(kq, &change_list, &mut event_list, None)?;
kq.wait_events(&change_list, &mut event_list)?;
Ok(())
}

Expand All @@ -53,31 +49,6 @@ impl AsRawFd for ProcFd {
}
}

pub struct PollProcFd {
fd: i32,
}

impl Future for PollProcFd {
type Output = nix::Result<()>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut pollfd = [nix::poll::PollFd::new(self.fd, PollFlags::POLLHUP)];
if poll(&mut pollfd, 0)? > 0 {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
}

impl ProcFd {
pub async fn exited(&self) -> nix::Result<()> {
PollProcFd { fd: self.0 }.await
}
pub fn close(self) -> nix::Result<()> {
nix::unistd::close(self.0)
}
}

pub enum PdForkResult {
Parent { child: ProcFd, pid: i32 },
Child,
Expand Down
1 change: 0 additions & 1 deletion pty_process/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ edition = "2021"
[dependencies]
freebsd = { path = "../freebsd" }
futures = "0.3.25"
nix = { version = "0.25.0", features = ["term"] }
tokio = { version = "^1.21", features = ["net", "process", "macros", "sync"] }
34 changes: 20 additions & 14 deletions pty_process/src/kqueue_forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
// OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.
use crate::buffer::Buffer;
use crate::PtyCommandExt;
use freebsd::event::KEventExt;
use nix::pty::openpty;
use nix::sys::event::{kevent_ts, kqueue, EventFilter, EventFlag, KEvent};
use freebsd::event::{KEventExt, KqueueExt};
use freebsd::nix::pty::openpty;
use freebsd::nix::sys::event::{Kqueue, EventFilter, EventFlag, KEvent};
use freebsd::FreeBSDCommandExt;
use std::io::Write;
use std::os::fd::OwnedFd;
use std::os::unix::io::AsRawFd;
//use std::os::fd::AsRawFd;
use std::os::unix::net::{UnixListener, UnixStream};
Expand All @@ -52,7 +53,7 @@ pub struct PtyForwarder<W: Write + Send + Sync> {
child: std::process::Child,
listener: UnixListener,
clients: Vec<Client>,
pty: i32,
pty: OwnedFd,
ingress: Vec<u8>,
egress: Buffer<1048576>,
output_log: W,
Expand All @@ -66,6 +67,7 @@ impl<W: Write + Send + Sync> PtyForwarder<W> {
) -> std::io::Result<PtyForwarder<W>> {
let pty_result = openpty(None, None)?;
let child = command.pty(&pty_result).spawn()?;

Ok(PtyForwarder {
child,
listener,
Expand All @@ -81,9 +83,13 @@ impl<W: Write + Send + Sync> PtyForwarder<W> {
self.child.id()
}

pub fn pty(&self) -> i32 {
self.pty.as_raw_fd()
}

pub fn spawn(mut self) -> Result<std::process::ExitStatus, std::io::Error> {
let mut buf = [0u8; 512];
let kq = kqueue()?;
let kq = Kqueue::new()?;

let listener_fd = self.listener.as_raw_fd();

Expand All @@ -93,11 +99,11 @@ impl<W: Write + Send + Sync> PtyForwarder<W> {
let mut event_list = [KEvent::zero(); 8];

change_list.push(KEvent::from_read(listener_fd));
change_list.push(KEvent::from_read(self.pty));
change_list.push(KEvent::from_read(self.pty()));
change_list.push(KEvent::from_wait_pid(self.child.id()));

'kqloop: loop {
let n_events = kevent_ts(kq, &change_list, &mut event_list, None)?;
let n_events = kq.wait_events(&change_list, &mut event_list)?;
let event_list = &event_list[..n_events];

change_list.clear();
Expand All @@ -117,7 +123,7 @@ impl<W: Write + Send + Sync> PtyForwarder<W> {
continue;
}

if evfilt == EventFilter::EVFILT_READ && event.ident() as i32 == self.pty {
if evfilt == EventFilter::EVFILT_READ && event.ident() as i32 == self.pty() {
let mut bytes_available = event.data() as usize;

// if the pty is closed, there are never going to be anything for clients to
Expand All @@ -129,16 +135,16 @@ impl<W: Write + Send + Sync> PtyForwarder<W> {
while bytes_available > 0 {
let read_len = buf.len().min(bytes_available);
let buf = &mut buf[..read_len];
let read = nix::unistd::read(self.pty, buf)?;
let read = freebsd::nix::unistd::read(self.pty(), buf)?;
_ = self.output_log.write_all(buf);
self.egress.append_from_slice(buf);
bytes_available -= read;
}
continue;
}

if evfilt == EventFilter::EVFILT_WRITE && event.ident() as i32 == self.pty {
let written = nix::unistd::write(self.pty, &self.ingress).unwrap();
if evfilt == EventFilter::EVFILT_WRITE && event.ident() as i32 == self.pty() {
let written = freebsd::nix::unistd::write(self.pty(), &self.ingress).unwrap();
self.ingress.drain(..written);
continue;
}
Expand All @@ -162,7 +168,7 @@ impl<W: Write + Send + Sync> PtyForwarder<W> {
while bytes_available > 0 {
let read_len = buf.len().min(bytes_available);
let buf = &mut buf[..read_len];
let read = nix::unistd::read(client.fd, buf)?;
let read = freebsd::nix::unistd::read(client.fd, buf)?;
self.ingress.extend_from_slice(&buf[..read]);
bytes_available -= read_len;
}
Expand All @@ -189,7 +195,7 @@ impl<W: Write + Send + Sync> PtyForwarder<W> {
}
// if there are client written into ingress, mark pty to handle the input
if !self.ingress.is_empty() {
change_list.push(KEvent::from_write(self.pty).set_flags(EventFlag::EV_ONESHOT));
change_list.push(KEvent::from_write(self.pty()).set_flags(EventFlag::EV_ONESHOT));
}

// even the worst case is O(rn^2), it is probably faster with Vec than Hmap as we are
Expand Down
58 changes: 0 additions & 58 deletions pty_process/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,61 +23,3 @@
// SUCH DAMAGE.
pub mod buffer;
pub mod kqueue_forwarder;

use nix::fcntl::{open, OFlag};
use nix::libc::{ioctl, TIOCNOTTY, TIOCSCTTY};
use nix::pty::OpenptyResult;
use nix::sys::stat::Mode;
use nix::unistd::{close, dup2, setsid};
use std::os::unix::process::CommandExt;
use std::process::Command;
use tokio::process::Command as TokioCommand;

pub trait PtyCommandExt {
fn pty(&mut self, pty: &OpenptyResult) -> &mut Command;
}
pub trait TokioPtyCommandExt {
fn pty(&mut self, pty: &OpenptyResult) -> &mut TokioCommand;
}

macro_rules! pty_impl {
($self:expr, $pty:expr) => {{
let primary = $pty.master;
let replica = $pty.slave;
unsafe {
$self.pre_exec(move || {
if let Ok(fd) = open("/dev/tty", OFlag::O_RDWR, Mode::empty()) {
ioctl(fd, TIOCNOTTY);
}
setsid().expect("setsid");
if ioctl(replica, TIOCSCTTY) == -1 {
Err(std::io::Error::last_os_error())?;
}
close(primary)?;
dup2(replica, 0)?;
dup2(replica, 1)?;
dup2(replica, 2)?;
close(replica)?;
Ok(())
});
/*
$self.stdin(std::process::Stdio::from_raw_fd(replica));
$self.stdout(std::process::Stdio::from_raw_fd(replica));
$self.stderr(std::process::Stdio::from_raw_fd(replica));
*/
}
$self
}};
}

impl TokioPtyCommandExt for TokioCommand {
fn pty(&mut self, pty: &OpenptyResult) -> &mut TokioCommand {
pty_impl!(self, pty)
}
}

impl PtyCommandExt for Command {
fn pty(&mut self, pty: &OpenptyResult) -> &mut Command {
pty_impl!(self, pty)
}
}
Loading

0 comments on commit d79ad05

Please sign in to comment.