Refactor flatten command (#11017)

# Description
Refactors the `flatten` command to remove a bunch of cloning. This was
done by passing ownership of the `Value` to `flat_value`, removing the
lifetime on `TableInside`, and using `Vec<Record>` in `FlattenedRows`
instead of a pair of `Vec` of columns and values.

For the quick benchmark below, it seems to be twice as fast now:
```nushell
let data = ls crates | where type == dir | each { ls $'($in.name)/**/*' }
timeit { for x in 0..1000 { $data | flatten } }
```
This took 550ms on v0.86.0 and only 230ms on this PR.
But considering that
```nushell
timeit { for x in 0..1000 { $data } }
```
takes 200ms on both versions, then the difference for `flatten` itself
is really 250ms vs 30ms -- 8x faster.
This commit is contained in:
Ian Manske 2023-11-10 12:18:02 +00:00 committed by GitHub
parent fe92051bb3
commit 523d0bca16
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -81,7 +81,7 @@ impl Command for Flatten {
}, },
Example { Example {
description: "Flatten inner table", description: "Flatten inner table",
example: "{ a: b, d: [ 1 2 3 4 ], e: [ 4 3 ] } | flatten d --all", example: "{ a: b, d: [ 1 2 3 4 ], e: [ 4 3 ] } | flatten d --all",
result: Some(Value::list( result: Some(Value::list(
vec![ vec![
Value::test_record(record! { Value::test_record(record! {
@ -126,232 +126,197 @@ fn flatten(
call: &Call, call: &Call,
input: PipelineData, input: PipelineData,
) -> Result<PipelineData, ShellError> { ) -> Result<PipelineData, ShellError> {
let tag = call.head;
let columns: Vec<CellPath> = call.rest(engine_state, stack, 0)?; let columns: Vec<CellPath> = call.rest(engine_state, stack, 0)?;
let metadata = input.metadata(); let metadata = input.metadata();
let flatten_all = call.has_flag("all"); let flatten_all = call.has_flag("all");
input input
.flat_map( .flat_map(
move |item| flat_value(&columns, &item, tag, flatten_all), move |item| flat_value(&columns, item, flatten_all),
engine_state.ctrlc.clone(), engine_state.ctrlc.clone(),
) )
.map(|x| x.set_metadata(metadata)) .map(|x| x.set_metadata(metadata))
} }
enum TableInside<'a> { enum TableInside {
// handle for a column which contains a single list(but not list of records) // handle for a column which contains a single list(but not list of records)
// it contains (column, span, values in the column, column index). // it contains (column, span, values in the column, column index).
Entries(&'a str, &'a Span, Vec<&'a Value>, usize), Entries(String, Vec<Value>, usize),
// handle for a column which contains a table, we can flatten the inner column to outer level // handle for a column which contains a table, we can flatten the inner column to outer level
// `columns` means that for the given row, it contains `len(columns)` nested rows, and each nested row contains a list of column name. // `records` is the nested/inner table to flatten to the outer level
// Likely, `values` means that for the given row, it contains `len(values)` nested rows, and each nested row contains a list of values.
//
// `parent_column_name` is handled for conflicting column name, the nested table may contains columns which has the same name // `parent_column_name` is handled for conflicting column name, the nested table may contains columns which has the same name
// to outer level, for that case, the output column name should be f"{parent_column_name}_{inner_column_name}". // to outer level, for that case, the output column name should be f"{parent_column_name}_{inner_column_name}".
// `parent_column_index` is the column index in original table. // `parent_column_index` is the column index in original table.
FlattenedRows { FlattenedRows {
columns: Vec<Vec<String>>, records: Vec<Record>,
_span: &'a Span, parent_column_name: String,
values: Vec<Vec<Value>>,
parent_column_name: &'a str,
parent_column_index: usize, parent_column_index: usize,
}, },
} }
fn flat_value(columns: &[CellPath], item: &Value, name_tag: Span, all: bool) -> Vec<Value> { fn flat_value(columns: &[CellPath], item: Value, all: bool) -> Vec<Value> {
let tag = item.span(); let tag = item.span();
if item.as_record().is_ok() { match item {
let mut out = IndexMap::<String, Value>::new(); Value::Record { val, .. } => {
let mut inner_table = None; let mut out = IndexMap::<String, Value>::new();
let mut inner_table = None;
let record = match item { for (column_index, (column, value)) in val.into_iter().enumerate() {
Value::Record { val, .. } => val, let column_requested = columns.iter().find(|c| c.into_string() == column);
// Propagate errors by explicitly matching them before the final case. let need_flatten = { columns.is_empty() || column_requested.is_some() };
Value::Error { .. } => return vec![item.clone()], let span = value.span();
other => {
return vec![Value::error(
ShellError::OnlySupportsThisInputType {
exp_input_type: "record".into(),
wrong_type: other.get_type().to_string(),
dst_span: name_tag,
src_span: other.span(),
},
name_tag,
)];
}
};
let s = item.span(); match value {
Value::Record { val, .. } => {
for (column_index, (column, value)) in record.iter().enumerate() { if need_flatten {
let column_requested = columns.iter().find(|c| c.into_string() == *column); for (col, val) in val {
let need_flatten = { columns.is_empty() || column_requested.is_some() }; if out.contains_key(&col) {
let span = value.span(); out.insert(format!("{column}_{col}"), val);
match value {
Value::Record { val, .. } => {
if need_flatten {
val.iter().for_each(|(col, val)| {
if out.contains_key(col) {
out.insert(format!("{column}_{col}"), val.clone());
} else {
out.insert(col.to_string(), val.clone());
}
})
} else if out.contains_key(column) {
out.insert(format!("{column}_{column}"), value.clone());
} else {
out.insert(column.to_string(), value.clone());
}
}
Value::List { vals, .. } if all && vals.iter().all(|f| f.as_record().is_ok()) => {
if need_flatten && inner_table.is_some() {
return vec![Value::error( ShellError::UnsupportedInput { msg: "can only flatten one inner list at a time. tried flattening more than one column with inner lists... but is flattened already".to_string(), input: "value originates from here".into(), msg_span: s, input_span: span }, span)
];
}
// it's a table (a list of record, we can flatten inner record)
let mut records = vec![];
for v in vals {
if let Ok(r) = v.as_record() {
records.push(r)
}
}
if need_flatten {
let cols = records.iter().map(|r| r.cols.clone());
let vals = records.iter().map(|r| r.vals.clone());
inner_table = Some(TableInside::FlattenedRows {
columns: cols.collect(),
_span: &s,
values: vals.collect(),
parent_column_name: column,
parent_column_index: column_index,
});
} else if out.contains_key(column) {
out.insert(format!("{column}_{column}"), value.clone());
} else {
out.insert(column.to_string(), value.clone());
}
}
Value::List { vals: values, .. } => {
if need_flatten && inner_table.is_some() {
return vec![Value::error( ShellError::UnsupportedInput { msg: "can only flatten one inner list at a time. tried flattening more than one column with inner lists... but is flattened already".to_string(), input: "value originates from here".into(), msg_span: s, input_span: span }, span)
];
}
if !columns.is_empty() {
let cell_path = column_requested.and_then(|x| match x.members.first() {
Some(PathMember::String { val, span: _, .. }) => Some(val),
_ => None,
});
if let Some(r) = cell_path {
inner_table = Some(TableInside::Entries(
r,
&s,
values.iter().collect::<Vec<_>>(),
column_index,
));
} else {
out.insert(column.to_string(), value.clone());
}
} else {
inner_table = Some(TableInside::Entries(
column,
&s,
values.iter().collect::<Vec<_>>(),
column_index,
));
}
}
_ => {
out.insert(column.to_string(), value.clone());
}
}
}
let mut expanded = vec![];
match inner_table {
Some(TableInside::Entries(column, _, entries, parent_column_index)) => {
for entry in entries {
let base = out.clone();
let mut record = Record::new();
let mut index = 0;
for (col, val) in base.into_iter() {
// meet the flattened column, push them to result record first
// this can avoid output column order changed.
if index == parent_column_index {
record.push(column, entry.clone());
}
record.push(col, val);
index += 1;
}
// the flattened column may be the last column in the original table.
if index == parent_column_index {
record.push(column, entry.clone());
}
expanded.push(Value::record(record, tag));
}
}
Some(TableInside::FlattenedRows {
columns,
_span,
values,
parent_column_name,
parent_column_index,
}) => {
for (inner_cols, inner_vals) in columns.into_iter().zip(values) {
let base = out.clone();
let mut record = Record::new();
let mut index = 0;
for (base_col, base_val) in base.into_iter() {
// meet the flattened column, push them to result record first
// this can avoid output column order changed.
if index == parent_column_index {
for (col, val) in inner_cols.iter().zip(inner_vals.iter()) {
if record.contains(col) {
record.push(format!("{parent_column_name}_{col}"), val.clone());
} else { } else {
record.push(col, val.clone()); out.insert(col, val);
}; }
} }
} else if out.contains_key(&column) {
out.insert(format!("{column}_{column}"), Value::record(val, span));
} else {
out.insert(column, Value::record(val, span));
}
}
Value::List { vals, .. } => {
if need_flatten && inner_table.is_some() {
return vec![Value::error(
ShellError::UnsupportedInput {
msg: "can only flatten one inner list at a time. tried flattening more than one column with inner lists... but is flattened already".into(),
input: "value originates from here".into(),
msg_span: tag,
input_span: span
},
span,
)];
} }
record.push(base_col, base_val); if all && vals.iter().all(|f| f.as_record().is_ok()) {
index += 1; // it's a table (a list of record, we can flatten inner record)
} if need_flatten {
let records = vals
.into_iter()
.filter_map(|v| {
if let Value::Record { val, .. } = v {
Some(val)
} else {
None
}
})
.collect();
// the flattened column may be the last column in the original table. inner_table = Some(TableInside::FlattenedRows {
if index == parent_column_index { records,
for (col, val) in inner_cols.iter().zip(inner_vals.iter()) { parent_column_name: column,
if record.contains(col) { parent_column_index: column_index,
record.push(format!("{parent_column_name}_{col}"), val.clone()); });
} else if out.contains_key(&column) {
out.insert(format!("{column}_{column}"), Value::list(vals, span));
} else { } else {
record.push(col, val.clone()); out.insert(column, Value::list(vals, span));
} }
} else if !columns.is_empty() {
let cell_path =
column_requested.and_then(|x| match x.members.first() {
Some(PathMember::String { val, .. }) => Some(val),
_ => None,
});
if let Some(r) = cell_path {
inner_table =
Some(TableInside::Entries(r.clone(), vals, column_index));
} else {
out.insert(column, Value::list(vals, span));
}
} else {
inner_table = Some(TableInside::Entries(column, vals, column_index));
} }
} }
expanded.push(Value::record(record, tag)); _ => {
out.insert(column, value);
}
} }
} }
None => {
expanded.push(Value::record(out.into_iter().collect(), tag)); let mut expanded = vec![];
match inner_table {
Some(TableInside::Entries(column, entries, parent_column_index)) => {
for entry in entries {
let base = out.clone();
let mut record = Record::new();
let mut index = 0;
for (col, val) in base.into_iter() {
// meet the flattened column, push them to result record first
// this can avoid output column order changed.
if index == parent_column_index {
record.push(column.clone(), entry.clone());
}
record.push(col, val);
index += 1;
}
// the flattened column may be the last column in the original table.
if index == parent_column_index {
record.push(column.clone(), entry);
}
expanded.push(Value::record(record, tag));
}
}
Some(TableInside::FlattenedRows {
records,
parent_column_name,
parent_column_index,
}) => {
for inner_record in records {
let base = out.clone();
let mut record = Record::new();
let mut index = 0;
for (base_col, base_val) in base {
// meet the flattened column, push them to result record first
// this can avoid output column order changed.
if index == parent_column_index {
for (col, val) in &inner_record {
if record.contains(col) {
record.push(
format!("{parent_column_name}_{col}"),
val.clone(),
);
} else {
record.push(col, val.clone());
};
}
}
record.push(base_col, base_val);
index += 1;
}
// the flattened column may be the last column in the original table.
if index == parent_column_index {
for (col, val) in inner_record {
if record.contains(&col) {
record.push(format!("{parent_column_name}_{col}"), val);
} else {
record.push(col, val);
}
}
}
expanded.push(Value::record(record, tag));
}
}
None => {
expanded.push(Value::record(out.into_iter().collect(), tag));
}
} }
expanded
} }
expanded Value::List { vals, .. } => vals,
} else if item.as_list().is_ok() { item => vec![item],
if let Value::List { vals, .. } = item {
vals.to_vec()
} else {
vec![]
}
} else {
vec![item.clone()]
} }
} }