Associate external commands in functions with extant pgrps

When a function is encountered by exec_job, a new context is created for
its execution from the ground up, with a new job and all, ultimately
resulting in a recursive call to exec_job from the same (main) thread.

Since each time exec_job encounters a new job with external commands
that needs terminal control it creates a new pgrp and gives it control
of the terminal (tcsetpgrp & co), this effectively takes control away
from the previously spawned external commands which may be (and likely
are) expecting to still have terminal access.

This commit attempts to detect when such a situation arises by handling
recursive calls to exec_job (which can only happen if the pipeline
included a function) by borrowing the pgrp from the (necessarily still
active) parent job and spawning new external commands into it.

When a parent job spawns new jobs during the evaluation of a function (which ideally wouldn't happen in the first place), we end up with two distinct jobs sharing one pgrp (the fix for #3952). This can lead to early termination of the pgrp if the parent job's finished children are reaped before later processes, in either the parent or the child jobs, can join it.

While the parent job is under construction, require that waitpid(2) calls for the child job be made by process id, not by the job's pgrp.

Closes #3952.
This commit is contained in:
Mahmoud Al-Qudsi 2018-10-08 12:44:47 -05:00
parent 419d7a5138
commit 4d3b56c151
3 changed files with 110 additions and 21 deletions


@@ -17,6 +17,7 @@
 #include <string.h>
 #include <sys/wait.h>
 #include <unistd.h>
+#include <stack>
 #include <algorithm>
 #include <functional>
@@ -610,7 +611,8 @@ static bool handle_builtin_output(job_t *j, process_t *p, io_chain_t *io_chain,
     if (fork_was_skipped) {
         p->completed = 1;
         if (p->is_last_in_job) {
-            debug(4, L"Set status of job %d (%ls) to %d using short circuit", j->job_id, j->preview().c_str(), p->status);
+            debug(4, L"Set status of job %d (%ls) to %d using short circuit", j->job_id,
+                  j->preview().c_str(), p->status);
             int status = p->status;
             proc_set_last_status(j->get_flag(job_flag_t::NEGATE) ? (!status) : status);
@@ -996,6 +998,47 @@ void exec_job(parser_t &parser, shared_ptr<job_t> j) {
         return;
     }

+    // Unfortunately `exec_job()` is called recursively when functions are encountered, with a new
+    // job id (and therefore pgrp) each time, but always from the main thread. This breaks terminal
+    // control since new pgrps take terminal control away from commands upstream in a different pgrp.
+    // We try to work around this with a heuristic: reuse the last-spawned pgrp if we are part of an
+    // existing job pipeline (keeping in mind that new jobs are recursively started for both
+    // foreground and background jobs, and that a function can expand to more than one external
+    // command, one or more of which may want to read from upstream or write downstream of a pipe).
+    // By keeping track of (select) "jobs in flight" we can try to marshal newly-created external
+    // processes into existing pgrps. Fixes #3952.
+    // This is a HACK; the correct solution would be to pass the active job through the pipeline
+    // to the newly established parser context so that the function as parsed and evaluated can be
+    // directly associated with this job and not a new one, BUT sometimes functions NEED to start a
+    // new job. This HACK seeks a compromise by letting functions trigger the unilateral creation of
+    // a new job, but reusing the "parent" job's existing pgrp in case of terminal control.
+    static std::stack<decltype(j)> active_jobs;
+    // There's an assumption that there's a one-to-one mapping between jobs under job control and
+    // pgrps. When we share a parent job's pgrp, we risk reaping its processes before it is fully
+    // constructed, causing later setpgrp(2) calls to fail (#5219). While the parent job is still
+    // under construction, child jobs have job_flag_t::WAIT_BY_PROCESS set to prevent early reaping.
+    // We store them here until the parent job is constructed, at which point it unsets this flag.
+    static std::stack<decltype(j)> child_jobs;
+
+    auto parent_job = active_jobs.empty() ? nullptr : active_jobs.top();
+    bool job_pushed = false;
+    if (j->get_flag(job_flag_t::TERMINAL) && j->get_flag(job_flag_t::JOB_CONTROL)) {
+        // This will be popped before this job leaves exec_job
+        active_jobs.push(j);
+        job_pushed = true;
+    }
+    if (parent_job && j->processes.front()->type == EXTERNAL) {
+        if (parent_job->pgid != INVALID_PID) {
+            j->pgid = parent_job->pgid;
+            j->set_flag(job_flag_t::JOB_CONTROL, true);
+            j->set_flag(job_flag_t::NESTED, true);
+            j->set_flag(job_flag_t::WAIT_BY_PROCESS, true);
+            child_jobs.push(j);
+        }
+    }
+
     // Verify that all IO_BUFFERs are output. We used to support a (single, hacked-in) magical input
     // IO_BUFFER used by fish_pager, but now the claim is that there are no more clients and it is
     // removed. This assertion double-checks that.
@@ -1053,8 +1096,8 @@ void exec_job(parser_t &parser, shared_ptr<job_t> j) {
         }
         pipe_next_read.close();

-    debug(3, L"Created job %d from command '%ls' with pgrp %d", j->job_id, j->command_wcstr(), j->pgid);
+    debug(3, L"Created job %d from command '%ls' with pgrp %d", j->job_id, j->command_wcstr(),
+          j->pgid);
     j->set_flag(job_flag_t::CONSTRUCTED, true);
     if (!j->is_foreground()) {
@@ -1062,6 +1105,21 @@ void exec_job(parser_t &parser, shared_ptr<job_t> j) {
         env_set(L"last_pid", ENV_GLOBAL, { to_string(proc_last_bg_pid) });
     }

+    if (job_pushed) {
+        active_jobs.pop();
+    }
+
+    if (!parent_job) {
+        // Unset WAIT_BY_PROCESS on all child jobs. We could leave it, but this speeds up the
+        // execution of `process_mark_finished_children()`.
+        while (!child_jobs.empty()) {
+            auto child = child_jobs.top();
+            child_jobs.pop();
+            child->set_flag(job_flag_t::WAIT_BY_PROCESS, false);
+        }
+    }
+
     if (!exec_error) {
         j->continue_job(false);
     } else {


@@ -395,7 +395,10 @@ static bool process_mark_finished_children(bool block_on_fg) {
         }

         if (j != job_fg && j->is_foreground() && !j->is_stopped() && !j->is_completed()) {
+            // Ignore jobs created via function evaluation in this sanity check
+            if (!job_fg || (!job_fg->get_flag(job_flag_t::NESTED) && !j->get_flag(job_flag_t::NESTED))) {
             assert(job_fg == nullptr && "More than one active, fully-constructed foreground job!");
+            }
             job_fg = j;
         }
@@ -421,15 +424,36 @@ static bool process_mark_finished_children(bool block_on_fg) {
             options &= ~WNOHANG;
         }

+        bool wait_by_process = j->get_flag(job_flag_t::WAIT_BY_PROCESS);
+        process_list_t::iterator process = j->processes.begin();
+
         // waitpid(2) returns 1 process each time, we need to keep calling it until we've reaped all
         // children of the pgrp in question or else we can't reset the dirty_state flag. In all
         // cases, calling waitpid(2) is faster than potentially calling select_try() on a process
         // that has exited, which will force us to wait the full timeout before coming back here and
         // calling waitpid() again.
         while (true) {
-            int status = -1;
+            int status;
+            pid_t pid;
+
+            if (wait_by_process) {
+                // If the evaluation of a function resulted in the sharing of a pgroup between the
+                // real job and the job that shouldn't have been created as a separate job AND the
+                // parent job is still under construction (which is the case when continue_job() is
+                // first called on the child job during the recursive call to exec_job() before the
+                // parent job has been fully constructed), we need to call waitpid(2) on the
+                // individual processes of the child job instead of using a catch-all waitpid(2)
+                // call on the job's process group.
+                if (process == j->processes.end()) {
+                    break;
+                }
+                assert((*process)->pid != INVALID_PID && "Waiting by process on an invalid PID!");
+                pid = waitpid((*process)->pid, &status, options);
+                process++;
+            } else {
-            // A negative PID passed in to `waitpid()` means wait on any child in that process group
-            auto pid = waitpid(-1 * j->pgid, &status, options);
+                // A negative PID passed in to `waitpid()` means wait on any child in that process group
+                pid = waitpid(-1 * j->pgid, &status, options);
+            }

             // Never make two calls to waitpid(2) without WNOHANG (i.e. with "HANG") in a row,
             // because we might wait on a non-stopped job that becomes stopped, but we don't refresh
             // our view of the process state before calling waitpid(2) again here.
@@ -439,19 +463,19 @@ static bool process_mark_finished_children(bool block_on_fg) {
                 // A child process has been reaped
                 handle_child_status(pid, status);
             }
-            else if (pid == 0) {
+            else if (pid == 0 || errno == ECHILD) {
                 // No killed/dead children in this particular process group
+                if (!wait_by_process) {
                 break;
+                }
             } else {
                 // pid < 0 indicates an error. One likely failure is ECHILD (no children), which is not
                 // an error and is ignored. The other likely failure is EINTR, which means we got a
                 // signal, which is considered an error. We absolutely do not break or return on error,
                 // as we need to iterate over all constructed jobs but we only call waitpid for one pgrp
                 // at a time. We do bypass future waits in case of error, however.
+                if (errno != ECHILD) {
                 has_error = true;
                 wperror(L"waitpid in process_mark_finished_children");
+                }
                 break;
             }
         }
@@ -1034,13 +1058,13 @@ void job_t::continue_job(bool send_sigcont) {
             // discussion in #5219).
             process_mark_finished_children(false);

-            // sigcont is NOT sent only when a job is first created. In the event of "nested jobs"
-            // (i.e. new jobs started due to the evaluation of functions), we can start a process
-            // that feeds its output to a child job (e.g. `some_func | some_builtin`, which is two
-            // jobs), and if the first job in that expression generates enough data that it fills
-            // its pipe buffer, it'll hang waiting for the second job to read from it. If we block
-            // on foreground jobs in this case, we'll keep waiting forever.
-            bool block_on_fg = send_sigcont;
+            // If this is a child job and the parent job is still under construction (i.e. job1 |
+            // some_func), we can't block on execution of the nested job for `some_func`. Doing
+            // so can cause hangs if job1 emits more data than fits in the OS pipe buffer.
+            // The real test for block_on_fg would be this->parent_job.is_constructed(), which
+            // coincides with WAIT_BY_PROCESS (which will have to do, since we don't store a
+            // reference to the parent job in the job_t structure).
+            bool block_on_fg = !get_flag(job_flag_t::WAIT_BY_PROCESS);

             // Wait for data to become available or the status of our own job to change
             while (!reader_exit_forced() && !is_stopped() && !is_completed()) {


@@ -163,10 +163,17 @@ enum class job_flag_t {
     SKIP_NOTIFICATION,
     /// Whether the exit status should be negated. This flag can only be set by the not builtin.
     NEGATE,
-    /// Whether the job is under job control.
+    /// Whether the job is under job control, i.e. has its own pgrp.
     JOB_CONTROL,
     /// Whether the job wants to own the terminal when in the foreground.
     TERMINAL,
+    /// This job was created via a recursive call to exec_job upon evaluation of a function.
+    /// Ideally it should not be a top-level job, and there are places where it won't be treated
+    /// as such.
+    NESTED,
+    /// This job shares a pgrp with a not-yet-constructed job, so we can't waitpid on its pgid
+    /// directly. Hack to work around functions starting new jobs. See `exec_job()`.
+    WAIT_BY_PROCESS
 };

 typedef int job_id_t;