Mirror of https://github.com/uutils/coreutils, synced 2024-11-16 17:58:06 +00:00
sort: delete temporary files as soon as possible

- When we have finished reading from a temporary file, we can immediately delete it.
- Use one single directory for all temporary files.
- Only create the temporary directory when needed.
- Also compress temporary files created by the merge step if requested.
This commit is contained in: parent 83a8ec1a67, commit 956ff57e2e
5 changed files with 449 additions and 227 deletions
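Judging by the hunk contexts, the five files are sort's check.rs, chunks.rs, ext_sort.rs, merge.rs, and sort.rs itself (the per-file headers were lost in extraction, so take these attributions as inferred). The heart of the change is a three-stage lifecycle for temporary files, introduced as the `WriteableTmpFile`, `ClosedTmpFile`, and `MergeInput` traits in merge.rs below: write a sorted chunk, close the file, reopen it for merging, and delete it the moment it has been fully read. The following condensed, self-contained sketch mirrors the real `WriteablePlainTmpFile` family; the `dyn Write`/`dyn Read` accessors and the omitted `compress_prog` parameter are simplifications for illustration:

```rust
use std::fs::{self, File};
use std::io::{BufWriter, Read, Write};
use std::path::PathBuf;

/// Stage 1: a temporary file that can be written to.
trait WriteableTmpFile {
    type Closed: ClosedTmpFile;
    fn create(path: PathBuf) -> Self;
    /// Closes the temporary file.
    fn finished_writing(self) -> Self::Closed;
    fn as_write(&mut self) -> &mut dyn Write;
}
/// Stage 2: a closed temporary file that can be reopened for merging.
trait ClosedTmpFile {
    type Reopened: MergeInput;
    fn reopen(self) -> Self::Reopened;
}
/// Stage 3: a pre-sorted merge input; cleanup may delete the backing file.
trait MergeInput {
    fn finished_reading(self);
    fn as_read(&mut self) -> &mut dyn Read;
}

struct PlainWriter { path: PathBuf, file: BufWriter<File> }
struct PlainClosed { path: PathBuf }
struct PlainInput { path: PathBuf, file: File }

impl WriteableTmpFile for PlainWriter {
    type Closed = PlainClosed;
    fn create(path: PathBuf) -> Self {
        Self { file: BufWriter::new(File::create(&path).unwrap()), path }
    }
    fn finished_writing(self) -> PlainClosed {
        // Dropping the BufWriter flushes it; the path survives for reopening.
        PlainClosed { path: self.path }
    }
    fn as_write(&mut self) -> &mut dyn Write { &mut self.file }
}
impl ClosedTmpFile for PlainClosed {
    type Reopened = PlainInput;
    fn reopen(self) -> PlainInput {
        PlainInput { file: File::open(&self.path).unwrap(), path: self.path }
    }
}
impl MergeInput for PlainInput {
    fn finished_reading(self) {
        // The key point of the commit: delete the file as soon as it has been read.
        fs::remove_file(self.path).ok();
    }
    fn as_read(&mut self) -> &mut dyn Read { &mut self.file }
}

fn main() {
    let mut w = PlainWriter::create(std::env::temp_dir().join("lifecycle_demo"));
    w.as_write().write_all(b"apple\nbanana\n").unwrap();
    let mut input = w.finished_writing().reopen();
    let mut contents = String::new();
    input.as_read().read_to_string(&mut contents).unwrap();
    assert_eq!(contents, "apple\nbanana\n");
    input.finished_reading(); // removes the temporary file
}
```

The compressed variant in the diff swaps `File` for a compressor child process at each stage; the lifecycle stays identical.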
@@ -36,7 +36,7 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
     for _ in 0..2 {
         recycled_sender
             .send(Chunk::new(vec![0; 100 * 1024], |_| Vec::new()))
-            .unwrap();
+            .ok();
     }
 
     let mut prev_chunk: Option<Chunk> = None;
@@ -80,12 +80,11 @@ fn reader(
     sender: SyncSender<Chunk>,
     settings: &GlobalSettings,
 ) {
-    let mut sender = Some(sender);
     let mut carry_over = vec![];
     for chunk in receiver.iter() {
         let (recycled_lines, recycled_buffer) = chunk.recycle();
-        chunks::read(
-            &mut sender,
+        let should_continue = chunks::read(
+            &sender,
             recycled_buffer,
             None,
             &mut carry_over,
@@ -98,6 +97,9 @@ fn reader(
             },
             recycled_lines,
             settings,
-        )
+        );
+        if !should_continue {
+            break;
+        }
     }
 }
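The hunks above (in check.rs, presumably) adapt to the new `chunks::read` contract: instead of signalling end-of-input by clearing a `&mut Option<SyncSender<Chunk>>`, the function now returns a `bool` that the caller turns into a `break`. A minimal sketch of that calling convention, with a hypothetical `read_step` standing in for `chunks::read`:

```rust
use std::io::Read;

// Hypothetical stand-in for the new `chunks::read`: the second element of the
// return value plays the role of `should_continue`; `false` means the input
// is exhausted and the caller should stop issuing reads.
fn read_step(file: &mut impl Read, buf: &mut [u8]) -> (usize, bool) {
    let n = file.read(buf).unwrap();
    (n, n != 0)
}

fn main() {
    let mut input: &[u8] = b"line one\nline two\n";
    let mut buf = [0u8; 8];
    loop {
        let (n, should_continue) = read_step(&mut input, &mut buf);
        print!("{}", String::from_utf8_lossy(&buf[..n]));
        if !should_continue {
            break; // mirrors the `if !should_continue { break; }` added above
        }
    }
}
```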
@@ -52,17 +52,17 @@ impl Chunk {
 
 /// Read a chunk, parse lines and send them.
 ///
-/// No empty chunk will be sent. If we reach the end of the input, sender_option
-/// is set to None. If this function however does not set sender_option to None,
-/// it is not guaranteed that there is still input left: If the input fits _exactly_
-/// into a buffer, we will only notice that there's nothing more to read at the next
-/// invocation.
+/// No empty chunk will be sent. If we reach the end of the input, `false` is returned.
+/// However, if this function returns `true`, it is not guaranteed that there is still
+/// input left: If the input fits _exactly_ into a buffer, we will only notice that there's
+/// nothing more to read at the next invocation. In case there is no input left, nothing will
+/// be sent.
 ///
 /// # Arguments
 ///
 /// (see also `read_to_chunk` for more detailed documentation)
 ///
-/// * `sender_option`: The sender to send the lines to the sorter. If `None`, this function does nothing.
+/// * `sender`: The sender to send the lines to the sorter.
 /// * `buffer`: The recycled buffer. All contents will be overwritten, but it must already be filled.
 ///   (i.e. `buffer.len()` should be equal to `buffer.capacity()`)
 /// * `max_buffer_size`: How big `buffer` can be.
@@ -73,52 +73,47 @@ impl Chunk {
 /// * `lines`: The recycled vector to fill with lines. Must be empty.
 /// * `settings`: The global settings.
 #[allow(clippy::too_many_arguments)]
-#[allow(clippy::borrowed_box)]
-pub fn read(
-    sender_option: &mut Option<SyncSender<Chunk>>,
+pub fn read<T: Read>(
+    sender: &SyncSender<Chunk>,
     mut buffer: Vec<u8>,
     max_buffer_size: Option<usize>,
     carry_over: &mut Vec<u8>,
-    file: &mut Box<dyn Read + Send>,
-    next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
+    file: &mut T,
+    next_files: &mut impl Iterator<Item = T>,
     separator: u8,
     lines: Vec<Line<'static>>,
     settings: &GlobalSettings,
-) {
+) -> bool {
     assert!(lines.is_empty());
-    if let Some(sender) = sender_option {
-        if buffer.len() < carry_over.len() {
-            buffer.resize(carry_over.len() + 10 * 1024, 0);
-        }
-        buffer[..carry_over.len()].copy_from_slice(carry_over);
-        let (read, should_continue) = read_to_buffer(
-            file,
-            next_files,
-            &mut buffer,
-            max_buffer_size,
-            carry_over.len(),
-            separator,
-        );
-        carry_over.clear();
-        carry_over.extend_from_slice(&buffer[read..]);
-
-        if read != 0 {
-            let payload = Chunk::new(buffer, |buf| {
-                let mut lines = unsafe {
-                    // SAFETY: It is safe to transmute to a vector of lines with shorter lifetime,
-                    // because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
-                    std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
-                };
-                let read = crash_if_err!(1, std::str::from_utf8(&buf[..read]));
-                parse_lines(read, &mut lines, separator, settings);
-                lines
-            });
-            sender.send(payload).unwrap();
-        }
-        if !should_continue {
-            *sender_option = None;
-        }
-    }
+    if buffer.len() < carry_over.len() {
+        buffer.resize(carry_over.len() + 10 * 1024, 0);
+    }
+    buffer[..carry_over.len()].copy_from_slice(carry_over);
+    let (read, should_continue) = read_to_buffer(
+        file,
+        next_files,
+        &mut buffer,
+        max_buffer_size,
+        carry_over.len(),
+        separator,
+    );
+    carry_over.clear();
+    carry_over.extend_from_slice(&buffer[read..]);
+
+    if read != 0 {
+        let payload = Chunk::new(buffer, |buf| {
+            let mut lines = unsafe {
+                // SAFETY: It is safe to transmute to a vector of lines with shorter lifetime,
+                // because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
+                std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
+            };
+            let read = crash_if_err!(1, std::str::from_utf8(&buf[..read]));
+            parse_lines(read, &mut lines, separator, settings);
+            lines
+        });
+        sender.send(payload).unwrap();
+    }
+    should_continue
 }
 
 /// Split `read` into `Line`s, and add them to `lines`.
@@ -165,10 +160,9 @@ fn parse_lines<'a>(
 /// The remaining bytes must be copied to the start of the buffer for the next invocation,
 /// if another invocation is necessary, which is determined by the other return value.
 /// * Whether this function should be called again.
-#[allow(clippy::borrowed_box)]
-fn read_to_buffer(
-    file: &mut Box<dyn Read + Send>,
-    next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
+fn read_to_buffer<T: Read>(
+    file: &mut T,
+    next_files: &mut impl Iterator<Item = T>,
     buffer: &mut Vec<u8>,
     max_buffer_size: Option<usize>,
     start_offset: usize,
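Besides the `bool` return, the chunks.rs hunks above replace the `&mut Box<dyn Read + Send>` parameters with a generic `T: Read`, which is what makes the `#[allow(clippy::borrowed_box)]` attributes removable. A small sketch of the difference; both functions here are hypothetical stand-ins, not code from the commit:

```rust
use std::io::Read;

// Before: taking `&mut Box<dyn Read + Send>` forces every caller to own a box
// and trips the clippy::borrowed_box lint.
fn read_boxed(file: &mut Box<dyn Read + Send>) -> usize {
    let mut buf = [0u8; 16];
    file.read(&mut buf).unwrap()
}

// After: a generic parameter accepts boxed and unboxed readers alike,
// in the style of the new `chunks::read<T: Read>` signature.
fn read_generic<T: Read>(file: &mut T) -> usize {
    let mut buf = [0u8; 16];
    file.read(&mut buf).unwrap()
}

fn main() {
    let mut boxed: Box<dyn Read + Send> = Box::new(&b"hello"[..]);
    assert_eq!(read_boxed(&mut boxed), 5);

    // The generic version also works directly on a plain reader:
    let mut plain = &b"world!"[..];
    assert_eq!(read_generic(&mut plain), 6);
}
```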
@@ -12,14 +12,10 @@
 //! The buffers for the individual chunks are recycled. There are two buffers.
 
 use std::cmp::Ordering;
-use std::fs::File;
-use std::io::BufReader;
-use std::io::{BufWriter, Write};
+use std::io::Write;
 use std::path::Path;
-use std::process::Child;
-use std::process::{Command, Stdio};
+use std::path::PathBuf;
 use std::{
-    fs::OpenOptions,
     io::Read,
     sync::mpsc::{Receiver, SyncSender},
     thread,
@@ -27,85 +23,91 @@ use std::{
 
 use itertools::Itertools;
 
-use tempfile::TempDir;
-
+use crate::merge::ClosedTmpFile;
+use crate::merge::WriteableCompressedTmpFile;
+use crate::merge::WriteablePlainTmpFile;
+use crate::merge::WriteableTmpFile;
 use crate::Line;
 use crate::{
     chunks::{self, Chunk},
     compare_by, merge, output_sorted_lines, sort_by, GlobalSettings,
 };
+use tempfile::TempDir;
 
 const START_BUFFER_SIZE: usize = 8_000;
 
 /// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result.
 pub fn ext_sort(files: &mut impl Iterator<Item = Box<dyn Read + Send>>, settings: &GlobalSettings) {
-    let tmp_dir = crash_if_err!(
-        1,
-        tempfile::Builder::new()
-            .prefix("uutils_sort")
-            .tempdir_in(&settings.tmp_dir)
-    );
     let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
     let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
     thread::spawn({
         let settings = settings.clone();
         move || sorter(recycled_receiver, sorted_sender, settings)
     });
-    let read_result = reader_writer(
+    if settings.compress_prog.is_some() {
+        reader_writer::<_, WriteableCompressedTmpFile>(
+            files,
+            settings,
+            sorted_receiver,
+            recycled_sender,
+        );
+    } else {
+        reader_writer::<_, WriteablePlainTmpFile>(
+            files,
+            settings,
+            sorted_receiver,
+            recycled_sender,
+        );
+    }
+}
+
+fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile + 'static>(
+    files: F,
+    settings: &GlobalSettings,
+    receiver: Receiver<Chunk>,
+    sender: SyncSender<Chunk>,
+) {
+    let separator = if settings.zero_terminated {
+        b'\0'
+    } else {
+        b'\n'
+    };
+
+    // Heuristically chosen: Dividing by 10 seems to keep our memory usage roughly
+    // around settings.buffer_size as a whole.
+    let buffer_size = settings.buffer_size / 10;
+    let read_result: ReadResult<Tmp> = read_write_loop(
         files,
-        &tmp_dir,
-        if settings.zero_terminated {
-            b'\0'
-        } else {
-            b'\n'
-        },
-        // Heuristically chosen: Dividing by 10 seems to keep our memory usage roughly
-        // around settings.buffer_size as a whole.
-        settings.buffer_size / 10,
-        settings.clone(),
-        sorted_receiver,
-        recycled_sender,
+        &settings.tmp_dir,
+        separator,
+        buffer_size,
+        &settings,
+        receiver,
+        sender,
     );
     match read_result {
-        ReadResult::WroteChunksToFile { chunks_written } => {
-            let mut children = Vec::new();
-            let files = (0..chunks_written).map(|chunk_num| {
-                let file_path = tmp_dir.path().join(chunk_num.to_string());
-                let file = File::open(file_path).unwrap();
-                if let Some(compress_prog) = &settings.compress_prog {
-                    let mut command = Command::new(compress_prog);
-                    command.stdin(file).stdout(Stdio::piped()).arg("-d");
-                    let mut child = crash_if_err!(
-                        2,
-                        command.spawn().map_err(|err| format!(
-                            "couldn't execute compress program: errno {}",
-                            err.raw_os_error().unwrap()
-                        ))
-                    );
-                    let child_stdout = child.stdout.take().unwrap();
-                    children.push(child);
-                    Box::new(BufReader::new(child_stdout)) as Box<dyn Read + Send>
-                } else {
-                    Box::new(BufReader::new(file)) as Box<dyn Read + Send>
-                }
-            });
-            let mut merger = merge::merge_with_file_limit(files, settings);
-            for child in children {
-                assert_child_success(child, settings.compress_prog.as_ref().unwrap());
-            }
-            merger.write_all(settings);
+        ReadResult::WroteChunksToFile { tmp_files, tmp_dir } => {
+            let tmp_dir_size = tmp_files.len();
+            let mut merger = merge::merge_with_file_limit::<_, _, Tmp>(
+                tmp_files.into_iter().map(|c| c.reopen()),
+                &settings,
+                Some((tmp_dir, tmp_dir_size)),
+            );
+            merger.write_all(&settings);
         }
         ReadResult::SortedSingleChunk(chunk) => {
-            output_sorted_lines(chunk.borrow_lines().iter(), settings);
+            output_sorted_lines(chunk.borrow_lines().iter(), &settings);
        }
         ReadResult::SortedTwoChunks([a, b]) => {
             let merged_iter = a
                 .borrow_lines()
                 .iter()
                 .merge_by(b.borrow_lines().iter(), |line_a, line_b| {
-                    compare_by(line_a, line_b, settings) != Ordering::Greater
+                    compare_by(line_a, line_b, &settings) != Ordering::Greater
                 });
-            output_sorted_lines(merged_iter, settings);
+            output_sorted_lines(merged_iter, &settings);
         }
         ReadResult::EmptyInput => {
             // don't output anything
@@ -122,7 +124,7 @@ fn sorter(receiver: Receiver<Chunk>, sender: SyncSender<Chunk>, settings: Global
 }
 
 /// Describes how we read the chunks from the input.
-enum ReadResult {
+enum ReadResult<I: WriteableTmpFile> {
     /// The input was empty. Nothing was read.
     EmptyInput,
     /// The input fits into a single Chunk, which was kept in memory.
@@ -131,33 +133,27 @@ enum ReadResult {
     SortedTwoChunks([Chunk; 2]),
     /// The input was read into multiple chunks, which were written to auxiliary files.
     WroteChunksToFile {
-        /// The number of chunks written to auxiliary files.
-        chunks_written: usize,
+        tmp_files: Vec<I::Closed>,
+        tmp_dir: TempDir,
     },
 }
 
 /// The function that is executed on the reader/writer thread.
-///
-/// # Returns
-/// * The number of chunks read.
-fn reader_writer(
+fn read_write_loop<I: WriteableTmpFile>(
     mut files: impl Iterator<Item = Box<dyn Read + Send>>,
-    tmp_dir: &TempDir,
+    tmp_dir_parent: &Path,
     separator: u8,
     buffer_size: usize,
-    settings: GlobalSettings,
+    settings: &GlobalSettings,
     receiver: Receiver<Chunk>,
     sender: SyncSender<Chunk>,
-) -> ReadResult {
-    let mut sender_option = Some(sender);
-
+) -> ReadResult<I> {
     let mut file = files.next().unwrap();
 
     let mut carry_over = vec![];
     // kick things off with two reads
     for _ in 0..2 {
-        chunks::read(
-            &mut sender_option,
+        let should_continue = chunks::read(
+            &sender,
             vec![
                 0;
                 if START_BUFFER_SIZE < buffer_size {
@@ -172,9 +168,11 @@ fn reader_writer(
             &mut files,
             separator,
             Vec::new(),
-            &settings,
+            settings,
         );
-        if sender_option.is_none() {
+
+        if !should_continue {
+            drop(sender);
             // We have already read the whole input. Since we are in our first two reads,
             // this means that we can fit the whole input into memory. Bypass writing below and
             // handle this case in a more straightforward way.
@@ -190,68 +188,69 @@ fn reader_writer(
         }
     }
 
+    let tmp_dir = crash_if_err!(
+        1,
+        tempfile::Builder::new()
+            .prefix("uutils_sort")
+            .tempdir_in(tmp_dir_parent)
+    );
+
+    let mut sender_option = Some(sender);
     let mut file_number = 0;
+    let mut tmp_files = vec![];
     loop {
         let mut chunk = match receiver.recv() {
             Ok(it) => it,
             _ => {
-                return ReadResult::WroteChunksToFile {
-                    chunks_written: file_number,
-                }
+                return ReadResult::WroteChunksToFile { tmp_files, tmp_dir };
             }
         };
 
-        write(
+        let tmp_file = write::<I>(
             &mut chunk,
-            &tmp_dir.path().join(file_number.to_string()),
+            tmp_dir.path().join(file_number.to_string()),
             settings.compress_prog.as_deref(),
             separator,
         );
+        tmp_files.push(tmp_file);

         file_number += 1;

         let (recycled_lines, recycled_buffer) = chunk.recycle();

-        chunks::read(
-            &mut sender_option,
-            recycled_buffer,
-            None,
-            &mut carry_over,
-            &mut file,
-            &mut files,
-            separator,
-            recycled_lines,
-            &settings,
-        );
+        if let Some(sender) = &sender_option {
+            let should_continue = chunks::read(
+                &sender,
+                recycled_buffer,
+                None,
+                &mut carry_over,
+                &mut file,
+                &mut files,
+                separator,
+                recycled_lines,
+                settings,
+            );
+            if !should_continue {
+                sender_option = None;
+            }
+        }
     }
 }
 
 /// Write the lines in `chunk` to `file`, separated by `separator`.
 /// `compress_prog` is used to optionally compress file contents.
-fn write(chunk: &mut Chunk, file: &Path, compress_prog: Option<&str>, separator: u8) {
+fn write<I: WriteableTmpFile>(
+    chunk: &mut Chunk,
+    file: PathBuf,
+    compress_prog: Option<&str>,
+    separator: u8,
+) -> I::Closed {
     chunk.with_lines_mut(|lines| {
         // Write the lines to the file
-        let file = crash_if_err!(1, OpenOptions::new().create(true).write(true).open(file));
-        if let Some(compress_prog) = compress_prog {
-            let mut command = Command::new(compress_prog);
-            command.stdin(Stdio::piped()).stdout(file);
-            let mut child = crash_if_err!(
-                2,
-                command.spawn().map_err(|err| format!(
-                    "couldn't execute compress program: errno {}",
-                    err.raw_os_error().unwrap()
-                ))
-            );
-            let mut writer = BufWriter::new(child.stdin.take().unwrap());
-            write_lines(lines, &mut writer, separator);
-            writer.flush().unwrap();
-            drop(writer);
-            assert_child_success(child, compress_prog);
-        } else {
-            let mut writer = BufWriter::new(file);
-            write_lines(lines, &mut writer, separator);
-        };
-    });
+        let mut tmp_file = I::create(file, compress_prog);
+        write_lines(lines, tmp_file.as_write(), separator);
+        tmp_file.finished_writing()
+    })
 }
 
 fn write_lines<'a, T: Write>(lines: &[Line<'a>], writer: &mut T, separator: u8) {
@@ -260,12 +259,3 @@ fn write_lines<'a, T: Write>(lines: &[Line<'a>], writer: &mut T, separator: u8)
         crash_if_err!(1, writer.write_all(&[separator]));
     }
 }
-
-fn assert_child_success(mut child: Child, program: &str) {
-    if !matches!(
-        child.wait().map(|e| e.code()),
-        Ok(Some(0)) | Ok(None) | Err(_)
-    ) {
-        crash!(2, "'{}' terminated abnormally", program)
-    }
-}
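A point worth noting in the ext_sort.rs hunks above: the temporary directory is no longer created eagerly at the top of `ext_sort`. `read_write_loop` creates it only after the first two reads fail to exhaust the input, and `merge_with_file_limit` (in the next file) creates one only when it was not handed an existing directory. A sketch of that `unwrap_or_else` pattern, assuming the `tempfile` crate the diff already uses; `tmp_dir_lazy` is an illustrative helper, not part of the commit:

```rust
use std::path::Path;
use tempfile::TempDir;

// Reuse the directory (and the count of files already in it) if one was
// passed in; otherwise create it now, under the user-controlled parent
// corresponding to `settings.tmp_dir`.
fn tmp_dir_lazy(existing: Option<(TempDir, usize)>, parent: &Path) -> (TempDir, usize) {
    existing.unwrap_or_else(|| {
        (
            tempfile::Builder::new()
                .prefix("uutils_sort")
                .tempdir_in(parent)
                .unwrap(),
            0, // no temporary files written yet
        )
    })
}

fn main() {
    let (dir, count) = tmp_dir_lazy(None, &std::env::temp_dir());
    println!("created {} holding {} files", dir.path().display(), count);
    // Dropping `dir` removes the directory and anything left inside it.
}
```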
@@ -9,9 +9,11 @@
 
 use std::{
     cmp::Ordering,
-    fs::File,
+    fs::{self, File},
     io::{BufWriter, Read, Write},
     iter,
+    path::PathBuf,
+    process::{Child, ChildStdin, ChildStdout, Command, Stdio},
     rc::Rc,
     sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},
     thread,
@@ -19,61 +21,94 @@ use std::{
 
 use compare::Compare;
 use itertools::Itertools;
+use tempfile::TempDir;
 
 use crate::{
     chunks::{self, Chunk},
     compare_by, GlobalSettings,
 };
 
-// Merge already sorted files.
-pub fn merge_with_file_limit<F: ExactSizeIterator<Item = Box<dyn Read + Send>>>(
-    files: F,
+/// Merge pre-sorted `Box<dyn Read>`s.
+///
+/// If `files` yields more files than `settings.merge_batch_size`, intermediate files will be used.
+/// If `settings.compress_prog` is `Some`, intermediate files will be compressed with it.
+pub fn merge<Files: ExactSizeIterator<Item = Box<dyn Read + Send>>>(
+    files: Files,
     settings: &GlobalSettings,
 ) -> FileMerger {
+    if settings.compress_prog.is_none() {
+        merge_with_file_limit::<_, _, WriteablePlainTmpFile>(
+            files.map(|file| PlainMergeInput { inner: file }),
+            settings,
+            None,
+        )
+    } else {
+        merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(
+            files.map(|file| PlainMergeInput { inner: file }),
+            settings,
+            None,
+        )
+    }
+}
+
+// Merge already sorted `MergeInput`s.
+pub fn merge_with_file_limit<
+    M: MergeInput + 'static,
+    F: ExactSizeIterator<Item = M>,
+    Tmp: WriteableTmpFile + 'static,
+>(
+    files: F,
+    settings: &GlobalSettings,
+    tmp_dir: Option<(TempDir, usize)>,
+) -> FileMerger {
     if files.len() > settings.merge_batch_size {
-        let tmp_dir = tempfile::Builder::new()
-            .prefix("uutils_sort")
-            .tempdir_in(&settings.tmp_dir)
-            .unwrap();
-        let mut batch_number = 0;
+        // If we did not get a tmp_dir, create one.
+        let (tmp_dir, mut tmp_dir_size) = tmp_dir.unwrap_or_else(|| {
+            (
+                tempfile::Builder::new()
+                    .prefix("uutils_sort")
+                    .tempdir_in(&settings.tmp_dir)
+                    .unwrap(),
+                0,
+            )
+        });
         let mut remaining_files = files.len();
         let batches = files.chunks(settings.merge_batch_size);
         let mut batches = batches.into_iter();
-        while batch_number + remaining_files > settings.merge_batch_size && remaining_files != 0 {
+        let mut temporary_files = vec![];
+        while remaining_files != 0 {
             // Work around the fact that `Chunks` is not an `ExactSizeIterator`.
             remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
             let mut merger = merge_without_limit(batches.next().unwrap(), settings);
-            let tmp_file = File::create(tmp_dir.path().join(batch_number.to_string())).unwrap();
-            merger.write_all_to(settings, &mut BufWriter::new(tmp_file));
-            batch_number += 1;
-        }
-        let batch_files = (0..batch_number).map(|n| {
-            Box::new(File::open(tmp_dir.path().join(n.to_string())).unwrap())
-                as Box<dyn Read + Send>
-        });
-        if batch_number > settings.merge_batch_size {
-            assert!(batches.next().is_none());
-            merge_with_file_limit(
-                Box::new(batch_files) as Box<dyn ExactSizeIterator<Item = Box<dyn Read + Send>>>,
-                settings,
-            )
-        } else {
-            let final_batch = batches.next();
-            assert!(batches.next().is_none());
-            merge_without_limit(
-                batch_files.chain(final_batch.into_iter().flatten()),
-                settings,
-            )
+            let mut tmp_file = Tmp::create(
+                tmp_dir.path().join(tmp_dir_size.to_string()),
+                settings.compress_prog.as_deref(),
+            );
+            tmp_dir_size += 1;
+            merger.write_all_to(settings, tmp_file.as_write());
+            temporary_files.push(tmp_file.finished_writing());
         }
+        assert!(batches.next().is_none());
+        merge_with_file_limit::<_, _, Tmp>(
+            temporary_files
+                .into_iter()
+                .map(Box::new(|c: Tmp::Closed| c.reopen())
+                    as Box<
+                        dyn FnMut(Tmp::Closed) -> <Tmp::Closed as ClosedTmpFile>::Reopened,
+                    >),
+            settings,
+            Some((tmp_dir, tmp_dir_size)),
+        )
     } else {
         merge_without_limit(files, settings)
     }
 }
 
-/// Merge files without limiting how many files are concurrently open
+/// Merge files without limiting how many files are concurrently open.
 ///
 /// It is the responsibility of the caller to ensure that `files` yields only
 /// as many files as we are allowed to open concurrently.
-fn merge_without_limit<F: Iterator<Item = Box<dyn Read + Send>>>(
+fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = M>>(
     files: F,
     settings: &GlobalSettings,
 ) -> FileMerger {
@@ -83,16 +118,18 @@ fn merge_without_limit<F: Iterator<Item = Box<dyn Read + Send>>>(
     for (file_number, file) in files.enumerate() {
         let (sender, receiver) = sync_channel(2);
         loaded_receivers.push(receiver);
-        reader_files.push(ReaderFile {
+        reader_files.push(Some(ReaderFile {
             file,
-            sender: Some(sender),
+            sender,
             carry_over: vec![],
-        });
+        }));
         // Send the initial chunk to trigger a read for each file
         request_sender
             .send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new())))
             .unwrap();
     }
 
     // Send the second chunk for each file
     for file_number in 0..reader_files.len() {
         request_sender
             .send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new())))
@@ -136,37 +173,45 @@ fn merge_without_limit<F: Iterator<Item = Box<dyn Read + Send>>>(
         }
     }
 }
 /// The struct on the reader thread representing an input file
-struct ReaderFile {
-    file: Box<dyn Read + Send>,
-    sender: Option<SyncSender<Chunk>>,
+struct ReaderFile<M: MergeInput> {
+    file: M,
+    sender: SyncSender<Chunk>,
     carry_over: Vec<u8>,
 }
 
 /// The function running on the reader thread.
 fn reader(
     recycled_receiver: Receiver<(usize, Chunk)>,
-    files: &mut [ReaderFile],
+    files: &mut [Option<ReaderFile<impl MergeInput>>],
     settings: &GlobalSettings,
     separator: u8,
 ) {
     for (file_idx, chunk) in recycled_receiver.iter() {
         let (recycled_lines, recycled_buffer) = chunk.recycle();
-        let ReaderFile {
+        if let Some(ReaderFile {
             file,
             sender,
             carry_over,
-        } = &mut files[file_idx];
-        chunks::read(
-            sender,
-            recycled_buffer,
-            None,
-            carry_over,
-            file,
-            &mut iter::empty(),
-            separator,
-            recycled_lines,
-            settings,
-        );
+        }) = &mut files[file_idx]
+        {
+            let should_continue = chunks::read(
+                sender,
+                recycled_buffer,
+                None,
+                carry_over,
+                file.as_read(),
+                &mut iter::empty(),
+                separator,
+                recycled_lines,
+                settings,
+            );
+            if !should_continue {
+                // Remove the file from the list by replacing it with `None`.
+                let ReaderFile { file, .. } = files[file_idx].take().unwrap();
+                // Depending on the kind of the `MergeInput`, this may delete the file:
+                file.finished_reading();
+            }
+        }
     }
 }
 /// The struct on the main thread representing an input file
@@ -241,11 +286,14 @@ impl<'a> FileMerger<'a> {
                 self.heap.pop();
             }
         } else {
            // This will cause the comparison to use a different line and the heap to readjust.
            self.heap.peek_mut().unwrap().line_idx += 1;
         }
 
         if let Some(prev) = prev {
             if let Ok(prev_chunk) = Rc::try_unwrap(prev.chunk) {
+                // If nothing is referencing the previous chunk anymore, this means that the previous line
+                // was the last line of the chunk. We can recycle the chunk.
                 self.request_sender
                     .send((prev.file_number, prev_chunk))
                     .ok();
@@ -273,7 +321,195 @@ impl<'a> Compare<MergeableFile> for FileComparator<'a> {
             // as lines from a file with a lower number are to be considered "earlier".
             cmp = a.file_number.cmp(&b.file_number);
         }
-        // Our BinaryHeap is a max heap. We use it as a min heap, so we need to reverse the ordering.
+        // BinaryHeap is a max heap. We use it as a min heap, so we need to reverse the ordering.
         cmp.reverse()
     }
 }
+
+// Wait for the child to exit and check its exit code.
+fn assert_child_success(mut child: Child, program: &str) {
+    if !matches!(
+        child.wait().map(|e| e.code()),
+        Ok(Some(0)) | Ok(None) | Err(_)
+    ) {
+        crash!(2, "'{}' terminated abnormally", program)
+    }
+}
+
+/// A temporary file that can be written to.
+pub trait WriteableTmpFile {
+    type Closed: ClosedTmpFile;
+    type InnerWrite: Write;
+    fn create(path: PathBuf, compress_prog: Option<&str>) -> Self;
+    /// Closes the temporary file.
+    fn finished_writing(self) -> Self::Closed;
+    fn as_write(&mut self) -> &mut Self::InnerWrite;
+}
+/// A temporary file that is (temporarily) closed, but can be reopened.
+pub trait ClosedTmpFile {
+    type Reopened: MergeInput;
+    /// Reopens the temporary file.
+    fn reopen(self) -> Self::Reopened;
+}
+/// A pre-sorted input for merging.
+pub trait MergeInput: Send {
+    type InnerRead: Read;
+    /// Cleans this `MergeInput` up.
+    /// Implementations may delete the backing file.
+    fn finished_reading(self);
+    fn as_read(&mut self) -> &mut Self::InnerRead;
+}
+
+pub struct WriteablePlainTmpFile {
+    path: PathBuf,
+    file: BufWriter<File>,
+}
+pub struct ClosedPlainTmpFile {
+    path: PathBuf,
+}
+pub struct PlainTmpMergeInput {
+    path: PathBuf,
+    file: File,
+}
+impl WriteableTmpFile for WriteablePlainTmpFile {
+    type Closed = ClosedPlainTmpFile;
+    type InnerWrite = BufWriter<File>;
+
+    fn create(path: PathBuf, _: Option<&str>) -> Self {
+        WriteablePlainTmpFile {
+            file: BufWriter::new(File::create(&path).unwrap()),
+            path,
+        }
+    }
+
+    fn finished_writing(self) -> Self::Closed {
+        ClosedPlainTmpFile { path: self.path }
+    }
+
+    fn as_write(&mut self) -> &mut Self::InnerWrite {
+        &mut self.file
+    }
+}
+impl ClosedTmpFile for ClosedPlainTmpFile {
+    type Reopened = PlainTmpMergeInput;
+    fn reopen(self) -> Self::Reopened {
+        PlainTmpMergeInput {
+            file: File::open(&self.path).unwrap(),
+            path: self.path,
+        }
+    }
+}
+impl MergeInput for PlainTmpMergeInput {
+    type InnerRead = File;
+
+    fn finished_reading(self) {
+        fs::remove_file(self.path).ok();
+    }
+
+    fn as_read(&mut self) -> &mut Self::InnerRead {
+        &mut self.file
+    }
+}
+
+pub struct WriteableCompressedTmpFile {
+    path: PathBuf,
+    compress_prog: String,
+    child: Child,
+    child_stdin: BufWriter<ChildStdin>,
+}
+pub struct ClosedCompressedTmpFile {
+    path: PathBuf,
+    compress_prog: String,
+}
+pub struct CompressedTmpMergeInput {
+    path: PathBuf,
+    compress_prog: String,
+    child: Child,
+    child_stdout: ChildStdout,
+}
+impl WriteableTmpFile for WriteableCompressedTmpFile {
+    type Closed = ClosedCompressedTmpFile;
+    type InnerWrite = BufWriter<ChildStdin>;
+
+    fn create(path: PathBuf, compress_prog: Option<&str>) -> Self {
+        let compress_prog = compress_prog.unwrap();
+        let mut command = Command::new(compress_prog);
+        command
+            .stdin(Stdio::piped())
+            .stdout(File::create(&path).unwrap());
+        let mut child = crash_if_err!(
+            2,
+            command.spawn().map_err(|err| format!(
+                "couldn't execute compress program: errno {}",
+                err.raw_os_error().unwrap()
+            ))
+        );
+        let child_stdin = child.stdin.take().unwrap();
+        WriteableCompressedTmpFile {
+            path,
+            compress_prog: compress_prog.to_owned(),
+            child,
+            child_stdin: BufWriter::new(child_stdin),
+        }
+    }
+
+    fn finished_writing(self) -> Self::Closed {
+        drop(self.child_stdin);
+        assert_child_success(self.child, &self.compress_prog);
+        ClosedCompressedTmpFile {
+            path: self.path,
+            compress_prog: self.compress_prog,
+        }
+    }
+
+    fn as_write(&mut self) -> &mut Self::InnerWrite {
+        &mut self.child_stdin
+    }
+}
+impl ClosedTmpFile for ClosedCompressedTmpFile {
+    type Reopened = CompressedTmpMergeInput;
+
+    fn reopen(self) -> Self::Reopened {
+        let mut command = Command::new(&self.compress_prog);
+        let file = File::open(&self.path).unwrap();
+        command.stdin(file).stdout(Stdio::piped()).arg("-d");
+        let mut child = crash_if_err!(
+            2,
+            command.spawn().map_err(|err| format!(
+                "couldn't execute compress program: errno {}",
+                err.raw_os_error().unwrap()
+            ))
+        );
+        let child_stdout = child.stdout.take().unwrap();
+        CompressedTmpMergeInput {
+            path: self.path,
+            compress_prog: self.compress_prog,
+            child,
+            child_stdout,
+        }
+    }
+}
+impl MergeInput for CompressedTmpMergeInput {
+    type InnerRead = ChildStdout;
+
+    fn finished_reading(self) {
+        drop(self.child_stdout);
+        assert_child_success(self.child, &self.compress_prog);
+        fs::remove_file(self.path).ok();
+    }
+
+    fn as_read(&mut self) -> &mut Self::InnerRead {
+        &mut self.child_stdout
+    }
+}
+
+pub struct PlainMergeInput<R: Read + Send> {
+    inner: R,
+}
+impl<R: Read + Send> MergeInput for PlainMergeInput<R> {
+    type InnerRead = R;
+    fn finished_reading(self) {}
+    fn as_read(&mut self) -> &mut Self::InnerRead {
+        &mut self.inner
+    }
+}
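One subtlety in the new `merge_with_file_limit` above: `Itertools::chunks` does not implement `ExactSizeIterator`, so the loop tracks the number of unmerged inputs by hand with `saturating_sub`, as the in-code comment notes. A runnable sketch of that bookkeeping, with integers standing in for sorted inputs (assumes the `itertools` crate):

```rust
use itertools::Itertools;

fn main() {
    let files: Vec<u32> = (0..10).collect(); // stand-ins for 10 sorted inputs
    let batch_size = 4;

    // Same shape as the diff: `chunks` has no len(), so count down manually.
    let mut remaining_files = files.len();
    let batches = files.into_iter().chunks(batch_size);
    let mut batches = batches.into_iter();

    let mut merged_batches = 0;
    while remaining_files != 0 {
        remaining_files = remaining_files.saturating_sub(batch_size);
        let batch: Vec<u32> = batches.next().unwrap().collect();
        println!("merging a batch of {} inputs", batch.len());
        merged_batches += 1;
    }
    assert!(batches.next().is_none()); // as the diff asserts after its loop
    assert_eq!(merged_batches, 3); // ceil(10 / 4)
}
```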
@@ -1313,7 +1313,7 @@ fn output_sorted_lines<'a>(iter: impl Iterator<Item = &'a Line<'a>>, settings: &
 
 fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
     if settings.merge {
-        let mut file_merger = merge::merge_with_file_limit(files.iter().map(open), settings);
+        let mut file_merger = merge::merge(files.iter().map(open), settings);
         file_merger.write_all(settings);
     } else if settings.check {
         if files.len() > 1 {