Merge pull request #6043 from cre4ture/fix/flaky_split_round_robin_limited_fds

Fix/flaky `split` round robin limited fds
This commit is contained in:
Sylvestre Ledru 2024-03-09 22:43:41 +01:00 committed by GitHub
commit 991d71856f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 109 additions and 45 deletions

View file

@ -495,10 +495,10 @@ rstest = { workspace = true }
[target.'cfg(any(target_os = "linux", target_os = "android"))'.dev-dependencies]
procfs = { version = "0.16", default-features = false }
rlimit = "0.10.1"
[target.'cfg(unix)'.dev-dependencies]
nix = { workspace = true, features = ["process", "signal", "user", "term"] }
rlimit = "0.10.1"
rand_pcg = "0.3"
xattr = { workspace = true }

View file

@ -1130,6 +1130,11 @@ struct OutFile {
/// and [`n_chunks_by_line_round_robin`] functions.
type OutFiles = Vec<OutFile>;
trait ManageOutFiles {
fn instantiate_writer(
&mut self,
idx: usize,
settings: &Settings,
) -> UResult<&mut BufWriter<Box<dyn Write>>>;
/// Initialize a new set of output files
/// Each OutFile is generated with filename, while the writer for it could be
/// optional, to be instantiated later by the calling function as needed.
@ -1194,6 +1199,52 @@ impl ManageOutFiles for OutFiles {
Ok(out_files)
}
fn instantiate_writer(
&mut self,
idx: usize,
settings: &Settings,
) -> UResult<&mut BufWriter<Box<dyn Write>>> {
let mut count = 0;
// Use-case for doing multiple tries of closing fds:
// E.g. split running in parallel to other processes (e.g. another split) doing similar stuff,
// sharing the same limits. In this scenario, after closing one fd, the other process
// might "steel" the freed fd and open a file on its side. Then it would be beneficial
// if split would be able to close another fd before cancellation.
'loop1: loop {
let filename_to_open = self[idx].filename.as_str();
let file_to_open_is_new = self[idx].is_new;
let maybe_writer =
settings.instantiate_current_writer(filename_to_open, file_to_open_is_new);
if let Ok(writer) = maybe_writer {
self[idx].maybe_writer = Some(writer);
return Ok(self[idx].maybe_writer.as_mut().unwrap());
}
if settings.filter.is_some() {
// Propagate error if in `--filter` mode
return Err(maybe_writer.err().unwrap().into());
}
// Could have hit system limit for open files.
// Try to close one previously instantiated writer first
for (i, out_file) in self.iter_mut().enumerate() {
if i != idx && out_file.maybe_writer.is_some() {
out_file.maybe_writer.as_mut().unwrap().flush()?;
out_file.maybe_writer = None;
out_file.is_new = false;
count += 1;
// And then try to instantiate the writer again
continue 'loop1;
}
}
// If this fails - give up and propagate the error
uucore::show_error!("at file descriptor limit, but no file descriptor left to close. Closed {count} writers before.");
return Err(maybe_writer.err().unwrap().into());
}
}
fn get_writer(
&mut self,
idx: usize,
@ -1204,34 +1255,7 @@ impl ManageOutFiles for OutFiles {
} else {
// Writer was not instantiated upfront or was temporarily closed due to system resources constraints.
// Instantiate it and record for future use.
let maybe_writer =
settings.instantiate_current_writer(self[idx].filename.as_str(), self[idx].is_new);
if let Ok(writer) = maybe_writer {
self[idx].maybe_writer = Some(writer);
Ok(self[idx].maybe_writer.as_mut().unwrap())
} else if settings.filter.is_some() {
// Propagate error if in `--filter` mode
Err(maybe_writer.err().unwrap().into())
} else {
// Could have hit system limit for open files.
// Try to close one previously instantiated writer first
for (i, out_file) in self.iter_mut().enumerate() {
if i != idx && out_file.maybe_writer.is_some() {
out_file.maybe_writer.as_mut().unwrap().flush()?;
out_file.maybe_writer = None;
out_file.is_new = false;
break;
}
}
// And then try to instantiate the writer again
// If this fails - give up and propagate the error
self[idx].maybe_writer =
Some(settings.instantiate_current_writer(
self[idx].filename.as_str(),
self[idx].is_new,
)?);
Ok(self[idx].maybe_writer.as_mut().unwrap())
}
self.instantiate_writer(idx, settings)
}
}
}

View file

@ -3,15 +3,16 @@
// For the full copyright and license information, please view the LICENSE
// file that was distributed with this source code.
//spell-checker: ignore (linux) rlimit prlimit coreutil ggroups uchild uncaptured scmd SHLVL canonicalized openpty winsize xpixel ypixel
//spell-checker: ignore (linux) rlimit prlimit coreutil ggroups uchild uncaptured scmd SHLVL canonicalized openpty
//spell-checker: ignore (linux) winsize xpixel ypixel setrlimit FSIZE
#![allow(dead_code)]
#[cfg(unix)]
use nix::pty::OpenptyResult;
use pretty_assertions::assert_eq;
#[cfg(any(target_os = "linux", target_os = "android"))]
use rlimit::prlimit;
#[cfg(unix)]
use rlimit::setrlimit;
#[cfg(feature = "sleep")]
use rstest::rstest;
#[cfg(unix)]
@ -27,6 +28,8 @@ use std::os::fd::OwnedFd;
#[cfg(unix)]
use std::os::unix::fs::{symlink as symlink_dir, symlink as symlink_file, PermissionsExt};
#[cfg(unix)]
use std::os::unix::process::CommandExt;
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
#[cfg(windows)]
use std::os::windows::fs::{symlink_dir, symlink_file};
@ -1224,7 +1227,7 @@ pub struct UCommand {
stdout: Option<Stdio>,
stderr: Option<Stdio>,
bytes_into_stdin: Option<Vec<u8>>,
#[cfg(any(target_os = "linux", target_os = "android"))]
#[cfg(unix)]
limits: Vec<(rlimit::Resource, u64, u64)>,
stderr_to_stdout: bool,
timeout: Option<Duration>,
@ -1387,7 +1390,7 @@ impl UCommand {
self
}
#[cfg(any(target_os = "linux", target_os = "android"))]
#[cfg(unix)]
pub fn limit(
&mut self,
resource: rlimit::Resource,
@ -1646,6 +1649,25 @@ impl UCommand {
command.stdin(pi_slave).stdout(po_slave).stderr(pe_slave);
}
#[cfg(unix)]
if !self.limits.is_empty() {
// just to be safe: move a copy of the limits list into the closure.
// this way the closure is fully self-contained.
let limits_copy = self.limits.clone();
let closure = move || -> Result<()> {
for &(resource, soft_limit, hard_limit) in &limits_copy {
setrlimit(resource, soft_limit, hard_limit)?;
}
Ok(())
};
// SAFETY: the closure is self-contained and doesn't do any memory
// writes that would need to be propagated back to the parent process.
// also, the closure doesn't access stdin, stdout and stderr.
unsafe {
command.pre_exec(closure);
}
}
(command, captured_stdout, captured_stderr, stdin_pty)
}
@ -1660,17 +1682,6 @@ impl UCommand {
let child = command.spawn().unwrap();
#[cfg(any(target_os = "linux", target_os = "android"))]
for &(resource, soft_limit, hard_limit) in &self.limits {
prlimit(
child.id() as i32,
resource,
Some((soft_limit, hard_limit)),
None,
)
.unwrap();
}
let mut child = UChild::from(self, child, captured_stdout, captured_stderr, stdin_pty);
if let Some(input) = self.bytes_into_stdin.take() {
@ -3706,4 +3717,33 @@ mod tests {
);
std::assert_eq!(String::from_utf8_lossy(out.stderr()), "");
}
#[cfg(unix)]
#[test]
fn test_application_of_process_resource_limits_unlimited_file_size() {
let ts = TestScenario::new("util");
ts.cmd("sh")
.args(&["-c", "ulimit -Sf; ulimit -Hf"])
.succeeds()
.no_stderr()
.stdout_is("unlimited\nunlimited\n");
}
#[cfg(unix)]
#[test]
fn test_application_of_process_resource_limits_limited_file_size() {
let unit_size_bytes = if cfg!(target_os = "macos") { 1024 } else { 512 };
let ts = TestScenario::new("util");
ts.cmd("sh")
.args(&["-c", "ulimit -Sf; ulimit -Hf"])
.limit(
rlimit::Resource::FSIZE,
8 * unit_size_bytes,
16 * unit_size_bytes,
)
.succeeds()
.no_stderr()
.stdout_is("8\n16\n");
}
}