From 56da15d11fe4c939f24a21562e2420ab45b5369b Mon Sep 17 00:00:00 2001 From: Peter Ammon Date: Mon, 23 Dec 2024 13:20:19 -0800 Subject: [PATCH] Rework the file descriptor monitor [Do NOT cherry-pick to 4.0 - this needs more time to be tested] fish sometimes needs to capture the output of a command or block of commands. Examples include fish_prompt or any command substitution ("cmdsubs"). It does this the obvious way: by creating a pipe, using dup2 to replace stdout of the command with the write end of the pipe, and then reading from the read end into a buffer, until EOF or the command substitution completes. Importantly, this task also overlaps with waiting for the process to exit; that is when executing: set var (some_cmd) fish needs to both wait on `some_cmd` and ALSO read its output into memory. This is awkward to do in a portable way in a single thread (though maybe doable on Linux with pidfd). So we wait and read on different threads. To make things worse, command substitutions may themselves create additional command substitutions (recursion, etc). Creating a read thread for every command substitution would result in excessive threads. So rather than a thread per cmdsub, we have a single dedicated thread that handles ALL command substitutions, by multiplexing multiple file descriptors via select/poll. This is the "fd monitor." You hand it a file descriptor and it lets you know when it's readable, and then you can read from it (via a callback). Also, it has a "wakeup" fd: if you write to that then the fd monitor wakes up, figures out what it has to do, and resumes. When the command substitution ends, we need to remove the fd from the fd monitor, because we intend to close it. You might object "the commands in the cmdsub have all completed so the write end of the pipe has been closed so the fd monitor can just notice that the pipe is closed" but it's not so: consider the horrible case of `set var (yes &)` and abandon all hope. The current mechanism for removing the fd from the monitor is called a "poke." We tell the fd monitor (through a "control" self-pipe) to explicitly wake up the item. It then invokes the callback ("pokes") the item on the dedicated fd monitor thread. The item notices that the command substitution is complete, and it returns a value meaning "remove me" and the fd monitor does so. The client thread is stuck waiting for this process to complete. So basically removing a fd from the monitor requires a round trip to its dedicated thread. This is slow and also complicated (Rust doesn't have futures)! So let's not do that. The big idea is to remove this round-trip synchronization. That is, when we intend to remove the fd from the fd monitor, we _just do it_ and then close the fd. Use a lock rather than a round-trip to the thread. Crucially that lock is unlocked while the monitor thread waits in select/poll. This invites all sorts of races: 1. fish might remove and close the fd right before the monitor polls it. It will thus attempt to poll a closed fd. 2. fish might remove and close the fd, and then something else opens a file and receives the same fd. Now the fd monitor will poll an fd that was never added. 3. fish might remove and close the fd _while the fd monitor is polling it_. What happens then? (Turns out on macOS we get EBADF, and on Linux the fd is marked readable). The Big Idea is that *all of these races are benign*. As long as poll/select doesn't crash or hang, we don't care *what* it returns, because the source of truth are the set of items stored in the fd monitor and these item IDs are never recycled. (This also assumes that it's OK to select/poll on random file descriptors; there ought to be no side effects). Not only is this a large simplification since we no longer need that round trip, it's a substantial performance improvement as well. The "aliases.fish" benchmark goes from 164 to 154 msec on my Mac, and from 124 to 112 msec on my Linux machine - nearly 10%. Add some tests to verify our assumptions about the behavior of closing or replacing a file descriptor during poll. But even if these fail, all we care about is that poll/select doesn't crash or hang. --- src/fd_monitor.rs | 153 +++++++++------------------------------- src/io.rs | 112 ++++++++--------------------- src/tests/fd_monitor.rs | 153 +++++++++++++++++++++++++++------------- 3 files changed, 168 insertions(+), 250 deletions(-) diff --git a/src/fd_monitor.rs b/src/fd_monitor.rs index 9a6f548aa..ab98b8b80 100644 --- a/src/fd_monitor.rs +++ b/src/fd_monitor.rs @@ -22,15 +22,6 @@ use crate::fds::{make_autoclose_pipes, make_fd_nonblocking}; #[cfg(HAVE_EVENTFD)] use libc::{EFD_CLOEXEC, EFD_NONBLOCK}; -/// Reason for waking an item -#[derive(PartialEq, Eq)] -pub enum ItemWakeReason { - /// The fd became readable (or was HUP'd) - Readable, - /// The item was "poked" (woken up explicitly) - Poke, -} - /// An event signaller implemented using a file descriptor, so it can plug into /// [`select()`](libc::select). /// @@ -145,27 +136,8 @@ impl FdEventSignaller { /// but guarantees that the next call to wait() will not block. /// Return true if readable, false if not readable, or not interrupted by a signal. pub fn poll(&self, wait: bool /* = false */) -> bool { - let mut timeout = libc::timeval { - tv_sec: 0, - tv_usec: 0, - }; - let mut fds: libc::fd_set = unsafe { std::mem::zeroed() }; - unsafe { libc::FD_ZERO(&mut fds) }; - unsafe { libc::FD_SET(self.read_fd(), &mut fds) }; - let res = unsafe { - libc::select( - self.read_fd() + 1, - &mut fds, - std::ptr::null_mut(), - std::ptr::null_mut(), - if wait { - std::ptr::null_mut() - } else { - &mut timeout - }, - ) - }; - res > 0 + let timeout = if wait { FdReadableSet::kNoTimeout } else { 0 }; + FdReadableSet::is_fd_readable(self.read_fd(), timeout) } /// Return the fd to write to. @@ -195,10 +167,9 @@ impl From for FdMonitorItemId { } /// The callback type used by [`FdMonitorItem`]. It is passed a mutable reference to the -/// `FdMonitorItem`'s [`FdMonitorItem::fd`] and [the reason](ItemWakeupReason) for the wakeup. -/// It should return an [`ItemAction`] to indicate whether the item should be removed from the -/// [`FdMonitor`] set. -pub type Callback = Box ItemAction + Send + Sync>; +/// `FdMonitorItem`'s [`FdMonitorItem::fd`]. If the fd is closed, the callback will not +/// be invoked again. +pub type Callback = Box; /// An item containing an fd and callback, which can be monitored to watch when it becomes readable /// and invoke the callback. @@ -208,44 +179,20 @@ pub struct FdMonitorItem { /// A callback to be invoked when the fd is readable, or for another reason given by the wake reason. /// If the fd is invalid on return from the function, then the item is removed from the [`FdMonitor`] set. callback: Callback, - /// The id for this item, assigned by [`FdMonitor`]. - item_id: FdMonitorItemId, -} - -/// A value returned by the callback to indicate what to do with the item. -#[derive(PartialEq, Eq)] -pub enum ItemAction { - Remove, - Retain, } impl FdMonitorItem { /// Invoke this item's callback because the fd is readable. /// Returns the [`ItemAction`] to indicate whether the item should be removed from the [`FdMonitor`] set. - fn service_readable(&mut self) -> ItemAction { - (self.callback)(&mut self.fd, ItemWakeReason::Readable) - } - - /// Invoke this item's callback because the item was poked. - /// Returns the [`ItemAction`] to indicate whether the item should be removed from the [`FdMonitor`] set. - fn service_poke(&mut self) -> ItemAction { - (self.callback)(&mut self.fd, ItemWakeReason::Poke) - } - - pub fn new(fd: AutoCloseFd, callback: Callback) -> Self { - FdMonitorItem { - fd, - callback, - item_id: FdMonitorItemId(0), - } + fn service(&mut self) { + (self.callback)(&mut self.fd) } } /// A thread-safe class which can monitor a set of fds, invoking a callback when any becomes /// readable (or has been HUP'd). pub struct FdMonitor { - /// Our self-signaller. When this is written to, it means there are new items pending, new items - /// in the poke list, or terminate has been set. + /// Our self-signaller, used to wake up the background thread out of select(). change_signaller: Arc, /// The data shared between the background thread and the `FdMonitor` instance. data: Arc>, @@ -266,8 +213,6 @@ const _: () = { struct SharedData { /// The map of items. This may be modified by the main thread with the mutex locked. items: HashMap, - /// List of IDs for items that need to be poked (explicitly woken up). - pokelist: Vec, /// Whether the background thread is running. running: bool, /// Used to signal that the background thread should terminate. @@ -280,26 +225,25 @@ struct BackgroundFdMonitor { /// in the poke list, or terminate has been set. change_signaller: Arc, /// The data shared between the background thread and the `FdMonitor` instance. + /// Note the locking here is very coarse and the lock is held while servicing items. + /// This means that an item which reads a lot of data may prevent adding other items. + /// When we do true multithreaded execution, we may want to make the locking more fine-grained (per-item). data: Arc>, } impl FdMonitor { /// Add an item to the monitor. Returns the [`FdMonitorItemId`] assigned to the item. - pub fn add(&self, mut item: FdMonitorItem) -> FdMonitorItemId { - assert!(item.fd.is_valid()); - assert!( - item.item_id == FdMonitorItemId(0), - "Item should not already have an id!" - ); + pub fn add(&self, fd: AutoCloseFd, callback: Callback) -> FdMonitorItemId { + assert!(fd.is_valid()); let item_id = self.last_id.fetch_add(1, Ordering::Relaxed) + 1; let item_id = FdMonitorItemId(item_id); + let item: FdMonitorItem = FdMonitorItem { fd, callback }; let start_thread = { // Lock around a local region let mut data = self.data.lock().expect("Mutex poisoned!"); // Assign an id and add the item. - item.item_id = item_id; let old_value = data.items.insert(item_id, item); assert!(old_value.is_none(), "Item ID {} already exists!", item_id.0); @@ -326,29 +270,24 @@ impl FdMonitor { item_id } - /// Mark that the item with the given ID needs to be woken up explicitly. - pub fn poke_item(&self, item_id: FdMonitorItemId) { + /// Remove an item from the monitor and return its file descriptor. + /// Note we may remove an item whose fd is currently being waited on in select(); this is + /// considered benign because the underlying item will no longer be present and so its + /// callback will not be invoked. + pub fn remove_item(&self, item_id: FdMonitorItemId) -> AutoCloseFd { assert!(item_id.0 > 0, "Invalid item id!"); - let needs_notification = { - let mut data = self.data.lock().expect("Mutex poisoned!"); - let needs_notification = data.pokelist.is_empty(); - // Insert it, sorted. But not if it already exists. - if let Err(pos) = data.pokelist.binary_search(&item_id) { - data.pokelist.insert(pos, item_id); - }; - needs_notification - }; - - if needs_notification { - self.change_signaller.post(); - } + let mut data = self.data.lock().expect("Mutex poisoned!"); + let removed = data.items.remove(&item_id).expect("Item ID not found"); + drop(data); + // Allow it to recompute the wait set. + self.change_signaller.post(); + removed.fd } pub fn new() -> Self { Self { data: Arc::new(Mutex::new(SharedData { items: HashMap::new(), - pokelist: Vec::new(), running: false, terminate: false, })), @@ -366,7 +305,6 @@ impl BackgroundFdMonitor { let mut fds = FdReadableSet::new(); let mut item_ids: Vec = Vec::new(); - let mut pokelist: Vec = Vec::new(); loop { // Our general flow is that a client thread adds an item for us to monitor, @@ -399,6 +337,7 @@ impl BackgroundFdMonitor { // - The client thread directly removes an item (protected by the mutex). // - After select() returns the set of active file descriptors, we only invoke callbacks // for items whose file descriptors are marked active, and whose ItemID was snapshotted. + // // This avoids the ABA problem because ItemIDs are never recycled. It does have a race where // we might select() on a file descriptor that has been closed or recycled. Thus we must be // prepared to handle EBADF. This race is otherwise considered benign. @@ -409,18 +348,20 @@ impl BackgroundFdMonitor { let change_signal_fd = self.change_signaller.read_fd(); fds.add(change_signal_fd); - // Grab the lock and snapshot the item_ids. + // Grab the lock and snapshot the item_ids. Skip items with invalid fds. let mut data = self.data.lock().expect("Mutex poisoned!"); item_ids.clear(); item_ids.reserve(data.items.len()); for (item_id, item) in &data.items { let fd = item.fd.as_raw_fd(); - fds.add(fd); - item_ids.push(*item_id); + if fd >= 0 { + fds.add(fd); + item_ids.push(*item_id); + } } // Sort it to avoid the non-determinism of the hash table. - item_ids.sort(); + item_ids.sort_unstable(); // If we have no items, then we wish to allow the thread to exit, but after a time, so // we aren't spinning up and tearing down the thread repeatedly. Set a timeout of 256 @@ -461,27 +402,8 @@ impl BackgroundFdMonitor { // Note there is no risk of an ABA problem because ItemIDs are never recycled. continue; }; - let fd = item.fd.as_raw_fd(); - if !fds.test(fd) { - // Not readable. - continue; - } - let action = item.service_readable(); - if action == ItemAction::Remove { - FLOG!(fd_monitor, "Removing fd", fd); - data.items.remove(item_id); - } - } - - pokelist.clear(); - std::mem::swap(&mut pokelist, &mut data.pokelist); - for item_id in pokelist.drain(..) { - if let Some(item) = data.items.get_mut(&item_id) { - let action = item.service_poke(); - if action == ItemAction::Remove { - FLOG!(fd_monitor, "Removing fd", item.fd.as_raw_fd()); - data.items.remove(&item_id); - } + if fds.test(item.fd.as_raw_fd()) { + item.service(); } } @@ -492,12 +414,7 @@ impl BackgroundFdMonitor { // Clear the change signaller before processing incoming changes self.change_signaller.try_consume(); - if data.terminate - || (is_wait_lap - && data.items.is_empty() - && data.pokelist.is_empty() - && !change_signalled) - { + if data.terminate || (is_wait_lap && data.items.is_empty() && !change_signalled) { // Maybe terminate is set. Alternatively, maybe we had no items, waited a bit, // and still have no items. It's important to do this while holding the lock, // otherwise we race with new items being added. diff --git a/src/io.rs b/src/io.rs index 306304443..67aa0ec1e 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,8 +1,6 @@ use crate::builtins::shared::{STATUS_CMD_ERROR, STATUS_CMD_OK, STATUS_READ_TOO_MUCH}; use crate::common::{str2wcstring, wcs2string, EMPTY_STRING}; -use crate::fd_monitor::{ - Callback, FdMonitor, FdMonitorItem, FdMonitorItemId, ItemAction, ItemWakeReason, -}; +use crate::fd_monitor::{Callback, FdMonitor}; use crate::fds::{ make_autoclose_pipes, make_fd_nonblocking, wopen_cloexec, AutoCloseFd, PIPE_ERROR, }; @@ -23,14 +21,13 @@ use nix::sys::stat::Mode; use once_cell::sync::Lazy; #[cfg(not(target_has_atomic = "64"))] use portable_atomic::AtomicU64; -use std::cell::RefCell; use std::fs::File; use std::io; use std::os::fd::{AsRawFd, IntoRawFd, OwnedFd, RawFd}; #[cfg(target_has_atomic = "64")] use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; -use std::sync::{Arc, Condvar, Mutex, MutexGuard}; +use std::sync::{Arc, Mutex, MutexGuard}; /// separated_buffer_t represents a buffer of output from commands, prepared to be turned into a /// variable. For example, command substitutions output into one of these. Most commands just @@ -436,13 +433,8 @@ pub struct IoBuffer { /// Buffer storing what we have read. buffer: Mutex, - /// Atomic flag indicating our fillthread should shut down. - shutdown_fillthread: RelaxedAtomicBool, - - /// A promise, allowing synchronization with the background fill operation. - /// The operation has a reference to this as well, and fulfills this promise when it exits. - #[allow(clippy::type_complexity)] - fill_waiter: RefCell, Condvar)>>>, + /// Atomic flag indicating our fillthread is running. + fillthread_running: RelaxedAtomicBool, /// The item id of our background fillthread fd monitor item. item_id: AtomicU64, @@ -456,8 +448,7 @@ impl IoBuffer { pub fn new(limit: usize) -> Self { IoBuffer { buffer: Mutex::new(SeparatedBuffer::new(limit)), - shutdown_fillthread: RelaxedAtomicBool::new(false), - fill_waiter: RefCell::new(None), + fillthread_running: RelaxedAtomicBool::new(false), item_id: AtomicU64::new(0), } } @@ -508,108 +499,65 @@ impl IoBuffer { /// End the background fillthread operation, and return the buffer, transferring ownership. pub fn complete_background_fillthread_and_take_buffer(&self) -> SeparatedBuffer { // Mark that our fillthread is done, then wake it up. - assert!(self.fillthread_running(), "Should have a fillthread"); + assert!(self.fillthread_running.load(), "Should have a fillthread"); assert!( self.item_id.load(Ordering::SeqCst) != 0, "Should have a valid item ID" ); - self.shutdown_fillthread.store(true); - fd_monitor().poke_item(FdMonitorItemId::from(self.item_id.load(Ordering::SeqCst))); + let item_id = self.item_id.load(Ordering::SeqCst).into(); + let fd = fd_monitor().remove_item(item_id); - // Wait for the fillthread to fulfill its promise, and then clear the future so we know we no - // longer have one. - - let mut promise = self.fill_waiter.borrow_mut(); - let (mutex, condvar) = &**promise.as_ref().unwrap(); - { - let done_guard = mutex.lock().unwrap(); - let _done_guard = condvar.wait_while(done_guard, |done| !*done).unwrap(); + // Read any remaining data from the pipe. + let mut locked_buff = self.buffer.lock().unwrap(); + while fd.is_valid() && IoBuffer::read_once(fd.as_raw_fd(), &mut locked_buff) > 0 { + // pass } - *promise = None; // Return our buffer, transferring ownership. - let mut locked_buff = self.buffer.lock().unwrap(); let mut result = SeparatedBuffer::new(locked_buff.limit()); std::mem::swap(&mut result, &mut locked_buff); locked_buff.clear(); result } - - /// Helper to return whether the fillthread is running. - pub fn fillthread_running(&self) -> bool { - return self.fill_waiter.borrow().is_some(); - } } /// Begin the fill operation, reading from the given fd in the background. fn begin_filling(iobuffer: &Arc, fd: OwnedFd) { - assert!(!iobuffer.fillthread_running(), "Already have a fillthread"); + assert!( + !iobuffer.fillthread_running.load(), + "Already have a fillthread" + ); + iobuffer.fillthread_running.store(true); // We want to fill buffer_ by reading from fd. fd is the read end of a pipe; the write end is // owned by another process, or something else writing in fish. // Pass fd to an fd_monitor. It will add fd to its select() loop, and give us a callback when - // the fd is readable, or when our item is poked. The usual path is that we will get called - // back, read a bit from the fd, and append it to the buffer. Eventually the write end of the - // pipe will be closed - probably the other process exited - and fd will be widowed; read() will - // then return 0 and we will stop reading. + // the fd is readable. The usual path is that we will get called back, read a bit from the fd, + // and append it to the buffer. Eventually the write end of the pipe will be closed - probably + // the other process exited - and fd will be widowed; read() will then return 0 and we will stop + // reading. // In exotic circumstances the write end of the pipe will not be closed; this may happen in // e.g.: // cmd ( background & ; echo hi ) // Here the background process will inherit the write end of the pipe and hold onto it forever. - // In this case, when complete_background_fillthread() is called, the callback will be invoked - // with item_wake_reason_t::poke, and we will notice that the shutdown flag is set (this - // indicates that the command substitution is done); in this case we will read until we get - // EAGAIN and then give up. - - // Construct a promise. We will fulfill it in our fill thread, and wait for it in - // complete_background_fillthread(). Note that TSan complains if the promise's dtor races with - // the future's call to wait(), so we store the promise, not just its future (#7681). - let promise = Arc::new((Mutex::new(false), Condvar::new())); - iobuffer.fill_waiter.replace(Some(promise.clone())); + // In this case, when complete_background_fillthread() is called, we grab the file descriptor + // and read until we get EAGAIN and then give up. // Run our function to read until the receiver is closed. - // It's OK to capture 'buffer' because 'this' waits for the promise in its dtor. let item_callback: Callback = { let iobuffer = iobuffer.clone(); - Box::new(move |fd: &mut AutoCloseFd, reason: ItemWakeReason| { - // Only check the shutdown flag if we timed out or were poked. - // It's important that if select() indicated we were readable, that we call select() again - // allowing it to time out. Note the typical case is that the fd will be closed, in which - // case select will return immediately. - let mut done = false; - if reason == ItemWakeReason::Readable { - // select() reported us as readable; read a bit. - let mut buf = iobuffer.buffer.lock().unwrap(); - let ret = IoBuffer::read_once(fd.as_raw_fd(), &mut buf); - done = ret == 0 || (ret < 0 && ![EAGAIN, EWOULDBLOCK].contains(&errno::errno().0)); - } else if iobuffer.shutdown_fillthread.load() { - // Here our caller asked us to shut down; read while we keep getting data. - // This will stop when the fd is closed or if we get EAGAIN. - let mut buf = iobuffer.buffer.lock().unwrap(); - loop { - let ret = IoBuffer::read_once(fd.as_raw_fd(), &mut buf); - if ret <= 0 { - break; - } - } - done = true; - } - if !done { - ItemAction::Retain - } else { + Box::new(move |fd: &mut AutoCloseFd| { + assert!(fd.as_raw_fd() >= 0, "Invalid fd"); + let mut buf = iobuffer.buffer.lock().unwrap(); + let ret = IoBuffer::read_once(fd.as_raw_fd(), &mut buf); + if ret == 0 || (ret < 0 && ![EAGAIN, EWOULDBLOCK].contains(&errno::errno().0)) { + // Either it's finished or some other error - we're done. fd.close(); - let (mutex, condvar) = &*promise; - { - let mut done = mutex.lock().unwrap(); - *done = true; - } - condvar.notify_one(); - ItemAction::Remove } }) }; let fd = AutoCloseFd::new(fd.into_raw_fd()); - let item_id = fd_monitor().add(FdMonitorItem::new(fd, item_callback)); + let item_id = fd_monitor().add(fd, item_callback); iobuffer.item_id.store(u64::from(item_id), Ordering::SeqCst); } diff --git a/src/tests/fd_monitor.rs b/src/tests/fd_monitor.rs index 7cfba0953..f10e2b7d7 100644 --- a/src/tests/fd_monitor.rs +++ b/src/tests/fd_monitor.rs @@ -2,17 +2,19 @@ use portable_atomic::AtomicU64; use std::fs::File; use std::io::Write; -use std::os::fd::{AsRawFd, IntoRawFd}; +use std::os::fd::{AsRawFd, IntoRawFd, OwnedFd}; #[cfg(target_has_atomic = "64")] use std::sync::atomic::AtomicU64; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Barrier, Mutex}; +use std::thread; use std::time::Duration; -use crate::fd_monitor::{ - FdEventSignaller, FdMonitor, FdMonitorItem, FdMonitorItemId, ItemAction, ItemWakeReason, -}; -use crate::fds::{make_autoclose_pipes, AutoCloseFd}; +use errno::errno; + +use crate::fd_monitor::{FdEventSignaller, FdMonitor}; +use crate::fd_readable_set::FdReadableSet; +use crate::fds::{make_autoclose_pipes, AutoCloseFd, AutoClosePipes}; use crate::tests::prelude::*; /// Helper to make an item which counts how many times its callback was invoked. @@ -21,10 +23,9 @@ use crate::tests::prelude::*; /// since this is just used for test purposes. struct ItemMaker { pub length_read: AtomicUsize, - pub pokes: AtomicUsize, pub total_calls: AtomicUsize, item_id: AtomicU64, - pub always_exit: bool, + pub always_close: bool, pub writer: Mutex>, } @@ -38,10 +39,9 @@ impl ItemMaker { let mut result = ItemMaker { length_read: 0.into(), - pokes: 0.into(), total_calls: 0.into(), item_id: 0.into(), - always_exit: false, + always_close: false, writer: Mutex::new(Some(File::from(pipes.write))), }; @@ -50,42 +50,25 @@ impl ItemMaker { let result = Arc::new(result); let callback = { let result = Arc::clone(&result); - move |fd: &mut AutoCloseFd, reason: ItemWakeReason| result.callback(fd, reason) + move |fd: &mut AutoCloseFd| result.callback(fd) }; let fd = AutoCloseFd::new(pipes.read.into_raw_fd()); - let item = FdMonitorItem::new(fd, Box::new(callback)); - let item_id = monitor.add(item); + let item_id = monitor.add(fd, Box::new(callback)); result.item_id.store(u64::from(item_id), Ordering::Relaxed); result } - fn item_id(&self) -> FdMonitorItemId { - self.item_id.load(Ordering::Relaxed).into() - } - - fn callback(&self, fd: &mut AutoCloseFd, reason: ItemWakeReason) -> ItemAction { - let mut was_closed = false; - - match reason { - ItemWakeReason::Poke => { - self.pokes.fetch_add(1, Ordering::Relaxed); - } - ItemWakeReason::Readable => { - let mut buf = [0u8; 1024]; - let res = nix::unistd::read(fd.as_raw_fd(), &mut buf); - let amt = res.expect("read error!"); - self.length_read.fetch_add(amt as usize, Ordering::Relaxed); - was_closed = amt == 0; - } - } + fn callback(&self, fd: &mut AutoCloseFd) { + let mut buf = [0u8; 1024]; + let res = nix::unistd::read(fd.as_raw_fd(), &mut buf); + let amt = res.expect("read error!"); + self.length_read.fetch_add(amt as usize, Ordering::Relaxed); + let was_closed = amt == 0; self.total_calls.fetch_add(1, Ordering::Relaxed); - if self.always_exit || was_closed { + if was_closed || self.always_close { fd.close(); - ItemAction::Remove - } else { - ItemAction::Retain } } @@ -116,12 +99,9 @@ fn fd_monitor_items() { // Item which should get 42 bytes then get notified it is closed. let item42_then_close = ItemMaker::insert_new_into(&monitor); - // Item which gets one poke. - let item_pokee = ItemMaker::insert_new_into(&monitor); - // Item which should get a callback exactly once. let item_oneshot = ItemMaker::insert_new_into2(&monitor, |item| { - item.always_exit = true; + item.always_close = true; }); item42.write42(); @@ -129,8 +109,6 @@ fn fd_monitor_items() { *item42_then_close.writer.lock().expect("Mutex poisoned!") = None; item_oneshot.write42(); - monitor.poke_item(item_pokee.item_id()); - // May need to loop here to ensure our fd_monitor gets scheduled. See #7699. for _ in 0..100 { std::thread::sleep(Duration::from_millis(84)); @@ -142,22 +120,14 @@ fn fd_monitor_items() { drop(monitor); assert_eq!(item_never.length_read.load(Ordering::Relaxed), 0); - assert_eq!(item_never.pokes.load(Ordering::Relaxed), 0); assert_eq!(item42.length_read.load(Ordering::Relaxed), 42); - assert_eq!(item42.pokes.load(Ordering::Relaxed), 0); assert_eq!(item42_then_close.length_read.load(Ordering::Relaxed), 42); assert_eq!(item42_then_close.total_calls.load(Ordering::Relaxed), 2); - assert_eq!(item42_then_close.pokes.load(Ordering::Relaxed), 0); assert_eq!(item_oneshot.length_read.load(Ordering::Relaxed), 42); assert_eq!(item_oneshot.total_calls.load(Ordering::Relaxed), 1); - assert_eq!(item_oneshot.pokes.load(Ordering::Relaxed), 0); - - assert_eq!(item_pokee.length_read.load(Ordering::Relaxed), 0); - assert_eq!(item_pokee.total_calls.load(Ordering::Relaxed), 1); - assert_eq!(item_pokee.pokes.load(Ordering::Relaxed), 1); } #[test] @@ -184,3 +154,86 @@ fn test_fd_event_signaller() { assert!(!sema.poll(false)); assert!(!sema.try_consume()); } + +// A helper function which calls poll() or selects() on a file descriptor in the background, +// and then invokes the `bad_action` function on the file descriptor while the poll/select is +// waiting. The function returns Result: either the number of readable file descriptors +// or the error code from poll/select. +fn do_something_bad_during_select(bad_action: F) -> Result +where + F: FnOnce(OwnedFd) -> Option, +{ + let AutoClosePipes { + read: read_fd, + write: write_fd, + } = make_autoclose_pipes().expect("Failed to create pipe"); + let raw_read_fd = read_fd.as_raw_fd(); + + // Try to ensure that the thread will be scheduled by waiting until it is. + let barrier = Arc::new(Barrier::new(2)); + let barrier_clone = Arc::clone(&barrier); + + let select_thread = thread::spawn(move || -> Result { + let mut fd_set = FdReadableSet::new(); + fd_set.add(raw_read_fd); + + barrier_clone.wait(); + + // Timeout after 500 msec. + // macOS will eagerly return EBADF if the fd is closed; Linux will hit the timeout. + let timeout_usec = 500 * 1_000; + let ret = fd_set.check_readable(timeout_usec); + if ret < 0 { + Err(errno().0) + } else { + Ok(ret) + } + }); + + barrier.wait(); + thread::sleep(Duration::from_millis(100)); + let read_fd = bad_action(read_fd); + + let result = select_thread.join().expect("Select thread panicked"); + // Ensure these stay alive until after thread is joined. + drop(read_fd); + drop(write_fd); + result +} + +#[test] +fn test_close_during_select_ebadf() { + let close_it = |read_fd: OwnedFd| { + drop(read_fd); + None + }; + let result = do_something_bad_during_select(close_it); + assert!( + matches!(result, Err(libc::EBADF) | Ok(1)), + "select/poll should have failed with EBADF or marked readable" + ); +} + +#[test] +fn test_dup2_during_select_ebadf() { + // Make a random file descriptor that we can dup2 stdin to. + let AutoClosePipes { + read: pipe_read, + write: pipe_write, + } = make_autoclose_pipes().expect("Failed to create pipe"); + + let dup2_it = |read_fd: OwnedFd| { + // We are going to dup2 stdin to this fd, which should cause select/poll to fail. + assert!(read_fd.as_raw_fd() > 0, "fd should be valid and not stdin"); + unsafe { libc::dup2(pipe_read.as_raw_fd(), read_fd.as_raw_fd()) }; + Some(read_fd) + }; + let result = do_something_bad_during_select(dup2_it); + assert!( + matches!(result, Err(libc::EBADF) | Ok(0)), + "select/poll should have failed with EBADF or timed out" + ); + // Ensure these stay alive until after thread is joined. + drop(pipe_read); + drop(pipe_write); +}