Mirror of https://github.com/uutils/coreutils, synced 2024-11-17 02:08:09 +00:00

Merge pull request #2364 from miDeb/sort/compress

sort: implement --batch-size and --compress-program

Commit f3bc9d234c, 8 changed files with 266 additions and 50 deletions
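This PR adds two GNU-compatible options to sort's external-sorting path: `--compress-program=PROG` filters intermediate chunk files through PROG on the way to disk and through `PROG -d` when reading them back, and `--batch-size=N_MERGE` caps how many inputs are merged at once (default: 16), spilling oversized merges into intermediate temporary files. It also fixes reading multiple input files whose contents do not end in a separator, covered by the new no_trailing_newline fixtures.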
Cargo.lock (generated): 21 lines changed

```diff
@@ -43,12 +43,6 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
-[[package]]
-name = "array-init"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6945cc5422176fc5e602e590c2878d2c2acd9a4fe20a4baa7c28022521698ec6"
-
 [[package]]
 name = "arrayref"
 version = "0.3.6"
@@ -710,9 +704,9 @@ checksum = "62aca2aba2d62b4a7f5b33f3712cb1b0692779a56fb510499d5c0aa594daeaf3"
 
 [[package]]
 name = "heck"
-version = "0.3.2"
+version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "87cbf45460356b7deeb5e3415b5563308c0a9b057c85e12b06ad551f98d0a6ac"
+checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
 dependencies = [
  "unicode-segmentation",
 ]
@@ -1393,12 +1387,9 @@ dependencies = [
 
 [[package]]
 name = "regex-automata"
-version = "0.1.9"
+version = "0.1.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
-dependencies = [
- "byteorder",
-]
+checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
 
 [[package]]
 name = "regex-syntax"
@@ -1511,9 +1502,9 @@ dependencies = [
 
 [[package]]
 name = "signal-hook-registry"
-version = "1.3.0"
+version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6"
+checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
 dependencies = [
  "libc",
 ]
```
src/uu/sort/src/chunks.rs

```diff
@@ -102,17 +102,17 @@ pub fn read(
         carry_over.clear();
         carry_over.extend_from_slice(&buffer[read..]);
 
-        let payload = Chunk::new(buffer, |buf| {
-            let mut lines = unsafe {
-                // SAFETY: It is safe to transmute to a vector of lines with shorter lifetime,
-                // because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
-                std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
-            };
-            let read = crash_if_err!(1, std::str::from_utf8(&buf[..read]));
-            parse_lines(read, &mut lines, separator, &settings);
-            lines
-        });
-        if !payload.borrow_lines().is_empty() {
+        if read != 0 {
+            let payload = Chunk::new(buffer, |buf| {
+                let mut lines = unsafe {
+                    // SAFETY: It is safe to transmute to a vector of lines with shorter lifetime,
+                    // because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
+                    std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
+                };
+                let read = crash_if_err!(1, std::str::from_utf8(&buf[..read]));
+                parse_lines(read, &mut lines, separator, &settings);
+                lines
+            });
             sender.send(payload).unwrap();
         }
         if !should_continue {
```
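The reader now checks `read != 0` before building a payload at all, instead of constructing a chunk and only then discarding it when its line list turns out empty, so downstream consumers never receive an empty chunk. The new `sort_empty_chunk` test below drives this path with a tiny 40-byte buffer.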
```diff
@@ -175,6 +175,7 @@ fn read_to_buffer(
     separator: u8,
 ) -> (usize, bool) {
     let mut read_target = &mut buffer[start_offset..];
+    let mut last_file_target_size = read_target.len();
     loop {
         match file.read(read_target) {
             Ok(0) => {
@@ -208,14 +209,27 @@ fn read_to_buffer(
                     read_target = &mut buffer[len..];
                 }
             } else {
-                // This file is empty.
+                // This file has been fully read.
+                let mut leftover_len = read_target.len();
+                if last_file_target_size != leftover_len {
+                    // The file was not empty.
+                    let read_len = buffer.len() - leftover_len;
+                    if buffer[read_len - 1] != separator {
+                        // The file did not end with a separator. We have to insert one.
+                        buffer[read_len] = separator;
+                        leftover_len -= 1;
+                    }
+                    let read_len = buffer.len() - leftover_len;
+                    read_target = &mut buffer[read_len..];
+                }
                 if let Some(next_file) = next_files.next() {
                     // There is another file.
+                    last_file_target_size = leftover_len;
                     *file = next_file;
                 } else {
                     // This was the last file.
-                    let leftover_len = read_target.len();
-                    return (buffer.len() - leftover_len, false);
+                    let read_len = buffer.len() - leftover_len;
+                    return (read_len, false);
                 }
             }
         }
```
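The end-of-file branch above distinguishes an empty file (nothing was read into `read_target` since the last file switch) from a file that simply lacks a trailing separator, and patches a separator in so the last line of one file cannot run into the first line of the next. A standalone sketch of just that patching step (`ensure_trailing_separator` is a hypothetical helper, not the uutils API):

```rust
/// Hypothetical helper mirroring the fix above: after a file's content has
/// been read into `buffer[..bytes_read]`, make sure it ends with `separator`.
fn ensure_trailing_separator(buffer: &mut Vec<u8>, bytes_read: usize, separator: u8) -> usize {
    if bytes_read > 0 && buffer[bytes_read - 1] != separator {
        if buffer.len() == bytes_read {
            // No spare capacity in this sketch, so grow the Vec; the real code
            // instead writes into the unused tail of a preallocated buffer.
            buffer.push(separator);
        } else {
            buffer[bytes_read] = separator;
        }
        return bytes_read + 1;
    }
    bytes_read
}

fn main() {
    // Like tests/fixtures/sort/no_trailing_newline1.txt: "a\nb" with no final newline.
    let mut buf = b"a\nb".to_vec();
    let used = ensure_trailing_separator(&mut buf, 3, b'\n');
    assert_eq!(&buf[..used], b"a\nb\n");
}
```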
src/uu/sort/src/ext_sort.rs

```diff
@@ -12,8 +12,12 @@
 //! The buffers for the individual chunks are recycled. There are two buffers.
 
 use std::cmp::Ordering;
+use std::fs::File;
+use std::io::BufReader;
 use std::io::{BufWriter, Write};
 use std::path::Path;
+use std::process::Child;
+use std::process::{Command, Stdio};
 use std::{
     fs::OpenOptions,
     io::Read,
@@ -25,12 +29,13 @@ use itertools::Itertools;
 
 use tempfile::TempDir;
 
+use crate::Line;
 use crate::{
     chunks::{self, Chunk},
     compare_by, merge, output_sorted_lines, sort_by, GlobalSettings,
 };
 
-const MIN_BUFFER_SIZE: usize = 8_000;
+const START_BUFFER_SIZE: usize = 8_000;
 
 /// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result.
 pub fn ext_sort(files: &mut impl Iterator<Item = Box<dyn Read + Send>>, settings: &GlobalSettings) {
@@ -63,10 +68,31 @@ pub fn ext_sort(files: &mut impl Iterator<Item = Box<dyn Read + Send>>, settings
     );
     match read_result {
         ReadResult::WroteChunksToFile { chunks_written } => {
-            let files = (0..chunks_written)
-                .map(|chunk_num| tmp_dir.path().join(chunk_num.to_string()))
-                .collect::<Vec<_>>();
-            let mut merger = merge::merge(&files, settings);
+            let mut children = Vec::new();
+            let files = (0..chunks_written).map(|chunk_num| {
+                let file_path = tmp_dir.path().join(chunk_num.to_string());
+                let file = File::open(file_path).unwrap();
+                if let Some(compress_prog) = &settings.compress_prog {
+                    let mut command = Command::new(compress_prog);
+                    command.stdin(file).stdout(Stdio::piped()).arg("-d");
+                    let mut child = crash_if_err!(
+                        2,
+                        command.spawn().map_err(|err| format!(
+                            "couldn't execute compress program: errno {}",
+                            err.raw_os_error().unwrap()
+                        ))
+                    );
+                    let child_stdout = child.stdout.take().unwrap();
+                    children.push(child);
+                    Box::new(BufReader::new(child_stdout)) as Box<dyn Read + Send>
+                } else {
+                    Box::new(BufReader::new(file)) as Box<dyn Read + Send>
+                }
+            });
+            let mut merger = merge::merge_with_file_limit(files, settings);
+            for child in children {
+                assert_child_success(child, settings.compress_prog.as_ref().unwrap());
+            }
             merger.write_all(settings);
         }
         ReadResult::SortedSingleChunk(chunk) => {
```
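When chunks were compressed on the way to disk, the merge step above reopens each temp file through `PROG -d`, handing the file to the child process as stdin and reading decompressed bytes from its piped stdout. A minimal standalone sketch of that wiring (`open_decompressed` is a hypothetical name; `gzip` is just one program that fits the `PROG -d` contract):

```rust
use std::fs::File;
use std::io::BufReader;
use std::process::{Child, ChildStdout, Command, Stdio};

/// Spawn `prog -d` with the compressed file as its stdin and return the child
/// plus a buffered reader over its decompressed stdout. The caller must still
/// wait() on the child afterwards, as ext_sort does via assert_child_success.
fn open_decompressed(path: &str, prog: &str) -> std::io::Result<(Child, BufReader<ChildStdout>)> {
    let file = File::open(path)?;
    let mut child = Command::new(prog)
        .arg("-d")
        .stdin(file) // a File converts into Stdio; the child reads it directly
        .stdout(Stdio::piped()) // we read the decompressed stream from here
        .spawn()?;
    let stdout = child.stdout.take().unwrap();
    Ok((child, BufReader::new(stdout)))
}
```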
```diff
@@ -132,7 +158,14 @@ fn reader_writer(
     for _ in 0..2 {
         chunks::read(
             &mut sender_option,
-            vec![0; MIN_BUFFER_SIZE],
+            vec![
+                0;
+                if START_BUFFER_SIZE < buffer_size {
+                    START_BUFFER_SIZE
+                } else {
+                    buffer_size
+                }
+            ],
             Some(buffer_size),
             &mut carry_over,
             &mut file,
@@ -171,6 +204,7 @@
             write(
                 &mut chunk,
                 &tmp_dir.path().join(file_number.to_string()),
+                settings.compress_prog.as_deref(),
                 separator,
             );
 
```
```diff
@@ -193,14 +227,45 @@ fn reader_writer(
 }
 
 /// Write the lines in `chunk` to `file`, separated by `separator`.
-fn write(chunk: &mut Chunk, file: &Path, separator: u8) {
+/// `compress_prog` is used to optionally compress file contents.
+fn write(chunk: &mut Chunk, file: &Path, compress_prog: Option<&str>, separator: u8) {
     chunk.with_lines_mut(|lines| {
         // Write the lines to the file
         let file = crash_if_err!(1, OpenOptions::new().create(true).write(true).open(file));
-        let mut writer = BufWriter::new(file);
-        for s in lines.iter() {
-            crash_if_err!(1, writer.write_all(s.line.as_bytes()));
-            crash_if_err!(1, writer.write_all(&[separator]));
-        }
+        if let Some(compress_prog) = compress_prog {
+            let mut command = Command::new(compress_prog);
+            command.stdin(Stdio::piped()).stdout(file);
+            let mut child = crash_if_err!(
+                2,
+                command.spawn().map_err(|err| format!(
+                    "couldn't execute compress program: errno {}",
+                    err.raw_os_error().unwrap()
+                ))
+            );
+            let mut writer = BufWriter::new(child.stdin.take().unwrap());
+            write_lines(lines, &mut writer, separator);
+            writer.flush().unwrap();
+            drop(writer);
+            assert_child_success(child, compress_prog);
+        } else {
+            let mut writer = BufWriter::new(file);
+            write_lines(lines, &mut writer, separator);
+        };
     });
 }
+
+fn write_lines<'a, T: Write>(lines: &[Line<'a>], writer: &mut T, separator: u8) {
+    for s in lines {
+        crash_if_err!(1, writer.write_all(s.line.as_bytes()));
+        crash_if_err!(1, writer.write_all(&[separator]));
+    }
+}
+
+fn assert_child_success(mut child: Child, program: &str) {
+    if !matches!(
+        child.wait().map(|e| e.code()),
+        Ok(Some(0)) | Ok(None) | Err(_)
+    ) {
+        crash!(2, "'{}' terminated abnormally", program)
+    }
+}
```
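The write side is symmetric: the compress program's stdout is redirected into the chunk file, and the serialized lines are piped into its stdin. Dropping the writer closes the pipe so the child sees EOF, which is what the diff's explicit `drop(writer)` is for. A minimal sketch with a hypothetical `write_compressed` helper:

```rust
use std::fs::File;
use std::io::{BufWriter, Write};
use std::process::{Command, Stdio};

/// Pipe `payload` through `prog`, whose stdout is the chunk file on disk.
fn write_compressed(path: &str, prog: &str, payload: &[u8]) -> std::io::Result<()> {
    let out = File::create(path)?;
    let mut child = Command::new(prog)
        .stdin(Stdio::piped()) // we feed uncompressed bytes in here
        .stdout(out) // the program writes compressed bytes to the file
        .spawn()?;
    {
        let mut writer = BufWriter::new(child.stdin.take().unwrap());
        writer.write_all(payload)?;
        writer.flush()?;
    } // writer dropped here: stdin closes and the child sees EOF
    child.wait()?; // reap the child; the real code also checks its exit status
    Ok(())
}
```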
src/uu/sort/src/merge.rs

```diff
@@ -9,8 +9,8 @@
 
 use std::{
     cmp::Ordering,
-    ffi::OsStr,
-    io::{Read, Write},
+    fs::File,
+    io::{BufWriter, Read, Write},
     iter,
     rc::Rc,
     sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},
```
```diff
@@ -18,18 +18,69 @@ use std::{
 };
 
 use compare::Compare;
+use itertools::Itertools;
 
 use crate::{
     chunks::{self, Chunk},
-    compare_by, open, GlobalSettings,
+    compare_by, GlobalSettings,
 };
 
 // Merge already sorted files.
-pub fn merge<'a>(files: &[impl AsRef<OsStr>], settings: &'a GlobalSettings) -> FileMerger<'a> {
+pub fn merge_with_file_limit<F: ExactSizeIterator<Item = Box<dyn Read + Send>>>(
+    files: F,
+    settings: &GlobalSettings,
+) -> FileMerger {
+    if files.len() > settings.merge_batch_size {
+        let tmp_dir = tempfile::Builder::new()
+            .prefix("uutils_sort")
+            .tempdir_in(&settings.tmp_dir)
+            .unwrap();
+        let mut batch_number = 0;
+        let mut remaining_files = files.len();
+        let batches = files.chunks(settings.merge_batch_size);
+        let mut batches = batches.into_iter();
+        while batch_number + remaining_files > settings.merge_batch_size && remaining_files != 0 {
+            remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
+            let mut merger = merge_without_limit(batches.next().unwrap(), settings);
+            let tmp_file = File::create(tmp_dir.path().join(batch_number.to_string())).unwrap();
+            merger.write_all_to(settings, &mut BufWriter::new(tmp_file));
+            batch_number += 1;
+        }
+        let batch_files = (0..batch_number).map(|n| {
+            Box::new(File::open(tmp_dir.path().join(n.to_string())).unwrap())
+                as Box<dyn Read + Send>
+        });
+        if batch_number > settings.merge_batch_size {
+            assert!(batches.next().is_none());
+            merge_with_file_limit(
+                Box::new(batch_files) as Box<dyn ExactSizeIterator<Item = Box<dyn Read + Send>>>,
+                settings,
+            )
+        } else {
+            let final_batch = batches.next();
+            assert!(batches.next().is_none());
+            merge_without_limit(
+                batch_files.chain(final_batch.into_iter().flatten()),
+                settings,
+            )
+        }
+    } else {
+        merge_without_limit(files, settings)
+    }
+}
+
+/// Merge files without limiting how many files are concurrently open
+///
+/// It is the responsibility of the caller to ensure that `files` yields only
+/// as many files as we are allowed to open concurrently.
+fn merge_without_limit<F: Iterator<Item = Box<dyn Read + Send>>>(
+    files: F,
+    settings: &GlobalSettings,
+) -> FileMerger {
     let (request_sender, request_receiver) = channel();
-    let mut reader_files = Vec::with_capacity(files.len());
-    let mut loaded_receivers = Vec::with_capacity(files.len());
-    for (file_number, file) in files.iter().map(open).enumerate() {
+    let mut reader_files = Vec::with_capacity(files.size_hint().0);
+    let mut loaded_receivers = Vec::with_capacity(files.size_hint().0);
+    for (file_number, file) in files.enumerate() {
         let (sender, receiver) = sync_channel(2);
         loaded_receivers.push(receiver);
         reader_files.push(ReaderFile {
```
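To make the batching arithmetic concrete: with the default `merge_batch_size` of 16 and, say, 40 chunk files, the `while` loop runs twice (40 remaining, then 24, then 8), writing two intermediate merge results into the temporary directory; the guard `2 + 8 > 16` then fails, so the final merge opens only the 2 temp files plus the 8 leftover inputs, 10 readers in total, safely under the cap. A standalone simulation of that bookkeeping (a sketch, not the uutils code):

```rust
// Simulate the batch bookkeeping from merge_with_file_limit for 40 inputs
// and a batch size of 16 (the default merge_batch_size).
fn main() {
    let merge_batch_size: usize = 16;
    let mut remaining_files: usize = 40;
    let mut batch_number: usize = 0;
    while batch_number + remaining_files > merge_batch_size && remaining_files != 0 {
        remaining_files = remaining_files.saturating_sub(merge_batch_size);
        println!("intermediate batch {batch_number}: merge up to {merge_batch_size} inputs");
        batch_number += 1;
    }
    // Prints: final merge: 2 temp files + 8 remaining inputs
    println!("final merge: {batch_number} temp files + {remaining_files} remaining inputs");
}
```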
```diff
@@ -146,7 +197,11 @@ impl<'a> FileMerger<'a> {
     /// Write the merged contents to the output file.
     pub fn write_all(&mut self, settings: &GlobalSettings) {
         let mut out = settings.out_writer();
-        while self.write_next(settings, &mut out) {}
+        self.write_all_to(settings, &mut out);
+    }
+
+    pub fn write_all_to(&mut self, settings: &GlobalSettings, out: &mut impl Write) {
+        while self.write_next(settings, out) {}
     }
 
     fn write_next(&mut self, settings: &GlobalSettings, out: &mut impl Write) -> bool {
```
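Extracting `write_all_to` from `write_all` lets `merge_with_file_limit` drive the same merge loop into an arbitrary `Write` target: intermediate batches stream into a `BufWriter<File>` in the temp directory, while `write_all` still sends the final pass to `settings.out_writer()`.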
src/uu/sort/src/sort.rs

```diff
@@ -95,6 +95,8 @@ static OPT_PARALLEL: &str = "parallel";
 static OPT_FILES0_FROM: &str = "files0-from";
 static OPT_BUF_SIZE: &str = "buffer-size";
 static OPT_TMP_DIR: &str = "temporary-directory";
+static OPT_COMPRESS_PROG: &str = "compress-program";
+static OPT_BATCH_SIZE: &str = "batch-size";
 
 static ARG_FILES: &str = "files";
 
```
```diff
@@ -155,6 +157,8 @@ pub struct GlobalSettings {
     zero_terminated: bool,
     buffer_size: usize,
     tmp_dir: PathBuf,
+    compress_prog: Option<String>,
+    merge_batch_size: usize,
 }
 
 impl GlobalSettings {
@@ -223,6 +227,8 @@ impl Default for GlobalSettings {
             zero_terminated: false,
             buffer_size: DEFAULT_BUF_SIZE,
             tmp_dir: PathBuf::new(),
+            compress_prog: None,
+            merge_batch_size: 16,
         }
     }
 }
```
```diff
@@ -1076,6 +1082,19 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
                 .takes_value(true)
                 .value_name("DIR"),
         )
+        .arg(
+            Arg::with_name(OPT_COMPRESS_PROG)
+                .long(OPT_COMPRESS_PROG)
+                .help("compress temporary files with PROG, decompress with PROG -d")
+                .long_help("PROG has to take input from stdin and output to stdout")
+                .value_name("PROG")
+        )
+        .arg(
+            Arg::with_name(OPT_BATCH_SIZE)
+                .long(OPT_BATCH_SIZE)
+                .help("Merge at most N_MERGE inputs at once.")
+                .value_name("N_MERGE")
+        )
         .arg(
             Arg::with_name(OPT_FILES0_FROM)
                 .long(OPT_FILES0_FROM)
```
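In use this matches GNU sort's interface, for example `sort -n --compress-program gzip --batch-size 16 FILE`: PROG must read from stdin and write to stdout, and is invoked again with `-d` to decompress.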
```diff
@@ -1165,6 +1184,14 @@
         .map(PathBuf::from)
         .unwrap_or_else(env::temp_dir);
 
+    settings.compress_prog = matches.value_of(OPT_COMPRESS_PROG).map(String::from);
+
+    if let Some(n_merge) = matches.value_of(OPT_BATCH_SIZE) {
+        settings.merge_batch_size = n_merge
+            .parse()
+            .unwrap_or_else(|_| crash!(2, "invalid --batch-size argument '{}'", n_merge));
+    }
+
     settings.zero_terminated = matches.is_present(OPT_ZERO_TERMINATED);
     settings.merge = matches.is_present(OPT_MERGE);
 
```
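Note that the error paths agree on status 2: an unparsable `--batch-size` value crashes with exit code 2 here, the same code used when the compress program cannot be spawned or terminates abnormally.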
```diff
@@ -1240,7 +1267,7 @@ fn output_sorted_lines<'a>(iter: impl Iterator<Item = &'a Line<'a>>, settings: &
 
 fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
     if settings.merge {
-        let mut file_merger = merge::merge(files, settings);
+        let mut file_merger = merge::merge_with_file_limit(files.iter().map(open), settings);
         file_merger.write_all(settings);
     } else if settings.check {
         if files.len() > 1 {
```

tests/by-util/test_sort.rs
```diff
@@ -792,3 +792,64 @@ fn test_nonexistent_file() {
 fn test_blanks() {
     test_helper("blanks", &["-b", "--ignore-blanks"]);
 }
+
+#[test]
+fn sort_multiple() {
+    new_ucmd!()
+        .args(&["no_trailing_newline1.txt", "no_trailing_newline2.txt"])
+        .succeeds()
+        .stdout_is("a\nb\nb\n");
+}
+
+#[test]
+fn sort_empty_chunk() {
+    new_ucmd!()
+        .args(&["-S", "40B"])
+        .pipe_in("a\na\n")
+        .succeeds()
+        .stdout_is("a\na\n");
+}
+
+#[test]
+#[cfg(target_os = "linux")]
+fn test_compress() {
+    new_ucmd!()
+        .args(&[
+            "ext_sort.txt",
+            "-n",
+            "--compress-program",
+            "gzip",
+            "-S",
+            "10",
+        ])
+        .succeeds()
+        .stdout_only_fixture("ext_sort.expected");
+}
+
+#[test]
+fn test_compress_fail() {
+    new_ucmd!()
+        .args(&[
+            "ext_sort.txt",
+            "-n",
+            "--compress-program",
+            "nonexistent-program",
+            "-S",
+            "10",
+        ])
+        .fails()
+        .stderr_only("sort: couldn't execute compress program: errno 2");
+}
+
+#[test]
+fn test_merge_batches() {
+    new_ucmd!()
+        .args(&[
+            "ext_sort.txt",
+            "-n",
+            "-S",
+            "150B",
+        ])
+        .succeeds()
+        .stdout_only_fixture("ext_sort.expected");
+}
```
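`test_compress` is gated to Linux, presumably because a `gzip` binary can be assumed there; `test_compress_fail` asserts the exact spawn error (errno 2, ENOENT) for a missing program; and `test_merge_batches` shrinks the buffer to 150 bytes so the sort spills enough chunk files to exercise batched merging with the default `--batch-size` of 16.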
tests/fixtures/sort/no_trailing_newline1.txt (vendored, new file)

```diff
@@ -0,0 +1,2 @@
+a
+b
\ No newline at end of file
```

tests/fixtures/sort/no_trailing_newline2.txt (vendored, new file)

```diff
@@ -0,0 +1 @@
+b
\ No newline at end of file
```