diff --git a/src/exec.cpp b/src/exec.cpp index d22e81334..4775dc27b 100644 --- a/src/exec.cpp +++ b/src/exec.cpp @@ -861,35 +861,14 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr return true; } -/// Executes a process \p in job \j, using the read pipe \p pipe_current_read. -/// If the process pipes to a command, the read end of the created pipe is returned in -/// out_pipe_next_read. \returns true on success, false on exec error. +/// Executes a process \p in job \j, using the pipes \p pipes (which may have invalid fds if this is +/// the first or last process). +/// deferred_pipes represents the pipes from our deferred process; if set ensure they get closed in +/// any child. +/// \returns true on success, false on exec error. static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr j, - autoclose_fd_t pipe_current_read, - autoclose_fd_t *out_pipe_next_read, const io_chain_t &all_ios, - size_t stdout_read_limit) { - // The pipe this command will write to (if any). - shared_ptr pipe_write; - // The pipe this command will read from (if any). - shared_ptr pipe_read; - - // See if we need a pipe for the next command. - const bool pipes_to_next_command = !p->is_last_in_job; - if (pipes_to_next_command) { - // Construct our pipes. - auto local_pipes = make_autoclose_pipes(all_ios); - if (!local_pipes) { - debug(1, PIPE_ERROR); - wperror(L"pipe"); - job_mark_process_as_failed(j, p); - return false; - } - - pipe_write = std::make_shared(p->pipe_write_fd, false /* not input */, - std::move(local_pipes->write)); - *out_pipe_next_read = std::move(local_pipes->read); - } - + autoclose_pipes_t pipes, const io_chain_t &all_ios, + const autoclose_pipes_t &deferred_pipes, size_t stdout_read_limit) { // The write pipe (destined for stdout) needs to occur before redirections. For example, // with a redirection like this: // @@ -920,20 +899,30 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr< // The IO chain for this process. io_chain_t process_net_io_chain = j->block_io_chain(); - if (pipe_write) { - process_net_io_chain.push_back(pipe_write); + + if (pipes.write.valid()) { + process_net_io_chain.push_back(std::make_shared( + p->pipe_write_fd, false /* not input */, std::move(pipes.write))); } // The explicit IO redirections associated with the process. process_net_io_chain.append(p->io_chain()); // Read pipe goes last. - if (pipe_current_read.valid()) { - pipe_read = std::make_shared(STDIN_FILENO, true /* input */, - std::move(pipe_current_read)); + shared_ptr pipe_read{}; + if (pipes.read.valid()) { + pipe_read = + std::make_shared(STDIN_FILENO, true /* input */, std::move(pipes.read)); process_net_io_chain.push_back(pipe_read); } + // If we have stashed pipes, make sure those get closed in the child. + for (const autoclose_fd_t *afd : {&deferred_pipes.read, &deferred_pipes.write}) { + if (afd->valid()) { + process_net_io_chain.push_back(std::make_shared(afd->fd())); + } + } + // This call is used so the global environment variable array is regenerated, if needed, // before the fork. That way, we avoid a lot of duplicate work where EVERY child would need // to generate it, since that result would not get written back to the parent. This call @@ -988,6 +977,31 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr< return true; } +// Do we have a fish internal process that pipes into a real process? If so, we are going to +// launch it last (if there's more than one, just the last one). That is to prevent buffering +// from blocking further processes. See #1396. +// Example: +// for i in (seq 1 5); sleep 1; echo $i; end | cat +// This should show the output as it comes, not buffer until the end. +// Any such process (only one per job) will be called the "deferred" process. +static process_t *get_deferred_process(const shared_ptr &j) { + process_t *last_internal = nullptr; + for (const auto &p : j->processes) { + if (p->type == process_type_t::exec) { + // No tail reordering for execs. + return nullptr; + } else if (p->type != process_type_t::external) { + last_internal = p.get(); + } + } + if (last_internal && !last_internal->is_last_in_job) { + // This is the last internal process, and it pipes to an external process. + return last_internal; + } else { + return nullptr; + } +} + bool exec_job(parser_t &parser, shared_ptr j) { assert(j && "null job_t passed to exec_job!"); @@ -1011,15 +1025,17 @@ bool exec_job(parser_t &parser, shared_ptr j) { } size_t stdout_read_limit = 0; - const io_chain_t all_ios = j->all_io_redirections(); + io_chain_t all_ios = j->all_io_redirections(); + + // The read limit is dictated by the last bufferfill. for (auto &io : all_ios) { if ((io->io_mode == io_mode_t::bufferfill)) { - // The read limit is dictated by the last bufferfill. const auto *bf = static_cast(io.get()); stdout_read_limit = bf->buffer()->read_limit(); } } + // Handle an exec call. if (j->processes.front()->type == process_type_t::exec) { internal_exec(parser.vars(), j.get(), all_ios); // internal_exec only returns if it failed to set up redirections. @@ -1029,6 +1045,10 @@ bool exec_job(parser_t &parser, shared_ptr j) { return false; } + // Get the deferred process, if any. We will have to remember its pipes. + autoclose_pipes_t deferred_pipes; + process_t *const deferred_process = get_deferred_process(j); + // This loop loops over every process_t in the job, starting it as appropriate. This turns out // to be rather complex, since a process_t can be one of many rather different things. // @@ -1041,16 +1061,46 @@ bool exec_job(parser_t &parser, shared_ptr j) { // 3. The pipe that the next process should read from (courtesy of us) // autoclose_fd_t pipe_next_read; - for (const auto &unique_p : j->processes) { - autoclose_fd_t current_read = std::move(pipe_next_read); - if (!exec_process_in_job(parser, unique_p.get(), j, std::move(current_read), - &pipe_next_read, all_ios, stdout_read_limit)) { - exec_error = true; - break; + for (const auto &p : j->processes) { + // proc_pipes is the pipes applied to this process. That is, it is the read end + // containing the output of the previous process (if any), plus the write end that will + // output to the next process (if any). + autoclose_pipes_t proc_pipes; + proc_pipes.read = std::move(pipe_next_read); + if (!p->is_last_in_job) { + auto pipes = make_autoclose_pipes(all_ios); + if (!pipes) { + debug(1, PIPE_ERROR); + wperror(L"pipe"); + job_mark_process_as_failed(j, p.get()); + exec_error = true; + break; + } + pipe_next_read = std::move(pipes->read); + proc_pipes.write = std::move(pipes->write); + } + + if (p.get() == deferred_process) { + deferred_pipes = std::move(proc_pipes); + } else { + if (!exec_process_in_job(parser, p.get(), j, std::move(proc_pipes), all_ios, + deferred_pipes, stdout_read_limit)) { + exec_error = true; + break; + } } } pipe_next_read.close(); + // Now execute any deferred process. + if (!exec_error && deferred_process) { + assert(deferred_pipes.write.valid() && "Deferred process should always have a write pipe"); + if (!exec_process_in_job(parser, deferred_process, j, std::move(deferred_pipes), all_ios, + {}, stdout_read_limit)) { + exec_error = true; + } + } + debug(3, L"Created job %d from command '%ls' with pgrp %d", j->job_id, j->command_wcstr(), j->pgid); diff --git a/src/io.h b/src/io.h index 4017af474..e72a00291 100644 --- a/src/io.h +++ b/src/io.h @@ -352,6 +352,10 @@ struct autoclose_pipes_t { /// Write end of the pipe. autoclose_fd_t write; + + autoclose_pipes_t() = default; + autoclose_pipes_t(autoclose_fd_t r, autoclose_fd_t w) + : read(std::move(r)), write(std::move(w)) {} }; /// Call pipe(), populating autoclose fds, avoiding conflicts. /// The pipes are marked CLO_EXEC.