io_buffer_t becomes io_bufferfill_t

This makes some significant architectual improvements to io_pipe_t and
io_buffer_t.

Prior to this fix, io_buffer_t subclassed io_pipe_t. io_buffer_t is now
replaced with a class io_bufferfill_t, which does not subclass pipe.

io_pipe_t no longer remembers both fds. Instead it has an autoclose_fd_t,
so that the file descriptor ownership is clear.
This commit is contained in:
ridiculousfish 2019-01-31 16:05:42 -08:00
parent 7c256e7e51
commit 78bbcef356
8 changed files with 289 additions and 341 deletions

View file

@ -786,6 +786,9 @@ class autoclose_fd_t {
fd_ = fd;
}
// \return if this has a valid fd.
bool valid() const { return fd_ >= 0; }
autoclose_fd_t(const autoclose_fd_t &) = delete;
void operator=(const autoclose_fd_t &) = delete;
autoclose_fd_t(autoclose_fd_t &&rhs) : fd_(rhs.fd_) { rhs.fd_ = -1; }

View file

@ -81,25 +81,6 @@ void exec_close(int fd) {
}
}
int exec_pipe(int fd[2]) {
ASSERT_IS_MAIN_THREAD();
int res;
while ((res = pipe(fd))) {
if (errno != EINTR) {
return res; // caller will call wperror
}
}
debug(4, L"Created pipe using fds %d and %d", fd[0], fd[1]);
// Pipes ought to be cloexec. Pipes are dup2'd the corresponding fds; the resulting fds are not
// cloexec.
set_cloexec(fd[0]);
set_cloexec(fd[1]);
return res;
}
/// Returns true if the redirection is a file redirection to a file other than /dev/null.
static bool redirection_is_to_real_file(const io_data_t *io) {
bool result = false;
@ -246,8 +227,8 @@ static bool io_transmogrify(const io_chain_t &in_chain, io_chain_t *out_chain,
switch (in->io_mode) {
case io_mode_t::pipe:
case io_mode_t::bufferfill:
case io_mode_t::fd:
case io_mode_t::buffer:
case io_mode_t::close: {
// These redirections don't need transmogrification. They can be passed through.
out = in;
@ -424,12 +405,12 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptr<j
const io_chain_t &proc_io_chain, io_streams_t &streams) {
assert(p->type == INTERNAL_BUILTIN && "Process must be a builtin");
int local_builtin_stdin = STDIN_FILENO;
bool close_stdin = false;
autoclose_fd_t locally_opened_stdin{};
// If this is the first process, check the io redirections and see where we should
// be reading from.
if (pipe_read) {
local_builtin_stdin = pipe_read->pipe_fd[0];
local_builtin_stdin = pipe_read->pipe_fd();
} else if (const auto in = proc_io_chain.get_io_for_fd(STDIN_FILENO)) {
switch (in->io_mode) {
case io_mode_t::fd: {
@ -448,20 +429,20 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptr<j
}
case io_mode_t::pipe: {
const io_pipe_t *in_pipe = static_cast<const io_pipe_t *>(in.get());
local_builtin_stdin = in_pipe->pipe_fd[0];
if (in_pipe->fd == STDIN_FILENO) {
local_builtin_stdin = in_pipe->pipe_fd();
}
break;
}
case io_mode_t::file: {
// Do not set CLO_EXEC because child needs access.
const io_file_t *in_file = static_cast<const io_file_t *>(in.get());
local_builtin_stdin = open(in_file->filename_cstr, in_file->flags, OPEN_MASK);
if (local_builtin_stdin == -1) {
locally_opened_stdin =
autoclose_fd_t{open(in_file->filename_cstr, in_file->flags, OPEN_MASK)};
if (!locally_opened_stdin.valid()) {
debug(1, FILE_ERROR, in_file->filename_cstr);
wperror(L"open");
} else {
close_stdin = true;
}
local_builtin_stdin = locally_opened_stdin.fd();
break;
}
case io_mode_t::close: {
@ -517,10 +498,6 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptr<j
// execution so as not to confuse some job-handling builtins.
j->set_flag(job_flag_t::FOREGROUND, fg);
// If stdin has been redirected, close the redirection stream.
if (close_stdin) {
exec_close(local_builtin_stdin);
}
return true; // "success"
}
@ -548,7 +525,11 @@ static bool handle_builtin_output(const std::shared_ptr<job_t> &j, process_t *p,
if (!must_fork && p->is_last_in_job) {
// We are handling reads directly in the main loop. Note that we may still end
// up forking.
const bool stdout_is_to_buffer = stdout_io && stdout_io->io_mode == io_mode_t::buffer;
const bool stdout_is_bufferfill =
(stdout_io && stdout_io->io_mode == io_mode_t::bufferfill);
const std::shared_ptr<io_buffer_t> stdout_buffer =
stdout_is_bufferfill ? static_cast<io_bufferfill_t *>(stdout_io.get())->buffer()
: nullptr;
const bool no_stdout_output = stdout_stream.empty();
const bool no_stderr_output = stderr_stream.empty();
const bool stdout_discarded = stdout_stream.buffer().discarded();
@ -558,7 +539,7 @@ static bool handle_builtin_output(const std::shared_ptr<job_t> &j, process_t *p,
// need to fork or even output anything.
debug(4, L"Skipping fork: no output for internal builtin '%ls'", p->argv0());
fork_was_skipped = true;
} else if (no_stderr_output && stdout_is_to_buffer) {
} else if (no_stderr_output && stdout_buffer) {
// The builtin produced no stderr, and its stdout is going to an
// internal buffer. There is no need to fork. This helps out the
// performance quite a bit in complex completion code.
@ -570,8 +551,7 @@ static bool handle_builtin_output(const std::shared_ptr<job_t> &j, process_t *p,
// also produce stderr.
debug(4, L"Skipping fork: buffered output for internal builtin '%ls'", p->argv0());
io_buffer_t *io_buffer = static_cast<io_buffer_t *>(stdout_io.get());
io_buffer->append_from_stream(stdout_stream);
stdout_buffer->append_from_stream(stdout_stream);
fork_was_skipped = true;
} else if (stdout_io.get() == NULL && stderr_io.get() == NULL) {
// We are writing to normal stdout and stderr. Just do it - no need to fork.
@ -749,20 +729,16 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
"Unexpected process type");
// Create an output buffer if we're piping to another process.
shared_ptr<io_buffer_t> block_output_io_buffer{};
shared_ptr<io_bufferfill_t> block_output_bufferfill{};
if (!p->is_last_in_job) {
// Be careful to handle failure, e.g. too many open fds.
block_output_io_buffer = io_buffer_t::create(STDOUT_FILENO, user_ios);
if (!block_output_io_buffer) {
block_output_bufferfill = io_bufferfill_t::create(user_ios);
if (!block_output_bufferfill) {
job_mark_process_as_failed(j, p);
return false;
} else {
// This looks sketchy, because we're adding this io buffer locally - they
// aren't in the process or job redirection list. Therefore select_try won't
// be able to read them. However we call block_output_io_buffer->read()
// below, which reads until EOF. So there's no need to select on this.
io_chain.push_back(block_output_io_buffer);
}
// Teach the job about its bufferfill, and add it to our io chain.
io_chain.push_back(block_output_bufferfill);
}
if (p->type == INTERNAL_FUNCTION) {
@ -792,10 +768,8 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
int status = proc_get_last_status();
// Handle output from a block or function. This usually means do nothing, but in the
// case of pipes, we have to buffer such io, since otherwise the internal pipe
// buffer might overflow.
if (!block_output_io_buffer.get()) {
// If we have a block output buffer, populate it now.
if (!block_output_bufferfill) {
// No buffer, so we exit directly. This means we have to manually set the exit
// status.
if (p->is_last_in_job) {
@ -804,11 +778,16 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
p->completed = 1;
return true;
}
assert(block_output_bufferfill && "Must have a block output bufferfiller");
// Here we must have a non-NULL block_output_io_buffer.
assert(block_output_io_buffer.get() != NULL);
io_chain.remove(block_output_io_buffer);
block_output_io_buffer->read();
// Remove our write pipe and forget it. This may close the pipe, unless another thread has
// claimed it (background write) or another process has inherited it.
auto block_output_buffer = block_output_bufferfill->buffer();
io_chain.remove(block_output_bufferfill);
block_output_bufferfill.reset();
// Make the buffer populate itself from whatever was written to the write pipe.
block_output_buffer->read_to_wouldblock();
// Resolve our IO chain to a sequence of dup2s.
auto dup2s = dup2_list_t::resolve_chain(io_chain);
@ -816,7 +795,7 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
return false;
}
const std::string buffer_contents = block_output_io_buffer->buffer().newline_serialized();
const std::string buffer_contents = block_output_buffer->buffer().newline_serialized();
const char *buffer = buffer_contents.data();
size_t count = buffer_contents.size();
if (count > 0) {
@ -824,7 +803,7 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
const char *fork_reason =
p->type == INTERNAL_BLOCK_NODE ? "internal block io" : "internal function io";
if (!fork_child_for_process(j, p, *dup2s, false, fork_reason, [&] {
exec_write_and_exit(block_output_io_buffer->fd, buffer, count, status);
exec_write_and_exit(STDOUT_FILENO, buffer, count, status);
})) {
return false;
}
@ -844,19 +823,28 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<
autoclose_fd_t pipe_current_read,
autoclose_fd_t *out_pipe_next_read, const io_chain_t &all_ios,
size_t stdout_read_limit) {
// The IO chain for this process. It starts with the block IO, then pipes, and then gets any
// from the process.
io_chain_t process_net_io_chain = j->block_io_chain();
// The pipe this command will write to (if any).
shared_ptr<io_pipe_t> pipe_write;
// The pipe this command will read from (if any).
shared_ptr<io_pipe_t> pipe_read;
// See if we need a pipe.
// See if we need a pipe for the next command.
const bool pipes_to_next_command = !p->is_last_in_job;
if (pipes_to_next_command) {
// Construct our pipes.
auto local_pipes = make_autoclose_pipes(all_ios);
if (!local_pipes) {
debug(1, PIPE_ERROR);
wperror(L"pipe");
job_mark_process_as_failed(j, p);
return false;
}
// The write end of any pipe we create.
autoclose_fd_t pipe_current_write{};
pipe_write = std::make_shared<io_pipe_t>(p->pipe_write_fd, false /* not input */,
std::move(local_pipes->write));
*out_pipe_next_read = std::move(local_pipes->read);
}
// The pipes the current process write to and read from. Unfortunately these can't be just
// allocated on the stack, since j->io wants shared_ptr.
//
// The write pipe (destined for stdout) needs to occur before redirections. For example,
// with a redirection like this:
//
@ -884,12 +872,10 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<
//
// which depends on the redirection being evaluated before the pipe. So the write end of the
// pipe comes first, the read pipe of the pipe comes last. See issue #966.
shared_ptr<io_pipe_t> pipe_write;
shared_ptr<io_pipe_t> pipe_read;
// Write pipe goes first.
if (pipes_to_next_command) {
pipe_write.reset(new io_pipe_t(p->pipe_write_fd, false));
// The IO chain for this process.
io_chain_t process_net_io_chain = j->block_io_chain();
if (pipe_write) {
process_net_io_chain.push_back(pipe_write);
}
@ -897,10 +883,9 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<
process_net_io_chain.append(p->io_chain());
// Read pipe goes last.
if (!p->is_first_in_job) {
pipe_read.reset(new io_pipe_t(STDIN_FILENO, true));
// Record the current read in pipe_read.
pipe_read->pipe_fd[0] = pipe_current_read.fd();
if (pipe_current_read.valid()) {
pipe_read = std::make_shared<io_pipe_t>(STDIN_FILENO, true /* input */,
std::move(pipe_current_read));
process_net_io_chain.push_back(pipe_read);
}
@ -918,36 +903,6 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<
parser.vars().export_arr();
}
// Set up fds that will be used in the pipe.
if (pipes_to_next_command) {
// debug( 1, L"%ls|%ls" , p->argv[0], p->next->argv[0]);
int local_pipe[2] = {-1, -1};
if (exec_pipe(local_pipe) == -1) {
debug(1, PIPE_ERROR);
wperror(L"pipe");
job_mark_process_as_failed(j, p);
return false;
}
// Ensure our pipe fds not conflict with any fd redirections. E.g. if the process is
// like 'cat <&5' then fd 5 must not be used by the pipe.
if (!pipe_avoid_conflicts_with_io_chain(local_pipe, all_ios)) {
// We failed. The pipes were closed for us.
wperror(L"dup");
job_mark_process_as_failed(j, p);
return false;
}
// This tells the redirection about the fds, but the redirection does not close them.
assert(local_pipe[0] >= 0);
assert(local_pipe[1] >= 0);
memcpy(pipe_write->pipe_fd, local_pipe, sizeof(int) * 2);
// Record our pipes.
pipe_current_write.reset(local_pipe[1]);
out_pipe_next_read->reset(local_pipe[0]);
}
// Execute the process.
switch (p->type) {
case INTERNAL_FUNCTION:
@ -1008,18 +963,13 @@ bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
}
}
// Verify that all io_mode_t::buffers are output. We used to support a (single, hacked-in)
// magical input io_mode_t::buffer used by fish_pager, but now the claim is that there are no
// more clients and it is removed. This assertion double-checks that.
size_t stdout_read_limit = 0;
const io_chain_t all_ios = j->all_io_redirections();
for (size_t idx = 0; idx < all_ios.size(); idx++) {
const shared_ptr<io_data_t> &io = all_ios.at(idx);
if ((io->io_mode == io_mode_t::buffer)) {
io_buffer_t *io_buffer = static_cast<io_buffer_t *>(io.get());
assert(!io_buffer->is_input);
stdout_read_limit = io_buffer->buffer().limit();
for (auto &io : all_ios) {
if ((io->io_mode == io_mode_t::bufferfill)) {
// The read limit is dictated by the last bufferfill.
const auto *bf = static_cast<io_bufferfill_t *>(io.get());
stdout_read_limit = bf->buffer()->buffer().limit();
}
}
@ -1028,21 +978,6 @@ bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
DIE("this should be unreachable");
}
// We may have block IOs that conflict with fd redirections. For example, we may have a command
// with a redireciton like <&3; we may also have chosen 3 as the fd for our pipe. Ensure we have
// no conflicts.
for (const auto io : all_ios) {
if (io->io_mode == io_mode_t::buffer) {
auto *io_buffer = static_cast<io_buffer_t *>(io.get());
if (!io_buffer->avoid_conflicts_with_io_chain(all_ios)) {
// We could not avoid conflicts, probably due to fd exhaustion. Mark an error.
exec_error = true;
job_mark_process_as_failed(j, j->processes.front().get());
break;
}
}
}
// This loop loops over every process_t in the job, starting it as appropriate. This turns out
// to be rather complex, since a process_t can be one of many rather different things.
//
@ -1098,29 +1033,30 @@ static int exec_subshell_internal(const wcstring &cmd, parser_t &parser, wcstrin
// IO buffer creation may fail (e.g. if we have too many open files to make a pipe), so this may
// be null.
const shared_ptr<io_buffer_t> io_buffer(
io_buffer_t::create(STDOUT_FILENO, io_chain_t(), is_subcmd ? read_byte_limit : 0));
if (io_buffer.get() != NULL) {
size_t read_limit = is_subcmd ? read_byte_limit : 0;
std::shared_ptr<io_buffer_t> buffer;
if (auto bufferfill = io_bufferfill_t::create(io_chain_t{}, read_limit)) {
parser_t &parser = parser_t::principal_parser();
if (parser.eval(cmd, io_chain_t{io_buffer}, SUBST) == 0) {
if (parser.eval(cmd, io_chain_t{bufferfill}, SUBST) == 0) {
subcommand_status = proc_get_last_status();
}
io_buffer->read();
buffer = bufferfill->buffer();
bufferfill.reset();
buffer->read_to_wouldblock();
}
if (io_buffer->buffer().discarded()) subcommand_status = STATUS_READ_TOO_MUCH;
if (buffer && buffer->buffer().discarded()) subcommand_status = STATUS_READ_TOO_MUCH;
// If the caller asked us to preserve the exit status, restore the old status. Otherwise set the
// status of the subcommand.
proc_set_last_status(apply_exit_status ? subcommand_status : prev_status);
is_subshell = prev_subshell;
if (lst == NULL || io_buffer.get() == NULL) {
if (lst == NULL || !buffer) {
return subcommand_status;
}
// Walk over all the elements.
for (const auto &elem : io_buffer->buffer().elements()) {
for (const auto &elem : buffer->buffer().elements()) {
if (elem.is_explicitly_separated()) {
// Just append this one.
lst->push_back(str2wcstring(elem.contents));

View file

@ -31,10 +31,6 @@ int exec_subshell(const wcstring &cmd, parser_t &parser, bool preserve_exit_stat
/// Loops over close until the syscall was run without being interrupted.
void exec_close(int fd);
/// Call pipe(), and add resulting fds to open_fds, the list of opened file descriptors for pipes.
/// The pipes are marked CLO_EXEC.
int exec_pipe(int fd[2]);
/// Gets the interpreter for a given command.
char *get_interpreter(const char *command, char *interpreter, size_t buff_size);

View file

@ -920,8 +920,7 @@ static void test_parser() {
}
static void test_1_cancellation(const wchar_t *src) {
shared_ptr<io_buffer_t> out_buff(io_buffer_t::create(STDOUT_FILENO, io_chain_t()));
const io_chain_t io_chain{out_buff};
auto filler = io_bufferfill_t::create(io_chain_t{});
pthread_t thread = pthread_self();
double delay = 0.25 /* seconds */;
iothread_perform([=]() {
@ -929,11 +928,13 @@ static void test_1_cancellation(const wchar_t *src) {
usleep(delay * 1E6);
pthread_kill(thread, SIGINT);
});
parser_t::principal_parser().eval(src, io_chain, TOP);
out_buff->read();
if (out_buff->buffer().size() != 0) {
parser_t::principal_parser().eval(src, io_chain_t{filler}, TOP);
auto buffer = filler->buffer();
filler.reset();
buffer->read_to_wouldblock();
if (buffer->buffer().size() != 0) {
err(L"Expected 0 bytes in out_buff, but instead found %lu bytes\n",
out_buff->buffer().size());
buffer->buffer().size());
}
iothread_drain_all();
}

View file

@ -22,14 +22,10 @@ void io_fd_t::print() const { fwprintf(stderr, L"FD map %d -> %d\n", old_fd, fd)
void io_file_t::print() const { fwprintf(stderr, L"file (%s)\n", filename_cstr); }
void io_pipe_t::print() const {
fwprintf(stderr, L"pipe {%d, %d} (input: %s)\n", pipe_fd[0], pipe_fd[1],
is_input ? "yes" : "no");
fwprintf(stderr, L"pipe {%d} (input: %s)\n", pipe_fd(), is_input_ ? "yes" : "no");
}
void io_buffer_t::print() const {
fwprintf(stderr, L"buffer (input: %s, size %lu)\n",
is_input ? "yes" : "no", (unsigned long)buffer_.size());
}
void io_bufferfill_t::print() const { fwprintf(stderr, L"bufferfill {%d}\n", write_fd_.fd()); }
void io_buffer_t::append_from_stream(const output_stream_t &stream) {
if (buffer_.discarded()) return;
@ -40,75 +36,84 @@ void io_buffer_t::append_from_stream(const output_stream_t &stream) {
buffer_.append_wide_buffer(stream.buffer());
}
void io_buffer_t::read() {
exec_close(pipe_fd[1]);
long io_buffer_t::read_some() {
int fd = read_.fd();
assert(fd >= 0 && "Should have a valid fd");
debug(4, L"io_buffer_t::read: blocking read on fd %d", fd);
long len;
char b[4096];
do {
len = read(fd, b, sizeof b);
} while (len < 0 && errno == EINTR);
if (len > 0) {
buffer_.append(&b[0], &b[len]);
}
return len;
}
if (io_mode == io_mode_t::buffer) {
debug(4, L"io_buffer_t::read: blocking read on fd %d", pipe_fd[0]);
while (1) {
char b[4096];
long len = read_blocked(pipe_fd[0], b, 4096);
if (len == 0) {
break;
} else if (len < 0) {
// exec_read_io_buffer is only called on jobs that have exited, and will therefore
// never block. But a broken pipe seems to cause some flags to reset, causing the
// EOF flag to not be set. Therefore, EAGAIN is ignored and we exit anyway.
if (errno != EAGAIN) {
const wchar_t *fmt =
_(L"An error occured while reading output from code block on fd %d");
debug(1, fmt, pipe_fd[0]);
wperror(L"io_buffer_t::read");
}
void io_buffer_t::read_to_wouldblock() {
long len;
do {
len = read_some();
} while (len > 0);
if (len < 0 && errno != EAGAIN) {
debug(1, _(L"An error occured while reading output from code block on fd %d"), read_.fd());
wperror(L"io_buffer_t::read");
}
}
break;
} else {
buffer_.append(&b[0], &b[len]);
}
bool io_buffer_t::try_read(unsigned long timeout_usec) {
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = timeout_usec;
int fd = read_.fd();
assert(fd >= 0 && "Should have a valid fd");
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
int ret = select(fd + 1, &fds, nullptr, nullptr, &timeout);
if (ret < 0) {
// Treat EINTR as timeout.
if (errno != EINTR) {
debug(1, _(L"An error occured inside select on fd %d"), fd);
wperror(L"io_buffer_t::try_read");
}
return false;
}
if (ret > 0) {
read_some();
}
return ret > 0;
}
bool io_buffer_t::avoid_conflicts_with_io_chain(const io_chain_t &ios) {
bool result = pipe_avoid_conflicts_with_io_chain(this->pipe_fd, ios);
if (!result) {
wperror(L"dup");
shared_ptr<io_bufferfill_t> io_bufferfill_t::create(const io_chain_t &conflicts,
size_t buffer_limit) {
// Construct our pipes.
auto pipes = make_autoclose_pipes(conflicts);
if (!pipes) {
return nullptr;
}
return result;
}
shared_ptr<io_buffer_t> io_buffer_t::create(int fd, const io_chain_t &conflicts,
size_t buffer_limit) {
bool success = true;
assert(fd >= 0);
shared_ptr<io_buffer_t> buffer_redirect(new io_buffer_t(fd, buffer_limit));
if (exec_pipe(buffer_redirect->pipe_fd) == -1) {
debug(1, PIPE_ERROR);
wperror(L"pipe");
success = false;
} else if (!buffer_redirect->avoid_conflicts_with_io_chain(conflicts)) {
// The above call closes the fds on error.
success = false;
} else if (make_fd_nonblocking(buffer_redirect->pipe_fd[0]) != 0) {
// Our buffer will read from the read end of the pipe. This end must be non-blocking. This is
// because we retain the write end of the pipe in this process (even after handing it off to a
// child process); therefore a read on the pipe may block forever. What we should do is arrange
// for the write end of the pipe to be closed at the right time; then the read could just block.
if (make_fd_nonblocking(pipes->read.fd())) {
debug(1, PIPE_ERROR);
wperror(L"fcntl");
success = false;
return nullptr;
}
if (!success) {
buffer_redirect.reset();
}
return buffer_redirect;
// Our buffer gets the read end of the pipe; out_pipe gets the write end.
auto buffer = std::make_shared<io_buffer_t>(std::move(pipes->read), buffer_limit);
return std::make_shared<io_bufferfill_t>(std::move(pipes->write), buffer);
}
io_buffer_t::~io_buffer_t() {
if (pipe_fd[0] >= 0) {
exec_close(pipe_fd[0]);
}
// Dont free fd for writing. This should already be free'd before calling exec_read_io_buffer on
// the buffer.
}
io_pipe_t::~io_pipe_t() = default;
io_bufferfill_t::~io_bufferfill_t() = default;
io_buffer_t::~io_buffer_t() = default;
void io_chain_t::remove(const shared_ptr<const io_data_t> &element) {
// See if you can guess why std::find doesn't work here.
@ -197,7 +202,7 @@ int move_fd_to_unused(int fd, const io_chain_t &io_chain, bool cloexec) {
return new_fd;
}
bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios) {
static bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios) {
bool success = true;
for (int i = 0; i < 2; i++) {
fds[i] = move_fd_to_unused(fds[i], ios);
@ -221,6 +226,27 @@ bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios) {
return success;
}
maybe_t<autoclose_pipes_t> make_autoclose_pipes(const io_chain_t &ios) {
int pipes[2] = {-1, -1};
if (pipe(pipes) < 0) {
debug(1, PIPE_ERROR);
wperror(L"pipe");
return none();
}
set_cloexec(pipes[0]);
set_cloexec(pipes[1]);
if (!pipe_avoid_conflicts_with_io_chain(pipes, ios)) {
// The pipes are closed on failure here.
return none();
}
autoclose_pipes_t result;
result.read = autoclose_fd_t(pipes[0]);
result.write = autoclose_fd_t(pipes[1]);
return result;
}
/// Return the last IO for the given fd.
shared_ptr<const io_data_t> io_chain_t::get_io_for_fd(int fd) const {
size_t idx = this->size();

115
src/io.h
View file

@ -150,7 +150,7 @@ public:
};
/// Describes what type of IO operation an io_data_t represents.
enum class io_mode_t { file, pipe, fd, buffer, close };
enum class io_mode_t { file, pipe, fd, close, bufferfill };
/// Represents an FD redirection.
class io_data_t {
@ -210,39 +210,83 @@ class io_file_t : public io_data_t {
~io_file_t() override { free((void *)filename_cstr); }
};
/// Represents (one end) of a pipe.
class io_pipe_t : public io_data_t {
protected:
io_pipe_t(io_mode_t m, int f, bool i) : io_data_t(m, f), is_input(i) {
pipe_fd[0] = pipe_fd[1] = -1;
}
// The pipe's fd. Conceptually this is dup2'd to io_data_t::fd.
autoclose_fd_t pipe_fd_;
/// Whether this is an input pipe. This is used only for informational purposes.
const bool is_input_;
public:
int pipe_fd[2];
const bool is_input;
void print() const override;
io_pipe_t(int f, bool i) : io_data_t(io_mode_t::pipe, f), is_input(i) {
pipe_fd[0] = pipe_fd[1] = -1;
}
io_pipe_t(int fd, bool is_input, autoclose_fd_t pipe_fd)
: io_data_t(io_mode_t::pipe, fd), pipe_fd_(std::move(pipe_fd)), is_input_(is_input) {}
~io_pipe_t();
int pipe_fd() const { return pipe_fd_.fd(); }
};
class io_buffer_t;
class io_chain_t;
/// Represents filling an io_buffer_t. Very similar to io_pipe_t.
/// Bufferfills always target stdout.
class io_bufferfill_t : public io_data_t {
/// Write end. The other end is connected to an io_buffer_t.
const autoclose_fd_t write_fd_;
/// The receiving buffer.
const std::shared_ptr<io_buffer_t> buffer_;
public:
void print() const override;
io_bufferfill_t(autoclose_fd_t write_fd, std::shared_ptr<io_buffer_t> buffer)
: io_data_t(io_mode_t::bufferfill, STDOUT_FILENO),
write_fd_(std::move(write_fd)),
buffer_(std::move(buffer)) {}
~io_bufferfill_t();
std::shared_ptr<io_buffer_t> buffer() const { return buffer_; }
int write_fd() const { return write_fd_.fd(); }
/// Create an io_bufferfill_t which, when written from, populates a buffer (also created).
/// \returns nullptr on failure, e.g. too many open fds.
///
/// \param conflicts A set of IO redirections. The function ensures that any pipe it makes does
/// not conflict with an fd redirection in this list.
static shared_ptr<io_bufferfill_t> create(const io_chain_t &conflicts, size_t buffer_limit = 0);
};
class output_stream_t;
class io_buffer_t : public io_pipe_t {
/// An io_buffer_t is a buffer which can populate itself by reading from an fd.
/// It is not an io_data_t.
class io_buffer_t {
private:
/// fd from which to read.
autoclose_fd_t read_;
/// Buffer storing what we have read.
separated_buffer_t<std::string> buffer_;
explicit io_buffer_t(int f, size_t limit)
: io_pipe_t(io_mode_t::buffer, f, false /* not input */), buffer_(limit) {
/// Read some. Append it to our buffer.
/// \return positive if we read, 0 on EOF, -1 on error.
long read_some();
public:
explicit io_buffer_t(autoclose_fd_t read, size_t limit)
: read_(std::move(read)), buffer_(limit) {
// Explicitly reset the discard flag because we share this buffer.
buffer_.reset_discard();
}
public:
void print() const override;
~io_buffer_t() override;
~io_buffer_t();
/// Access the underlying buffer.
const separated_buffer_t<std::string> &buffer() const { return buffer_; }
@ -250,24 +294,16 @@ class io_buffer_t : public io_pipe_t {
/// Function to append to the buffer.
void append(const char *ptr, size_t count) { buffer_.append(ptr, ptr + count); }
/// Ensures that the pipes do not conflict with any fd redirections in the chain.
bool avoid_conflicts_with_io_chain(const io_chain_t &ios);
/// Read from input pipe until EOF or EAGAIN (i.e. would block).
void read_to_wouldblock();
/// Close output pipe, and read from input pipe until eof.
void read();
/// Read a bit, if our fd is readable, with the given timeout.
/// \return true if we read some, false on timeout.
bool try_read(unsigned long timeout_usec);
/// Appends data from a given output_stream_t.
/// Marks the receiver as discarded if the stream was discarded.
void append_from_stream(const output_stream_t &stream);
/// Create a io_mode_t::buffer type io redirection, complete with a pipe and a vector<char> for
/// output. The default file descriptor used is STDOUT_FILENO for buffering.
///
/// \param fd the fd that will be mapped in the child process, typically STDOUT_FILENO
/// \param conflicts A set of IO redirections. The function ensures that any pipe it makes does
/// not conflict with an fd redirection in this list.
static shared_ptr<io_buffer_t> create(int fd, const io_chain_t &conflicts,
size_t buffer_limit = 0);
};
class io_chain_t : public std::vector<shared_ptr<io_data_t>> {
@ -287,11 +323,18 @@ class io_chain_t : public std::vector<shared_ptr<io_data_t>> {
shared_ptr<const io_data_t> io_chain_get(const io_chain_t &src, int fd);
shared_ptr<io_data_t> io_chain_get(io_chain_t &src, int fd);
/// Given a pair of fds, if an fd is used by the given io chain, duplicate that fd repeatedly until
/// we find one that does not conflict, or we run out of fds. Returns the new fds by reference,
/// closing the old ones. If we get an error, returns false (in which case both fds are closed and
/// set to -1).
bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios);
/// Helper type returned from making autoclose pipes.
struct autoclose_pipes_t {
/// Read end of the pipe.
autoclose_fd_t read;
/// Write end of the pipe.
autoclose_fd_t write;
};
/// Call pipe(), populating autoclose fds, avoiding conflicts.
/// The pipes are marked CLO_EXEC.
/// \return pipes on success, none() on error.
maybe_t<autoclose_pipes_t> make_autoclose_pipes(const io_chain_t &ios);
/// If the given fd is used by the io chain, duplicates it repeatedly until an fd not used in the io
/// chain is found, or we run out. If we return a new fd or an error, closes the old one.

View file

@ -837,86 +837,24 @@ void proc_update_jiffies() {
#endif
/// The return value of select_try(), indicating IO readiness or an error
enum class select_try_t {
/// One or more fds have data ready for read
DATA_READY,
/// The timeout elapsed without any data becoming available for read
enum class block_receive_try_t {
/// There is no buffer to select on.
NO_BUFFER,
/// We have a block buffer, and we read some.
DATA_READ,
/// We have a block buffer, but we were unable to read any.
TIMEOUT,
/// There were no FDs in the io chain for which to select on.
IOCHAIN_EMPTY,
};
/// Check if there are buffers associated with the job, and select on them for a while if available.
///
/// \param j the job to test
/// \return the status of the select operation
static select_try_t select_try(job_t *j) {
fd_set fds;
int maxfd = -1;
FD_ZERO(&fds);
const io_chain_t chain = j->all_io_redirections();
for (const auto &io : chain) {
if (io->io_mode == io_mode_t::buffer) {
auto io_pipe = static_cast<const io_pipe_t *>(io.get());
int fd = io_pipe->pipe_fd[0];
FD_SET(fd, &fds);
maxfd = std::max(maxfd, fd);
debug(4, L"select_try on fd %d", fd);
}
}
if (maxfd >= 0) {
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 10000;
int retval = select(maxfd + 1, &fds, 0, 0, &timeout);
if (retval == 0) {
debug(4, L"select_try hit timeout");
return select_try_t::TIMEOUT;
}
return select_try_t::DATA_READY;
}
return select_try_t::IOCHAIN_EMPTY;
}
/// Read from descriptors until they are empty.
///
/// \param j the job to test
static void read_try(job_t *j) {
io_buffer_t *buff = NULL;
// Find the last buffer, which is the one we want to read from.
const io_chain_t chain = j->all_io_redirections();
for (size_t idx = 0; idx < chain.size(); idx++) {
io_data_t *d = chain.at(idx).get();
if (d->io_mode == io_mode_t::buffer) {
buff = static_cast<io_buffer_t *>(d);
}
}
if (buff) {
debug(4, L"proc::read_try('%ls')", j->command_wcstr());
while (1) {
char b[BUFFER_SIZE];
long len = read_blocked(buff->pipe_fd[0], b, BUFFER_SIZE);
if (len == 0) {
break;
} else if (len < 0) {
if (errno != EAGAIN) {
debug(1, _(L"An error occured while reading output from code block"));
wperror(L"read_try");
}
break;
} else {
buff->append(b, len);
}
/// \return the last IO buffer in job j, or nullptr if none.
std::shared_ptr<io_buffer_t> last_buffer(job_t *j) {
std::shared_ptr<io_buffer_t> buff{};
for (const auto &io : j->all_io_redirections()) {
if (io->io_mode == io_mode_t::bufferfill) {
buff = static_cast<io_bufferfill_t *>(io.get())->buffer();
}
}
return buff;
}
// Return control of the terminal to a job's process group. restore_attrs is true if we are restoring
@ -1154,26 +1092,27 @@ void job_t::continue_job(bool send_sigcont) {
// Wait for data to become available or the status of our own job to change
while (!reader_exit_forced() && !is_stopped() && !is_completed()) {
auto result = select_try(this);
read_attempted = true;
switch (result) {
case select_try_t::DATA_READY:
// Read the data that we know is now available, then scan for finished processes
// but do not block. We don't block so long as we have IO to process, once the
// fd buffers are empty we'll block in the second case below.
read_try(this);
auto read_result = block_receive_try_t::NO_BUFFER;
if (auto buff = last_buffer(this)) {
const unsigned long SELECT_TIMEOUT_USEC = 10000;
bool did_read = buff->try_read(SELECT_TIMEOUT_USEC);
read_result = did_read ? block_receive_try_t::DATA_READ : block_receive_try_t::TIMEOUT;
}
switch (read_result) {
case block_receive_try_t::DATA_READ:
// We read some data.
process_mark_finished_children(false);
break;
case select_try_t::TIMEOUT:
// No FDs are ready. Look for finished processes instead.
case block_receive_try_t::TIMEOUT:
// We read some data or timed out. Poll for finished processes.
debug(4, L"select_try: no fds returned valid data within the timeout" );
process_mark_finished_children(block_on_fg);
break;
case select_try_t::IOCHAIN_EMPTY:
// There were no IO fds to select on.
case block_receive_try_t::NO_BUFFER:
// We are not populating a buffer.
debug(4, L"select_try: no IO fds" );
process_mark_finished_children(true);
@ -1195,7 +1134,9 @@ void job_t::continue_job(bool send_sigcont) {
// `echo` calls were sometimes having their output combined with the `set_color` calls
// in the wrong order!
if (!read_attempted) {
read_try(this);
if (auto buff = last_buffer(this)) {
buff->read_to_wouldblock();
}
}
// Set $status only if we are in the foreground and the last process in the job has

View file

@ -73,15 +73,17 @@ maybe_t<dup2_list_t> dup2_list_t::resolve_chain(const io_chain_t &io_chain) {
break;
}
case io_mode_t::buffer:
case io_mode_t::pipe: {
const io_pipe_t *io = static_cast<const io_pipe_t *>(io_ref.get());
// If write_pipe_idx is 0, it means we're connecting to the read end (first pipe
// fd). If it's 1, we're connecting to the write end (second pipe fd).
unsigned int write_pipe_idx = (io->is_input ? 0 : 1);
result.add_dup2(io->pipe_fd[write_pipe_idx], io->fd);
if (io->pipe_fd[0] >= 0) result.add_close(io->pipe_fd[0]);
if (io->pipe_fd[1] >= 0) result.add_close(io->pipe_fd[1]);
result.add_dup2(io->pipe_fd(), io->fd);
result.add_close(io->pipe_fd());
break;
}
case io_mode_t::bufferfill: {
const io_bufferfill_t *io = static_cast<const io_bufferfill_t *>(io_ref.get());
result.add_dup2(io->write_fd(), io->fd);
result.add_close(io->write_fd());
break;
}
}