fish-shell/src/fd_monitor.cpp
ridiculousfish 97f29b1f4d Pipe fds to move to the "high range"
This concerns how fish prevents its own fds from interfering with
user-defined fd redirections, like `echo hi >&5`. fish has historically
done this by tracking all user defined redirections when running a job,
and ensuring that pipes are not assigned the same fds. However this is
annoying to pass around - it means that we have to thread user-defined
redirections into pipe creation.

Take a page from zsh and just ensure that all pipes we create have fds in
the "high range," which here means at least 10. The primary way to do this
is via the F_DUPFD_CLOEXEC syscall, which also sets CLOEXEC, so we aren't
invoking additional syscalls in the common case. This will free us from
having to track which fds are in user-defined redirections.
2021-02-05 17:58:08 -08:00

254 lines
9.2 KiB
C++

// Support for monitoring a set of fds.
#include "config.h" // IWYU pragma: keep
#include "fd_monitor.h"
#include "flog.h"
#include "io.h"
#include "iothread.h"
#include "wutil.h"
static constexpr uint64_t kUsecPerMsec = 1000;
static constexpr uint64_t kUsecPerSec = 1000 * kUsecPerMsec;
fd_monitor_t::fd_monitor_t() {
auto self_pipe = make_autoclose_pipes();
if (!self_pipe) {
DIE("Unable to create pipes");
}
// Ensure the write side is nonblocking to avoid deadlock.
notify_write_fd_ = std::move(self_pipe->write);
if (make_fd_nonblocking(notify_write_fd_.fd())) {
wperror(L"fcntl");
}
// Add an item for ourselves.
// We don't need to go through 'pending' because we have not yet launched the thread, and don't
// want to yet.
auto callback = [this](const autoclose_fd_t &fd, item_wake_reason_t reason) {
ASSERT_IS_BACKGROUND_THREAD();
assert(reason == item_wake_reason_t::readable &&
"Should not be poked, or time out with kNoTimeout");
(void)reason;
// Read some to take data off of the notifier.
char buff[4096];
ssize_t amt = read(fd.fd(), buff, sizeof buff);
if (amt > 0) {
this->has_pending_or_pokes_ = true;
} else if (amt == 0) {
this->terminate_ = true;
} else {
wperror(L"read");
}
};
items_.push_back(fd_monitor_item_t(std::move(self_pipe->read), std::move(callback)));
}
// Extremely hacky destructor to clean up.
// This is used in the tests to not leave stale fds around.
// In fish shell, we never invoke the dtor so it doesn't matter that this is very dumb.
fd_monitor_t::~fd_monitor_t() {
notify_write_fd_.close();
while (data_.acquire()->running) {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
fd_monitor_item_id_t fd_monitor_t::add(fd_monitor_item_t &&item) {
assert(item.fd.valid() && "Invalid fd");
assert(item.timeout_usec != 0 && "Invalid timeout");
assert(item.item_id == 0 && "Item should not already have an ID");
bool start_thread = false;
fd_monitor_item_id_t item_id{};
{
// Lock around a local region.
auto data = data_.acquire();
// Assign an id and add the item to pending.
item_id = ++data->last_id;
item.item_id = item_id;
data->pending.push_back(std::move(item));
// Maybe plan to start the thread.
if (!data->running) {
FLOG(fd_monitor, "Thread starting");
data->running = true;
start_thread = true;
}
}
if (start_thread) {
void *(*trampoline)(void *) = [](void *self) -> void * {
static_cast<fd_monitor_t *>(self)->run_in_background();
return nullptr;
};
bool made_thread = make_detached_pthread(trampoline, this);
if (!made_thread) {
DIE("Unable to create a new pthread");
}
}
// Tickle our notifier.
char byte = 0;
(void)write_loop(notify_write_fd_.fd(), &byte, 1);
return item_id;
}
void fd_monitor_t::poke_item(fd_monitor_item_id_t item_id) {
assert(item_id > 0 && "Invalid item ID");
bool needs_notifier_byte = false;
{
auto data = data_.acquire();
needs_notifier_byte = data->pokelist.empty();
// Insert it, sorted.
auto where = std::lower_bound(data->pokelist.begin(), data->pokelist.end(), item_id);
data->pokelist.insert(where, item_id);
}
if (needs_notifier_byte) {
// Tickle our notifier.
char byte = 0;
(void)write_loop(notify_write_fd_.fd(), &byte, 1);
}
}
// Given a usec count, populate and return a timeval.
// If the usec count is kNoTimeout, return nullptr.
static struct timeval *usec_to_tv_or_null(uint64_t usec, struct timeval *timeout) {
if (usec == fd_monitor_item_t::kNoTimeout) return nullptr;
timeout->tv_sec = usec / kUsecPerSec;
timeout->tv_usec = usec % kUsecPerSec;
return timeout;
}
uint64_t fd_monitor_item_t::usec_remaining(const time_point_t &now) const {
assert(last_time.has_value() && "Should always have a last_time");
if (timeout_usec == kNoTimeout) return kNoTimeout;
assert(now >= *last_time && "steady clock went backwards!");
uint64_t since = static_cast<uint64_t>(
std::chrono::duration_cast<std::chrono::microseconds>(now - *last_time).count());
return since >= timeout_usec ? 0 : timeout_usec - since;
}
bool fd_monitor_item_t::service_item(const fd_set *fds, const time_point_t &now) {
bool should_retain = true;
bool readable = FD_ISSET(fd.fd(), fds);
bool timed_out = !readable && usec_remaining(now) == 0;
if (readable || timed_out) {
last_time = now;
item_wake_reason_t reason =
readable ? item_wake_reason_t::readable : item_wake_reason_t::timeout;
callback(fd, reason);
should_retain = fd.valid();
}
return should_retain;
}
bool fd_monitor_item_t::poke_item(const poke_list_t &pokelist) {
if (item_id == 0 || !std::binary_search(pokelist.begin(), pokelist.end(), item_id)) {
// Not pokeable or not in the pokelist.
return true;
}
callback(fd, item_wake_reason_t::poke);
return fd.valid();
}
void fd_monitor_t::run_in_background() {
ASSERT_IS_BACKGROUND_THREAD();
poke_list_t pokelist;
for (;;) {
// Poke any items that need it.
if (!pokelist.empty()) {
this->poke_in_background(std::move(pokelist));
pokelist.clear();
}
uint64_t timeout_usec = fd_monitor_item_t::kNoTimeout;
int max_fd = -1;
fd_set fds;
FD_ZERO(&fds);
auto now = std::chrono::steady_clock::now();
for (auto &item : items_) {
FD_SET(item.fd.fd(), &fds);
if (!item.last_time.has_value()) item.last_time = now;
timeout_usec = std::min(timeout_usec, item.usec_remaining(now));
max_fd = std::max(max_fd, item.fd.fd());
}
// If we have only one item, it means that we are not actively monitoring any fds other than
// our self-pipe. In this case we wish to allow the thread to exit, but after a time, so we
// aren't spinning up and tearing down the thread repeatedly.
// Set a timeout of 16 msec; if nothing becomes readable by then we will exit.
// We refer to this as the wait-lap.
bool is_wait_lap = (items_.size() == 1);
if (is_wait_lap) {
assert(timeout_usec == fd_monitor_item_t::kNoTimeout &&
"Should not have a timeout on wait-lap");
timeout_usec = 16 * kUsecPerMsec;
}
// Call select().
struct timeval tv;
int ret = select(max_fd + 1, &fds, nullptr, nullptr, usec_to_tv_or_null(timeout_usec, &tv));
if (ret < 0 && errno != EINTR) {
// Surprising error.
wperror(L"select");
}
// A predicate which services each item in turn, returning true if it should be removed.
auto servicer = [&fds, &now](fd_monitor_item_t &item) {
int fd = item.fd.fd();
bool remove = !item.service_item(&fds, now);
if (remove) FLOG(fd_monitor, "Removing fd", fd);
return remove;
};
// Service all items that are either readable or timed out, and remove any which say to do
// so.
now = std::chrono::steady_clock::now();
items_.erase(std::remove_if(items_.begin(), items_.end(), servicer), items_.end());
if (terminate_) {
// Time to go.
data_.acquire()->running = false;
return;
}
// Maybe we got some new items. Check if our callback says so, or if this is the wait
// lap, in which case we might want to commit to exiting.
if (has_pending_or_pokes_ || is_wait_lap) {
auto data = data_.acquire();
// Move from 'pending' to 'items'.
items_.insert(items_.end(), std::make_move_iterator(data->pending.begin()),
std::make_move_iterator(data->pending.end()));
data->pending.clear();
// Grab any pokelist.
assert(pokelist.empty() && "pokelist should be empty or else we're dropping pokes");
pokelist = std::move(data->pokelist);
data->pokelist.clear();
has_pending_or_pokes_ = false;
if (is_wait_lap && items_.size() == 1) {
// We had no items, waited a bit, and still have no items. We're going to shut down.
// It's important to do this while holding the lock, otherwise we race with new
// items being added.
assert(data->running && "Thread should be running because we're that thread");
FLOG(fd_monitor, "Thread exiting");
data->running = false;
return;
}
}
}
}
void fd_monitor_t::poke_in_background(const poke_list_t &pokelist) {
ASSERT_IS_BACKGROUND_THREAD();
auto poker = [&pokelist](fd_monitor_item_t &item) {
int fd = item.fd.fd();
bool remove = !item.poke_item(pokelist);
if (remove) FLOG(fd_monitor, "Removing fd", fd);
return remove;
};
items_.erase(std::remove_if(items_.begin(), items_.end(), poker), items_.end());
}