From 178b72b2fd67d37215f973bd55290a5d0ec6eb06 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Thu, 31 Jan 2019 16:05:42 -0800 Subject: [PATCH] io_buffer_t becomes io_bufferfill_t This makes some significant architectual improvements to io_pipe_t and io_buffer_t. Prior to this fix, io_buffer_t subclassed io_pipe_t. io_buffer_t is now replaced with a class io_bufferfill_t, which does not subclass pipe. io_pipe_t no longer remembers both fds. Instead it has an autoclose_fd_t, so that the file descriptor ownership is clear. --- src/common.h | 3 + src/exec.cpp | 212 ++++++++++++++++---------------------------- src/exec.h | 4 - src/fish_tests.cpp | 13 +-- src/io.cpp | 150 ++++++++++++++++++------------- src/io.h | 115 ++++++++++++++++-------- src/proc.cpp | 117 ++++++------------------ src/redirection.cpp | 16 ++-- 8 files changed, 289 insertions(+), 341 deletions(-) diff --git a/src/common.h b/src/common.h index 8b45a808c..5fa2119ba 100644 --- a/src/common.h +++ b/src/common.h @@ -786,6 +786,9 @@ class autoclose_fd_t { fd_ = fd; } + // \return if this has a valid fd. + bool valid() const { return fd_ >= 0; } + autoclose_fd_t(const autoclose_fd_t &) = delete; void operator=(const autoclose_fd_t &) = delete; autoclose_fd_t(autoclose_fd_t &&rhs) : fd_(rhs.fd_) { rhs.fd_ = -1; } diff --git a/src/exec.cpp b/src/exec.cpp index 78444b8b8..7de91f5be 100644 --- a/src/exec.cpp +++ b/src/exec.cpp @@ -81,25 +81,6 @@ void exec_close(int fd) { } } -int exec_pipe(int fd[2]) { - ASSERT_IS_MAIN_THREAD(); - - int res; - while ((res = pipe(fd))) { - if (errno != EINTR) { - return res; // caller will call wperror - } - } - - debug(4, L"Created pipe using fds %d and %d", fd[0], fd[1]); - - // Pipes ought to be cloexec. Pipes are dup2'd the corresponding fds; the resulting fds are not - // cloexec. - set_cloexec(fd[0]); - set_cloexec(fd[1]); - return res; -} - /// Returns true if the redirection is a file redirection to a file other than /dev/null. static bool redirection_is_to_real_file(const io_data_t *io) { bool result = false; @@ -246,8 +227,8 @@ static bool io_transmogrify(const io_chain_t &in_chain, io_chain_t *out_chain, switch (in->io_mode) { case io_mode_t::pipe: + case io_mode_t::bufferfill: case io_mode_t::fd: - case io_mode_t::buffer: case io_mode_t::close: { // These redirections don't need transmogrification. They can be passed through. out = in; @@ -424,12 +405,12 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptrtype == INTERNAL_BUILTIN && "Process must be a builtin"); int local_builtin_stdin = STDIN_FILENO; - bool close_stdin = false; + autoclose_fd_t locally_opened_stdin{}; // If this is the first process, check the io redirections and see where we should // be reading from. if (pipe_read) { - local_builtin_stdin = pipe_read->pipe_fd[0]; + local_builtin_stdin = pipe_read->pipe_fd(); } else if (const auto in = proc_io_chain.get_io_for_fd(STDIN_FILENO)) { switch (in->io_mode) { case io_mode_t::fd: { @@ -448,20 +429,20 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptr(in.get()); - local_builtin_stdin = in_pipe->pipe_fd[0]; + if (in_pipe->fd == STDIN_FILENO) { + local_builtin_stdin = in_pipe->pipe_fd(); + } break; } case io_mode_t::file: { - // Do not set CLO_EXEC because child needs access. 
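The clearer fd ownership described in the commit message comes from autoclose_fd_t being a move-only RAII wrapper. A minimal usage sketch follows, assuming (from the class name rather than from this hunk) that the wrapper closes its descriptor on destruction; ownership_sketch and the path are illustrative only:

    #include <cassert>
    #include <fcntl.h>
    #include <utility>

    // Sketch of the ownership rules the patch relies on (autoclose_fd_t lives in src/common.h).
    static void ownership_sketch() {
        autoclose_fd_t file{open("/tmp/example", O_RDONLY | O_CLOEXEC)};  // illustrative path
        if (!file.valid()) return;  // open() failed; nothing is owned

        // Copying is deleted; ownership transfers only by move, leaving 'file' holding -1.
        autoclose_fd_t owner = std::move(file);
        assert(!file.valid() && owner.valid());
    }  // 'owner' goes out of scope here and closes the descriptor exactly once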
const io_file_t *in_file = static_cast(in.get()); - local_builtin_stdin = open(in_file->filename_cstr, in_file->flags, OPEN_MASK); - if (local_builtin_stdin == -1) { + locally_opened_stdin = + autoclose_fd_t{open(in_file->filename_cstr, in_file->flags, OPEN_MASK)}; + if (!locally_opened_stdin.valid()) { debug(1, FILE_ERROR, in_file->filename_cstr); wperror(L"open"); - } else { - close_stdin = true; } - + local_builtin_stdin = locally_opened_stdin.fd(); break; } case io_mode_t::close: { @@ -517,10 +498,6 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptrset_flag(job_flag_t::FOREGROUND, fg); - // If stdin has been redirected, close the redirection stream. - if (close_stdin) { - exec_close(local_builtin_stdin); - } return true; // "success" } @@ -548,7 +525,11 @@ static bool handle_builtin_output(const std::shared_ptr &j, process_t *p, if (!must_fork && p->is_last_in_job) { // We are handling reads directly in the main loop. Note that we may still end // up forking. - const bool stdout_is_to_buffer = stdout_io && stdout_io->io_mode == io_mode_t::buffer; + const bool stdout_is_bufferfill = + (stdout_io && stdout_io->io_mode == io_mode_t::bufferfill); + const std::shared_ptr stdout_buffer = + stdout_is_bufferfill ? static_cast(stdout_io.get())->buffer() + : nullptr; const bool no_stdout_output = stdout_stream.empty(); const bool no_stderr_output = stderr_stream.empty(); const bool stdout_discarded = stdout_stream.buffer().discarded(); @@ -558,7 +539,7 @@ static bool handle_builtin_output(const std::shared_ptr &j, process_t *p, // need to fork or even output anything. debug(4, L"Skipping fork: no output for internal builtin '%ls'", p->argv0()); fork_was_skipped = true; - } else if (no_stderr_output && stdout_is_to_buffer) { + } else if (no_stderr_output && stdout_buffer) { // The builtin produced no stderr, and its stdout is going to an // internal buffer. There is no need to fork. This helps out the // performance quite a bit in complex completion code. @@ -570,8 +551,7 @@ static bool handle_builtin_output(const std::shared_ptr &j, process_t *p, // also produce stderr. debug(4, L"Skipping fork: buffered output for internal builtin '%ls'", p->argv0()); - io_buffer_t *io_buffer = static_cast(stdout_io.get()); - io_buffer->append_from_stream(stdout_stream); + stdout_buffer->append_from_stream(stdout_stream); fork_was_skipped = true; } else if (stdout_io.get() == NULL && stderr_io.get() == NULL) { // We are writing to normal stdout and stderr. Just do it - no need to fork. @@ -749,20 +729,16 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr "Unexpected process type"); // Create an output buffer if we're piping to another process. - shared_ptr block_output_io_buffer{}; + shared_ptr block_output_bufferfill{}; if (!p->is_last_in_job) { // Be careful to handle failure, e.g. too many open fds. - block_output_io_buffer = io_buffer_t::create(STDOUT_FILENO, user_ios); - if (!block_output_io_buffer) { + block_output_bufferfill = io_bufferfill_t::create(user_ios); + if (!block_output_bufferfill) { job_mark_process_as_failed(j, p); return false; - } else { - // This looks sketchy, because we're adding this io buffer locally - they - // aren't in the process or job redirection list. Therefore select_try won't - // be able to read them. However we call block_output_io_buffer->read() - // below, which reads until EOF. So there's no need to select on this. 
- io_chain.push_back(block_output_io_buffer); } + // Teach the job about its bufferfill, and add it to our io chain. + io_chain.push_back(block_output_bufferfill); } if (p->type == INTERNAL_FUNCTION) { @@ -792,10 +768,8 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr int status = proc_get_last_status(); - // Handle output from a block or function. This usually means do nothing, but in the - // case of pipes, we have to buffer such io, since otherwise the internal pipe - // buffer might overflow. - if (!block_output_io_buffer.get()) { + // If we have a block output buffer, populate it now. + if (!block_output_bufferfill) { // No buffer, so we exit directly. This means we have to manually set the exit // status. if (p->is_last_in_job) { @@ -804,11 +778,16 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr p->completed = 1; return true; } + assert(block_output_bufferfill && "Must have a block output bufferfiller"); - // Here we must have a non-NULL block_output_io_buffer. - assert(block_output_io_buffer.get() != NULL); - io_chain.remove(block_output_io_buffer); - block_output_io_buffer->read(); + // Remove our write pipe and forget it. This may close the pipe, unless another thread has + // claimed it (background write) or another process has inherited it. + auto block_output_buffer = block_output_bufferfill->buffer(); + io_chain.remove(block_output_bufferfill); + block_output_bufferfill.reset(); + + // Make the buffer populate itself from whatever was written to the write pipe. + block_output_buffer->read_to_wouldblock(); // Resolve our IO chain to a sequence of dup2s. auto dup2s = dup2_list_t::resolve_chain(io_chain); @@ -816,7 +795,7 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr return false; } - const std::string buffer_contents = block_output_io_buffer->buffer().newline_serialized(); + const std::string buffer_contents = block_output_buffer->buffer().newline_serialized(); const char *buffer = buffer_contents.data(); size_t count = buffer_contents.size(); if (count > 0) { @@ -824,7 +803,7 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr const char *fork_reason = p->type == INTERNAL_BLOCK_NODE ? "internal block io" : "internal function io"; if (!fork_child_for_process(j, p, *dup2s, false, fork_reason, [&] { - exec_write_and_exit(block_output_io_buffer->fd, buffer, count, status); + exec_write_and_exit(STDOUT_FILENO, buffer, count, status); })) { return false; } @@ -844,19 +823,28 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr< autoclose_fd_t pipe_current_read, autoclose_fd_t *out_pipe_next_read, const io_chain_t &all_ios, size_t stdout_read_limit) { - // The IO chain for this process. It starts with the block IO, then pipes, and then gets any - // from the process. - io_chain_t process_net_io_chain = j->block_io_chain(); + // The pipe this command will write to (if any). + shared_ptr pipe_write; + // The pipe this command will read from (if any). + shared_ptr pipe_read; - // See if we need a pipe. + // See if we need a pipe for the next command. const bool pipes_to_next_command = !p->is_last_in_job; + if (pipes_to_next_command) { + // Construct our pipes. + auto local_pipes = make_autoclose_pipes(all_ios); + if (!local_pipes) { + debug(1, PIPE_ERROR); + wperror(L"pipe"); + job_mark_process_as_failed(j, p); + return false; + } - // The write end of any pipe we create. 
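Spelled out with its template arguments, the pipe setup above reduces to the sketch below; wire_pipe_sketch, conflicts, producer_chain, and next_stdin are illustrative names, and the declarations are assumed from src/io.h as changed later in this patch:

    // Sketch: create a pipe whose ends are owned by autoclose_fd_t, wrap the write end in an
    // io_pipe_t for the producing process, and hand the read end to the next process.
    static bool wire_pipe_sketch(const io_chain_t &conflicts, io_chain_t &producer_chain,
                                 autoclose_fd_t *next_stdin) {
        maybe_t<autoclose_pipes_t> pipes = make_autoclose_pipes(conflicts);
        if (!pipes) return false;  // e.g. fd exhaustion; exec.cpp marks the process as failed

        // The producer dup2s this onto its target fd (p->pipe_write_fd, typically stdout).
        producer_chain.push_back(std::make_shared<io_pipe_t>(
            STDOUT_FILENO, false /* not input */, std::move(pipes->write)));

        // The next process later wraps this read end in an io_pipe_t targeting STDIN_FILENO.
        *next_stdin = std::move(pipes->read);
        return true;
    }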
- autoclose_fd_t pipe_current_write{}; + pipe_write = std::make_shared(p->pipe_write_fd, false /* not input */, + std::move(local_pipes->write)); + *out_pipe_next_read = std::move(local_pipes->read); + } - // The pipes the current process write to and read from. Unfortunately these can't be just - // allocated on the stack, since j->io wants shared_ptr. - // // The write pipe (destined for stdout) needs to occur before redirections. For example, // with a redirection like this: // @@ -884,12 +872,10 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr< // // which depends on the redirection being evaluated before the pipe. So the write end of the // pipe comes first, the read pipe of the pipe comes last. See issue #966. - shared_ptr pipe_write; - shared_ptr pipe_read; - // Write pipe goes first. - if (pipes_to_next_command) { - pipe_write.reset(new io_pipe_t(p->pipe_write_fd, false)); + // The IO chain for this process. + io_chain_t process_net_io_chain = j->block_io_chain(); + if (pipe_write) { process_net_io_chain.push_back(pipe_write); } @@ -897,10 +883,9 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr< process_net_io_chain.append(p->io_chain()); // Read pipe goes last. - if (!p->is_first_in_job) { - pipe_read.reset(new io_pipe_t(STDIN_FILENO, true)); - // Record the current read in pipe_read. - pipe_read->pipe_fd[0] = pipe_current_read.fd(); + if (pipe_current_read.valid()) { + pipe_read = std::make_shared(STDIN_FILENO, true /* input */, + std::move(pipe_current_read)); process_net_io_chain.push_back(pipe_read); } @@ -918,36 +903,6 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr< parser.vars().export_arr(); } - // Set up fds that will be used in the pipe. - if (pipes_to_next_command) { - // debug( 1, L"%ls|%ls" , p->argv[0], p->next->argv[0]); - int local_pipe[2] = {-1, -1}; - if (exec_pipe(local_pipe) == -1) { - debug(1, PIPE_ERROR); - wperror(L"pipe"); - job_mark_process_as_failed(j, p); - return false; - } - - // Ensure our pipe fds not conflict with any fd redirections. E.g. if the process is - // like 'cat <&5' then fd 5 must not be used by the pipe. - if (!pipe_avoid_conflicts_with_io_chain(local_pipe, all_ios)) { - // We failed. The pipes were closed for us. - wperror(L"dup"); - job_mark_process_as_failed(j, p); - return false; - } - - // This tells the redirection about the fds, but the redirection does not close them. - assert(local_pipe[0] >= 0); - assert(local_pipe[1] >= 0); - memcpy(pipe_write->pipe_fd, local_pipe, sizeof(int) * 2); - - // Record our pipes. - pipe_current_write.reset(local_pipe[1]); - out_pipe_next_read->reset(local_pipe[0]); - } - // Execute the process. switch (p->type) { case INTERNAL_FUNCTION: @@ -1008,18 +963,13 @@ bool exec_job(parser_t &parser, shared_ptr j) { } } - // Verify that all io_mode_t::buffers are output. We used to support a (single, hacked-in) - // magical input io_mode_t::buffer used by fish_pager, but now the claim is that there are no - // more clients and it is removed. This assertion double-checks that. 
size_t stdout_read_limit = 0; const io_chain_t all_ios = j->all_io_redirections(); - for (size_t idx = 0; idx < all_ios.size(); idx++) { - const shared_ptr &io = all_ios.at(idx); - - if ((io->io_mode == io_mode_t::buffer)) { - io_buffer_t *io_buffer = static_cast(io.get()); - assert(!io_buffer->is_input); - stdout_read_limit = io_buffer->buffer().limit(); + for (auto &io : all_ios) { + if ((io->io_mode == io_mode_t::bufferfill)) { + // The read limit is dictated by the last bufferfill. + const auto *bf = static_cast(io.get()); + stdout_read_limit = bf->buffer()->buffer().limit(); } } @@ -1028,21 +978,6 @@ bool exec_job(parser_t &parser, shared_ptr j) { DIE("this should be unreachable"); } - // We may have block IOs that conflict with fd redirections. For example, we may have a command - // with a redireciton like <&3; we may also have chosen 3 as the fd for our pipe. Ensure we have - // no conflicts. - for (const auto io : all_ios) { - if (io->io_mode == io_mode_t::buffer) { - auto *io_buffer = static_cast(io.get()); - if (!io_buffer->avoid_conflicts_with_io_chain(all_ios)) { - // We could not avoid conflicts, probably due to fd exhaustion. Mark an error. - exec_error = true; - job_mark_process_as_failed(j, j->processes.front().get()); - break; - } - } - } - // This loop loops over every process_t in the job, starting it as appropriate. This turns out // to be rather complex, since a process_t can be one of many rather different things. // @@ -1098,29 +1033,30 @@ static int exec_subshell_internal(const wcstring &cmd, parser_t &parser, wcstrin // IO buffer creation may fail (e.g. if we have too many open files to make a pipe), so this may // be null. - const shared_ptr io_buffer( - io_buffer_t::create(STDOUT_FILENO, io_chain_t(), is_subcmd ? read_byte_limit : 0)); - if (io_buffer.get() != NULL) { + size_t read_limit = is_subcmd ? read_byte_limit : 0; + std::shared_ptr buffer; + if (auto bufferfill = io_bufferfill_t::create(io_chain_t{}, read_limit)) { parser_t &parser = parser_t::principal_parser(); - if (parser.eval(cmd, io_chain_t{io_buffer}, SUBST) == 0) { + if (parser.eval(cmd, io_chain_t{bufferfill}, SUBST) == 0) { subcommand_status = proc_get_last_status(); } - - io_buffer->read(); + buffer = bufferfill->buffer(); + bufferfill.reset(); + buffer->read_to_wouldblock(); } - if (io_buffer->buffer().discarded()) subcommand_status = STATUS_READ_TOO_MUCH; + if (buffer && buffer->buffer().discarded()) subcommand_status = STATUS_READ_TOO_MUCH; // If the caller asked us to preserve the exit status, restore the old status. Otherwise set the // status of the subcommand. proc_set_last_status(apply_exit_status ? subcommand_status : prev_status); is_subshell = prev_subshell; - if (lst == NULL || io_buffer.get() == NULL) { + if (lst == NULL || !buffer) { return subcommand_status; } // Walk over all the elements. - for (const auto &elem : io_buffer->buffer().elements()) { + for (const auto &elem : buffer->buffer().elements()) { if (elem.is_explicitly_separated()) { // Just append this one. lst->push_back(str2wcstring(elem.contents)); diff --git a/src/exec.h b/src/exec.h index 010de4654..63b78abc7 100644 --- a/src/exec.h +++ b/src/exec.h @@ -31,10 +31,6 @@ int exec_subshell(const wcstring &cmd, parser_t &parser, bool preserve_exit_stat /// Loops over close until the syscall was run without being interrupted. void exec_close(int fd); -/// Call pipe(), and add resulting fds to open_fds, the list of opened file descriptors for pipes. -/// The pipes are marked CLO_EXEC. 
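The bufferfill lifecycle used in exec_subshell_internal above (and again in fish_tests.cpp below) follows one pattern: create, run, drop the write end, drain. A condensed sketch, with capture_output_sketch as an illustrative name; the real function additionally handles read limits, exit status, and explicitly separated elements:

    // Sketch: capture the output of an eval'd command as a single string.
    static wcstring capture_output_sketch(parser_t &parser, const wcstring &cmd) {
        std::shared_ptr<io_bufferfill_t> bufferfill = io_bufferfill_t::create(io_chain_t{});
        if (!bufferfill) return wcstring{};  // e.g. too many open fds

        parser.eval(cmd, io_chain_t{bufferfill}, SUBST);
        // (exec_subshell_internal also checks the eval result and proc_get_last_status().)

        std::shared_ptr<io_buffer_t> buffer = bufferfill->buffer();
        bufferfill.reset();            // release our write end of the pipe
        buffer->read_to_wouldblock();  // pull everything written so far into the buffer
        return str2wcstring(buffer->buffer().newline_serialized());
    }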
-int exec_pipe(int fd[2]); - /// Gets the interpreter for a given command. char *get_interpreter(const char *command, char *interpreter, size_t buff_size); diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 8f266947d..2ed26d3b2 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -920,8 +920,7 @@ static void test_parser() { } static void test_1_cancellation(const wchar_t *src) { - shared_ptr out_buff(io_buffer_t::create(STDOUT_FILENO, io_chain_t())); - const io_chain_t io_chain{out_buff}; + auto filler = io_bufferfill_t::create(io_chain_t{}); pthread_t thread = pthread_self(); double delay = 0.25 /* seconds */; iothread_perform([=]() { @@ -929,11 +928,13 @@ static void test_1_cancellation(const wchar_t *src) { usleep(delay * 1E6); pthread_kill(thread, SIGINT); }); - parser_t::principal_parser().eval(src, io_chain, TOP); - out_buff->read(); - if (out_buff->buffer().size() != 0) { + parser_t::principal_parser().eval(src, io_chain_t{filler}, TOP); + auto buffer = filler->buffer(); + filler.reset(); + buffer->read_to_wouldblock(); + if (buffer->buffer().size() != 0) { err(L"Expected 0 bytes in out_buff, but instead found %lu bytes\n", - out_buff->buffer().size()); + buffer->buffer().size()); } iothread_drain_all(); } diff --git a/src/io.cpp b/src/io.cpp index 091d250f5..32731776c 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -22,14 +22,10 @@ void io_fd_t::print() const { fwprintf(stderr, L"FD map %d -> %d\n", old_fd, fd) void io_file_t::print() const { fwprintf(stderr, L"file (%s)\n", filename_cstr); } void io_pipe_t::print() const { - fwprintf(stderr, L"pipe {%d, %d} (input: %s)\n", pipe_fd[0], pipe_fd[1], - is_input ? "yes" : "no"); + fwprintf(stderr, L"pipe {%d} (input: %s)\n", pipe_fd(), is_input_ ? "yes" : "no"); } -void io_buffer_t::print() const { - fwprintf(stderr, L"buffer (input: %s, size %lu)\n", - is_input ? "yes" : "no", (unsigned long)buffer_.size()); -} +void io_bufferfill_t::print() const { fwprintf(stderr, L"bufferfill {%d}\n", write_fd_.fd()); } void io_buffer_t::append_from_stream(const output_stream_t &stream) { if (buffer_.discarded()) return; @@ -40,75 +36,84 @@ void io_buffer_t::append_from_stream(const output_stream_t &stream) { buffer_.append_wide_buffer(stream.buffer()); } -void io_buffer_t::read() { - exec_close(pipe_fd[1]); +long io_buffer_t::read_some() { + int fd = read_.fd(); + assert(fd >= 0 && "Should have a valid fd"); + debug(4, L"io_buffer_t::read: blocking read on fd %d", fd); + long len; + char b[4096]; + do { + len = read(fd, b, sizeof b); + } while (len < 0 && errno == EINTR); + if (len > 0) { + buffer_.append(&b[0], &b[len]); + } + return len; +} - if (io_mode == io_mode_t::buffer) { - debug(4, L"io_buffer_t::read: blocking read on fd %d", pipe_fd[0]); - while (1) { - char b[4096]; - long len = read_blocked(pipe_fd[0], b, 4096); - if (len == 0) { - break; - } else if (len < 0) { - // exec_read_io_buffer is only called on jobs that have exited, and will therefore - // never block. But a broken pipe seems to cause some flags to reset, causing the - // EOF flag to not be set. Therefore, EAGAIN is ignored and we exit anyway. 
- if (errno != EAGAIN) { - const wchar_t *fmt = - _(L"An error occured while reading output from code block on fd %d"); - debug(1, fmt, pipe_fd[0]); - wperror(L"io_buffer_t::read"); - } +void io_buffer_t::read_to_wouldblock() { + long len; + do { + len = read_some(); + } while (len > 0); + if (len < 0 && errno != EAGAIN) { + debug(1, _(L"An error occured while reading output from code block on fd %d"), read_.fd()); + wperror(L"io_buffer_t::read"); + } +} - break; - } else { - buffer_.append(&b[0], &b[len]); - } +bool io_buffer_t::try_read(unsigned long timeout_usec) { + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = timeout_usec; + int fd = read_.fd(); + assert(fd >= 0 && "Should have a valid fd"); + fd_set fds; + FD_ZERO(&fds); + FD_SET(fd, &fds); + int ret = select(fd + 1, &fds, nullptr, nullptr, &timeout); + if (ret < 0) { + // Treat EINTR as timeout. + if (errno != EINTR) { + debug(1, _(L"An error occured inside select on fd %d"), fd); + wperror(L"io_buffer_t::try_read"); } + return false; } + if (ret > 0) { + read_some(); + } + return ret > 0; } -bool io_buffer_t::avoid_conflicts_with_io_chain(const io_chain_t &ios) { - bool result = pipe_avoid_conflicts_with_io_chain(this->pipe_fd, ios); - if (!result) { - wperror(L"dup"); +shared_ptr io_bufferfill_t::create(const io_chain_t &conflicts, + size_t buffer_limit) { + // Construct our pipes. + auto pipes = make_autoclose_pipes(conflicts); + if (!pipes) { + return nullptr; } - return result; -} -shared_ptr io_buffer_t::create(int fd, const io_chain_t &conflicts, - size_t buffer_limit) { - bool success = true; - assert(fd >= 0); - shared_ptr buffer_redirect(new io_buffer_t(fd, buffer_limit)); - - if (exec_pipe(buffer_redirect->pipe_fd) == -1) { - debug(1, PIPE_ERROR); - wperror(L"pipe"); - success = false; - } else if (!buffer_redirect->avoid_conflicts_with_io_chain(conflicts)) { - // The above call closes the fds on error. - success = false; - } else if (make_fd_nonblocking(buffer_redirect->pipe_fd[0]) != 0) { + // Our buffer will read from the read end of the pipe. This end must be non-blocking. This is + // because we retain the write end of the pipe in this process (even after handing it off to a + // child process); therefore a read on the pipe may block forever. What we should do is arrange + // for the write end of the pipe to be closed at the right time; then the read could just block. + if (make_fd_nonblocking(pipes->read.fd())) { debug(1, PIPE_ERROR); wperror(L"fcntl"); - success = false; + return nullptr; } - if (!success) { - buffer_redirect.reset(); - } - return buffer_redirect; + // Our buffer gets the read end of the pipe; out_pipe gets the write end. + auto buffer = std::make_shared(std::move(pipes->read), buffer_limit); + return std::make_shared(std::move(pipes->write), buffer); } -io_buffer_t::~io_buffer_t() { - if (pipe_fd[0] >= 0) { - exec_close(pipe_fd[0]); - } - // Dont free fd for writing. This should already be free'd before calling exec_read_io_buffer on - // the buffer. -} +io_pipe_t::~io_pipe_t() = default; + +io_bufferfill_t::~io_bufferfill_t() = default; + +io_buffer_t::~io_buffer_t() = default; void io_chain_t::remove(const shared_ptr &element) { // See if you can guess why std::find doesn't work here. 
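Because the read end is left non-blocking, callers poll it instead of blocking until EOF. The loop below sketches the pattern proc.cpp adopts further down; the 10 ms timeout matches SELECT_TIMEOUT_USEC there, and job_is_finished stands in for the real completion checks:

    #include <functional>

    // Sketch: keep a buffer drained while waiting for the job that feeds it.
    static void poll_buffer_sketch(io_buffer_t &buffer,
                                   const std::function<bool()> &job_is_finished) {
        const unsigned long SELECT_TIMEOUT_USEC = 10000;  // 10 ms, as in proc.cpp
        while (!job_is_finished()) {
            // try_read() select()s on the read end and appends a chunk if data is ready;
            // on timeout we simply loop and re-check the job, as continue_job() does.
            buffer.try_read(SELECT_TIMEOUT_USEC);
        }
        // The children have exited, so drain whatever is left without blocking.
        buffer.read_to_wouldblock();
    }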
@@ -197,7 +202,7 @@ int move_fd_to_unused(int fd, const io_chain_t &io_chain, bool cloexec) { return new_fd; } -bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios) { +static bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios) { bool success = true; for (int i = 0; i < 2; i++) { fds[i] = move_fd_to_unused(fds[i], ios); @@ -221,6 +226,27 @@ bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios) { return success; } +maybe_t make_autoclose_pipes(const io_chain_t &ios) { + int pipes[2] = {-1, -1}; + + if (pipe(pipes) < 0) { + debug(1, PIPE_ERROR); + wperror(L"pipe"); + return none(); + } + set_cloexec(pipes[0]); + set_cloexec(pipes[1]); + + if (!pipe_avoid_conflicts_with_io_chain(pipes, ios)) { + // The pipes are closed on failure here. + return none(); + } + autoclose_pipes_t result; + result.read = autoclose_fd_t(pipes[0]); + result.write = autoclose_fd_t(pipes[1]); + return {std::move(result)}; +} + /// Return the last IO for the given fd. shared_ptr io_chain_t::get_io_for_fd(int fd) const { size_t idx = this->size(); diff --git a/src/io.h b/src/io.h index 3081a240f..6599790ff 100644 --- a/src/io.h +++ b/src/io.h @@ -150,7 +150,7 @@ public: }; /// Describes what type of IO operation an io_data_t represents. -enum class io_mode_t { file, pipe, fd, buffer, close }; +enum class io_mode_t { file, pipe, fd, close, bufferfill }; /// Represents an FD redirection. class io_data_t { @@ -210,39 +210,83 @@ class io_file_t : public io_data_t { ~io_file_t() override { free((void *)filename_cstr); } }; +/// Represents (one end) of a pipe. class io_pipe_t : public io_data_t { - protected: - io_pipe_t(io_mode_t m, int f, bool i) : io_data_t(m, f), is_input(i) { - pipe_fd[0] = pipe_fd[1] = -1; - } + // The pipe's fd. Conceptually this is dup2'd to io_data_t::fd. + autoclose_fd_t pipe_fd_; + + /// Whether this is an input pipe. This is used only for informational purposes. + const bool is_input_; public: - int pipe_fd[2]; - const bool is_input; - void print() const override; - io_pipe_t(int f, bool i) : io_data_t(io_mode_t::pipe, f), is_input(i) { - pipe_fd[0] = pipe_fd[1] = -1; - } + io_pipe_t(int fd, bool is_input, autoclose_fd_t pipe_fd) + : io_data_t(io_mode_t::pipe, fd), pipe_fd_(std::move(pipe_fd)), is_input_(is_input) {} + + ~io_pipe_t(); + + int pipe_fd() const { return pipe_fd_.fd(); } }; +class io_buffer_t; class io_chain_t; + +/// Represents filling an io_buffer_t. Very similar to io_pipe_t. +/// Bufferfills always target stdout. +class io_bufferfill_t : public io_data_t { + /// Write end. The other end is connected to an io_buffer_t. + const autoclose_fd_t write_fd_; + + /// The receiving buffer. + const std::shared_ptr buffer_; + + public: + void print() const override; + + io_bufferfill_t(autoclose_fd_t write_fd, std::shared_ptr buffer) + : io_data_t(io_mode_t::bufferfill, STDOUT_FILENO), + write_fd_(std::move(write_fd)), + buffer_(std::move(buffer)) {} + + ~io_bufferfill_t(); + + std::shared_ptr buffer() const { return buffer_; } + + int write_fd() const { return write_fd_.fd(); } + + /// Create an io_bufferfill_t which, when written from, populates a buffer (also created). + /// \returns nullptr on failure, e.g. too many open fds. + /// + /// \param conflicts A set of IO redirections. The function ensures that any pipe it makes does + /// not conflict with an fd redirection in this list. 
+ static shared_ptr create(const io_chain_t &conflicts, size_t buffer_limit = 0); +}; + class output_stream_t; -class io_buffer_t : public io_pipe_t { + +/// An io_buffer_t is a buffer which can populate itself by reading from an fd. +/// It is not an io_data_t. +class io_buffer_t { private: + /// fd from which to read. + autoclose_fd_t read_; + + /// Buffer storing what we have read. separated_buffer_t buffer_; - explicit io_buffer_t(int f, size_t limit) - : io_pipe_t(io_mode_t::buffer, f, false /* not input */), buffer_(limit) { + /// Read some. Append it to our buffer. + /// \return positive if we read, 0 on EOF, -1 on error. + long read_some(); + + public: + explicit io_buffer_t(autoclose_fd_t read, size_t limit) + : read_(std::move(read)), buffer_(limit) { // Explicitly reset the discard flag because we share this buffer. buffer_.reset_discard(); } - public: - void print() const override; - - ~io_buffer_t() override; + ~io_buffer_t(); /// Access the underlying buffer. const separated_buffer_t &buffer() const { return buffer_; } @@ -250,24 +294,16 @@ class io_buffer_t : public io_pipe_t { /// Function to append to the buffer. void append(const char *ptr, size_t count) { buffer_.append(ptr, ptr + count); } - /// Ensures that the pipes do not conflict with any fd redirections in the chain. - bool avoid_conflicts_with_io_chain(const io_chain_t &ios); + /// Read from input pipe until EOF or EAGAIN (i.e. would block). + void read_to_wouldblock(); - /// Close output pipe, and read from input pipe until eof. - void read(); + /// Read a bit, if our fd is readable, with the given timeout. + /// \return true if we read some, false on timeout. + bool try_read(unsigned long timeout_usec); /// Appends data from a given output_stream_t. /// Marks the receiver as discarded if the stream was discarded. void append_from_stream(const output_stream_t &stream); - - /// Create a io_mode_t::buffer type io redirection, complete with a pipe and a vector for - /// output. The default file descriptor used is STDOUT_FILENO for buffering. - /// - /// \param fd the fd that will be mapped in the child process, typically STDOUT_FILENO - /// \param conflicts A set of IO redirections. The function ensures that any pipe it makes does - /// not conflict with an fd redirection in this list. - static shared_ptr create(int fd, const io_chain_t &conflicts, - size_t buffer_limit = 0); }; class io_chain_t : public std::vector> { @@ -289,11 +325,18 @@ class io_chain_t : public std::vector> { shared_ptr io_chain_get(const io_chain_t &src, int fd); shared_ptr io_chain_get(io_chain_t &src, int fd); -/// Given a pair of fds, if an fd is used by the given io chain, duplicate that fd repeatedly until -/// we find one that does not conflict, or we run out of fds. Returns the new fds by reference, -/// closing the old ones. If we get an error, returns false (in which case both fds are closed and -/// set to -1). -bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios); +/// Helper type returned from making autoclose pipes. +struct autoclose_pipes_t { + /// Read end of the pipe. + autoclose_fd_t read; + + /// Write end of the pipe. + autoclose_fd_t write; +}; +/// Call pipe(), populating autoclose fds, avoiding conflicts. +/// The pipes are marked CLO_EXEC. +/// \return pipes on success, none() on error. +maybe_t make_autoclose_pipes(const io_chain_t &ios); /// If the given fd is used by the io chain, duplicates it repeatedly until an fd not used in the io /// chain is found, or we run out. 
If we return a new fd or an error, closes the old one. diff --git a/src/proc.cpp b/src/proc.cpp index cdacd05bb..a58217f66 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -837,86 +837,24 @@ void proc_update_jiffies() { #endif /// The return value of select_try(), indicating IO readiness or an error -enum class select_try_t { - /// One or more fds have data ready for read - DATA_READY, - /// The timeout elapsed without any data becoming available for read +enum class block_receive_try_t { + /// There is no buffer to select on. + NO_BUFFER, + /// We have a block buffer, and we read some. + DATA_READ, + /// We have a block buffer, but we were unable to read any. TIMEOUT, - /// There were no FDs in the io chain for which to select on. - IOCHAIN_EMPTY, }; -/// Check if there are buffers associated with the job, and select on them for a while if available. -/// -/// \param j the job to test -/// \return the status of the select operation -static select_try_t select_try(job_t *j) { - fd_set fds; - int maxfd = -1; - - FD_ZERO(&fds); - - const io_chain_t chain = j->all_io_redirections(); - for (const auto &io : chain) { - if (io->io_mode == io_mode_t::buffer) { - auto io_pipe = static_cast(io.get()); - int fd = io_pipe->pipe_fd[0]; - FD_SET(fd, &fds); - maxfd = std::max(maxfd, fd); - debug(4, L"select_try on fd %d", fd); - } - } - - if (maxfd >= 0) { - struct timeval timeout; - - timeout.tv_sec = 0; - timeout.tv_usec = 10000; - - int retval = select(maxfd + 1, &fds, 0, 0, &timeout); - if (retval == 0) { - debug(4, L"select_try hit timeout"); - return select_try_t::TIMEOUT; - } - return select_try_t::DATA_READY; - } - - return select_try_t::IOCHAIN_EMPTY; -} - -/// Read from descriptors until they are empty. -/// -/// \param j the job to test -static void read_try(job_t *j) { - io_buffer_t *buff = NULL; - - // Find the last buffer, which is the one we want to read from. - const io_chain_t chain = j->all_io_redirections(); - for (size_t idx = 0; idx < chain.size(); idx++) { - io_data_t *d = chain.at(idx).get(); - if (d->io_mode == io_mode_t::buffer) { - buff = static_cast(d); - } - } - - if (buff) { - debug(4, L"proc::read_try('%ls')", j->command_wcstr()); - while (1) { - char b[BUFFER_SIZE]; - long len = read_blocked(buff->pipe_fd[0], b, BUFFER_SIZE); - if (len == 0) { - break; - } else if (len < 0) { - if (errno != EAGAIN) { - debug(1, _(L"An error occured while reading output from code block")); - wperror(L"read_try"); - } - break; - } else { - buff->append(b, len); - } +/// \return the last IO buffer in job j, or nullptr if none. +std::shared_ptr last_buffer(job_t *j) { + std::shared_ptr buff{}; + for (const auto &io : j->all_io_redirections()) { + if (io->io_mode == io_mode_t::bufferfill) { + buff = static_cast(io.get())->buffer(); } } + return buff; } // Return control of the terminal to a job's process group. restore_attrs is true if we are restoring @@ -1154,26 +1092,27 @@ void job_t::continue_job(bool send_sigcont) { // Wait for data to become available or the status of our own job to change while (!reader_exit_forced() && !is_stopped() && !is_completed()) { - auto result = select_try(this); read_attempted = true; - - switch (result) { - case select_try_t::DATA_READY: - // Read the data that we know is now available, then scan for finished processes - // but do not block. We don't block so long as we have IO to process, once the - // fd buffers are empty we'll block in the second case below. 
- read_try(this); + auto read_result = block_receive_try_t::NO_BUFFER; + if (auto buff = last_buffer(this)) { + const unsigned long SELECT_TIMEOUT_USEC = 10000; + bool did_read = buff->try_read(SELECT_TIMEOUT_USEC); + read_result = did_read ? block_receive_try_t::DATA_READ : block_receive_try_t::TIMEOUT; + } + switch (read_result) { + case block_receive_try_t::DATA_READ: + // We read some data. process_mark_finished_children(false); break; - case select_try_t::TIMEOUT: - // No FDs are ready. Look for finished processes instead. + case block_receive_try_t::TIMEOUT: + // We read some data or timed out. Poll for finished processes. debug(4, L"select_try: no fds returned valid data within the timeout" ); process_mark_finished_children(block_on_fg); break; - case select_try_t::IOCHAIN_EMPTY: - // There were no IO fds to select on. + case block_receive_try_t::NO_BUFFER: + // We are not populating a buffer. debug(4, L"select_try: no IO fds" ); process_mark_finished_children(true); @@ -1195,7 +1134,9 @@ void job_t::continue_job(bool send_sigcont) { // `echo` calls were sometimes having their output combined with the `set_color` calls // in the wrong order! if (!read_attempted) { - read_try(this); + if (auto buff = last_buffer(this)) { + buff->read_to_wouldblock(); + } } // Set $status only if we are in the foreground and the last process in the job has diff --git a/src/redirection.cpp b/src/redirection.cpp index 8aed4f9a4..1cce609df 100644 --- a/src/redirection.cpp +++ b/src/redirection.cpp @@ -73,15 +73,17 @@ maybe_t dup2_list_t::resolve_chain(const io_chain_t &io_chain) { break; } - case io_mode_t::buffer: case io_mode_t::pipe: { const io_pipe_t *io = static_cast(io_ref.get()); - // If write_pipe_idx is 0, it means we're connecting to the read end (first pipe - // fd). If it's 1, we're connecting to the write end (second pipe fd). - unsigned int write_pipe_idx = (io->is_input ? 0 : 1); - result.add_dup2(io->pipe_fd[write_pipe_idx], io->fd); - if (io->pipe_fd[0] >= 0) result.add_close(io->pipe_fd[0]); - if (io->pipe_fd[1] >= 0) result.add_close(io->pipe_fd[1]); + result.add_dup2(io->pipe_fd(), io->fd); + result.add_close(io->pipe_fd()); + break; + } + + case io_mode_t::bufferfill: { + const io_bufferfill_t *io = static_cast(io_ref.get()); + result.add_dup2(io->write_fd(), io->fd); + result.add_close(io->write_fd()); break; } }
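For reference, the actions resolve_chain() now emits for a pipe or bufferfill are the classic redirect-then-close pair; applied in the child with raw POSIX calls they amount to the sketch below (the fd value and function name are illustrative):

    #include <unistd.h>

    // Sketch: effect of add_dup2(write_fd, STDOUT_FILENO) followed by add_close(write_fd).
    static void apply_bufferfill_sketch(int write_fd /* e.g. the bufferfill's write_fd() */) {
        dup2(write_fd, STDOUT_FILENO);  // the child's stdout now feeds the io_buffer_t
        close(write_fd);                // drop the extra reference so the reader eventually
                                        // sees EOF once every write end is gone
    }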