mirror of
https://github.com/fish-shell/fish-shell
synced 2024-12-27 05:13:10 +00:00
Port fd_monitor (and its needed components)
I needed to rename some types already ported to rust so they don't clash with their still-extant cpp counterparts. Helper ffi functions added to avoid needing to dynamically allocate an FdMonitorItem for every fd (we use dozens per basic prompt). I ported some functions from cpp to rust that are used only in the backend but without removing their existing cpp counterparts so cpp code can continue to use their version of them (`wperror` and `make_detached_pthread`). I ran into issues porting line-by-line logic because rust inverts the behavior of `std::remove_if(..)` by making it (basically) `Vec::retain_if(..)` so I replaced bools with an explict enum to make everything clearer. I'll port the cpp tests for this separately, for now they're using ffi. Porting closures was ugly. It's nothing hard, but it's very ugly as now each capturing lambda has been changed into an explicit struct that contains its parameters (that needs to be dynamically allocated), a standalone callback (member) function to replace the lambda contents, and a separate trampoline function to call it from rust over the shared C abi (not really relevant to x86_64 w/ its single calling convention but probably needed on other platforms). I don't like that `fd_monitor.rs` has its own `c_void`. I couldn't find a way to move that to `ffi.rs` but still get cxx bridge to consider it a shared POD. Every time I moved it to a different module, it would consider it to be an opaque rust type instead. I worry this means we're going to have multiple `c_void1`, `c_void2`, etc. types as we continue to port code to use function pointers. Also, rust treats raw pointers as foreign so you can't do `impl Send for * const Foo` even if `Foo` is from the same module. That necessitated a wrapper type (`void_ptr`) that implements `Send` and `Sync` so we can move stuff between threads. The code in fd_monitor_t has been split into two objects, one that is used by the caller and a separate one associated with the background thread (this is made nice and clean by rust's ownership model). Objects not needed under the lock (i.e. accessed by the background thread exclusively) were moved to the separate `BackgroundFdMonitor` type.
This commit is contained in:
parent
f01a5d2a1b
commit
ce559bc20e
17 changed files with 872 additions and 463 deletions
|
@ -117,7 +117,7 @@ set(FISH_BUILTIN_SRCS
|
|||
set(FISH_SRCS
|
||||
src/ast.cpp src/abbrs.cpp src/autoload.cpp src/color.cpp src/common.cpp src/complete.cpp
|
||||
src/env.cpp src/env_dispatch.cpp src/env_universal_common.cpp src/event.cpp
|
||||
src/exec.cpp src/expand.cpp src/fallback.cpp src/fd_monitor.cpp src/fish_version.cpp
|
||||
src/exec.cpp src/expand.cpp src/fallback.cpp src/fish_version.cpp
|
||||
src/flog.cpp src/function.cpp src/highlight.cpp
|
||||
src/history.cpp src/history_file.cpp src/input.cpp src/input_common.cpp
|
||||
src/io.cpp src/iothread.cpp src/job_group.cpp src/kill.cpp
|
||||
|
|
|
@ -19,7 +19,9 @@ fn main() -> miette::Result<()> {
|
|||
// This allows "Rust to be used from C++"
|
||||
// This must come before autocxx so that cxx can emit its cxx.h header.
|
||||
let source_files = vec![
|
||||
"src/fd_monitor.rs",
|
||||
"src/fd_readable_set.rs",
|
||||
"src/fds.rs",
|
||||
"src/ffi_init.rs",
|
||||
"src/ffi_tests.rs",
|
||||
"src/future_feature_flags.rs",
|
||||
|
|
|
@ -2,7 +2,7 @@ pub mod shared;
|
|||
|
||||
pub mod echo;
|
||||
pub mod emit;
|
||||
mod exit;
|
||||
pub mod exit;
|
||||
pub mod random;
|
||||
pub mod r#return;
|
||||
pub mod wait;
|
||||
|
|
567
fish-rust/src/fd_monitor.rs
Normal file
567
fish-rust/src/fd_monitor.rs
Normal file
|
@ -0,0 +1,567 @@
|
|||
use std::os::fd::{AsRawFd, RawFd};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use self::fd_monitor::{c_void, new_fd_event_signaller, FdEventSignaller, ItemWakeReason};
|
||||
use crate::fd_readable_set::FdReadableSet;
|
||||
use crate::fds::AutoCloseFd;
|
||||
use crate::ffi::void_ptr;
|
||||
use crate::flog::FLOG;
|
||||
use crate::wutil::perror;
|
||||
use cxx::SharedPtr;
|
||||
|
||||
#[cxx::bridge]
|
||||
mod fd_monitor {
|
||||
/// Reason for waking an item
|
||||
#[repr(u8)]
|
||||
#[cxx_name = "item_wake_reason_t"]
|
||||
enum ItemWakeReason {
|
||||
/// The fd became readable (or was HUP'd)
|
||||
Readable,
|
||||
/// The requested timeout was hit
|
||||
Timeout,
|
||||
/// The item was "poked" (woken up explicitly)
|
||||
Poke,
|
||||
}
|
||||
|
||||
// Defines and exports a type shared between C++ and rust
|
||||
struct c_void {
|
||||
_unused: u8,
|
||||
}
|
||||
|
||||
unsafe extern "C++" {
|
||||
include!("fds.h");
|
||||
|
||||
/// An event signaller implemented using a file descriptor, so it can plug into
|
||||
/// [`select()`](libc::select).
|
||||
///
|
||||
/// This is like a binary semaphore. A call to [`post()`](FdEventSignaller::post) will
|
||||
/// signal an event, making the fd readable. Multiple calls to `post()` may be coalesced.
|
||||
/// On Linux this uses [`eventfd()`](libc::eventfd), on other systems this uses a pipe.
|
||||
/// [`try_consume()`](FdEventSignaller::try_consume) may be used to consume the event.
|
||||
/// Importantly this is async signal safe. Of course it is `CLO_EXEC` as well.
|
||||
#[rust_name = "FdEventSignaller"]
|
||||
type fd_event_signaller_t = crate::ffi::fd_event_signaller_t;
|
||||
#[rust_name = "new_fd_event_signaller"]
|
||||
fn ffi_new_fd_event_signaller_t() -> SharedPtr<FdEventSignaller>;
|
||||
}
|
||||
extern "Rust" {
|
||||
#[cxx_name = "fd_monitor_item_id_t"]
|
||||
type FdMonitorItemId;
|
||||
}
|
||||
|
||||
extern "Rust" {
|
||||
#[cxx_name = "fd_monitor_item_t"]
|
||||
type FdMonitorItem;
|
||||
|
||||
#[cxx_name = "make_fd_monitor_item_t"]
|
||||
fn new_fd_monitor_item_ffi(
|
||||
fd: i32,
|
||||
timeout_usecs: u64,
|
||||
callback: *const c_void,
|
||||
param: *const c_void,
|
||||
) -> Box<FdMonitorItem>;
|
||||
}
|
||||
|
||||
extern "Rust" {
|
||||
#[cxx_name = "fd_monitor_t"]
|
||||
type FdMonitor;
|
||||
|
||||
#[cxx_name = "make_fd_monitor_t"]
|
||||
fn new_fd_monitor_ffi() -> Box<FdMonitor>;
|
||||
|
||||
#[cxx_name = "add_item"]
|
||||
fn add_item_ffi(
|
||||
&mut self,
|
||||
fd: i32,
|
||||
timeout_usecs: u64,
|
||||
callback: *const c_void,
|
||||
param: *const c_void,
|
||||
) -> u64;
|
||||
|
||||
#[cxx_name = "poke_item"]
|
||||
fn poke_item_ffi(&self, item_id: u64);
|
||||
|
||||
#[cxx_name = "add"]
|
||||
pub fn add_ffi(&mut self, item: Box<FdMonitorItem>) -> u64;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Remove once we're no longer using the FFI variant of FdEventSignaller
|
||||
unsafe impl Sync for FdEventSignaller {}
|
||||
unsafe impl Send for FdEventSignaller {}
|
||||
|
||||
/// Each item added to fd_monitor_t is assigned a unique ID, which is not recycled. Items may have
|
||||
/// their callback triggered immediately by passing the ID. Zero is a sentinel.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct FdMonitorItemId(u64);
|
||||
|
||||
type FfiCallback = extern "C" fn(*mut AutoCloseFd, u8, void_ptr);
|
||||
|
||||
/// The callback type used by [`FdMonitorItem`]. It is passed a mutable reference to the
|
||||
/// `FdMonitorItem`'s [`FdMonitorItem::fd`] and [the reason](ItemWakeupReason) for the wakeup. The
|
||||
/// callback may close the fd, in which case the `FdMonitorItem` is removed from [`FdMonitor`]'s
|
||||
/// set.
|
||||
///
|
||||
/// As capturing C++ closures can't be safely used via ffi interop and cxx bridge doesn't support
|
||||
/// passing typed `fn(...)` pointers from C++ to rust, we have a separate variant of the type that
|
||||
/// uses the C abi to invoke a callback. This will be removed when the dependent C++ code (currently
|
||||
/// only `src/io.cpp`) is ported to rust
|
||||
enum FdMonitorCallback {
|
||||
None,
|
||||
Native(Box<dyn Fn(&mut AutoCloseFd, ItemWakeReason) + Send + Sync>),
|
||||
Ffi(FfiCallback /* fn ptr */, void_ptr /* param */),
|
||||
}
|
||||
|
||||
/// An item containing an fd and callback, which can be monitored to watch when it becomes readable
|
||||
/// and invoke the callback.
|
||||
pub struct FdMonitorItem {
|
||||
/// The fd to monitor
|
||||
fd: AutoCloseFd,
|
||||
/// A callback to be invoked when the fd is readable, or when we are timed out. If we time out,
|
||||
/// then timed_out will be true. If the fd is invalid on return from the function, then the item
|
||||
/// is removed from the [`FdMonitor`] set.
|
||||
callback: FdMonitorCallback,
|
||||
/// The timeout associated with waiting on this item or `None` to wait indefinitely. A timeout
|
||||
/// of `0` is not supported.
|
||||
timeout: Option<Duration>,
|
||||
/// The last time we were called or the time of initialization.
|
||||
last_time: Option<Instant>,
|
||||
/// The id for this item, assigned by [`FdMonitor`].
|
||||
item_id: FdMonitorItemId,
|
||||
}
|
||||
|
||||
/// Unlike C++, rust's `Vec` has `Vec::retain()` instead of `std::remove_if(...)` with the inverse
|
||||
/// logic. It's hard to keep track of which bool means what across the different layers, so be more
|
||||
/// explicit.
|
||||
#[derive(PartialEq, Eq)]
|
||||
enum ItemAction {
|
||||
Remove,
|
||||
Retain,
|
||||
}
|
||||
|
||||
impl FdMonitorItem {
|
||||
/// Return the duration until the timeout should trigger or `None`. A return of `0` means we are
|
||||
/// at or past the timeout.
|
||||
fn remaining_time(&self, now: &Instant) -> Option<Duration> {
|
||||
let last_time = self.last_time.expect("Should always have a last_time!");
|
||||
let timeout = self.timeout?;
|
||||
assert!(now >= &last_time, "Steady clock went backwards or bug!");
|
||||
let since = *now - last_time;
|
||||
Some(if since >= timeout {
|
||||
Duration::ZERO
|
||||
} else {
|
||||
timeout - since
|
||||
})
|
||||
}
|
||||
|
||||
/// Invoke this item's callback if its value (when its value is set in the fd or has timed out).
|
||||
/// Returns `true` if the item should be retained or `false` if it should be removed from the
|
||||
/// set.
|
||||
fn service_item(&mut self, fds: &FdReadableSet, now: &Instant) -> ItemAction {
|
||||
let mut result = ItemAction::Retain;
|
||||
let readable = fds.test(self.fd.as_raw_fd());
|
||||
let timed_out = !readable && self.remaining_time(now) == Some(Duration::ZERO);
|
||||
if readable || timed_out {
|
||||
self.last_time = Some(*now);
|
||||
let reason = if readable {
|
||||
ItemWakeReason::Readable
|
||||
} else {
|
||||
ItemWakeReason::Timeout
|
||||
};
|
||||
match &self.callback {
|
||||
FdMonitorCallback::None => panic!("Callback not assigned!"),
|
||||
FdMonitorCallback::Native(callback) => (callback)(&mut self.fd, reason),
|
||||
FdMonitorCallback::Ffi(callback, param) => {
|
||||
// Safety: identical objects are generated on both sides by cxx bridge as
|
||||
// integers of the same size (minimum size to fit the enum).
|
||||
let reason = unsafe { std::mem::transmute(reason) };
|
||||
(callback)(&mut self.fd as *mut _, reason, *param)
|
||||
}
|
||||
}
|
||||
if !self.fd.is_valid() {
|
||||
result = ItemAction::Remove;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/// Invoke this item's callback with a poke, if its id is present in the sorted poke list.
|
||||
// TODO: Rename to `maybe_poke_item()` to reflect its actual behavior.
|
||||
fn poke_item(&mut self, pokelist: &[FdMonitorItemId]) -> ItemAction {
|
||||
if self.item_id.0 == 0 || pokelist.binary_search(&self.item_id).is_err() {
|
||||
// Not pokeable or not in the poke list.
|
||||
return ItemAction::Retain;
|
||||
}
|
||||
|
||||
match &self.callback {
|
||||
FdMonitorCallback::None => panic!("Callback not assigned!"),
|
||||
FdMonitorCallback::Native(callback) => (callback)(&mut self.fd, ItemWakeReason::Poke),
|
||||
FdMonitorCallback::Ffi(callback, param) => {
|
||||
// Safety: identical objects are generated on both sides by cxx bridge as
|
||||
// integers of the same size (minimum size to fit the enum).
|
||||
let reason = unsafe { std::mem::transmute(ItemWakeReason::Poke) };
|
||||
(callback)(&mut self.fd as *mut _, reason, *param)
|
||||
}
|
||||
}
|
||||
// Return `ItemAction::Remove` if the callback closed the fd
|
||||
match self.fd.is_valid() {
|
||||
true => ItemAction::Retain,
|
||||
false => ItemAction::Remove,
|
||||
}
|
||||
}
|
||||
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
callback: FdMonitorCallback::None,
|
||||
fd: AutoCloseFd::empty(),
|
||||
timeout: None,
|
||||
last_time: None,
|
||||
item_id: FdMonitorItemId(0),
|
||||
}
|
||||
}
|
||||
|
||||
fn set_callback_ffi(&mut self, callback: *const c_void, param: *const c_void) {
|
||||
// Safety: we are just marshalling our function pointers with identical definitions on both
|
||||
// sides of the ffi bridge as void pointers to keep cxx bridge happy. Whether we invoke the
|
||||
// raw function as a void pointer or as a typed fn that helps us keep track of what we're
|
||||
// doing is unsafe in all cases, so might as well make the best of it.
|
||||
let callback = unsafe { std::mem::transmute(callback) };
|
||||
self.callback = FdMonitorCallback::Ffi(callback, void_ptr(param as _));
|
||||
}
|
||||
}
|
||||
|
||||
// cxx bridge does not support "static member functions" in C++ or rust, so we need a top-level fn.
|
||||
fn new_fd_monitor_ffi() -> Box<FdMonitor> {
|
||||
Box::new(FdMonitor::new())
|
||||
}
|
||||
|
||||
// cxx bridge does not support "static member functions" in C++ or rust, so we need a top-level fn.
|
||||
fn new_fd_monitor_item_ffi(
|
||||
fd: RawFd,
|
||||
timeout_usecs: u64,
|
||||
callback: *const c_void,
|
||||
param: *const c_void,
|
||||
) -> Box<FdMonitorItem> {
|
||||
// Safety: we are just marshalling our function pointers with identical definitions on both
|
||||
// sides of the ffi bridge as void pointers to keep cxx bridge happy. Whether we invoke the
|
||||
// raw function as a void pointer or as a typed fn that helps us keep track of what we're
|
||||
// doing is unsafe in all cases, so might as well make the best of it.
|
||||
let callback = unsafe { std::mem::transmute(callback) };
|
||||
let mut item = FdMonitorItem::new();
|
||||
item.fd.reset(fd);
|
||||
item.callback = FdMonitorCallback::Ffi(callback, void_ptr(param as _));
|
||||
if timeout_usecs != FdReadableSet::kNoTimeout {
|
||||
item.timeout = Some(Duration::from_micros(timeout_usecs));
|
||||
}
|
||||
return Box::new(item);
|
||||
}
|
||||
|
||||
/// A thread-safe class which can monitor a set of fds, invoking a callback when any becomes
|
||||
/// readable (or has been HUP'd) or when per-item-configurable timeouts are reached.
|
||||
pub struct FdMonitor {
|
||||
/// Our self-signaller. When this is written to, it means there are new items pending, new items
|
||||
/// in the poke list, or terminate has been set.
|
||||
change_signaller: SharedPtr<FdEventSignaller>,
|
||||
/// The data shared between the background thread and the `FdMonitor` instance.
|
||||
data: Arc<Mutex<SharedData>>,
|
||||
/// The last ID assigned or `0` if none.
|
||||
last_id: AtomicU64,
|
||||
}
|
||||
|
||||
// We don't want to manually implement `Sync` for `FdMonitor` but we do want to make sure that it's
|
||||
// always using interior mutability correctly and therefore automatically `Sync`.
|
||||
const _: () = {
|
||||
// It is sufficient to declare the generic function pointers; calling them too would require
|
||||
// using `const fn` with Send/Sync constraints which wasn't stabilized until rustc 1.61.0
|
||||
fn assert_sync<T: Sync>() {}
|
||||
let _ = assert_sync::<FdMonitor>;
|
||||
};
|
||||
|
||||
/// Data shared between the `FdMonitor` instance and its associated `BackgroundFdMonitor`.
|
||||
struct SharedData {
|
||||
/// Pending items. This is set by the main thread with the mutex locked, then the background
|
||||
/// thread grabs them.
|
||||
pending: Vec<FdMonitorItem>,
|
||||
/// List of IDs for items that need to be poked (explicitly woken up).
|
||||
pokelist: Vec<FdMonitorItemId>,
|
||||
/// Whether the background thread is running.
|
||||
running: bool,
|
||||
/// Used to signal that the background thread should terminate.
|
||||
terminate: bool,
|
||||
}
|
||||
|
||||
/// The background half of the fd monitor, running on its own thread.
|
||||
struct BackgroundFdMonitor {
|
||||
/// The list of items to monitor. This is only accessed from the background thread.
|
||||
/// This doesn't need to be in any particular order.
|
||||
items: Vec<FdMonitorItem>,
|
||||
/// Our self-signaller. When this is written to, it means there are new items pending, new items
|
||||
/// in the poke list, or terminate has been set.
|
||||
change_signaller: SharedPtr<FdEventSignaller>,
|
||||
/// The data shared between the background thread and the `FdMonitor` instance.
|
||||
data: Arc<Mutex<SharedData>>,
|
||||
}
|
||||
|
||||
impl FdMonitor {
|
||||
pub fn add_ffi(&self, item: Box<FdMonitorItem>) -> u64 {
|
||||
self.add(*item).0
|
||||
}
|
||||
|
||||
/// Add an item to the monitor. Returns the [`FdMonitorItemId`] assigned to the item.
|
||||
pub fn add(&self, mut item: FdMonitorItem) -> FdMonitorItemId {
|
||||
assert!(item.fd.is_valid());
|
||||
assert!(item.timeout != Some(Duration::ZERO), "Invalid timeout!");
|
||||
assert!(
|
||||
item.item_id == FdMonitorItemId(0),
|
||||
"Item should not already have an id!"
|
||||
);
|
||||
|
||||
let item_id = self.last_id.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
let item_id = FdMonitorItemId(item_id);
|
||||
let start_thread = {
|
||||
// Lock around a local region
|
||||
let mut data = self.data.lock().expect("Mutex poisoned!");
|
||||
|
||||
// Assign an id and add the item to pending
|
||||
item.item_id = item_id;
|
||||
data.pending.push(item);
|
||||
|
||||
// Start the thread if it hasn't already been started
|
||||
let already_started = data.running;
|
||||
data.running = true;
|
||||
!already_started
|
||||
};
|
||||
|
||||
if start_thread {
|
||||
FLOG!(fd_monitor, "Thread starting");
|
||||
let background_monitor = BackgroundFdMonitor {
|
||||
data: Arc::clone(&self.data),
|
||||
change_signaller: SharedPtr::clone(&self.change_signaller),
|
||||
items: Vec::new(),
|
||||
};
|
||||
crate::threads::spawn(move || {
|
||||
background_monitor.run();
|
||||
});
|
||||
}
|
||||
|
||||
item_id
|
||||
}
|
||||
|
||||
/// Avoid requiring a separate UniquePtr for each item C++ wants to add to the set by giving an
|
||||
/// all-in-one entry point that can initialize the item on our end and insert it to the set.
|
||||
fn add_item_ffi(
|
||||
&mut self,
|
||||
fd: RawFd,
|
||||
timeout_usecs: u64,
|
||||
callback: *const c_void,
|
||||
param: *const c_void,
|
||||
) -> u64 {
|
||||
// Safety: we are just marshalling our function pointers with identical definitions on both
|
||||
// sides of the ffi bridge as void pointers to keep cxx bridge happy. Whether we invoke the
|
||||
// raw function as a void pointer or as a typed fn that helps us keep track of what we're
|
||||
// doing is unsafe in all cases, so might as well make the best of it.
|
||||
let callback = unsafe { std::mem::transmute(callback) };
|
||||
let mut item = FdMonitorItem::new();
|
||||
item.fd.reset(fd);
|
||||
item.callback = FdMonitorCallback::Ffi(callback, void_ptr(param as _));
|
||||
if timeout_usecs != FdReadableSet::kNoTimeout {
|
||||
item.timeout = Some(Duration::from_micros(timeout_usecs));
|
||||
}
|
||||
let item_id = self.add(item).0;
|
||||
item_id
|
||||
}
|
||||
|
||||
/// Mark that the item with the given ID needs to be woken up explicitly.
|
||||
pub fn poke_item(&self, item_id: FdMonitorItemId) {
|
||||
assert!(item_id.0 > 0, "Invalid item id!");
|
||||
let needs_notification = {
|
||||
let mut data = self.data.lock().expect("Mutex poisoned!");
|
||||
let needs_notification = data.pokelist.is_empty();
|
||||
// Insert it, sorted.
|
||||
// TODO: The C++ code inserts it even if it's already in the poke list. That seems
|
||||
// unnecessary?
|
||||
let pos = match data.pokelist.binary_search(&item_id) {
|
||||
Ok(pos) => pos,
|
||||
Err(pos) => pos,
|
||||
};
|
||||
data.pokelist.insert(pos, item_id);
|
||||
needs_notification
|
||||
};
|
||||
|
||||
if needs_notification {
|
||||
self.change_signaller.post();
|
||||
}
|
||||
}
|
||||
|
||||
fn poke_item_ffi(&self, item_id: u64) {
|
||||
self.poke_item(FdMonitorItemId(item_id))
|
||||
}
|
||||
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
data: Arc::new(Mutex::new(SharedData {
|
||||
pending: Vec::new(),
|
||||
pokelist: Vec::new(),
|
||||
running: false,
|
||||
terminate: false,
|
||||
})),
|
||||
change_signaller: new_fd_event_signaller(),
|
||||
last_id: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BackgroundFdMonitor {
|
||||
/// Starts monitoring the fd set and listening for new fds to add to the set. Takes ownership
|
||||
/// over its instance so that this method cannot be called again.
|
||||
fn run(mut self) {
|
||||
let mut pokelist: Vec<FdMonitorItemId> = Vec::new();
|
||||
let mut fds = FdReadableSet::new();
|
||||
|
||||
loop {
|
||||
// Poke any items that need it
|
||||
if !pokelist.is_empty() {
|
||||
self.poke(&mut pokelist);
|
||||
pokelist.clear();
|
||||
}
|
||||
fds.clear();
|
||||
|
||||
// Our change_signaller is special-cased
|
||||
let change_signal_fd = self.change_signaller.read_fd().into();
|
||||
fds.add(change_signal_fd);
|
||||
|
||||
let mut now = Instant::now();
|
||||
// Use Duration::MAX to represent no timeout for comparison purposes.
|
||||
let mut timeout = Duration::MAX;
|
||||
|
||||
for item in &mut self.items {
|
||||
fds.add(item.fd.as_raw_fd());
|
||||
if !item.last_time.is_some() {
|
||||
item.last_time = Some(now);
|
||||
}
|
||||
timeout = timeout.min(item.timeout.unwrap_or(Duration::MAX));
|
||||
}
|
||||
|
||||
// If we have no items, then we wish to allow the thread to exit, but after a time, so
|
||||
// we aren't spinning up and tearing down the thread repeatedly. Set a timeout of 256
|
||||
// msec; if nothing becomes readable by then we will exit. We refer to this as the
|
||||
// wait-lap.
|
||||
let is_wait_lap = self.items.is_empty();
|
||||
if is_wait_lap {
|
||||
assert!(
|
||||
timeout == Duration::MAX,
|
||||
"Should not have a timeout on wait lap!"
|
||||
);
|
||||
timeout = Duration::from_millis(256);
|
||||
}
|
||||
|
||||
// Don't leave Duration::MAX as an actual timeout value
|
||||
let timeout = match timeout {
|
||||
Duration::MAX => None,
|
||||
timeout => Some(timeout),
|
||||
};
|
||||
|
||||
// Call select()
|
||||
let ret = fds.check_readable(
|
||||
timeout
|
||||
.map(|duration| duration.as_micros() as u64)
|
||||
.unwrap_or(FdReadableSet::kNoTimeout),
|
||||
);
|
||||
if ret < 0 && errno::errno().0 != libc::EINTR {
|
||||
// Surprising error
|
||||
perror("select");
|
||||
}
|
||||
|
||||
// Update the value of `now` after waiting on `fds.check_readable()`; it's used in the
|
||||
// servicer closure.
|
||||
now = Instant::now();
|
||||
|
||||
// A predicate which services each item in turn, returning true if it should be removed
|
||||
let servicer = |item: &mut FdMonitorItem| {
|
||||
let fd = item.fd.as_raw_fd();
|
||||
if item.service_item(&fds, &now) == ItemAction::Remove {
|
||||
FLOG!(fd_monitor, "Removing fd", fd);
|
||||
return ItemAction::Remove;
|
||||
}
|
||||
return ItemAction::Retain;
|
||||
};
|
||||
|
||||
// Service all items that are either readable or have timed out, and remove any which
|
||||
// say to do so.
|
||||
|
||||
// This line is from the C++ codebase (fd_monitor.cpp:170) but this write is never read.
|
||||
// now = Instant::now();
|
||||
|
||||
self.items
|
||||
.retain_mut(|item| servicer(item) == ItemAction::Retain);
|
||||
|
||||
// Handle any changes if the change signaller was set. Alternatively, this may be the
|
||||
// wait lap, in which case we might want to commit to exiting.
|
||||
let change_signalled = fds.test(change_signal_fd);
|
||||
if change_signalled || is_wait_lap {
|
||||
// Clear the change signaller before processing incoming changes
|
||||
self.change_signaller.try_consume();
|
||||
let mut data = self.data.lock().expect("Mutex poisoned!");
|
||||
|
||||
// Move from `pending` to the end of `items`
|
||||
self.items.extend(&mut data.pending.drain(..));
|
||||
|
||||
// Grab any poke list
|
||||
assert!(
|
||||
pokelist.is_empty(),
|
||||
"poke list should be empty or else we're dropping pokes!"
|
||||
);
|
||||
std::mem::swap(&mut pokelist, &mut data.pokelist);
|
||||
|
||||
if data.terminate
|
||||
|| (is_wait_lap
|
||||
&& self.items.is_empty()
|
||||
&& pokelist.is_empty()
|
||||
&& !change_signalled)
|
||||
{
|
||||
// Maybe terminate is set. Alternatively, maybe we had no items, waited a bit,
|
||||
// and still have no items. It's important to do this while holding the lock,
|
||||
// otherwise we race with new items being added.
|
||||
assert!(
|
||||
data.running,
|
||||
"Thread should be running because we're that thread"
|
||||
);
|
||||
FLOG!(fd_monitor, "Thread exiting");
|
||||
data.running = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Poke items in the poke list, removing any items that close their fd in their callback. The
|
||||
/// poke list is consumed after this. This is only called from the background thread.
|
||||
fn poke(&mut self, pokelist: &[FdMonitorItemId]) {
|
||||
self.items.retain_mut(|item| {
|
||||
let action = item.poke_item(&*pokelist);
|
||||
if action == ItemAction::Remove {
|
||||
FLOG!(fd_monitor, "Removing fd", item.fd.as_raw_fd());
|
||||
}
|
||||
return action == ItemAction::Retain;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// In ordinary usage, we never invoke the destructor. This is used in the tests to not leave stale
|
||||
/// fds arounds; this is why it's very hacky!
|
||||
impl Drop for FdMonitor {
|
||||
fn drop(&mut self) {
|
||||
// Safety: this is a port of the C++ code and we are running in the destructor. The C++ code
|
||||
// had no way to bubble back any errors encountered here, and the pthread mutex the C++ code
|
||||
// uses does not have a concept of mutex poisoning.
|
||||
self.data.lock().expect("Mutex poisoned!").terminate = true;
|
||||
self.change_signaller.post();
|
||||
|
||||
// Safety: see note above.
|
||||
while self.data.lock().expect("Mutex poisoned!").running {
|
||||
std::thread::sleep(Duration::from_millis(5));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,8 @@
|
|||
use libc::c_int;
|
||||
use std::os::unix::io::RawFd;
|
||||
|
||||
pub use fd_readable_set_t as FdReadableSet;
|
||||
|
||||
#[cxx::bridge]
|
||||
mod fd_readable_set_ffi {
|
||||
extern "Rust" {
|
||||
|
@ -20,13 +22,13 @@ pub fn new_fd_readable_set() -> Box<fd_readable_set_t> {
|
|||
Box::new(fd_readable_set_t::new())
|
||||
}
|
||||
|
||||
/// \return true if the fd is or becomes readable within the given timeout.
|
||||
/// This returns false if the waiting is interrupted by a signal.
|
||||
/// Returns `true` if the fd is or becomes readable within the given timeout.
|
||||
/// This returns `false` if the waiting is interrupted by a signal.
|
||||
pub fn is_fd_readable(fd: i32, timeout_usec: u64) -> bool {
|
||||
fd_readable_set_t::is_fd_readable(fd, timeout_usec)
|
||||
}
|
||||
|
||||
/// \return whether an fd is readable.
|
||||
/// Returns whether an fd is readable.
|
||||
pub fn poll_fd_readable(fd: i32) -> bool {
|
||||
fd_readable_set_t::poll_fd_readable(fd)
|
||||
}
|
||||
|
@ -75,13 +77,14 @@ impl fd_readable_set_t {
|
|||
}
|
||||
}
|
||||
|
||||
/// \return true if the given fd is marked as set, in our set. \returns false if negative.
|
||||
/// Returns `true` if the given `fd` is marked as set, in our set. Returns `false` if `fd` is
|
||||
/// negative.
|
||||
pub fn test(&self, fd: RawFd) -> bool {
|
||||
fd >= 0 && unsafe { libc::FD_ISSET(fd, &self.fdset_) }
|
||||
}
|
||||
|
||||
/// Call select() or poll(), according to FISH_READABLE_SET_USE_POLL. Note this destructively
|
||||
/// modifies the set. \return the result of select() or poll().
|
||||
/// Call `select()` or `poll()`, according to FISH_READABLE_SET_USE_POLL. Note this
|
||||
/// destructively modifies the set. Returns the result of `select()` or `poll()`.
|
||||
pub fn check_readable(&mut self, timeout_usec: u64) -> c_int {
|
||||
let null = std::ptr::null_mut();
|
||||
if timeout_usec == Self::kNoTimeout {
|
||||
|
@ -106,7 +109,7 @@ impl fd_readable_set_t {
|
|||
}
|
||||
|
||||
/// Check if a single fd is readable, with a given timeout.
|
||||
/// \return true if readable, false if not.
|
||||
/// Returns `true` if readable, `false` otherwise.
|
||||
pub fn is_fd_readable(fd: RawFd, timeout_usec: u64) -> bool {
|
||||
if fd < 0 {
|
||||
return false;
|
||||
|
@ -118,7 +121,7 @@ impl fd_readable_set_t {
|
|||
}
|
||||
|
||||
/// Check if a single fd is readable, without blocking.
|
||||
/// \return true if readable, false if not.
|
||||
/// Returns `true` if readable, `false` if not.
|
||||
pub fn poll_fd_readable(fd: RawFd) -> bool {
|
||||
return Self::is_fd_readable(fd, 0);
|
||||
}
|
||||
|
@ -151,23 +154,29 @@ impl fd_readable_set_t {
|
|||
pollfd.fd
|
||||
}
|
||||
|
||||
/// Add an fd to the set. The fd is ignored if negative (for convenience).
|
||||
/// Add an fd to the set. The fd is ignored if negative (for convenience). The fd is also
|
||||
/// ignored if it's already in the set.
|
||||
pub fn add(&mut self, fd: RawFd) {
|
||||
if fd >= 0 {
|
||||
if let Err(pos) = self.pollfds_.binary_search_by_key(&fd, Self::pollfd_get_fd) {
|
||||
self.pollfds_.insert(
|
||||
pos,
|
||||
libc::pollfd {
|
||||
fd,
|
||||
events: libc::POLLIN,
|
||||
revents: 0,
|
||||
},
|
||||
);
|
||||
}
|
||||
if fd < 0 {
|
||||
return;
|
||||
}
|
||||
let pos = match self.pollfds_.binary_search_by_key(&fd, Self::pollfd_get_fd) {
|
||||
Ok(_) => return,
|
||||
Err(pos) => pos,
|
||||
};
|
||||
|
||||
self.pollfds_.insert(
|
||||
pos,
|
||||
libc::pollfd {
|
||||
fd,
|
||||
events: libc::POLLIN,
|
||||
revents: 0,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/// \return true if the given fd is marked as set, in our set. \returns false if negative.
|
||||
/// Returns `true` if the given `fd` has input available to read or has been HUP'd.
|
||||
/// Returns `false` if `fd` is negative or was not found in the set.
|
||||
pub fn test(&self, fd: RawFd) -> bool {
|
||||
// If a pipe is widowed with no data, Linux sets POLLHUP but not POLLIN, so test for both.
|
||||
if let Ok(pos) = self.pollfds_.binary_search_by_key(&fd, Self::pollfd_get_fd) {
|
||||
|
@ -178,7 +187,7 @@ impl fd_readable_set_t {
|
|||
return false;
|
||||
}
|
||||
|
||||
// Convert from a usec to a poll-friendly msec.
|
||||
/// Convert from usecs to poll-friendly msecs.
|
||||
fn usec_to_poll_msec(timeout_usec: u64) -> c_int {
|
||||
let mut timeout_msec: u64 = timeout_usec / kUsecPerMsec;
|
||||
// Round to nearest, down for halfway.
|
||||
|
@ -206,6 +215,8 @@ impl fd_readable_set_t {
|
|||
|
||||
/// Call select() or poll(), according to FISH_READABLE_SET_USE_POLL. Note this destructively
|
||||
/// modifies the set. \return the result of select() or poll().
|
||||
///
|
||||
/// TODO: Change to [`Duration`](std::time::Duration) once FFI usage is done.
|
||||
pub fn check_readable(&mut self, timeout_usec: u64) -> c_int {
|
||||
if self.pollfds_.is_empty() {
|
||||
return 0;
|
||||
|
@ -214,7 +225,7 @@ impl fd_readable_set_t {
|
|||
}
|
||||
|
||||
/// Check if a single fd is readable, with a given timeout.
|
||||
/// \return true if readable, false if not.
|
||||
/// \return true if `fd` is our set and is readable, `false` otherwise.
|
||||
pub fn is_fd_readable(fd: RawFd, timeout_usec: u64) -> bool {
|
||||
if fd < 0 {
|
||||
return false;
|
||||
|
|
|
@ -1,13 +1,30 @@
|
|||
use crate::ffi;
|
||||
use nix::unistd;
|
||||
use std::os::unix::io::RawFd;
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
|
||||
|
||||
/// A helper type for managing and automatically closing a file descriptor
|
||||
pub struct autoclose_fd_t {
|
||||
///
|
||||
/// This was implemented in rust as a port of the existing C++ code but it didn't take its place
|
||||
/// (yet) and there's still the original cpp implementation in `src/fds.h`, so its name is
|
||||
/// disambiguated because some code uses a mix of both for interop purposes.
|
||||
pub struct AutoCloseFd {
|
||||
fd_: RawFd,
|
||||
}
|
||||
|
||||
impl autoclose_fd_t {
|
||||
#[cxx::bridge]
|
||||
mod autoclose_fd_t {
|
||||
extern "Rust" {
|
||||
#[cxx_name = "autoclose_fd_t2"]
|
||||
type AutoCloseFd;
|
||||
|
||||
#[cxx_name = "valid"]
|
||||
fn is_valid(&self) -> bool;
|
||||
fn close(&mut self);
|
||||
fn fd(&self) -> i32;
|
||||
}
|
||||
}
|
||||
|
||||
impl AutoCloseFd {
|
||||
// Closes the fd if not already closed.
|
||||
pub fn close(&mut self) {
|
||||
if self.fd_ != -1 {
|
||||
|
@ -37,24 +54,41 @@ impl autoclose_fd_t {
|
|||
self.fd_ = fd;
|
||||
}
|
||||
|
||||
// \return if this has a valid fd.
|
||||
pub fn valid(&self) -> bool {
|
||||
// Returns if this has a valid fd.
|
||||
pub fn is_valid(&self) -> bool {
|
||||
self.fd_ >= 0
|
||||
}
|
||||
|
||||
// Construct, taking ownership of an fd.
|
||||
pub fn new(fd: RawFd) -> autoclose_fd_t {
|
||||
autoclose_fd_t { fd_: fd }
|
||||
// Create a new AutoCloseFd instance taking ownership of the passed fd
|
||||
pub fn new(fd: RawFd) -> Self {
|
||||
AutoCloseFd { fd_: fd }
|
||||
}
|
||||
|
||||
// Create a new AutoCloseFd without an open fd
|
||||
pub fn empty() -> Self {
|
||||
AutoCloseFd { fd_: -1 }
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for autoclose_fd_t {
|
||||
fn default() -> autoclose_fd_t {
|
||||
autoclose_fd_t { fd_: -1 }
|
||||
impl FromRawFd for AutoCloseFd {
|
||||
unsafe fn from_raw_fd(fd: RawFd) -> Self {
|
||||
AutoCloseFd { fd_: fd }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for autoclose_fd_t {
|
||||
impl AsRawFd for AutoCloseFd {
|
||||
fn as_raw_fd(&self) -> RawFd {
|
||||
self.fd()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for AutoCloseFd {
|
||||
fn default() -> AutoCloseFd {
|
||||
AutoCloseFd { fd_: -1 }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for AutoCloseFd {
|
||||
fn drop(&mut self) {
|
||||
self.close()
|
||||
}
|
||||
|
@ -64,10 +98,10 @@ impl Drop for autoclose_fd_t {
|
|||
#[derive(Default)]
|
||||
pub struct autoclose_pipes_t {
|
||||
/// Read end of the pipe.
|
||||
pub read: autoclose_fd_t,
|
||||
pub read: AutoCloseFd,
|
||||
|
||||
/// Write end of the pipe.
|
||||
pub write: autoclose_fd_t,
|
||||
pub write: AutoCloseFd,
|
||||
}
|
||||
|
||||
/// Construct a pair of connected pipes, set to close-on-exec.
|
||||
|
@ -75,9 +109,9 @@ pub struct autoclose_pipes_t {
|
|||
pub fn make_autoclose_pipes() -> Option<autoclose_pipes_t> {
|
||||
let pipes = ffi::make_pipes_ffi();
|
||||
|
||||
let readp = autoclose_fd_t::new(pipes.read);
|
||||
let writep = autoclose_fd_t::new(pipes.write);
|
||||
if !readp.valid() || !writep.valid() {
|
||||
let readp = AutoCloseFd::new(pipes.read);
|
||||
let writep = AutoCloseFd::new(pipes.write);
|
||||
if !readp.is_valid() || !writep.is_valid() {
|
||||
None
|
||||
} else {
|
||||
Some(autoclose_pipes_t {
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
use crate::wchar;
|
||||
#[rustfmt::skip]
|
||||
use ::std::pin::Pin;
|
||||
#[rustfmt::skip]
|
||||
use ::std::slice;
|
||||
use autocxx::prelude::*;
|
||||
use core::pin::Pin;
|
||||
use core::slice;
|
||||
use cxx::SharedPtr;
|
||||
|
||||
// autocxx has been hacked up to know about this.
|
||||
|
@ -73,6 +71,8 @@ include_cpp! {
|
|||
generate!("sig2wcs")
|
||||
generate!("wcs2sig")
|
||||
generate!("signal_get_desc")
|
||||
|
||||
generate!("fd_event_signaller_t")
|
||||
}
|
||||
|
||||
impl parser_t {
|
||||
|
@ -135,3 +135,19 @@ impl Repin for output_stream_t {}
|
|||
pub use autocxx::c_int;
|
||||
pub use ffi::*;
|
||||
pub use libc::c_char;
|
||||
|
||||
/// A version of [`* const core::ffi::c_void`] (or [`* const libc::c_void`], if you prefer) that
|
||||
/// implements `Copy` and `Clone`, because those two don't. Used to represent a `void *` ptr for ffi
|
||||
/// purposes.
|
||||
#[repr(transparent)]
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct void_ptr(pub *const core::ffi::c_void);
|
||||
|
||||
impl core::fmt::Debug for void_ptr {
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
write!(f, "{:p}", &self.0)
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl Send for void_ptr {}
|
||||
unsafe impl Sync for void_ptr {}
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
#![allow(clippy::manual_is_ascii_check)]
|
||||
|
||||
mod common;
|
||||
mod fd_monitor;
|
||||
mod fd_readable_set;
|
||||
mod fds;
|
||||
#[allow(rustdoc::broken_intra_doc_links)]
|
||||
|
@ -22,6 +23,7 @@ mod parse_constants;
|
|||
mod redirection;
|
||||
mod signal;
|
||||
mod smoke;
|
||||
mod threads;
|
||||
mod timer;
|
||||
mod tokenizer;
|
||||
mod topic_monitor;
|
||||
|
|
67
fish-rust/src/threads.rs
Normal file
67
fish-rust/src/threads.rs
Normal file
|
@ -0,0 +1,67 @@
|
|||
//! The rusty version of iothreads from the cpp code, to be consumed by native rust code. This isn't
|
||||
//! ported directly from the cpp code so we can use rust threads instead of using pthreads.
|
||||
|
||||
use crate::flog::FLOG;
|
||||
|
||||
/// The rusty version of `iothreads::make_detached_pthread()`. We will probably need a
|
||||
/// `spawn_scoped` version of the same to handle some more advanced borrow cases safely, and maybe
|
||||
/// an unsafe version that doesn't do any lifetime checking akin to
|
||||
/// `spawn_unchecked()`[std::thread::Builder::spawn_unchecked], which is a nightly-only feature.
|
||||
///
|
||||
/// Returns a boolean indicating whether or not the thread was successfully launched. Failure here
|
||||
/// is not dependent on the passed callback and implies a system error (likely insufficient
|
||||
/// resources).
|
||||
pub fn spawn<F: FnOnce() + Send + 'static>(callback: F) -> bool {
|
||||
// The spawned thread inherits our signal mask. Temporarily block signals, spawn the thread, and
|
||||
// then restore it. But we must not block SIGBUS, SIGFPE, SIGILL, or SIGSEGV; that's undefined
|
||||
// (#7837). Conservatively don't try to mask SIGKILL or SIGSTOP either; that's ignored on Linux
|
||||
// but maybe has an effect elsewhere.
|
||||
let saved_set = unsafe {
|
||||
let mut new_set: libc::sigset_t = std::mem::zeroed();
|
||||
let new_set = &mut new_set as *mut _;
|
||||
libc::sigfillset(new_set);
|
||||
libc::sigdelset(new_set, libc::SIGILL); // bad jump
|
||||
libc::sigdelset(new_set, libc::SIGFPE); // divide-by-zero
|
||||
libc::sigdelset(new_set, libc::SIGBUS); // unaligned memory access
|
||||
libc::sigdelset(new_set, libc::SIGSEGV); // bad memory access
|
||||
libc::sigdelset(new_set, libc::SIGSTOP); // unblockable
|
||||
libc::sigdelset(new_set, libc::SIGKILL); // unblockable
|
||||
|
||||
let mut saved_set: libc::sigset_t = std::mem::zeroed();
|
||||
let result = libc::pthread_sigmask(libc::SIG_BLOCK, new_set, &mut saved_set as *mut _);
|
||||
assert_eq!(result, 0, "Failed to override thread signal mask!");
|
||||
saved_set
|
||||
};
|
||||
|
||||
// Spawn a thread. If this fails, it means there's already a bunch of threads; it is very
|
||||
// unlikely that they are all on the verge of exiting, so one is likely to be ready to handle
|
||||
// extant requests. So we can ignore failure with some confidence.
|
||||
// We don't have to port the PTHREAD_CREATE_DETACHED logic. Rust threads are detached
|
||||
// automatically if the returned join handle is dropped.
|
||||
|
||||
let result = match std::thread::Builder::new().spawn(|| callback()) {
|
||||
Ok(handle) => {
|
||||
let id = handle.thread().id();
|
||||
FLOG!(iothread, "rust thread", id, "spawned");
|
||||
// Drop the handle to detach the thread
|
||||
drop(handle);
|
||||
true
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("rust thread spawn failure: {e}");
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
// Restore our sigmask
|
||||
unsafe {
|
||||
let result = libc::pthread_sigmask(
|
||||
libc::SIG_SETMASK,
|
||||
&saved_set as *const _,
|
||||
std::ptr::null_mut(),
|
||||
);
|
||||
assert_eq!(result, 0, "Failed to restore thread signal mask!");
|
||||
};
|
||||
|
||||
result
|
||||
}
|
|
@ -2,6 +2,24 @@ pub mod format;
|
|||
pub mod gettext;
|
||||
mod wcstoi;
|
||||
|
||||
use std::io::Write;
|
||||
|
||||
pub(crate) use format::printf::sprintf;
|
||||
pub(crate) use gettext::{wgettext, wgettext_fmt};
|
||||
pub use wcstoi::*;
|
||||
|
||||
/// Port of the wide-string wperror from `src/wutil.cpp` but for rust `&str`.
|
||||
pub fn perror(s: &str) {
|
||||
let e = errno::errno().0;
|
||||
let mut stderr = std::io::stderr().lock();
|
||||
if !s.is_empty() {
|
||||
let _ = write!(stderr, "{s}: ");
|
||||
}
|
||||
let slice = unsafe {
|
||||
let msg = libc::strerror(e) as *const u8;
|
||||
let len = libc::strlen(msg as *const _);
|
||||
std::slice::from_raw_parts(msg, len)
|
||||
};
|
||||
let _ = stderr.write_all(slice);
|
||||
let _ = stderr.write_all(b"\n");
|
||||
}
|
||||
|
|
|
@ -1,215 +0,0 @@
|
|||
// Support for monitoring a set of fds.
|
||||
#include "config.h" // IWYU pragma: keep
|
||||
|
||||
#include "fd_monitor.h"
|
||||
|
||||
#include <errno.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <iterator>
|
||||
#include <thread> //this_thread::sleep_for
|
||||
#include <type_traits>
|
||||
|
||||
#include "flog.h"
|
||||
#include "iothread.h"
|
||||
#include "wutil.h"
|
||||
|
||||
static constexpr uint64_t kUsecPerMsec = 1000;
|
||||
|
||||
fd_monitor_t::fd_monitor_t() = default;
|
||||
|
||||
fd_monitor_t::~fd_monitor_t() {
|
||||
// In ordinary usage, we never invoke the dtor.
|
||||
// This is used in the tests to not leave stale fds around.
|
||||
// That is why this is very hacky!
|
||||
data_.acquire()->terminate = true;
|
||||
change_signaller_.post();
|
||||
while (data_.acquire()->running) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
}
|
||||
}
|
||||
|
||||
fd_monitor_item_id_t fd_monitor_t::add(fd_monitor_item_t &&item) {
|
||||
assert(item.fd.valid() && "Invalid fd");
|
||||
assert(item.timeout_usec != 0 && "Invalid timeout");
|
||||
assert(item.item_id == 0 && "Item should not already have an ID");
|
||||
bool start_thread = false;
|
||||
fd_monitor_item_id_t item_id{};
|
||||
{
|
||||
// Lock around a local region.
|
||||
auto data = data_.acquire();
|
||||
|
||||
// Assign an id and add the item to pending.
|
||||
item_id = ++data->last_id;
|
||||
item.item_id = item_id;
|
||||
data->pending.push_back(std::move(item));
|
||||
|
||||
// Maybe plan to start the thread.
|
||||
if (!data->running) {
|
||||
FLOG(fd_monitor, "Thread starting");
|
||||
data->running = true;
|
||||
start_thread = true;
|
||||
}
|
||||
}
|
||||
if (start_thread) {
|
||||
void *(*trampoline)(void *) = [](void *self) -> void * {
|
||||
static_cast<fd_monitor_t *>(self)->run_in_background();
|
||||
return nullptr;
|
||||
};
|
||||
bool made_thread = make_detached_pthread(trampoline, this);
|
||||
if (!made_thread) {
|
||||
DIE("Unable to create a new pthread");
|
||||
}
|
||||
}
|
||||
// Tickle our signaller.
|
||||
change_signaller_.post();
|
||||
return item_id;
|
||||
}
|
||||
|
||||
void fd_monitor_t::poke_item(fd_monitor_item_id_t item_id) {
|
||||
assert(item_id > 0 && "Invalid item ID");
|
||||
bool needs_notification = false;
|
||||
{
|
||||
auto data = data_.acquire();
|
||||
needs_notification = data->pokelist.empty();
|
||||
// Insert it, sorted.
|
||||
auto where = std::lower_bound(data->pokelist.begin(), data->pokelist.end(), item_id);
|
||||
data->pokelist.insert(where, item_id);
|
||||
}
|
||||
if (needs_notification) {
|
||||
change_signaller_.post();
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t fd_monitor_item_t::usec_remaining(const time_point_t &now) const {
|
||||
assert(last_time.has_value() && "Should always have a last_time");
|
||||
if (timeout_usec == kNoTimeout) return kNoTimeout;
|
||||
assert(now >= *last_time && "steady clock went backwards!");
|
||||
uint64_t since = static_cast<uint64_t>(
|
||||
std::chrono::duration_cast<std::chrono::microseconds>(now - *last_time).count());
|
||||
return since >= timeout_usec ? 0 : timeout_usec - since;
|
||||
}
|
||||
|
||||
bool fd_monitor_item_t::service_item(const fd_readable_set_t &fds, const time_point_t &now) {
|
||||
bool should_retain = true;
|
||||
bool readable = fds.test(fd.fd());
|
||||
bool timed_out = !readable && usec_remaining(now) == 0;
|
||||
if (readable || timed_out) {
|
||||
last_time = now;
|
||||
item_wake_reason_t reason =
|
||||
readable ? item_wake_reason_t::readable : item_wake_reason_t::timeout;
|
||||
callback(fd, reason);
|
||||
should_retain = fd.valid();
|
||||
}
|
||||
return should_retain;
|
||||
}
|
||||
|
||||
bool fd_monitor_item_t::poke_item(const poke_list_t &pokelist) {
|
||||
if (item_id == 0 || !std::binary_search(pokelist.begin(), pokelist.end(), item_id)) {
|
||||
// Not pokeable or not in the pokelist.
|
||||
return true;
|
||||
}
|
||||
callback(fd, item_wake_reason_t::poke);
|
||||
return fd.valid();
|
||||
}
|
||||
|
||||
void fd_monitor_t::run_in_background() {
|
||||
ASSERT_IS_BACKGROUND_THREAD();
|
||||
poke_list_t pokelist;
|
||||
auto fds_box = new_fd_readable_set();
|
||||
auto &fds = *fds_box;
|
||||
for (;;) {
|
||||
// Poke any items that need it.
|
||||
if (!pokelist.empty()) {
|
||||
this->poke_in_background(pokelist);
|
||||
pokelist.clear();
|
||||
}
|
||||
|
||||
fds.clear();
|
||||
|
||||
// Our change_signaller is special cased.
|
||||
int change_signal_fd = change_signaller_.read_fd();
|
||||
fds.add(change_signal_fd);
|
||||
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
uint64_t timeout_usec = kNoTimeout;
|
||||
|
||||
for (auto &item : items_) {
|
||||
fds.add(item.fd.fd());
|
||||
if (!item.last_time.has_value()) item.last_time = now;
|
||||
timeout_usec = std::min(timeout_usec, item.usec_remaining(now));
|
||||
}
|
||||
|
||||
// If we have no items, then we wish to allow the thread to exit, but after a time, so we
|
||||
// aren't spinning up and tearing down the thread repeatedly.
|
||||
// Set a timeout of 256 msec; if nothing becomes readable by then we will exit.
|
||||
// We refer to this as the wait-lap.
|
||||
bool is_wait_lap = (items_.size() == 0);
|
||||
if (is_wait_lap) {
|
||||
assert(timeout_usec == kNoTimeout && "Should not have a timeout on wait-lap");
|
||||
timeout_usec = 256 * kUsecPerMsec;
|
||||
}
|
||||
|
||||
// Call select().
|
||||
int ret = fds.check_readable(timeout_usec);
|
||||
if (ret < 0 && errno != EINTR) {
|
||||
// Surprising error.
|
||||
wperror(L"select");
|
||||
}
|
||||
|
||||
// A predicate which services each item in turn, returning true if it should be removed.
|
||||
auto servicer = [&fds, &now](fd_monitor_item_t &item) {
|
||||
int fd = item.fd.fd();
|
||||
bool remove = !item.service_item(fds, now);
|
||||
if (remove) FLOG(fd_monitor, "Removing fd", fd);
|
||||
return remove;
|
||||
};
|
||||
|
||||
// Service all items that are either readable or timed out, and remove any which say to do
|
||||
// so.
|
||||
now = std::chrono::steady_clock::now();
|
||||
items_.erase(std::remove_if(items_.begin(), items_.end(), servicer), items_.end());
|
||||
|
||||
// Handle any changes if the change signaller was set. Alternatively this may be the wait
|
||||
// lap, in which case we might want to commit to exiting.
|
||||
bool change_signalled = fds.test(change_signal_fd);
|
||||
if (change_signalled || is_wait_lap) {
|
||||
// Clear the change signaller before processing incoming changes.
|
||||
change_signaller_.try_consume();
|
||||
auto data = data_.acquire();
|
||||
|
||||
// Move from 'pending' to 'items'.
|
||||
items_.insert(items_.end(), std::make_move_iterator(data->pending.begin()),
|
||||
std::make_move_iterator(data->pending.end()));
|
||||
data->pending.clear();
|
||||
|
||||
// Grab any pokelist.
|
||||
assert(pokelist.empty() && "pokelist should be empty or else we're dropping pokes");
|
||||
pokelist = std::move(data->pokelist);
|
||||
data->pokelist.clear();
|
||||
|
||||
if (data->terminate ||
|
||||
(is_wait_lap && items_.empty() && pokelist.empty() && !change_signalled)) {
|
||||
// Maybe terminate is set.
|
||||
// Alternatively, maybe we had no items, waited a bit, and still have no items.
|
||||
// It's important to do this while holding the lock, otherwise we race with new
|
||||
// items being added.
|
||||
assert(data->running && "Thread should be running because we're that thread");
|
||||
FLOG(fd_monitor, "Thread exiting");
|
||||
data->running = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void fd_monitor_t::poke_in_background(const poke_list_t &pokelist) {
|
||||
ASSERT_IS_BACKGROUND_THREAD();
|
||||
auto poker = [&pokelist](fd_monitor_item_t &item) {
|
||||
int fd = item.fd.fd();
|
||||
bool remove = !item.poke_item(pokelist);
|
||||
if (remove) FLOG(fd_monitor, "Removing fd", fd);
|
||||
return remove;
|
||||
};
|
||||
items_.erase(std::remove_if(items_.begin(), items_.end(), poker), items_.end());
|
||||
}
|
140
src/fd_monitor.h
140
src/fd_monitor.h
|
@ -1,140 +0,0 @@
|
|||
#ifndef FISH_FD_MONITOR_H
|
||||
#define FISH_FD_MONITOR_H
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
// Needed for musl
|
||||
#include <sys/select.h> // IWYU pragma: keep
|
||||
|
||||
#include "common.h"
|
||||
#include "fd_readable_set.rs.h"
|
||||
#include "fds.h"
|
||||
#include "maybe.h"
|
||||
|
||||
/// Each item added to fd_monitor_t is assigned a unique ID, which is not recycled.
|
||||
/// Items may have their callback triggered immediately by passing the ID.
|
||||
/// Zero is a sentinel.
|
||||
using fd_monitor_item_id_t = uint64_t;
|
||||
|
||||
/// Reasons for waking an item.
|
||||
enum class item_wake_reason_t {
|
||||
readable, // the fd became readable
|
||||
timeout, // the requested timeout was hit
|
||||
poke, // the item was "poked" (woken up explicitly)
|
||||
};
|
||||
|
||||
/// An item containing an fd and callback, which can be monitored to watch when it becomes readable,
|
||||
/// and invoke the callback.
|
||||
struct fd_monitor_item_t {
|
||||
/// The callback type for the item. It is passed \p fd, and the reason for waking \p reason.
|
||||
/// The callback may close \p fd, in which case the item is removed.
|
||||
using callback_t = std::function<void(autoclose_fd_t &fd, item_wake_reason_t reason)>;
|
||||
|
||||
/// The fd to monitor.
|
||||
autoclose_fd_t fd{};
|
||||
|
||||
/// A callback to be invoked when the fd is readable, or when we are timed out.
|
||||
/// If we time out, then timed_out will be true.
|
||||
/// If the fd is invalid on return from the function, then the item is removed.
|
||||
callback_t callback{};
|
||||
|
||||
/// The timeout in microseconds, or kNoTimeout for none.
|
||||
/// 0 timeouts are unsupported.
|
||||
uint64_t timeout_usec{kNoTimeout};
|
||||
|
||||
/// Construct from a file, callback, and optional timeout.
|
||||
fd_monitor_item_t(autoclose_fd_t fd, callback_t callback, uint64_t timeout_usec = kNoTimeout)
|
||||
: fd(std::move(fd)), callback(std::move(callback)), timeout_usec(timeout_usec) {
|
||||
assert(timeout_usec > 0 && "Invalid timeout");
|
||||
}
|
||||
|
||||
fd_monitor_item_t() = default;
|
||||
|
||||
private:
|
||||
// Fields and methods for the private use of fd_monitor_t.
|
||||
using time_point_t = std::chrono::time_point<std::chrono::steady_clock>;
|
||||
|
||||
// The last time we were called, or the initialization point.
|
||||
maybe_t<time_point_t> last_time{};
|
||||
|
||||
// The ID for this item. This is assigned by the fd monitor.
|
||||
fd_monitor_item_id_t item_id{0};
|
||||
|
||||
// \return the number of microseconds until the timeout should trigger, or kNoTimeout for none.
|
||||
// A 0 return means we are at or past the timeout.
|
||||
uint64_t usec_remaining(const time_point_t &now) const;
|
||||
|
||||
// Invoke this item's callback if its value is set in fd or has timed out.
|
||||
// \return true to retain the item, false to remove it.
|
||||
bool service_item(const fd_readable_set_t &fds, const time_point_t &now);
|
||||
|
||||
// Invoke this item's callback with a poke, if its ID is present in the (sorted) pokelist.
|
||||
// \return true to retain the item, false to remove it.
|
||||
using poke_list_t = std::vector<fd_monitor_item_id_t>;
|
||||
bool poke_item(const poke_list_t &pokelist);
|
||||
|
||||
friend class fd_monitor_t;
|
||||
};
|
||||
|
||||
/// A class which can monitor a set of fds, invoking a callback when any becomes readable, or when
|
||||
/// per-item-configurable timeouts are hit.
|
||||
class fd_monitor_t {
|
||||
public:
|
||||
using item_list_t = std::vector<fd_monitor_item_t>;
|
||||
|
||||
// A "pokelist" is a sorted list of item IDs which need explicit wakeups.
|
||||
using poke_list_t = std::vector<fd_monitor_item_id_t>;
|
||||
|
||||
fd_monitor_t();
|
||||
~fd_monitor_t();
|
||||
|
||||
/// Add an item to monitor. \return the ID assigned to the item.
|
||||
fd_monitor_item_id_t add(fd_monitor_item_t &&item);
|
||||
|
||||
/// Mark that an item with a given ID needs to be explicitly woken up.
|
||||
void poke_item(fd_monitor_item_id_t item_id);
|
||||
|
||||
private:
|
||||
// The background thread runner.
|
||||
void run_in_background();
|
||||
|
||||
// If our self-signaller is reported as ready, this reads from it and handles any changes.
|
||||
// Called in the background thread.
|
||||
void handle_self_signal_in_background();
|
||||
|
||||
// Poke items in the pokelist, removing any items that close their FD.
|
||||
// The pokelist is consumed after this.
|
||||
// This is only called in the background thread.
|
||||
void poke_in_background(const poke_list_t &pokelist);
|
||||
|
||||
// The list of items to monitor. This is only accessed on the background thread.
|
||||
item_list_t items_{};
|
||||
|
||||
struct data_t {
|
||||
/// Pending items. This is set under the lock, then the background thread grabs them.
|
||||
item_list_t pending{};
|
||||
|
||||
/// List of IDs for items that need to be poked (explicitly woken up).
|
||||
poke_list_t pokelist{};
|
||||
|
||||
/// The last ID assigned, or if none.
|
||||
fd_monitor_item_id_t last_id{0};
|
||||
|
||||
/// Whether the thread is running.
|
||||
bool running{false};
|
||||
|
||||
// Set if we should terminate.
|
||||
bool terminate{false};
|
||||
};
|
||||
owning_lock<data_t> data_;
|
||||
|
||||
/// Our self-signaller. When this is written to, it means there are new items pending, or new
|
||||
/// items in the pokelist, or terminate is set.
|
||||
fd_event_signaller_t change_signaller_;
|
||||
};
|
||||
|
||||
#endif
|
|
@ -29,6 +29,10 @@ void autoclose_fd_t::close() {
|
|||
fd_ = -1;
|
||||
}
|
||||
|
||||
std::shared_ptr<fd_event_signaller_t> ffi_new_fd_event_signaller_t() {
|
||||
return std::make_shared<fd_event_signaller_t>();
|
||||
}
|
||||
|
||||
#ifdef HAVE_EVENTFD
|
||||
// Note we do not want to use EFD_SEMAPHORE because we are binary (not counting) semaphore.
|
||||
fd_event_signaller_t::fd_event_signaller_t() {
|
||||
|
@ -78,7 +82,7 @@ bool fd_event_signaller_t::try_consume() const {
|
|||
return ret > 0;
|
||||
}
|
||||
|
||||
void fd_event_signaller_t::post() {
|
||||
void fd_event_signaller_t::post() const {
|
||||
// eventfd writes uint64; pipes write 1 byte.
|
||||
#ifdef HAVE_EVENTFD
|
||||
const uint64_t c = 1;
|
||||
|
|
|
@ -109,7 +109,7 @@ class fd_event_signaller_t {
|
|||
|
||||
/// Mark that an event has been received. This may be coalesced.
|
||||
/// This retries on EINTR.
|
||||
void post();
|
||||
void post() const;
|
||||
|
||||
/// Perform a poll to see if an event is received.
|
||||
/// If \p wait is set, wait until it is readable; this does not consume the event
|
||||
|
@ -135,6 +135,8 @@ class fd_event_signaller_t {
|
|||
#endif
|
||||
};
|
||||
|
||||
std::shared_ptr<fd_event_signaller_t> ffi_new_fd_event_signaller_t();
|
||||
|
||||
/// Sets CLO_EXEC on a given fd according to the value of \p should_set.
|
||||
int set_cloexec(int fd, bool should_set = true);
|
||||
|
||||
|
|
|
@ -41,6 +41,8 @@
|
|||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include "fds.rs.h"
|
||||
#include "parse_constants.rs.h"
|
||||
|
||||
#ifdef FISH_CI_SAN
|
||||
#include <sanitizer/lsan_interface.h>
|
||||
|
@ -59,7 +61,7 @@
|
|||
#include "env_universal_common.h"
|
||||
#include "expand.h"
|
||||
#include "fallback.h" // IWYU pragma: keep
|
||||
#include "fd_monitor.h"
|
||||
#include "fd_monitor.rs.h"
|
||||
#include "fd_readable_set.rs.h"
|
||||
#include "fds.h"
|
||||
#include "ffi_init.rs.h"
|
||||
|
@ -806,36 +808,42 @@ static void test_fd_monitor() {
|
|||
std::atomic<size_t> length_read{0};
|
||||
std::atomic<size_t> pokes{0};
|
||||
std::atomic<size_t> total_calls{0};
|
||||
fd_monitor_item_id_t item_id{0};
|
||||
uint64_t item_id{0};
|
||||
bool always_exit{false};
|
||||
fd_monitor_item_t item;
|
||||
std::unique_ptr<rust::Box<fd_monitor_item_t>> item;
|
||||
autoclose_fd_t writer;
|
||||
|
||||
void callback(autoclose_fd_t2 &fd, item_wake_reason_t reason) {
|
||||
bool was_closed = false;
|
||||
switch (reason) {
|
||||
case item_wake_reason_t::Timeout:
|
||||
this->did_timeout = true;
|
||||
break;
|
||||
case item_wake_reason_t::Poke:
|
||||
this->pokes += 1;
|
||||
break;
|
||||
case item_wake_reason_t::Readable:
|
||||
char buff[4096];
|
||||
ssize_t amt = read(fd.fd(), buff, sizeof buff);
|
||||
this->length_read += amt;
|
||||
was_closed = (amt == 0);
|
||||
break;
|
||||
}
|
||||
total_calls += 1;
|
||||
if (always_exit || was_closed) {
|
||||
fd.close();
|
||||
}
|
||||
}
|
||||
|
||||
static void trampoline(autoclose_fd_t2 &fd, item_wake_reason_t reason, c_void *param) {
|
||||
auto &instance = *(item_maker_t*)(param);
|
||||
instance.callback(fd, reason);
|
||||
}
|
||||
|
||||
explicit item_maker_t(uint64_t timeout_usec) {
|
||||
auto pipes = make_autoclose_pipes().acquire();
|
||||
writer = std::move(pipes.write);
|
||||
auto callback = [this](autoclose_fd_t &fd, item_wake_reason_t reason) {
|
||||
bool was_closed = false;
|
||||
switch (reason) {
|
||||
case item_wake_reason_t::timeout:
|
||||
this->did_timeout = true;
|
||||
break;
|
||||
case item_wake_reason_t::poke:
|
||||
this->pokes += 1;
|
||||
break;
|
||||
case item_wake_reason_t::readable:
|
||||
char buff[4096];
|
||||
ssize_t amt = read(fd.fd(), buff, sizeof buff);
|
||||
this->length_read += amt;
|
||||
was_closed = (amt == 0);
|
||||
break;
|
||||
}
|
||||
total_calls += 1;
|
||||
if (always_exit || was_closed) {
|
||||
fd.close();
|
||||
}
|
||||
};
|
||||
item = fd_monitor_item_t(std::move(pipes.read), std::move(callback), timeout_usec);
|
||||
item = std::make_unique<rust::Box<fd_monitor_item_t>>(make_fd_monitor_item_t(pipes.read.acquire(), timeout_usec, (c_void *)item_maker_t::trampoline, (c_void*)this));
|
||||
}
|
||||
|
||||
// Write 42 bytes to our write end.
|
||||
|
@ -871,18 +879,18 @@ static void test_fd_monitor() {
|
|||
item_oneshot.always_exit = true;
|
||||
|
||||
{
|
||||
fd_monitor_t monitor;
|
||||
auto monitor = make_fd_monitor_t();
|
||||
for (item_maker_t *item :
|
||||
{&item_never, &item_hugetimeout, &item0_timeout, &item42_timeout, &item42_nottimeout,
|
||||
&item42_thenclose, &item_pokee, &item_oneshot}) {
|
||||
item->item_id = monitor.add(std::move(item->item));
|
||||
item->item_id = monitor->add(std::move(*(std::move(item->item))));
|
||||
}
|
||||
item42_timeout.write42();
|
||||
item42_nottimeout.write42();
|
||||
item42_thenclose.write42();
|
||||
item42_thenclose.writer.close();
|
||||
item_oneshot.write42();
|
||||
monitor.poke_item(item_pokee.item_id);
|
||||
monitor->poke_item(item_pokee.item_id);
|
||||
|
||||
// May need to loop here to ensure our fd_monitor gets scheduled - see #7699.
|
||||
for (int i = 0; i < 100; i++) {
|
||||
|
|
91
src/io.cpp
91
src/io.cpp
|
@ -14,7 +14,9 @@
|
|||
|
||||
#include "common.h"
|
||||
#include "fallback.h" // IWYU pragma: keep
|
||||
#include "fd_monitor.h"
|
||||
#include "fd_monitor.rs.h"
|
||||
#include "fds.h"
|
||||
#include "fds.rs.h"
|
||||
#include "flog.h"
|
||||
#include "maybe.h"
|
||||
#include "path.h"
|
||||
|
@ -31,7 +33,7 @@
|
|||
/// Provide the fd monitor used for background fillthread operations.
|
||||
static fd_monitor_t &fd_monitor() {
|
||||
// Deliberately leaked to avoid shutdown dtors.
|
||||
static auto fdm = new fd_monitor_t();
|
||||
static auto fdm = make_fd_monitor_t();
|
||||
return *fdm;
|
||||
}
|
||||
|
||||
|
@ -75,6 +77,18 @@ ssize_t io_buffer_t::read_once(int fd, acquired_lock<separated_buffer_t> &buffer
|
|||
return amt;
|
||||
}
|
||||
|
||||
struct callback_args_t {
|
||||
io_buffer_t *instance;
|
||||
std::shared_ptr<std::promise<void>> promise;
|
||||
};
|
||||
|
||||
extern "C" {
|
||||
static void item_callback_trampoline(autoclose_fd_t2 &fd, item_wake_reason_t reason,
|
||||
callback_args_t *args) {
|
||||
(args->instance)->item_callback(fd, (uint8_t)reason, args);
|
||||
}
|
||||
}
|
||||
|
||||
void io_buffer_t::begin_filling(autoclose_fd_t fd) {
|
||||
assert(!fillthread_running() && "Already have a fillthread");
|
||||
|
||||
|
@ -102,38 +116,51 @@ void io_buffer_t::begin_filling(autoclose_fd_t fd) {
|
|||
|
||||
// Run our function to read until the receiver is closed.
|
||||
// It's OK to capture 'this' by value because 'this' waits for the promise in its dtor.
|
||||
fd_monitor_item_t item;
|
||||
item.fd = std::move(fd);
|
||||
item.callback = [this, promise](autoclose_fd_t &fd, item_wake_reason_t reason) {
|
||||
ASSERT_IS_BACKGROUND_THREAD();
|
||||
// Only check the shutdown flag if we timed out or were poked.
|
||||
// It's important that if select() indicated we were readable, that we call select() again
|
||||
// allowing it to time out. Note the typical case is that the fd will be closed, in which
|
||||
// case select will return immediately.
|
||||
bool done = false;
|
||||
if (reason == item_wake_reason_t::readable) {
|
||||
// select() reported us as readable; read a bit.
|
||||
auto buffer = buffer_.acquire();
|
||||
ssize_t ret = read_once(fd.fd(), buffer);
|
||||
done = (ret == 0 || (ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK));
|
||||
} else if (shutdown_fillthread_) {
|
||||
// Here our caller asked us to shut down; read while we keep getting data.
|
||||
// This will stop when the fd is closed or if we get EAGAIN.
|
||||
auto buffer = buffer_.acquire();
|
||||
ssize_t ret;
|
||||
do {
|
||||
ret = read_once(fd.fd(), buffer);
|
||||
} while (ret > 0);
|
||||
done = true;
|
||||
}
|
||||
if (done) {
|
||||
fd.close();
|
||||
promise->set_value();
|
||||
}
|
||||
};
|
||||
this->item_id_ = fd_monitor().add(std::move(item));
|
||||
auto args = new callback_args_t;
|
||||
args->instance = this;
|
||||
args->promise = std::move(promise);
|
||||
|
||||
item_id_ =
|
||||
fd_monitor().add_item(fd.acquire(), kNoTimeout, (::c_void *)item_callback_trampoline, (::c_void *)args);
|
||||
}
|
||||
|
||||
/// This is a hack to work around the difficulties in passing a capturing lambda across FFI
|
||||
/// boundaries. A static function that takes a generic/untyped callback parameter is easy to
|
||||
/// marshall with the basic C ABI.
|
||||
void io_buffer_t::item_callback(autoclose_fd_t2 &fd, uint8_t r, callback_args_t *args) {
|
||||
item_wake_reason_t reason = (item_wake_reason_t)r;
|
||||
auto &promise = *args->promise;
|
||||
|
||||
// Only check the shutdown flag if we timed out or were poked.
|
||||
// It's important that if select() indicated we were readable, that we call select() again
|
||||
// allowing it to time out. Note the typical case is that the fd will be closed, in which
|
||||
// case select will return immediately.
|
||||
bool done = false;
|
||||
if (reason == item_wake_reason_t::Readable) {
|
||||
// select() reported us as readable; read a bit.
|
||||
auto buffer = buffer_.acquire();
|
||||
ssize_t ret = read_once(fd.fd(), buffer);
|
||||
done = (ret == 0 || (ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK));
|
||||
} else if (shutdown_fillthread_) {
|
||||
// Here our caller asked us to shut down; read while we keep getting data.
|
||||
// This will stop when the fd is closed or if we get EAGAIN.
|
||||
auto buffer = buffer_.acquire();
|
||||
ssize_t ret;
|
||||
do {
|
||||
ret = read_once(fd.fd(), buffer);
|
||||
} while (ret > 0);
|
||||
done = true;
|
||||
}
|
||||
if (done) {
|
||||
fd.close();
|
||||
promise.set_value();
|
||||
// When we close the fd, we signal to the caller that the fd should be removed from its set
|
||||
// and that this callback should never be called again.
|
||||
// Manual memory management is not nice but this is just during the cpp-to-rust transition.
|
||||
delete args;
|
||||
}
|
||||
};
|
||||
|
||||
separated_buffer_t io_buffer_t::complete_background_fillthread_and_take_buffer() {
|
||||
// Mark that our fillthread is done, then wake it up.
|
||||
assert(fillthread_running() && "Should have a fillthread");
|
||||
|
|
6
src/io.h
6
src/io.h
|
@ -275,6 +275,9 @@ class io_bufferfill_t final : public io_data_t {
|
|||
static separated_buffer_t finish(std::shared_ptr<io_bufferfill_t> &&filler);
|
||||
};
|
||||
|
||||
struct callback_args_t;
|
||||
struct autoclose_fd_t2;
|
||||
|
||||
/// An io_buffer_t is a buffer which can populate itself by reading from an fd.
|
||||
/// It is not an io_data_t.
|
||||
class io_buffer_t {
|
||||
|
@ -291,6 +294,9 @@ class io_buffer_t {
|
|||
/// \return true if output was discarded due to exceeding the read limit.
|
||||
bool discarded() { return buffer_.acquire()->discarded(); }
|
||||
|
||||
/// FFI callback workaround.
|
||||
void item_callback(autoclose_fd_t2 &fd, uint8_t reason, callback_args_t *args);
|
||||
|
||||
private:
|
||||
/// Read some, filling the buffer. The buffer is passed in to enforce that the append lock is
|
||||
/// held. \return positive on success, 0 if closed, -1 on error (in which case errno will be
|
||||
|
|
Loading…
Reference in a new issue