Stop storing block_io in job_t

Prior to this fix, a job would hold onto any IO redirections from its
parent. For example:

    begin
        echo a
    end < file.txt

The "echo a" job would hold a reference to the I/O redirection.
The problem is that jobs then extend the life of pipes until the job is
cleaned up. This can prevent pipes from closing, leading to hangs.

Fix this by not storing the block IO; this ensures that jobs do not
prolong the life of pipes.

Fixes #6397
This commit is contained in:
ridiculousfish 2019-12-11 16:34:20 -08:00
parent 16dc606001
commit c0b3be9fb4
6 changed files with 52 additions and 48 deletions

View file

@ -827,10 +827,11 @@ static proc_performer_t get_performer_for_process(process_t *p, const job_t *job
const io_chain_t &io_chain) { const io_chain_t &io_chain) {
assert((p->type == process_type_t::function || p->type == process_type_t::block_node) && assert((p->type == process_type_t::function || p->type == process_type_t::block_node) &&
"Unexpected process type"); "Unexpected process type");
// Construct a lineage, starting from the job's lineage. // Make a lineage for our children.
job_lineage_t lineage = job->lineage(); job_lineage_t lineage;
lineage.block_io = io_chain;
lineage.parent_pgid = (job->pgid == INVALID_PID ? none() : maybe_t<pid_t>(job->pgid)); lineage.parent_pgid = (job->pgid == INVALID_PID ? none() : maybe_t<pid_t>(job->pgid));
lineage.block_io = io_chain;
lineage.root_constructed = job->root_constructed;
if (p->type == process_type_t::block_node) { if (p->type == process_type_t::block_node) {
const parsed_source_ref_t &source = p->block_node_source; const parsed_source_ref_t &source = p->block_node_source;
@ -938,9 +939,9 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
/// in any child. If \p is_deferred_run is true, then this is a deferred run; this affects how /// in any child. If \p is_deferred_run is true, then this is a deferred run; this affects how
/// certain buffering works. \returns true on success, false on exec error. /// certain buffering works. \returns true on success, false on exec error.
static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<job_t> j, static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<job_t> j,
autoclose_pipes_t pipes, const io_chain_t &all_ios, const io_chain_t &block_io, autoclose_pipes_t pipes,
const autoclose_pipes_t &deferred_pipes, size_t stdout_read_limit, const io_chain_t &all_ios, const autoclose_pipes_t &deferred_pipes,
bool is_deferred_run = false) { size_t stdout_read_limit, bool is_deferred_run = false) {
// The write pipe (destined for stdout) needs to occur before redirections. For example, // The write pipe (destined for stdout) needs to occur before redirections. For example,
// with a redirection like this: // with a redirection like this:
// //
@ -976,7 +977,7 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<
} }
// The IO chain for this process. // The IO chain for this process.
io_chain_t process_net_io_chain = j->block_io_chain(); io_chain_t process_net_io_chain = block_io;
if (pipes.write.valid()) { if (pipes.write.valid()) {
process_net_io_chain.push_back(std::make_shared<io_pipe_t>( process_net_io_chain.push_back(std::make_shared<io_pipe_t>(
p->pipe_write_fd, false /* not input */, std::move(pipes.write))); p->pipe_write_fd, false /* not input */, std::move(pipes.write)));
@ -1119,7 +1120,7 @@ static bool should_claim_process_group_for_job(const shared_ptr<job_t> &j) {
DIE("unreachable"); DIE("unreachable");
} }
bool exec_job(parser_t &parser, shared_ptr<job_t> j) { bool exec_job(parser_t &parser, shared_ptr<job_t> j, const job_lineage_t &lineage) {
assert(j && "null job_t passed to exec_job!"); assert(j && "null job_t passed to exec_job!");
// Set to true if something goes wrong while executing the job, in which case the cleanup // Set to true if something goes wrong while executing the job, in which case the cleanup
@ -1135,9 +1136,10 @@ bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
const bool reclaim_foreground_pgrp = (tcgetpgrp(STDIN_FILENO) == getpgrp()); const bool reclaim_foreground_pgrp = (tcgetpgrp(STDIN_FILENO) == getpgrp());
// If our lineage indicates a pgid, share it. // If our lineage indicates a pgid, share it.
if (auto parent_pgid = j->lineage().parent_pgid) { if (lineage.parent_pgid.has_value()) {
assert(*parent_pgid != INVALID_PID && "parent pgid should be none, not INVALID_PID"); assert(*lineage.parent_pgid != INVALID_PID &&
j->pgid = *parent_pgid; "parent pgid should be none, not INVALID_PID");
j->pgid = *lineage.parent_pgid;
j->mut_flags().job_control = true; j->mut_flags().job_control = true;
} }
@ -1146,7 +1148,12 @@ bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
} }
const size_t stdout_read_limit = parser.libdata().read_limit; const size_t stdout_read_limit = parser.libdata().read_limit;
io_chain_t all_ios = j->all_io_redirections();
// Get the list of all IOs so we can ensure our pipes do not conflict.
io_chain_t all_ios = lineage.block_io;
for (const auto &p : j->processes) {
all_ios.append(p->io_chain());
}
// Handle an exec call. // Handle an exec call.
if (j->processes.front()->type == process_type_t::exec) { if (j->processes.front()->type == process_type_t::exec) {
@ -1196,8 +1203,8 @@ bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
if (p.get() == deferred_process) { if (p.get() == deferred_process) {
deferred_pipes = std::move(proc_pipes); deferred_pipes = std::move(proc_pipes);
} else { } else {
if (!exec_process_in_job(parser, p.get(), j, std::move(proc_pipes), all_ios, if (!exec_process_in_job(parser, p.get(), j, lineage.block_io, std::move(proc_pipes),
deferred_pipes, stdout_read_limit)) { all_ios, deferred_pipes, stdout_read_limit)) {
exec_error = true; exec_error = true;
break; break;
} }
@ -1208,8 +1215,8 @@ bool exec_job(parser_t &parser, shared_ptr<job_t> j) {
// Now execute any deferred process. // Now execute any deferred process.
if (!exec_error && deferred_process) { if (!exec_error && deferred_process) {
assert(deferred_pipes.write.valid() && "Deferred process should always have a write pipe"); assert(deferred_pipes.write.valid() && "Deferred process should always have a write pipe");
if (!exec_process_in_job(parser, deferred_process, j, std::move(deferred_pipes), all_ios, if (!exec_process_in_job(parser, deferred_process, j, lineage.block_io,
{}, stdout_read_limit, true)) { std::move(deferred_pipes), all_ios, {}, stdout_read_limit, true)) {
exec_error = true; exec_error = true;
} }
} }

View file

@ -13,8 +13,9 @@
/// Execute the processes specified by \p j in the parser \p. /// Execute the processes specified by \p j in the parser \p.
class job_t; class job_t;
struct job_lineage_t;
class parser_t; class parser_t;
bool exec_job(parser_t &parser, std::shared_ptr<job_t> j); bool exec_job(parser_t &parser, std::shared_ptr<job_t> j, const job_lineage_t &lineage);
/// Evaluate the expression cmd in a subshell, add the outputs into the list l. On return, the /// Evaluate the expression cmd in a subshell, add the outputs into the list l. On return, the
/// status flag as returned bu \c proc_gfet_last_status will not be changed. /// status flag as returned bu \c proc_gfet_last_status will not be changed.

View file

@ -1348,7 +1348,7 @@ parse_execution_result_t parse_execution_context_t::run_1_job(tnode_t<g::job> jo
} }
// Actually execute the job. // Actually execute the job.
if (!exec_job(*this->parser, job)) { if (!exec_job(*this->parser, job, lineage)) {
remove_job(*this->parser, job.get()); remove_job(*this->parser, job.get());
} }

View file

@ -168,7 +168,7 @@ bool job_t::should_report_process_exits() const {
return false; return false;
} }
bool job_t::job_chain_is_fully_constructed() const { return *lineage().root_constructed; } bool job_t::job_chain_is_fully_constructed() const { return *root_constructed; }
bool job_t::signal(int signal) { bool job_t::signal(int signal) {
// Presumably we are distinguishing between the two cases below because we do // Presumably we are distinguishing between the two cases below because we do
@ -269,24 +269,12 @@ void process_t::check_generations_before_launch() {
} }
job_t::job_t(job_id_t job_id, const properties_t &props, job_lineage_t lineage) job_t::job_t(job_id_t job_id, const properties_t &props, job_lineage_t lineage)
: properties(props), job_lineage(std::move(lineage)), job_id(job_id) { : properties(props),
if (!job_lineage.root_constructed) { job_id(job_id),
// We are the root job, share our constructed pointer. root_constructed(lineage.root_constructed ? lineage.root_constructed : this->constructed) {}
job_lineage.root_constructed = this->constructed;
}
}
job_t::~job_t() { release_job_id(job_id); } job_t::~job_t() { release_job_id(job_id); }
/// Return all the IO redirections. Start with the block IO, then walk over the processes.
io_chain_t job_t::all_io_redirections() const {
io_chain_t result = this->block_io_chain();
for (const process_ptr_t &p : this->processes) {
result.append(p->io_chain());
}
return result;
}
void job_t::mark_constructed() { void job_t::mark_constructed() {
assert(!is_constructed() && "Job was already constructed"); assert(!is_constructed() && "Job was already constructed");
*constructed = true; *constructed = true;

View file

@ -266,7 +266,8 @@ void release_job_id(job_id_t jid);
/// Information about where a job comes from. /// Information about where a job comes from.
/// This should be safe to copy across threads; in particular that means this cannot contain a /// This should be safe to copy across threads; in particular that means this cannot contain a
/// job_t. /// job_t. It is also important that job_t not contain this: because it stores block IO, it will
/// extend the life of the IO which may prevent pipes from closing in a timely manner. See #6397.
struct job_lineage_t { struct job_lineage_t {
/// The pgid of the parental job. /// The pgid of the parental job.
/// If our job is "nested" as part of a function or block execution, and that function or block /// If our job is "nested" as part of a function or block execution, and that function or block
@ -308,9 +309,6 @@ class job_t {
/// messages about job status on the terminal. /// messages about job status on the terminal.
wcstring command_str; wcstring command_str;
// The lineage associated with the job.
job_lineage_t job_lineage;
// No copying. // No copying.
job_t(const job_t &rhs) = delete; job_t(const job_t &rhs) = delete;
void operator=(const job_t &) = delete; void operator=(const job_t &) = delete;
@ -387,6 +385,9 @@ class job_t {
const std::shared_ptr<relaxed_atomic_bool_t> constructed = const std::shared_ptr<relaxed_atomic_bool_t> constructed =
std::make_shared<relaxed_atomic_bool_t>(false); std::make_shared<relaxed_atomic_bool_t>(false);
/// Whether the root job is constructed; this may share a reference with 'constructed'.
const std::shared_ptr<relaxed_atomic_bool_t> root_constructed;
/// Flags associated with the job. /// Flags associated with the job.
struct flags_t { struct flags_t {
/// Whether the user has been told about stopped job. /// Whether the user has been told about stopped job.
@ -417,16 +418,6 @@ class job_t {
/// \return if this job should own the terminal when it runs. /// \return if this job should own the terminal when it runs.
bool should_claim_terminal() const { return properties.wants_terminal && is_foreground(); } bool should_claim_terminal() const { return properties.wants_terminal && is_foreground(); }
/// \return the job lineage.
const job_lineage_t &lineage() const { return job_lineage; }
/// Returns the block IO redirections associated with the job. These are things like the IO
/// redirections associated with the begin...end statement.
const io_chain_t &block_io_chain() const { return lineage().block_io; }
/// Fetch all the IO redirections associated with the job.
io_chain_t all_io_redirections() const;
/// Mark this job as constructed. The job must not have previously been marked as constructed. /// Mark this job as constructed. The job must not have previously been marked as constructed.
void mark_constructed(); void mark_constructed();

View file

@ -0,0 +1,17 @@
#RUN: %fish %s
# See #6397
function f --on-signal USR1
begin
echo a
echo b
end | while read -l line
echo $line
end
end
kill -USR1 $fish_pid
#CHECK: a
#CHECK: b