Use type safety for pid values

The previous approach of "treat this field as an `Option<NonZeroU32>` and
remember to check `p.has_pid()` before accessing it" was a mix of C++ and rust
conventions and led to some bugs or incorrect behaviors.

* `jobs -p` would previously print both the (correct) external pid and the
  (incorrect) internal value of `0` if a backgrounded command contained a
  fish function (e.g. `function foo; end; cat | foo &; jobs`)
* Updating/calculating job cpu time and usage was incorrectly including all of
  fish's cpu usage/time for each function/builtin member of the job pipeline.

Closes #10832
This commit is contained in:
Mahmoud Al-Qudsi 2024-11-10 14:17:23 -06:00
parent 080e40aac0
commit 3307672998
4 changed files with 126 additions and 63 deletions

View file

@ -19,7 +19,6 @@ use crate::{
};
use libc::c_int;
use std::num::NonZeroU32;
use std::sync::atomic::Ordering;
/// Print modes for the jobs builtin.
@ -36,9 +35,9 @@ enum JobsPrintMode {
/// This may exceed 1 if there are multiple CPUs!
fn cpu_use(j: &Job) -> f64 {
let mut u = 0.0;
for p in j.processes() {
for p in j.external_procs() {
let now = timef();
let jiffies = proc_get_jiffies(p.pid.load(Ordering::Relaxed));
let jiffies = proc_get_jiffies(p.pid.load().unwrap().as_pid_t());
let last_jiffies = p.last_times.get().jiffies;
let since = now - last_jiffies as f64;
if since > 0.0 && jiffies > last_jiffies {
@ -104,8 +103,8 @@ fn builtin_jobs_print(j: &Job, mode: JobsPrintMode, header: bool, streams: &mut
out += wgettext!("Process\n");
}
for p in j.processes() {
out += &sprintf!("%d\n", p.pid.load(Ordering::Relaxed))[..];
for p in j.external_procs() {
out += &sprintf!("%d\n", p.pid.load().unwrap().get())[..];
}
streams.out.append(out);
}

View file

@ -36,8 +36,8 @@ use crate::null_terminated_array::{
use crate::parser::{Block, BlockId, BlockType, EvalRes, Parser};
use crate::proc::{
hup_jobs, is_interactive_session, jobs_requiring_warning_on_exit, no_exec,
print_exit_warning_for_jobs, InternalProc, Job, JobGroupRef, ProcStatus, Process, ProcessType,
TtyTransfer,
print_exit_warning_for_jobs, InternalProc, Job, JobGroupRef, Pid, ProcStatus, Process,
ProcessType, TtyTransfer,
};
use crate::reader::{reader_run_count, restore_term_mode};
use crate::redirection::{dup2_list_resolve_chain, Dup2List};
@ -715,13 +715,14 @@ fn fork_child_for_process(
job.group().set_pgid(pid);
}
{
let pid = p.pid().unwrap();
if let Some(pgid) = job.group().get_pgid() {
let err = execute_setpgid(p.pid(), pgid, is_parent);
let err = execute_setpgid(pid, pgid, is_parent);
if err != 0 {
report_setpgid_error(
err,
is_parent,
p.pid(),
pid,
pgid,
job.job_id().as_num(),
&narrow_cmd,
@ -869,7 +870,7 @@ fn exec_external_command(
return Err(());
}
};
assert!(pid > 0, "Should have either a valid pid, or an error");
let pid = Pid::new(pid).expect("Should have either a valid pid, or an error");
// This usleep can be used to test for various race conditions
// (https://github.com/fish-shell/fish-shell/issues/360).
@ -879,7 +880,7 @@ fn exec_external_command(
exec_fork,
"Fork #%d, pid %d: spawn external command '%s' from '%ls'",
count,
pid,
pid.get(),
p.actual_cmd,
file.as_ref()
.map(|s| s.as_utfstr())
@ -887,14 +888,14 @@ fn exec_external_command(
);
// these are all things do_fork() takes care of normally (for forked processes):
p.pid.store(pid, Ordering::Relaxed);
p.pid.store(pid);
if p.leads_pgrp {
j.group().set_pgid(pid);
j.group().set_pgid(pid.as_pid_t());
// posix_spawn should in principle set the pgid before returning.
// In glibc, posix_spawn uses fork() and the pgid group is set on the child side;
// therefore the parent may not have seen it be set yet.
// Ensure it gets set. See #4715, also https://github.com/Microsoft/WSL/issues/2997.
execute_setpgid(pid, pid, true /* is parent */);
execute_setpgid(pid.as_pid_t(), pid.as_pid_t(), true /* is parent */);
}
return Ok(());
}
@ -1326,7 +1327,7 @@ fn exec_process_in_job(
exec_external_command(parser, j, p, &process_net_io_chain)?;
// It's possible (though unlikely) that this is a background process which recycled a
// pid from another, previous background process. Forget any such old process.
parser.mut_wait_handles().remove_by_pid(p.pid());
parser.mut_wait_handles().remove_by_pid(p.pid().unwrap());
Ok(())
}
ProcessType::exec => {

View file

@ -949,8 +949,8 @@ impl Parser {
/// Returns the job and job index with the given pid.
pub fn job_get_with_index_from_pid(&self, pid: libc::pid_t) -> Option<(usize, JobRef)> {
for (i, job) in self.jobs().iter().enumerate() {
for p in job.processes().iter() {
if p.pid.load(Ordering::Relaxed) == pid {
for p in job.external_procs() {
if p.pid.load().unwrap().get() == pid {
return Some((i, job.clone()));
}
}

View file

@ -37,11 +37,12 @@ use portable_atomic::AtomicU64;
use std::cell::{Cell, Ref, RefCell, RefMut};
use std::fs;
use std::io::{Read, Write};
use std::num::NonZeroU32;
use std::os::fd::RawFd;
use std::rc::Rc;
#[cfg(target_has_atomic = "64")]
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU8, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering};
use std::sync::Arc;
/// Types of processes.
@ -507,6 +508,56 @@ impl Drop for TtyTransfer {
}
}
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Pid(NonZeroU32);
impl Pid {
#[inline(always)]
pub fn new<P: Into<i32>>(pid: P) -> Option<Pid> {
let pid = pid.into();
assert!(pid >= 0, "Invalid negative PID value!");
match NonZeroU32::new(pid as u32) {
Some(x) => Some(Pid(x)),
None => None,
}
}
#[inline(always)]
pub fn get(&self) -> i32 {
self.0.get() as i32
}
#[inline(always)]
pub fn as_pid_t(&self) -> libc::pid_t {
#[allow(clippy::useless_conversion)]
self.get().into()
}
}
impl Into<Pid> for u32 {
#[inline(always)]
fn into(self) -> Pid {
Pid::new(self as i32).unwrap()
}
}
#[derive(Default, Debug)]
pub struct AtomicPid(AtomicU32);
impl AtomicPid {
#[inline(always)]
pub fn load(&self) -> Option<Pid> {
Pid::new(self.0.load(Ordering::Relaxed) as i32)
}
#[inline(always)]
pub fn store(&self, pid: Pid) {
self.0.store(pid.get() as u32, Ordering::Relaxed);
}
#[inline(always)]
pub fn swap(&self, pid: Pid) -> Option<Pid> {
Pid::new(self.0.swap(pid.get() as u32, Ordering::Relaxed) as i32)
}
}
/// A structure representing a single fish process. Contains variables for tracking process state
/// and the process argument list. Actually, a fish process can be either a regular external
/// process, an internal builtin which may or may not spawn a fake IO process during execution, a
@ -551,9 +602,8 @@ pub struct Process {
/// Generation counts for reaping.
pub gens: GenerationsList,
/// Process ID, represented as an AtomicI32. This is actually an Option<AtomicNonZeroI32> with a
/// value of zero representing `None`.
pub pid: AtomicI32,
/// Process ID or `None` where not available.
pub pid: AtomicPid,
/// If we are an "internal process," that process.
pub internal_proc: RefCell<Option<Arc<InternalProc>>>,
@ -614,26 +664,22 @@ impl Process {
Default::default()
}
/// Retrieves the associated [`libc::pid_t`], 0 if unset.
///
/// See [`Process::has_pid()]` to safely check if the process has a pid.
pub fn pid(&self) -> libc::pid_t {
let value = self.pid.load(Ordering::Relaxed);
#[allow(clippy::useless_conversion)]
value.into()
/// Retrieves the associated [`libc::pid_t`], `None` if unset.
#[inline(always)]
pub fn pid(&self) -> Option<libc::pid_t> {
self.pid.load().map(|pid| pid.as_pid_t())
}
#[inline(always)]
pub fn has_pid(&self) -> bool {
let value = self.pid.load(Ordering::Relaxed);
value != 0
self.pid.load().is_some()
}
/// Sets the process' pid. Panics if a pid has already been set.
pub fn set_pid(&self, pid: libc::pid_t) {
assert!(pid != 0, "Invalid pid of 0 passed to Process::set_pid()");
assert!(pid >= 0);
let old = self.pid.swap(pid, Ordering::Relaxed);
assert!(old == 0, "Process::set_pid() called more than once!");
let pid = Pid::new(pid).expect("Invalid pid passed to Process::set_pid()");
let old = self.pid.swap(pid);
assert!(old.is_none(), "Process::set_pid() called more than once!");
}
/// Sets argv.
@ -707,13 +753,13 @@ impl Process {
/// As a process does not know its job id, we pass it in.
/// Note this will return null if the process is not waitable (has no pid).
pub fn make_wait_handle(&self, jid: InternalJobId) -> Option<WaitHandleRef> {
if self.typ != ProcessType::external || self.pid.load(Ordering::Relaxed) <= 0 {
if self.typ != ProcessType::external || self.pid().is_none() {
// Not waitable.
None
} else {
if self.wait_handle.borrow().is_none() {
self.wait_handle.replace(Some(WaitHandle::new(
self.pid(),
self.pid().unwrap(),
jid,
wbasename(&self.actual_cmd.clone()).to_owned(),
)));
@ -820,6 +866,14 @@ impl Job {
&mut self.processes
}
/// A read-only view of external processes running in the job's process list.
///
/// Equivalent to `processes().iter().filter(|p| p.pid.is_some())`.
#[inline(always)]
pub fn external_procs(&self) -> impl Iterator<Item = &ProcessPtr> {
self.processes.iter().filter(|p| p.pid.load().is_some())
}
/// Return whether it is OK to reap a given process. Sometimes we want to defer reaping a
/// process if it is the group leader and the job is not yet constructed, because then we might
/// also reap the process group and then we cannot add new processes to the group.
@ -828,7 +882,7 @@ impl Job {
// Can't reap twice.
p.is_completed() ||
// Can't reap the group leader in an under-construction job.
(p.has_pid() && !self.is_constructed() && self.get_pgid() == Some(p.pid()))
(!self.is_constructed() && self.get_pgid() == p.pid())
)
}
@ -859,11 +913,7 @@ impl Job {
/// This will never be fish's own pid.
// TODO: Return a type-safe result.
pub fn get_last_pid(&self) -> Option<libc::pid_t> {
self.processes()
.iter()
.rev()
.find(|proc| proc.has_pid())
.map(|proc| proc.pid())
self.processes().iter().filter_map(|proc| proc.pid()).last()
}
/// The id of this job.
@ -1044,9 +1094,8 @@ impl Job {
}
} else {
// This job lives in fish's pgroup and we need to signal procs individually.
for p in self.processes().iter() {
if !p.is_completed() && p.has_pid() && unsafe { libc::kill(p.pid(), signal) } == -1
{
for p in self.external_procs() {
if !p.is_completed() && unsafe { libc::kill(p.pid().unwrap(), signal) } == -1 {
return false;
}
}
@ -1182,7 +1231,13 @@ pub fn print_exit_warning_for_jobs(jobs: &JobList) {
printf!("%s", wgettext!("There are still jobs active:\n"));
printf!("%s", wgettext!("\n PID Command\n"));
for j in jobs {
printf!("%6d %ls\n", j.processes()[0].pid(), j.command());
// Unwrap safety: we can't have a background job that doesn't have an external process and
// external processes always have a pid set.
printf!(
"%6d %ls\n",
j.external_procs().next().unwrap().pid().unwrap(),
j.command()
);
}
printf!("\n");
printf!(
@ -1234,10 +1289,10 @@ pub fn proc_get_jiffies(inpid: libc::pid_t) -> ClockTicks {
/// process of every job.
pub fn proc_update_jiffies(parser: &Parser) {
for job in parser.jobs().iter() {
for p in job.processes.iter() {
for p in job.external_procs() {
p.last_times.replace(ProcTimes {
time: timef(),
jiffies: proc_get_jiffies(p.pid.load(Ordering::Relaxed)),
jiffies: proc_get_jiffies(p.pid.load().map(|p| p.as_pid_t()).unwrap()),
});
}
}
@ -1335,10 +1390,8 @@ pub fn hup_jobs(jobs: &JobList) {
/// jobs. Used to avoid zombie processes after disown.
pub fn add_disowned_job(j: &Job) {
let mut disowned_pids = DISOWNED_PIDS.get().borrow_mut();
for process in j.processes().iter() {
if process.has_pid() {
disowned_pids.push(process.pid());
}
for process in j.external_procs() {
disowned_pids.push(process.pid().unwrap());
}
}
@ -1403,9 +1456,9 @@ fn process_mark_finished_children(parser: &Parser, block_ok: bool) {
// We structure this as two loops for some simplicity.
// First reap all pids.
for j in parser.jobs().iter() {
for proc in j.processes.iter() {
// Does this proc have a pid that is reapable?
if proc.pid.load(Ordering::Relaxed) <= 0 || !j.can_reap(proc) {
for proc in j.external_procs() {
// It's an external proc so it has a pid, but is it reapable?
if !j.can_reap(proc) {
continue;
}
@ -1421,12 +1474,16 @@ fn process_mark_finished_children(parser: &Parser, block_ok: bool) {
// Ok, we are reapable. Run waitpid()!
let mut statusv: libc::c_int = -1;
let pid = unsafe {
libc::waitpid(proc.pid(), &mut statusv, WNOHANG | WUNTRACED | WCONTINUED)
libc::waitpid(
proc.pid().unwrap(),
&mut statusv,
WNOHANG | WUNTRACED | WCONTINUED,
)
};
assert!(pid <= 0 || pid == proc.pid(), "Unexpected waitpid() return");
if pid <= 0 {
continue;
}
assert!(pid == proc.pid().unwrap(), "Unexpected waitpid() return");
// The process has stopped or exited! Update its status.
let status = ProcStatus::from_waitpid(statusv);
@ -1451,7 +1508,7 @@ fn process_mark_finished_children(parser: &Parser, block_ok: bool) {
proc_reap_external,
"External process '%ls' (pid %d, %s)",
proc.argv0().unwrap(),
proc.pid(),
proc.pid().unwrap(),
if proc.status.stopped() {
"stopped"
} else {
@ -1511,10 +1568,13 @@ fn generate_process_exit_events(j: &Job, out_evts: &mut Vec<Event>) {
// Historically we have avoided generating events for foreground jobs from event handlers, as an
// event handler may itself produce a new event.
if !j.from_event_handler() || !j.is_foreground() {
for p in j.processes().iter() {
if p.has_pid() && p.is_completed() && !p.posted_proc_exit.load() {
for p in j.external_procs() {
if p.is_completed() && !p.posted_proc_exit.load() {
p.posted_proc_exit.store(true);
out_evts.push(Event::process_exit(p.pid(), p.status.status_value()));
out_evts.push(Event::process_exit(
p.pid().unwrap(),
p.status.status_value(),
));
}
}
}
@ -1631,8 +1691,11 @@ fn summary_command(j: &Job, p: Option<&Process>) -> WString {
buffer += &escape(sig.desc())[..];
// If we have multiple processes, we also append the pid and argv.
if j.processes().len() > 1 {
buffer += &sprintf!(" %d", p.pid())[..];
if j.external_procs().count() > 1 {
// I don't think it's safe to blindly unwrap here because even though we exited with
// a signal, the job could have contained a fish function?
let pid = p.pid().map(|p| p.to_string()).unwrap_or("-".to_string());
buffer += &sprintf!(" %s", pid)[..];
buffer.push(' ');
buffer += &escape(p.argv0().unwrap())[..];