Refactor the named pipe uvar notifier with a state machine

This attempts to simplify the named pipe notifier by switching to a state
machine model.
This commit is contained in:
ridiculousfish 2021-05-03 18:12:34 -07:00
parent 7c5b8b8556
commit e0bf23ad26

View file

@ -1317,15 +1317,42 @@ static autoclose_fd_t make_fifo(const wchar_t *test_path, const wchar_t *suffix)
// To post a notification, write some data to the pipe, wait a little while, and then read it back. // To post a notification, write some data to the pipe, wait a little while, and then read it back.
// //
// To receive a notification, watch for the pipe to become readable. When it does, enter a polling // To receive a notification, watch for the pipe to become readable. When it does, enter a polling
// mode until the pipe is no longer readable. To guard against the possibility of a shell exiting // mode until the pipe is no longer readable, where we poll based on the modification date of the
// when there is data remaining in the pipe, if the pipe is kept readable too long, clients will // pipe. To guard against the possibility of a shell exiting when there is data remaining in the
// attempt to read data out of it (to render it no longer readable). // pipe, if the pipe is kept readable too long, clients will attempt to read data out of it (to
// render it no longer readable).
class universal_notifier_named_pipe_t final : public universal_notifier_t { class universal_notifier_named_pipe_t final : public universal_notifier_t {
#if !defined(__CYGWIN__) #if !defined(__CYGWIN__)
// We operate a state machine.
enum state_t{
// The pipe is not yet readable. There is nothing to do in poll.
// If the pipe becomes readable we will enter the polling state.
waiting_for_readable,
// The pipe is readable. In poll, check if the pipe is still readable,
// and whether its timestamp has changed.
polling_during_readable,
// We have written to the pipe (so we expect it to be readable).
// We may read back from it in poll().
waiting_to_drain,
};
// The state we are currently in.
state_t state{waiting_for_readable};
// When we entered that state, in microseconds since epoch.
long long state_start_usec{-1};
// The pipe itself; this is opened read/write.
autoclose_fd_t pipe_fd; autoclose_fd_t pipe_fd;
file_id_t pipe_fd_id{};
long long readback_time_usec{0}; // The pipe's file ID containing the last modified timestamp.
size_t readback_amount{0}; file_id_t pipe_timestamps{};
// If we are in waiting_to_drain state, how much we have written and therefore are responsible
// for draining.
size_t drain_amount{0};
// We "flash" the pipe to make it briefly readable, for this many usec. // We "flash" the pipe to make it briefly readable, for this many usec.
static constexpr long long k_flash_duration_usec = 1e5; static constexpr long long k_flash_duration_usec = 1e5;
@ -1333,10 +1360,28 @@ class universal_notifier_named_pipe_t final : public universal_notifier_t {
// If the pipe remains readable for this many usec, we drain it. // If the pipe remains readable for this many usec, we drain it.
static constexpr long long k_readable_too_long_duration_usec = 5e6; static constexpr long long k_readable_too_long_duration_usec = 5e6;
bool polling_due_to_readable_fd{false}; /// \return the name of a state.
long long drain_if_still_readable_time_usec{0}; static const char *state_name(state_t s) {
switch (s) {
case waiting_for_readable:
return "waiting";
case polling_during_readable:
return "polling";
case waiting_to_drain:
return "draining";
}
DIE("Unreachable");
}
void drain_excessive_data() const { // Switch to a state (may or may not be new).
void set_state(state_t new_state) {
FLOGF(uvar_notifier, "changing from %s to %s", state_name(state), state_name(new_state));
state = new_state;
state_start_usec = get_time();
}
// Called when the pipe has been readable for too long.
void drain_excess() const {
// The pipe seems to have data on it, that won't go away. Read a big chunk out of it. We // The pipe seems to have data on it, that won't go away. Read a big chunk out of it. We
// don't read until it's exhausted, because if someone were to pipe say /dev/null, that // don't read until it's exhausted, because if someone were to pipe say /dev/null, that
// would cause us to hang! // would cause us to hang!
@ -1345,16 +1390,26 @@ class universal_notifier_named_pipe_t final : public universal_notifier_t {
ignore_result(read(pipe_fd.fd(), buff, sizeof buff)); ignore_result(read(pipe_fd.fd(), buff, sizeof buff));
} }
// Called when we want to read back data we have written, to mark the pipe as non-readable.
void drain_written() {
while (this->drain_amount > 0) {
char buff[64];
size_t amt = std::min(this->drain_amount, sizeof buff);
ignore_result(read(this->pipe_fd.fd(), buff, amt));
this->drain_amount -= amt;
}
}
/// Check if the pipe's file ID (aka struct stat) is different from what we have stored. /// Check if the pipe's file ID (aka struct stat) is different from what we have stored.
/// If it has changed, it indicates that someone has written to the pipe; update our stored id. /// If it has changed, it indicates that someone has modified the pipe; update our stored id.
/// \return true if changed, false if not. /// \return true if changed, false if not.
bool check_pipe_id_changed() { bool update_pipe_timestamps() {
if (!pipe_fd.valid()) return false; if (!pipe_fd.valid()) return false;
file_id_t file_id = file_id_for_fd(pipe_fd.fd()); file_id_t timestamps = file_id_for_fd(pipe_fd.fd());
if (file_id == this->pipe_fd_id) { if (timestamps == this->pipe_timestamps) {
return false; return false;
} }
this->pipe_fd_id = file_id; this->pipe_timestamps = timestamps;
return true; return true;
} }
@ -1365,120 +1420,119 @@ class universal_notifier_named_pipe_t final : public universal_notifier_t {
~universal_notifier_named_pipe_t() override = default; ~universal_notifier_named_pipe_t() override = default;
int notification_fd() const override { int notification_fd() const override {
if (polling_due_to_readable_fd) { if (!pipe_fd.valid()) return -1;
// We are in polling mode because we think our fd is readable. This means that, if we // If we are waiting for the pipe to be readable, return it for select.
// return it to be select()'d on, we'll be called back immediately. So don't return it. // Otherwise we expect it to be readable already; return invalid.
switch (state) {
case waiting_for_readable:
return pipe_fd.fd();
case polling_during_readable:
case waiting_to_drain:
return -1; return -1;
} }
// We are not in polling mode. Return the fd so it can be watched. DIE("unreachable");
return pipe_fd.fd();
} }
bool notification_fd_became_readable(int fd) override { bool notification_fd_became_readable(int fd) override {
// Our fd is readable. We deliberately do not read anything out of it: if we did, other assert(fd == pipe_fd.fd() && "Wrong fd");
// sessions may miss the notification. Instead, we go into "polling mode:" we do not
// select() on our fd for a while, and sync periodically until the fd is no longer readable.
// However, if we are the one who posted the notification, we don't sync (until we clean
// up!)
UNUSED(fd); UNUSED(fd);
bool should_sync = false; switch (state) {
if (readback_time_usec == 0) { case waiting_for_readable:
FLOG(uvar_notifier, "entering polling mode"); // We are now readable.
polling_due_to_readable_fd = true; // Grab the timestamp and return true indicating that we received a notification.
drain_if_still_readable_time_usec = get_time() + k_readable_too_long_duration_usec; set_state(polling_during_readable);
should_sync = true; update_pipe_timestamps();
// Update our pipe's timestamps. return true;
check_pipe_id_changed();
case polling_during_readable:
case waiting_to_drain:
// We did not return an fd to wait on, so should not be called.
DIE("should not be called in this state");
} }
return should_sync; DIE("unreachable");
} }
void post_notification() override { void post_notification() override {
if (!pipe_fd.valid()) return; if (!pipe_fd.valid()) return;
// We need to write some data (any data) to the pipe, then wait for a while, then read // We need to write some data (any data) to the pipe, then wait for a while, then read
// it back. Nobody is expected to read it except us. // it back. Nobody is expected to read it except us.
FLOG(uvar_notifier, "posting notification by writing to pipe"); FLOGF(uvar_notifier, "writing to pipe (written %lu)", (unsigned long)drain_amount);
char c[1] = {'\0'}; char c[1] = {'\0'};
ssize_t amt_written = write(pipe_fd.fd(), c, sizeof c); ssize_t amt_written = write(pipe_fd.fd(), c, sizeof c);
if (amt_written < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) { if (amt_written < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
// Very unsual: the pipe is full! // Very unsual: the pipe is full! Try to read some and repeat once.
drain_excessive_data(); drain_excess();
amt_written = write(pipe_fd.fd(), c, sizeof c);
if (amt_written < 0) {
FLOG(uvar_notifier, "pipe could not be drained, skipping notification");
return;
} }
FLOG(uvar_notifier, "pipe drained");
// Now schedule a read for some time in the future. }
this->readback_time_usec = get_time() + k_flash_duration_usec; assert(amt_written >= 0 && "Amount should not be negative");
this->readback_amount += sizeof c; this->drain_amount += amt_written;
// We unconditionally set our state to waiting to drain.
// No need to react to our own changes. set_state(waiting_to_drain);
check_pipe_id_changed(); update_pipe_timestamps();
} }
unsigned long usec_delay_between_polls() const override { unsigned long usec_delay_between_polls() const override {
unsigned long readback_delay = ULONG_MAX; if (!pipe_fd.valid()) return 0;
if (this->readback_time_usec > 0) { switch (state) {
// How long until the readback? case waiting_for_readable:
long long now = get_time(); // No polling necessary until it becomes readable.
if (now >= this->readback_time_usec) { return 0;
// Oops, it already passed! Return something tiny.
readback_delay = 1000;
} else {
readback_delay = static_cast<unsigned long>(this->readback_time_usec - now);
}
}
unsigned long polling_delay = ULONG_MAX; case polling_during_readable:
if (polling_due_to_readable_fd) { case waiting_to_drain:
// We're in polling mode. Don't return a value less than our polling interval. return k_flash_duration_usec;
polling_delay = k_flash_duration_usec;
} }
DIE("Unreachable");
// Now return the smaller of the two values. If we get ULONG_MAX, it means there's no more
// need to poll; in that case return 0.
unsigned long result = std::min(readback_delay, polling_delay);
if (result == ULONG_MAX) {
result = 0;
}
return result;
} }
bool poll() override { bool poll() override {
if (!pipe_fd.valid()) return false; if (!pipe_fd.valid()) return false;
switch (state) {
case waiting_for_readable:
// Nothing to do until the fd is readable.
return false;
// Check if we are past the readback time. case polling_during_readable: {
if (this->readback_time_usec > 0 && get_time() >= this->readback_time_usec) { // If we're no longer readable, go back to wait mode.
FLOGF(uvar_notifier, "reading back %lu bytes we wrote to pipe", this->readback_amount); // Conversely, if we have been readable too long, perhaps some fish died while its
// Read back what we wrote. We do nothing with the value. // written data was still on the pipe; drain some.
while (this->readback_amount > 0) { if (!select_wrapper_t::poll_fd_readable(pipe_fd.fd())) {
char buff[64]; set_state(waiting_for_readable);
size_t amt_to_read = std::min(this->readback_amount, sizeof(buff)); } else if (get_time() >= state_start_usec + k_readable_too_long_duration_usec) {
ignore_result(read(this->pipe_fd.fd(), buff, amt_to_read)); drain_excess();
this->readback_amount -= amt_to_read;
} }
assert(this->readback_amount == 0); // Sync if the pipe's timestamp is different, meaning someone modified the pipe
this->readback_time_usec = 0; // since we last saw it.
if (update_pipe_timestamps()) {
FLOG(uvar_notifier, "pipe changed, will sync uvars");
return true;
} }
// Check to see if we are doing readability polling.
if (!polling_due_to_readable_fd) {
return false; return false;
} }
// We are polling, so we are definitely going to sync. case waiting_to_drain: {
// See if this is still readable. // We wrote data to the pipe. Maybe read it back.
// If we are still readable, then there is still data on the pipe; maybe another
// change occurred with ours.
if (get_time() >= state_start_usec + k_flash_duration_usec) {
drain_written();
if (!select_wrapper_t::poll_fd_readable(pipe_fd.fd())) { if (!select_wrapper_t::poll_fd_readable(pipe_fd.fd())) {
// No longer readable, no longer polling. set_state(waiting_for_readable);
FLOG(uvar_notifier, "exiting polling mode");
polling_due_to_readable_fd = false;
drain_if_still_readable_time_usec = 0;
} else { } else {
// Still readable. If it's been readable for a long time, there is probably set_state(polling_during_readable);
// lingering data on the pipe.
if (get_time() >= drain_if_still_readable_time_usec) {
drain_excessive_data();
} }
} }
return check_pipe_id_changed(); return update_pipe_timestamps();
} }
}
DIE("Unreachable");
}
#else // this class isn't valid on this system #else // this class isn't valid on this system
public: public:
universal_notifier_named_pipe_t(const wchar_t *test_path) { universal_notifier_named_pipe_t(const wchar_t *test_path) {