Further cleanup of FdMonitor

This commit is contained in:
ridiculousfish 2024-01-13 12:35:55 -08:00
parent d8da79717e
commit 8554eb5f80
3 changed files with 25 additions and 29 deletions

View file

@ -192,10 +192,10 @@ impl From<u64> for FdMonitorItemId {
}
/// The callback type used by [`FdMonitorItem`]. It is passed a mutable reference to the
/// `FdMonitorItem`'s [`FdMonitorItem::fd`] and [the reason](ItemWakeupReason) for the wakeup. The
/// callback may close the fd, in which case the `FdMonitorItem` is removed from [`FdMonitor`]'s
/// set.
pub type Callback = Box<dyn Fn(&mut AutoCloseFd, ItemWakeReason) + Send + Sync>;
/// `FdMonitorItem`'s [`FdMonitorItem::fd`] and [the reason](ItemWakeupReason) for the wakeup.
/// It should return an [`ItemAction`] to indicate whether the item should be removed from the
/// [`FdMonitor`] set.
pub type Callback = Box<dyn Fn(&mut AutoCloseFd, ItemWakeReason) -> ItemAction + Send + Sync>;
/// An item containing an fd and callback, which can be monitored to watch when it becomes readable
/// and invoke the callback.
@ -214,11 +214,9 @@ pub struct FdMonitorItem {
item_id: FdMonitorItemId,
}
/// Unlike C++, rust's `Vec` has `Vec::retain()` instead of `std::remove_if(...)` with the inverse
/// logic. It's hard to keep track of which bool means what across the different layers, so be more
/// explicit.
/// A value returned by the callback to indicate what to do with the item.
#[derive(PartialEq, Eq)]
enum ItemAction {
pub enum ItemAction {
Remove,
Retain,
}
@ -257,12 +255,9 @@ impl FdMonitorItem {
} else {
ItemWakeReason::Timeout
};
(self.callback)(&mut self.fd, reason);
if !self.fd.is_valid() {
result = ItemAction::Remove;
result = (self.callback)(&mut self.fd, reason);
}
}
return result;
result
}
/// Invoke this item's callback with a poke, if its id is present in the sorted poke list.
@ -272,12 +267,7 @@ impl FdMonitorItem {
return ItemAction::Retain;
}
(self.callback)(&mut self.fd, ItemWakeReason::Poke);
// Return `ItemAction::Remove` if the callback closed the fd
match self.fd.is_valid() {
true => ItemAction::Retain,
false => ItemAction::Remove,
}
(self.callback)(&mut self.fd, ItemWakeReason::Poke)
}
pub fn new(fd: AutoCloseFd, timeout: Option<Duration>, callback: Callback) -> Self {
@ -483,11 +473,11 @@ impl BackgroundFdMonitor {
// A predicate which services each item in turn, returning true if it should be removed
let servicer = |item: &mut FdMonitorItem| {
let fd = item.fd.as_raw_fd();
if item.service_item(&fds, &now) == ItemAction::Remove {
let action = item.service_item(&fds, &now);
if action == ItemAction::Remove {
FLOG!(fd_monitor, "Removing fd", fd);
return ItemAction::Remove;
}
return ItemAction::Retain;
action
};
// Service all items that are either readable or have timed out, and remove any which

View file

@ -1,6 +1,8 @@
use crate::builtins::shared::{STATUS_CMD_ERROR, STATUS_CMD_OK, STATUS_READ_TOO_MUCH};
use crate::common::{str2wcstring, wcs2string, EMPTY_STRING};
use crate::fd_monitor::{Callback, FdMonitor, FdMonitorItem, FdMonitorItemId, ItemWakeReason};
use crate::fd_monitor::{
Callback, FdMonitor, FdMonitorItem, FdMonitorItemId, ItemAction, ItemWakeReason,
};
use crate::fds::{
make_autoclose_pipes, make_fd_nonblocking, wopen_cloexec, AutoCloseFd, PIPE_ERROR,
};
@ -580,7 +582,9 @@ fn begin_filling(iobuffer: &Arc<IoBuffer>, fd: AutoCloseFd) {
}
done = true;
}
if done {
if !done {
ItemAction::Retain
} else {
fd.close();
let (mutex, condvar) = &*promise;
{
@ -588,6 +592,7 @@ fn begin_filling(iobuffer: &Arc<IoBuffer>, fd: AutoCloseFd) {
*done = true;
}
condvar.notify_one();
ItemAction::Remove
}
})
};

View file

@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::fd_monitor::{
FdEventSignaller, FdMonitor, FdMonitorItem, FdMonitorItemId, ItemWakeReason,
FdEventSignaller, FdMonitor, FdMonitorItem, FdMonitorItemId, ItemAction, ItemWakeReason,
};
use crate::fds::{make_autoclose_pipes, AutoCloseFd};
use crate::tests::prelude::*;
@ -51,9 +51,7 @@ impl ItemMaker {
let result = Arc::new(result);
let callback = {
let result = Arc::clone(&result);
move |fd: &mut AutoCloseFd, reason: ItemWakeReason| {
result.callback(fd, reason);
}
move |fd: &mut AutoCloseFd, reason: ItemWakeReason| result.callback(fd, reason)
};
let item = FdMonitorItem::new(pipes.read, timeout, Box::new(callback));
let item_id = monitor.add(item);
@ -66,7 +64,7 @@ impl ItemMaker {
self.item_id.load(Ordering::Relaxed).into()
}
fn callback(&self, fd: &mut AutoCloseFd, reason: ItemWakeReason) {
fn callback(&self, fd: &mut AutoCloseFd, reason: ItemWakeReason) -> ItemAction {
let mut was_closed = false;
match reason {
@ -89,6 +87,9 @@ impl ItemMaker {
self.total_calls.fetch_add(1, Ordering::Relaxed);
if self.always_exit || was_closed {
fd.close();
ItemAction::Remove
} else {
ItemAction::Retain
}
}