From e0bf23ad26266dee0871ceffdb4ffc1184415e50 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Mon, 3 May 2021 18:12:34 -0700 Subject: [PATCH] Refactor the named pipe uvar notifier with a state machine This attempts to simplify the named pipe notifier by switching to a state machine model. --- src/env_universal_common.cpp | 248 +++++++++++++++++++++-------------- 1 file changed, 151 insertions(+), 97 deletions(-) diff --git a/src/env_universal_common.cpp b/src/env_universal_common.cpp index cdc6735d1..c1cfa24c7 100644 --- a/src/env_universal_common.cpp +++ b/src/env_universal_common.cpp @@ -1317,15 +1317,42 @@ static autoclose_fd_t make_fifo(const wchar_t *test_path, const wchar_t *suffix) // To post a notification, write some data to the pipe, wait a little while, and then read it back. // // To receive a notification, watch for the pipe to become readable. When it does, enter a polling -// mode until the pipe is no longer readable. To guard against the possibility of a shell exiting -// when there is data remaining in the pipe, if the pipe is kept readable too long, clients will -// attempt to read data out of it (to render it no longer readable). +// mode until the pipe is no longer readable, where we poll based on the modification date of the +// pipe. To guard against the possibility of a shell exiting when there is data remaining in the +// pipe, if the pipe is kept readable too long, clients will attempt to read data out of it (to +// render it no longer readable). class universal_notifier_named_pipe_t final : public universal_notifier_t { #if !defined(__CYGWIN__) + // We operate a state machine. + enum state_t{ + // The pipe is not yet readable. There is nothing to do in poll. + // If the pipe becomes readable we will enter the polling state. + waiting_for_readable, + + // The pipe is readable. In poll, check if the pipe is still readable, + // and whether its timestamp has changed. + polling_during_readable, + + // We have written to the pipe (so we expect it to be readable). + // We may read back from it in poll(). + waiting_to_drain, + }; + + // The state we are currently in. + state_t state{waiting_for_readable}; + + // When we entered that state, in microseconds since epoch. + long long state_start_usec{-1}; + + // The pipe itself; this is opened read/write. autoclose_fd_t pipe_fd; - file_id_t pipe_fd_id{}; - long long readback_time_usec{0}; - size_t readback_amount{0}; + + // The pipe's file ID containing the last modified timestamp. + file_id_t pipe_timestamps{}; + + // If we are in waiting_to_drain state, how much we have written and therefore are responsible + // for draining. + size_t drain_amount{0}; // We "flash" the pipe to make it briefly readable, for this many usec. static constexpr long long k_flash_duration_usec = 1e5; @@ -1333,10 +1360,28 @@ class universal_notifier_named_pipe_t final : public universal_notifier_t { // If the pipe remains readable for this many usec, we drain it. static constexpr long long k_readable_too_long_duration_usec = 5e6; - bool polling_due_to_readable_fd{false}; - long long drain_if_still_readable_time_usec{0}; + /// \return the name of a state. + static const char *state_name(state_t s) { + switch (s) { + case waiting_for_readable: + return "waiting"; + case polling_during_readable: + return "polling"; + case waiting_to_drain: + return "draining"; + } + DIE("Unreachable"); + } - void drain_excessive_data() const { + // Switch to a state (may or may not be new). + void set_state(state_t new_state) { + FLOGF(uvar_notifier, "changing from %s to %s", state_name(state), state_name(new_state)); + state = new_state; + state_start_usec = get_time(); + } + + // Called when the pipe has been readable for too long. + void drain_excess() const { // The pipe seems to have data on it, that won't go away. Read a big chunk out of it. We // don't read until it's exhausted, because if someone were to pipe say /dev/null, that // would cause us to hang! @@ -1345,16 +1390,26 @@ class universal_notifier_named_pipe_t final : public universal_notifier_t { ignore_result(read(pipe_fd.fd(), buff, sizeof buff)); } + // Called when we want to read back data we have written, to mark the pipe as non-readable. + void drain_written() { + while (this->drain_amount > 0) { + char buff[64]; + size_t amt = std::min(this->drain_amount, sizeof buff); + ignore_result(read(this->pipe_fd.fd(), buff, amt)); + this->drain_amount -= amt; + } + } + /// Check if the pipe's file ID (aka struct stat) is different from what we have stored. - /// If it has changed, it indicates that someone has written to the pipe; update our stored id. + /// If it has changed, it indicates that someone has modified the pipe; update our stored id. /// \return true if changed, false if not. - bool check_pipe_id_changed() { + bool update_pipe_timestamps() { if (!pipe_fd.valid()) return false; - file_id_t file_id = file_id_for_fd(pipe_fd.fd()); - if (file_id == this->pipe_fd_id) { + file_id_t timestamps = file_id_for_fd(pipe_fd.fd()); + if (timestamps == this->pipe_timestamps) { return false; } - this->pipe_fd_id = file_id; + this->pipe_timestamps = timestamps; return true; } @@ -1365,120 +1420,119 @@ class universal_notifier_named_pipe_t final : public universal_notifier_t { ~universal_notifier_named_pipe_t() override = default; int notification_fd() const override { - if (polling_due_to_readable_fd) { - // We are in polling mode because we think our fd is readable. This means that, if we - // return it to be select()'d on, we'll be called back immediately. So don't return it. - return -1; + if (!pipe_fd.valid()) return -1; + // If we are waiting for the pipe to be readable, return it for select. + // Otherwise we expect it to be readable already; return invalid. + switch (state) { + case waiting_for_readable: + return pipe_fd.fd(); + case polling_during_readable: + case waiting_to_drain: + return -1; } - // We are not in polling mode. Return the fd so it can be watched. - return pipe_fd.fd(); + DIE("unreachable"); } bool notification_fd_became_readable(int fd) override { - // Our fd is readable. We deliberately do not read anything out of it: if we did, other - // sessions may miss the notification. Instead, we go into "polling mode:" we do not - // select() on our fd for a while, and sync periodically until the fd is no longer readable. - // However, if we are the one who posted the notification, we don't sync (until we clean - // up!) + assert(fd == pipe_fd.fd() && "Wrong fd"); UNUSED(fd); - bool should_sync = false; - if (readback_time_usec == 0) { - FLOG(uvar_notifier, "entering polling mode"); - polling_due_to_readable_fd = true; - drain_if_still_readable_time_usec = get_time() + k_readable_too_long_duration_usec; - should_sync = true; - // Update our pipe's timestamps. - check_pipe_id_changed(); + switch (state) { + case waiting_for_readable: + // We are now readable. + // Grab the timestamp and return true indicating that we received a notification. + set_state(polling_during_readable); + update_pipe_timestamps(); + return true; + + case polling_during_readable: + case waiting_to_drain: + // We did not return an fd to wait on, so should not be called. + DIE("should not be called in this state"); } - return should_sync; + DIE("unreachable"); } void post_notification() override { if (!pipe_fd.valid()) return; // We need to write some data (any data) to the pipe, then wait for a while, then read // it back. Nobody is expected to read it except us. - FLOG(uvar_notifier, "posting notification by writing to pipe"); + FLOGF(uvar_notifier, "writing to pipe (written %lu)", (unsigned long)drain_amount); char c[1] = {'\0'}; ssize_t amt_written = write(pipe_fd.fd(), c, sizeof c); if (amt_written < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) { - // Very unsual: the pipe is full! - drain_excessive_data(); + // Very unsual: the pipe is full! Try to read some and repeat once. + drain_excess(); + amt_written = write(pipe_fd.fd(), c, sizeof c); + if (amt_written < 0) { + FLOG(uvar_notifier, "pipe could not be drained, skipping notification"); + return; + } + FLOG(uvar_notifier, "pipe drained"); } - - // Now schedule a read for some time in the future. - this->readback_time_usec = get_time() + k_flash_duration_usec; - this->readback_amount += sizeof c; - - // No need to react to our own changes. - check_pipe_id_changed(); + assert(amt_written >= 0 && "Amount should not be negative"); + this->drain_amount += amt_written; + // We unconditionally set our state to waiting to drain. + set_state(waiting_to_drain); + update_pipe_timestamps(); } unsigned long usec_delay_between_polls() const override { - unsigned long readback_delay = ULONG_MAX; - if (this->readback_time_usec > 0) { - // How long until the readback? - long long now = get_time(); - if (now >= this->readback_time_usec) { - // Oops, it already passed! Return something tiny. - readback_delay = 1000; - } else { - readback_delay = static_cast(this->readback_time_usec - now); - } - } + if (!pipe_fd.valid()) return 0; + switch (state) { + case waiting_for_readable: + // No polling necessary until it becomes readable. + return 0; - unsigned long polling_delay = ULONG_MAX; - if (polling_due_to_readable_fd) { - // We're in polling mode. Don't return a value less than our polling interval. - polling_delay = k_flash_duration_usec; + case polling_during_readable: + case waiting_to_drain: + return k_flash_duration_usec; } - - // Now return the smaller of the two values. If we get ULONG_MAX, it means there's no more - // need to poll; in that case return 0. - unsigned long result = std::min(readback_delay, polling_delay); - if (result == ULONG_MAX) { - result = 0; - } - return result; + DIE("Unreachable"); } bool poll() override { if (!pipe_fd.valid()) return false; + switch (state) { + case waiting_for_readable: + // Nothing to do until the fd is readable. + return false; - // Check if we are past the readback time. - if (this->readback_time_usec > 0 && get_time() >= this->readback_time_usec) { - FLOGF(uvar_notifier, "reading back %lu bytes we wrote to pipe", this->readback_amount); - // Read back what we wrote. We do nothing with the value. - while (this->readback_amount > 0) { - char buff[64]; - size_t amt_to_read = std::min(this->readback_amount, sizeof(buff)); - ignore_result(read(this->pipe_fd.fd(), buff, amt_to_read)); - this->readback_amount -= amt_to_read; + case polling_during_readable: { + // If we're no longer readable, go back to wait mode. + // Conversely, if we have been readable too long, perhaps some fish died while its + // written data was still on the pipe; drain some. + if (!select_wrapper_t::poll_fd_readable(pipe_fd.fd())) { + set_state(waiting_for_readable); + } else if (get_time() >= state_start_usec + k_readable_too_long_duration_usec) { + drain_excess(); + } + // Sync if the pipe's timestamp is different, meaning someone modified the pipe + // since we last saw it. + if (update_pipe_timestamps()) { + FLOG(uvar_notifier, "pipe changed, will sync uvars"); + return true; + } + return false; } - assert(this->readback_amount == 0); - this->readback_time_usec = 0; - } - // Check to see if we are doing readability polling. - if (!polling_due_to_readable_fd) { - return false; - } - - // We are polling, so we are definitely going to sync. - // See if this is still readable. - if (!select_wrapper_t::poll_fd_readable(pipe_fd.fd())) { - // No longer readable, no longer polling. - FLOG(uvar_notifier, "exiting polling mode"); - polling_due_to_readable_fd = false; - drain_if_still_readable_time_usec = 0; - } else { - // Still readable. If it's been readable for a long time, there is probably - // lingering data on the pipe. - if (get_time() >= drain_if_still_readable_time_usec) { - drain_excessive_data(); + case waiting_to_drain: { + // We wrote data to the pipe. Maybe read it back. + // If we are still readable, then there is still data on the pipe; maybe another + // change occurred with ours. + if (get_time() >= state_start_usec + k_flash_duration_usec) { + drain_written(); + if (!select_wrapper_t::poll_fd_readable(pipe_fd.fd())) { + set_state(waiting_for_readable); + } else { + set_state(polling_during_readable); + } + } + return update_pipe_timestamps(); } } - return check_pipe_id_changed(); + DIE("Unreachable"); } + #else // this class isn't valid on this system public: universal_notifier_named_pipe_t(const wchar_t *test_path) {