mirror of
https://github.com/uutils/coreutils
synced 2024-11-17 02:08:09 +00:00
sort: separate additional data from the Line struct
Data that was previously boxed inside the `Line` struct was moved to separate vectors. Inside of each `Line` remains only an index that allows to access that data. This helps with keeping the `Line` struct small and therefore reduces memory usage in most cases. Additionally, this improves performance because one big allocation (the vectors) are faster than many small ones (many boxes inside of each `Line`). Those vectors can be reused as well, reducing the amount of (de-)allocations.
This commit is contained in:
parent
e48ff9dd9e
commit
4a956f38b9
5 changed files with 347 additions and 216 deletions
|
@ -8,7 +8,7 @@
|
|||
//! Check if a file is ordered
|
||||
|
||||
use crate::{
|
||||
chunks::{self, Chunk},
|
||||
chunks::{self, Chunk, RecycledChunk},
|
||||
compare_by, open, GlobalSettings,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
|
@ -34,7 +34,7 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
|
|||
move || reader(file, recycled_receiver, loaded_sender, &settings)
|
||||
});
|
||||
for _ in 0..2 {
|
||||
let _ = recycled_sender.send(Chunk::new(vec![0; 100 * 1024], |_| Vec::new()));
|
||||
let _ = recycled_sender.send(RecycledChunk::new(100 * 1024));
|
||||
}
|
||||
|
||||
let mut prev_chunk: Option<Chunk> = None;
|
||||
|
@ -44,21 +44,29 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
|
|||
if let Some(prev_chunk) = prev_chunk.take() {
|
||||
// Check if the first element of the new chunk is greater than the last
|
||||
// element from the previous chunk
|
||||
let prev_last = prev_chunk.borrow_lines().last().unwrap();
|
||||
let new_first = chunk.borrow_lines().first().unwrap();
|
||||
let prev_last = prev_chunk.lines().last().unwrap();
|
||||
let new_first = chunk.lines().first().unwrap();
|
||||
|
||||
if compare_by(prev_last, new_first, settings) == Ordering::Greater {
|
||||
if compare_by(
|
||||
prev_last,
|
||||
new_first,
|
||||
settings,
|
||||
prev_chunk.line_data(),
|
||||
chunk.line_data(),
|
||||
) == Ordering::Greater
|
||||
{
|
||||
if !settings.check_silent {
|
||||
println!("sort: {}:{}: disorder: {}", path, line_idx, new_first.line);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
let _ = recycled_sender.send(prev_chunk);
|
||||
let _ = recycled_sender.send(prev_chunk.recycle());
|
||||
}
|
||||
|
||||
for (a, b) in chunk.borrow_lines().iter().tuple_windows() {
|
||||
for (a, b) in chunk.lines().iter().tuple_windows() {
|
||||
line_idx += 1;
|
||||
if compare_by(a, b, settings) == Ordering::Greater {
|
||||
if compare_by(a, b, settings, chunk.line_data(), chunk.line_data()) == Ordering::Greater
|
||||
{
|
||||
if !settings.check_silent {
|
||||
println!("sort: {}:{}: disorder: {}", path, line_idx, b.line);
|
||||
}
|
||||
|
@ -74,16 +82,15 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
|
|||
/// The function running on the reader thread.
|
||||
fn reader(
|
||||
mut file: Box<dyn Read + Send>,
|
||||
receiver: Receiver<Chunk>,
|
||||
receiver: Receiver<RecycledChunk>,
|
||||
sender: SyncSender<Chunk>,
|
||||
settings: &GlobalSettings,
|
||||
) {
|
||||
let mut carry_over = vec![];
|
||||
for chunk in receiver.iter() {
|
||||
let (recycled_lines, recycled_buffer) = chunk.recycle();
|
||||
for recycled_chunk in receiver.iter() {
|
||||
let should_continue = chunks::read(
|
||||
&sender,
|
||||
recycled_buffer,
|
||||
recycled_chunk,
|
||||
None,
|
||||
&mut carry_over,
|
||||
&mut file,
|
||||
|
@ -93,7 +100,6 @@ fn reader(
|
|||
} else {
|
||||
b'\n'
|
||||
},
|
||||
recycled_lines,
|
||||
settings,
|
||||
);
|
||||
if !should_continue {
|
||||
|
|
|
@ -15,7 +15,7 @@ use std::{
|
|||
use memchr::memchr_iter;
|
||||
use ouroboros::self_referencing;
|
||||
|
||||
use crate::{GlobalSettings, Line};
|
||||
use crate::{numeric_str_cmp::NumInfo, GeneralF64ParseResult, GlobalSettings, Line};
|
||||
|
||||
/// The chunk that is passed around between threads.
|
||||
/// `lines` consist of slices into `buffer`.
|
||||
|
@ -25,28 +25,87 @@ pub struct Chunk {
|
|||
pub buffer: Vec<u8>,
|
||||
#[borrows(buffer)]
|
||||
#[covariant]
|
||||
pub lines: Vec<Line<'this>>,
|
||||
pub contents: ChunkContents<'this>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ChunkContents<'a> {
|
||||
pub lines: Vec<Line<'a>>,
|
||||
pub line_data: LineData<'a>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct LineData<'a> {
|
||||
pub selections: Vec<&'a str>,
|
||||
pub num_infos: Vec<NumInfo>,
|
||||
pub parsed_floats: Vec<GeneralF64ParseResult>,
|
||||
}
|
||||
|
||||
impl Chunk {
|
||||
/// Destroy this chunk and return its components to be reused.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * The `lines` vector, emptied
|
||||
/// * The `buffer` vector, **not** emptied
|
||||
pub fn recycle(mut self) -> (Vec<Line<'static>>, Vec<u8>) {
|
||||
let recycled_lines = self.with_lines_mut(|lines| {
|
||||
lines.clear();
|
||||
unsafe {
|
||||
pub fn recycle(mut self) -> RecycledChunk {
|
||||
let recycled_contents = self.with_contents_mut(|contents| {
|
||||
contents.lines.clear();
|
||||
contents.line_data.selections.clear();
|
||||
contents.line_data.num_infos.clear();
|
||||
contents.line_data.parsed_floats.clear();
|
||||
let lines = unsafe {
|
||||
// SAFETY: It is safe to (temporarily) transmute to a vector of lines with a longer lifetime,
|
||||
// because the vector is empty.
|
||||
// Transmuting is necessary to make recycling possible. See https://github.com/rust-lang/rfcs/pull/2802
|
||||
// for a rfc to make this unnecessary. Its example is similar to the code here.
|
||||
std::mem::transmute::<Vec<Line<'_>>, Vec<Line<'static>>>(std::mem::take(lines))
|
||||
}
|
||||
std::mem::transmute::<Vec<Line<'_>>, Vec<Line<'static>>>(std::mem::take(
|
||||
&mut contents.lines,
|
||||
))
|
||||
};
|
||||
let selections = unsafe {
|
||||
// SAFETY: (same as above) It is safe to (temporarily) transmute to a vector of &str with a longer lifetime,
|
||||
// because the vector is empty.
|
||||
std::mem::transmute::<Vec<&'_ str>, Vec<&'static str>>(std::mem::take(
|
||||
&mut contents.line_data.selections,
|
||||
))
|
||||
};
|
||||
(
|
||||
lines,
|
||||
selections,
|
||||
std::mem::take(&mut contents.line_data.num_infos),
|
||||
std::mem::take(&mut contents.line_data.parsed_floats),
|
||||
)
|
||||
});
|
||||
(recycled_lines, self.into_heads().buffer)
|
||||
RecycledChunk {
|
||||
lines: recycled_contents.0,
|
||||
selections: recycled_contents.1,
|
||||
num_infos: recycled_contents.2,
|
||||
parsed_floats: recycled_contents.3,
|
||||
buffer: self.into_heads().buffer,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn lines(&self) -> &Vec<Line> {
|
||||
&self.borrow_contents().lines
|
||||
}
|
||||
pub fn line_data(&self) -> &LineData {
|
||||
&self.borrow_contents().line_data
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RecycledChunk {
|
||||
lines: Vec<Line<'static>>,
|
||||
selections: Vec<&'static str>,
|
||||
num_infos: Vec<NumInfo>,
|
||||
parsed_floats: Vec<GeneralF64ParseResult>,
|
||||
buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl RecycledChunk {
|
||||
pub fn new(capacity: usize) -> Self {
|
||||
RecycledChunk {
|
||||
lines: Vec::new(),
|
||||
selections: Vec::new(),
|
||||
num_infos: Vec::new(),
|
||||
parsed_floats: Vec::new(),
|
||||
buffer: vec![0; capacity],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -63,28 +122,32 @@ impl Chunk {
|
|||
/// (see also `read_to_chunk` for a more detailed documentation)
|
||||
///
|
||||
/// * `sender`: The sender to send the lines to the sorter.
|
||||
/// * `buffer`: The recycled buffer. All contents will be overwritten, but it must already be filled.
|
||||
/// * `recycled_chunk`: The recycled chunk, as returned by `Chunk::recycle`.
|
||||
/// (i.e. `buffer.len()` should be equal to `buffer.capacity()`)
|
||||
/// * `max_buffer_size`: How big `buffer` can be.
|
||||
/// * `carry_over`: The bytes that must be carried over in between invocations.
|
||||
/// * `file`: The current file.
|
||||
/// * `next_files`: What `file` should be updated to next.
|
||||
/// * `separator`: The line separator.
|
||||
/// * `lines`: The recycled vector to fill with lines. Must be empty.
|
||||
/// * `settings`: The global settings.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn read<T: Read>(
|
||||
sender: &SyncSender<Chunk>,
|
||||
mut buffer: Vec<u8>,
|
||||
recycled_chunk: RecycledChunk,
|
||||
max_buffer_size: Option<usize>,
|
||||
carry_over: &mut Vec<u8>,
|
||||
file: &mut T,
|
||||
next_files: &mut impl Iterator<Item = T>,
|
||||
separator: u8,
|
||||
lines: Vec<Line<'static>>,
|
||||
settings: &GlobalSettings,
|
||||
) -> bool {
|
||||
assert!(lines.is_empty());
|
||||
let RecycledChunk {
|
||||
lines,
|
||||
selections,
|
||||
num_infos,
|
||||
parsed_floats,
|
||||
mut buffer,
|
||||
} = recycled_chunk;
|
||||
if buffer.len() < carry_over.len() {
|
||||
buffer.resize(carry_over.len() + 10 * 1024, 0);
|
||||
}
|
||||
|
@ -101,15 +164,25 @@ pub fn read<T: Read>(
|
|||
carry_over.extend_from_slice(&buffer[read..]);
|
||||
|
||||
if read != 0 {
|
||||
let payload = Chunk::new(buffer, |buf| {
|
||||
let payload = Chunk::new(buffer, |buffer| {
|
||||
let selections = unsafe {
|
||||
// SAFETY: It is safe to transmute to an empty vector of selections with shorter lifetime.
|
||||
// It was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
|
||||
std::mem::transmute::<Vec<&'static str>, Vec<&'_ str>>(selections)
|
||||
};
|
||||
let mut lines = unsafe {
|
||||
// SAFETY: It is safe to transmute to a vector of lines with shorter lifetime,
|
||||
// SAFETY: (same as above) It is safe to transmute to a vector of lines with shorter lifetime,
|
||||
// because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
|
||||
std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
|
||||
};
|
||||
let read = crash_if_err!(1, std::str::from_utf8(&buf[..read]));
|
||||
parse_lines(read, &mut lines, separator, settings);
|
||||
lines
|
||||
let read = crash_if_err!(1, std::str::from_utf8(&buffer[..read]));
|
||||
let mut line_data = LineData {
|
||||
selections,
|
||||
num_infos,
|
||||
parsed_floats,
|
||||
};
|
||||
parse_lines(read, &mut lines, &mut line_data, separator, settings);
|
||||
ChunkContents { lines, line_data }
|
||||
});
|
||||
sender.send(payload).unwrap();
|
||||
}
|
||||
|
@ -120,6 +193,7 @@ pub fn read<T: Read>(
|
|||
fn parse_lines<'a>(
|
||||
mut read: &'a str,
|
||||
lines: &mut Vec<Line<'a>>,
|
||||
line_data: &mut LineData<'a>,
|
||||
separator: u8,
|
||||
settings: &GlobalSettings,
|
||||
) {
|
||||
|
@ -128,9 +202,15 @@ fn parse_lines<'a>(
|
|||
read = &read[..read.len() - 1];
|
||||
}
|
||||
|
||||
assert!(lines.is_empty());
|
||||
assert!(line_data.selections.is_empty());
|
||||
assert!(line_data.num_infos.is_empty());
|
||||
assert!(line_data.parsed_floats.is_empty());
|
||||
let mut token_buffer = vec![];
|
||||
lines.extend(
|
||||
read.split(separator as char)
|
||||
.map(|line| Line::create(line, settings)),
|
||||
.enumerate()
|
||||
.map(|(index, line)| Line::create(line, index, line_data, &mut token_buffer, settings)),
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -23,15 +23,16 @@ use std::{
|
|||
|
||||
use itertools::Itertools;
|
||||
|
||||
use crate::chunks::RecycledChunk;
|
||||
use crate::merge::ClosedTmpFile;
|
||||
use crate::merge::WriteableCompressedTmpFile;
|
||||
use crate::merge::WriteablePlainTmpFile;
|
||||
use crate::merge::WriteableTmpFile;
|
||||
use crate::Line;
|
||||
use crate::{
|
||||
chunks::{self, Chunk},
|
||||
compare_by, merge, output_sorted_lines, sort_by, GlobalSettings,
|
||||
compare_by, merge, sort_by, GlobalSettings,
|
||||
};
|
||||
use crate::{print_sorted, Line};
|
||||
use tempfile::TempDir;
|
||||
|
||||
const START_BUFFER_SIZE: usize = 8_000;
|
||||
|
@ -98,16 +99,39 @@ fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile
|
|||
merger.write_all(settings);
|
||||
}
|
||||
ReadResult::SortedSingleChunk(chunk) => {
|
||||
output_sorted_lines(chunk.borrow_lines().iter(), settings);
|
||||
if settings.unique {
|
||||
print_sorted(
|
||||
chunk.lines().iter().dedup_by(|a, b| {
|
||||
compare_by(a, b, settings, chunk.line_data(), chunk.line_data())
|
||||
== Ordering::Equal
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
} else {
|
||||
print_sorted(chunk.lines().iter(), settings);
|
||||
}
|
||||
}
|
||||
ReadResult::SortedTwoChunks([a, b]) => {
|
||||
let merged_iter = a
|
||||
.borrow_lines()
|
||||
.iter()
|
||||
.merge_by(b.borrow_lines().iter(), |line_a, line_b| {
|
||||
compare_by(line_a, line_b, settings) != Ordering::Greater
|
||||
});
|
||||
output_sorted_lines(merged_iter, settings);
|
||||
let merged_iter = a.lines().iter().map(|line| (line, &a)).merge_by(
|
||||
b.lines().iter().map(|line| (line, &b)),
|
||||
|(line_a, a), (line_b, b)| {
|
||||
compare_by(line_a, line_b, settings, a.line_data(), b.line_data())
|
||||
!= Ordering::Greater
|
||||
},
|
||||
);
|
||||
if settings.unique {
|
||||
print_sorted(
|
||||
merged_iter
|
||||
.dedup_by(|(line_a, a), (line_b, b)| {
|
||||
compare_by(line_a, line_b, settings, a.line_data(), b.line_data())
|
||||
== Ordering::Equal
|
||||
})
|
||||
.map(|(line, _)| line),
|
||||
settings,
|
||||
);
|
||||
} else {
|
||||
print_sorted(merged_iter.map(|(line, _)| line), settings);
|
||||
}
|
||||
}
|
||||
ReadResult::EmptyInput => {
|
||||
// don't output anything
|
||||
|
@ -118,7 +142,9 @@ fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile
|
|||
/// The function that is executed on the sorter thread.
|
||||
fn sorter(receiver: Receiver<Chunk>, sender: SyncSender<Chunk>, settings: GlobalSettings) {
|
||||
while let Ok(mut payload) = receiver.recv() {
|
||||
payload.with_lines_mut(|lines| sort_by(lines, &settings));
|
||||
payload.with_contents_mut(|contents| {
|
||||
sort_by(&mut contents.lines, &settings, &contents.line_data)
|
||||
});
|
||||
sender.send(payload).unwrap();
|
||||
}
|
||||
}
|
||||
|
@ -154,20 +180,16 @@ fn read_write_loop<I: WriteableTmpFile>(
|
|||
for _ in 0..2 {
|
||||
let should_continue = chunks::read(
|
||||
&sender,
|
||||
vec![
|
||||
0;
|
||||
if START_BUFFER_SIZE < buffer_size {
|
||||
START_BUFFER_SIZE
|
||||
} else {
|
||||
buffer_size
|
||||
}
|
||||
],
|
||||
RecycledChunk::new(if START_BUFFER_SIZE < buffer_size {
|
||||
START_BUFFER_SIZE
|
||||
} else {
|
||||
buffer_size
|
||||
}),
|
||||
Some(buffer_size),
|
||||
&mut carry_over,
|
||||
&mut file,
|
||||
&mut files,
|
||||
separator,
|
||||
Vec::new(),
|
||||
settings,
|
||||
);
|
||||
|
||||
|
@ -216,18 +238,17 @@ fn read_write_loop<I: WriteableTmpFile>(
|
|||
|
||||
file_number += 1;
|
||||
|
||||
let (recycled_lines, recycled_buffer) = chunk.recycle();
|
||||
let recycled_chunk = chunk.recycle();
|
||||
|
||||
if let Some(sender) = &sender_option {
|
||||
let should_continue = chunks::read(
|
||||
sender,
|
||||
recycled_buffer,
|
||||
recycled_chunk,
|
||||
None,
|
||||
&mut carry_over,
|
||||
&mut file,
|
||||
&mut files,
|
||||
separator,
|
||||
recycled_lines,
|
||||
settings,
|
||||
);
|
||||
if !should_continue {
|
||||
|
@ -245,12 +266,9 @@ fn write<I: WriteableTmpFile>(
|
|||
compress_prog: Option<&str>,
|
||||
separator: u8,
|
||||
) -> I::Closed {
|
||||
chunk.with_lines_mut(|lines| {
|
||||
// Write the lines to the file
|
||||
let mut tmp_file = I::create(file, compress_prog);
|
||||
write_lines(lines, tmp_file.as_write(), separator);
|
||||
tmp_file.finished_writing()
|
||||
})
|
||||
let mut tmp_file = I::create(file, compress_prog);
|
||||
write_lines(chunk.lines(), tmp_file.as_write(), separator);
|
||||
tmp_file.finished_writing()
|
||||
}
|
||||
|
||||
fn write_lines<'a, T: Write>(lines: &[Line<'a>], writer: &mut T, separator: u8) {
|
||||
|
|
|
@ -24,7 +24,7 @@ use itertools::Itertools;
|
|||
use tempfile::TempDir;
|
||||
|
||||
use crate::{
|
||||
chunks::{self, Chunk},
|
||||
chunks::{self, Chunk, RecycledChunk},
|
||||
compare_by, GlobalSettings,
|
||||
};
|
||||
|
||||
|
@ -125,14 +125,14 @@ fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = M>>(
|
|||
}));
|
||||
// Send the initial chunk to trigger a read for each file
|
||||
request_sender
|
||||
.send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new())))
|
||||
.send((file_number, RecycledChunk::new(8 * 1024)))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// Send the second chunk for each file
|
||||
for file_number in 0..reader_files.len() {
|
||||
request_sender
|
||||
.send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new())))
|
||||
.send((file_number, RecycledChunk::new(8 * 1024)))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
|
@ -181,13 +181,12 @@ struct ReaderFile<M: MergeInput> {
|
|||
|
||||
/// The function running on the reader thread.
|
||||
fn reader(
|
||||
recycled_receiver: Receiver<(usize, Chunk)>,
|
||||
recycled_receiver: Receiver<(usize, RecycledChunk)>,
|
||||
files: &mut [Option<ReaderFile<impl MergeInput>>],
|
||||
settings: &GlobalSettings,
|
||||
separator: u8,
|
||||
) {
|
||||
for (file_idx, chunk) in recycled_receiver.iter() {
|
||||
let (recycled_lines, recycled_buffer) = chunk.recycle();
|
||||
for (file_idx, recycled_chunk) in recycled_receiver.iter() {
|
||||
if let Some(ReaderFile {
|
||||
file,
|
||||
sender,
|
||||
|
@ -196,13 +195,12 @@ fn reader(
|
|||
{
|
||||
let should_continue = chunks::read(
|
||||
sender,
|
||||
recycled_buffer,
|
||||
recycled_chunk,
|
||||
None,
|
||||
carry_over,
|
||||
file.as_read(),
|
||||
&mut iter::empty(),
|
||||
separator,
|
||||
recycled_lines,
|
||||
settings,
|
||||
);
|
||||
if !should_continue {
|
||||
|
@ -234,7 +232,7 @@ struct PreviousLine {
|
|||
/// Merges files together. This is **not** an iterator because of lifetime problems.
|
||||
pub struct FileMerger<'a> {
|
||||
heap: binary_heap_plus::BinaryHeap<MergeableFile, FileComparator<'a>>,
|
||||
request_sender: Sender<(usize, Chunk)>,
|
||||
request_sender: Sender<(usize, RecycledChunk)>,
|
||||
prev: Option<PreviousLine>,
|
||||
}
|
||||
|
||||
|
@ -257,14 +255,16 @@ impl<'a> FileMerger<'a> {
|
|||
file_number: file.file_number,
|
||||
});
|
||||
|
||||
file.current_chunk.with_lines(|lines| {
|
||||
let current_line = &lines[file.line_idx];
|
||||
file.current_chunk.with_contents(|contents| {
|
||||
let current_line = &contents.lines[file.line_idx];
|
||||
if settings.unique {
|
||||
if let Some(prev) = &prev {
|
||||
let cmp = compare_by(
|
||||
&prev.chunk.borrow_lines()[prev.line_idx],
|
||||
&prev.chunk.lines()[prev.line_idx],
|
||||
current_line,
|
||||
settings,
|
||||
prev.chunk.line_data(),
|
||||
file.current_chunk.line_data(),
|
||||
);
|
||||
if cmp == Ordering::Equal {
|
||||
return;
|
||||
|
@ -274,8 +274,7 @@ impl<'a> FileMerger<'a> {
|
|||
current_line.print(out, settings);
|
||||
});
|
||||
|
||||
let was_last_line_for_file =
|
||||
file.current_chunk.borrow_lines().len() == file.line_idx + 1;
|
||||
let was_last_line_for_file = file.current_chunk.lines().len() == file.line_idx + 1;
|
||||
|
||||
if was_last_line_for_file {
|
||||
if let Ok(next_chunk) = file.receiver.recv() {
|
||||
|
@ -295,7 +294,7 @@ impl<'a> FileMerger<'a> {
|
|||
// If nothing is referencing the previous chunk anymore, this means that the previous line
|
||||
// was the last line of the chunk. We can recycle the chunk.
|
||||
self.request_sender
|
||||
.send((prev.file_number, prev_chunk))
|
||||
.send((prev.file_number, prev_chunk.recycle()))
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
|
@ -312,9 +311,11 @@ struct FileComparator<'a> {
|
|||
impl<'a> Compare<MergeableFile> for FileComparator<'a> {
|
||||
fn compare(&self, a: &MergeableFile, b: &MergeableFile) -> Ordering {
|
||||
let mut cmp = compare_by(
|
||||
&a.current_chunk.borrow_lines()[a.line_idx],
|
||||
&b.current_chunk.borrow_lines()[b.line_idx],
|
||||
&a.current_chunk.lines()[a.line_idx],
|
||||
&b.current_chunk.lines()[b.line_idx],
|
||||
self.settings,
|
||||
a.current_chunk.line_data(),
|
||||
b.current_chunk.line_data(),
|
||||
);
|
||||
if cmp == Ordering::Equal {
|
||||
// To make sorting stable, we need to consider the file number as well,
|
||||
|
|
|
@ -23,11 +23,11 @@ mod ext_sort;
|
|||
mod merge;
|
||||
mod numeric_str_cmp;
|
||||
|
||||
use chunks::LineData;
|
||||
use clap::{crate_version, App, Arg};
|
||||
use custom_str_cmp::custom_str_cmp;
|
||||
use ext_sort::ext_sort;
|
||||
use fnv::FnvHasher;
|
||||
use itertools::Itertools;
|
||||
use numeric_str_cmp::{numeric_str_cmp, NumInfo, NumInfoParseSettings};
|
||||
use rand::distributions::Alphanumeric;
|
||||
use rand::{thread_rng, Rng};
|
||||
|
@ -170,6 +170,17 @@ pub struct GlobalSettings {
|
|||
tmp_dir: PathBuf,
|
||||
compress_prog: Option<String>,
|
||||
merge_batch_size: usize,
|
||||
precomputed: Precomputed,
|
||||
}
|
||||
|
||||
/// Data needed for sorting. Should be computed once before starting to sort
|
||||
/// by calling `GlobalSettings::init_precomputed`.
|
||||
#[derive(Clone, Debug)]
|
||||
struct Precomputed {
|
||||
needs_tokens: bool,
|
||||
num_infos_per_line: usize,
|
||||
floats_per_line: usize,
|
||||
selections_per_line: usize,
|
||||
}
|
||||
|
||||
impl GlobalSettings {
|
||||
|
@ -210,6 +221,28 @@ impl GlobalSettings {
|
|||
None => BufWriter::new(Box::new(stdout()) as Box<dyn Write>),
|
||||
}
|
||||
}
|
||||
|
||||
/// Precompute some data needed for sorting.
|
||||
/// This function **must** be called before starting to sort, and `GlobalSettings` may not be altered
|
||||
/// afterwards.
|
||||
fn init_precomputed(&mut self) {
|
||||
self.precomputed.needs_tokens = self.selectors.iter().any(|s| s.needs_tokens);
|
||||
self.precomputed.selections_per_line = self
|
||||
.selectors
|
||||
.iter()
|
||||
.filter(|s| !s.is_default_selection)
|
||||
.count();
|
||||
self.precomputed.num_infos_per_line = self
|
||||
.selectors
|
||||
.iter()
|
||||
.filter(|s| matches!(s.settings.mode, SortMode::Numeric | SortMode::HumanNumeric))
|
||||
.count();
|
||||
self.precomputed.floats_per_line = self
|
||||
.selectors
|
||||
.iter()
|
||||
.filter(|s| matches!(s.settings.mode, SortMode::GeneralNumeric))
|
||||
.count();
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for GlobalSettings {
|
||||
|
@ -237,9 +270,16 @@ impl Default for GlobalSettings {
|
|||
tmp_dir: PathBuf::new(),
|
||||
compress_prog: None,
|
||||
merge_batch_size: 32,
|
||||
precomputed: Precomputed {
|
||||
num_infos_per_line: 0,
|
||||
floats_per_line: 0,
|
||||
selections_per_line: 0,
|
||||
needs_tokens: false,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Debug)]
|
||||
struct KeySettings {
|
||||
mode: SortMode,
|
||||
|
@ -322,32 +362,10 @@ impl Default for KeySettings {
|
|||
Self::from(&GlobalSettings::default())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
enum NumCache {
|
||||
AsF64(GeneralF64ParseResult),
|
||||
WithInfo(NumInfo),
|
||||
}
|
||||
|
||||
impl NumCache {
|
||||
fn as_f64(&self) -> GeneralF64ParseResult {
|
||||
match self {
|
||||
NumCache::AsF64(n) => *n,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
fn as_num_info(&self) -> &NumInfo {
|
||||
match self {
|
||||
NumCache::WithInfo(n) => n,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct Selection<'a> {
|
||||
slice: &'a str,
|
||||
num_cache: Option<Box<NumCache>>,
|
||||
None,
|
||||
}
|
||||
|
||||
type Field = Range<usize>;
|
||||
|
@ -355,31 +373,39 @@ type Field = Range<usize>;
|
|||
#[derive(Clone, Debug)]
|
||||
pub struct Line<'a> {
|
||||
line: &'a str,
|
||||
selections: Box<[Selection<'a>]>,
|
||||
index: usize,
|
||||
}
|
||||
|
||||
impl<'a> Line<'a> {
|
||||
fn create(string: &'a str, settings: &GlobalSettings) -> Self {
|
||||
let fields = if settings
|
||||
/// Creates a new `Line`.
|
||||
///
|
||||
/// If additional data is needed for sorting it is added to `line_data`.
|
||||
/// `token_buffer` allows to reuse the allocation for tokens.
|
||||
fn create(
|
||||
line: &'a str,
|
||||
index: usize,
|
||||
line_data: &mut LineData<'a>,
|
||||
token_buffer: &mut Vec<Field>,
|
||||
settings: &GlobalSettings,
|
||||
) -> Self {
|
||||
token_buffer.clear();
|
||||
if settings.precomputed.needs_tokens {
|
||||
tokenize(line, settings.separator, token_buffer);
|
||||
}
|
||||
for (selection, num_cache) in settings
|
||||
.selectors
|
||||
.iter()
|
||||
.any(|selector| selector.needs_tokens)
|
||||
.filter(|selector| !selector.is_default_selection)
|
||||
.map(|selector| selector.get_selection(line, token_buffer))
|
||||
{
|
||||
// Only tokenize if we will need tokens.
|
||||
Some(tokenize(string, settings.separator))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Line {
|
||||
line: string,
|
||||
selections: settings
|
||||
.selectors
|
||||
.iter()
|
||||
.filter(|selector| !selector.is_default_selection)
|
||||
.map(|selector| selector.get_selection(string, fields.as_deref()))
|
||||
.collect(),
|
||||
line_data.selections.push(selection);
|
||||
match num_cache {
|
||||
NumCache::AsF64(parsed_float) => line_data.parsed_floats.push(parsed_float),
|
||||
NumCache::WithInfo(num_info) => line_data.num_infos.push(num_info),
|
||||
NumCache::None => (),
|
||||
}
|
||||
}
|
||||
Self { line, index }
|
||||
}
|
||||
|
||||
fn print(&self, writer: &mut impl Write, settings: &GlobalSettings) {
|
||||
|
@ -408,7 +434,8 @@ impl<'a> Line<'a> {
|
|||
let line = self.line.replace('\t', ">");
|
||||
writeln!(writer, "{}", line)?;
|
||||
|
||||
let fields = tokenize(self.line, settings.separator);
|
||||
let mut fields = vec![];
|
||||
tokenize(self.line, settings.separator, &mut fields);
|
||||
for selector in settings.selectors.iter() {
|
||||
let mut selection = selector.get_range(self.line, Some(&fields));
|
||||
match selector.settings.mode {
|
||||
|
@ -539,51 +566,51 @@ impl<'a> Line<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Tokenize a line into fields.
|
||||
fn tokenize(line: &str, separator: Option<char>) -> Vec<Field> {
|
||||
/// Tokenize a line into fields. The result is stored into `token_buffer`.
|
||||
fn tokenize(line: &str, separator: Option<char>, token_buffer: &mut Vec<Field>) {
|
||||
assert!(token_buffer.is_empty());
|
||||
if let Some(separator) = separator {
|
||||
tokenize_with_separator(line, separator)
|
||||
tokenize_with_separator(line, separator, token_buffer)
|
||||
} else {
|
||||
tokenize_default(line)
|
||||
tokenize_default(line, token_buffer)
|
||||
}
|
||||
}
|
||||
|
||||
/// By default fields are separated by the first whitespace after non-whitespace.
|
||||
/// Whitespace is included in fields at the start.
|
||||
fn tokenize_default(line: &str) -> Vec<Field> {
|
||||
let mut tokens = vec![0..0];
|
||||
/// The result is stored into `token_buffer`.
|
||||
fn tokenize_default(line: &str, token_buffer: &mut Vec<Field>) {
|
||||
token_buffer.push(0..0);
|
||||
// pretend that there was whitespace in front of the line
|
||||
let mut previous_was_whitespace = true;
|
||||
for (idx, char) in line.char_indices() {
|
||||
if char.is_whitespace() {
|
||||
if !previous_was_whitespace {
|
||||
tokens.last_mut().unwrap().end = idx;
|
||||
tokens.push(idx..0);
|
||||
token_buffer.last_mut().unwrap().end = idx;
|
||||
token_buffer.push(idx..0);
|
||||
}
|
||||
previous_was_whitespace = true;
|
||||
} else {
|
||||
previous_was_whitespace = false;
|
||||
}
|
||||
}
|
||||
tokens.last_mut().unwrap().end = line.len();
|
||||
tokens
|
||||
token_buffer.last_mut().unwrap().end = line.len();
|
||||
}
|
||||
|
||||
/// Split between separators. These separators are not included in fields.
|
||||
fn tokenize_with_separator(line: &str, separator: char) -> Vec<Field> {
|
||||
let mut tokens = vec![];
|
||||
/// The result is stored into `token_buffer`.
|
||||
fn tokenize_with_separator(line: &str, separator: char, token_buffer: &mut Vec<Field>) {
|
||||
let separator_indices =
|
||||
line.char_indices()
|
||||
.filter_map(|(i, c)| if c == separator { Some(i) } else { None });
|
||||
let mut start = 0;
|
||||
for sep_idx in separator_indices {
|
||||
tokens.push(start..sep_idx);
|
||||
token_buffer.push(start..sep_idx);
|
||||
start = sep_idx + 1;
|
||||
}
|
||||
if start < line.len() {
|
||||
tokens.push(start..line.len());
|
||||
token_buffer.push(start..line.len());
|
||||
}
|
||||
tokens
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Debug)]
|
||||
|
@ -764,8 +791,14 @@ impl FieldSelector {
|
|||
}
|
||||
|
||||
/// Get the selection that corresponds to this selector for the line.
|
||||
/// If needs_fields returned false, tokens may be None.
|
||||
fn get_selection<'a>(&self, line: &'a str, tokens: Option<&[Field]>) -> Selection<'a> {
|
||||
/// If needs_fields returned false, tokens may be empty.
|
||||
fn get_selection<'a>(&self, line: &'a str, tokens: &[Field]) -> (&'a str, NumCache) {
|
||||
// `get_range` expects `None` when we don't need tokens and would get confused by an empty vector.
|
||||
let tokens = if self.needs_tokens {
|
||||
Some(tokens)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let mut range = &line[self.get_range(line, tokens)];
|
||||
let num_cache = if self.settings.mode == SortMode::Numeric
|
||||
|| self.settings.mode == SortMode::HumanNumeric
|
||||
|
@ -780,24 +813,19 @@ impl FieldSelector {
|
|||
);
|
||||
// Shorten the range to what we need to pass to numeric_str_cmp later.
|
||||
range = &range[num_range];
|
||||
Some(Box::new(NumCache::WithInfo(info)))
|
||||
NumCache::WithInfo(info)
|
||||
} else if self.settings.mode == SortMode::GeneralNumeric {
|
||||
// Parse this number as f64, as this is the requirement for general numeric sorting.
|
||||
Some(Box::new(NumCache::AsF64(general_f64_parse(
|
||||
&range[get_leading_gen(range)],
|
||||
))))
|
||||
NumCache::AsF64(general_f64_parse(&range[get_leading_gen(range)]))
|
||||
} else {
|
||||
// This is not a numeric sort, so we don't need a NumCache.
|
||||
None
|
||||
NumCache::None
|
||||
};
|
||||
Selection {
|
||||
slice: range,
|
||||
num_cache,
|
||||
}
|
||||
(range, num_cache)
|
||||
}
|
||||
|
||||
/// Look up the range in the line that corresponds to this selector.
|
||||
/// If needs_fields returned false, tokens may be None.
|
||||
/// If needs_fields returned false, tokens must be None.
|
||||
fn get_range<'a>(&self, line: &'a str, tokens: Option<&[Field]>) -> Range<usize> {
|
||||
enum Resolution {
|
||||
// The start index of the resolved character, inclusive
|
||||
|
@ -1297,18 +1325,9 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
|
|||
);
|
||||
}
|
||||
|
||||
exec(&files, &settings)
|
||||
}
|
||||
settings.init_precomputed();
|
||||
|
||||
fn output_sorted_lines<'a>(iter: impl Iterator<Item = &'a Line<'a>>, settings: &GlobalSettings) {
|
||||
if settings.unique {
|
||||
print_sorted(
|
||||
iter.dedup_by(|a, b| compare_by(a, b, settings) == Ordering::Equal),
|
||||
settings,
|
||||
);
|
||||
} else {
|
||||
print_sorted(iter, settings);
|
||||
}
|
||||
exec(&files, &settings)
|
||||
}
|
||||
|
||||
fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
|
||||
|
@ -1328,55 +1347,59 @@ fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
|
|||
0
|
||||
}
|
||||
|
||||
fn sort_by<'a>(unsorted: &mut Vec<Line<'a>>, settings: &GlobalSettings) {
|
||||
fn sort_by<'a>(unsorted: &mut Vec<Line<'a>>, settings: &GlobalSettings, line_data: &LineData<'a>) {
|
||||
if settings.stable || settings.unique {
|
||||
unsorted.par_sort_by(|a, b| compare_by(a, b, settings))
|
||||
unsorted.par_sort_by(|a, b| compare_by(a, b, settings, line_data, line_data))
|
||||
} else {
|
||||
unsorted.par_sort_unstable_by(|a, b| compare_by(a, b, settings))
|
||||
unsorted.par_sort_unstable_by(|a, b| compare_by(a, b, settings, line_data, line_data))
|
||||
}
|
||||
}
|
||||
|
||||
fn compare_by<'a>(a: &Line<'a>, b: &Line<'a>, global_settings: &GlobalSettings) -> Ordering {
|
||||
let mut idx = 0;
|
||||
fn compare_by<'a>(
|
||||
a: &Line<'a>,
|
||||
b: &Line<'a>,
|
||||
global_settings: &GlobalSettings,
|
||||
a_line_data: &LineData<'a>,
|
||||
b_line_data: &LineData<'a>,
|
||||
) -> Ordering {
|
||||
let mut selection_index = 0;
|
||||
let mut num_info_index = 0;
|
||||
let mut parsed_float_index = 0;
|
||||
for selector in &global_settings.selectors {
|
||||
let mut _selections = None;
|
||||
let (a_selection, b_selection) = if selector.is_default_selection {
|
||||
let (a_str, b_str) = if selector.is_default_selection {
|
||||
// We can select the whole line.
|
||||
// We have to store the selections outside of the if-block so that they live long enough.
|
||||
_selections = Some((
|
||||
Selection {
|
||||
slice: a.line,
|
||||
num_cache: None,
|
||||
},
|
||||
Selection {
|
||||
slice: b.line,
|
||||
num_cache: None,
|
||||
},
|
||||
));
|
||||
// Unwrap the selections again, and return references to them.
|
||||
(
|
||||
&_selections.as_ref().unwrap().0,
|
||||
&_selections.as_ref().unwrap().1,
|
||||
)
|
||||
(a.line, b.line)
|
||||
} else {
|
||||
let selections = (&a.selections[idx], &b.selections[idx]);
|
||||
idx += 1;
|
||||
let selections = (
|
||||
a_line_data.selections
|
||||
[a.index * global_settings.precomputed.selections_per_line + selection_index],
|
||||
b_line_data.selections
|
||||
[b.index * global_settings.precomputed.selections_per_line + selection_index],
|
||||
);
|
||||
selection_index += 1;
|
||||
selections
|
||||
};
|
||||
let a_str = a_selection.slice;
|
||||
let b_str = b_selection.slice;
|
||||
|
||||
let settings = &selector.settings;
|
||||
|
||||
let cmp: Ordering = match settings.mode {
|
||||
SortMode::Random => random_shuffle(a_str, b_str, &global_settings.salt),
|
||||
SortMode::Numeric | SortMode::HumanNumeric => numeric_str_cmp(
|
||||
(a_str, a_selection.num_cache.as_ref().unwrap().as_num_info()),
|
||||
(b_str, b_selection.num_cache.as_ref().unwrap().as_num_info()),
|
||||
),
|
||||
SortMode::GeneralNumeric => general_numeric_compare(
|
||||
a_selection.num_cache.as_ref().unwrap().as_f64(),
|
||||
b_selection.num_cache.as_ref().unwrap().as_f64(),
|
||||
),
|
||||
SortMode::Numeric | SortMode::HumanNumeric => {
|
||||
let a_num_info = &a_line_data.num_infos
|
||||
[a.index * global_settings.precomputed.num_infos_per_line + num_info_index];
|
||||
let b_num_info = &b_line_data.num_infos
|
||||
[b.index * global_settings.precomputed.num_infos_per_line + num_info_index];
|
||||
num_info_index += 1;
|
||||
numeric_str_cmp((a_str, a_num_info), (b_str, b_num_info))
|
||||
}
|
||||
SortMode::GeneralNumeric => {
|
||||
let a_float = &a_line_data.parsed_floats
|
||||
[a.index * global_settings.precomputed.floats_per_line + parsed_float_index];
|
||||
let b_float = &b_line_data.parsed_floats
|
||||
[b.index * global_settings.precomputed.floats_per_line + parsed_float_index];
|
||||
parsed_float_index += 1;
|
||||
general_numeric_compare(a_float, b_float)
|
||||
}
|
||||
SortMode::Month => month_compare(a_str, b_str),
|
||||
SortMode::Version => version_compare(a_str, b_str),
|
||||
SortMode::Default => custom_str_cmp(
|
||||
|
@ -1470,7 +1493,7 @@ fn get_leading_gen(input: &str) -> Range<usize> {
|
|||
}
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, PartialOrd, Debug)]
|
||||
enum GeneralF64ParseResult {
|
||||
pub enum GeneralF64ParseResult {
|
||||
Invalid,
|
||||
NaN,
|
||||
NegInfinity,
|
||||
|
@ -1497,8 +1520,8 @@ fn general_f64_parse(a: &str) -> GeneralF64ParseResult {
|
|||
/// Compares two floats, with errors and non-numerics assumed to be -inf.
|
||||
/// Stops coercing at the first non-numeric char.
|
||||
/// We explicitly need to convert to f64 in this case.
|
||||
fn general_numeric_compare(a: GeneralF64ParseResult, b: GeneralF64ParseResult) -> Ordering {
|
||||
a.partial_cmp(&b).unwrap()
|
||||
fn general_numeric_compare(a: &GeneralF64ParseResult, b: &GeneralF64ParseResult) -> Ordering {
|
||||
a.partial_cmp(b).unwrap()
|
||||
}
|
||||
|
||||
fn get_rand_string() -> String {
|
||||
|
@ -1646,6 +1669,12 @@ mod tests {
|
|||
|
||||
use super::*;
|
||||
|
||||
fn tokenize_helper(line: &str, separator: Option<char>) -> Vec<Field> {
|
||||
let mut buffer = vec![];
|
||||
tokenize(line, separator, &mut buffer);
|
||||
buffer
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_hash() {
|
||||
let a = "Ted".to_string();
|
||||
|
@ -1689,20 +1718,23 @@ mod tests {
|
|||
#[test]
|
||||
fn test_tokenize_fields() {
|
||||
let line = "foo bar b x";
|
||||
assert_eq!(tokenize(line, None), vec![0..3, 3..7, 7..9, 9..14,],);
|
||||
assert_eq!(tokenize_helper(line, None), vec![0..3, 3..7, 7..9, 9..14,],);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tokenize_fields_leading_whitespace() {
|
||||
let line = " foo bar b x";
|
||||
assert_eq!(tokenize(line, None), vec![0..7, 7..11, 11..13, 13..18,]);
|
||||
assert_eq!(
|
||||
tokenize_helper(line, None),
|
||||
vec![0..7, 7..11, 11..13, 13..18,]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tokenize_fields_custom_separator() {
|
||||
let line = "aaa foo bar b x";
|
||||
assert_eq!(
|
||||
tokenize(line, Some('a')),
|
||||
tokenize_helper(line, Some('a')),
|
||||
vec![0..0, 1..1, 2..2, 3..9, 10..18,]
|
||||
);
|
||||
}
|
||||
|
@ -1710,11 +1742,11 @@ mod tests {
|
|||
#[test]
|
||||
fn test_tokenize_fields_trailing_custom_separator() {
|
||||
let line = "a";
|
||||
assert_eq!(tokenize(line, Some('a')), vec![0..0]);
|
||||
assert_eq!(tokenize_helper(line, Some('a')), vec![0..0]);
|
||||
let line = "aa";
|
||||
assert_eq!(tokenize(line, Some('a')), vec![0..0, 1..1]);
|
||||
assert_eq!(tokenize_helper(line, Some('a')), vec![0..0, 1..1]);
|
||||
let line = "..a..a";
|
||||
assert_eq!(tokenize(line, Some('a')), vec![0..2, 3..5]);
|
||||
assert_eq!(tokenize_helper(line, Some('a')), vec![0..2, 3..5]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -1722,13 +1754,7 @@ mod tests {
|
|||
fn test_line_size() {
|
||||
// We should make sure to not regress the size of the Line struct because
|
||||
// it is unconditional overhead for every line we sort.
|
||||
assert_eq!(std::mem::size_of::<Line>(), 32);
|
||||
// These are the fields of Line:
|
||||
assert_eq!(std::mem::size_of::<&str>(), 16);
|
||||
assert_eq!(std::mem::size_of::<Box<[Selection]>>(), 16);
|
||||
|
||||
// How big is a selection? Constant cost all lines pay when we need selections.
|
||||
assert_eq!(std::mem::size_of::<Selection>(), 24);
|
||||
assert_eq!(std::mem::size_of::<Line>(), 24);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
Loading…
Reference in a new issue