diff --git a/src/fd_monitor.cpp b/src/fd_monitor.cpp index 5939537fa..b9a23677a 100644 --- a/src/fd_monitor.cpp +++ b/src/fd_monitor.cpp @@ -11,45 +11,14 @@ static constexpr uint64_t kUsecPerMsec = 1000; static constexpr uint64_t kUsecPerSec = 1000 * kUsecPerMsec; -fd_monitor_t::fd_monitor_t() { - auto self_pipe = make_autoclose_pipes(); - if (!self_pipe) { - DIE("Unable to create pipes"); - } +fd_monitor_t::fd_monitor_t() = default; - // Ensure the write side is nonblocking to avoid deadlock. - notify_write_fd_ = std::move(self_pipe->write); - if (make_fd_nonblocking(notify_write_fd_.fd())) { - wperror(L"fcntl"); - } - - // Add an item for ourselves. - // We don't need to go through 'pending' because we have not yet launched the thread, and don't - // want to yet. - auto callback = [this](const autoclose_fd_t &fd, item_wake_reason_t reason) { - ASSERT_IS_BACKGROUND_THREAD(); - assert(reason == item_wake_reason_t::readable && - "Should not be poked, or time out with kNoTimeout"); - (void)reason; - // Read some to take data off of the notifier. - char buff[4096]; - ssize_t amt = read(fd.fd(), buff, sizeof buff); - if (amt > 0) { - this->has_pending_or_pokes_ = true; - } else if (amt == 0) { - this->terminate_ = true; - } else { - wperror(L"read"); - } - }; - items_.push_back(fd_monitor_item_t(std::move(self_pipe->read), std::move(callback))); -} - -// Extremely hacky destructor to clean up. -// This is used in the tests to not leave stale fds around. -// In fish shell, we never invoke the dtor so it doesn't matter that this is very dumb. fd_monitor_t::~fd_monitor_t() { - notify_write_fd_.close(); + // In orindary usage, we never invoke the dtor. + // This is used in the tests to not leave stale fds around. + // That is why this is very hacky! + data_.acquire()->terminate = true; + change_signaller_.post(); while (data_.acquire()->running) { std::this_thread::sleep_for(std::chrono::milliseconds(5)); } @@ -87,26 +56,23 @@ fd_monitor_item_id_t fd_monitor_t::add(fd_monitor_item_t &&item) { DIE("Unable to create a new pthread"); } } - // Tickle our notifier. - char byte = 0; - (void)write_loop(notify_write_fd_.fd(), &byte, 1); + // Tickle our signaller. + change_signaller_.post(); return item_id; } void fd_monitor_t::poke_item(fd_monitor_item_id_t item_id) { assert(item_id > 0 && "Invalid item ID"); - bool needs_notifier_byte = false; + bool needs_notification = false; { auto data = data_.acquire(); - needs_notifier_byte = data->pokelist.empty(); + needs_notification = data->pokelist.empty(); // Insert it, sorted. auto where = std::lower_bound(data->pokelist.begin(), data->pokelist.end(), item_id); data->pokelist.insert(where, item_id); } - if (needs_notifier_byte) { - // Tickle our notifier. - char byte = 0; - (void)write_loop(notify_write_fd_.fd(), &byte, 1); + if (needs_notification) { + change_signaller_.post(); } } @@ -161,11 +127,16 @@ void fd_monitor_t::run_in_background() { pokelist.clear(); } - uint64_t timeout_usec = fd_monitor_item_t::kNoTimeout; - int max_fd = -1; fd_set fds; FD_ZERO(&fds); + + // Our change_signaller is special cased. + int change_signal_fd = change_signaller_.read_fd(); + FD_SET(change_signal_fd, &fds); + int max_fd = change_signal_fd; + auto now = std::chrono::steady_clock::now(); + uint64_t timeout_usec = fd_monitor_item_t::kNoTimeout; for (auto &item : items_) { FD_SET(item.fd.fd(), &fds); @@ -207,16 +178,13 @@ void fd_monitor_t::run_in_background() { now = std::chrono::steady_clock::now(); items_.erase(std::remove_if(items_.begin(), items_.end(), servicer), items_.end()); - if (terminate_) { - // Time to go. - data_.acquire()->running = false; - return; - } - - // Maybe we got some new items. Check if our callback says so, or if this is the wait + // Handle any changes if the change signaller was set. Alternatively this may be the wait // lap, in which case we might want to commit to exiting. - if (has_pending_or_pokes_ || is_wait_lap) { + if (FD_ISSET(change_signal_fd, &fds) || is_wait_lap) { + // Clear the change signaller before processing incoming changes. + change_signaller_.try_consume(); auto data = data_.acquire(); + // Move from 'pending' to 'items'. items_.insert(items_.end(), std::make_move_iterator(data->pending.begin()), std::make_move_iterator(data->pending.end())); @@ -227,10 +195,9 @@ void fd_monitor_t::run_in_background() { pokelist = std::move(data->pokelist); data->pokelist.clear(); - has_pending_or_pokes_ = false; - - if (is_wait_lap && items_.size() == 1) { - // We had no items, waited a bit, and still have no items. We're going to shut down. + if (data->terminate || (is_wait_lap && items_.empty())) { + // Maybe terminate is set. + // Alternatively, maybe we had no items, waited a bit, and still have no items. // It's important to do this while holding the lock, otherwise we race with new // items being added. assert(data->running && "Thread should be running because we're that thread"); diff --git a/src/fd_monitor.h b/src/fd_monitor.h index 34b8706f2..f3b004265 100644 --- a/src/fd_monitor.h +++ b/src/fd_monitor.h @@ -103,6 +103,10 @@ class fd_monitor_t { // The background thread runner. void run_in_background(); + // If our self-signaller is reported as ready, this reads from it and handles any changes. + // Called in the background thread. + void handle_self_signal_in_background(); + // Poke items in the pokelist, removing any items that close their FD. // The pokelist is consumed after this. // This is only called in the background thread. @@ -111,13 +115,6 @@ class fd_monitor_t { // The list of items to monitor. This is only accessed on the background thread. item_list_t items_{}; - // Set to true by the background thread when our self-pipe becomes readable. - bool has_pending_or_pokes_{false}; - - // Latched to true by the background thread if our self-pipe is closed, which indicates we are - // in the destructor and so should terminate. - bool terminate_{false}; - struct data_t { /// Pending items. This is set under the lock, then the background thread grabs them. item_list_t pending{}; @@ -130,11 +127,15 @@ class fd_monitor_t { /// Whether the thread is running. bool running{false}; + + // Set if we should terminate. + bool terminate{false}; }; owning_lock data_; - /// The write end of our self-pipe. - autoclose_fd_t notify_write_fd_{}; + /// Our self-signaller. When this is written to, it means there are new items pending, or new + /// items in the pokelist, or terminate is set. + fd_event_signaller_t change_signaller_; }; #endif