diff --git a/src/exec.cpp b/src/exec.cpp
index 7de91f5be..eb1a88259 100644
--- a/src/exec.cpp
+++ b/src/exec.cpp
@@ -782,12 +782,8 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr
 
     // Remove our write pipe and forget it. This may close the pipe, unless another thread has
     // claimed it (background write) or another process has inherited it.
-    auto block_output_buffer = block_output_bufferfill->buffer();
     io_chain.remove(block_output_bufferfill);
-    block_output_bufferfill.reset();
-
-    // Make the buffer populate itself from whatever was written to the write pipe.
-    block_output_buffer->read_to_wouldblock();
+    auto block_output_buffer = io_bufferfill_t::finish(std::move(block_output_bufferfill));
 
     // Resolve our IO chain to a sequence of dup2s.
     auto dup2s = dup2_list_t::resolve_chain(io_chain);
@@ -969,7 +965,7 @@ bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
         if ((io->io_mode == io_mode_t::bufferfill)) {
             // The read limit is dictated by the last bufferfill.
             const auto *bf = static_cast<const io_bufferfill_t *>(io.get());
-            stdout_read_limit = bf->buffer()->buffer().limit();
+            stdout_read_limit = bf->buffer()->read_limit();
         }
     }
 
@@ -1040,9 +1036,7 @@ static int exec_subshell_internal(const wcstring &cmd, parser_t &parser, wcstrin
         if (parser.eval(cmd, io_chain_t{bufferfill}, SUBST) == 0) {
             subcommand_status = proc_get_last_status();
         }
-        buffer = bufferfill->buffer();
-        bufferfill.reset();
-        buffer->read_to_wouldblock();
+        buffer = io_bufferfill_t::finish(std::move(bufferfill));
     }
 
     if (buffer && buffer->buffer().discarded()) subcommand_status = STATUS_READ_TOO_MUCH;
diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp
index e45f22e34..cecb566f3 100644
--- a/src/fish_tests.cpp
+++ b/src/fish_tests.cpp
@@ -943,9 +943,7 @@ static void test_1_cancellation(const wchar_t *src) {
         pthread_kill(thread, SIGINT);
     });
     parser_t::principal_parser().eval(src, io_chain_t{filler}, TOP);
-    auto buffer = filler->buffer();
-    filler.reset();
-    buffer->read_to_wouldblock();
+    auto buffer = io_bufferfill_t::finish(std::move(filler));
     if (buffer->buffer().size() != 0) {
         err(L"Expected 0 bytes in out_buff, but instead found %lu bytes\n",
             buffer->buffer().size());
diff --git a/src/io.cpp b/src/io.cpp
index 074b83159..a8246504c 100644
--- a/src/io.cpp
+++ b/src/io.cpp
@@ -11,6 +11,7 @@
 #include "exec.h"
 #include "fallback.h"  // IWYU pragma: keep
 #include "io.h"
+#include "iothread.h"
 #include "wutil.h"  // IWYU pragma: keep
 
 io_data_t::~io_data_t() = default;
@@ -28,6 +29,7 @@ void io_pipe_t::print() const {
 void io_bufferfill_t::print() const { fwprintf(stderr, L"bufferfill {%d}\n", write_fd_.fd()); }
 
 void io_buffer_t::append_from_stream(const output_stream_t &stream) {
+    scoped_lock locker(append_lock_);
     if (buffer_.discarded()) return;
     if (stream.buffer().discarded()) {
         buffer_.set_discard();
@@ -36,54 +38,102 @@ void io_buffer_t::append_from_stream(const output_stream_t &stream) {
     buffer_.append_wide_buffer(stream.buffer());
 }
 
-long io_buffer_t::read_some() {
-    int fd = read_.fd();
-    assert(fd >= 0 && "Should have a valid fd");
-    debug(4, L"io_buffer_t::read: blocking read on fd %d", fd);
-    long len;
-    char b[4096];
-    do {
-        len = read(fd, b, sizeof b);
-    } while (len < 0 && errno == EINTR);
-    if (len > 0) {
-        buffer_.append(&b[0], &b[len]);
-    }
-    return len;
-}
+void io_buffer_t::run_background_fillthread(autoclose_fd_t readfd) {
+    // Here we are running the background fillthread, executing in a background thread.
+    // Our plan is:
+    // 1. poll via select() until the fd is readable.
+    // 2. Acquire the append lock.
+    // 3. read until EAGAIN (would block), appending
+    // 4. release the lock
+    // The purpose of holding the lock around the read calls is to ensure that data from background
+    // processes isn't weirdly interspersed with data directly transferred (from a builtin to a buffer).
 
-void io_buffer_t::read_to_wouldblock() {
-    long len;
-    do {
-        len = read_some();
-    } while (len > 0);
-    if (len < 0 && errno != EAGAIN) {
-        debug(1, _(L"An error occured while reading output from code block on fd %d"), read_.fd());
-        wperror(L"io_buffer_t::read");
-    }
-}
+    const int fd = readfd.fd();
 
-bool io_buffer_t::try_read(unsigned long timeout_usec) {
-    struct timeval timeout;
-    timeout.tv_sec = 0;
-    timeout.tv_usec = timeout_usec;
-    int fd = read_.fd();
-    assert(fd >= 0 && "Should have a valid fd");
-    fd_set fds;
-    FD_ZERO(&fds);
-    FD_SET(fd, &fds);
-    int ret = select(fd + 1, &fds, nullptr, nullptr, &timeout);
-    if (ret < 0) {
-        // Treat EINTR as timeout.
-        if (errno != EINTR) {
-            debug(1, _(L"An error occured inside select on fd %d"), fd);
-            wperror(L"io_buffer_t::try_read");
+    // 100 msec poll rate. Note that in most cases, the write end of the pipe will be closed so
+    // select() will return; the polling is important only for weird cases like a background process
+    // launched in a command substitution.
+    const long poll_timeout_usec = 100000;
+    struct timeval tv = {};
+    tv.tv_usec = poll_timeout_usec;
+
+    bool shutdown = false;
+    while (!shutdown) {
+        bool readable = false;
+        // Check the shutdown flag.
+        shutdown |= this->shutdown_fillthread_.load(std::memory_order_relaxed);
+
+        // Poll if our fd is readable.
+        // Do this even if the shutdown flag is set. It's important we wait for the fd at least
+        // once. For short-lived processes, it's possible for the process to execute, produce output
+        // (fits in the pipe buffer) and be reaped before we are even scheduled. So always wait at
+        // least once on the fd. Note that doesn't mean we will wait for the full poll duration;
+        // typically what will happen is our pipe will be widowed and so this will return quickly.
+        // It's only for weird cases (e.g. a background process launched inside a command
+        // substitution) that we'll wait out the entire poll time.
+        fd_set fds;
+        FD_ZERO(&fds);
+        FD_SET(fd, &fds);
+        int ret = select(fd + 1, &fds, NULL, NULL, &tv);
+        readable = ret > 0;
+        if (ret < 0 && errno != EINTR) {
+            // Surprising error.
+            wperror(L"select");
+            return;
+        }
+
+        if (readable || shutdown) {
+            // Now either our fd is readable, or we have set the shutdown flag.
+            // Either way acquire the lock and read until we reach EOF, or EAGAIN / EINTR.
+            scoped_lock locker(append_lock_);
+            ssize_t ret;
+            do {
+                char buff[4096];
+                ret = read(fd, buff, sizeof buff);
+                if (ret > 0) {
+                    buffer_.append(&buff[0], &buff[ret]);
+                } else if (ret == 0) {
+                    shutdown = true;
+                } else if (errno != EINTR && errno != EAGAIN) {
+                    wperror(L"read");
+                    return;
+                }
+            } while (ret > 0);
         }
-        return false;
     }
-    if (ret > 0) {
-        read_some();
+    assert(shutdown && "Should only exit loop if shutdown flag is set");
+}
+
+void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) {
+    ASSERT_IS_MAIN_THREAD();
+    assert(!fillthread_ && "Already have a fillthread");
+
+    // We want our background thread to own the fd but it's not easy to move into a std::function.
+    // Use a shared_ptr.
+    auto fdref = std::make_shared<autoclose_fd_t>(std::move(fd));
+
+    // Our function to read until the receiver is closed.
+    // It's OK to capture 'this' by value because 'this' owns the background thread and joins it
+    // before dtor.
+    std::function<void(void)> func = [this, fdref]() {
+        this->run_background_fillthread(std::move(*fdref));
+    };
+
+    pthread_t fillthread{};
+    if (!make_pthread(&fillthread, std::move(func))) {
+        wperror(L"make_pthread");
     }
-    return ret > 0;
+    fillthread_ = fillthread;
+}
+
+void io_buffer_t::complete_background_fillthread() {
+    ASSERT_IS_MAIN_THREAD();
+    assert(fillthread_ && "Should have a fillthread");
+    shutdown_fillthread_.store(true, std::memory_order_relaxed);
+    void *ignored = nullptr;
+    int err = pthread_join(*fillthread_, &ignored);
+    DIE_ON_FAILURE(err);
+    fillthread_.reset();
 }
 
 shared_ptr<io_bufferfill_t> io_bufferfill_t::create(const io_chain_t &conflicts,
@@ -93,27 +143,39 @@ shared_ptr<io_bufferfill_t> io_bufferfill_t::create(const io_chain_t &conflicts,
     if (!pipes) {
         return nullptr;
     }
     // Our buffer will read from the read end of the pipe. This end must be non-blocking. This is
-    // because we retain the write end of the pipe in this process (even after handing it off to a
-    // child process); therefore a read on the pipe may block forever. What we should do is arrange
-    // for the write end of the pipe to be closed at the right time; then the read could just block.
+    // because our fillthread needs to poll to decide if it should shut down, and also accept input
+    // from direct buffer transfers.
     if (make_fd_nonblocking(pipes->read.fd())) {
         debug(1, PIPE_ERROR);
         wperror(L"fcntl");
         return nullptr;
     }
-
-    // Our buffer gets the read end of the pipe; out_pipe gets the write end.
-    auto buffer = std::make_shared<io_buffer_t>(std::move(pipes->read), buffer_limit);
+    // Our fillthread gets the read end of the pipe; out_pipe gets the write end.
+    auto buffer = std::make_shared<io_buffer_t>(buffer_limit);
+    buffer->begin_background_fillthread(std::move(pipes->read));
     return std::make_shared<io_bufferfill_t>(std::move(pipes->write), buffer);
 }
 
+std::shared_ptr<io_buffer_t> io_bufferfill_t::finish(std::shared_ptr<io_bufferfill_t> &&filler) {
+    // The io filler is passed in. This typically holds the only instance of the write side of the
+    // pipe used by the buffer's fillthread (except for that side held by other processes). Get the
+    // buffer out of the bufferfill and clear the shared_ptr; this will typically widow the pipe.
+    // Then allow the buffer to finish.
+    assert(filler && "Null pointer in finish");
+    auto buffer = filler->buffer();
+    filler.reset();
+    buffer->complete_background_fillthread();
+    return buffer;
+}
+
 io_pipe_t::~io_pipe_t() = default;
 io_bufferfill_t::~io_bufferfill_t() = default;
-io_buffer_t::~io_buffer_t() = default;
+io_buffer_t::~io_buffer_t() {
+    assert(!fillthread_ && "io_buffer_t destroyed with outstanding fillthread");
+}
 
 void io_chain_t::remove(const shared_ptr<io_data_t> &element) {
     // See if you can guess why std::find doesn't work here.
diff --git a/src/io.h b/src/io.h
index ce656cfec..5cffd0d21 100644
--- a/src/io.h
+++ b/src/io.h
@@ -1,26 +1,21 @@
 #ifndef FISH_IO_H
 #define FISH_IO_H
 
+#include <pthread.h>
 #include <stdarg.h>
 #include <stdio.h>
 #include <string.h>
-#include <vector>
 
-// Note that we have to include something to get any _LIBCPP_VERSION defined so we can detect libc++
-// So it's key that vector go above. If we didn't need vector for other reasons, we might include
-// ciso646, which does nothing
-#if defined(_LIBCPP_VERSION) || __cplusplus > 199711L
-// C++11 or libc++ (which is a C++11-only library, but the memory header works OK in C++03)
+#include <atomic>
 #include <memory>
-using std::shared_ptr;
-#else
-// C++03 or libstdc++
-#include <tr1/memory>
-using std::tr1::shared_ptr;
-#endif
+#include <mutex>
+#include <vector>
 
 #include "common.h"
 #include "env.h"
+#include "maybe.h"
+
+using std::shared_ptr;
 
 /// separated_buffer_t is composed of a sequence of elements, some of which may be explicitly
 /// separated (e.g. through string spit0) and some of which the separation is inferred. This enum
@@ -244,6 +239,8 @@ class io_bufferfill_t : public io_data_t {
    public:
     void print() const override;
 
+    // The ctor is public to support make_shared() in the static create function below.
+    // Do not invoke this directly.
     io_bufferfill_t(autoclose_fd_t write_fd, std::shared_ptr<io_buffer_t> buffer)
         : io_data_t(io_mode_t::bufferfill, STDOUT_FILENO),
           write_fd_(std::move(write_fd)),
@@ -253,14 +250,19 @@ class io_bufferfill_t : public io_data_t {
 
     std::shared_ptr<io_buffer_t> buffer() const { return buffer_; }
 
+    /// \return the fd that, when written to, fills the buffer.
     int write_fd() const { return write_fd_.fd(); }
 
-    /// Create an io_bufferfill_t which, when written from, populates a buffer (also created).
+    /// Create an io_bufferfill_t which, when written from, fills a buffer with the contents.
     /// \returns nullptr on failure, e.g. too many open fds.
     ///
     /// \param conflicts A set of IO redirections. The function ensures that any pipe it makes does
     /// not conflict with an fd redirection in this list.
     static shared_ptr<io_bufferfill_t> create(const io_chain_t &conflicts, size_t buffer_limit = 0);
+
+    /// Reset the receiver (possibly closing the write end of the pipe), and complete the fillthread
+    /// of the buffer. \return the buffer.
+    static std::shared_ptr<io_buffer_t> finish(std::shared_ptr<io_bufferfill_t> &&filler);
 };
 
 class output_stream_t;
@@ -269,19 +271,34 @@ class output_stream_t;
 /// An io_buffer_t is a buffer which can populate itself by reading from an fd.
 /// It is not an io_data_t.
 class io_buffer_t {
    private:
-    /// fd from which to read.
-    autoclose_fd_t read_;
+    friend io_bufferfill_t;
 
     /// Buffer storing what we have read.
     separated_buffer_t<std::string> buffer_;
 
-    /// Read some. Append it to our buffer.
-    /// \return positive if we read, 0 on EOF, -1 on error.
-    long read_some();
+    /// Atomic flag indicating our fillthread should shut down.
+    std::atomic<bool> shutdown_fillthread_;
+
+    /// The background fillthread itself, if any.
+    maybe_t<pthread_t> fillthread_{};
+
+    /// Read limit of the buffer.
+    const size_t read_limit_;
+
+    /// Lock for appending.
+    std::mutex append_lock_{};
+
+    /// Called in the background thread to run it.
+    void run_background_fillthread(autoclose_fd_t readfd);
+
+    /// Begin the background fillthread operation, reading from the given fd.
+    void begin_background_fillthread(autoclose_fd_t readfd);
+
+    /// End the background fillthread operation.
+    void complete_background_fillthread();
 
    public:
-    explicit io_buffer_t(autoclose_fd_t read, size_t limit)
-        : read_(std::move(read)), buffer_(limit) {
+    explicit io_buffer_t(size_t limit) : buffer_(limit), read_limit_(limit) {
         // Explicitly reset the discard flag because we share this buffer.
         buffer_.reset_discard();
     }
@@ -289,17 +306,20 @@ class io_buffer_t {
     ~io_buffer_t();
 
     /// Access the underlying buffer.
-    const separated_buffer_t<std::string> &buffer() const { return buffer_; }
+    /// This requires that the background fillthread be none.
+    const separated_buffer_t<std::string> &buffer() const {
+        assert(!fillthread_ && "Cannot access buffer during background fill");
+        return buffer_;
+    }
 
     /// Function to append to the buffer.
-    void append(const char *ptr, size_t count) { buffer_.append(ptr, ptr + count); }
+    void append(const char *ptr, size_t count) {
+        scoped_lock locker(append_lock_);
+        buffer_.append(ptr, ptr + count);
+    }
 
-    /// Read from input pipe until EOF or EAGAIN (i.e. would block).
-    void read_to_wouldblock();
-
-    /// Read a bit, if our fd is readable, with the given timeout.
-    /// \return true if we read some, false on timeout.
-    bool try_read(unsigned long timeout_usec);
+    /// \return the read limit.
+    size_t read_limit() const { return read_limit_; }
 
     /// Appends data from a given output_stream_t.
     /// Marks the receiver as discarded if the stream was discarded.
diff --git a/src/proc.cpp b/src/proc.cpp
index a58217f66..a440d1cda 100644
--- a/src/proc.cpp
+++ b/src/proc.cpp
@@ -836,27 +836,6 @@ void proc_update_jiffies() {
 #endif
 
-/// The return value of select_try(), indicating IO readiness or an error
-enum class block_receive_try_t {
-    /// There is no buffer to select on.
-    NO_BUFFER,
-    /// We have a block buffer, and we read some.
-    DATA_READ,
-    /// We have a block buffer, but we were unable to read any.
-    TIMEOUT,
-};
-
-/// \return the last IO buffer in job j, or nullptr if none.
-std::shared_ptr<io_buffer_t> last_buffer(job_t *j) {
-    std::shared_ptr<io_buffer_t> buff{};
-    for (const auto &io : j->all_io_redirections()) {
-        if (io->io_mode == io_mode_t::bufferfill) {
-            buff = static_cast<io_bufferfill_t *>(io.get())->buffer();
-        }
-    }
-    return buff;
-}
-
 // Return control of the terminal to a job's process group. restore_attrs is true if we are restoring
 // a previously-stopped job, in which case we need to restore terminal attributes.
 bool terminal_give_to_job(const job_t *j, bool restore_attrs) {
@@ -1038,7 +1017,6 @@ void job_t::continue_job(bool send_sigcont) {
         }
     });
 
-    bool read_attempted = false;
     if (!is_completed()) {
         if (get_flag(job_flag_t::TERMINAL) && is_foreground()) {
             // Put the job into the foreground and give it control of the terminal.
@@ -1071,74 +1049,15 @@ void job_t::continue_job(bool send_sigcont) {
         }
 
         if (is_foreground()) {
-            // This is an optimization to not call select_try() in case a process has exited. While
-            // it may seem silly, unless there is IO (and there usually isn't in terms of total CPU
-            // time), select_try() will wait for 10ms (our timeout) before returning. If during
-            // these 10ms a process exited, the shell will basically hang until the timeout happens
-            // and we are free to call `process_mark_finished_children()` to discover that fact. By
-            // calling it here before calling `select_try()` below, shell responsiveness can be
-            // dramatically improved (noticably so, not just "theoretically speaking" per the
-            // discussion in #5219).
-            process_mark_finished_children(false);
-
-            // If this is a child job and the parent job is still under construction (i.e. job1 |
-            // some_func), we can't block on execution of the nested job for `some_func`. Doing
-            // so can cause hangs if job1 emits more data than fits in the OS pipe buffer.
-            // The solution is to to not block on fg from the initial call in exec_job(), which
-            // is also the only place that send_sigcont is false. parent_job.is_constructed()
-            // must also be true, which coincides with WAIT_BY_PROCESS (which will have to do
-            // since we don't store a reference to the parent job in the job_t structure).
-            bool block_on_fg = send_sigcont && job_chain_is_fully_constructed();
-
-            // Wait for data to become available or the status of our own job to change
+            // Wait for the status of our own job to change.
             while (!reader_exit_forced() && !is_stopped() && !is_completed()) {
-                read_attempted = true;
-                auto read_result = block_receive_try_t::NO_BUFFER;
-                if (auto buff = last_buffer(this)) {
-                    const unsigned long SELECT_TIMEOUT_USEC = 10000;
-                    bool did_read = buff->try_read(SELECT_TIMEOUT_USEC);
-                    read_result = did_read ? block_receive_try_t::DATA_READ : block_receive_try_t::TIMEOUT;
-                }
-                switch (read_result) {
-                    case block_receive_try_t::DATA_READ:
-                        // We read some data.
-                        process_mark_finished_children(false);
-                        break;
-
-                    case block_receive_try_t::TIMEOUT:
-                        // We read some data or timed out. Poll for finished processes.
-                        debug(4, L"select_try: no fds returned valid data within the timeout");
-                        process_mark_finished_children(block_on_fg);
-                        break;
-
-                    case block_receive_try_t::NO_BUFFER:
-                        // We are not populating a buffer.
-                        debug(4, L"select_try: no IO fds");
-                        process_mark_finished_children(true);
-
-                        // If it turns out that we encountered this because the file descriptor we were
-                        // reading from has died, process_mark_finished_children() should take care of
-                        // changing the status of our is_completed() (assuming it is appropriate to do
-                        // so), in which case we will break out of this loop.
-                        break;
-                }
+                process_mark_finished_children(true);
             }
         }
     }
 
     if (is_foreground()) {
         if (is_completed()) {
-            // It's possible that the job will produce output and exit before we've even read from
-            // it. In that case, make sure we read that output now, before we've executed any
-            // subsequent calls. This is why prompt colors were getting screwed up - the builtin
-            // `echo` calls were sometimes having their output combined with the `set_color` calls
-            // in the wrong order!
-            if (!read_attempted) {
-                if (auto buff = last_buffer(this)) {
-                    buff->read_to_wouldblock();
-                }
-            }
-
             // Set $status only if we are in the foreground and the last process in the job has
             // finished and is not a short-circuited builtin.
             auto &p = processes.back();
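The core technique in this diff -- a background thread that polls a non-blocking pipe with select(), appends under a lock, and is shut down by an atomic flag plus a join once the last write end is dropped -- can be illustrated outside of fish. The following is a minimal, self-contained sketch of that pattern, not fish code: it substitutes std::thread and std::string for make_pthread/iothread and separated_buffer_t, and every name in it (fill_buffer, run, and so on) is invented for illustration.

// Minimal sketch of the fillthread pattern, using std::thread in place of fish's
// make_pthread/iothread machinery. All names here are illustrative, not fish APIs.
#include <fcntl.h>
#include <sys/select.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <cstdio>
#include <mutex>
#include <string>
#include <thread>

struct fill_buffer {
    std::string data;                   // accumulated output (stands in for separated_buffer_t)
    std::mutex append_lock;             // guards data, like append_lock_
    std::atomic<bool> shutdown{false};  // like shutdown_fillthread_

    // Runs on the background thread: poll the fd, then read under the lock until EAGAIN or EOF.
    void run(int fd) {
        bool done = false;
        while (!done) {
            done = shutdown.load(std::memory_order_relaxed);
            // Wait up to 100 msec for readability. Do this at least once even if shutdown is
            // already set, so output from short-lived writers is not lost.
            fd_set fds;
            FD_ZERO(&fds);
            FD_SET(fd, &fds);
            struct timeval tv = {0, 100000};
            int ret = select(fd + 1, &fds, nullptr, nullptr, &tv);
            if (ret < 0 && errno != EINTR) return;  // surprising error
            if (ret > 0 || done) {
                std::lock_guard<std::mutex> lock(append_lock);
                char buff[4096];
                ssize_t amt;
                while ((amt = read(fd, buff, sizeof buff)) > 0) data.append(buff, amt);
                if (amt == 0) done = true;  // EOF: every write end has been closed (widowed pipe)
            }
        }
    }
};

int main() {
    int fds[2];
    if (pipe(fds) != 0) return 1;
    fcntl(fds[0], F_SETFL, O_NONBLOCK);  // the read end must be non-blocking, as in create()

    fill_buffer buf;
    std::thread filler([&buf, fd = fds[0]]() { buf.run(fd); });

    // The writer side: in fish this fd would be handed to a job as its stdout.
    (void)write(fds[1], "hello\n", 6);

    // The moral equivalent of io_bufferfill_t::finish(): drop the last write end (widowing the
    // pipe), request shutdown, and join the thread; only then is it safe to inspect the data.
    close(fds[1]);
    buf.shutdown.store(true, std::memory_order_relaxed);
    filler.join();
    close(fds[0]);

    std::printf("captured %zu bytes\n", buf.data.size());
    return 0;
}

The sketch follows the same ordering as the diff's call sites: write end closed first, shutdown flag set second, join last, so the thread either sees EOF or takes at most one more poll interval before exiting.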
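Both finish() and the fillthread's select() loop lean on one kernel fact: a reader sees EOF only after every descriptor referring to the pipe's write end is gone, whether held in this process, dup'd, or inherited by a child -- what the comments call a widowed pipe. Below is a small standalone demonstration of just that fact; it is illustrative only and not taken from fish's sources.

#include <unistd.h>

#include <cstdio>

int main() {
    int fds[2];
    if (pipe(fds) != 0) return 1;

    int inherited = dup(fds[1]);  // stands in for a child process that inherited the write end

    (void)write(fds[1], "x", 1);
    close(fds[1]);  // drop our own write end, as finish() does by resetting the bufferfill

    char c;
    std::printf("first read: %zd\n", read(fds[0], &c, 1));  // 1: buffered data is still delivered

    // Reading again now would block (or raise EAGAIN on a non-blocking fd): the pipe is not yet
    // widowed, because 'inherited' still refers to the write end. Only once it too is closed does
    // the reader get EOF, which is what lets run_background_fillthread() stop.
    close(inherited);
    std::printf("after closing every write end: %zd\n", read(fds[0], &c, 1));  // 0: EOF
    close(fds[0]);
    return 0;
}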