Introduce the notion of a deferred process

In a job, a deferred process is the last fish internal process which pipes
to an external command. Execute the deferred process last; this will allow
for streaming its output.
This commit is contained in:
ridiculousfish 2019-03-24 14:27:23 -07:00
parent 165c82e68a
commit 3bbee06248
2 changed files with 95 additions and 41 deletions

View file

@ -861,35 +861,14 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
return true;
}
/// Executes a process \p in job \j, using the read pipe \p pipe_current_read.
/// If the process pipes to a command, the read end of the created pipe is returned in
/// out_pipe_next_read. \returns true on success, false on exec error.
/// Executes a process \p in job \j, using the pipes \p pipes (which may have invalid fds if this is
/// the first or last process).
/// deferred_pipes represents the pipes from our deferred process; if set ensure they get closed in
/// any child.
/// \returns true on success, false on exec error.
static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<job_t> j,
autoclose_fd_t pipe_current_read,
autoclose_fd_t *out_pipe_next_read, const io_chain_t &all_ios,
size_t stdout_read_limit) {
// The pipe this command will write to (if any).
shared_ptr<io_pipe_t> pipe_write;
// The pipe this command will read from (if any).
shared_ptr<io_pipe_t> pipe_read;
// See if we need a pipe for the next command.
const bool pipes_to_next_command = !p->is_last_in_job;
if (pipes_to_next_command) {
// Construct our pipes.
auto local_pipes = make_autoclose_pipes(all_ios);
if (!local_pipes) {
debug(1, PIPE_ERROR);
wperror(L"pipe");
job_mark_process_as_failed(j, p);
return false;
}
pipe_write = std::make_shared<io_pipe_t>(p->pipe_write_fd, false /* not input */,
std::move(local_pipes->write));
*out_pipe_next_read = std::move(local_pipes->read);
}
autoclose_pipes_t pipes, const io_chain_t &all_ios,
const autoclose_pipes_t &deferred_pipes, size_t stdout_read_limit) {
// The write pipe (destined for stdout) needs to occur before redirections. For example,
// with a redirection like this:
//
@ -920,20 +899,30 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<
// The IO chain for this process.
io_chain_t process_net_io_chain = j->block_io_chain();
if (pipe_write) {
process_net_io_chain.push_back(pipe_write);
if (pipes.write.valid()) {
process_net_io_chain.push_back(std::make_shared<io_pipe_t>(
p->pipe_write_fd, false /* not input */, std::move(pipes.write)));
}
// The explicit IO redirections associated with the process.
process_net_io_chain.append(p->io_chain());
// Read pipe goes last.
if (pipe_current_read.valid()) {
pipe_read = std::make_shared<io_pipe_t>(STDIN_FILENO, true /* input */,
std::move(pipe_current_read));
shared_ptr<io_pipe_t> pipe_read{};
if (pipes.read.valid()) {
pipe_read =
std::make_shared<io_pipe_t>(STDIN_FILENO, true /* input */, std::move(pipes.read));
process_net_io_chain.push_back(pipe_read);
}
// If we have stashed pipes, make sure those get closed in the child.
for (const autoclose_fd_t *afd : {&deferred_pipes.read, &deferred_pipes.write}) {
if (afd->valid()) {
process_net_io_chain.push_back(std::make_shared<io_close_t>(afd->fd()));
}
}
// This call is used so the global environment variable array is regenerated, if needed,
// before the fork. That way, we avoid a lot of duplicate work where EVERY child would need
// to generate it, since that result would not get written back to the parent. This call
@ -988,6 +977,31 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<
return true;
}
// Do we have a fish internal process that pipes into a real process? If so, we are going to
// launch it last (if there's more than one, just the last one). That is to prevent buffering
// from blocking further processes. See #1396.
// Example:
// for i in (seq 1 5); sleep 1; echo $i; end | cat
// This should show the output as it comes, not buffer until the end.
// Any such process (only one per job) will be called the "deferred" process.
static process_t *get_deferred_process(const shared_ptr<job_t> &j) {
process_t *last_internal = nullptr;
for (const auto &p : j->processes) {
if (p->type == process_type_t::exec) {
// No tail reordering for execs.
return nullptr;
} else if (p->type != process_type_t::external) {
last_internal = p.get();
}
}
if (last_internal && !last_internal->is_last_in_job) {
// This is the last internal process, and it pipes to an external process.
return last_internal;
} else {
return nullptr;
}
}
bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
assert(j && "null job_t passed to exec_job!");
@ -1011,15 +1025,17 @@ bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
}
size_t stdout_read_limit = 0;
const io_chain_t all_ios = j->all_io_redirections();
io_chain_t all_ios = j->all_io_redirections();
// The read limit is dictated by the last bufferfill.
for (auto &io : all_ios) {
if ((io->io_mode == io_mode_t::bufferfill)) {
// The read limit is dictated by the last bufferfill.
const auto *bf = static_cast<io_bufferfill_t *>(io.get());
stdout_read_limit = bf->buffer()->read_limit();
}
}
// Handle an exec call.
if (j->processes.front()->type == process_type_t::exec) {
internal_exec(parser.vars(), j.get(), all_ios);
// internal_exec only returns if it failed to set up redirections.
@ -1029,6 +1045,10 @@ bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
return false;
}
// Get the deferred process, if any. We will have to remember its pipes.
autoclose_pipes_t deferred_pipes;
process_t *const deferred_process = get_deferred_process(j);
// This loop loops over every process_t in the job, starting it as appropriate. This turns out
// to be rather complex, since a process_t can be one of many rather different things.
//
@ -1041,16 +1061,46 @@ bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
// 3. The pipe that the next process should read from (courtesy of us)
//
autoclose_fd_t pipe_next_read;
for (const auto &unique_p : j->processes) {
autoclose_fd_t current_read = std::move(pipe_next_read);
if (!exec_process_in_job(parser, unique_p.get(), j, std::move(current_read),
&pipe_next_read, all_ios, stdout_read_limit)) {
exec_error = true;
break;
for (const auto &p : j->processes) {
// proc_pipes is the pipes applied to this process. That is, it is the read end
// containing the output of the previous process (if any), plus the write end that will
// output to the next process (if any).
autoclose_pipes_t proc_pipes;
proc_pipes.read = std::move(pipe_next_read);
if (!p->is_last_in_job) {
auto pipes = make_autoclose_pipes(all_ios);
if (!pipes) {
debug(1, PIPE_ERROR);
wperror(L"pipe");
job_mark_process_as_failed(j, p.get());
exec_error = true;
break;
}
pipe_next_read = std::move(pipes->read);
proc_pipes.write = std::move(pipes->write);
}
if (p.get() == deferred_process) {
deferred_pipes = std::move(proc_pipes);
} else {
if (!exec_process_in_job(parser, p.get(), j, std::move(proc_pipes), all_ios,
deferred_pipes, stdout_read_limit)) {
exec_error = true;
break;
}
}
}
pipe_next_read.close();
// Now execute any deferred process.
if (!exec_error && deferred_process) {
assert(deferred_pipes.write.valid() && "Deferred process should always have a write pipe");
if (!exec_process_in_job(parser, deferred_process, j, std::move(deferred_pipes), all_ios,
{}, stdout_read_limit)) {
exec_error = true;
}
}
debug(3, L"Created job %d from command '%ls' with pgrp %d", j->job_id, j->command_wcstr(),
j->pgid);

View file

@ -352,6 +352,10 @@ struct autoclose_pipes_t {
/// Write end of the pipe.
autoclose_fd_t write;
autoclose_pipes_t() = default;
autoclose_pipes_t(autoclose_fd_t r, autoclose_fd_t w)
: read(std::move(r)), write(std::move(w)) {}
};
/// Call pipe(), populating autoclose fds, avoiding conflicts.
/// The pipes are marked CLO_EXEC.