Merge pull request #2156 from miDeb/sort-no-json-extsort

sort: don't rely on serde-json for extsort
This commit is contained in:
Sylvestre Ledru 2021-05-05 22:33:18 +02:00 committed by GitHub
commit f83316f36e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 280 additions and 347 deletions

8
Cargo.lock generated
View file

@ -1372,9 +1372,6 @@ name = "serde"
version = "1.0.125" version = "1.0.125"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "558dc50e1a5a5fa7112ca2ce4effcb321b0300c0d4ccf0776a9f60cd89031171" checksum = "558dc50e1a5a5fa7112ca2ce4effcb321b0300c0d4ccf0776a9f60cd89031171"
dependencies = [
"serde_derive",
]
[[package]] [[package]]
name = "serde_cbor" name = "serde_cbor"
@ -1453,9 +1450,6 @@ name = "smallvec"
version = "1.6.1" version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "strsim" name = "strsim"
@ -2396,8 +2390,6 @@ dependencies = [
"rand 0.7.3", "rand 0.7.3",
"rayon", "rayon",
"semver", "semver",
"serde",
"serde_json",
"smallvec 1.6.1", "smallvec 1.6.1",
"tempdir", "tempdir",
"unicode-width", "unicode-width",

View file

@ -69,6 +69,14 @@ Run `cargo build --release` before benchmarking after you make a change!
- Benchmark numeric sorting with hyperfine: `hyperfine "target/release/coreutils sort shuffled_numbers_si.txt -h -o output.txt"`. - Benchmark numeric sorting with hyperfine: `hyperfine "target/release/coreutils sort shuffled_numbers_si.txt -h -o output.txt"`.
## External sorting
Try running commands with the `-S` option set to an amount of memory to be used, such as `1M`. Additionally, you could try sorting
huge files (ideally multiple Gigabytes) with `-S`. Creating such a large file can be achieved by running `cat shuffled_wordlist.txt | sort -R >> shuffled_wordlist.txt`
multiple times (this will add the contents of `shuffled_wordlist.txt` to itself).
Example: Run `hyperfine './target/release/coreutils sort shuffled_wordlist.txt -S 1M' 'sort shuffled_wordlist.txt -S 1M'`
## Stdout and stdin performance ## Stdout and stdin performance
Try to run the above benchmarks by piping the input through stdin (standard input) and redirect the Try to run the above benchmarks by piping the input through stdin (standard input) and redirect the

View file

@ -15,15 +15,13 @@ edition = "2018"
path = "src/sort.rs" path = "src/sort.rs"
[dependencies] [dependencies]
serde_json = { version = "1.0.64", default-features = false, features = ["alloc"] }
serde = { version = "1.0", features = ["derive"] }
rayon = "1.5" rayon = "1.5"
rand = "0.7" rand = "0.7"
clap = "2.33" clap = "2.33"
fnv = "1.0.7" fnv = "1.0.7"
itertools = "0.10.0" itertools = "0.10.0"
semver = "0.9.0" semver = "0.9.0"
smallvec = { version="1.6.1", features=["serde"] } smallvec = "1.6.1"
unicode-width = "0.1.8" unicode-width = "0.1.8"
uucore = { version=">=0.0.8", package="uucore", path="../../uucore", features=["fs"] } uucore = { version=">=0.0.8", package="uucore", path="../../uucore", features=["fs"] }
uucore_procs = { version=">=0.0.5", package="uucore_procs", path="../../uucore_procs" } uucore_procs = { version=">=0.0.5", package="uucore_procs", path="../../uucore_procs" }

View file

@ -1,50 +1,32 @@
use std::clone::Clone; use std::cmp::Ordering;
use std::cmp::Ordering::Less;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::error::Error;
use std::fs::{File, OpenOptions}; use std::fs::{File, OpenOptions};
use std::io::SeekFrom::Start; use std::io::SeekFrom;
use std::io::{BufRead, BufReader, BufWriter, Seek, Write}; use std::io::{BufRead, BufReader, BufWriter, Seek, Write};
use std::marker::PhantomData; use std::path::Path;
use std::path::PathBuf;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json;
use tempdir::TempDir; use tempdir::TempDir;
use super::{GlobalSettings, Line}; use super::{GlobalSettings, Line};
/// Trait for types that can be used by
/// [ExternalSorter](struct.ExternalSorter.html). Must be sortable, cloneable,
/// serializable, and able to report on its size
pub trait ExternallySortable: Clone + Serialize + DeserializeOwned {
/// Get the size, in bytes, of this object (used to constrain the buffer
/// used in the external sort).
fn get_size(&self) -> u64;
}
/// Iterator that provides sorted `T`s /// Iterator that provides sorted `T`s
pub struct ExtSortedIterator<Line> { pub struct ExtSortedIterator {
buffers: Vec<VecDeque<Line>>, buffers: Vec<VecDeque<Line>>,
chunk_offsets: Vec<u64>, chunk_offsets: Vec<u64>,
max_per_chunk: u64, max_per_chunk: usize,
chunks: u64, chunks: usize,
tmp_dir: TempDir, tmp_dir: TempDir,
settings: GlobalSettings, settings: GlobalSettings,
failed: bool, failed: bool,
} }
impl Iterator for ExtSortedIterator<Line> impl Iterator for ExtSortedIterator {
where type Item = Line;
Line: ExternallySortable,
{
type Item = Result<Line, Box<dyn Error>>;
/// # Errors /// # Errors
/// ///
/// This method can fail due to issues reading intermediate sorted chunks /// This method can fail due to issues reading intermediate sorted chunks
/// from disk, or due to serde deserialization issues /// from disk
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
if self.failed { if self.failed {
return None; return None;
@ -53,29 +35,18 @@ where
let mut empty = true; let mut empty = true;
for chunk_num in 0..self.chunks { for chunk_num in 0..self.chunks {
if self.buffers[chunk_num as usize].is_empty() { if self.buffers[chunk_num as usize].is_empty() {
let mut f = match File::open(self.tmp_dir.path().join(chunk_num.to_string())) { let mut f = crash_if_err!(
Ok(f) => f, 1,
Err(e) => { File::open(self.tmp_dir.path().join(chunk_num.to_string()))
self.failed = true; );
return Some(Err(Box::new(e))); crash_if_err!(1, f.seek(SeekFrom::Start(self.chunk_offsets[chunk_num])));
} let bytes_read = fill_buff(
}; &mut self.buffers[chunk_num as usize],
match f.seek(Start(self.chunk_offsets[chunk_num as usize])) { f,
Ok(_) => (), self.max_per_chunk,
Err(e) => { &self.settings,
self.failed = true; );
return Some(Err(Box::new(e))); self.chunk_offsets[chunk_num as usize] += bytes_read as u64;
}
}
let bytes_read =
match fill_buff(&mut self.buffers[chunk_num as usize], f, self.max_per_chunk) {
Ok(bytes_read) => bytes_read,
Err(e) => {
self.failed = true;
return Some(Err(e));
}
};
self.chunk_offsets[chunk_num as usize] += bytes_read;
if !self.buffers[chunk_num as usize].is_empty() { if !self.buffers[chunk_num as usize].is_empty() {
empty = false; empty = false;
} }
@ -91,52 +62,21 @@ where
// check is_empty() before unwrap()ing // check is_empty() before unwrap()ing
let mut idx = 0; let mut idx = 0;
for chunk_num in 0..self.chunks as usize { for chunk_num in 0..self.chunks as usize {
if !self.buffers[chunk_num].is_empty() { if !self.buffers[chunk_num].is_empty()
if self.buffers[idx].is_empty() && (self.buffers[idx].is_empty()
|| (super::compare_by)( || super::compare_by(
self.buffers[chunk_num].front().unwrap(), self.buffers[chunk_num].front().unwrap(),
self.buffers[idx].front().unwrap(), self.buffers[idx].front().unwrap(),
&self.settings, &self.settings,
) == Less ) == Ordering::Less)
{ {
idx = chunk_num; idx = chunk_num;
} }
} }
}
// unwrap due to checks above // unwrap due to checks above
let r = self.buffers[idx].pop_front().unwrap(); let r = self.buffers[idx].pop_front().unwrap();
Some(Ok(r)) Some(r)
}
}
/// Perform an external sort on an unsorted stream of incoming data
pub struct ExternalSorter<Line>
where
Line: ExternallySortable,
{
tmp_dir: Option<PathBuf>,
buffer_bytes: u64,
phantom: PhantomData<Line>,
settings: GlobalSettings,
}
impl ExternalSorter<Line>
where
Line: ExternallySortable,
{
/// Create a new `ExternalSorter` with a specified memory buffer and
/// temporary directory
pub fn new(
buffer_bytes: u64,
tmp_dir: Option<PathBuf>,
settings: GlobalSettings,
) -> ExternalSorter<Line> {
ExternalSorter {
buffer_bytes,
tmp_dir,
phantom: PhantomData,
settings,
} }
} }
@ -146,81 +86,53 @@ where
/// # Errors /// # Errors
/// ///
/// This method can fail due to issues writing intermediate sorted chunks /// This method can fail due to issues writing intermediate sorted chunks
/// to disk, or due to serde serialization issues /// to disk.
pub fn sort_by<I>( pub fn ext_sort(
&self, unsorted: impl Iterator<Item = Line>,
unsorted: I, settings: &GlobalSettings,
settings: GlobalSettings, ) -> ExtSortedIterator {
) -> Result<ExtSortedIterator<Line>, Box<dyn Error>> let tmp_dir = crash_if_err!(1, TempDir::new_in(&settings.tmp_dir, "uutils_sort"));
where
I: Iterator<Item = Line>,
{
let tmp_dir = match self.tmp_dir {
Some(ref p) => TempDir::new_in(p, "uutils_sort")?,
None => TempDir::new("uutils_sort")?,
};
// creating the thing we need to return first due to the fact that we need to
// borrow tmp_dir and move it out
let mut iter = ExtSortedIterator { let mut iter = ExtSortedIterator {
buffers: Vec::new(), buffers: Vec::new(),
chunk_offsets: Vec::new(), chunk_offsets: Vec::new(),
max_per_chunk: 0, max_per_chunk: 0,
chunks: 0, chunks: 0,
tmp_dir, tmp_dir,
settings, settings: settings.clone(),
failed: false, failed: false,
}; };
{
let mut total_read = 0; let mut total_read = 0;
let mut chunk = Vec::new(); let mut chunk = Vec::new();
// Initial buffer is specified by user
let mut adjusted_buffer_size = self.buffer_bytes;
let (iter_size, _) = unsorted.size_hint();
// make the initial chunks on disk // make the initial chunks on disk
for seq in unsorted { for seq in unsorted {
let seq_size = seq.get_size(); let seq_size = seq.estimate_size();
total_read += seq_size; total_read += seq_size;
// GNU minimum is 16 * (sizeof struct + 2), but GNU uses about
// 1/10 the memory that we do. And GNU even says in the code it may
// not work on small buffer sizes.
//
// The following seems to work pretty well, and has about the same max
// RSS as lower minimum values.
//
let minimum_buffer_size: u64 = iter_size as u64 * seq_size / 8;
adjusted_buffer_size =
// Grow buffer size for a struct/Line larger than buffer
if adjusted_buffer_size < seq_size {
seq_size
} else if adjusted_buffer_size < minimum_buffer_size {
minimum_buffer_size
} else {
adjusted_buffer_size
};
chunk.push(seq); chunk.push(seq);
if total_read >= adjusted_buffer_size { if total_read >= settings.buffer_size {
super::sort_by(&mut chunk, &self.settings); super::sort_by(&mut chunk, &settings);
self.write_chunk( write_chunk(
settings,
&iter.tmp_dir.path().join(iter.chunks.to_string()), &iter.tmp_dir.path().join(iter.chunks.to_string()),
&mut chunk, &mut chunk,
)?; );
chunk.clear(); chunk.clear();
total_read = 0; total_read = 0;
iter.chunks += 1; iter.chunks += 1;
} }
} }
// write the last chunk // write the last chunk
if chunk.len() > 0 { if !chunk.is_empty() {
super::sort_by(&mut chunk, &self.settings); super::sort_by(&mut chunk, &settings);
self.write_chunk( write_chunk(
settings,
&iter.tmp_dir.path().join(iter.chunks.to_string()), &iter.tmp_dir.path().join(iter.chunks.to_string()),
&mut chunk, &mut chunk,
)?; );
iter.chunks += 1; iter.chunks += 1;
} }
@ -230,66 +142,70 @@ where
// //
// We will have to have the entire iter in memory sometime right? // We will have to have the entire iter in memory sometime right?
// Set minimum to the size of the writer buffer, ~8K // Set minimum to the size of the writer buffer, ~8K
//
const MINIMUM_READBACK_BUFFER: u64 = 8200; const MINIMUM_READBACK_BUFFER: usize = 8200;
let right_sized_buffer = adjusted_buffer_size let right_sized_buffer = settings
.buffer_size
.checked_div(iter.chunks) .checked_div(iter.chunks)
.unwrap_or(adjusted_buffer_size); .unwrap_or(settings.buffer_size);
iter.max_per_chunk = if right_sized_buffer > MINIMUM_READBACK_BUFFER { iter.max_per_chunk = if right_sized_buffer > MINIMUM_READBACK_BUFFER {
right_sized_buffer right_sized_buffer
} else { } else {
MINIMUM_READBACK_BUFFER MINIMUM_READBACK_BUFFER
}; };
iter.buffers = vec![VecDeque::new(); iter.chunks as usize]; iter.buffers = vec![VecDeque::new(); iter.chunks];
iter.chunk_offsets = vec![0 as u64; iter.chunks as usize]; iter.chunk_offsets = vec![0; iter.chunks];
for chunk_num in 0..iter.chunks { for chunk_num in 0..iter.chunks {
let offset = fill_buff( let offset = fill_buff(
&mut iter.buffers[chunk_num as usize], &mut iter.buffers[chunk_num],
File::open(iter.tmp_dir.path().join(chunk_num.to_string()))?, crash_if_err!(
1,
File::open(iter.tmp_dir.path().join(chunk_num.to_string()))
),
iter.max_per_chunk, iter.max_per_chunk,
)?; &settings,
iter.chunk_offsets[chunk_num as usize] = offset; );
} iter.chunk_offsets[chunk_num] = offset as u64;
} }
Ok(iter) iter
} }
fn write_chunk(&self, file: &PathBuf, chunk: &mut Vec<Line>) -> Result<(), Box<dyn Error>> { fn write_chunk(settings: &GlobalSettings, file: &Path, chunk: &mut Vec<Line>) {
let new_file = OpenOptions::new().create(true).append(true).open(file)?; let new_file = crash_if_err!(1, OpenOptions::new().create(true).append(true).open(file));
let mut buf_write = Box::new(BufWriter::new(new_file)) as Box<dyn Write>; let mut buf_write = BufWriter::new(new_file);
for s in chunk { for s in chunk {
let mut serialized = serde_json::to_string(&s).expect("JSON write error: "); crash_if_err!(1, buf_write.write_all(s.line.as_bytes()));
serialized.push_str("\n"); crash_if_err!(
buf_write.write(serialized.as_bytes())?; 1,
} buf_write.write_all(if settings.zero_terminated { "\0" } else { "\n" }.as_bytes(),)
buf_write.flush()?; );
Ok(())
} }
crash_if_err!(1, buf_write.flush());
} }
fn fill_buff<Line>( fn fill_buff(
vec: &mut VecDeque<Line>, vec: &mut VecDeque<Line>,
file: File, file: File,
max_bytes: u64, max_bytes: usize,
) -> Result<u64, Box<dyn Error>> settings: &GlobalSettings,
where ) -> usize {
Line: ExternallySortable,
{
let mut total_read = 0; let mut total_read = 0;
let mut bytes_read = 0; let mut bytes_read = 0;
for line in BufReader::new(file).lines() { for line in BufReader::new(file).split(if settings.zero_terminated {
let line_s = line?; b'\0'
} else {
b'\n'
}) {
let line_s = String::from_utf8(crash_if_err!(1, line)).unwrap();
bytes_read += line_s.len() + 1; bytes_read += line_s.len() + 1;
// This is where the bad stuff happens usually let deserialized = Line::new(line_s, settings);
let deserialized: Line = serde_json::from_str(&line_s).expect("JSON read error: "); total_read += deserialized.estimate_size();
total_read += deserialized.get_size();
vec.push_back(deserialized); vec.push_back(deserialized);
if total_read > max_bytes { if total_read > max_bytes {
break; break;
} }
} }
Ok(bytes_read as u64) bytes_read
} }

View file

@ -14,21 +14,20 @@
//! More specifically, exponent can be understood so that the original number is in (1..10)*10^exponent. //! More specifically, exponent can be understood so that the original number is in (1..10)*10^exponent.
//! From that follows the constraints of this algorithm: It is able to compare numbers in ±(1*10^[i64::MIN]..10*10^[i64::MAX]). //! From that follows the constraints of this algorithm: It is able to compare numbers in ±(1*10^[i64::MIN]..10*10^[i64::MAX]).
use serde::{Deserialize, Serialize};
use std::{cmp::Ordering, ops::Range}; use std::{cmp::Ordering, ops::Range};
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Clone)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
enum Sign { enum Sign {
Negative, Negative,
Positive, Positive,
} }
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] #[derive(Debug, PartialEq, Clone)]
pub struct NumInfo { pub struct NumInfo {
exponent: i64, exponent: i64,
sign: Sign, sign: Sign,
} }
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] #[derive(Debug, PartialEq, Clone)]
pub struct NumInfoParseSettings { pub struct NumInfoParseSettings {
pub accept_si_units: bool, pub accept_si_units: bool,
pub thousands_separator: Option<char>, pub thousands_separator: Option<char>,

View file

@ -20,8 +20,8 @@ mod external_sort;
mod numeric_str_cmp; mod numeric_str_cmp;
use clap::{App, Arg}; use clap::{App, Arg};
use external_sort::ext_sort;
use custom_str_cmp::custom_str_cmp; use custom_str_cmp::custom_str_cmp;
use external_sort::{ExternalSorter, ExternallySortable};
use fnv::FnvHasher; use fnv::FnvHasher;
use itertools::Itertools; use itertools::Itertools;
use numeric_str_cmp::{numeric_str_cmp, NumInfo, NumInfoParseSettings}; use numeric_str_cmp::{numeric_str_cmp, NumInfo, NumInfoParseSettings};
@ -29,14 +29,13 @@ use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use rayon::prelude::*; use rayon::prelude::*;
use semver::Version; use semver::Version;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::BinaryHeap; use std::collections::BinaryHeap;
use std::env; use std::env;
use std::fs::File; use std::fs::File;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::io::{stdin, stdout, BufRead, BufReader, BufWriter, Lines, Read, Write}; use std::io::{stdin, stdout, BufRead, BufReader, BufWriter, Read, Write};
use std::mem::replace; use std::mem::replace;
use std::ops::Range; use std::ops::Range;
use std::path::Path; use std::path::Path;
@ -106,7 +105,7 @@ enum SortMode {
Default, Default,
} }
#[derive(Clone)] #[derive(Clone)]
struct GlobalSettings { pub struct GlobalSettings {
mode: SortMode, mode: SortMode,
debug: bool, debug: bool,
ignore_blanks: bool, ignore_blanks: bool,
@ -206,7 +205,7 @@ impl From<&GlobalSettings> for KeySettings {
} }
} }
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Clone)]
/// Represents the string selected by a FieldSelector. /// Represents the string selected by a FieldSelector.
struct SelectionRange { struct SelectionRange {
range: Range<usize>, range: Range<usize>,
@ -228,7 +227,7 @@ impl SelectionRange {
} }
} }
#[derive(Serialize, Deserialize, Clone)] #[derive(Clone)]
enum NumCache { enum NumCache {
AsF64(GeneralF64ParseResult), AsF64(GeneralF64ParseResult),
WithInfo(NumInfo), WithInfo(NumInfo),
@ -249,7 +248,8 @@ impl NumCache {
} }
} }
} }
#[derive(Serialize, Deserialize, Clone)]
#[derive(Clone)]
struct Selection { struct Selection {
range: SelectionRange, range: SelectionRange,
num_cache: NumCache, num_cache: NumCache,
@ -264,22 +264,21 @@ impl Selection {
type Field = Range<usize>; type Field = Range<usize>;
#[derive(Serialize, Deserialize, Clone)] #[derive(Clone)]
struct Line { pub struct Line {
line: String, line: String,
// The common case is not to specify fields. Let's make this fast. // The common case is not to specify fields. Let's make this fast.
selections: SmallVec<[Selection; 1]>, selections: SmallVec<[Selection; 1]>,
} }
impl ExternallySortable for Line { impl Line {
fn get_size(&self) -> u64 { pub fn estimate_size(&self) -> usize {
// Currently 96 bytes, but that could change, so we get that size here self.line.capacity()
std::mem::size_of::<Line>() as u64 + self.selections.capacity() * std::mem::size_of::<Selection>()
} + std::mem::size_of::<Self>()
} }
impl Line { pub fn new(line: String, settings: &GlobalSettings) -> Self {
fn new(line: String, settings: &GlobalSettings) -> Self {
let fields = if settings let fields = if settings
.selectors .selectors
.iter() .iter()
@ -291,7 +290,7 @@ impl Line {
None None
}; };
let selections = settings let selections: SmallVec<[Selection; 1]> = settings
.selectors .selectors
.iter() .iter()
.map(|selector| { .map(|selector| {
@ -683,7 +682,7 @@ impl FieldSelector {
} }
struct MergeableFile<'a> { struct MergeableFile<'a> {
lines: Lines<BufReader<Box<dyn Read>>>, lines: Box<dyn Iterator<Item = Line> + 'a>,
current_line: Line, current_line: Line,
settings: &'a GlobalSettings, settings: &'a GlobalSettings,
} }
@ -723,11 +722,11 @@ impl<'a> FileMerger<'a> {
settings, settings,
} }
} }
fn push_file(&mut self, mut lines: Lines<BufReader<Box<dyn Read>>>) { fn push_file(&mut self, mut lines: Box<dyn Iterator<Item = Line> + 'a>) {
if let Some(Ok(next_line)) = lines.next() { if let Some(next_line) = lines.next() {
let mergeable_file = MergeableFile { let mergeable_file = MergeableFile {
lines, lines,
current_line: Line::new(next_line, &self.settings), current_line: next_line,
settings: &self.settings, settings: &self.settings,
}; };
self.heap.push(mergeable_file); self.heap.push(mergeable_file);
@ -741,11 +740,8 @@ impl<'a> Iterator for FileMerger<'a> {
match self.heap.pop() { match self.heap.pop() {
Some(mut current) => { Some(mut current) => {
match current.lines.next() { match current.lines.next() {
Some(Ok(next_line)) => { Some(next_line) => {
let ret = replace( let ret = replace(&mut current.current_line, next_line);
&mut current.current_line,
Line::new(next_line, &self.settings),
);
self.heap.push(current); self.heap.push(current);
Some(ret) Some(ret)
} }
@ -1113,82 +1109,98 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
exec(files, settings) exec(files, settings)
} }
fn exec(files: Vec<String>, settings: GlobalSettings) -> i32 { fn file_to_lines_iter<'a>(
let mut lines = Vec::new(); file: &str,
let mut file_merger = FileMerger::new(&settings); settings: &'a GlobalSettings,
) -> Option<impl Iterator<Item = Line> + 'a> {
for path in &files { let (reader, _) = match open(file) {
let (reader, _) = match open(path) {
Some(x) => x, Some(x) => x,
None => continue, None => return None,
}; };
let buf_reader = BufReader::new(reader); let buf_reader = BufReader::new(reader);
if settings.merge { Some(
file_merger.push_file(buf_reader.lines()); buf_reader
} else if settings.zero_terminated { .split(if settings.zero_terminated {
for line in buf_reader.split(b'\0').flatten() { b'\0'
lines.push(Line::new( } else {
std::str::from_utf8(&line) b'\n'
.expect("Could not parse string from zero terminated input.") })
.to_string(), .map(move |line| {
Line::new(
crash_if_err!(1, String::from_utf8(crash_if_err!(1, line))),
settings,
)
}),
)
}
fn output_sorted_lines(iter: impl Iterator<Item = Line>, settings: &GlobalSettings) {
if settings.unique {
print_sorted(
iter.dedup_by(|a, b| compare_by(a, b, &settings) == Ordering::Equal),
&settings, &settings,
)); );
}
} else { } else {
for line in buf_reader.lines() { print_sorted(iter, &settings);
if let Ok(n) = line {
lines.push(Line::new(n, &settings));
} else {
break;
}
}
} }
} }
fn exec(files: Vec<String>, settings: GlobalSettings) -> i32 {
if settings.merge {
let mut file_merger = FileMerger::new(&settings);
for lines in files
.iter()
.filter_map(|file| file_to_lines_iter(file, &settings))
{
file_merger.push_file(Box::new(lines));
}
output_sorted_lines(file_merger, &settings);
} else {
let lines = files
.iter()
.filter_map(|file| file_to_lines_iter(file, &settings))
.flatten();
if settings.check { if settings.check {
return exec_check_file(&lines, &settings); return exec_check_file(lines, &settings);
} }
// Only use ext_sorter when we need to. // Only use ext_sorter when we need to.
// Probably faster that we don't create // Probably faster that we don't create
// an owned value each run // an owned value each run
if settings.ext_sort { if settings.ext_sort {
lines = ext_sort_by(lines, settings.clone()); let sorted_lines = ext_sort(lines, &settings);
output_sorted_lines(sorted_lines, &settings);
} else { } else {
sort_by(&mut lines, &settings); let mut lines = vec![];
// This is duplicated from fn file_to_lines_iter, but using that function directly results in a performance regression.
for (file, _) in files.iter().map(|file| open(file)).flatten() {
let buf_reader = BufReader::new(file);
for line in buf_reader.split(if settings.zero_terminated {
b'\0'
} else {
b'\n'
}) {
let string = crash_if_err!(1, String::from_utf8(crash_if_err!(1, line)));
lines.push(Line::new(string, &settings));
}
} }
if settings.merge { sort_by(&mut lines, &settings);
if settings.unique { output_sorted_lines(lines.into_iter(), &settings);
print_sorted(
file_merger.dedup_by(|a, b| compare_by(a, b, &settings) == Ordering::Equal),
&settings,
)
} else {
print_sorted(file_merger, &settings)
} }
} else if settings.unique {
print_sorted(
lines
.into_iter()
.dedup_by(|a, b| compare_by(a, b, &settings) == Ordering::Equal),
&settings,
)
} else {
print_sorted(lines.into_iter(), &settings)
} }
0 0
} }
fn exec_check_file(unwrapped_lines: &[Line], settings: &GlobalSettings) -> i32 { fn exec_check_file(unwrapped_lines: impl Iterator<Item = Line>, settings: &GlobalSettings) -> i32 {
// errors yields the line before each disorder, // errors yields the line before each disorder,
// plus the last line (quirk of .coalesce()) // plus the last line (quirk of .coalesce())
let mut errors = let mut errors = unwrapped_lines
unwrapped_lines
.iter()
.enumerate() .enumerate()
.coalesce(|(last_i, last_line), (i, line)| { .coalesce(|(last_i, last_line), (i, line)| {
if compare_by(&last_line, &line, &settings) == Ordering::Greater { if compare_by(&last_line, &line, &settings) == Ordering::Greater {
@ -1215,20 +1227,6 @@ fn exec_check_file(unwrapped_lines: &[Line], settings: &GlobalSettings) -> i32 {
} }
} }
fn ext_sort_by(unsorted: Vec<Line>, settings: GlobalSettings) -> Vec<Line> {
let external_sorter = ExternalSorter::new(
settings.buffer_size as u64,
Some(settings.tmp_dir.clone()),
settings.clone(),
);
let iter = external_sorter
.sort_by(unsorted.into_iter(), settings)
.unwrap()
.map(|x| x.unwrap())
.collect::<Vec<Line>>();
iter
}
fn sort_by(unsorted: &mut Vec<Line>, settings: &GlobalSettings) { fn sort_by(unsorted: &mut Vec<Line>, settings: &GlobalSettings) {
if settings.stable || settings.unique { if settings.stable || settings.unique {
unsorted.par_sort_by(|a, b| compare_by(a, b, &settings)) unsorted.par_sort_by(|a, b| compare_by(a, b, &settings))
@ -1332,7 +1330,7 @@ fn get_leading_gen(input: &str) -> Range<usize> {
leading_whitespace_len..input.len() leading_whitespace_len..input.len()
} }
#[derive(Serialize, Deserialize, Copy, Clone, PartialEq, PartialOrd)] #[derive(Copy, Clone, PartialEq, PartialOrd)]
enum GeneralF64ParseResult { enum GeneralF64ParseResult {
Invalid, Invalid,
NaN, NaN,

View file

@ -37,7 +37,29 @@ fn test_larger_than_specified_segment() {
.arg("50K") .arg("50K")
.arg("ext_sort.txt") .arg("ext_sort.txt")
.succeeds() .succeeds()
.stdout_is_fixture(format!("{}", "ext_sort.expected")); .stdout_is_fixture("ext_sort.expected");
}
#[test]
fn test_smaller_than_specified_segment() {
new_ucmd!()
.arg("-n")
.arg("-S")
.arg("100M")
.arg("ext_sort.txt")
.succeeds()
.stdout_is_fixture("ext_sort.expected");
}
#[test]
fn test_extsort_zero_terminated() {
new_ucmd!()
.arg("-z")
.arg("-S")
.arg("10K")
.arg("zero-terminated.txt")
.succeeds()
.stdout_is_fixture("zero-terminated.expected");
} }
#[test] #[test]