diff --git a/src/fd_monitor.rs b/src/fd_monitor.rs index 3b9b5f819..9a6f548aa 100644 --- a/src/fd_monitor.rs +++ b/src/fd_monitor.rs @@ -1,5 +1,6 @@ #[cfg(not(target_has_atomic = "64"))] use portable_atomic::AtomicU64; +use std::collections::HashMap; use std::os::unix::prelude::*; #[cfg(target_has_atomic = "64")] use std::sync::atomic::AtomicU64; @@ -178,7 +179,7 @@ impl FdEventSignaller { /// Each item added to FdMonitor is assigned a unique ID, which is not recycled. Items may have /// their callback triggered immediately by passing the ID. Zero is a sentinel. -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct FdMonitorItemId(u64); impl From for u64 { @@ -225,13 +226,9 @@ impl FdMonitorItem { (self.callback)(&mut self.fd, ItemWakeReason::Readable) } - /// Invoke this item's callback with a poke, if its id is present in the sorted poke list. - fn maybe_poke_item(&mut self, pokelist: &[FdMonitorItemId]) -> ItemAction { - if self.item_id.0 == 0 || pokelist.binary_search(&self.item_id).is_err() { - // Not pokeable or not in the poke list. - return ItemAction::Retain; - } - + /// Invoke this item's callback because the item was poked. + /// Returns the [`ItemAction`] to indicate whether the item should be removed from the [`FdMonitor`] set. + fn service_poke(&mut self) -> ItemAction { (self.callback)(&mut self.fd, ItemWakeReason::Poke) } @@ -267,9 +264,8 @@ const _: () = { /// Data shared between the `FdMonitor` instance and its associated `BackgroundFdMonitor`. struct SharedData { - /// Pending items. This is set by the main thread with the mutex locked, then the background - /// thread grabs them. - pending: Vec, + /// The map of items. This may be modified by the main thread with the mutex locked. + items: HashMap, /// List of IDs for items that need to be poked (explicitly woken up). pokelist: Vec, /// Whether the background thread is running. 
@@ -280,9 +276,6 @@ struct SharedData { /// The background half of the fd monitor, running on its own thread. struct BackgroundFdMonitor { - /// The list of items to monitor. This is only accessed from the background thread. - /// This doesn't need to be in any particular order. - items: Vec, /// Our self-signaller. When this is written to, it means there are new items pending, new items /// in the poke list, or terminate has been set. change_signaller: Arc, @@ -305,9 +298,10 @@ impl FdMonitor { // Lock around a local region let mut data = self.data.lock().expect("Mutex poisoned!"); - // Assign an id and add the item to pending + // Assign an id and add the item. item.item_id = item_id; - data.pending.push(item); + let old_value = data.items.insert(item_id, item); + assert!(old_value.is_none(), "Item ID {} already exists!", item_id.0); // Start the thread if it hasn't already been started let already_started = data.running; @@ -320,7 +314,6 @@ impl FdMonitor { let background_monitor = BackgroundFdMonitor { data: Arc::clone(&self.data), change_signaller: Arc::clone(&self.change_signaller), - items: Vec::new(), }; crate::threads::spawn(move || { background_monitor.run(); @@ -354,7 +347,7 @@ impl FdMonitor { pub fn new() -> Self { Self { data: Arc::new(Mutex::new(SharedData { - pending: Vec::new(), + items: HashMap::new(), pokelist: Vec::new(), running: false, terminate: false, @@ -368,66 +361,129 @@ impl FdMonitor { impl BackgroundFdMonitor { /// Starts monitoring the fd set and listening for new fds to add to the set. Takes ownership /// over its instance so that this method cannot be called again. 
- fn run(mut self) { + fn run(self) { assert_is_background_thread(); - let mut pokelist: Vec = Vec::new(); let mut fds = FdReadableSet::new(); + let mut item_ids: Vec = Vec::new(); + let mut pokelist: Vec = Vec::new(); loop { - // Poke any items that need it - if !pokelist.is_empty() { - self.poke(&pokelist); - pokelist.clear(); - } - fds.clear(); + // Our general flow is that a client thread adds an item for us to monitor, + // and we use select() or poll() to wait on it. However, the client thread + // may then reclaim the item. We are currently blocked in select(): + // how then do we stop waiting on it? + // + // The safest, slowest approach is: + // - The background thread waits on select() for the set of active file descriptors. + // - The client thread records a request to remove an item. + // - The client thread wakes up the background thread via change_signaller. + // - The background thread checks for any pending removals, and removes and returns them. + // - The client thread accepts the removed item and continues on. + // However this means a round-trip from the client thread to this background thread, + // plus additional blocking system calls. This slows down the client thread. + // + // A second possibility is that: + // - The background thread waits on select() for the set of active file descriptors. + // - The client thread directly removes an item (protected by the mutex). + // - After select() returns the set of active file descriptors, we only invoke callbacks + // for items whose file descriptors are still in the set. + // However this risks the ABA problem: if the client thread reclaims an item, closes its + // fd, and then adds a new item which happens to get the same fd, we might falsely + // trigger the callback of the new item even though its fd is not readable. + // + // So we use the following approach: + // - The background thread creates a snapshotted list of active ItemIDs. 
+ // - The background thread waits in select() on the set of active file descriptors, + // without holding the lock. + // - The client thread directly removes an item (protected by the mutex). + // - After select() returns the set of active file descriptors, we only invoke callbacks + // for items whose file descriptors are marked active, and whose ItemID was snapshotted. + // This avoids the ABA problem because ItemIDs are never recycled. It does have a race where + // we might select() on a file descriptor that has been closed or recycled. Thus we must be + // prepared to handle EBADF. This race is otherwise considered benign. - // Our change_signaller is special-cased + // Construct the set of fds to monitor. + // Our change_signaller is special-cased. + fds.clear(); let change_signal_fd = self.change_signaller.read_fd(); fds.add(change_signal_fd); - for item in &mut self.items { - fds.add(item.fd.as_raw_fd()); + // Grab the lock and snapshot the item_ids. + let mut data = self.data.lock().expect("Mutex poisoned!"); + item_ids.clear(); + item_ids.reserve(data.items.len()); + for (item_id, item) in &data.items { + let fd = item.fd.as_raw_fd(); + fds.add(fd); + item_ids.push(*item_id); } + // Sort it to avoid the non-determinism of the hash table. + item_ids.sort(); + // If we have no items, then we wish to allow the thread to exit, but after a time, so // we aren't spinning up and tearing down the thread repeatedly. Set a timeout of 256 // msec; if nothing becomes readable by then we will exit. We refer to this as the // wait-lap. - let is_wait_lap = self.items.is_empty(); + let is_wait_lap = item_ids.is_empty(); let timeout = if is_wait_lap { Some(Duration::from_millis(256)) } else { None }; - // Call select() + // Call select(). + // We must release and then re-acquire the lock around select() to avoid deadlock. 
+ // Note that while we are waiting in select(), the client thread may add or remove items; + // in particular it may even close file descriptors that we are waiting on. That is why + // we handle EBADF. Note that even if the file descriptor is recycled, we don't invoke + // a callback for it unless its ItemID is still present. + drop(data); let ret = fds.check_readable( timeout .map(|duration| duration.as_micros() as u64) .unwrap_or(FdReadableSet::kNoTimeout), ); - if ret < 0 && errno::errno().0 != libc::EINTR { + if ret < 0 && !matches!(errno().0, libc::EINTR | libc::EBADF) { // Surprising error perror("select"); } - // A predicate which services each item in turn, returning false if it should be removed. - let servicer = |item: &mut FdMonitorItem| -> bool { + // Re-acquire the lock. + data = self.data.lock().expect("Mutex poisoned!"); + + // For each item id that we snapshotted, if the corresponding item is still in our + // set of active items and its fd was readable, then service it. + for item_id in &item_ids { + let Some(item) = data.items.get_mut(item_id) else { + // Item was removed while we were waiting. + // Note there is no risk of an ABA problem because ItemIDs are never recycled. + continue; + }; let fd = item.fd.as_raw_fd(); if !fds.test(fd) { - // Not readable, so retain it. - return true; + // Not readable. + continue; } let action = item.service_readable(); if action == ItemAction::Remove { FLOG!(fd_monitor, "Removing fd", fd); + data.items.remove(item_id); } - action == ItemAction::Retain - }; + } - // Service all items that are readable, and remove any which say to do so. - self.items.retain_mut(servicer); + pokelist.clear(); + std::mem::swap(&mut pokelist, &mut data.pokelist); + for item_id in pokelist.drain(..) 
{ + if let Some(item) = data.items.get_mut(&item_id) { + let action = item.service_poke(); + if action == ItemAction::Remove { + FLOG!(fd_monitor, "Removing fd", item.fd.as_raw_fd()); + data.items.remove(&item_id); + } + } + } // Handle any changes if the change signaller was set. Alternatively, this may be the // wait lap, in which case we might want to commit to exiting. @@ -435,22 +491,11 @@ impl BackgroundFdMonitor { if change_signalled || is_wait_lap { // Clear the change signaller before processing incoming changes self.change_signaller.try_consume(); - let mut data = self.data.lock().expect("Mutex poisoned!"); - - // Move from `pending` to the end of `items` - self.items.extend(&mut data.pending.drain(..)); - - // Grab any poke list - assert!( - pokelist.is_empty(), - "poke list should be empty or else we're dropping pokes!" - ); - std::mem::swap(&mut pokelist, &mut data.pokelist); if data.terminate || (is_wait_lap - && self.items.is_empty() - && pokelist.is_empty() + && data.items.is_empty() + && data.pokelist.is_empty() && !change_signalled) { // Maybe terminate is set. Alternatively, maybe we had no items, waited a bit, @@ -467,18 +512,6 @@ impl BackgroundFdMonitor { } } } - - /// Poke items in the poke list, removing any items that close their fd in their callback. The - /// poke list is consumed after this. This is only called from the background thread. - fn poke(&mut self, pokelist: &[FdMonitorItemId]) { - self.items.retain_mut(|item| { - let action = item.maybe_poke_item(pokelist); - if action == ItemAction::Remove { - FLOG!(fd_monitor, "Removing fd", item.fd.as_raw_fd()); - } - action == ItemAction::Retain - }); - } } /// In ordinary usage, we never invoke the destructor. This is used in the tests to not leave stale