FdMonitor: Use a HashMap instead of Vec of items

Preparing for a substantial optimization.
commit 5e59762117
parent 69fdbc89d6
Author: Peter Ammon
Date:   2024-12-23 11:48:47 -08:00


@@ -1,5 +1,6 @@
 #[cfg(not(target_has_atomic = "64"))]
 use portable_atomic::AtomicU64;
+use std::collections::HashMap;
 use std::os::unix::prelude::*;
 #[cfg(target_has_atomic = "64")]
 use std::sync::atomic::AtomicU64;
@@ -178,7 +179,7 @@ impl FdEventSignaller {
 /// Each item added to FdMonitor is assigned a unique ID, which is not recycled. Items may have
 /// their callback triggered immediately by passing the ID. Zero is a sentinel.
-#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub struct FdMonitorItemId(u64);

 impl From<FdMonitorItemId> for u64 {
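Deriving Hash is what qualifies FdMonitorItemId to serve as a HashMap key later in this change: HashMap keys must implement Hash and Eq. A minimal sketch of the pattern, using an illustrative stand-in type rather than the fish sources:

    use std::collections::HashMap;

    // A u64 newtype needs Hash + Eq (both derivable) to be a HashMap key.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct ItemId(u64);

    fn main() {
        let mut items: HashMap<ItemId, &str> = HashMap::new();
        items.insert(ItemId(1), "stdout reader");
        // Copy means we can look up by a freshly constructed ItemId value.
        assert_eq!(items.get(&ItemId(1)), Some(&"stdout reader"));
    }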
@@ -225,13 +226,9 @@ impl FdMonitorItem {
         (self.callback)(&mut self.fd, ItemWakeReason::Readable)
     }

-    /// Invoke this item's callback with a poke, if its id is present in the sorted poke list.
-    fn maybe_poke_item(&mut self, pokelist: &[FdMonitorItemId]) -> ItemAction {
-        if self.item_id.0 == 0 || pokelist.binary_search(&self.item_id).is_err() {
-            // Not pokeable or not in the poke list.
-            return ItemAction::Retain;
-        }
+    /// Invoke this item's callback because the item was poked.
+    /// Returns the [`ItemAction`] to indicate whether the item should be removed from the
+    /// [`FdMonitor`] set.
+    fn service_poke(&mut self) -> ItemAction {
         (self.callback)(&mut self.fd, ItemWakeReason::Poke)
     }
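The rename from maybe_poke_item to service_poke reflects a shift in responsibility: the item no longer tests its own ID against a sorted poke list, because the caller can now look the ID up in the map directly. Roughly, the membership test changes like this (a self-contained sketch with illustrative types, not the diff itself):

    use std::collections::HashMap;

    fn main() {
        // Old shape: membership via binary search over a sorted ID list.
        let pokelist: Vec<u64> = vec![1, 3, 5];
        assert!(pokelist.binary_search(&3).is_ok());

        // New shape: index the item map directly by ID; absent IDs are
        // simply skipped, so no per-item membership test is needed.
        let mut items: HashMap<u64, &str> = HashMap::from([(3, "item")]);
        if let Some(item) = items.get_mut(&3) {
            // ... the real code would call item.service_poke() here ...
            let _ = item;
        }
    }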
@@ -267,9 +264,8 @@ const _: () = {
 /// Data shared between the `FdMonitor` instance and its associated `BackgroundFdMonitor`.
 struct SharedData {
-    /// Pending items. This is set by the main thread with the mutex locked, then the background
-    /// thread grabs them.
-    pending: Vec<FdMonitorItem>,
+    /// The map of items. This may be modified by the main thread with the mutex locked.
+    items: HashMap<FdMonitorItemId, FdMonitorItem>,
     /// List of IDs for items that need to be poked (explicitly woken up).
     pokelist: Vec<FdMonitorItemId>,
     /// Whether the background thread is running.
@@ -280,9 +276,6 @@ struct SharedData {
 /// The background half of the fd monitor, running on its own thread.
 struct BackgroundFdMonitor {
-    /// The list of items to monitor. This is only accessed from the background thread.
-    /// This doesn't need to be in any particular order.
-    items: Vec<FdMonitorItem>,
     /// Our self-signaller. When this is written to, it means there are new items pending, new items
     /// in the poke list, or terminate has been set.
     change_signaller: Arc<FdEventSignaller>,
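Taken together, these two struct changes move item ownership out of the background thread's private Vec and into the mutex-protected map, which is what lets a client thread add or remove items directly. Assembled from the hunks above (not a verbatim excerpt), the resulting shape is roughly:

    struct SharedData {
        /// The map of items, now the single home of all monitored items.
        items: HashMap<FdMonitorItemId, FdMonitorItem>,
        /// List of IDs for items that need to be poked.
        pokelist: Vec<FdMonitorItemId>,
        running: bool,
        terminate: bool,
    }

    struct BackgroundFdMonitor {
        /// Shared state, accessed by both the client and background threads.
        data: Arc<Mutex<SharedData>>,
        /// Our self-signaller, used to wake the background thread.
        change_signaller: Arc<FdEventSignaller>,
    }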
@@ -305,9 +298,10 @@ impl FdMonitor {
         // Lock around a local region
         let mut data = self.data.lock().expect("Mutex poisoned!");

-        // Assign an id and add the item to pending
+        // Assign an id and add the item.
         item.item_id = item_id;
-        data.pending.push(item);
+        let old_value = data.items.insert(item_id, item);
+        assert!(old_value.is_none(), "Item ID {} already exists!", item_id.0);

         // Start the thread if it hasn't already been started
         let already_started = data.running;
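HashMap::insert returns the value previously stored under the key, so the assert doubles as a cheap uniqueness check: IDs are never recycled, so any Some return here would indicate a logic error. A self-contained sketch of the idiom (names are illustrative):

    use std::collections::HashMap;

    fn add_unique(map: &mut HashMap<u64, String>, id: u64, value: String) {
        // insert() yields Some(old_value) if the key was already occupied.
        let old_value = map.insert(id, value);
        assert!(old_value.is_none(), "Item ID {} already exists!", id);
    }

    fn main() {
        let mut map = HashMap::new();
        add_unique(&mut map, 1, "first".to_string());
        add_unique(&mut map, 2, "second".to_string());
    }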
@@ -320,7 +314,6 @@ impl FdMonitor {
             let background_monitor = BackgroundFdMonitor {
                 data: Arc::clone(&self.data),
                 change_signaller: Arc::clone(&self.change_signaller),
-                items: Vec::new(),
             };
             crate::threads::spawn(move || {
                 background_monitor.run();
@@ -354,7 +347,7 @@ impl FdMonitor {
     pub fn new() -> Self {
         Self {
             data: Arc::new(Mutex::new(SharedData {
-                pending: Vec::new(),
+                items: HashMap::new(),
                 pokelist: Vec::new(),
                 running: false,
                 terminate: false,
@@ -368,66 +361,129 @@ impl FdMonitor {
 impl BackgroundFdMonitor {
     /// Starts monitoring the fd set and listening for new fds to add to the set. Takes ownership
     /// over its instance so that this method cannot be called again.
-    fn run(mut self) {
+    fn run(self) {
         assert_is_background_thread();
-        let mut pokelist: Vec<FdMonitorItemId> = Vec::new();
         let mut fds = FdReadableSet::new();
+        let mut item_ids: Vec<FdMonitorItemId> = Vec::new();
+        let mut pokelist: Vec<FdMonitorItemId> = Vec::new();
         loop {
-            // Poke any items that need it
-            if !pokelist.is_empty() {
-                self.poke(&pokelist);
-                pokelist.clear();
-            }
-            fds.clear();
+            // Our general flow is that a client thread adds an item for us to monitor,
+            // and we use select() or poll() to wait on it. However, the client thread
+            // may then reclaim the item. We are currently blocked in select():
+            // how then do we stop waiting on it?
+            //
+            // The safest, slowest approach is:
+            //   - The background thread waits on select() for the set of active file descriptors.
+            //   - The client thread records a request to remove an item.
+            //   - The client thread wakes up the background thread via change_signaller.
+            //   - The background thread checks for any pending removals, and removes and returns them.
+            //   - The client thread accepts the removed item and continues on.
+            // However this means a round-trip from the client thread to this background thread,
+            // plus additional blocking system calls. This slows down the client thread.
+            //
+            // A second possibility is:
+            //   - The background thread waits on select() for the set of active file descriptors.
+            //   - The client thread directly removes an item (protected by the mutex).
+            //   - After select() returns the set of active file descriptors, we only invoke callbacks
+            //     for items whose file descriptors are still in the set.
+            // However this risks the ABA problem: if the client thread reclaims an item, closes its
+            // fd, and then adds a new item which happens to get the same fd, we might falsely
+            // trigger the callback of the new item even though its fd is not readable.
+            //
+            // So we use the following approach:
+            //   - The background thread creates a snapshotted list of active ItemIDs.
+            //   - The background thread waits in select() on the set of active file descriptors,
+            //     without holding the lock.
+            //   - The client thread directly removes an item (protected by the mutex).
+            //   - After select() returns the set of active file descriptors, we only invoke callbacks
+            //     for items whose file descriptors are marked active, and whose ItemID was snapshotted.
+            // This avoids the ABA problem because ItemIDs are never recycled. It does have a race where
+            // we might select() on a file descriptor that has been closed or recycled. Thus we must be
+            // prepared to handle EBADF. This race is otherwise considered benign.

-            // Our change_signaller is special-cased
+            // Construct the set of fds to monitor.
+            // Our change_signaller is special-cased.
+            fds.clear();
             let change_signal_fd = self.change_signaller.read_fd();
             fds.add(change_signal_fd);

-            for item in &mut self.items {
-                fds.add(item.fd.as_raw_fd());
+            // Grab the lock and snapshot the item_ids.
+            let mut data = self.data.lock().expect("Mutex poisoned!");
+            item_ids.clear();
+            item_ids.reserve(data.items.len());
+            for (item_id, item) in &data.items {
+                let fd = item.fd.as_raw_fd();
+                fds.add(fd);
+                item_ids.push(*item_id);
             }
+            // Sort it to avoid the non-determinism of the hash table.
+            item_ids.sort();

             // If we have no items, then we wish to allow the thread to exit, but after a time, so
             // we aren't spinning up and tearing down the thread repeatedly. Set a timeout of 256
             // msec; if nothing becomes readable by then we will exit. We refer to this as the
             // wait-lap.
-            let is_wait_lap = self.items.is_empty();
+            let is_wait_lap = item_ids.is_empty();
             let timeout = if is_wait_lap {
                 Some(Duration::from_millis(256))
             } else {
                 None
             };

-            // Call select()
+            // Call select().
+            // We must release and then re-acquire the lock around select() to avoid deadlock.
+            // Note that while we are waiting in select(), the client thread may add or remove items;
+            // in particular it may even close file descriptors that we are waiting on. That is why
+            // we handle EBADF. Note that even if the file descriptor is recycled, we don't invoke
+            // a callback for it unless its ItemID is still present.
+            drop(data);
             let ret = fds.check_readable(
                 timeout
                     .map(|duration| duration.as_micros() as u64)
                     .unwrap_or(FdReadableSet::kNoTimeout),
             );
-            if ret < 0 && errno::errno().0 != libc::EINTR {
+            if ret < 0 && !matches!(errno().0, libc::EINTR | libc::EBADF) {
                 // Surprising error
                 perror("select");
             }

-            // A predicate which services each item in turn, returning false if it should be removed.
-            let servicer = |item: &mut FdMonitorItem| -> bool {
+            // Re-acquire the lock.
+            data = self.data.lock().expect("Mutex poisoned!");
+
+            // For each item id that we snapshotted, if the corresponding item is still in our
+            // set of active items and its fd was readable, then service it.
+            for item_id in &item_ids {
+                let Some(item) = data.items.get_mut(item_id) else {
+                    // Item was removed while we were waiting.
+                    // Note there is no risk of an ABA problem because ItemIDs are never recycled.
+                    continue;
+                };
                 let fd = item.fd.as_raw_fd();
                 if !fds.test(fd) {
-                    // Not readable, so retain it.
-                    return true;
+                    // Not readable.
+                    continue;
                 }
                 let action = item.service_readable();
                 if action == ItemAction::Remove {
                     FLOG!(fd_monitor, "Removing fd", fd);
+                    data.items.remove(item_id);
                 }
-                action == ItemAction::Retain
-            };
+            }

-            // Service all items that are readable, and remove any which say to do so.
-            self.items.retain_mut(servicer);
+            pokelist.clear();
+            std::mem::swap(&mut pokelist, &mut data.pokelist);
+            for item_id in pokelist.drain(..) {
+                if let Some(item) = data.items.get_mut(&item_id) {
+                    let action = item.service_poke();
+                    if action == ItemAction::Remove {
+                        FLOG!(fd_monitor, "Removing fd", item.fd.as_raw_fd());
+                        data.items.remove(&item_id);
+                    }
+                }
+            }

             // Handle any changes if the change signaller was set. Alternatively, this may be the
             // wait lap, in which case we might want to commit to exiting.
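The locking discipline in this new loop is the core of the design laid out in the comment block: snapshot the live IDs under the lock, drop the lock across the blocking wait, then re-acquire it and service only IDs that were both snapshotted and are still present. A condensed, self-contained sketch of that pattern, with the fd set and callbacks elided (all names here are illustrative, not from the fish sources):

    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    struct Item; // stands in for FdMonitorItem (fd, callback, ...)

    fn one_lap(shared: &Arc<Mutex<HashMap<u64, Item>>>) {
        // 1. Snapshot the currently registered IDs under the lock.
        let data = shared.lock().unwrap();
        let mut ids: Vec<u64> = data.keys().copied().collect();
        ids.sort(); // avoid hash-map iteration nondeterminism

        // 2. Release the lock across the blocking wait, so client threads
        // can add or remove items while we sit in select()/poll().
        drop(data);
        // ... select() would run here; a racing close() can surface as
        // EBADF, which is tolerated because no callback runs for an ID
        // that has since been removed from the map ...

        // 3. Re-acquire the lock and service only IDs that survived. A
        // stale snapshot entry simply misses the map and is skipped, and
        // because IDs are never recycled there is no ABA hazard.
        let mut data = shared.lock().unwrap();
        for id in &ids {
            let Some(_item) = data.get_mut(id) else { continue };
            // ... test readability and invoke the item's callback ...
        }
    }

    fn main() {
        let shared = Arc::new(Mutex::new(HashMap::from([(1, Item)])));
        one_lap(&shared);
    }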
@@ -435,22 +491,11 @@ impl BackgroundFdMonitor {
             if change_signalled || is_wait_lap {
                 // Clear the change signaller before processing incoming changes
                 self.change_signaller.try_consume();
-                let mut data = self.data.lock().expect("Mutex poisoned!");
-
-                // Move from `pending` to the end of `items`
-                self.items.extend(&mut data.pending.drain(..));
-
-                // Grab any poke list
-                assert!(
-                    pokelist.is_empty(),
-                    "poke list should be empty or else we're dropping pokes!"
-                );
-                std::mem::swap(&mut pokelist, &mut data.pokelist);
-
                 if data.terminate
                     || (is_wait_lap
-                        && self.items.is_empty()
-                        && pokelist.is_empty()
+                        && data.items.is_empty()
+                        && data.pokelist.is_empty()
                         && !change_signalled)
                 {
                     // Maybe terminate is set. Alternatively, maybe we had no items, waited a bit,
@@ -467,18 +512,6 @@ impl BackgroundFdMonitor {
             }
         }
     }
-
-    /// Poke items in the poke list, removing any items that close their fd in their callback. The
-    /// poke list is consumed after this. This is only called from the background thread.
-    fn poke(&mut self, pokelist: &[FdMonitorItemId]) {
-        self.items.retain_mut(|item| {
-            let action = item.maybe_poke_item(pokelist);
-            if action == ItemAction::Remove {
-                FLOG!(fd_monitor, "Removing fd", item.fd.as_raw_fd());
-            }
-            action == ItemAction::Retain
-        });
-    }
 }

 /// In ordinary usage, we never invoke the destructor. This is used in the tests to not leave stale