Add a poke function to fd_monitor

In preparation for fixing #7559, add a function poke_item to fd_monitor.

fd_monitor has a list of file descriptors, and invokes a callback when an
fd becomes readable. With this change, we assign each item a unique ID and
return it when the item is added; the ID may then be used to invoke the
callback explicitly.

The idea is that we can stop reading from the pipe associated with the
cmdsub when the job is finished, even if the pipe is still open.
This commit is contained in:
ridiculousfish 2021-01-03 16:26:28 -08:00
parent 534bc66a43
commit fd08b660c0
4 changed files with 164 additions and 44 deletions

View file

@ -26,15 +26,16 @@ fd_monitor_t::fd_monitor_t() {
// Add an item for ourselves.
// We don't need to go through 'pending' because we have not yet launched the thread, and don't
// want to yet.
auto callback = [this](const autoclose_fd_t &fd, bool timed_out) {
auto callback = [this](const autoclose_fd_t &fd, item_wake_reason_t reason) {
ASSERT_IS_BACKGROUND_THREAD();
assert(!timed_out && "Should not time out with kNoTimeout");
(void)timed_out;
assert(reason == item_wake_reason_t::readable &&
"Should not be poked, or time out with kNoTimeout");
(void)reason;
// Read some to take data off of the notifier.
char buff[4096];
ssize_t amt = read(fd.fd(), buff, sizeof buff);
if (amt > 0) {
this->has_pending_ = true;
this->has_pending_or_pokes_ = true;
} else if (amt == 0) {
this->terminate_ = true;
} else {
@ -54,10 +55,28 @@ fd_monitor_t::~fd_monitor_t() {
}
}
void fd_monitor_t::add(fd_monitor_item_t &&item) {
fd_monitor_item_id_t fd_monitor_t::add(fd_monitor_item_t &&item) {
assert(item.fd.valid() && "Invalid fd");
assert(item.timeout_usec != 0 && "Invalid timeout");
bool start_thread = add_pending_get_start_thread(std::move(item));
assert(item.item_id == 0 && "Item should not already have an ID");
bool start_thread = false;
fd_monitor_item_id_t item_id{};
{
// Lock around a local region.
auto data = data_.acquire();
// Assign an id and add the item to pending.
item_id = ++data->last_id;
item.item_id = item_id;
data->pending.push_back(std::move(item));
// Maybe plan to start the thread.
if (!data->running) {
FLOG(fd_monitor, "Thread starting");
data->running = true;
start_thread = true;
}
}
if (start_thread) {
void *(*trampoline)(void *) = [](void *self) -> void * {
static_cast<fd_monitor_t *>(self)->run_in_background();
@ -71,17 +90,24 @@ void fd_monitor_t::add(fd_monitor_item_t &&item) {
// Tickle our notifier.
char byte = 0;
(void)write_loop(notify_write_fd_.fd(), &byte, 1);
return item_id;
}
bool fd_monitor_t::add_pending_get_start_thread(fd_monitor_item_t &&item) {
auto data = data_.acquire();
data->pending.push_back(std::move(item));
if (!data->running) {
FLOG(fd_monitor, "Thread starting");
data->running = true;
return true;
void fd_monitor_t::poke_item(fd_monitor_item_id_t item_id) {
assert(item_id > 0 && "Invalid item ID");
bool needs_notifier_byte = false;
{
auto data = data_.acquire();
needs_notifier_byte = data->pokelist.empty();
// Insert it, sorted.
auto where = std::lower_bound(data->pokelist.begin(), data->pokelist.end(), item_id);
data->pokelist.insert(where, item_id);
}
if (needs_notifier_byte) {
// Tickle our notifier.
char byte = 0;
(void)write_loop(notify_write_fd_.fd(), &byte, 1);
}
return false;
}
// Given a usec count, populate and return a timeval.
@ -108,15 +134,33 @@ bool fd_monitor_item_t::service_item(const fd_set *fds, const time_point_t &now)
bool timed_out = !readable && usec_remaining(now) == 0;
if (readable || timed_out) {
last_time = now;
callback(fd, timed_out);
item_wake_reason_t reason =
readable ? item_wake_reason_t::readable : item_wake_reason_t::timeout;
callback(fd, reason);
should_retain = fd.valid();
}
return should_retain;
}
bool fd_monitor_item_t::poke_item(const poke_list_t &pokelist) {
if (item_id == 0 || !std::binary_search(pokelist.begin(), pokelist.end(), item_id)) {
// Not pokeable or not in the pokelist.
return true;
}
callback(fd, item_wake_reason_t::poke);
return fd.valid();
}
void fd_monitor_t::run_in_background() {
ASSERT_IS_BACKGROUND_THREAD();
poke_list_t pokelist;
for (;;) {
// Poke any items that need it.
if (!pokelist.empty()) {
this->poke_in_background(std::move(pokelist));
pokelist.clear();
}
uint64_t timeout_usec = fd_monitor_item_t::kNoTimeout;
int max_fd = -1;
fd_set fds;
@ -158,7 +202,7 @@ void fd_monitor_t::run_in_background() {
return remove;
};
// Service all items that are either readable or timed our, and remove any which say to do
// Service all items that are either readable or timed out, and remove any which say to do
// so.
now = std::chrono::steady_clock::now();
items_.erase(std::remove_if(items_.begin(), items_.end(), servicer), items_.end());
@ -171,13 +215,19 @@ void fd_monitor_t::run_in_background() {
// Maybe we got some new items. Check if our callback says so, or if this is the wait
// lap, in which case we might want to commit to exiting.
if (has_pending_ || is_wait_lap) {
if (has_pending_or_pokes_ || is_wait_lap) {
auto data = data_.acquire();
// Move from 'pending' to 'items'.
items_.insert(items_.end(), std::make_move_iterator(data->pending.begin()),
std::make_move_iterator(data->pending.end()));
data->pending.clear();
has_pending_ = false;
// Grab any pokelist.
assert(pokelist.empty() && "pokelist should be empty or else we're dropping pokes");
pokelist = std::move(data->pokelist);
data->pokelist.clear();
has_pending_or_pokes_ = false;
if (is_wait_lap && items_.size() == 1) {
// We had no items, waited a bit, and still have no items. We're going to shut down.
@ -191,3 +241,14 @@ void fd_monitor_t::run_in_background() {
}
}
}
void fd_monitor_t::poke_in_background(const poke_list_t &pokelist) {
ASSERT_IS_BACKGROUND_THREAD();
auto poker = [&pokelist](fd_monitor_item_t &item) {
int fd = item.fd.fd();
bool remove = !item.poke_item(pokelist);
if (remove) FLOG(fd_monitor, "Removing fd", fd);
return remove;
};
items_.erase(std::remove_if(items_.begin(), items_.end(), poker), items_.end());
}

View file

@ -12,14 +12,24 @@
class fd_monitor_t;
/// Each item added to fd_monitor_t is assigned a unique ID, which is not recycled.
/// Items may have their callback triggered immediately by passing the ID.
/// Zero is a sentinel.
using fd_monitor_item_id_t = uint64_t;
/// Reasons for waking an item.
enum class item_wake_reason_t {
readable, // the fd became readable
timeout, // the requested timeout was hit
poke, // the item was "poked" (woken up explicitly)
};
/// An item containing an fd and callback, which can be monitored to watch when it becomes readable,
/// and invoke the callback.
struct fd_monitor_item_t {
friend class fd_monitor_t;
/// The callback type for the item.
/// It will be invoked when either \p fd is readable, or if the timeout was hit.
using callback_t = std::function<void(autoclose_fd_t &fd, bool timed_out)>;
/// The callback type for the item. It is passed \p fd, and the reason for waking \p reason.
/// The callback may close \p fd, in which case the item is removed.
using callback_t = std::function<void(autoclose_fd_t &fd, item_wake_reason_t reason)>;
/// A sentinel value meaning no timeout.
static constexpr uint64_t kNoTimeout = std::numeric_limits<uint64_t>::max();
@ -51,6 +61,9 @@ struct fd_monitor_item_t {
// The last time we were called, or the initialization point.
maybe_t<time_point_t> last_time{};
// The ID for this item. This is assigned by the fd monitor.
fd_monitor_item_id_t item_id{0};
// \return the number of microseconds until the timeout should trigger, or kNoTimeout for none.
// A 0 return means we are at or past the timeout.
uint64_t usec_remaining(const time_point_t &now) const;
@ -58,6 +71,13 @@ struct fd_monitor_item_t {
// Invoke this item's callback if its value is set in fd or has timed out.
// \return true to retain the item, false to remove it.
bool service_item(const fd_set *fds, const time_point_t &now);
// Invoke this item's callback with a poke, if its ID is present in the (sorted) pokelist.
// \return true to retain the item, false to remove it.
using poke_list_t = std::vector<fd_monitor_item_id_t>;
bool poke_item(const poke_list_t &pokelist);
friend class fd_monitor_t;
};
/// A class which can monitor a set of fds, invoking a callback when any becomes readable, or when
@ -66,34 +86,47 @@ class fd_monitor_t {
public:
using item_list_t = std::vector<fd_monitor_item_t>;
// A "pokelist" is a sorted list of item IDs which need explicit wakeups.
using poke_list_t = std::vector<fd_monitor_item_id_t>;
fd_monitor_t();
~fd_monitor_t();
/// Add an item to monitor.
void add(fd_monitor_item_t &&item);
/// Add an item to monitor. \return the ID assigned to the item.
fd_monitor_item_id_t add(fd_monitor_item_t &&item);
/// Mark that an item with a given ID needs to be explicitly woken up.
void poke_item(fd_monitor_item_id_t item_id);
private:
// The background thread runner.
void run_in_background();
// Add a pending item, marking the thread as running.
// \return true if we should start the thread.
bool add_pending_get_start_thread(fd_monitor_item_t &&item);
// Poke items in the pokelist, removing any items that close their FD.
// The pokelist is consumed after this.
// This is only called in the background thread.
void poke_in_background(const poke_list_t &pokelist);
// The list of items to monitor. This is only accessed on the background thread.
item_list_t items_{};
// Set to true by the background thread when our self-pipe becomes readable.
bool has_pending_{false};
bool has_pending_or_pokes_{false};
// Latched to true by the background thread if our self-pipe is closed, which indicates we are
// in the destructor and so should terminate.
bool terminate_{false};
struct data_t {
/// Pending items.
/// Pending items. This is set under the lock, then the background thread grabs them.
item_list_t pending{};
/// List of IDs for items that need to be poked (explicitly woken up).
poke_list_t pokelist{};
/// The last ID assigned, or if none.
fd_monitor_item_id_t last_id{0};
/// Whether the thread is running.
bool running{false};
};

View file

@ -787,7 +787,9 @@ static void test_fd_monitor() {
struct item_maker_t {
std::atomic<bool> did_timeout{false};
std::atomic<size_t> length_read{0};
std::atomic<size_t> pokes{0};
std::atomic<size_t> total_calls{0};
fd_monitor_item_id_t item_id{0};
bool always_exit{false};
fd_monitor_item_t item;
autoclose_fd_t writer;
@ -795,15 +797,21 @@ static void test_fd_monitor() {
explicit item_maker_t(uint64_t timeout_usec) {
auto pipes = make_autoclose_pipes({}).acquire();
writer = std::move(pipes.write);
auto callback = [this](autoclose_fd_t &fd, bool timed_out) {
auto callback = [this](autoclose_fd_t &fd, item_wake_reason_t reason) {
bool was_closed = false;
if (timed_out) {
this->did_timeout = true;
} else {
char buff[4096];
ssize_t amt = read(fd.fd(), buff, sizeof buff);
length_read += amt;
was_closed = (amt == 0);
switch (reason) {
case item_wake_reason_t::timeout:
this->did_timeout = true;
break;
case item_wake_reason_t::poke:
this->pokes += 1;
break;
case item_wake_reason_t::readable:
char buff[4096];
ssize_t amt = read(fd.fd(), buff, sizeof buff);
this->length_read += amt;
was_closed = (amt == 0);
break;
}
total_calls += 1;
if (always_exit || was_closed) {
@ -840,45 +848,63 @@ static void test_fd_monitor() {
// Item which should get 42 bytes, then get notified it is closed.
item_maker_t item42_thenclose(16 * usec_per_msec);
// Item which gets one poke.
item_maker_t item_pokee(fd_monitor_item_t::kNoTimeout);
// Item which should be called back once.
item_maker_t item_oneshot(16 * usec_per_msec);
item_oneshot.always_exit = true;
{
fd_monitor_t monitor;
for (auto item : {&item_never, &item_hugetimeout, &item0_timeout, &item42_timeout,
&item42_nottimeout, &item42_thenclose, &item_oneshot}) {
monitor.add(std::move(item->item));
for (item_maker_t *item :
{&item_never, &item_hugetimeout, &item0_timeout, &item42_timeout, &item42_nottimeout,
&item42_thenclose, &item_pokee, &item_oneshot}) {
item->item_id = monitor.add(std::move(item->item));
}
item42_timeout.write42();
item42_nottimeout.write42();
item42_thenclose.write42();
item42_thenclose.writer.close();
item_oneshot.write42();
monitor.poke_item(item_pokee.item_id);
std::this_thread::sleep_for(std::chrono::milliseconds(84));
}
do_test(!item_never.did_timeout);
do_test(item_never.length_read == 0);
do_test(item_never.pokes == 0);
do_test(!item_hugetimeout.did_timeout);
do_test(item_hugetimeout.length_read == 0);
do_test(item_hugetimeout.pokes == 0);
do_test(item0_timeout.length_read == 0);
do_test(item0_timeout.did_timeout);
do_test(item0_timeout.pokes == 0);
do_test(item42_timeout.length_read == 42);
do_test(item42_timeout.did_timeout);
do_test(item42_timeout.pokes == 0);
do_test(item42_nottimeout.length_read == 42);
do_test(!item42_nottimeout.did_timeout);
do_test(item42_nottimeout.pokes == 0);
do_test(item42_thenclose.did_timeout == false);
do_test(item42_thenclose.length_read == 42);
do_test(item42_thenclose.total_calls == 2);
do_test(item42_thenclose.pokes == 0);
do_test(!item_oneshot.did_timeout);
do_test(item_oneshot.length_read == 42);
do_test(item_oneshot.total_calls == 1);
do_test(item_oneshot.pokes == 0);
do_test(!item_pokee.did_timeout);
do_test(item_pokee.length_read == 0);
do_test(item_pokee.total_calls == 1);
do_test(item_pokee.pokes == 1);
}
static void test_iothread() {

View file

@ -124,14 +124,14 @@ void io_buffer_t::begin_filling(autoclose_fd_t fd) {
fd_monitor_item_t item;
item.fd = std::move(fd);
item.timeout_usec = poll_usec;
item.callback = [this, promise](autoclose_fd_t &fd, bool timed_out) {
item.callback = [this, promise](autoclose_fd_t &fd, item_wake_reason_t reason) {
ASSERT_IS_BACKGROUND_THREAD();
// Only check the shutdown flag if we timed out.
// Only check the shutdown flag if we timed out or were poked.
// It's important that if select() indicated we were readable, that we call select() again
// allowing it to time out. Note the typical case is that the fd will be closed, in which
// case select will return immediately.
bool done = false;
if (!timed_out) {
if (reason == item_wake_reason_t::readable) {
// select() reported us as readable; read a bit.
scoped_lock locker(append_lock_);
ssize_t ret = read_once(fd.fd());