Merge pull request #3092 from jtracey/join-performance

join: improve performance
This commit is contained in:
Sylvestre Ledru 2022-02-10 22:39:12 +01:00 committed by GitHub
commit e818fd2b98
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 333 additions and 107 deletions

1
Cargo.lock generated
View file

@ -2444,6 +2444,7 @@ name = "uu_join"
version = "0.0.12"
dependencies = [
"clap 3.0.10",
"memchr 2.4.1",
"uucore",
]

View file

@ -0,0 +1,55 @@
# Benchmarking join
<!-- spell-checker:ignore (words) CSVs nocheck hotpaths -->
## Performance profile
How much time is spent in each part of the code varies with the files being joined and the flags used.
A benchmark with `-j` and `-i` shows the following breakdown:
| Function/Method | Fraction of Samples | Why? |
| ---------------- | ------------------- | ---- |
| `Line::new` | 27% | Linear search for field separators, plus some vector operations. |
| `read_until` | 22% | Mostly libc reading file contents, with a few vector operations to represent them. |
| `Input::compare` | 20% | ~2/3 making the keys lowercase, ~1/3 comparing them. |
| `print_fields` | 11% | Writing to and flushing the buffer. |
| Other | 20% | |
| libc | 25% | I/O and memory allocation. |
More detailed profiles can be obtained via [flame graphs](https://github.com/flamegraph-rs/flamegraph):
```
cargo flamegraph --bin join --package uu_join -- file1 file2 > /dev/null
```
You may need to add the following lines to the top-level `Cargo.toml` to get full stack traces:
```
[profile.release]
debug = true
```
## How to benchmark
Benchmarking typically requires files large enough to ensure that the benchmark is not overwhelmed by background system noise; say, on the order of tens of MB.
Although `join` operates on line-oriented data rather than properly formatted CSVs (e.g., `join` is not designed to accommodate escaped or quoted delimiters),
in practice many CSV datasets will work well after being sorted.
Like most of the utils, the recommended tool for benchmarking is [hyperfine](https://github.com/sharkdp/hyperfine).
To benchmark your changes:
- check out the main branch (without your changes), do a `--release` build, and back up the executable produced at `target/release/join`
- check out your working branch (with your changes), do a `--release` build
- run
```
hyperfine -w 5 "/path/to/main/branch/build/join file1 file2" "/path/to/working/branch/build/join file1 file2"
```
- you'll likely need to add additional options to both commands, such as a field separator, especially if you're benchmarking some particular behavior
- you can also optionally benchmark against GNU's join
## What to benchmark
The following options can have a non-trivial impact on performance:
- `-a`/`-v` if one of the two files has significantly more lines than the other
- `-j`/`-1`/`-2` cause work to be done to grab the appropriate field
- `-i` introduces a call to `to_ascii_lowercase()`, which adds some time for allocating and dropping memory for the lowercase key
- `--nocheck-order` causes some calls of `Input::compare` to be skipped
The content of the files being joined has a very significant impact on the performance.
Things like how long each line is, how many fields there are, how long the key fields are, how many lines there are, how many lines can be joined, and how many lines each line can be joined with all change the behavior of the hotpaths.

View file

@ -17,6 +17,7 @@ path = "src/join.rs"
[dependencies]
clap = { version = "3.0", features = ["wrap_help", "cargo"] }
uucore = { version=">=0.0.11", package="uucore", path="../../uucore" }
memchr = "2"
[[bin]]
name = "join"

View file

@ -11,16 +11,49 @@
extern crate uucore;
use clap::{crate_version, App, AppSettings, Arg};
use memchr::{memchr3_iter, memchr_iter};
use std::cmp::Ordering;
use std::convert::From;
use std::error::Error;
use std::fmt::Display;
use std::fs::File;
use std::io::{stdin, stdout, BufRead, BufReader, Split, Stdin, Write};
use std::io::{stdin, stdout, BufRead, BufReader, BufWriter, Split, Stdin, Write};
#[cfg(unix)]
use std::os::unix::ffi::OsStrExt;
use uucore::display::Quotable;
use uucore::error::{set_exit_code, UResult, USimpleError};
use uucore::error::{set_exit_code, UError, UResult, USimpleError};
static NAME: &str = "join";
/// Failures that can abort a join run.
#[derive(Debug)]
enum JoinError {
    /// An underlying I/O operation failed.
    IOError(std::io::Error),
    /// Input was found out of order; the payload is the ready-to-print message.
    UnorderedInput(String),
}

impl UError for JoinError {
    /// Every `JoinError` maps to exit status 1.
    fn code(&self) -> i32 {
        1
    }
}

impl Error for JoinError {}

impl Display for JoinError {
    /// I/O failures are prefixed with `io error: `; unordered-input messages
    /// are written out verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::IOError(io_err) => write!(f, "io error: {}", io_err),
            Self::UnorderedInput(message) => f.write_str(message),
        }
    }
}

impl From<std::io::Error> for JoinError {
    /// Lets `?` lift a `std::io::Error` into `JoinError::IOError`.
    fn from(error: std::io::Error) -> Self {
        JoinError::IOError(error)
    }
}
#[derive(Copy, Clone, PartialEq)]
enum FileNum {
File1,
@ -34,7 +67,7 @@ enum LineEnding {
Newline = b'\n',
}
#[derive(Copy, Clone)]
#[derive(Copy, Clone, PartialEq)]
enum Sep {
Char(u8),
Line,
@ -112,34 +145,43 @@ impl<'a> Repr<'a> {
}
/// Print the field or empty filler if the field is not set.
fn print_field(&self, field: Option<&Vec<u8>>) -> Result<(), std::io::Error> {
fn print_field(
&self,
writer: &mut impl Write,
field: Option<&[u8]>,
) -> Result<(), std::io::Error> {
let value = match field {
Some(field) => field,
None => self.empty,
};
stdout().write_all(value)
writer.write_all(value)
}
/// Print each field except the one at the index.
fn print_fields(&self, line: &Line, index: usize) -> Result<(), std::io::Error> {
for i in 0..line.fields.len() {
fn print_fields(
&self,
writer: &mut impl Write,
line: &Line,
index: usize,
) -> Result<(), std::io::Error> {
for i in 0..line.field_ranges.len() {
if i != index {
stdout().write_all(&[self.separator])?;
stdout().write_all(&line.fields[i])?;
writer.write_all(&[self.separator])?;
writer.write_all(line.get_field(i).unwrap())?;
}
}
Ok(())
}
/// Print each field or the empty filler if the field is not set.
fn print_format<F>(&self, f: F) -> Result<(), std::io::Error>
fn print_format<F>(&self, writer: &mut impl Write, f: F) -> Result<(), std::io::Error>
where
F: Fn(&Spec) -> Option<&'a Vec<u8>>,
F: Fn(&Spec) -> Option<&'a [u8]>,
{
for i in 0..self.format.len() {
if i > 0 {
stdout().write_all(&[self.separator])?;
writer.write_all(&[self.separator])?;
}
let field = match f(&self.format[i]) {
@ -147,13 +189,13 @@ impl<'a> Repr<'a> {
None => self.empty,
};
stdout().write_all(field)?;
writer.write_all(field)?;
}
Ok(())
}
fn print_line_ending(&self) -> Result<(), std::io::Error> {
stdout().write_all(&[self.line_ending as u8])
/// Write the configured line-ending byte to `writer`.
fn print_line_ending(&self, writer: &mut impl Write) -> Result<(), std::io::Error> {
    let terminator = [self.line_ending as u8];
    writer.write_all(&terminator)
}
}
@ -173,7 +215,7 @@ impl Input {
}
}
fn compare(&self, field1: Option<&Vec<u8>>, field2: Option<&Vec<u8>>) -> Ordering {
fn compare(&self, field1: Option<&[u8]>, field2: Option<&[u8]>) -> Ordering {
if let (Some(field1), Some(field2)) = (field1, field2) {
if self.ignore_case {
field1
@ -236,30 +278,41 @@ impl Spec {
}
struct Line {
fields: Vec<Vec<u8>>,
field_ranges: Vec<(usize, usize)>,
string: Vec<u8>,
}
impl Line {
fn new(string: Vec<u8>, separator: Sep) -> Self {
let fields = match separator {
Sep::Whitespaces => string
// GNU join uses Bourne shell field splitters by default
.split(|c| matches!(*c, b' ' | b'\t' | b'\n'))
.filter(|f| !f.is_empty())
.map(Vec::from)
.collect(),
Sep::Char(sep) => string.split(|c| *c == sep).map(Vec::from).collect(),
Sep::Line => vec![string.clone()],
};
fn new(string: Vec<u8>, separator: Sep, len_guess: usize) -> Self {
let mut field_ranges = Vec::with_capacity(len_guess);
let mut last_end = 0;
if separator == Sep::Whitespaces {
// GNU join uses Bourne shell field splitters by default
for i in memchr3_iter(b' ', b'\t', b'\n', &string) {
if i > last_end {
field_ranges.push((last_end, i));
}
last_end = i + 1;
}
} else if let Sep::Char(sep) = separator {
for i in memchr_iter(sep, &string) {
field_ranges.push((last_end, i));
last_end = i + 1;
}
}
field_ranges.push((last_end, string.len()));
Self { fields, string }
Self {
field_ranges,
string,
}
}
/// Get field at index.
fn get_field(&self, index: usize) -> Option<&Vec<u8>> {
if index < self.fields.len() {
Some(&self.fields[index])
fn get_field(&self, index: usize) -> Option<&[u8]> {
if index < self.field_ranges.len() {
let (low, high) = self.field_ranges[index];
Some(&self.string[low..high])
} else {
None
}
@ -272,6 +325,7 @@ struct State<'a> {
file_num: FileNum,
print_unpaired: bool,
lines: Split<Box<dyn BufRead + 'a>>,
max_len: usize,
seq: Vec<Line>,
line_num: usize,
has_failed: bool,
@ -302,6 +356,7 @@ impl<'a> State<'a> {
file_num,
print_unpaired,
lines: f.split(line_ending as u8),
max_len: 1,
seq: Vec::new(),
line_num: 0,
has_failed: false,
@ -310,54 +365,69 @@ impl<'a> State<'a> {
}
/// Skip the current unpaired line.
fn skip_line(&mut self, input: &Input, repr: &Repr) -> Result<(), std::io::Error> {
fn skip_line(
&mut self,
writer: &mut impl Write,
input: &Input,
repr: &Repr,
) -> Result<(), JoinError> {
if self.print_unpaired {
self.print_first_line(repr)?;
self.print_first_line(writer, repr)?;
}
self.reset_next_line(input);
self.reset_next_line(input)?;
Ok(())
}
/// Keep reading line sequence until the key does not change, return
/// the first line whose key differs.
fn extend(&mut self, input: &Input) -> Option<Line> {
while let Some(line) = self.next_line(input) {
fn extend(&mut self, input: &Input) -> Result<Option<Line>, JoinError> {
while let Some(line) = self.next_line(input)? {
let diff = input.compare(self.get_current_key(), line.get_field(self.key));
if diff == Ordering::Equal {
self.seq.push(line);
} else {
return Some(line);
return Ok(Some(line));
}
}
None
Ok(None)
}
/// Print lines in the buffers as headers.
fn print_headers(&self, other: &State, repr: &Repr) -> Result<(), std::io::Error> {
fn print_headers(
&self,
writer: &mut impl Write,
other: &State,
repr: &Repr,
) -> Result<(), std::io::Error> {
if self.has_line() {
if other.has_line() {
self.combine(other, repr)?;
self.combine(writer, other, repr)?;
} else {
self.print_first_line(repr)?;
self.print_first_line(writer, repr)?;
}
} else if other.has_line() {
other.print_first_line(repr)?;
other.print_first_line(writer, repr)?;
}
Ok(())
}
/// Combine two line sequences.
fn combine(&self, other: &State, repr: &Repr) -> Result<(), std::io::Error> {
fn combine(
&self,
writer: &mut impl Write,
other: &State,
repr: &Repr,
) -> Result<(), std::io::Error> {
let key = self.get_current_key();
for line1 in &self.seq {
for line2 in &other.seq {
if repr.uses_format() {
repr.print_format(|spec| match *spec {
repr.print_format(writer, |spec| match *spec {
Spec::Key => key,
Spec::Field(file_num, field_num) => {
if file_num == self.file_num {
@ -372,12 +442,12 @@ impl<'a> State<'a> {
}
})?;
} else {
repr.print_field(key)?;
repr.print_fields(line1, self.key)?;
repr.print_fields(line2, other.key)?;
repr.print_field(writer, key)?;
repr.print_fields(writer, line1, self.key)?;
repr.print_fields(writer, line2, other.key)?;
}
repr.print_line_ending()?;
repr.print_line_ending(writer)?;
}
}
@ -393,14 +463,16 @@ impl<'a> State<'a> {
}
}
fn reset_read_line(&mut self, input: &Input) {
let line = self.read_line(input.separator);
/// Read the next line without the order check and hand it to `reset`.
///
/// Any I/O error from `read_line` is returned unchanged and `reset` is not called.
fn reset_read_line(&mut self, input: &Input) -> Result<(), std::io::Error> {
    self.read_line(input.separator).map(|next| {
        self.reset(next);
    })
}
fn reset_next_line(&mut self, input: &Input) {
let line = self.next_line(input);
/// Fetch the next line via `next_line` (which applies the order check) and
/// hand it to `reset`.
///
/// Any error from `next_line` is returned unchanged and `reset` is not called.
fn reset_next_line(&mut self, input: &Input) -> Result<(), JoinError> {
    self.next_line(input).map(|next| {
        self.reset(next);
    })
}
fn has_line(&self) -> bool {
@ -408,29 +480,34 @@ impl<'a> State<'a> {
}
fn initialize(&mut self, read_sep: Sep, autoformat: bool) -> usize {
if let Some(line) = self.read_line(read_sep) {
if let Some(line) = crash_if_err!(1, self.read_line(read_sep)) {
self.seq.push(line);
if autoformat {
return self.seq[0].fields.len();
return self.seq[0].field_ranges.len();
}
}
0
}
fn finalize(&mut self, input: &Input, repr: &Repr) -> Result<(), std::io::Error> {
fn finalize(
&mut self,
writer: &mut impl Write,
input: &Input,
repr: &Repr,
) -> Result<(), JoinError> {
if self.has_line() {
if self.print_unpaired {
self.print_first_line(repr)?;
self.print_first_line(writer, repr)?;
}
let mut next_line = self.next_line(input);
let mut next_line = self.next_line(input)?;
while let Some(line) = &next_line {
if self.print_unpaired {
self.print_line(line, repr)?;
self.print_line(writer, line, repr)?;
}
self.reset(next_line);
next_line = self.next_line(input);
next_line = self.next_line(input)?;
}
}
@ -438,51 +515,66 @@ impl<'a> State<'a> {
}
/// Get the next line without the order check.
fn read_line(&mut self, sep: Sep) -> Option<Line> {
let value = self.lines.next()?;
self.line_num += 1;
Some(Line::new(crash_if_err!(1, value), sep))
/// Pull the next record from `self.lines` and split it into fields, without
/// any order check.
///
/// Returns `Ok(None)` once the input is exhausted. `line_num` is incremented
/// before the read result is inspected, so a record that fails to read is
/// still counted. `max_len` tracks the largest field count seen so far and is
/// passed to `Line::new` as the capacity guess for the next line.
fn read_line(&mut self, sep: Sep) -> Result<Option<Line>, std::io::Error> {
    let raw = match self.lines.next() {
        Some(raw) => raw,
        None => return Ok(None),
    };
    self.line_num += 1;

    let parsed = Line::new(raw?, sep, self.max_len);
    self.max_len = self.max_len.max(parsed.field_ranges.len());
    Ok(Some(parsed))
}
/// Get the next line with the order check.
fn next_line(&mut self, input: &Input) -> Option<Line> {
let line = self.read_line(input.separator)?;
fn next_line(&mut self, input: &Input) -> Result<Option<Line>, JoinError> {
if let Some(line) = self.read_line(input.separator)? {
if input.check_order == CheckOrder::Disabled {
return Ok(Some(line));
}
if input.check_order == CheckOrder::Disabled {
return Some(line);
}
let diff = input.compare(self.get_current_key(), line.get_field(self.key));
let diff = input.compare(self.get_current_key(), line.get_field(self.key));
if diff == Ordering::Greater {
if input.check_order == CheckOrder::Enabled || (self.has_unpaired && !self.has_failed) {
eprintln!(
"{}: {}:{}: is not sorted: {}",
uucore::execution_phrase(),
if diff == Ordering::Greater
&& (input.check_order == CheckOrder::Enabled
|| (self.has_unpaired && !self.has_failed))
{
let err_msg = format!(
"{}:{}: is not sorted: {}",
self.file_name.maybe_quote(),
self.line_num,
String::from_utf8_lossy(&line.string)
);
// This is fatal if the check is enabled.
if input.check_order == CheckOrder::Enabled {
return Err(JoinError::UnorderedInput(err_msg));
}
eprintln!("{}: {}", uucore::execution_phrase(), err_msg);
self.has_failed = true;
}
// This is fatal if the check is enabled.
if input.check_order == CheckOrder::Enabled {
std::process::exit(1);
}
}
Some(line)
Ok(Some(line))
} else {
Ok(None)
}
}
/// Gets the key value of the lines stored in seq.
fn get_current_key(&self) -> Option<&Vec<u8>> {
/// Key field of the first line held in `seq`, or `None` when that line has
/// no field at index `self.key`.
///
/// Indexes `seq[0]` directly, so it panics when `seq` is empty.
fn get_current_key(&self) -> Option<&[u8]> {
    let first = &self.seq[0];
    first.get_field(self.key)
}
fn print_line(&self, line: &Line, repr: &Repr) -> Result<(), std::io::Error> {
fn print_line(
&self,
writer: &mut impl Write,
line: &Line,
repr: &Repr,
) -> Result<(), std::io::Error> {
if repr.uses_format() {
repr.print_format(|spec| match *spec {
repr.print_format(writer, |spec| match *spec {
Spec::Key => line.get_field(self.key),
Spec::Field(file_num, field_num) => {
if file_num == self.file_num {
@ -493,15 +585,15 @@ impl<'a> State<'a> {
}
})?;
} else {
repr.print_field(line.get_field(self.key))?;
repr.print_fields(line, self.key)?;
repr.print_field(writer, line.get_field(self.key))?;
repr.print_fields(writer, line, self.key)?;
}
repr.print_line_ending()
repr.print_line_ending(writer)
}
fn print_first_line(&self, repr: &Repr) -> Result<(), std::io::Error> {
self.print_line(&self.seq[0], repr)
/// Print the first line held in `seq` through `print_line`.
///
/// Indexes `seq[0]` directly, so it panics when `seq` is empty.
fn print_first_line(&self, writer: &mut impl Write, repr: &Repr) -> Result<(), std::io::Error> {
    let first = &self.seq[0];
    self.print_line(writer, first, repr)
}
}
@ -718,7 +810,7 @@ FILENUM is 1 or 2, corresponding to FILE1 or FILE2",
)
}
fn exec(file1: &str, file2: &str, settings: Settings) -> Result<(), std::io::Error> {
fn exec(file1: &str, file2: &str, settings: Settings) -> Result<(), JoinError> {
let stdin = stdin();
let mut state1 = State::new(
@ -774,10 +866,13 @@ fn exec(file1: &str, file2: &str, settings: Settings) -> Result<(), std::io::Err
&settings.empty,
);
let stdout = stdout();
let mut writer = BufWriter::new(stdout.lock());
if settings.headers {
state1.print_headers(&state2, &repr)?;
state1.reset_read_line(&input);
state2.reset_read_line(&input);
state1.print_headers(&mut writer, &state2, &repr)?;
state1.reset_read_line(&input)?;
state2.reset_read_line(&input)?;
}
while state1.has_line() && state2.has_line() {
@ -785,21 +880,39 @@ fn exec(file1: &str, file2: &str, settings: Settings) -> Result<(), std::io::Err
match diff {
Ordering::Less => {
state1.skip_line(&input, &repr)?;
if let Err(e) = state1.skip_line(&mut writer, &input, &repr) {
writer.flush()?;
return Err(e);
}
state1.has_unpaired = true;
state2.has_unpaired = true;
}
Ordering::Greater => {
state2.skip_line(&input, &repr)?;
if let Err(e) = state2.skip_line(&mut writer, &input, &repr) {
writer.flush()?;
return Err(e);
}
state1.has_unpaired = true;
state2.has_unpaired = true;
}
Ordering::Equal => {
let next_line1 = state1.extend(&input);
let next_line2 = state2.extend(&input);
let next_line1 = match state1.extend(&input) {
Ok(line) => line,
Err(e) => {
writer.flush()?;
return Err(e);
}
};
let next_line2 = match state2.extend(&input) {
Ok(line) => line,
Err(e) => {
writer.flush()?;
return Err(e);
}
};
if settings.print_joined {
state1.combine(&state2, &repr)?;
state1.combine(&mut writer, &state2, &repr)?;
}
state1.reset(next_line1);
@ -808,8 +921,16 @@ fn exec(file1: &str, file2: &str, settings: Settings) -> Result<(), std::io::Err
}
}
state1.finalize(&input, &repr)?;
state2.finalize(&input, &repr)?;
if let Err(e) = state1.finalize(&mut writer, &input, &repr) {
writer.flush()?;
return Err(e);
};
if let Err(e) = state2.finalize(&mut writer, &input, &repr) {
writer.flush()?;
return Err(e);
};
writer.flush()?;
if state1.has_failed || state2.has_failed {
eprintln!(

View file

@ -1,6 +1,8 @@
// spell-checker:ignore (words) autoformat
// spell-checker:ignore (words) autoformat nocheck
use crate::common::util::*;
#[cfg(any(target_os = "linux", target_os = "freebsd", target_os = "netbsd"))]
use std::fs::OpenOptions;
#[cfg(unix)]
use std::{ffi::OsStr, os::unix::ffi::OsStrExt};
#[cfg(windows)]
@ -306,6 +308,16 @@ fn missing_format_fields() {
.stdout_only_fixture("missing_format_fields.expected");
}
// Joins `fields_1.txt` with `fields_2.txt` under `--nocheck-order`, expects the
// command to succeed, and checks its output against the `default.expected` fixture.
#[test]
fn nocheck_order() {
new_ucmd!()
.arg("fields_1.txt")
.arg("fields_2.txt")
.arg("--nocheck-order")
.succeeds()
.stdout_only_fixture("default.expected");
}
#[test]
fn wrong_line_order() {
let ts = TestScenario::new(util_name!());
@ -313,11 +325,23 @@ fn wrong_line_order() {
.arg("fields_2.txt")
.arg("fields_4.txt")
.fails()
.stdout_contains("7 g f 4 fg")
.stderr_is(&format!(
"{0} {1}: fields_4.txt:5: is not sorted: 11 g 5 gh\n{0} {1}: input is not in sorted order",
ts.bin_path.to_string_lossy(),
ts.util_name
));
"{0} {1}: fields_4.txt:5: is not sorted: 11 g 5 gh\n{0} {1}: input is not in sorted order",
ts.bin_path.to_string_lossy(),
ts.util_name
));
new_ucmd!()
.arg("--check-order")
.arg("fields_2.txt")
.arg("fields_4.txt")
.fails()
.stdout_does_not_contain("7 g f 4 fg")
.stderr_is(&format!(
"{0}: fields_4.txt:5: is not sorted: 11 g 5 gh",
ts.util_name
));
}
#[test]
@ -327,11 +351,23 @@ fn both_files_wrong_line_order() {
.arg("fields_4.txt")
.arg("fields_5.txt")
.fails()
.stdout_contains("5 e 3 ef")
.stderr_is(&format!(
"{0} {1}: fields_5.txt:4: is not sorted: 3\n{0} {1}: fields_4.txt:5: is not sorted: 11 g 5 gh\n{0} {1}: input is not in sorted order",
ts.bin_path.to_string_lossy(),
ts.util_name
));
new_ucmd!()
.arg("--check-order")
.arg("fields_4.txt")
.arg("fields_5.txt")
.fails()
.stdout_does_not_contain("5 e 3 ef")
.stderr_is(&format!(
"{0}: fields_5.txt:4: is not sorted: 3",
ts.util_name
));
}
#[test]
@ -437,3 +473,15 @@ fn null_line_endings() {
.succeeds()
.stdout_only_fixture("z.expected");
}
// Points the command's stdout at `/dev/full` and expects the join to fail with
// "No space left on device" on stderr. Only compiled on the OSes named in the
// `cfg` attribute below.
#[test]
#[cfg(any(target_os = "linux", target_os = "freebsd", target_os = "netbsd"))]
fn test_full() {
// Opening the device is unwrapped: the test panics if `/dev/full` cannot be
// opened for writing.
let dev_full = OpenOptions::new().write(true).open("/dev/full").unwrap();
new_ucmd!()
.arg("fields_1.txt")
.arg("fields_2.txt")
.set_stdout(dev_full)
.fails()
.stderr_contains("No space left on device");
}