diff --git a/src/env_universal_common.cpp b/src/env_universal_common.cpp index 2c5a2edb2..1dc4459ef 100644 --- a/src/env_universal_common.cpp +++ b/src/env_universal_common.cpp @@ -1459,7 +1459,7 @@ class universal_notifier_named_pipe_t final : public universal_notifier_t { // If we're no longer readable, go back to wait mode. // Conversely, if we have been readable too long, perhaps some fish died while its // written data was still on the pipe; drain some. - if (!select_wrapper_t::poll_fd_readable(pipe_fd.fd())) { + if (!fd_readable_set_t::poll_fd_readable(pipe_fd.fd())) { set_state(waiting_for_readable); } else if (get_time() >= state_start_usec + k_readable_too_long_duration_usec) { drain_excess(); @@ -1479,7 +1479,7 @@ class universal_notifier_named_pipe_t final : public universal_notifier_t { // change occurred with ours. if (get_time() >= state_start_usec + k_flash_duration_usec) { drain_written(); - if (!select_wrapper_t::poll_fd_readable(pipe_fd.fd())) { + if (!fd_readable_set_t::poll_fd_readable(pipe_fd.fd())) { set_state(waiting_for_readable); } else { set_state(polling_during_readable); diff --git a/src/fd_monitor.cpp b/src/fd_monitor.cpp index 020ae3d2c..25b745476 100644 --- a/src/fd_monitor.cpp +++ b/src/fd_monitor.cpp @@ -87,7 +87,7 @@ uint64_t fd_monitor_item_t::usec_remaining(const time_point_t &now) const { return since >= timeout_usec ? 0 : timeout_usec - since; } -bool fd_monitor_item_t::service_item(const select_wrapper_t &fds, const time_point_t &now) { +bool fd_monitor_item_t::service_item(const fd_readable_set_t &fds, const time_point_t &now) { bool should_retain = true; bool readable = fds.test(fd.fd()); bool timed_out = !readable && usec_remaining(now) == 0; @@ -113,7 +113,7 @@ bool fd_monitor_item_t::poke_item(const poke_list_t &pokelist) { void fd_monitor_t::run_in_background() { ASSERT_IS_BACKGROUND_THREAD(); poke_list_t pokelist; - select_wrapper_t fds; + fd_readable_set_t fds; for (;;) { // Poke any items that need it. if (!pokelist.empty()) { @@ -149,7 +149,7 @@ void fd_monitor_t::run_in_background() { } // Call select(). - int ret = fds.select(timeout_usec); + int ret = fds.check_readable(timeout_usec); if (ret < 0 && errno != EINTR) { // Surprising error. wperror(L"select"); diff --git a/src/fd_monitor.h b/src/fd_monitor.h index c2b85221d..7ac71da25 100644 --- a/src/fd_monitor.h +++ b/src/fd_monitor.h @@ -33,7 +33,7 @@ struct fd_monitor_item_t { using callback_t = std::function; /// A sentinel value meaning no timeout. - static constexpr uint64_t kNoTimeout = select_wrapper_t::kNoTimeout; + static constexpr uint64_t kNoTimeout = fd_readable_set_t::kNoTimeout; /// The fd to monitor. autoclose_fd_t fd{}; @@ -71,7 +71,7 @@ struct fd_monitor_item_t { // Invoke this item's callback if its value is set in fd or has timed out. // \return true to retain the item, false to remove it. - bool service_item(const select_wrapper_t &fds, const time_point_t &now); + bool service_item(const fd_readable_set_t &fds, const time_point_t &now); // Invoke this item's callback with a poke, if its ID is present in the (sorted) pokelist. // \return true to retain the item, false to remove it. diff --git a/src/fds.cpp b/src/fds.cpp index 5be48d77f..385196b8c 100644 --- a/src/fds.cpp +++ b/src/fds.cpp @@ -20,9 +20,8 @@ // The first fd in the "high range." fds below this are allowed to be used directly by users in // redirections, e.g. >&3 const int k_first_high_fd = 10; - static constexpr uint64_t kUsecPerMsec = 1000; -static constexpr uint64_t kUsecPerSec = 1000 * kUsecPerMsec; +static constexpr uint64_t kUsecPerSec [[gnu::unused]] = 1000 * kUsecPerMsec; void autoclose_fd_t::close() { if (fd_ < 0) return; @@ -30,23 +29,85 @@ void autoclose_fd_t::close() { fd_ = -1; } -select_wrapper_t::select_wrapper_t() { clear(); } +fd_readable_set_t::fd_readable_set_t() { clear(); } -void select_wrapper_t::clear() { +#if FISH_READABLE_SET_USE_POLL + +// Convert from a usec to a poll-friendly msec. +static int usec_to_poll_msec(uint64_t timeout_usec) { + uint64_t timeout_msec = timeout_usec / kUsecPerMsec; + // Round to nearest, down for halfway. + timeout_msec += ((timeout_usec % kUsecPerMsec) > kUsecPerMsec / 2) ? 1 : 0; + if (timeout_usec == fd_readable_set_t::kNoTimeout || + timeout_msec > std::numeric_limits::max()) { + // Negative values mean wait forever in poll-speak. + return -1; + } + return static_cast(timeout_msec); +} + +void fd_readable_set_t::clear() { pollfds_.clear(); } + +static inline bool pollfd_less_than(const pollfd &lhs, int rhs) { return lhs.fd < rhs; } + +void fd_readable_set_t::add(int fd) { + if (fd >= 0) { + auto where = std::lower_bound(pollfds_.begin(), pollfds_.end(), fd, pollfd_less_than); + if (where == pollfds_.end() || where->fd != fd) { + pollfds_.insert(where, pollfd{fd, POLLIN, 0}); + } + } +} + +bool fd_readable_set_t::test(int fd) const { + // If a pipe is widowed with no data, Linux sets POLLHUP but not POLLIN, so test for both. + auto where = std::lower_bound(pollfds_.begin(), pollfds_.end(), fd, pollfd_less_than); + return where != pollfds_.end() && where->fd == fd && (where->revents & (POLLIN | POLLHUP)); +} + +// static +int fd_readable_set_t::do_poll(struct pollfd *fds, size_t count, uint64_t timeout_usec) { + assert(count <= std::numeric_limits::max() && "count too big"); + return ::poll(fds, static_cast(count), usec_to_poll_msec(timeout_usec)); +} + +int fd_readable_set_t::check_readable(uint64_t timeout_usec) { + if (pollfds_.empty()) return 0; + return do_poll(&pollfds_[0], pollfds_.size(), timeout_usec); +} + +// static +bool fd_readable_set_t::is_fd_readable(int fd, uint64_t timeout_usec) { + if (fd < 0) return false; + struct pollfd pfd { + fd, POLLIN, 0 + }; + int ret = fd_readable_set_t::do_poll(&pfd, 1, timeout_usec); + return ret > 0 && (pfd.revents & POLLIN); +} + +#else +// Implementation based on select(). + +void fd_readable_set_t::clear() { FD_ZERO(&fdset_); nfds_ = 0; } -void select_wrapper_t::add(int fd) { +void fd_readable_set_t::add(int fd) { + if (fd >= FD_SETSIZE) { + FLOGF(error, "fd %d too large for select()", fd); + return; + } if (fd >= 0) { FD_SET(fd, &fdset_); nfds_ = std::max(nfds_, fd + 1); } } -bool select_wrapper_t::test(int fd) const { return fd >= 0 && FD_ISSET(fd, &fdset_); } +bool fd_readable_set_t::test(int fd) const { return fd >= 0 && FD_ISSET(fd, &fdset_); } -int select_wrapper_t::select(uint64_t timeout_usec) { +int fd_readable_set_t::check_readable(uint64_t timeout_usec) { if (timeout_usec == kNoTimeout) { return ::select(nfds_, &fdset_, nullptr, nullptr, nullptr); } else { @@ -58,16 +119,18 @@ int select_wrapper_t::select(uint64_t timeout_usec) { } // static -bool select_wrapper_t::is_fd_readable(int fd, uint64_t timeout_usec) { +bool fd_readable_set_t::is_fd_readable(int fd, uint64_t timeout_usec) { if (fd < 0) return false; - select_wrapper_t s; + fd_readable_set_t s; s.add(fd); - int res = s.select(timeout_usec); + int res = s.check_readable(timeout_usec); return res > 0 && s.test(fd); } +#endif // not FISH_READABLE_SET_USE_POLL + // static -bool select_wrapper_t::poll_fd_readable(int fd) { return is_fd_readable(fd, 0); } +bool fd_readable_set_t::poll_fd_readable(int fd) { return is_fd_readable(fd, 0); } #ifdef HAVE_EVENTFD // Note we do not want to use EFD_SEMAPHORE because we are binary (not counting) semaphore. diff --git a/src/fds.h b/src/fds.h index 3d8be84cb..2ae7101ff 100644 --- a/src/fds.h +++ b/src/fds.h @@ -5,6 +5,7 @@ #include "config.h" // IWYU pragma: keep +#include #include #include @@ -62,12 +63,23 @@ class autoclose_fd_t : noncopyable_t { ~autoclose_fd_t() { close(); } }; -/// A modest wrapper around fd_set and select(). -/// This allows accumulating a set of fds and then select()ing on them. +// Resolve whether to use poll() or select(). +#ifndef FISH_READABLE_SET_USE_POLL +#ifdef __APPLE__ +// Apple's `man poll`: "The poll() system call currently does not support devices." +#define FISH_READABLE_SET_USE_POLL 0 +#else +// Use poll other places so we can support unlimited fds. +#define FISH_READABLE_SET_USE_POLL 1 +#endif +#endif + +/// A modest wrapper around select() or poll(), according to FISH_READABLE_SET_USE_POLL. +/// This allows accumulating a set of fds and then seeing if they are readable. /// This only handles readability. -struct select_wrapper_t { +struct fd_readable_set_t { /// Construct an empty set. - select_wrapper_t(); + fd_readable_set_t(); /// Reset back to an empty set. void clear(); @@ -78,15 +90,15 @@ struct select_wrapper_t { /// \return true if the given fd is marked as set, in our set. \returns false if negative. bool test(int fd) const; - /// Call select(), with this set as 'readfds' and null for the other sets, with a timeout given - /// by timeout_usec. Note this destructively modifies the set. \return the result of select(). - int select(uint64_t timeout_usec = select_wrapper_t::kNoTimeout); + /// Call select() or poll(), according to FISH_READABLE_SET_USE_POLL. Note this destructively + /// modifies the set. \return the result of select() or poll(). + int check_readable(uint64_t timeout_usec = fd_readable_set_t::kNoTimeout); - /// Poll a single fd: select() on it with a given timeout. + /// Check if a single fd is readable, with a given timeout. /// \return true if readable, false if not. static bool is_fd_readable(int fd, uint64_t timeout_usec); - /// Poll a single fd: select() on it with zero timeout. + /// Check if a single fd is readable, without blocking. /// \return true if readable, false if not. static bool poll_fd_readable(int fd); @@ -94,9 +106,17 @@ struct select_wrapper_t { static constexpr uint64_t kNoTimeout = std::numeric_limits::max(); private: - /// The underlying fdset and nfds value to pass to select(). +#if FISH_READABLE_SET_USE_POLL + // Our list of FDs, sorted by fd. + std::vector pollfds_{}; + + // Helper function. + static int do_poll(struct pollfd *fds, size_t count, uint64_t timeout_usec); +#else + // The underlying fdset and nfds value to pass to select(). fd_set fdset_; int nfds_{0}; +#endif }; /// Helper type returned from making autoclose pipes. diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 97379e7c5..3ad051c91 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -4131,7 +4131,7 @@ bool poll_notifier(const std::unique_ptr ¬e) { bool result = false; int fd = note->notification_fd(); - if (fd >= 0 && select_wrapper_t::poll_fd_readable(fd)) { + if (fd >= 0 && fd_readable_set_t::poll_fd_readable(fd)) { result = note->notification_fd_became_readable(fd); } return result; diff --git a/src/input_common.cpp b/src/input_common.cpp index e280c9605..d741b96e7 100644 --- a/src/input_common.cpp +++ b/src/input_common.cpp @@ -60,7 +60,7 @@ using readb_result_t = int; static readb_result_t readb(int in_fd) { assert(in_fd >= 0 && "Invalid in fd"); universal_notifier_t& notifier = universal_notifier_t::default_notifier(); - select_wrapper_t fdset; + fd_readable_set_t fdset; for (;;) { fdset.clear(); fdset.add(in_fd); @@ -75,13 +75,13 @@ static readb_result_t readb(int in_fd) { // Get its suggested delay (possibly none). // Note a 0 here means do not poll. - uint64_t timeout = select_wrapper_t::kNoTimeout; + uint64_t timeout = fd_readable_set_t::kNoTimeout; if (uint64_t usecs_delay = notifier.usec_delay_between_polls()) { timeout = usecs_delay; } // Here's where we call select(). - int select_res = fdset.select(timeout); + int select_res = fdset.check_readable(timeout); if (select_res < 0) { if (errno == EINTR || errno == EAGAIN) { // A signal. @@ -225,7 +225,7 @@ maybe_t input_event_queue_t::readch_timed() { } const uint64_t usec_per_msec = 1000; uint64_t timeout_usec = static_cast(wait_on_escape_ms) * usec_per_msec; - if (select_wrapper_t::is_fd_readable(in_, timeout_usec)) { + if (fd_readable_set_t::is_fd_readable(in_, timeout_usec)) { return readch(); } return none(); diff --git a/src/iothread.cpp b/src/iothread.cpp index 91d1bd024..684feb815 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -247,7 +247,7 @@ void iothread_perform_impl(void_function_t &&func, bool cant_wait) { int iothread_port() { return get_notify_signaller().read_fd(); } void iothread_service_main_with_timeout(uint64_t timeout_usec) { - if (select_wrapper_t::is_fd_readable(iothread_port(), timeout_usec)) { + if (fd_readable_set_t::is_fd_readable(iothread_port(), timeout_usec)) { iothread_service_main(); } } diff --git a/src/reader.cpp b/src/reader.cpp index 644349e4c..08f2f323b 100644 --- a/src/reader.cpp +++ b/src/reader.cpp @@ -2935,7 +2935,7 @@ maybe_t reader_data_t::read_normal_chars(readline_loop_state_t &rl while (accumulated_chars.size() < limit) { bool allow_commands = (accumulated_chars.empty()); auto evt = inputter.read_char(allow_commands ? normal_handler : empty_handler); - if (!event_is_normal_char(evt) || !select_wrapper_t::poll_fd_readable(conf.in)) { + if (!event_is_normal_char(evt) || !fd_readable_set_t::poll_fd_readable(conf.in)) { event_needing_handling = std::move(evt); break; } else if (evt.input_style == char_input_style_t::notfirst && accumulated_chars.empty() && diff --git a/src/topic_monitor.cpp b/src/topic_monitor.cpp index c6ab5ec10..a2562189f 100644 --- a/src/topic_monitor.cpp +++ b/src/topic_monitor.cpp @@ -89,8 +89,8 @@ void binary_semaphore_t::wait() { #ifdef FISH_TSAN_WORKAROUNDS // Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read() // call until data is available (that is, fish would use 100% cpu while waiting for - // processes). The select prevents that. - (void)select_wrapper_t::is_fd_readable(fd, select_wrapper_t::kNoTimeout); + // processes). This call prevents that. + (void)fd_readable_set_t::is_fd_readable(fd, fd_readable_set_t::kNoTimeout); #endif uint8_t ignored; auto amt = read(fd, &ignored, sizeof ignored);