Merge pull request #888 from andrasio/data-primitives

WIP [data processing]
This commit is contained in:
Andrés N. Robalino 2019-11-03 16:52:21 -05:00 committed by GitHub
commit f966394b63
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 477 additions and 54 deletions

View file

@ -11,3 +11,11 @@ This is laying the groundwork for merging coloring and parsing. It also makes to
work with coloring, which is pretty useful on its own. work with coloring, which is pretty useful on its own.
""" """
enabled = false enabled = false
[data_processing_primitives]
description = "Groundwork so tables can be data processed"
reason = """
These will allow us to take tables and transform, process, and explore them.
"""
enabled = false

View file

@ -322,6 +322,8 @@ pub async fn cli() -> Result<(), Box<dyn Error>> {
whole_stream_command(Table), whole_stream_command(Table),
whole_stream_command(Version), whole_stream_command(Version),
whole_stream_command(Which), whole_stream_command(Which),
#[cfg(data_processing_primitives)]
whole_stream_command(SplitBy),
]); ]);
#[cfg(feature = "clipboard")] #[cfg(feature = "clipboard")]

View file

@ -57,6 +57,10 @@ pub(crate) mod shells;
pub(crate) mod size; pub(crate) mod size;
pub(crate) mod skip_while; pub(crate) mod skip_while;
pub(crate) mod sort_by; pub(crate) mod sort_by;
#[cfg(data_processing_primitives)]
pub(crate) mod split_by;
pub(crate) mod split_column; pub(crate) mod split_column;
pub(crate) mod split_row; pub(crate) mod split_row;
pub(crate) mod table; pub(crate) mod table;
@ -133,6 +137,10 @@ pub(crate) use shells::Shells;
pub(crate) use size::Size; pub(crate) use size::Size;
pub(crate) use skip_while::SkipWhile; pub(crate) use skip_while::SkipWhile;
pub(crate) use sort_by::SortBy; pub(crate) use sort_by::SortBy;
#[cfg(data_processing_primitives)]
pub(crate) use split_by::SplitBy;
pub(crate) use split_column::SplitColumn; pub(crate) use split_column::SplitColumn;
pub(crate) use split_row::SplitRow; pub(crate) use split_row::SplitRow;
pub(crate) use table::Table; pub(crate) use table::Table;

View file

@ -36,59 +36,154 @@ impl WholeStreamCommand for GroupBy {
} }
} }
fn group_by( pub fn group_by(
GroupByArgs { column_name }: GroupByArgs, GroupByArgs { column_name }: GroupByArgs,
RunnableContext { input, name, .. }: RunnableContext, RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let stream = async_stream! { let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await; let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut groups = indexmap::IndexMap::new();
for value in values { if values.is_empty() {
let group_key = value.get_data_by_key(&column_name.item); yield Err(ShellError::labeled_error(
"Expected table from pipeline",
if group_key.is_none() { "requires a table input",
column_name.span()
let possibilities = value.data_descriptors(); ))
let mut possible_matches: Vec<_> = possibilities
.iter()
.map(|x| (natural::distance::levenshtein_distance(x, &column_name.item), x))
.collect();
possible_matches.sort();
let err = {
if possible_matches.len() > 0 {
ShellError::labeled_error(
"Unknown column",
format!("did you mean '{}'?", possible_matches[0].1),
&column_name.tag,)
} else { } else {
ShellError::labeled_error( match group(&column_name, values, name) {
"Unknown column", Ok(grouped) => yield ReturnSuccess::value(grouped),
"row does not contain this column", Err(err) => yield Err(err)
&column_name.tag,
)
}
};
yield Err(err)
} else {
let group_key = group_key.unwrap().as_string()?;
let mut group = groups.entry(group_key).or_insert(vec![]);
group.push(value);
} }
} }
let mut out = TaggedDictBuilder::new(name.clone());
for (k,v) in groups.iter() {
out.insert(k, Value::table(v));
}
yield ReturnSuccess::value(out)
}; };
Ok(stream.to_output_stream()) Ok(stream.to_output_stream())
} }
/// Groups `values` by the string stored in each row's `column_name` column.
///
/// The result is a row value, tagged with `tag`, whose keys are the distinct
/// group keys (kept in the order they were first encountered by the
/// `IndexMap`) and whose values are tables holding the rows of that group.
///
/// # Errors
///
/// * "Unknown column" when a row has no `column_name` entry. If the row has
///   any columns at all, the label suggests the one with the smallest
///   Levenshtein distance to the requested name.
/// * Whatever `as_string` reports when the key value cannot be rendered as a
///   string.
///
/// Processing stops at the first offending row.
pub fn group(
    column_name: &Tagged<String>,
    values: Vec<Tagged<Value>>,
    tag: impl Into<Tag>,
) -> Result<Tagged<Value>, ShellError> {
    let tag = tag.into();
    let mut groups = indexmap::IndexMap::new();

    for value in values {
        let group_key = match value.get_data_by_key(column_name) {
            Some(key) => key.as_string()?,
            None => {
                let possibilities = value.data_descriptors();

                // Only the closest candidate is reported, so take the minimum
                // instead of sorting the whole list. Tuples compare by
                // distance first and by name second, exactly like the sort.
                let closest = possibilities
                    .iter()
                    .map(|x| (natural::distance::levenshtein_distance(x, column_name), x))
                    .min();

                let label = match closest {
                    Some((_, suggestion)) => format!("did you mean '{}'?", suggestion),
                    None => String::from("row does not contain this column"),
                };

                return Err(ShellError::labeled_error(
                    "Unknown column",
                    label,
                    column_name.tag(),
                ));
            }
        };

        groups
            .entry(group_key)
            .or_insert_with(Vec::new)
            .push(value);
    }

    let mut out = TaggedDictBuilder::new(&tag);

    for (k, v) in groups.iter() {
        out.insert(k, Value::table(v));
    }

    Ok(out.into_tagged_value())
}
#[cfg(test)]
mod tests {
    use crate::commands::group_by::group;
    use crate::data::meta::*;
    use crate::Value;
    use indexmap::IndexMap;

    /// Wraps text in a string value carrying an unknown tag.
    fn string(input: impl Into<String>) -> Tagged<Value> {
        Value::string(input.into()).tagged_unknown()
    }

    /// Wraps the given entries in a row value carrying an unknown tag.
    fn row(entries: IndexMap<String, Tagged<Value>>) -> Tagged<Value> {
        Value::row(entries).tagged_unknown()
    }

    /// Wraps the given rows in a table value carrying an unknown tag.
    fn table(list: &Vec<Tagged<Value>>) -> Tagged<Value> {
        Value::table(list).tagged_unknown()
    }

    /// Builds one release row with `name`, `country` and `date` columns,
    /// inserted in that order.
    fn release(name: &str, country: &str, date: &str) -> Tagged<Value> {
        row(indexmap! {
            "name".into() => string(name),
            "country".into() => string(country),
            "date".into() => string(date),
        })
    }

    #[test]
    fn groups_table_by_key() {
        let for_key = String::from("date").tagged_unknown();

        let nu_releases = vec![
            release("AR", "EC", "August 23-2019"),
            release("JT", "NZ", "August 23-2019"),
            release("YK", "US", "October 10-2019"),
            release("AR", "EC", "Sept 24-2019"),
            release("JT", "NZ", "October 10-2019"),
            release("YK", "US", "Sept 24-2019"),
            release("AR", "EC", "October 10-2019"),
            release("JT", "NZ", "Sept 24-2019"),
            release("YK", "US", "August 23-2019"),
        ];

        // Rows inside each group keep the order in which they appeared above.
        let expected = row(indexmap! {
            "August 23-2019".into() => table(&vec![
                release("AR", "EC", "August 23-2019"),
                release("JT", "NZ", "August 23-2019"),
                release("YK", "US", "August 23-2019"),
            ]),
            "October 10-2019".into() => table(&vec![
                release("YK", "US", "October 10-2019"),
                release("JT", "NZ", "October 10-2019"),
                release("AR", "EC", "October 10-2019"),
            ]),
            "Sept 24-2019".into() => table(&vec![
                release("AR", "EC", "Sept 24-2019"),
                release("YK", "US", "Sept 24-2019"),
                release("JT", "NZ", "Sept 24-2019"),
            ]),
        });

        assert_eq!(
            group(&for_key, nu_releases, Tag::unknown()).unwrap(),
            expected
        );
    }
}

256
src/commands/split_by.rs Normal file
View file

@ -0,0 +1,256 @@
use crate::commands::WholeStreamCommand;
use crate::data::TaggedDictBuilder;
use crate::errors::ShellError;
use crate::prelude::*;
/// Marker type for the `split-by` command; its behavior lives in the
/// `WholeStreamCommand` implementation below.
pub struct SplitBy;
/// Arguments of the `split-by` command, filled in through `Deserialize`.
#[derive(Deserialize)]
pub struct SplitByArgs {
// Name of the column, inside each nested table, that the data is split by.
column_name: Tagged<String>,
}
/// Exposes `SplitBy` to the shell as the `split-by` whole-stream command.
impl WholeStreamCommand for SplitBy {
    /// Name the command is invoked with.
    fn name(&self) -> &str {
        "split-by"
    }

    /// Declares the single required `column_name` string argument.
    fn signature(&self) -> Signature {
        Signature::build("split-by").required(
            "column_name",
            SyntaxShape::String,
            "the name of the column within the nested table to split by",
        )
    }

    /// One-line help text for the command.
    fn usage(&self) -> &str {
        "Creates a new table with the data from the inner tables split by the column given."
    }

    /// Deserializes the arguments into `SplitByArgs` and runs `split_by`.
    fn run(
        &self,
        args: CommandArgs,
        registry: &CommandRegistry,
    ) -> Result<OutputStream, ShellError> {
        args.process(registry, split_by)?.run()
    }
}
/// Entry point of the `split-by` command.
///
/// Collects the whole input stream. When it holds exactly one value, that
/// value is handed to `split` and the outcome is emitted; an empty stream or
/// a stream with more than one value produces a labeled error pointing at
/// the `column_name` argument.
pub fn split_by(
    SplitByArgs { column_name }: SplitByArgs,
    RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
    let stream = async_stream! {
        let values: Vec<Tagged<Value>> = input.values.collect().await;

        let outcome = match values.len() {
            1 => split(&column_name, &values[0], name),
            _ => Err(ShellError::labeled_error(
                "Expected table from pipeline",
                "requires a table input",
                column_name.span()
            )),
        };

        match outcome {
            Ok(table) => yield ReturnSuccess::value(table),
            Err(reason) => yield Err(reason),
        }
    };

    Ok(stream.to_output_stream())
}
/// Regroups a row of tables by a second key.
///
/// `value` must be a row whose entries are tables (the shape `group`
/// produces). Each inner table is grouped by `column_name` via
/// `group_by::group`, and the nesting is then inverted: the result is a row
/// keyed by the `column_name` values, each holding a row keyed by the
/// original group keys, each holding the table of matching rows. The result
/// is tagged with `tag`.
///
/// # Errors
///
/// * A type error ("a table value") when `value` is not a row, when one of
///   its entries is not a table, or when `group` hands back something other
///   than a row of tables.
/// * Any error reported by `group_by::group`, e.g. a row missing
///   `column_name`.
pub fn split(
    column_name: &Tagged<String>,
    value: &Tagged<Value>,
    tag: impl Into<Tag>,
) -> Result<Tagged<Value>, ShellError> {
    let origin_tag = tag.into();

    let group_sets = match value {
        Tagged {
            item: Value::Row(group_sets),
            ..
        } => group_sets,
        _ => {
            return Err(ShellError::type_error(
                "a table value",
                value.tagged_type_name(),
            ))
        }
    };

    let mut splits = indexmap::IndexMap::new();

    for (group_key, group_value) in group_sets.entries.iter() {
        let dataset = match group_value {
            Tagged {
                item: Value::Table(dataset),
                ..
            } => dataset,
            other => {
                return Err(ShellError::type_error(
                    "a table value",
                    other.tagged_type_name(),
                ))
            }
        };

        // `group` consumes its input, so the borrowed rows have to be copied.
        let group =
            crate::commands::group_by::group(column_name, dataset.to_vec(), &origin_tag)?;

        let subsets = match group {
            Tagged {
                item: Value::Row(subsets),
                ..
            } => subsets,
            _ => {
                return Err(ShellError::type_error(
                    "a table value",
                    group.tagged_type_name(),
                ))
            }
        };

        for (split_label, subset) in subsets.entries.into_iter() {
            match subset {
                Tagged {
                    item: Value::Table(subset),
                    tag,
                } => {
                    // `split_label` is already owned, so it is moved into the
                    // map; only the borrowed outer key needs a clone.
                    splits
                        .entry(split_label)
                        .or_insert_with(indexmap::IndexMap::new)
                        .insert(group_key.clone(), Value::table(&subset).tagged(tag));
                }
                other => {
                    return Err(ShellError::type_error(
                        "a table value",
                        other.tagged_type_name(),
                    ))
                }
            }
        }
    }

    let mut out = TaggedDictBuilder::new(&origin_tag);

    for (k, v) in splits {
        out.insert(k, Value::row(v));
    }

    Ok(out.into_tagged_value())
}
#[cfg(test)]
mod tests {
    use crate::commands::split_by::split;
    use crate::data::meta::*;
    use crate::Value;
    use indexmap::IndexMap;

    /// Wraps text in a string value carrying an unknown tag.
    fn string(input: impl Into<String>) -> Tagged<Value> {
        Value::string(input.into()).tagged_unknown()
    }

    /// Wraps the given entries in a row value carrying an unknown tag.
    fn row(entries: IndexMap<String, Tagged<Value>>) -> Tagged<Value> {
        Value::row(entries).tagged_unknown()
    }

    /// Wraps the given rows in a table value carrying an unknown tag.
    fn table(list: &Vec<Tagged<Value>>) -> Tagged<Value> {
        Value::table(list).tagged_unknown()
    }

    /// Builds one release row with `name`, `country` and `date` columns,
    /// inserted in that order.
    fn release(name: &str, country: &str, date: &str) -> Tagged<Value> {
        row(indexmap! {
            "name".into() => string(name),
            "country".into() => string(country),
            "date".into() => string(date),
        })
    }

    /// Table holding the AR, JT and YK releases (in that order) for `date`.
    fn releases_on(date: &str) -> Tagged<Value> {
        table(&vec![
            release("AR", "EC", date),
            release("JT", "NZ", date),
            release("YK", "US", date),
        ])
    }

    /// Row keyed by date, each date holding a one-row table for the given
    /// person and country.
    fn releases_of(name: &str, country: &str) -> Tagged<Value> {
        row(indexmap! {
            "August 23-2019".into() => table(&vec![release(name, country, "August 23-2019")]),
            "Sept 24-2019".into() => table(&vec![release(name, country, "Sept 24-2019")]),
            "October 10-2019".into() => table(&vec![release(name, country, "October 10-2019")])
        })
    }

    #[test]
    fn splits_inner_tables_by_key() {
        let for_key = String::from("country").tagged_unknown();

        let nu_releases = row(indexmap! {
            "August 23-2019".into() => releases_on("August 23-2019"),
            "Sept 24-2019".into() => releases_on("Sept 24-2019"),
            "October 10-2019".into() => releases_on("October 10-2019")
        });

        let expected = row(indexmap! {
            "EC".into() => releases_of("AR", "EC"),
            "NZ".into() => releases_of("JT", "NZ"),
            "US".into() => releases_of("YK", "US")
        });

        assert_eq!(
            split(&for_key, &nu_releases, Tag::unknown()).unwrap(),
            expected
        );
    }

    #[test]
    fn errors_if_key_within_some_inner_table_is_missing() {
        let for_key = String::from("country").tagged_unknown();

        // The JT row of the September table has no "country" column.
        let row_without_country = row(indexmap! {
            "name".into() => Value::string("JT").tagged(Tag::from(Span::new(5,10))),
            "date".into() => string("Sept 24-2019")
        });

        let nu_releases = row(indexmap! {
            "August 23-2019".into() => releases_on("August 23-2019"),
            "Sept 24-2019".into() => table(&vec![
                release("AR", "EC", "Sept 24-2019"),
                row_without_country,
                release("YK", "US", "Sept 24-2019")
            ]),
            "October 10-2019".into() => releases_on("October 10-2019")
        });

        assert!(split(&for_key, &nu_releases, Tag::from(Span::new(5, 10))).is_err());
    }
}

View file

@ -9,10 +9,10 @@ fn group_by() {
sandbox.with_files(vec![FileWithContentToBeTrimmed( sandbox.with_files(vec![FileWithContentToBeTrimmed(
"los_tres_caballeros.csv", "los_tres_caballeros.csv",
r#" r#"
first_name,last_name,rusty_luck,type first_name,last_name,rusty_at,type
Andrés,Robalino,1,A Andrés,Robalino,10/11/2013,A
Jonathan,Turner,1,B Jonathan,Turner,10/12/2013,B
Yehuda,Katz,1,A Yehuda,Katz,10/11/2013,A
"#, "#,
)]); )]);
@ -20,8 +20,8 @@ fn group_by() {
cwd: dirs.test(), h::pipeline( cwd: dirs.test(), h::pipeline(
r#" r#"
open los_tres_caballeros.csv open los_tres_caballeros.csv
| group-by type | group-by rusty_at
| get A | get "10/11/2013"
| count | count
| echo $it | echo $it
"# "#
@ -37,10 +37,10 @@ fn group_by_errors_if_unknown_column_name() {
sandbox.with_files(vec![FileWithContentToBeTrimmed( sandbox.with_files(vec![FileWithContentToBeTrimmed(
"los_tres_caballeros.csv", "los_tres_caballeros.csv",
r#" r#"
first_name,last_name,rusty_luck,type first_name,last_name,rusty_at,type
Andrés,Robalino,1,A Andrés,Robalino,10/11/2013,A
Jonathan,Turner,1,B Jonathan,Turner,10/12/2013,B
Yehuda,Katz,1,A Yehuda,Katz,10/11/2013,A
"#, "#,
)]); )]);
@ -56,6 +56,60 @@ fn group_by_errors_if_unknown_column_name() {
}) })
} }
// Integration test for `split-by`, compiled only when the
// `data_processing_primitives` cfg flag is set.
//
// Writes a three-row CSV, groups it by `rusty_at`, splits the groups by
// `type`, and counts the rows stored under type "A" for "10/11/2013"; the two
// rows matching both values (Andrés and Yehuda) give the expected "2".
#[cfg(data_processing_primitives)]
#[test]
fn split_by() {
Playground::setup("split_by_test_1", |dirs, sandbox| {
sandbox.with_files(vec![FileWithContentToBeTrimmed(
"los_tres_caballeros.csv",
r#"
first_name,last_name,rusty_at,type
Andrés,Robalino,10/11/2013,A
Jonathan,Turner,10/12/2013,B
Yehuda,Katz,10/11/2013,A
"#,
)]);
let actual = nu!(
cwd: dirs.test(), h::pipeline(
r#"
open los_tres_caballeros.csv
| group-by rusty_at
| split-by type
| get A."10/11/2013"
| count
| echo $it
"#
));
assert_eq!(actual, "2");
})
}
// Integration test for the `split-by` failure path, compiled only when the
// `data_processing_primitives` cfg flag is set.
//
// Pipes the names of four empty files into `split-by` and checks that the
// reported error contains "Expected table from pipeline".
#[cfg(data_processing_primitives)]
#[test]
fn split_by_errors_if_no_table_given_as_input() {
Playground::setup("split_by_test_2", |dirs, sandbox| {
sandbox.with_files(vec![
EmptyFile("los.txt"),
EmptyFile("tres.txt"),
EmptyFile("amigos.txt"),
EmptyFile("arepas.clu"),
]);
let actual = nu_error!(
cwd: dirs.test(), h::pipeline(
r#"
ls
| get name
| split-by type
"#
));
assert!(actual.contains("Expected table from pipeline"));
})
}
#[test] #[test]
fn first_gets_first_rows_by_amount() { fn first_gets_first_rows_by_amount() {
Playground::setup("first_test_1", |dirs, sandbox| { Playground::setup("first_test_1", |dirs, sandbox| {