diff --git a/src/fd_monitor.cpp b/src/fd_monitor.cpp index 00b7a717d..d85b86014 100644 --- a/src/fd_monitor.cpp +++ b/src/fd_monitor.cpp @@ -26,15 +26,16 @@ fd_monitor_t::fd_monitor_t() { // Add an item for ourselves. // We don't need to go through 'pending' because we have not yet launched the thread, and don't // want to yet. - auto callback = [this](const autoclose_fd_t &fd, bool timed_out) { + auto callback = [this](const autoclose_fd_t &fd, item_wake_reason_t reason) { ASSERT_IS_BACKGROUND_THREAD(); - assert(!timed_out && "Should not time out with kNoTimeout"); - (void)timed_out; + assert(reason == item_wake_reason_t::readable && + "Should not be poked, or time out with kNoTimeout"); + (void)reason; // Read some to take data off of the notifier. char buff[4096]; ssize_t amt = read(fd.fd(), buff, sizeof buff); if (amt > 0) { - this->has_pending_ = true; + this->has_pending_or_pokes_ = true; } else if (amt == 0) { this->terminate_ = true; } else { @@ -54,10 +55,28 @@ fd_monitor_t::~fd_monitor_t() { } } -void fd_monitor_t::add(fd_monitor_item_t &&item) { +fd_monitor_item_id_t fd_monitor_t::add(fd_monitor_item_t &&item) { assert(item.fd.valid() && "Invalid fd"); assert(item.timeout_usec != 0 && "Invalid timeout"); - bool start_thread = add_pending_get_start_thread(std::move(item)); + assert(item.item_id == 0 && "Item should not already have an ID"); + bool start_thread = false; + fd_monitor_item_id_t item_id{}; + { + // Lock around a local region. + auto data = data_.acquire(); + + // Assign an id and add the item to pending. + item_id = ++data->last_id; + item.item_id = item_id; + data->pending.push_back(std::move(item)); + + // Maybe plan to start the thread. + if (!data->running) { + FLOG(fd_monitor, "Thread starting"); + data->running = true; + start_thread = true; + } + } if (start_thread) { void *(*trampoline)(void *) = [](void *self) -> void * { static_cast(self)->run_in_background(); @@ -71,17 +90,24 @@ void fd_monitor_t::add(fd_monitor_item_t &&item) { // Tickle our notifier. char byte = 0; (void)write_loop(notify_write_fd_.fd(), &byte, 1); + return item_id; } -bool fd_monitor_t::add_pending_get_start_thread(fd_monitor_item_t &&item) { - auto data = data_.acquire(); - data->pending.push_back(std::move(item)); - if (!data->running) { - FLOG(fd_monitor, "Thread starting"); - data->running = true; - return true; +void fd_monitor_t::poke_item(fd_monitor_item_id_t item_id) { + assert(item_id > 0 && "Invalid item ID"); + bool needs_notifier_byte = false; + { + auto data = data_.acquire(); + needs_notifier_byte = data->pokelist.empty(); + // Insert it, sorted. + auto where = std::lower_bound(data->pokelist.begin(), data->pokelist.end(), item_id); + data->pokelist.insert(where, item_id); + } + if (needs_notifier_byte) { + // Tickle our notifier. + char byte = 0; + (void)write_loop(notify_write_fd_.fd(), &byte, 1); } - return false; } // Given a usec count, populate and return a timeval. @@ -108,15 +134,33 @@ bool fd_monitor_item_t::service_item(const fd_set *fds, const time_point_t &now) bool timed_out = !readable && usec_remaining(now) == 0; if (readable || timed_out) { last_time = now; - callback(fd, timed_out); + item_wake_reason_t reason = + readable ? item_wake_reason_t::readable : item_wake_reason_t::timeout; + callback(fd, reason); should_retain = fd.valid(); } return should_retain; } +bool fd_monitor_item_t::poke_item(const poke_list_t &pokelist) { + if (item_id == 0 || !std::binary_search(pokelist.begin(), pokelist.end(), item_id)) { + // Not pokeable or not in the pokelist. + return true; + } + callback(fd, item_wake_reason_t::poke); + return fd.valid(); +} + void fd_monitor_t::run_in_background() { ASSERT_IS_BACKGROUND_THREAD(); + poke_list_t pokelist; for (;;) { + // Poke any items that need it. + if (!pokelist.empty()) { + this->poke_in_background(std::move(pokelist)); + pokelist.clear(); + } + uint64_t timeout_usec = fd_monitor_item_t::kNoTimeout; int max_fd = -1; fd_set fds; @@ -158,7 +202,7 @@ void fd_monitor_t::run_in_background() { return remove; }; - // Service all items that are either readable or timed our, and remove any which say to do + // Service all items that are either readable or timed out, and remove any which say to do // so. now = std::chrono::steady_clock::now(); items_.erase(std::remove_if(items_.begin(), items_.end(), servicer), items_.end()); @@ -171,13 +215,19 @@ void fd_monitor_t::run_in_background() { // Maybe we got some new items. Check if our callback says so, or if this is the wait // lap, in which case we might want to commit to exiting. - if (has_pending_ || is_wait_lap) { + if (has_pending_or_pokes_ || is_wait_lap) { auto data = data_.acquire(); // Move from 'pending' to 'items'. items_.insert(items_.end(), std::make_move_iterator(data->pending.begin()), std::make_move_iterator(data->pending.end())); data->pending.clear(); - has_pending_ = false; + + // Grab any pokelist. + assert(pokelist.empty() && "pokelist should be empty or else we're dropping pokes"); + pokelist = std::move(data->pokelist); + data->pokelist.clear(); + + has_pending_or_pokes_ = false; if (is_wait_lap && items_.size() == 1) { // We had no items, waited a bit, and still have no items. We're going to shut down. @@ -191,3 +241,14 @@ void fd_monitor_t::run_in_background() { } } } + +void fd_monitor_t::poke_in_background(const poke_list_t &pokelist) { + ASSERT_IS_BACKGROUND_THREAD(); + auto poker = [&pokelist](fd_monitor_item_t &item) { + int fd = item.fd.fd(); + bool remove = !item.poke_item(pokelist); + if (remove) FLOG(fd_monitor, "Removing fd", fd); + return remove; + }; + items_.erase(std::remove_if(items_.begin(), items_.end(), poker), items_.end()); +} diff --git a/src/fd_monitor.h b/src/fd_monitor.h index 2c4fb0425..1127e102c 100644 --- a/src/fd_monitor.h +++ b/src/fd_monitor.h @@ -12,14 +12,24 @@ class fd_monitor_t; +/// Each item added to fd_monitor_t is assigned a unique ID, which is not recycled. +/// Items may have their callback triggered immediately by passing the ID. +/// Zero is a sentinel. +using fd_monitor_item_id_t = uint64_t; + +/// Reasons for waking an item. +enum class item_wake_reason_t { + readable, // the fd became readable + timeout, // the requested timeout was hit + poke, // the item was "poked" (woken up explicitly) +}; + /// An item containing an fd and callback, which can be monitored to watch when it becomes readable, /// and invoke the callback. struct fd_monitor_item_t { - friend class fd_monitor_t; - - /// The callback type for the item. - /// It will be invoked when either \p fd is readable, or if the timeout was hit. - using callback_t = std::function; + /// The callback type for the item. It is passed \p fd, and the reason for waking \p reason. + /// The callback may close \p fd, in which case the item is removed. + using callback_t = std::function; /// A sentinel value meaning no timeout. static constexpr uint64_t kNoTimeout = std::numeric_limits::max(); @@ -51,6 +61,9 @@ struct fd_monitor_item_t { // The last time we were called, or the initialization point. maybe_t last_time{}; + // The ID for this item. This is assigned by the fd monitor. + fd_monitor_item_id_t item_id{0}; + // \return the number of microseconds until the timeout should trigger, or kNoTimeout for none. // A 0 return means we are at or past the timeout. uint64_t usec_remaining(const time_point_t &now) const; @@ -58,6 +71,13 @@ struct fd_monitor_item_t { // Invoke this item's callback if its value is set in fd or has timed out. // \return true to retain the item, false to remove it. bool service_item(const fd_set *fds, const time_point_t &now); + + // Invoke this item's callback with a poke, if its ID is present in the (sorted) pokelist. + // \return true to retain the item, false to remove it. + using poke_list_t = std::vector; + bool poke_item(const poke_list_t &pokelist); + + friend class fd_monitor_t; }; /// A class which can monitor a set of fds, invoking a callback when any becomes readable, or when @@ -66,34 +86,47 @@ class fd_monitor_t { public: using item_list_t = std::vector; + // A "pokelist" is a sorted list of item IDs which need explicit wakeups. + using poke_list_t = std::vector; + fd_monitor_t(); ~fd_monitor_t(); - /// Add an item to monitor. - void add(fd_monitor_item_t &&item); + /// Add an item to monitor. \return the ID assigned to the item. + fd_monitor_item_id_t add(fd_monitor_item_t &&item); + + /// Mark that an item with a given ID needs to be explicitly woken up. + void poke_item(fd_monitor_item_id_t item_id); private: // The background thread runner. void run_in_background(); - // Add a pending item, marking the thread as running. - // \return true if we should start the thread. - bool add_pending_get_start_thread(fd_monitor_item_t &&item); + // Poke items in the pokelist, removing any items that close their FD. + // The pokelist is consumed after this. + // This is only called in the background thread. + void poke_in_background(const poke_list_t &pokelist); // The list of items to monitor. This is only accessed on the background thread. item_list_t items_{}; // Set to true by the background thread when our self-pipe becomes readable. - bool has_pending_{false}; + bool has_pending_or_pokes_{false}; // Latched to true by the background thread if our self-pipe is closed, which indicates we are // in the destructor and so should terminate. bool terminate_{false}; struct data_t { - /// Pending items. + /// Pending items. This is set under the lock, then the background thread grabs them. item_list_t pending{}; + /// List of IDs for items that need to be poked (explicitly woken up). + poke_list_t pokelist{}; + + /// The last ID assigned, or if none. + fd_monitor_item_id_t last_id{0}; + /// Whether the thread is running. bool running{false}; }; diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 379a475bc..2784e2b72 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -787,7 +787,9 @@ static void test_fd_monitor() { struct item_maker_t { std::atomic did_timeout{false}; std::atomic length_read{0}; + std::atomic pokes{0}; std::atomic total_calls{0}; + fd_monitor_item_id_t item_id{0}; bool always_exit{false}; fd_monitor_item_t item; autoclose_fd_t writer; @@ -795,15 +797,21 @@ static void test_fd_monitor() { explicit item_maker_t(uint64_t timeout_usec) { auto pipes = make_autoclose_pipes({}).acquire(); writer = std::move(pipes.write); - auto callback = [this](autoclose_fd_t &fd, bool timed_out) { + auto callback = [this](autoclose_fd_t &fd, item_wake_reason_t reason) { bool was_closed = false; - if (timed_out) { - this->did_timeout = true; - } else { - char buff[4096]; - ssize_t amt = read(fd.fd(), buff, sizeof buff); - length_read += amt; - was_closed = (amt == 0); + switch (reason) { + case item_wake_reason_t::timeout: + this->did_timeout = true; + break; + case item_wake_reason_t::poke: + this->pokes += 1; + break; + case item_wake_reason_t::readable: + char buff[4096]; + ssize_t amt = read(fd.fd(), buff, sizeof buff); + this->length_read += amt; + was_closed = (amt == 0); + break; } total_calls += 1; if (always_exit || was_closed) { @@ -840,45 +848,63 @@ static void test_fd_monitor() { // Item which should get 42 bytes, then get notified it is closed. item_maker_t item42_thenclose(16 * usec_per_msec); + // Item which gets one poke. + item_maker_t item_pokee(fd_monitor_item_t::kNoTimeout); + // Item which should be called back once. item_maker_t item_oneshot(16 * usec_per_msec); item_oneshot.always_exit = true; + { fd_monitor_t monitor; - for (auto item : {&item_never, &item_hugetimeout, &item0_timeout, &item42_timeout, - &item42_nottimeout, &item42_thenclose, &item_oneshot}) { - monitor.add(std::move(item->item)); + for (item_maker_t *item : + {&item_never, &item_hugetimeout, &item0_timeout, &item42_timeout, &item42_nottimeout, + &item42_thenclose, &item_pokee, &item_oneshot}) { + item->item_id = monitor.add(std::move(item->item)); } item42_timeout.write42(); item42_nottimeout.write42(); item42_thenclose.write42(); item42_thenclose.writer.close(); item_oneshot.write42(); + monitor.poke_item(item_pokee.item_id); std::this_thread::sleep_for(std::chrono::milliseconds(84)); } do_test(!item_never.did_timeout); do_test(item_never.length_read == 0); + do_test(item_never.pokes == 0); do_test(!item_hugetimeout.did_timeout); do_test(item_hugetimeout.length_read == 0); + do_test(item_hugetimeout.pokes == 0); do_test(item0_timeout.length_read == 0); do_test(item0_timeout.did_timeout); + do_test(item0_timeout.pokes == 0); do_test(item42_timeout.length_read == 42); do_test(item42_timeout.did_timeout); + do_test(item42_timeout.pokes == 0); do_test(item42_nottimeout.length_read == 42); do_test(!item42_nottimeout.did_timeout); + do_test(item42_nottimeout.pokes == 0); do_test(item42_thenclose.did_timeout == false); do_test(item42_thenclose.length_read == 42); do_test(item42_thenclose.total_calls == 2); + do_test(item42_thenclose.pokes == 0); do_test(!item_oneshot.did_timeout); do_test(item_oneshot.length_read == 42); do_test(item_oneshot.total_calls == 1); + do_test(item_oneshot.pokes == 0); + + do_test(!item_pokee.did_timeout); + do_test(item_pokee.length_read == 0); + do_test(item_pokee.total_calls == 1); + do_test(item_pokee.pokes == 1); } static void test_iothread() { diff --git a/src/io.cpp b/src/io.cpp index 1e5376146..ccb2a05be 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -124,14 +124,14 @@ void io_buffer_t::begin_filling(autoclose_fd_t fd) { fd_monitor_item_t item; item.fd = std::move(fd); item.timeout_usec = poll_usec; - item.callback = [this, promise](autoclose_fd_t &fd, bool timed_out) { + item.callback = [this, promise](autoclose_fd_t &fd, item_wake_reason_t reason) { ASSERT_IS_BACKGROUND_THREAD(); - // Only check the shutdown flag if we timed out. + // Only check the shutdown flag if we timed out or were poked. // It's important that if select() indicated we were readable, that we call select() again // allowing it to time out. Note the typical case is that the fd will be closed, in which // case select will return immediately. bool done = false; - if (!timed_out) { + if (reason == item_wake_reason_t::readable) { // select() reported us as readable; read a bit. scoped_lock locker(append_lock_); ssize_t ret = read_once(fd.fd());