From e8a61ef4aad8496e426a43f4fec19ea6052e19f3 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Sat, 10 Apr 2021 16:45:26 -0700 Subject: [PATCH] Introduce select_wrapper_t select_wrapper_t wraps up the annoying bits of using select(): keeping track of the max fd, passing null for boring parameters, and constructing the timeout. Introduce a wrapper struct for this and replace the existing uses of select() with the wrapper. --- src/env_universal_common.cpp | 7 +----- src/fd_monitor.cpp | 31 +++++++----------------- src/fd_monitor.h | 4 ++-- src/fds.cpp | 42 ++++++++++++++++++++++++++++++++ src/fds.h | 42 +++++++++++++++++++++++++++++++- src/fish_tests.cpp | 10 ++------ src/input_common.cpp | 46 +++++++++++++----------------------- src/iothread.cpp | 17 ++----------- src/iothread.h | 2 +- src/reader.cpp | 12 +--------- src/topic_monitor.cpp | 5 +--- 11 files changed, 119 insertions(+), 99 deletions(-) diff --git a/src/env_universal_common.cpp b/src/env_universal_common.cpp index 82791057e..e4370b123 100644 --- a/src/env_universal_common.cpp +++ b/src/env_universal_common.cpp @@ -1437,12 +1437,7 @@ class universal_notifier_named_pipe_t final : public universal_notifier_t { // We are polling, so we are definitely going to sync. // See if this is still readable. - fd_set fds; - FD_ZERO(&fds); - FD_SET(pipe_fd.fd(), &fds); - struct timeval timeout = {}; - select(pipe_fd.fd() + 1, &fds, nullptr, nullptr, &timeout); - if (!FD_ISSET(pipe_fd.fd(), &fds)) { + if (!select_wrapper_t::poll_fd_readable(pipe_fd.fd())) { // No longer readable, no longer polling. polling_due_to_readable_fd = false; drain_if_still_readable_time_usec = 0; diff --git a/src/fd_monitor.cpp b/src/fd_monitor.cpp index 2ed2ca4de..214b74083 100644 --- a/src/fd_monitor.cpp +++ b/src/fd_monitor.cpp @@ -12,7 +12,6 @@ #include "wutil.h" static constexpr uint64_t kUsecPerMsec = 1000; -static constexpr uint64_t kUsecPerSec = 1000 * kUsecPerMsec; fd_monitor_t::fd_monitor_t() = default; @@ -79,15 +78,6 @@ void fd_monitor_t::poke_item(fd_monitor_item_id_t item_id) { } } -// Given a usec count, populate and return a timeval. -// If the usec count is kNoTimeout, return nullptr. -static struct timeval *usec_to_tv_or_null(uint64_t usec, struct timeval *timeout) { - if (usec == fd_monitor_item_t::kNoTimeout) return nullptr; - timeout->tv_sec = usec / kUsecPerSec; - timeout->tv_usec = usec % kUsecPerSec; - return timeout; -} - uint64_t fd_monitor_item_t::usec_remaining(const time_point_t &now) const { assert(last_time.has_value() && "Should always have a last_time"); if (timeout_usec == kNoTimeout) return kNoTimeout; @@ -97,9 +87,9 @@ uint64_t fd_monitor_item_t::usec_remaining(const time_point_t &now) const { return since >= timeout_usec ? 0 : timeout_usec - since; } -bool fd_monitor_item_t::service_item(const fd_set *fds, const time_point_t &now) { +bool fd_monitor_item_t::service_item(const select_wrapper_t &fds, const time_point_t &now) { bool should_retain = true; - bool readable = FD_ISSET(fd.fd(), fds); + bool readable = fds.test(fd.fd()); bool timed_out = !readable && usec_remaining(now) == 0; if (readable || timed_out) { last_time = now; @@ -123,6 +113,7 @@ bool fd_monitor_item_t::poke_item(const poke_list_t &pokelist) { void fd_monitor_t::run_in_background() { ASSERT_IS_BACKGROUND_THREAD(); poke_list_t pokelist; + select_wrapper_t fds; for (;;) { // Poke any items that need it. if (!pokelist.empty()) { @@ -130,22 +121,19 @@ void fd_monitor_t::run_in_background() { pokelist.clear(); } - fd_set fds; - FD_ZERO(&fds); + fds.clear(); // Our change_signaller is special cased. int change_signal_fd = change_signaller_.read_fd(); - FD_SET(change_signal_fd, &fds); - int max_fd = change_signal_fd; + fds.add(change_signal_fd); auto now = std::chrono::steady_clock::now(); uint64_t timeout_usec = fd_monitor_item_t::kNoTimeout; for (auto &item : items_) { - FD_SET(item.fd.fd(), &fds); + fds.add(item.fd.fd()); if (!item.last_time.has_value()) item.last_time = now; timeout_usec = std::min(timeout_usec, item.usec_remaining(now)); - max_fd = std::max(max_fd, item.fd.fd()); } // If we have only one item, it means that we are not actively monitoring any fds other than @@ -161,8 +149,7 @@ void fd_monitor_t::run_in_background() { } // Call select(). - struct timeval tv; - int ret = select(max_fd + 1, &fds, nullptr, nullptr, usec_to_tv_or_null(timeout_usec, &tv)); + int ret = fds.select(timeout_usec); if (ret < 0 && errno != EINTR) { // Surprising error. wperror(L"select"); @@ -171,7 +158,7 @@ void fd_monitor_t::run_in_background() { // A predicate which services each item in turn, returning true if it should be removed. auto servicer = [&fds, &now](fd_monitor_item_t &item) { int fd = item.fd.fd(); - bool remove = !item.service_item(&fds, now); + bool remove = !item.service_item(fds, now); if (remove) FLOG(fd_monitor, "Removing fd", fd); return remove; }; @@ -183,7 +170,7 @@ void fd_monitor_t::run_in_background() { // Handle any changes if the change signaller was set. Alternatively this may be the wait // lap, in which case we might want to commit to exiting. - if (FD_ISSET(change_signal_fd, &fds) || is_wait_lap) { + if (fds.test(change_signal_fd) || is_wait_lap) { // Clear the change signaller before processing incoming changes. change_signaller_.try_consume(); auto data = data_.acquire(); diff --git a/src/fd_monitor.h b/src/fd_monitor.h index f3b004265..ae409ed30 100644 --- a/src/fd_monitor.h +++ b/src/fd_monitor.h @@ -33,7 +33,7 @@ struct fd_monitor_item_t { using callback_t = std::function; /// A sentinel value meaning no timeout. - static constexpr uint64_t kNoTimeout = std::numeric_limits::max(); + static constexpr uint64_t kNoTimeout = select_wrapper_t::kNoTimeout; /// The fd to monitor. autoclose_fd_t fd{}; @@ -71,7 +71,7 @@ struct fd_monitor_item_t { // Invoke this item's callback if its value is set in fd or has timed out. // \return true to retain the item, false to remove it. - bool service_item(const fd_set *fds, const time_point_t &now); + bool service_item(const select_wrapper_t &fds, const time_point_t &now); // Invoke this item's callback with a poke, if its ID is present in the (sorted) pokelist. // \return true to retain the item, false to remove it. diff --git a/src/fds.cpp b/src/fds.cpp index 85fb88965..0c4af031d 100644 --- a/src/fds.cpp +++ b/src/fds.cpp @@ -26,12 +26,54 @@ // redirections, e.g. >&3 const int k_first_high_fd = 10; +static constexpr uint64_t kUsecPerMsec = 1000; +static constexpr uint64_t kUsecPerSec = 1000 * kUsecPerMsec; + void autoclose_fd_t::close() { if (fd_ < 0) return; exec_close(fd_); fd_ = -1; } +select_wrapper_t::select_wrapper_t() { clear(); } + +void select_wrapper_t::clear() { + FD_ZERO(&fdset_); + nfds_ = 0; +} + +void select_wrapper_t::add(int fd) { + if (fd >= 0) { + FD_SET(fd, &fdset_); + nfds_ = std::max(nfds_, fd + 1); + } +} + +bool select_wrapper_t::test(int fd) const { return fd >= 0 && FD_ISSET(fd, &fdset_); } + +int select_wrapper_t::select(uint64_t timeout_usec) { + if (timeout_usec == kNoTimeout) { + return ::select(nfds_, &fdset_, nullptr, nullptr, nullptr); + } else { + struct timeval tvs; + tvs.tv_sec = timeout_usec / kUsecPerSec; + tvs.tv_usec = timeout_usec % kUsecPerSec; + return ::select(nfds_, &fdset_, nullptr, nullptr, &tvs); + } +} + +// static +bool select_wrapper_t::is_fd_readable(int fd, uint64_t timeout_usec) { + if (fd < 0) return false; + select_wrapper_t s; + s.add(fd); + int res = s.select(timeout_usec); + return res > 0 && s.test(fd); +} + +// static +bool select_wrapper_t::poll_fd_readable(int fd) { return is_fd_readable(fd, 0); } + #ifdef HAVE_EVENTFD // Note we do not want to use EFD_SEMAPHORE because we are binary (not counting) semaphore. fd_event_signaller_t::fd_event_signaller_t() { diff --git a/src/fds.h b/src/fds.h index be9cdf71a..c8d3d5bbe 100644 --- a/src/fds.h +++ b/src/fds.h @@ -5,9 +5,11 @@ #include "config.h" // IWYU pragma: keep +#include #include #include +#include #include #include @@ -18,7 +20,8 @@ using wcstring = std::wstring; /// Pipe redirection error message. #define PIPE_ERROR _(L"An error occurred while setting up pipe") -/// The first "high fd", which is considered outside the range of valid user-specified redirections (like >&5). +/// The first "high fd", which is considered outside the range of valid user-specified redirections +/// (like >&5). extern const int k_first_high_fd; /// A helper class for managing and automatically closing a file descriptor. @@ -62,6 +65,43 @@ class autoclose_fd_t { ~autoclose_fd_t() { close(); } }; +/// A modest wrapper around fd_set and select(). +/// This allows accumulating a set of fds and then select()ing on them. +/// This only handles readability. +struct select_wrapper_t { + /// Construct an empty set. + select_wrapper_t(); + + /// Reset back to an empty set. + void clear(); + + /// Add an fd to the set. The fd is ignored if negative (for convenience). + void add(int fd); + + /// \return true if the given fd is marked as set, in our set. \returns false if negative. + bool test(int fd) const; + + /// Call select(), with this set as 'readfds' and null for the other sets, with a timeout given + /// by timeout_usec. Note this destructively modifies the set. \return the result of select(). + int select(uint64_t timeout_usec = select_wrapper_t::kNoTimeout); + + /// Poll a single fd: select() on it with a given timeout. + /// \return true if readable, false if not. + static bool is_fd_readable(int fd, uint64_t timeout_usec); + + /// Poll a single fd: select() on it with zero timeout. + /// \return true if readable, false if not. + static bool poll_fd_readable(int fd); + + /// A special timeout value which may be passed to indicate no timeout. + static constexpr uint64_t kNoTimeout = std::numeric_limits::max(); + + private: + /// The underlying fdset and nfds value to pass to select(). + fd_set fdset_; + int nfds_{0}; +}; + /// Helper type returned from making autoclose pipes. struct autoclose_pipes_t { /// Read end of the pipe. diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index c096c788c..7d726e9e3 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -3993,14 +3993,8 @@ bool poll_notifier(const std::unique_ptr ¬e) { bool result = false; int fd = note->notification_fd(); - if (fd >= 0) { - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); - struct timeval tv = {0, 0}; - if (select(fd + 1, &fds, NULL, NULL, &tv) > 0 && FD_ISSET(fd, &fds)) { - result = note->notification_fd_became_readable(fd); - } + if (fd >= 0 && select_wrapper_t::poll_fd_readable(fd)) { + result = note->notification_fd_became_readable(fd); } return result; } diff --git a/src/input_common.cpp b/src/input_common.cpp index 5299ac028..3258bf50e 100644 --- a/src/input_common.cpp +++ b/src/input_common.cpp @@ -44,40 +44,30 @@ void input_common_init(interrupt_func_t func) { interrupt_handler = func; } /// Internal function used by input_common_readch to read one byte from fd 0. This function should /// only be called by input_common_readch(). char_event_t input_event_queue_t::readb() { + select_wrapper_t fdset; for (;;) { - fd_set fdset; - int fd_max = in_; - int ioport = iothread_port(); - int res; + fdset.clear(); + fdset.add(in_); - FD_ZERO(&fdset); - FD_SET(in_, &fdset); + int ioport = iothread_port(); if (ioport > 0) { - FD_SET(ioport, &fdset); - fd_max = std::max(fd_max, ioport); + fdset.add(ioport); } // Get our uvar notifier. universal_notifier_t& notifier = universal_notifier_t::default_notifier(); - - // Get the notification fd (possibly none). int notifier_fd = notifier.notification_fd(); if (notifier_fd > 0) { - FD_SET(notifier_fd, &fdset); - fd_max = std::max(fd_max, notifier_fd); + fdset.add(notifier_fd); } // Get its suggested delay (possibly none). - struct timeval tv = {}; - const unsigned long usecs_delay = notifier.usec_delay_between_polls(); - if (usecs_delay > 0) { - unsigned long usecs_per_sec = 1000000; - tv.tv_sec = static_cast(usecs_delay / usecs_per_sec); - tv.tv_usec = static_cast(usecs_delay % usecs_per_sec); + uint64_t timeout_usec = select_wrapper_t::kNoTimeout; + if (auto notifier_usec_delay = notifier.usec_delay_between_polls()) { + timeout_usec = notifier_usec_delay; } - - res = select(fd_max + 1, &fdset, nullptr, nullptr, usecs_delay > 0 ? &tv : nullptr); - if (res == -1) { + int res = fdset.select(timeout_usec); + if (res < 0) { if (errno == EINTR || errno == EAGAIN) { // Some uvar notifiers rely on signals - see #7671. if (notifier.poll()) { @@ -98,7 +88,7 @@ char_event_t input_event_queue_t::readb() { // Check to see if we want a universal variable barrier. bool barrier_from_poll = notifier.poll(); bool barrier_from_readability = false; - if (notifier_fd > 0 && FD_ISSET(notifier_fd, &fdset)) { + if (notifier_fd > 0 && fdset.test(notifier_fd)) { barrier_from_readability = notifier.notification_fd_became_readable(notifier_fd); } if (barrier_from_poll || barrier_from_readability) { @@ -110,7 +100,7 @@ char_event_t input_event_queue_t::readb() { } } - if (FD_ISSET(in_, &fdset)) { + if (fdset.test(in_)) { unsigned char arr[1]; if (read_blocked(in_, arr, 1) != 1) { // The teminal has been closed. @@ -123,7 +113,7 @@ char_event_t input_event_queue_t::readb() { // Check for iothread completions only if there is no data to be read from the stdin. // This gives priority to the foreground. - if (ioport > 0 && FD_ISSET(ioport, &fdset)) { + if (ioport > 0 && fdset.test(ioport)) { iothread_service_main(); if (auto mc = pop_discard_timeouts()) { return *mc; @@ -214,11 +204,9 @@ char_event_t input_event_queue_t::readch_timed(bool dequeue_timeouts) { if (has_lookahead()) { result = pop(); } else { - fd_set fds; - FD_ZERO(&fds); - FD_SET(in_, &fds); - struct timeval tm = {wait_on_escape_ms / 1000, 1000 * (wait_on_escape_ms % 1000)}; - if (select(in_ + 1, &fds, nullptr, nullptr, &tm) > 0) { + const uint64_t usec_per_msec = 1000; + uint64_t timeout_usec = static_cast(wait_on_escape_ms) * usec_per_msec; + if (select_wrapper_t::is_fd_readable(in_, timeout_usec)) { result = readch(); } } diff --git a/src/iothread.cpp b/src/iothread.cpp index 568bd8575..79487f941 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -260,21 +260,8 @@ void iothread_perform_impl(void_function_t &&func, bool cant_wait) { int iothread_port() { return get_notify_signaller().read_fd(); } -static bool iothread_wait_for_main_requests(long timeout_usec) { - const long usec_per_sec = 1000000; - struct timeval tv; - tv.tv_sec = timeout_usec / usec_per_sec; - tv.tv_usec = timeout_usec % usec_per_sec; - const int fd = iothread_port(); - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); - int ret = select(fd + 1, &fds, nullptr, nullptr, &tv); - return ret > 0; -} - -void iothread_service_main_with_timeout(long timeout_usec) { - if (iothread_wait_for_main_requests(timeout_usec)) { +void iothread_service_main_with_timeout(uint64_t timeout_usec) { + if (select_wrapper_t::is_fd_readable(iothread_port(), timeout_usec)) { iothread_service_main(); } } diff --git a/src/iothread.h b/src/iothread.h index cfdf2fb17..164d9c19e 100644 --- a/src/iothread.h +++ b/src/iothread.h @@ -19,7 +19,7 @@ int iothread_port(); void iothread_service_main(); // Services any main thread requests. Does not wait more than \p timeout_usec. -void iothread_service_main_with_timeout(long timeout_usec); +void iothread_service_main_with_timeout(uint64_t timeout_usec); /// Waits for all iothreads to terminate. /// \return the number of threads that were running. diff --git a/src/reader.cpp b/src/reader.cpp index eb49aa6c1..1a40ea677 100644 --- a/src/reader.cpp +++ b/src/reader.cpp @@ -2751,16 +2751,6 @@ static int read_i(parser_t &parser) { return 0; } -/// Test if there are bytes available for reading on the specified file descriptor. -static int can_read(int fd) { - struct timeval can_read_timeout = {0, 0}; - fd_set fds; - - FD_ZERO(&fds); - FD_SET(fd, &fds); - return select(fd + 1, &fds, nullptr, nullptr, &can_read_timeout) == 1; -} - /// Test if the specified character in the specified string is backslashed. pos may be at the end of /// the string, which indicates if there is a trailing backslash. static bool is_backslashed(const wcstring &str, size_t pos) { @@ -2867,7 +2857,7 @@ maybe_t reader_data_t::read_normal_chars(readline_loop_state_t &rl while (accumulated_chars.size() < limit) { bool allow_commands = (accumulated_chars.empty()); auto evt = inputter.readch(allow_commands ? normal_handler : empty_handler); - if (!event_is_normal_char(evt) || !can_read(conf.in)) { + if (!event_is_normal_char(evt) || !select_wrapper_t::poll_fd_readable(conf.in)) { event_needing_handling = std::move(evt); break; } else if (evt.input_style == char_input_style_t::notfirst && accumulated_chars.empty() && diff --git a/src/topic_monitor.cpp b/src/topic_monitor.cpp index 75f54a86b..c6ab5ec10 100644 --- a/src/topic_monitor.cpp +++ b/src/topic_monitor.cpp @@ -90,10 +90,7 @@ void binary_semaphore_t::wait() { // Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read() // call until data is available (that is, fish would use 100% cpu while waiting for // processes). The select prevents that. - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); - (void)select(fd + 1, &fds, nullptr, nullptr, nullptr /* timeout */); + (void)select_wrapper_t::is_fd_readable(fd, select_wrapper_t::kNoTimeout); #endif uint8_t ignored; auto amt = read(fd, &ignored, sizeof ignored);