Remove the ability for FdMonitorItems to have timeouts

FdMonitor is used to monitor a set of file descriptors and invoke a callback
when one becomes readable. Prior to this commit, they coudl also have the
callback invoked on timeout. fish used to use this feature but no longer does;
remove it.
This commit is contained in:
Peter Ammon 2024-12-22 17:13:49 -08:00
parent 6dad396498
commit b7ae159824
No known key found for this signature in database
3 changed files with 32 additions and 120 deletions

View file

@ -5,7 +5,7 @@ use std::os::unix::prelude::*;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};
use std::time::Duration;
use crate::common::exit_without_destructors;
use crate::fd_readable_set::FdReadableSet;
@ -26,8 +26,6 @@ use libc::{EFD_CLOEXEC, EFD_NONBLOCK};
pub enum ItemWakeReason {
/// The fd became readable (or was HUP'd)
Readable,
/// The requested timeout was hit
Timeout,
/// The item was "poked" (woken up explicitly)
Poke,
}
@ -209,11 +207,6 @@ pub struct FdMonitorItem {
/// A callback to be invoked when the fd is readable, or for another reason given by the wake reason.
/// If the fd is invalid on return from the function, then the item is removed from the [`FdMonitor`] set.
callback: Callback,
/// The timeout associated with waiting on this item or `None` to wait indefinitely. A timeout
/// of `0` is not supported.
timeout: Option<Duration>,
/// The last time we were called or the time of initialization.
last_time: Option<Instant>,
/// The id for this item, assigned by [`FdMonitor`].
item_id: FdMonitorItemId,
}
@ -226,35 +219,14 @@ pub enum ItemAction {
}
impl FdMonitorItem {
/// Return the duration until the timeout should trigger or `None`. A return of `0` means we are
/// at or past the timeout.
fn remaining_time(&self, now: &Instant) -> Option<Duration> {
let last_time = self.last_time.expect("Should always have a last_time!");
let timeout = self.timeout?;
assert!(now >= &last_time, "Steady clock went backwards or bug!");
let since = *now - last_time;
Some(if since >= timeout {
Duration::ZERO
} else {
timeout - since
})
}
/// Invoke this item's callback if its value (when its value is set in the fd or has timed out).
/// Invoke this item's callback if its fd is readable.
/// Returns `true` if the item should be retained or `false` if it should be removed from the
/// set.
fn service_item(&mut self, fds: &FdReadableSet, now: &Instant) -> ItemAction {
fn service_item(&mut self, fds: &FdReadableSet) -> ItemAction {
let mut result = ItemAction::Retain;
let readable = fds.test(self.fd.as_raw_fd());
let timed_out = !readable && self.remaining_time(now) == Some(Duration::ZERO);
if readable || timed_out {
self.last_time = Some(*now);
let reason = if readable {
ItemWakeReason::Readable
} else {
ItemWakeReason::Timeout
};
result = (self.callback)(&mut self.fd, reason);
if readable {
result = (self.callback)(&mut self.fd, ItemWakeReason::Readable);
}
result
}
@ -269,19 +241,17 @@ impl FdMonitorItem {
(self.callback)(&mut self.fd, ItemWakeReason::Poke)
}
pub fn new(fd: AutoCloseFd, timeout: Option<Duration>, callback: Callback) -> Self {
pub fn new(fd: AutoCloseFd, callback: Callback) -> Self {
FdMonitorItem {
fd,
timeout,
callback,
item_id: FdMonitorItemId(0),
last_time: None,
}
}
}
/// A thread-safe class which can monitor a set of fds, invoking a callback when any becomes
/// readable (or has been HUP'd) or when per-item-configurable timeouts are reached.
/// readable (or has been HUP'd).
pub struct FdMonitor {
/// Our self-signaller. When this is written to, it means there are new items pending, new items
/// in the poke list, or terminate has been set.
@ -330,7 +300,6 @@ impl FdMonitor {
/// Add an item to the monitor. Returns the [`FdMonitorItemId`] assigned to the item.
pub fn add(&self, mut item: FdMonitorItem) -> FdMonitorItemId {
assert!(item.fd.is_valid());
assert!(item.timeout != Some(Duration::ZERO), "Invalid timeout!");
assert!(
item.item_id == FdMonitorItemId(0),
"Item should not already have an id!"
@ -423,16 +392,8 @@ impl BackgroundFdMonitor {
let change_signal_fd = self.change_signaller.upgrade().unwrap().read_fd();
fds.add(change_signal_fd);
let mut now = Instant::now();
// Use Duration::MAX to represent no timeout for comparison purposes.
let mut timeout = Duration::MAX;
for item in &mut self.items {
fds.add(item.fd.as_raw_fd());
if item.last_time.is_none() {
item.last_time = Some(now);
}
timeout = timeout.min(item.timeout.unwrap_or(Duration::MAX));
}
// If we have no items, then we wish to allow the thread to exit, but after a time, so
@ -440,18 +401,10 @@ impl BackgroundFdMonitor {
// msec; if nothing becomes readable by then we will exit. We refer to this as the
// wait-lap.
let is_wait_lap = self.items.is_empty();
if is_wait_lap {
assert!(
timeout == Duration::MAX,
"Should not have a timeout on wait lap!"
);
timeout = Duration::from_millis(256);
}
// Don't leave Duration::MAX as an actual timeout value
let timeout = match timeout {
Duration::MAX => None,
timeout => Some(timeout),
let timeout = if is_wait_lap {
Some(Duration::from_millis(256))
} else {
None
};
// Call select()
@ -465,22 +418,17 @@ impl BackgroundFdMonitor {
perror("select");
}
// Update the value of `now` after waiting on `fds.check_readable()`; it's used in the
// servicer closure.
now = Instant::now();
// A predicate which services each item in turn, returning true if it should be removed
let servicer = |item: &mut FdMonitorItem| {
let fd = item.fd.as_raw_fd();
let action = item.service_item(&fds, &now);
let action = item.service_item(&fds);
if action == ItemAction::Remove {
FLOG!(fd_monitor, "Removing fd", fd);
}
action
};
// Service all items that are either readable or have timed out, and remove any which
// say to do so.
// Service all items that are readable, and remove any which say to do so.
self.items
.retain_mut(|item| servicer(item) == ItemAction::Retain);

View file

@ -609,7 +609,7 @@ fn begin_filling(iobuffer: &Arc<IoBuffer>, fd: OwnedFd) {
};
let fd = AutoCloseFd::new(fd.into_raw_fd());
let item_id = fd_monitor().add(FdMonitorItem::new(fd, None, item_callback));
let item_id = fd_monitor().add(FdMonitorItem::new(fd, item_callback));
iobuffer.item_id.store(u64::from(item_id), Ordering::SeqCst);
}

View file

@ -5,7 +5,7 @@ use std::io::Write;
use std::os::fd::{AsRawFd, IntoRawFd};
#[cfg(target_has_atomic = "64")]
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
@ -20,7 +20,6 @@ use crate::tests::prelude::*;
/// This could be structured differently to avoid the `Mutex` on `writer`, but it's not worth it
/// since this is just used for test purposes.
struct ItemMaker {
pub did_timeout: AtomicBool,
pub length_read: AtomicUsize,
pub pokes: AtomicUsize,
pub total_calls: AtomicUsize,
@ -30,19 +29,14 @@ struct ItemMaker {
}
impl ItemMaker {
pub fn insert_new_into(monitor: &FdMonitor, timeout: Option<Duration>) -> Arc<Self> {
Self::insert_new_into2(monitor, timeout, |_| {})
pub fn insert_new_into(monitor: &FdMonitor) -> Arc<Self> {
Self::insert_new_into2(monitor, |_| {})
}
pub fn insert_new_into2<F: Fn(&mut Self)>(
monitor: &FdMonitor,
timeout: Option<Duration>,
config: F,
) -> Arc<Self> {
pub fn insert_new_into2<F: Fn(&mut Self)>(monitor: &FdMonitor, config: F) -> Arc<Self> {
let pipes = make_autoclose_pipes().expect("fds exhausted!");
let mut result = ItemMaker {
did_timeout: false.into(),
length_read: 0.into(),
pokes: 0.into(),
total_calls: 0.into(),
@ -59,7 +53,7 @@ impl ItemMaker {
move |fd: &mut AutoCloseFd, reason: ItemWakeReason| result.callback(fd, reason)
};
let fd = AutoCloseFd::new(pipes.read.into_raw_fd());
let item = FdMonitorItem::new(fd, timeout, Box::new(callback));
let item = FdMonitorItem::new(fd, Box::new(callback));
let item_id = monitor.add(item);
result.item_id.store(u64::from(item_id), Ordering::Relaxed);
@ -74,9 +68,6 @@ impl ItemMaker {
let mut was_closed = false;
match reason {
ItemWakeReason::Timeout => {
self.did_timeout.store(true, Ordering::Relaxed);
}
ItemWakeReason::Poke => {
self.pokes.fetch_add(1, Ordering::Relaxed);
}
@ -116,34 +107,24 @@ fn fd_monitor_items() {
let _cleanup = test_init();
let monitor = FdMonitor::new();
// Items which will never receive data or be called.
let item_never = ItemMaker::insert_new_into(&monitor, None);
let item_huge_timeout =
ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(100_000_000)));
// Item which will never receive data or be called.
let item_never = ItemMaker::insert_new_into(&monitor);
// Item which should get no data and time out.
let item0_timeout = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16)));
// Item which should get exactly 42 bytes then time out.
let item42_timeout = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16)));
// Item which should get exactly 42 bytes and not time out.
let item42_no_timeout = ItemMaker::insert_new_into(&monitor, None);
// Item which should get exactly 42 bytes.
let item42 = ItemMaker::insert_new_into(&monitor);
// Item which should get 42 bytes then get notified it is closed.
let item42_then_close = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16)));
let item42_then_close = ItemMaker::insert_new_into(&monitor);
// Item which gets one poke.
let item_pokee = ItemMaker::insert_new_into(&monitor, None);
let item_pokee = ItemMaker::insert_new_into(&monitor);
// Item which should get a callback exactly once.
let item_oneshot =
ItemMaker::insert_new_into2(&monitor, Some(Duration::from_millis(16)), |item| {
item.always_exit = true;
});
let item_oneshot = ItemMaker::insert_new_into2(&monitor, |item| {
item.always_exit = true;
});
item42_timeout.write42();
item42_no_timeout.write42();
item42.write42();
item42_then_close.write42();
*item42_then_close.writer.lock().expect("Mutex poisoned!") = None;
item_oneshot.write42();
@ -153,44 +134,27 @@ fn fd_monitor_items() {
// May need to loop here to ensure our fd_monitor gets scheduled. See #7699.
for _ in 0..100 {
std::thread::sleep(Duration::from_millis(84));
if item0_timeout.did_timeout.load(Ordering::Relaxed) {
if item_oneshot.total_calls.load(Ordering::Relaxed) > 0 {
break;
}
}
drop(monitor);
assert_eq!(item_never.did_timeout.load(Ordering::Relaxed), false);
assert_eq!(item_never.length_read.load(Ordering::Relaxed), 0);
assert_eq!(item_never.pokes.load(Ordering::Relaxed), 0);
assert_eq!(item_huge_timeout.did_timeout.load(Ordering::Relaxed), false);
assert_eq!(item_huge_timeout.length_read.load(Ordering::Relaxed), 0);
assert_eq!(item_huge_timeout.pokes.load(Ordering::Relaxed), 0);
assert_eq!(item42.length_read.load(Ordering::Relaxed), 42);
assert_eq!(item42.pokes.load(Ordering::Relaxed), 0);
assert_eq!(item0_timeout.length_read.load(Ordering::Relaxed), 0);
assert_eq!(item0_timeout.did_timeout.load(Ordering::Relaxed), true);
assert_eq!(item0_timeout.pokes.load(Ordering::Relaxed), 0);
assert_eq!(item42_timeout.length_read.load(Ordering::Relaxed), 42);
assert_eq!(item42_timeout.did_timeout.load(Ordering::Relaxed), true);
assert_eq!(item42_timeout.pokes.load(Ordering::Relaxed), 0);
assert_eq!(item42_no_timeout.length_read.load(Ordering::Relaxed), 42);
assert_eq!(item42_no_timeout.did_timeout.load(Ordering::Relaxed), false);
assert_eq!(item42_no_timeout.pokes.load(Ordering::Relaxed), 0);
assert_eq!(item42_then_close.did_timeout.load(Ordering::Relaxed), false);
assert_eq!(item42_then_close.length_read.load(Ordering::Relaxed), 42);
assert_eq!(item42_then_close.total_calls.load(Ordering::Relaxed), 2);
assert_eq!(item42_then_close.pokes.load(Ordering::Relaxed), 0);
assert_eq!(item_oneshot.did_timeout.load(Ordering::Relaxed), false);
assert_eq!(item_oneshot.length_read.load(Ordering::Relaxed), 42);
assert_eq!(item_oneshot.total_calls.load(Ordering::Relaxed), 1);
assert_eq!(item_oneshot.pokes.load(Ordering::Relaxed), 0);
assert_eq!(item_pokee.did_timeout.load(Ordering::Relaxed), false);
assert_eq!(item_pokee.length_read.load(Ordering::Relaxed), 0);
assert_eq!(item_pokee.total_calls.load(Ordering::Relaxed), 1);
assert_eq!(item_pokee.pokes.load(Ordering::Relaxed), 1);