mirror of
https://github.com/fish-shell/fish-shell
synced 2025-01-14 05:53:59 +00:00
Introduce fd_monitor
fd_monitor is a new class which can monitor a set of fds, waiting for them to become readable. When an fd becomes readable, a callback is invoked. Timeouts are also supported. This is intended to replace the "bufferfill" threads. Rather than one thread per bufferfill, we will have a single fd_monitor which can service multiple bufferfills. This helps today with nested command substitutions, and will help in the future with concurrent execution.
This commit is contained in:
parent
9bf5dfd738
commit
057c3a9e75
5 changed files with 402 additions and 1 deletions
|
@ -118,7 +118,7 @@ SET(FISH_SRCS
|
|||
src/wcstringutil.cpp src/wgetopt.cpp src/wildcard.cpp src/wutil.cpp
|
||||
src/future_feature_flags.cpp src/redirection.cpp src/topic_monitor.cpp
|
||||
src/flog.cpp src/trace.cpp src/timer.cpp src/null_terminated_array.cpp
|
||||
src/operation_context.cpp
|
||||
src/operation_context.cpp src/fd_monitor.cpp
|
||||
)
|
||||
|
||||
# Header files are just globbed.
|
||||
|
|
193
src/fd_monitor.cpp
Normal file
193
src/fd_monitor.cpp
Normal file
|
@ -0,0 +1,193 @@
|
|||
// Support for monitoring a set of fds.
|
||||
#include "config.h" // IWYU pragma: keep
|
||||
|
||||
#include "fd_monitor.h"
|
||||
|
||||
#include "flog.h"
|
||||
#include "io.h"
|
||||
#include "iothread.h"
|
||||
#include "wutil.h"
|
||||
|
||||
static constexpr uint64_t kUsecPerMsec = 1000;
|
||||
static constexpr uint64_t kUsecPerSec = 1000 * kUsecPerMsec;
|
||||
|
||||
fd_monitor_t::fd_monitor_t() {
|
||||
auto self_pipe = make_autoclose_pipes({});
|
||||
if (!self_pipe) {
|
||||
DIE("Unable to create pipes");
|
||||
}
|
||||
|
||||
// Ensure the write side is nonblocking to avoid deadlock.
|
||||
notify_write_fd_ = std::move(self_pipe->write);
|
||||
if (make_fd_nonblocking(notify_write_fd_.fd())) {
|
||||
wperror(L"fcntl");
|
||||
}
|
||||
|
||||
// Add an item for ourselves.
|
||||
// We don't need to go through 'pending' because we have not yet launched the thread, and don't
|
||||
// want to yet.
|
||||
auto callback = [this](autoclose_fd_t &fd, bool timed_out) {
|
||||
ASSERT_IS_BACKGROUND_THREAD();
|
||||
assert(!timed_out && "Should not time out with kNoTimeout");
|
||||
(void)timed_out;
|
||||
// Read some to take data off of the notifier.
|
||||
char buff[4096];
|
||||
ssize_t amt = read(fd.fd(), buff, sizeof buff);
|
||||
if (amt > 0) {
|
||||
this->has_pending_ = true;
|
||||
} else if (amt == 0) {
|
||||
this->terminate_ = true;
|
||||
} else {
|
||||
wperror(L"read");
|
||||
}
|
||||
};
|
||||
items_.push_back(fd_monitor_item_t(std::move(self_pipe->read), std::move(callback)));
|
||||
}
|
||||
|
||||
// Extremely hacky destructor to clean up.
|
||||
// This is used in the tests to not leave stale fds around.
|
||||
// In fish shell, we never invoke the dtor so it doesn't matter that this is very dumb.
|
||||
fd_monitor_t::~fd_monitor_t() {
|
||||
notify_write_fd_.close();
|
||||
while (data_.acquire()->running) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
}
|
||||
}
|
||||
|
||||
void fd_monitor_t::add(fd_monitor_item_t &&item) {
|
||||
assert(item.fd.valid() && "Invalid fd");
|
||||
assert(item.timeout_usec != 0 && "Invalid timeout");
|
||||
bool start_thread = add_pending_get_start_thread(std::move(item));
|
||||
if (start_thread) {
|
||||
void *(*trampoline)(void *) = [](void *self) -> void * {
|
||||
static_cast<fd_monitor_t *>(self)->run_in_background();
|
||||
return nullptr;
|
||||
};
|
||||
bool made_thread = make_detached_pthread(trampoline, this);
|
||||
if (!made_thread) {
|
||||
DIE("Unable to create a new pthread");
|
||||
}
|
||||
}
|
||||
// Tickle our notifier.
|
||||
char byte = 0;
|
||||
(void)write_loop(notify_write_fd_.fd(), &byte, 1);
|
||||
}
|
||||
|
||||
bool fd_monitor_t::add_pending_get_start_thread(fd_monitor_item_t &&item) {
|
||||
auto data = data_.acquire();
|
||||
data->pending.push_back(std::move(item));
|
||||
if (!data->running) {
|
||||
FLOG(fd_monitor, "Thread starting");
|
||||
data->running = true;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Given a usec count, populate and return a timeval.
|
||||
// If the usec count is kNoTimeout, return nullptr.
|
||||
static struct timeval *usec_to_tv_or_null(uint64_t usec, struct timeval *timeout) {
|
||||
if (usec == fd_monitor_item_t::kNoTimeout) return nullptr;
|
||||
timeout->tv_sec = usec / kUsecPerSec;
|
||||
timeout->tv_usec = usec % kUsecPerSec;
|
||||
return timeout;
|
||||
}
|
||||
|
||||
uint64_t fd_monitor_item_t::usec_remaining(const time_point_t &now) const {
|
||||
assert(last_time.has_value() && "Should always have a last_time");
|
||||
if (timeout_usec == kNoTimeout) return kNoTimeout;
|
||||
assert(now >= *last_time && "steady clock went backwards!");
|
||||
uint64_t since = static_cast<uint64_t>(
|
||||
std::chrono::duration_cast<std::chrono::microseconds>(now - *last_time).count());
|
||||
return since >= timeout_usec ? 0 : timeout_usec - since;
|
||||
}
|
||||
|
||||
bool fd_monitor_item_t::service_item(const fd_set *fds, const time_point_t &now) {
|
||||
bool should_retain = true;
|
||||
bool readable = FD_ISSET(fd.fd(), fds);
|
||||
bool timed_out = !readable && usec_remaining(now) == 0;
|
||||
if (readable || timed_out) {
|
||||
last_time = now;
|
||||
callback(fd, timed_out);
|
||||
should_retain = fd.valid();
|
||||
}
|
||||
return should_retain;
|
||||
}
|
||||
|
||||
void fd_monitor_t::run_in_background() {
|
||||
ASSERT_IS_BACKGROUND_THREAD();
|
||||
for (;;) {
|
||||
uint64_t timeout_usec = fd_monitor_item_t::kNoTimeout;
|
||||
int max_fd = -1;
|
||||
fd_set fds;
|
||||
FD_ZERO(&fds);
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
|
||||
for (auto &item : items_) {
|
||||
FD_SET(item.fd.fd(), &fds);
|
||||
if (!item.last_time.has_value()) item.last_time = now;
|
||||
timeout_usec = std::min(timeout_usec, item.usec_remaining(now));
|
||||
max_fd = std::max(max_fd, item.fd.fd());
|
||||
}
|
||||
|
||||
// If we have only one item, it means that we are not actively monitoring any fds other than
|
||||
// our self-pipe. In this case we wish to allow the thread to exit, but after a time, so we
|
||||
// aren't spinning up and tearing down the thread repeatedly.
|
||||
// Set a timeout of 16 msec; if nothing becomes readable by then we will exit.
|
||||
// We refer to this as the wait-lap.
|
||||
bool is_wait_lap = (items_.size() == 1);
|
||||
if (is_wait_lap) {
|
||||
assert(timeout_usec == fd_monitor_item_t::kNoTimeout &&
|
||||
"Should not have a timeout on wait-lap");
|
||||
timeout_usec = 16 * kUsecPerMsec;
|
||||
}
|
||||
|
||||
// Call select().
|
||||
struct timeval tv;
|
||||
int ret = select(max_fd + 1, &fds, nullptr, nullptr, usec_to_tv_or_null(timeout_usec, &tv));
|
||||
if (ret < 0 && errno != EINTR) {
|
||||
// Surprising error.
|
||||
wperror(L"select");
|
||||
}
|
||||
|
||||
// A predicate which services each item in turn, returning true if it should be removed.
|
||||
auto servicer = [&fds, &now](fd_monitor_item_t &item) {
|
||||
int fd = item.fd.fd();
|
||||
bool remove = !item.service_item(&fds, now);
|
||||
if (remove) FLOG(fd_monitor, "Removing fd", fd);
|
||||
return remove;
|
||||
};
|
||||
|
||||
// Service all items that are either readable or timed our, and remove any which say to do
|
||||
// so.
|
||||
now = std::chrono::steady_clock::now();
|
||||
items_.erase(std::remove_if(items_.begin(), items_.end(), servicer), items_.end());
|
||||
|
||||
if (terminate_) {
|
||||
// Time to go.
|
||||
data_.acquire()->running = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// Maybe we got some new items. Check if our callback says so, or if this is the wait
|
||||
// lap, in which case we might want to commit to exiting.
|
||||
if (has_pending_ || is_wait_lap) {
|
||||
auto data = data_.acquire();
|
||||
// Move from 'pending' to 'items'.
|
||||
items_.insert(items_.end(), std::make_move_iterator(data->pending.begin()),
|
||||
std::make_move_iterator(data->pending.end()));
|
||||
data->pending.clear();
|
||||
has_pending_ = false;
|
||||
|
||||
if (is_wait_lap && items_.size() == 1) {
|
||||
// We had no items, waited a bit, and still have no items. We're going to shut down.
|
||||
// It's important to do this while holding the lock, otherwise we race with new
|
||||
// items being added.
|
||||
assert(data->running && "Thread should be running because we're that thread");
|
||||
FLOG(fd_monitor, "Thread exiting");
|
||||
data->running = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
104
src/fd_monitor.h
Normal file
104
src/fd_monitor.h
Normal file
|
@ -0,0 +1,104 @@
|
|||
#ifndef FISH_FD_MONITOR_H
|
||||
#define FISH_FD_MONITOR_H
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
|
||||
#include "common.h"
|
||||
#include "maybe.h"
|
||||
|
||||
class fd_monitor_t;
|
||||
|
||||
/// An item containing an fd and callback, which can be monitored to watch when it becomes readable,
|
||||
/// and invoke the callback.
|
||||
struct fd_monitor_item_t {
|
||||
friend class fd_monitor_t;
|
||||
|
||||
/// The callback type for the item.
|
||||
/// It will be invoked when either \p fd is readable, or if the timeout was hit.
|
||||
using callback_t = std::function<void(autoclose_fd_t &fd, bool timed_out)>;
|
||||
|
||||
/// A sentinel value meaning no timeout.
|
||||
static constexpr uint64_t kNoTimeout = std::numeric_limits<uint64_t>::max();
|
||||
|
||||
/// The fd to monitor.
|
||||
autoclose_fd_t fd{};
|
||||
|
||||
/// A callback to be invoked when the fd is readable, or when we are timed out.
|
||||
/// If we time out, then timed_out will be true.
|
||||
/// If the fd is invalid on return from the function, then the item is removed.
|
||||
callback_t callback{};
|
||||
|
||||
/// The timeout in microseconds, or kNoTimeout for none.
|
||||
/// 0 timeouts are unsupported.
|
||||
uint64_t timeout_usec{kNoTimeout};
|
||||
|
||||
/// Construct from a file, callback, and optional timeout.
|
||||
fd_monitor_item_t(autoclose_fd_t fd, callback_t callback, uint64_t timeout_usec = kNoTimeout)
|
||||
: fd(std::move(fd)), callback(std::move(callback)), timeout_usec(timeout_usec) {
|
||||
assert(timeout_usec > 0 && "Invalid timeout");
|
||||
}
|
||||
|
||||
fd_monitor_item_t() = default;
|
||||
|
||||
private:
|
||||
// Fields and methods for the private use of fd_monitor_t.
|
||||
using time_point_t = std::chrono::time_point<std::chrono::steady_clock>;
|
||||
|
||||
// The last time we were called, or the initialization point.
|
||||
maybe_t<time_point_t> last_time{};
|
||||
|
||||
// \return the number of microseconds until the timeout should trigger, or kNoTimeout for none.
|
||||
// A 0 return means we are at or past the timeout.
|
||||
uint64_t usec_remaining(const time_point_t &now) const;
|
||||
|
||||
// Invoke this item's callback if its value is set in fd or has timed out.
|
||||
// \return true to retain the item, false to remove it.
|
||||
bool service_item(const fd_set *fds, const time_point_t &now);
|
||||
};
|
||||
|
||||
/// A class which can monitor a set of fds, invoking a callback when any becomes readable, or when
|
||||
/// per-item-configurable timeouts are hit.
|
||||
class fd_monitor_t {
|
||||
public:
|
||||
using item_list_t = std::vector<fd_monitor_item_t>;
|
||||
|
||||
fd_monitor_t();
|
||||
~fd_monitor_t();
|
||||
|
||||
/// Add an item to monitor.
|
||||
void add(fd_monitor_item_t &&item);
|
||||
|
||||
private:
|
||||
// The background thread runner.
|
||||
void run_in_background();
|
||||
|
||||
// Add a pending item, marking the thread as running.
|
||||
// \return true if we should start the thread.
|
||||
bool add_pending_get_start_thread(fd_monitor_item_t &&item);
|
||||
|
||||
// The list of items to monitor. This is only accessed on the background thread.
|
||||
item_list_t items_{};
|
||||
|
||||
// Set to true by the background thread when our self-pipe becomes readable.
|
||||
bool has_pending_{false};
|
||||
|
||||
// Latched to true by the background thread if our self-pipe is closed, which indicates we are
|
||||
// in the destructor and so should terminate.
|
||||
bool terminate_{false};
|
||||
|
||||
struct data_t {
|
||||
/// Pending items.
|
||||
item_list_t pending{};
|
||||
|
||||
/// Whether the thread is running.
|
||||
bool running{false};
|
||||
};
|
||||
owning_lock<data_t> data_;
|
||||
|
||||
/// The write end of our self-pipe.
|
||||
autoclose_fd_t notify_write_fd_{};
|
||||
};
|
||||
|
||||
#endif
|
|
@ -49,6 +49,7 @@
|
|||
#include "event.h"
|
||||
#include "expand.h"
|
||||
#include "fallback.h" // IWYU pragma: keep
|
||||
#include "fd_monitor.h"
|
||||
#include "function.h"
|
||||
#include "future_feature_flags.h"
|
||||
#include "highlight.h"
|
||||
|
@ -742,6 +743,107 @@ static int test_iothread_thread_call(std::atomic<int> *addr) {
|
|||
return after;
|
||||
}
|
||||
|
||||
static void test_fd_monitor() {
|
||||
say(L"Testing fd_monitor");
|
||||
|
||||
// Helper to make an item which counts how many times its callback is invoked.
|
||||
struct item_maker_t {
|
||||
std::atomic<bool> did_timeout{false};
|
||||
std::atomic<size_t> length_read{0};
|
||||
std::atomic<size_t> total_calls{0};
|
||||
bool always_exit{false};
|
||||
fd_monitor_item_t item;
|
||||
autoclose_fd_t writer;
|
||||
|
||||
explicit item_maker_t(uint64_t timeout_usec) {
|
||||
auto pipes = make_autoclose_pipes({}).acquire();
|
||||
writer = std::move(pipes.write);
|
||||
auto callback = [this](autoclose_fd_t &fd, bool timed_out) {
|
||||
bool was_closed = false;
|
||||
if (timed_out) {
|
||||
this->did_timeout = true;
|
||||
} else {
|
||||
char buff[4096];
|
||||
ssize_t amt = read(fd.fd(), buff, sizeof buff);
|
||||
length_read += amt;
|
||||
was_closed = (amt == 0);
|
||||
}
|
||||
total_calls += 1;
|
||||
if (always_exit || was_closed) {
|
||||
fd.close();
|
||||
}
|
||||
};
|
||||
item = fd_monitor_item_t(std::move(pipes.read), std::move(callback), timeout_usec);
|
||||
}
|
||||
|
||||
item_maker_t(const item_maker_t &) = delete;
|
||||
|
||||
// Write 42 bytes to our write end.
|
||||
void write42() {
|
||||
char buff[42] = {0};
|
||||
(void)write_loop(writer.fd(), buff, sizeof buff);
|
||||
}
|
||||
};
|
||||
|
||||
constexpr uint64_t usec_per_msec = 1000;
|
||||
|
||||
// Items which will never receive data or be called back.
|
||||
item_maker_t item_never(fd_monitor_item_t::kNoTimeout);
|
||||
item_maker_t item_hugetimeout(100000000llu * usec_per_msec);
|
||||
|
||||
// Item which should get no data, and time out.
|
||||
item_maker_t item0_timeout(16 * usec_per_msec);
|
||||
|
||||
// Item which should get exactly 42 bytes, then time out.
|
||||
item_maker_t item42_timeout(16 * usec_per_msec);
|
||||
|
||||
// Item which should get exactly 42 bytes, and not time out.
|
||||
item_maker_t item42_nottimeout(fd_monitor_item_t::kNoTimeout);
|
||||
|
||||
// Item which should get 42 bytes, then get notified it is closed.
|
||||
item_maker_t item42_thenclose(16 * usec_per_msec);
|
||||
|
||||
// Item which should be called back once.
|
||||
item_maker_t item_oneshot(16 * usec_per_msec);
|
||||
item_oneshot.always_exit = true;
|
||||
{
|
||||
fd_monitor_t monitor;
|
||||
for (auto *item : {&item_never, &item_hugetimeout, &item0_timeout, &item42_timeout,
|
||||
&item42_nottimeout, &item42_thenclose, &item_oneshot}) {
|
||||
monitor.add(std::move(item->item));
|
||||
}
|
||||
item42_timeout.write42();
|
||||
item42_nottimeout.write42();
|
||||
item42_thenclose.write42();
|
||||
item42_thenclose.writer.close();
|
||||
item_oneshot.write42();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(84));
|
||||
}
|
||||
|
||||
do_test(!item_never.did_timeout);
|
||||
do_test(item_never.length_read == 0);
|
||||
|
||||
do_test(!item_hugetimeout.did_timeout);
|
||||
do_test(item_hugetimeout.length_read == 0);
|
||||
|
||||
do_test(item0_timeout.length_read == 0);
|
||||
do_test(item0_timeout.did_timeout);
|
||||
|
||||
do_test(item42_timeout.length_read == 42);
|
||||
do_test(item42_timeout.did_timeout);
|
||||
|
||||
do_test(item42_nottimeout.length_read == 42);
|
||||
do_test(!item42_nottimeout.did_timeout);
|
||||
|
||||
do_test(item42_thenclose.did_timeout == false);
|
||||
do_test(item42_thenclose.length_read == 42);
|
||||
do_test(item42_thenclose.total_calls == 2);
|
||||
|
||||
do_test(!item_oneshot.did_timeout);
|
||||
do_test(item_oneshot.length_read == 42);
|
||||
do_test(item_oneshot.total_calls == 1);
|
||||
}
|
||||
|
||||
static void test_iothread() {
|
||||
say(L"Testing iothreads");
|
||||
std::unique_ptr<std::atomic<int>> int_ptr = make_unique<std::atomic<int>>(0);
|
||||
|
@ -5447,6 +5549,7 @@ int main(int argc, char **argv) {
|
|||
if (should_test_function("convert")) test_convert();
|
||||
if (should_test_function("convert_nulls")) test_convert_nulls();
|
||||
if (should_test_function("tokenizer")) test_tokenizer();
|
||||
if (should_test_function("fd_monitor")) test_fd_monitor();
|
||||
if (should_test_function("iothread")) test_iothread();
|
||||
if (should_test_function("pthread")) test_pthread();
|
||||
if (should_test_function("parser")) test_parser();
|
||||
|
|
|
@ -94,6 +94,7 @@ class category_list_t {
|
|||
category_t profile_history{L"profile-history", L"History performance measurements"};
|
||||
|
||||
category_t iothread{L"iothread", L"Background IO thread events"};
|
||||
category_t fd_monitor{L"fd-monitor", L"FD monitor events"};
|
||||
|
||||
category_t term_support{L"term-support", L"Terminal feature detection"};
|
||||
|
||||
|
|
Loading…
Reference in a new issue