Fill io_buffer via background thread

This is a large change to how io_buffers are filled. The essential problem
comes about with code like (example):

    echo ( /bin/pwd )

The output of /bin/pwd must go to fish, not the tty. To arrange for this,
fish does the following:

1. Invoke pipe() to create a pipe.
2. Add an io_bufferfill_t redirection that owns the write end of the pipe.
3. After fork (or equiv), call dup2() to replace pwd's stdout with this  pipe.

Now when /bin/pwd writes, it will send output to the read end of the pipe.
But who reads it?

Prior to this fix, fish would do the following in a loop:

1. select() on the pipe with a 10 msec timeout
2. waitpid(WNOHANG) on the pwd proc

This polling is ugly and confusing and is what is replaced here.

With this new change, fish now reads from the pipe via a background thread:

1. Spawn a background pthread, which select()s on the pipe's read end with
a long (100 msec) timeout.
2. In the foreground, waitpid() (allowing hanging) on the pwd proc.

The big win here is a major simplification of job_t::continue_job() since
it no longer has to worry about filling buffers. This will make things
easier for concurrent execution.

It may not be obvious why the background thread still needs a poll (100 msec).
The answer is for cases where the write end of the fd escapes, in particular
background processes invoked inside command substitutions. psub is perhaps
the only important case of this (other shells typically just hang here).
This commit is contained in:
ridiculousfish 2019-02-01 01:58:06 -08:00
parent 6e0dd06f43
commit 9a4153f5e2
5 changed files with 167 additions and 174 deletions

View file

@ -782,12 +782,8 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
// Remove our write pipe and forget it. This may close the pipe, unless another thread has // Remove our write pipe and forget it. This may close the pipe, unless another thread has
// claimed it (background write) or another process has inherited it. // claimed it (background write) or another process has inherited it.
auto block_output_buffer = block_output_bufferfill->buffer();
io_chain.remove(block_output_bufferfill); io_chain.remove(block_output_bufferfill);
block_output_bufferfill.reset(); auto block_output_buffer = io_bufferfill_t::finish(std::move(block_output_bufferfill));
// Make the buffer populate itself from whatever was written to the write pipe.
block_output_buffer->read_to_wouldblock();
// Resolve our IO chain to a sequence of dup2s. // Resolve our IO chain to a sequence of dup2s.
auto dup2s = dup2_list_t::resolve_chain(io_chain); auto dup2s = dup2_list_t::resolve_chain(io_chain);
@ -969,7 +965,7 @@ bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
if ((io->io_mode == io_mode_t::bufferfill)) { if ((io->io_mode == io_mode_t::bufferfill)) {
// The read limit is dictated by the last bufferfill. // The read limit is dictated by the last bufferfill.
const auto *bf = static_cast<io_bufferfill_t *>(io.get()); const auto *bf = static_cast<io_bufferfill_t *>(io.get());
stdout_read_limit = bf->buffer()->buffer().limit(); stdout_read_limit = bf->buffer()->read_limit();
} }
} }
@ -1040,9 +1036,7 @@ static int exec_subshell_internal(const wcstring &cmd, parser_t &parser, wcstrin
if (parser.eval(cmd, io_chain_t{bufferfill}, SUBST) == 0) { if (parser.eval(cmd, io_chain_t{bufferfill}, SUBST) == 0) {
subcommand_status = proc_get_last_status(); subcommand_status = proc_get_last_status();
} }
buffer = bufferfill->buffer(); buffer = io_bufferfill_t::finish(std::move(bufferfill));
bufferfill.reset();
buffer->read_to_wouldblock();
} }
if (buffer && buffer->buffer().discarded()) subcommand_status = STATUS_READ_TOO_MUCH; if (buffer && buffer->buffer().discarded()) subcommand_status = STATUS_READ_TOO_MUCH;

View file

@ -943,9 +943,7 @@ static void test_1_cancellation(const wchar_t *src) {
pthread_kill(thread, SIGINT); pthread_kill(thread, SIGINT);
}); });
parser_t::principal_parser().eval(src, io_chain_t{filler}, TOP); parser_t::principal_parser().eval(src, io_chain_t{filler}, TOP);
auto buffer = filler->buffer(); auto buffer = io_bufferfill_t::finish(std::move(filler));
filler.reset();
buffer->read_to_wouldblock();
if (buffer->buffer().size() != 0) { if (buffer->buffer().size() != 0) {
err(L"Expected 0 bytes in out_buff, but instead found %lu bytes\n", err(L"Expected 0 bytes in out_buff, but instead found %lu bytes\n",
buffer->buffer().size()); buffer->buffer().size());

View file

@ -11,6 +11,7 @@
#include "exec.h" #include "exec.h"
#include "fallback.h" // IWYU pragma: keep #include "fallback.h" // IWYU pragma: keep
#include "io.h" #include "io.h"
#include "iothread.h"
#include "wutil.h" // IWYU pragma: keep #include "wutil.h" // IWYU pragma: keep
io_data_t::~io_data_t() = default; io_data_t::~io_data_t() = default;
@ -28,6 +29,7 @@ void io_pipe_t::print() const {
void io_bufferfill_t::print() const { fwprintf(stderr, L"bufferfill {%d}\n", write_fd_.fd()); } void io_bufferfill_t::print() const { fwprintf(stderr, L"bufferfill {%d}\n", write_fd_.fd()); }
void io_buffer_t::append_from_stream(const output_stream_t &stream) { void io_buffer_t::append_from_stream(const output_stream_t &stream) {
scoped_lock locker(append_lock_);
if (buffer_.discarded()) return; if (buffer_.discarded()) return;
if (stream.buffer().discarded()) { if (stream.buffer().discarded()) {
buffer_.set_discard(); buffer_.set_discard();
@ -36,54 +38,102 @@ void io_buffer_t::append_from_stream(const output_stream_t &stream) {
buffer_.append_wide_buffer(stream.buffer()); buffer_.append_wide_buffer(stream.buffer());
} }
long io_buffer_t::read_some() { void io_buffer_t::run_background_fillthread(autoclose_fd_t readfd) {
int fd = read_.fd(); // Here we are running the background fillthread, executing in a background thread.
assert(fd >= 0 && "Should have a valid fd"); // Our plan is:
debug(4, L"io_buffer_t::read: blocking read on fd %d", fd); // 1. poll via select() until the fd is readable.
long len; // 2. Acquire the append lock.
char b[4096]; // 3. read until EAGAIN (would block), appending
do { // 4. release the lock
len = read(fd, b, sizeof b); // The purpose of holding the lock around the read calls is to ensure that data from background
} while (len < 0 && errno == EINTR); // processes isn't weirdly interspersed with data directly transferred (from a builtin to a buffer).
if (len > 0) {
buffer_.append(&b[0], &b[len]);
}
return len;
}
void io_buffer_t::read_to_wouldblock() { const int fd = readfd.fd();
long len;
do {
len = read_some();
} while (len > 0);
if (len < 0 && errno != EAGAIN) {
debug(1, _(L"An error occured while reading output from code block on fd %d"), read_.fd());
wperror(L"io_buffer_t::read");
}
}
bool io_buffer_t::try_read(unsigned long timeout_usec) { // 100 msec poll rate. Note that in most cases, the write end of the pipe will be closed so
struct timeval timeout; // select() will return; the polling is important only for weird cases like a background process
timeout.tv_sec = 0; // launched in a command substitution.
timeout.tv_usec = timeout_usec; const long poll_timeout_usec = 100000;
int fd = read_.fd(); struct timeval tv = {};
assert(fd >= 0 && "Should have a valid fd"); tv.tv_usec = poll_timeout_usec;
bool shutdown = false;
while (!shutdown) {
bool readable = false;
// Check the shutdown flag.
shutdown |= this->shutdown_fillthread_.load(std::memory_order_relaxed);
// Poll if our fd is readable.
// Do this even if the shutdown flag is set. It's important we wait for the fd at least
// once. For short-lived processes, it's possible for the process to execute, produce output
// (fits in the pipe buffer) and be reaped before we are even scheduled. So always wait at
// least once on the fd. Note that doesn't mean we will wait for the full poll duration;
// typically what will happen is our pipe will be widowed and so this will return quickly.
// It's only for weird cases (e.g. a background process launched inside a command
// substitution) that we'll wait out the entire poll time.
fd_set fds; fd_set fds;
FD_ZERO(&fds); FD_ZERO(&fds);
FD_SET(fd, &fds); FD_SET(fd, &fds);
int ret = select(fd + 1, &fds, nullptr, nullptr, &timeout); int ret = select(fd + 1, &fds, NULL, NULL, &tv);
if (ret < 0) { readable = ret > 0;
// Treat EINTR as timeout. if (ret < 0 && errno != EINTR) {
if (errno != EINTR) { // Surprising error.
debug(1, _(L"An error occured inside select on fd %d"), fd); wperror(L"select");
wperror(L"io_buffer_t::try_read"); return;
}
return false;
} }
if (readable || shutdown) {
// Now either our fd is readable, or we have set the shutdown flag.
// Either way acquire the lock and read until we reach EOF, or EAGAIN / EINTR.
scoped_lock locker(append_lock_);
ssize_t ret;
do {
char buff[4096];
ret = read(fd, buff, sizeof buff);
if (ret > 0) { if (ret > 0) {
read_some(); buffer_.append(&buff[0], &buff[ret]);
} else if (ret == 0) {
shutdown = true;
} else if (errno != EINTR && errno != EAGAIN) {
wperror(L"read");
return;
} }
return ret > 0; } while (ret > 0);
}
}
assert(shutdown && "Should only exit loop if shutdown flag is set");
}
void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) {
ASSERT_IS_MAIN_THREAD();
assert(!fillthread_ && "Already have a fillthread");
// We want our background thread to own the fd but it's not easy to move into a std::function.
// Use a shared_ptr.
auto fdref = std::make_shared<autoclose_fd_t>(std::move(fd));
// Our function to read until the receiver is closed.
// It's OK to capture 'this' by value because 'this' owns the background thread and joins it
// before dtor.
std::function<void(void)> func = [this, fdref]() {
this->run_background_fillthread(std::move(*fdref));
};
pthread_t fillthread{};
if (!make_pthread(&fillthread, std::move(func))) {
wperror(L"make_pthread");
}
fillthread_ = fillthread;
}
void io_buffer_t::complete_background_fillthread() {
ASSERT_IS_MAIN_THREAD();
assert(fillthread_ && "Should have a fillthread");
shutdown_fillthread_.store(true, std::memory_order_relaxed);
void *ignored = nullptr;
int err = pthread_join(*fillthread_, &ignored);
DIE_ON_FAILURE(err);
fillthread_.reset();
} }
shared_ptr<io_bufferfill_t> io_bufferfill_t::create(const io_chain_t &conflicts, shared_ptr<io_bufferfill_t> io_bufferfill_t::create(const io_chain_t &conflicts,
@ -93,27 +143,39 @@ shared_ptr<io_bufferfill_t> io_bufferfill_t::create(const io_chain_t &conflicts,
if (!pipes) { if (!pipes) {
return nullptr; return nullptr;
} }
// Our buffer will read from the read end of the pipe. This end must be non-blocking. This is // Our buffer will read from the read end of the pipe. This end must be non-blocking. This is
// because we retain the write end of the pipe in this process (even after handing it off to a // because our fillthread needs to poll to decide if it should shut down, and also accept input
// child process); therefore a read on the pipe may block forever. What we should do is arrange // from direct buffer transfers.
// for the write end of the pipe to be closed at the right time; then the read could just block.
if (make_fd_nonblocking(pipes->read.fd())) { if (make_fd_nonblocking(pipes->read.fd())) {
debug(1, PIPE_ERROR); debug(1, PIPE_ERROR);
wperror(L"fcntl"); wperror(L"fcntl");
return nullptr; return nullptr;
} }
// Our fillthread gets the read end of the pipe; out_pipe gets the write end.
// Our buffer gets the read end of the pipe; out_pipe gets the write end. auto buffer = std::make_shared<io_buffer_t>(buffer_limit);
auto buffer = std::make_shared<io_buffer_t>(std::move(pipes->read), buffer_limit); buffer->begin_background_fillthread(std::move(pipes->read));
return std::make_shared<io_bufferfill_t>(std::move(pipes->write), buffer); return std::make_shared<io_bufferfill_t>(std::move(pipes->write), buffer);
} }
std::shared_ptr<io_buffer_t> io_bufferfill_t::finish(std::shared_ptr<io_bufferfill_t> &&filler) {
// The io filler is passed in. This typically holds the only instance of the write side of the
// pipe used by the buffer's fillthread (except for that side held by other processes). Get the
// buffer out of the bufferfill and clear the shared_ptr; this will typically widow the pipe.
// Then allow the buffer to finish.
assert(filler && "Null pointer in finish");
auto buffer = filler->buffer();
filler.reset();
buffer->complete_background_fillthread();
return buffer;
}
io_pipe_t::~io_pipe_t() = default; io_pipe_t::~io_pipe_t() = default;
io_bufferfill_t::~io_bufferfill_t() = default; io_bufferfill_t::~io_bufferfill_t() = default;
io_buffer_t::~io_buffer_t() = default; io_buffer_t::~io_buffer_t() {
assert(! fillthread_ && "io_buffer_t destroyed with outstanding fillthread");
}
void io_chain_t::remove(const shared_ptr<const io_data_t> &element) { void io_chain_t::remove(const shared_ptr<const io_data_t> &element) {
// See if you can guess why std::find doesn't work here. // See if you can guess why std::find doesn't work here.

View file

@ -1,26 +1,21 @@
#ifndef FISH_IO_H #ifndef FISH_IO_H
#define FISH_IO_H #define FISH_IO_H
#include <pthread.h>
#include <stdarg.h> #include <stdarg.h>
#include <stddef.h> #include <stddef.h>
#include <stdlib.h> #include <stdlib.h>
#include <vector> #include <atomic>
// Note that we have to include something to get any _LIBCPP_VERSION defined so we can detect libc++
// So it's key that vector go above. If we didn't need vector for other reasons, we might include
// ciso646, which does nothing
#if defined(_LIBCPP_VERSION) || __cplusplus > 199711L
// C++11 or libc++ (which is a C++11-only library, but the memory header works OK in C++03)
#include <memory> #include <memory>
using std::shared_ptr; #include <mutex>
#else #include <vector>
// C++03 or libstdc++
#include <tr1/memory>
using std::tr1::shared_ptr;
#endif
#include "common.h" #include "common.h"
#include "env.h" #include "env.h"
#include "maybe.h"
using std::shared_ptr;
/// separated_buffer_t is composed of a sequence of elements, some of which may be explicitly /// separated_buffer_t is composed of a sequence of elements, some of which may be explicitly
/// separated (e.g. through string spit0) and some of which the separation is inferred. This enum /// separated (e.g. through string spit0) and some of which the separation is inferred. This enum
@ -244,6 +239,8 @@ class io_bufferfill_t : public io_data_t {
public: public:
void print() const override; void print() const override;
// The ctor is public to support make_shared() in the static create function below.
// Do not invoke this directly.
io_bufferfill_t(autoclose_fd_t write_fd, std::shared_ptr<io_buffer_t> buffer) io_bufferfill_t(autoclose_fd_t write_fd, std::shared_ptr<io_buffer_t> buffer)
: io_data_t(io_mode_t::bufferfill, STDOUT_FILENO), : io_data_t(io_mode_t::bufferfill, STDOUT_FILENO),
write_fd_(std::move(write_fd)), write_fd_(std::move(write_fd)),
@ -253,14 +250,19 @@ class io_bufferfill_t : public io_data_t {
std::shared_ptr<io_buffer_t> buffer() const { return buffer_; } std::shared_ptr<io_buffer_t> buffer() const { return buffer_; }
/// \return the fd that, when written to, fills the buffer.
int write_fd() const { return write_fd_.fd(); } int write_fd() const { return write_fd_.fd(); }
/// Create an io_bufferfill_t which, when written from, populates a buffer (also created). /// Create an io_bufferfill_t which, when written from, fills a buffer with the contents.
/// \returns nullptr on failure, e.g. too many open fds. /// \returns nullptr on failure, e.g. too many open fds.
/// ///
/// \param conflicts A set of IO redirections. The function ensures that any pipe it makes does /// \param conflicts A set of IO redirections. The function ensures that any pipe it makes does
/// not conflict with an fd redirection in this list. /// not conflict with an fd redirection in this list.
static shared_ptr<io_bufferfill_t> create(const io_chain_t &conflicts, size_t buffer_limit = 0); static shared_ptr<io_bufferfill_t> create(const io_chain_t &conflicts, size_t buffer_limit = 0);
/// Reset the receiver (possibly closing the write end of the pipe), and complete the fillthread
/// of the buffer. \return the buffer.
static std::shared_ptr<io_buffer_t> finish(std::shared_ptr<io_bufferfill_t> &&filler);
}; };
class output_stream_t; class output_stream_t;
@ -269,19 +271,34 @@ class output_stream_t;
/// It is not an io_data_t. /// It is not an io_data_t.
class io_buffer_t { class io_buffer_t {
private: private:
/// fd from which to read. friend io_bufferfill_t;
autoclose_fd_t read_;
/// Buffer storing what we have read. /// Buffer storing what we have read.
separated_buffer_t<std::string> buffer_; separated_buffer_t<std::string> buffer_;
/// Read some. Append it to our buffer. /// Atomic flag indicating our fillthread should shut down.
/// \return positive if we read, 0 on EOF, -1 on error. std::atomic<bool> shutdown_fillthread_;
long read_some();
/// The background fillthread itself, if any.
maybe_t<pthread_t> fillthread_{};
/// Read limit of the buffer.
const size_t read_limit_;
/// Lock for appending.
std::mutex append_lock_{};
/// Called in the background thread to run it.
void run_background_fillthread(autoclose_fd_t readfd);
/// Begin the background fillthread operation, reading from the given fd.
void begin_background_fillthread(autoclose_fd_t readfd);
/// End the background fillthread operation.
void complete_background_fillthread();
public: public:
explicit io_buffer_t(autoclose_fd_t read, size_t limit) explicit io_buffer_t(size_t limit) : buffer_(limit), read_limit_(limit) {
: read_(std::move(read)), buffer_(limit) {
// Explicitly reset the discard flag because we share this buffer. // Explicitly reset the discard flag because we share this buffer.
buffer_.reset_discard(); buffer_.reset_discard();
} }
@ -289,17 +306,20 @@ class io_buffer_t {
~io_buffer_t(); ~io_buffer_t();
/// Access the underlying buffer. /// Access the underlying buffer.
const separated_buffer_t<std::string> &buffer() const { return buffer_; } /// This requires that the background fillthread be none.
const separated_buffer_t<std::string> &buffer() const {
assert(!fillthread_ && "Cannot access buffer during background fill");
return buffer_;
}
/// Function to append to the buffer. /// Function to append to the buffer.
void append(const char *ptr, size_t count) { buffer_.append(ptr, ptr + count); } void append(const char *ptr, size_t count) {
scoped_lock locker(append_lock_);
buffer_.append(ptr, ptr + count);
}
/// Read from input pipe until EOF or EAGAIN (i.e. would block). /// \return the read limit.
void read_to_wouldblock(); size_t read_limit() const { return read_limit_; }
/// Read a bit, if our fd is readable, with the given timeout.
/// \return true if we read some, false on timeout.
bool try_read(unsigned long timeout_usec);
/// Appends data from a given output_stream_t. /// Appends data from a given output_stream_t.
/// Marks the receiver as discarded if the stream was discarded. /// Marks the receiver as discarded if the stream was discarded.

View file

@ -836,27 +836,6 @@ void proc_update_jiffies() {
#endif #endif
/// The return value of select_try(), indicating IO readiness or an error
enum class block_receive_try_t {
/// There is no buffer to select on.
NO_BUFFER,
/// We have a block buffer, and we read some.
DATA_READ,
/// We have a block buffer, but we were unable to read any.
TIMEOUT,
};
/// \return the last IO buffer in job j, or nullptr if none.
std::shared_ptr<io_buffer_t> last_buffer(job_t *j) {
std::shared_ptr<io_buffer_t> buff{};
for (const auto &io : j->all_io_redirections()) {
if (io->io_mode == io_mode_t::bufferfill) {
buff = static_cast<io_bufferfill_t *>(io.get())->buffer();
}
}
return buff;
}
// Return control of the terminal to a job's process group. restore_attrs is true if we are restoring // Return control of the terminal to a job's process group. restore_attrs is true if we are restoring
// a previously-stopped job, in which case we need to restore terminal attributes. // a previously-stopped job, in which case we need to restore terminal attributes.
bool terminal_give_to_job(const job_t *j, bool restore_attrs) { bool terminal_give_to_job(const job_t *j, bool restore_attrs) {
@ -1038,7 +1017,6 @@ void job_t::continue_job(bool send_sigcont) {
} }
}); });
bool read_attempted = false;
if (!is_completed()) { if (!is_completed()) {
if (get_flag(job_flag_t::TERMINAL) && is_foreground()) { if (get_flag(job_flag_t::TERMINAL) && is_foreground()) {
// Put the job into the foreground and give it control of the terminal. // Put the job into the foreground and give it control of the terminal.
@ -1071,74 +1049,15 @@ void job_t::continue_job(bool send_sigcont) {
} }
if (is_foreground()) { if (is_foreground()) {
// This is an optimization to not call select_try() in case a process has exited. While // Wait for the status of our own job to change.
// it may seem silly, unless there is IO (and there usually isn't in terms of total CPU
// time), select_try() will wait for 10ms (our timeout) before returning. If during
// these 10ms a process exited, the shell will basically hang until the timeout happens
// and we are free to call `process_mark_finished_children()` to discover that fact. By
// calling it here before calling `select_try()` below, shell responsiveness can be
// dramatically improved (noticably so, not just "theoretically speaking" per the
// discussion in #5219).
process_mark_finished_children(false);
// If this is a child job and the parent job is still under construction (i.e. job1 |
// some_func), we can't block on execution of the nested job for `some_func`. Doing
// so can cause hangs if job1 emits more data than fits in the OS pipe buffer.
// The solution is to to not block on fg from the initial call in exec_job(), which
// is also the only place that send_sigcont is false. parent_job.is_constructed()
// must also be true, which coincides with WAIT_BY_PROCESS (which will have to do
// since we don't store a reference to the parent job in the job_t structure).
bool block_on_fg = send_sigcont && job_chain_is_fully_constructed();
// Wait for data to become available or the status of our own job to change
while (!reader_exit_forced() && !is_stopped() && !is_completed()) { while (!reader_exit_forced() && !is_stopped() && !is_completed()) {
read_attempted = true;
auto read_result = block_receive_try_t::NO_BUFFER;
if (auto buff = last_buffer(this)) {
const unsigned long SELECT_TIMEOUT_USEC = 10000;
bool did_read = buff->try_read(SELECT_TIMEOUT_USEC);
read_result = did_read ? block_receive_try_t::DATA_READ : block_receive_try_t::TIMEOUT;
}
switch (read_result) {
case block_receive_try_t::DATA_READ:
// We read some data.
process_mark_finished_children(false);
break;
case block_receive_try_t::TIMEOUT:
// We read some data or timed out. Poll for finished processes.
debug(4, L"select_try: no fds returned valid data within the timeout" );
process_mark_finished_children(block_on_fg);
break;
case block_receive_try_t::NO_BUFFER:
// We are not populating a buffer.
debug(4, L"select_try: no IO fds" );
process_mark_finished_children(true); process_mark_finished_children(true);
// If it turns out that we encountered this because the file descriptor we were
// reading from has died, process_mark_finished_children() should take care of
// changing the status of our is_completed() (assuming it is appropriate to do
// so), in which case we will break out of this loop.
break;
}
} }
} }
} }
if (is_foreground()) { if (is_foreground()) {
if (is_completed()) { if (is_completed()) {
// It's possible that the job will produce output and exit before we've even read from
// it. In that case, make sure we read that output now, before we've executed any
// subsequent calls. This is why prompt colors were getting screwed up - the builtin
// `echo` calls were sometimes having their output combined with the `set_color` calls
// in the wrong order!
if (!read_attempted) {
if (auto buff = last_buffer(this)) {
buff->read_to_wouldblock();
}
}
// Set $status only if we are in the foreground and the last process in the job has // Set $status only if we are in the foreground and the last process in the job has
// finished and is not a short-circuited builtin. // finished and is not a short-circuited builtin.
auto &p = processes.back(); auto &p = processes.back();