diff --git a/Cargo.lock b/Cargo.lock index 4efa5681e4..cf05fa9543 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -206,11 +206,9 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" -version = "4.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93811be1c0f60f4b29d80b34dad4e59fdc397a9e580f849df9e2635701498663" +version = "5.0.0-SNAPSHOT" +source = "git+https://github.com/apache/arrow-rs?rev=f26ffb3091ae355d246edc4a6fcc2c8e5b9bc570#f26ffb3091ae355d246edc4a6fcc2c8e5b9bc570" dependencies = [ - "cfg_aliases", "chrono", "csv", "flatbuffers", @@ -399,9 +397,9 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" [[package]] name = "backtrace" -version = "0.3.59" +version = "0.3.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4717cfcbfaa661a0fd48f8453951837ae7e8f81e481fbb136e3202d72805a744" +checksum = "b7815ea54e4d821e791162e078acbebfd6d8c8939cd559c9335dceb1c8ca7282" dependencies = [ "addr2line", "cc", @@ -753,12 +751,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "cfg_aliases" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" - [[package]] name = "chrono" version = "0.4.19" @@ -1926,9 +1918,9 @@ checksum = "acc499defb3b348f8d8f3f66415835a9131856ff7714bf10dadfc4ec4bdb29a1" [[package]] name = "futures-lite" -version = "1.11.3" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4481d0cd0de1d204a4fa55e7d45f07b1d958abcb06714b3446438e2eff695fb" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" dependencies = [ "fastrand", "futures-core", @@ -2269,9 +2261,9 @@ dependencies = [ [[package]] name = "heck" 
-version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87cbf45460356b7deeb5e3415b5563308c0a9b057c85e12b06ad551f98d0a6ac" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" dependencies = [ "unicode-segmentation", ] @@ -4209,9 +4201,12 @@ dependencies = [ [[package]] name = "object" -version = "0.24.0" +version = "0.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a5b3dd1c072ee7963717671d1ca129f1048fda25edea6b752bfc71ac8854170" +checksum = "9023c1c0973b327f073c7f2fceb9bcc049862f93a7d14c6feb46c8a56460a0d5" +dependencies = [ + "memchr", +] [[package]] name = "once_cell" @@ -4358,9 +4353,8 @@ dependencies = [ [[package]] name = "parquet" -version = "4.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9275a7f8eab04e6ab6918b4fdd50e00aeba3c288e0f91bdc5da87a2c8ff288a6" +version = "5.0.0-SNAPSHOT" +source = "git+https://github.com/apache/arrow-rs?rev=f26ffb3091ae355d246edc4a6fcc2c8e5b9bc570#f26ffb3091ae355d246edc4a6fcc2c8e5b9bc570" dependencies = [ "arrow", "base64 0.13.0", @@ -4598,9 +4592,8 @@ dependencies = [ [[package]] name = "polars" -version = "0.13.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c406ce46726b7d33b05a343d9c1317c0803a419d50bb45275de3f366410e9a80" +version = "0.14.0" +source = "git+https://github.com/pola-rs/polars?rev=a5f17b0a6e3e05ff6be789aa24a7cae54fd400dd#a5f17b0a6e3e05ff6be789aa24a7cae54fd400dd" dependencies = [ "polars-core", "polars-io", @@ -4609,9 +4602,8 @@ dependencies = [ [[package]] name = "polars-arrow" -version = "0.13.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53b2d5fb400345c7977e4e728a10be382476f2f9d2caf6b57cd60e97ea17d364" +version = "0.14.0" +source = "git+https://github.com/pola-rs/polars?rev=a5f17b0a6e3e05ff6be789aa24a7cae54fd400dd#a5f17b0a6e3e05ff6be789aa24a7cae54fd400dd" dependencies = [ "arrow", "num 
0.4.0", @@ -4620,9 +4612,8 @@ dependencies = [ [[package]] name = "polars-core" -version = "0.13.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88561e850748c507f0fc7835b35e795e770597ceecb14e0a8f7d8abf8346645d" +version = "0.14.0" +source = "git+https://github.com/pola-rs/polars?rev=a5f17b0a6e3e05ff6be789aa24a7cae54fd400dd#a5f17b0a6e3e05ff6be789aa24a7cae54fd400dd" dependencies = [ "ahash", "anyhow", @@ -4640,15 +4631,15 @@ dependencies = [ "rand_distr", "rayon", "regex 1.5.4", + "serde 1.0.126", "thiserror", "unsafe_unwrap", ] [[package]] name = "polars-io" -version = "0.13.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27388810ec5f3346838725aa0aa49343802c1344b96fe82229ae781c62c98bc7" +version = "0.14.0" +source = "git+https://github.com/pola-rs/polars?rev=a5f17b0a6e3e05ff6be789aa24a7cae54fd400dd#a5f17b0a6e3e05ff6be789aa24a7cae54fd400dd" dependencies = [ "ahash", "anyhow", @@ -4670,9 +4661,8 @@ dependencies = [ [[package]] name = "polars-lazy" -version = "0.13.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7f83284970a9db7d0b6a56d6f944c3988587429c124c1d087188e9d2c7ad7c" +version = "0.14.0" +source = "git+https://github.com/pola-rs/polars?rev=a5f17b0a6e3e05ff6be789aa24a7cae54fd400dd#a5f17b0a6e3e05ff6be789aa24a7cae54fd400dd" dependencies = [ "ahash", "itertools", @@ -6856,9 +6846,9 @@ dependencies = [ [[package]] name = "unicode-normalization" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33717dca7ac877f497014e10d73f3acf948c342bee31b5ca7892faf94ccc6b49" +checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9" dependencies = [ "tinyvec", ] @@ -7327,9 +7317,9 @@ dependencies = [ [[package]] name = "zstd" -version = "0.8.1+zstd.1.5.0" +version = "0.8.3+zstd.1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"357d6bb1bd9c6f6a55a5a15c74d01260b272f724dc60cc829b86ebd2172ac5ef" +checksum = "5ea7094c7b4a58fbd738eb0d4a2fc7684a0e6949a31597e074ffe20a07cbc2bf" dependencies = [ "zstd-safe", ] diff --git a/crates/nu-command/Cargo.toml b/crates/nu-command/Cargo.toml index 2b6c66acd9..af42ee4b65 100644 --- a/crates/nu-command/Cargo.toml +++ b/crates/nu-command/Cargo.toml @@ -100,7 +100,9 @@ which = { version = "4.1.0", optional = true } zip = { version = "0.5.9", optional = true } [dependencies.polars] -version = "0.13.4" +git = "https://github.com/pola-rs/polars" +rev = "a5f17b0a6e3e05ff6be789aa24a7cae54fd400dd" +version = "0.14.0" optional = true features = ["parquet", "json", "random", "pivot"] diff --git a/crates/nu-command/src/commands.rs b/crates/nu-command/src/commands.rs index cd46003b99..73120a6c9b 100644 --- a/crates/nu-command/src/commands.rs +++ b/crates/nu-command/src/commands.rs @@ -191,10 +191,12 @@ pub(crate) use all::Command as All; pub(crate) use any::Command as Any; #[cfg(feature = "dataframe")] pub(crate) use dataframe::{ - DataFrame, DataFrameAggregate, DataFrameConvert, DataFrameDTypes, DataFrameDrop, - DataFrameDummies, DataFrameGroupBy, DataFrameHead, DataFrameJoin, DataFrameList, DataFrameLoad, - DataFrameMelt, DataFramePivot, DataFrameSample, DataFrameSelect, DataFrameShow, DataFrameSlice, - DataFrameTail, DataFrameToCsv, DataFrameToParquet, DataFrameWhere, + DataFrame, DataFrameAggregate, DataFrameColumn, DataFrameDTypes, DataFrameDrop, + DataFrameDropDuplicates, DataFrameDropNulls, DataFrameDummies, DataFrameGet, DataFrameGroupBy, + DataFrameHead, DataFrameJoin, DataFrameList, DataFrameLoad, DataFrameMelt, DataFramePivot, + DataFrameSample, DataFrameSelect, DataFrameShow, DataFrameSlice, DataFrameSort, DataFrameTail, + DataFrameToCsv, DataFrameToDF, DataFrameToParquet, DataFrameToSeries, DataFrameWhere, + DataFrameWithColumn, }; pub(crate) use enter::Enter; pub(crate) use every::Every; diff --git a/crates/nu-command/src/commands/autoview/command.rs 
b/crates/nu-command/src/commands/autoview/command.rs index 7ac29f1bfc..a0c4bc4797 100644 --- a/crates/nu-command/src/commands/autoview/command.rs +++ b/crates/nu-command/src/commands/autoview/command.rs @@ -265,6 +265,20 @@ pub fn autoview(args: CommandArgs) -> Result { let _ = result.collect::>(); } } + #[cfg(feature = "dataframe")] + Value { + value: UntaggedValue::DataFrame(PolarsData::Series(series)), + tag, + } => { + if let Some(table) = table { + // TODO. Configure the parameter rows from file. It can be + // adjusted to see a certain amount of values in the head + let command_args = + create_default_command_args(&context, series.print()?.into(), tag); + let result = table.run(command_args)?; + let _ = result.collect::>(); + } + } Value { value: UntaggedValue::Primitive(Primitive::Nothing), .. diff --git a/crates/nu-command/src/commands/dataframe/aggregate.rs b/crates/nu-command/src/commands/dataframe/aggregate.rs index d90c80f338..83346fc54a 100644 --- a/crates/nu-command/src/commands/dataframe/aggregate.rs +++ b/crates/nu-command/src/commands/dataframe/aggregate.rs @@ -66,7 +66,7 @@ impl Operation { "Operation not fount", "Operation does not exist", &name.tag, - "Perhaps you want: mean, sum, min, max, first, last, nunique, quantile, median, count", + "Perhaps you want: mean, sum, min, max, first, last, nunique, quantile, median, var, std, or count", &name.tag, )), } @@ -81,7 +81,7 @@ impl WholeStreamCommand for DataFrame { } fn usage(&self) -> &str { - "Performs an aggregation operation on a groupby object" + "Performs an aggregation operation on a dataframe or groupby object" } fn signature(&self) -> Signature { @@ -105,11 +105,19 @@ impl WholeStreamCommand for DataFrame { } fn examples(&self) -> Vec { - vec![Example { - description: "Aggregate sum by grouping by column a and summing on col b", - example: "[[a b]; [one 1] [one 2]] | pls convert | pls groupby [a] | pls aggregate sum", - result: None, - }] + vec![ + Example { + description: "Aggregate sum 
by grouping by column a and summing on col b", + example: + "[[a b]; [one 1] [one 2]] | pls to-df | pls groupby [a] | pls aggregate sum", + result: None, + }, + Example { + description: "Aggregate sum in dataframe columns", + example: "[[a b]; [4 1] [5 2]] | pls to-df | pls aggregate sum", + result: None, + }, + ] } } @@ -131,45 +139,48 @@ fn command(args: CommandArgs) -> Result { None => (None, Span::unknown()), }; - // The operation is only done in one groupby. Only one input is - // expected from the InputStream - match args.input.next() { - None => Err(ShellError::labeled_error( - "No input received", - "missing groupby input from stream", - &tag, - )), - Some(value) => { - if let UntaggedValue::DataFrame(PolarsData::GroupBy(nu_groupby)) = value.value { - let groupby = nu_groupby.to_groupby()?; + let value = args.input.next().ok_or(ShellError::labeled_error( + "Empty stream", + "No value found in the stream", + &tag, + ))?; - let groupby = match &selection { - Some(cols) => groupby.select(cols), - None => groupby, - }; + let res = match value.value { + UntaggedValue::DataFrame(PolarsData::GroupBy(nu_groupby)) => { + let groupby = nu_groupby.to_groupby()?; - let res = perform_aggregation(groupby, op, &operation.tag, &agg_span)?; + let groupby = match &selection { + Some(cols) => groupby.select(cols), + None => groupby, + }; - let final_df = Value { - tag, - value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( - res, - ))), - }; + perform_groupby_aggregation(groupby, op, &operation.tag, &agg_span) + } + UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) => { + let df = df.as_ref(); - Ok(OutputStream::one(final_df)) - } else { - Err(ShellError::labeled_error( - "No groupby in stream", - "no groupby found in input stream", - &tag, - )) + match &selection { + Some(cols) => { + let df = df + .select(cols) + .map_err(|e| parse_polars_error::<&str>(&e, &agg_span, None))?; + + perform_dataframe_aggregation(&df, op, &operation.tag) + } + 
None => perform_dataframe_aggregation(&df, op, &operation.tag), } } - } + _ => Err(ShellError::labeled_error( + "No groupby or dataframe", + "no groupby or found in input stream", + &value.tag.span, + )), + }?; + + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) } -fn perform_aggregation( +fn perform_groupby_aggregation( groupby: GroupBy, operation: Operation, operation_tag: &Tag, @@ -198,3 +209,29 @@ fn perform_aggregation( parse_polars_error::<&str>(&e, span, None) }) } + +fn perform_dataframe_aggregation( + dataframe: &polars::prelude::DataFrame, + operation: Operation, + operation_tag: &Tag, +) -> Result { + match operation { + Operation::Mean => Ok(dataframe.mean()), + Operation::Sum => Ok(dataframe.sum()), + Operation::Min => Ok(dataframe.min()), + Operation::Max => Ok(dataframe.max()), + Operation::Quantile(quantile) => dataframe + .quantile(quantile) + .map_err(|e| parse_polars_error::<&str>(&e, &operation_tag.span, None)), + Operation::Median => Ok(dataframe.median()), + Operation::Var => Ok(dataframe.var()), + Operation::Std => Ok(dataframe.std()), + _ => Err(ShellError::labeled_error_with_secondary( + "Not valid operation", + "operation not valid for dataframe", + &operation_tag.span, + "Perhaps you want: mean, sum, min, max, quantile, median, var, or std", + &operation_tag.span, + )), + } +} diff --git a/crates/nu-command/src/commands/dataframe/column.rs b/crates/nu-command/src/commands/dataframe/column.rs new file mode 100644 index 0000000000..b82acd2a6b --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/column.rs @@ -0,0 +1,56 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{ + dataframe::{NuDataFrame, NuSeries}, + Signature, SyntaxShape, +}; + +use nu_source::Tagged; + +use super::utils::parse_polars_error; +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls column" + } + + fn usage(&self) -> &str { + "Returns the 
selected column as Series" + } + + fn signature(&self) -> Signature { + Signature::build("pls column").required("column", SyntaxShape::String, "column name") + } + + fn run(&self, args: CommandArgs) -> Result { + command(args) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "Returns the selected column as series", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls column a", + result: None, + }] + } +} + +fn command(args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let mut args = args.evaluate_once()?; + let column: Tagged = args.req(0)?; + + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; + + let res = df + .as_ref() + .column(column.item.as_ref()) + .map_err(|e| parse_polars_error::<&str>(&e, &column.tag.span, None))?; + + Ok(OutputStream::one(NuSeries::series_to_value( + res.clone(), + tag, + ))) +} diff --git a/crates/nu-command/src/commands/dataframe/drop.rs b/crates/nu-command/src/commands/dataframe/drop.rs index f4f1b90f59..4407460102 100644 --- a/crates/nu-command/src/commands/dataframe/drop.rs +++ b/crates/nu-command/src/commands/dataframe/drop.rs @@ -1,10 +1,7 @@ use crate::prelude::*; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{ - dataframe::{NuDataFrame, PolarsData}, - Signature, SyntaxShape, UntaggedValue, Value, -}; +use nu_protocol::{dataframe::NuDataFrame, Signature, SyntaxShape, Value}; use super::utils::{convert_columns, parse_polars_error}; @@ -34,7 +31,7 @@ impl WholeStreamCommand for DataFrame { fn examples(&self) -> Vec { vec![Example { description: "drop column a", - example: "[[a b]; [1 2] [3 4]] | pls convert | pls drop [a]", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls drop [a]", result: None, }] } @@ -45,53 +42,29 @@ fn command(args: CommandArgs) -> Result { let mut args = args.evaluate_once()?; let columns: Vec = args.req(0)?; - let (col_string, col_span) = convert_columns(&columns, &tag)?; - match args.input.next() { + let df = 
NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; + + let new_df = match col_string.iter().next() { + Some(col) => df + .as_ref() + .drop(col) + .map_err(|e| parse_polars_error::<&str>(&e, &col_span, None)), None => Err(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", - &tag, + "Empty names list", + "No column names where found", + &col_span, )), - Some(value) => { - if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value { - // Dataframe with the first selected column - let new_df = match col_string.iter().next() { - Some(col) => df - .as_ref() - .drop(col) - .map_err(|e| parse_polars_error::<&str>(&e, &col_span, None)), - None => Err(ShellError::labeled_error( - "Empty names list", - "No column names where found", - &col_span, - )), - }?; + }?; - // If there are more columns in the drop selection list, these - // are added from the resulting dataframe - let res = col_string.iter().skip(1).try_fold(new_df, |new_df, col| { - new_df - .drop(col) - .map_err(|e| parse_polars_error::<&str>(&e, &col_span, None)) - })?; + // If there are more columns in the drop selection list, these + // are dropped from the resulting dataframe + let res = col_string.iter().skip(1).try_fold(new_df, |new_df, col| { + new_df + .drop(col) + .map_err(|e| parse_polars_error::<&str>(&e, &col_span, None)) + })?; - let value = Value { - value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( - res, - ))), - tag: tag.clone(), - }; - - Ok(OutputStream::one(value)) - } else { - Err(ShellError::labeled_error( - "No dataframe in stream", - "no dataframe found in input stream", - &tag, - )) - } - } - } + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) } diff --git a/crates/nu-command/src/commands/dataframe/drop_duplicates.rs b/crates/nu-command/src/commands/dataframe/drop_duplicates.rs new file mode 100644 index 0000000000..372d9b5c0e --- /dev/null +++
b/crates/nu-command/src/commands/dataframe/drop_duplicates.rs @@ -0,0 +1,66 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{dataframe::NuDataFrame, Signature, SyntaxShape, Value}; + +use super::utils::{convert_columns, parse_polars_error}; + +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls drop-duplicates" + } + + fn usage(&self) -> &str { + "Drops duplicate values in dataframe" + } + + fn signature(&self) -> Signature { + Signature::build("pls drop-duplicates") + .optional( + "subset", + SyntaxShape::Table, + "subset of columns to drop duplicates", + ) + .switch("maintain", "maintain order", Some('m')) + } + + fn run(&self, args: CommandArgs) -> Result { + command(args) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "drop duplicates", + example: "[[a b]; [1 2] [3 4] [1 2]] | pls to-df | pls drop-duplicates", + result: None, + }] + } +} + +fn command(args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let mut args = args.evaluate_once()?; + + // Extracting the optional subset of columns used to detect duplicates + let columns: Option> = args.opt(0)?; + let (subset, col_span) = match columns { + Some(cols) => { + let (agg_string, col_span) = convert_columns(&cols, &tag)?; + (Some(agg_string), col_span) + } + None => (None, Span::unknown()), + }; + + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; + + let subset_slice = subset.as_ref().map(|cols| &cols[..]); + + let res = df + .as_ref() + .drop_duplicates(args.has_flag("maintain"), subset_slice) + .map_err(|e| parse_polars_error::<&str>(&e, &col_span, None))?; + + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) +} diff --git a/crates/nu-command/src/commands/dataframe/drop_nulls.rs b/crates/nu-command/src/commands/dataframe/drop_nulls.rs new file mode 100644 index 0000000000..a32f847195 --- /dev/null +++
b/crates/nu-command/src/commands/dataframe/drop_nulls.rs @@ -0,0 +1,64 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{dataframe::NuDataFrame, Signature, SyntaxShape, Value}; + +use super::utils::{convert_columns, parse_polars_error}; + +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls drop-nulls" + } + + fn usage(&self) -> &str { + "Drops null values in dataframe" + } + + fn signature(&self) -> Signature { + Signature::build("pls drop-nulls").optional( + "subset", + SyntaxShape::Table, + "subset of columns to drop duplicates", + ) + } + + fn run(&self, args: CommandArgs) -> Result { + command(args) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "drop null values duplicates", + example: "[[a b]; [1 2] [3 4] [1 2]] | pls to-df | pls drop-nulls", + result: None, + }] + } +} + +fn command(args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let mut args = args.evaluate_once()?; + + // Extracting the optional subset of columns checked for null values + let columns: Option> = args.opt(0)?; + let (subset, col_span) = match columns { + Some(cols) => { + let (agg_string, col_span) = convert_columns(&cols, &tag)?; + (Some(agg_string), col_span) + } + None => (None, Span::unknown()), + }; + + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; + + let subset_slice = subset.as_ref().map(|cols| &cols[..]); + + let res = df + .as_ref() + .drop_nulls(subset_slice) + .map_err(|e| parse_polars_error::<&str>(&e, &col_span, None))?; + + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) +} diff --git a/crates/nu-command/src/commands/dataframe/dtypes.rs b/crates/nu-command/src/commands/dataframe/dtypes.rs index 478eec0a76..be1f91307b 100644 --- a/crates/nu-command/src/commands/dataframe/dtypes.rs +++ b/crates/nu-command/src/commands/dataframe/dtypes.rs @@ -1,7 +1,7 @@ use crate::prelude::*;
use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{dataframe::PolarsData, Signature, TaggedDictBuilder, UntaggedValue}; +use nu_protocol::{dataframe::NuDataFrame, Signature, TaggedDictBuilder}; pub struct DataFrame; @@ -25,7 +25,7 @@ impl WholeStreamCommand for DataFrame { fn examples(&self) -> Vec { vec![Example { description: "drop column a", - example: "[[a b]; [1 2] [3 4]] | pls convert | pls dtypes", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls dtypes", result: None, }] } @@ -35,42 +35,26 @@ fn command(args: CommandArgs) -> Result { let tag = args.call_info.name_tag.clone(); let mut args = args.evaluate_once()?; - match args.input.next() { - None => Err(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", - &tag, - )), - Some(value) => { - if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value { - let col_names = df - .as_ref() - .get_column_names() - .iter() - .map(|v| v.to_string()) - .collect::>(); + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; + let col_names = df + .as_ref() + .get_column_names() + .iter() + .map(|v| v.to_string()) + .collect::>(); - let values = df - .as_ref() - .dtypes() - .into_iter() - .zip(col_names.into_iter()) - .map(move |(dtype, name)| { - let mut data = TaggedDictBuilder::new(tag.clone()); - data.insert_value("column", name.as_ref()); - data.insert_value("dtype", format!("{}", dtype)); + let values = df + .as_ref() + .dtypes() + .into_iter() + .zip(col_names.into_iter()) + .map(move |(dtype, name)| { + let mut data = TaggedDictBuilder::new(tag.clone()); + data.insert_value("column", name.as_ref()); + data.insert_value("dtype", format!("{}", dtype)); - data.into_value() - }); + data.into_value() + }); - Ok(OutputStream::from_stream(values)) - } else { - Err(ShellError::labeled_error( - "No dataframe in stream", - "no dataframe found in input stream", - &tag, - )) - } - } - } + 
Ok(OutputStream::from_stream(values)) } diff --git a/crates/nu-command/src/commands/dataframe/dummies.rs b/crates/nu-command/src/commands/dataframe/dummies.rs index 449ea47504..be3c7b2929 100644 --- a/crates/nu-command/src/commands/dataframe/dummies.rs +++ b/crates/nu-command/src/commands/dataframe/dummies.rs @@ -1,10 +1,7 @@ use crate::prelude::*; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{ - dataframe::{NuDataFrame, PolarsData}, - Signature, UntaggedValue, Value, -}; +use nu_protocol::{dataframe::NuDataFrame, Signature}; use super::utils::parse_polars_error; @@ -12,7 +9,7 @@ pub struct DataFrame; impl WholeStreamCommand for DataFrame { fn name(&self) -> &str { - "pls to_dummies" + "pls to-dummies" } fn usage(&self) -> &str { @@ -20,7 +17,7 @@ impl WholeStreamCommand for DataFrame { } fn signature(&self) -> Signature { - Signature::build("pls select") + Signature::build("pls to-dummies") } fn run(&self, args: CommandArgs) -> Result { @@ -30,7 +27,7 @@ impl WholeStreamCommand for DataFrame { fn examples(&self) -> Vec { vec![Example { description: "Create new dataframe with dummy variables", - example: "[[a b]; [1 2] [3 4]] | pls convert | pls to_dummies", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls to-dummies", result: None, }] } @@ -40,37 +37,14 @@ fn command(args: CommandArgs) -> Result { let tag = args.call_info.name_tag.clone(); let mut args = args.evaluate_once()?; - match args.input.next() { - None => Err(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", - &tag, - )), - Some(value) => { - if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value { - let res = df.as_ref().to_dummies().map_err(|e| { - parse_polars_error( - &e, - &tag.span, - Some("The only allowed column types for dummies are String or Int"), - ) - })?; + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; + let res = df.as_ref().to_dummies().map_err(|e| { + 
parse_polars_error( + &e, + &tag.span, + Some("The only allowed column types for dummies are String or Int"), + ) + })?; - let value = Value { - value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( - res, - ))), - tag: tag.clone(), - }; - - Ok(OutputStream::one(value)) - } else { - Err(ShellError::labeled_error( - "No dataframe in stream", - "no dataframe found in input stream", - &tag, - )) - } - } - } + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) } diff --git a/crates/nu-command/src/commands/dataframe/get.rs b/crates/nu-command/src/commands/dataframe/get.rs new file mode 100644 index 0000000000..25711fb739 --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/get.rs @@ -0,0 +1,54 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{dataframe::NuDataFrame, Signature, SyntaxShape, Value}; + +use super::utils::{convert_columns, parse_polars_error}; +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls get" + } + + fn usage(&self) -> &str { + "Creates dataframe with the selected columns" + } + + fn signature(&self) -> Signature { + Signature::build("pls get").required( + "columns", + SyntaxShape::Table, + "column names to sort dataframe", + ) + } + + fn run(&self, args: CommandArgs) -> Result { + command(args) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "Creates dataframe with selected columns", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls get [a]", + result: None, + }] + } +} + +fn command(args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let mut args = args.evaluate_once()?; + let columns: Vec = args.req(0)?; + + let (col_string, col_span) = convert_columns(&columns, &tag)?; + + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; + + let res = df + .as_ref() + .select(&col_string) + .map_err(|e| parse_polars_error::<&str>(&e, &col_span, 
None))?; + + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) +} diff --git a/crates/nu-command/src/commands/dataframe/groupby.rs b/crates/nu-command/src/commands/dataframe/groupby.rs index b765ecbd33..80691740b1 100644 --- a/crates/nu-command/src/commands/dataframe/groupby.rs +++ b/crates/nu-command/src/commands/dataframe/groupby.rs @@ -34,7 +34,7 @@ impl WholeStreamCommand for DataFrame { fn examples(&self) -> Vec { vec![Example { description: "Grouping by column a", - example: "[[a b]; [one 1] [one 2]] | pls convert | pls groupby [a]", + example: "[[a b]; [one 1] [one 2]] | pls to-df | pls groupby [a]", result: None, }] } @@ -48,43 +48,26 @@ fn command(args: CommandArgs) -> Result { let by_columns: Vec = args.req(0)?; let (columns_string, col_span) = convert_columns(&by_columns, &tag)?; - // The operation is only done in one dataframe. Only one input is - // expected from the InputStream - match args.input.next() { - None => Err(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", - &tag, - )), - Some(value) => { - if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(nu_df)) = value.value { - // This is the expensive part of the groupby; to create the - // groups that will be used for grouping the data in the - // dataframe. Once it has been done these values can be stored - // in a NuGroupBy - let groupby = nu_df - .as_ref() - .groupby(&columns_string) - .map_err(|e| parse_polars_error::<&str>(&e, &col_span, None))?; + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; - let groups = groupby.get_groups().to_vec(); - let groupby = Value { - tag: value.tag, - value: UntaggedValue::DataFrame(PolarsData::GroupBy(NuGroupBy::new( - NuDataFrame::new(nu_df.as_ref().clone()), - columns_string, - groups, - ))), - }; + // This is the expensive part of the groupby; to create the + // groups that will be used for grouping the data in the + // dataframe. 
Once it has been done these values can be stored + // in a NuGroupBy + let groupby = df + .as_ref() + .groupby(&columns_string) + .map_err(|e| parse_polars_error::<&str>(&e, &col_span, None))?; - Ok(OutputStream::one(groupby)) - } else { - Err(ShellError::labeled_error( - "No dataframe in stream", - "no dataframe found in input stream", - &tag, - )) - } - } - } + let groups = groupby.get_groups().to_vec(); + let groupby = Value { + tag, + value: UntaggedValue::DataFrame(PolarsData::GroupBy(NuGroupBy::new( + NuDataFrame::new(df.as_ref().clone()), + columns_string, + groups, + ))), + }; + + Ok(OutputStream::one(groupby)) } diff --git a/crates/nu-command/src/commands/dataframe/head.rs b/crates/nu-command/src/commands/dataframe/head.rs index 2d64d2ff7e..49ecfdd7bb 100644 --- a/crates/nu-command/src/commands/dataframe/head.rs +++ b/crates/nu-command/src/commands/dataframe/head.rs @@ -1,10 +1,7 @@ use crate::prelude::*; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{ - dataframe::{NuDataFrame, PolarsData}, - Signature, SyntaxShape, UntaggedValue, Value, -}; +use nu_protocol::{dataframe::NuDataFrame, Signature, SyntaxShape}; use nu_source::Tagged; @@ -21,7 +18,7 @@ impl WholeStreamCommand for DataFrame { fn signature(&self) -> Signature { Signature::build("pls select").optional( - "n_rows", + "rows", SyntaxShape::Number, "Number of rows for head", ) @@ -34,7 +31,7 @@ impl WholeStreamCommand for DataFrame { fn examples(&self) -> Vec { vec![Example { description: "Create new dataframe with head rows", - example: "[[a b]; [1 2] [3 4]] | pls convert | pls head", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls head", result: None, }] } @@ -50,31 +47,8 @@ fn command(args: CommandArgs) -> Result { None => 5, }; - match args.input.next() { - None => Err(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", - &tag, - )), - Some(value) => { - if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = 
value.value { - let res = df.as_ref().head(Some(rows)); + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; + let res = df.as_ref().head(Some(rows)); - let value = Value { - value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( - res, - ))), - tag: tag.clone(), - }; - - Ok(OutputStream::one(value)) - } else { - Err(ShellError::labeled_error( - "No dataframe in stream", - "no dataframe found in input stream", - &tag, - )) - } - } - } + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) } diff --git a/crates/nu-command/src/commands/dataframe/join.rs b/crates/nu-command/src/commands/dataframe/join.rs index 9b496def6f..9b89025d62 100644 --- a/crates/nu-command/src/commands/dataframe/join.rs +++ b/crates/nu-command/src/commands/dataframe/join.rs @@ -52,13 +52,13 @@ impl WholeStreamCommand for DataFrame { vec![ Example { description: "inner join dataframe", - example: "echo [[a b]; [1 2] [3 4]] | pls convert | pls join $right [a] [a]", + example: "echo [[a b]; [1 2] [3 4]] | pls to-df | pls join $right [a] [a]", result: None, }, Example { description: "right join dataframe", example: - "[[a b]; [1 2] [3 4] [5 6]] | pls convert | pls join $right [b] [b] -t right", + "[[a b]; [1 2] [3 4] [5 6]] | pls to-df | pls join $right [b] [b] -t right", result: None, }, ] @@ -95,53 +95,31 @@ fn command(args: CommandArgs) -> Result { let (l_col_string, l_col_span) = convert_columns(&l_col, &tag)?; let (r_col_string, r_col_span) = convert_columns(&r_col, &tag)?; - match args.input.next() { - None => Err(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", - &tag, - )), - Some(value) => { - if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value { - let res = match r_df.value { - UntaggedValue::DataFrame(PolarsData::EagerDataFrame(r_df)) => { - // Checking the column types before performing the join - check_column_datatypes( - df.as_ref(), - &l_col_string, - &l_col_span, - 
&r_col_string, - &r_col_span, - )?; + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; - df.as_ref() - .join(r_df.as_ref(), &l_col_string, &r_col_string, join_type) - .map_err(|e| parse_polars_error::<&str>(&e, &l_col_span, None)) - } - _ => Err(ShellError::labeled_error( - "Not a dataframe", - "not a dataframe type value", - &r_df.tag, - )), - }?; + let res = match r_df.value { + UntaggedValue::DataFrame(PolarsData::EagerDataFrame(r_df)) => { + // Checking the column types before performing the join + check_column_datatypes( + df.as_ref(), + &l_col_string, + &l_col_span, + &r_col_string, + &r_col_span, + )?; - let value = Value { - value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( - res, - ))), - tag: tag.clone(), - }; - - Ok(OutputStream::one(value)) - } else { - Err(ShellError::labeled_error( - "No dataframe in stream", - "no dataframe found in input stream", - &tag, - )) - } + df.as_ref() + .join(r_df.as_ref(), &l_col_string, &r_col_string, join_type) + .map_err(|e| parse_polars_error::<&str>(&e, &l_col_span, None)) } - } + _ => Err(ShellError::labeled_error( + "Not a dataframe", + "not a dataframe type value", + &r_df.tag, + )), + }?; + + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) } fn check_column_datatypes>( diff --git a/crates/nu-command/src/commands/dataframe/load.rs b/crates/nu-command/src/commands/dataframe/load.rs index 2bf6dfadfd..eef2eb4330 100644 --- a/crates/nu-command/src/commands/dataframe/load.rs +++ b/crates/nu-command/src/commands/dataframe/load.rs @@ -4,8 +4,7 @@ use crate::{commands::dataframe::utils::parse_polars_error, prelude::*}; use nu_engine::{EvaluatedCommandArgs, WholeStreamCommand}; use nu_errors::ShellError; use nu_protocol::{ - dataframe::{NuDataFrame, PolarsData}, - Primitive, Signature, SyntaxShape, UntaggedValue, Value, + dataframe::NuDataFrame, Primitive, Signature, SyntaxShape, UntaggedValue, Value, }; use nu_source::Tagged; @@ -113,12 +112,9 @@ fn 
command(args: CommandArgs) -> Result { span: tag.span, }; - let tagged_value = Value { - value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new(df))), - tag: df_tag, - }; - - Ok(InputStream::one(tagged_value).to_output_stream()) + Ok(OutputStream::one(NuDataFrame::dataframe_to_value( + df, df_tag, + ))) } fn from_parquet(args: EvaluatedCommandArgs) -> Result { diff --git a/crates/nu-command/src/commands/dataframe/melt.rs b/crates/nu-command/src/commands/dataframe/melt.rs index 6853189e94..283a084350 100644 --- a/crates/nu-command/src/commands/dataframe/melt.rs +++ b/crates/nu-command/src/commands/dataframe/melt.rs @@ -1,10 +1,7 @@ use crate::{commands::dataframe::utils::parse_polars_error, prelude::*}; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{ - dataframe::{NuDataFrame, PolarsData}, - Signature, SyntaxShape, UntaggedValue, Value, -}; +use nu_protocol::{dataframe::NuDataFrame, Signature, SyntaxShape, Value}; use super::utils::convert_columns; @@ -20,7 +17,7 @@ impl WholeStreamCommand for DataFrame { } fn signature(&self) -> Signature { - Signature::build("pls join") + Signature::build("pls melt") .required("id_columns", SyntaxShape::Table, "Id columns for melting") .required( "value_columns", @@ -36,7 +33,7 @@ impl WholeStreamCommand for DataFrame { fn examples(&self) -> Vec { vec![Example { description: "melt dataframe", - example: "[[a b]; [a 2] [b 4] [a 6]] | pls convert | pls melt [a] [b]", + example: "[[a b]; [a 2] [b 4] [a 6]] | pls to-df | pls melt [a] [b]", result: None, }] } @@ -52,39 +49,17 @@ fn command(args: CommandArgs) -> Result { let (id_col_string, id_col_span) = convert_columns(&id_col, &tag)?; let (val_col_string, val_col_span) = convert_columns(&val_col, &tag)?; - match args.input.next() { - None => Err(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", - &tag, - )), - Some(value) => { - if let 
UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value { - check_column_datatypes(df.as_ref(), &id_col_string, &id_col_span)?; - check_column_datatypes(df.as_ref(), &val_col_string, &val_col_span)?; + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; - let res = df - .as_ref() - .melt(&id_col_string, &val_col_string) - .map_err(|e| parse_polars_error::<&str>(&e, &tag.span, None))?; + check_column_datatypes(df.as_ref(), &id_col_string, &id_col_span)?; + check_column_datatypes(df.as_ref(), &val_col_string, &val_col_span)?; - let value = Value { - value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( - res, - ))), - tag: tag.clone(), - }; + let res = df + .as_ref() + .melt(&id_col_string, &val_col_string) + .map_err(|e| parse_polars_error::<&str>(&e, &tag.span, None))?; - Ok(OutputStream::one(value)) - } else { - Err(ShellError::labeled_error( - "No dataframe in stream", - "no dataframe found in input stream", - &tag, - )) - } - } - } + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) } fn check_column_datatypes>( diff --git a/crates/nu-command/src/commands/dataframe/mod.rs b/crates/nu-command/src/commands/dataframe/mod.rs index 94b040c374..6f113bbc4d 100644 --- a/crates/nu-command/src/commands/dataframe/mod.rs +++ b/crates/nu-command/src/commands/dataframe/mod.rs @@ -1,9 +1,12 @@ pub mod aggregate; +pub mod column; pub mod command; -pub mod convert; pub mod drop; +pub mod drop_duplicates; +pub mod drop_nulls; pub mod dtypes; pub mod dummies; +pub mod get; pub mod groupby; pub mod head; pub mod join; @@ -15,18 +18,25 @@ pub mod sample; pub mod select; pub mod show; pub mod slice; +pub mod sort; pub mod tail; pub mod to_csv; +pub mod to_df; pub mod to_parquet; +pub mod to_series; pub(crate) mod utils; pub mod where_; +pub mod with_column; pub use aggregate::DataFrame as DataFrameAggregate; +pub use column::DataFrame as DataFrameColumn; pub use command::Command as DataFrame; -pub use 
convert::DataFrame as DataFrameConvert; pub use drop::DataFrame as DataFrameDrop; +pub use drop_duplicates::DataFrame as DataFrameDropDuplicates; +pub use drop_nulls::DataFrame as DataFrameDropNulls; pub use dtypes::DataFrame as DataFrameDTypes; pub use dummies::DataFrame as DataFrameDummies; +pub use get::DataFrame as DataFrameGet; pub use groupby::DataFrame as DataFrameGroupBy; pub use head::DataFrame as DataFrameHead; pub use join::DataFrame as DataFrameJoin; @@ -38,7 +48,11 @@ pub use sample::DataFrame as DataFrameSample; pub use select::DataFrame as DataFrameSelect; pub use show::DataFrame as DataFrameShow; pub use slice::DataFrame as DataFrameSlice; +pub use sort::DataFrame as DataFrameSort; pub use tail::DataFrame as DataFrameTail; pub use to_csv::DataFrame as DataFrameToCsv; +pub use to_df::DataFrame as DataFrameToDF; pub use to_parquet::DataFrame as DataFrameToParquet; +pub use to_series::DataFrame as DataFrameToSeries; pub use where_::DataFrame as DataFrameWhere; +pub use with_column::DataFrame as DataFrameWithColumn; diff --git a/crates/nu-command/src/commands/dataframe/pivot.rs b/crates/nu-command/src/commands/dataframe/pivot.rs index 97bba08f97..b466cda97f 100644 --- a/crates/nu-command/src/commands/dataframe/pivot.rs +++ b/crates/nu-command/src/commands/dataframe/pivot.rs @@ -2,8 +2,8 @@ use crate::{commands::dataframe::utils::parse_polars_error, prelude::*}; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; use nu_protocol::{ - dataframe::{NuDataFrame, PolarsData}, - Signature, SyntaxShape, UntaggedValue, Value, + dataframe::{NuDataFrame, NuGroupBy}, + Signature, SyntaxShape, }; use nu_source::Tagged; @@ -72,7 +72,7 @@ impl WholeStreamCommand for DataFrame { vec![Example { description: "Pivot a dataframe on b and aggregation on col c", example: - "[[a b c]; [one x 1] [two y 2]] | pls convert | pls groupby [a] | pls pivot b c sum", + "[[a b c]; [one x 1] [two y 2]] | pls to-df | pls groupby [a] | pls pivot b c sum", result: None, }] } @@ 
-93,50 +93,27 @@ fn command(args: CommandArgs) -> Result { // The operation is only done in one groupby. Only one input is // expected from the InputStream - match args.input.next() { - None => Err(ShellError::labeled_error( - "No input received", - "missing groupby input from stream", - &tag, - )), - Some(value) => { - if let UntaggedValue::DataFrame(PolarsData::GroupBy(nu_groupby)) = value.value { - let df_ref = nu_groupby.as_ref(); + let nu_groupby = NuGroupBy::try_from_stream(&mut args.input, &tag.span)?; + let df_ref = nu_groupby.as_ref(); - check_pivot_column(df_ref, &pivot_col)?; - check_value_column(df_ref, &value_col)?; + check_pivot_column(df_ref, &pivot_col)?; + check_value_column(df_ref, &value_col)?; - let mut groupby = nu_groupby.to_groupby()?; + let mut groupby = nu_groupby.to_groupby()?; - let pivot = groupby.pivot(pivot_col.item.as_ref(), value_col.item.as_ref()); + let pivot = groupby.pivot(pivot_col.item.as_ref(), value_col.item.as_ref()); - let res = match op { - Operation::Mean => pivot.mean(), - Operation::Sum => pivot.sum(), - Operation::Min => pivot.min(), - Operation::Max => pivot.max(), - Operation::First => pivot.first(), - Operation::Median => pivot.median(), - } - .map_err(|e| parse_polars_error::<&str>(&e, &tag.span, None))?; - - let final_df = Value { - tag, - value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( - res, - ))), - }; - - Ok(OutputStream::one(final_df)) - } else { - Err(ShellError::labeled_error( - "No groupby in stream", - "no groupby found in input stream", - &tag, - )) - } - } + let res = match op { + Operation::Mean => pivot.mean(), + Operation::Sum => pivot.sum(), + Operation::Min => pivot.min(), + Operation::Max => pivot.max(), + Operation::First => pivot.first(), + Operation::Median => pivot.median(), } + .map_err(|e| parse_polars_error::<&str>(&e, &tag.span, None))?; + + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) } fn check_pivot_column( diff --git 
a/crates/nu-command/src/commands/dataframe/sample.rs b/crates/nu-command/src/commands/dataframe/sample.rs index 48b5755807..83918f6dec 100644 --- a/crates/nu-command/src/commands/dataframe/sample.rs +++ b/crates/nu-command/src/commands/dataframe/sample.rs @@ -1,10 +1,7 @@ use crate::{commands::dataframe::utils::parse_polars_error, prelude::*}; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{ - dataframe::{NuDataFrame, PolarsData}, - Signature, SyntaxShape, UntaggedValue, Value, -}; +use nu_protocol::{dataframe::NuDataFrame, Signature, SyntaxShape}; use nu_source::Tagged; @@ -44,12 +41,12 @@ impl WholeStreamCommand for DataFrame { vec![ Example { description: "Sample rows from dataframe", - example: "[[a b]; [1 2] [3 4]] | pls load | pls sample -r 1", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls sample -r 1", result: None, }, Example { description: "Shows sample row using fraction and replace", - example: "[[a b]; [1 2] [3 4] [5 6]] | pls load | pls sample -f 0.5 -e", + example: "[[a b]; [1 2] [3 4] [5 6]] | pls to-df | pls sample -f 0.5 -e", result: None, }, ] @@ -64,52 +61,30 @@ fn command(args: CommandArgs) -> Result { let fraction: Option> = args.get_flag("fraction")?; let replace: bool = args.has_flag("replace"); - match args.input.next() { - None => Err(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; + + let res = match (rows, fraction) { + (Some(rows), None) => df + .as_ref() + .sample_n(rows.item, replace) + .map_err(|e| parse_polars_error::<&str>(&e, &rows.tag.span, None)), + (None, Some(frac)) => df + .as_ref() + .sample_frac(frac.item, replace) + .map_err(|e| parse_polars_error::<&str>(&e, &frac.tag.span, None)), + (Some(_), Some(_)) => Err(ShellError::labeled_error( + "Incompatible flags", + "Only one selection criterion allowed", &tag, )), - Some(value) => { - if let 
UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value { - let res = match (rows, fraction) { - (Some(rows), None) => df - .as_ref() - .sample_n(rows.item, replace) - .map_err(|e| parse_polars_error::<&str>(&e, &rows.tag.span, None)), - (None, Some(frac)) => df - .as_ref() - .sample_frac(frac.item, replace) - .map_err(|e| parse_polars_error::<&str>(&e, &frac.tag.span, None)), - (Some(_), Some(_)) => Err(ShellError::labeled_error( - "Incompatible flags", - "Only one selection criterion allowed", - &tag, - )), - (None, None) => Err(ShellError::labeled_error_with_secondary( - "No selection", - "No selection criterion was found", - &tag, - "Perhaps you want to use the flag -n or -f", - &tag, - )), - }?; + (None, None) => Err(ShellError::labeled_error_with_secondary( + "No selection", + "No selection criterion was found", + &tag, + "Perhaps you want to use the flag -n or -f", + &tag, + )), + }?; - let value = Value { - value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( - res, - ))), - tag: tag.clone(), - }; - - Ok(OutputStream::one(value)) - } else { - Err(ShellError::labeled_error( - "No dataframe in stream", - "no dataframe found in input stream", - &tag, - )) - } - } - } + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) } diff --git a/crates/nu-command/src/commands/dataframe/select.rs b/crates/nu-command/src/commands/dataframe/select.rs index f7b8333c9e..e719fcfcc7 100644 --- a/crates/nu-command/src/commands/dataframe/select.rs +++ b/crates/nu-command/src/commands/dataframe/select.rs @@ -1,10 +1,7 @@ use crate::prelude::*; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{ - dataframe::{NuDataFrame, PolarsData}, - Signature, SyntaxShape, UntaggedValue, Value, -}; +use nu_protocol::{dataframe::NuDataFrame, Signature, SyntaxShape, Value}; use super::utils::{convert_columns, parse_polars_error}; @@ -34,7 +31,7 @@ impl WholeStreamCommand for DataFrame { fn examples(&self) -> Vec { 
vec![Example { description: "Create new dataframe with column a", - example: "[[a b]; [1 2] [3 4]] | pls convert | pls select [a]", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls select [a]", result: None, }] } @@ -48,34 +45,12 @@ fn command(args: CommandArgs) -> Result { let (col_string, col_span) = convert_columns(&columns, &tag)?; - match args.input.next() { - None => Err(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", - &tag, - )), - Some(value) => { - if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value { - let res = df - .as_ref() - .select(&col_string) - .map_err(|e| parse_polars_error::<&str>(&e, &col_span, None))?; + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; - let value = Value { - value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( - res, - ))), - tag: tag.clone(), - }; + let res = df + .as_ref() + .select(&col_string) + .map_err(|e| parse_polars_error::<&str>(&e, &col_span, None))?; - Ok(OutputStream::one(value)) - } else { - Err(ShellError::labeled_error( - "No dataframe in stream", - "no dataframe found in input stream", - &tag, - )) - } - } - } + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) } diff --git a/crates/nu-command/src/commands/dataframe/show.rs b/crates/nu-command/src/commands/dataframe/show.rs index a814512c5c..38bf00d7ee 100644 --- a/crates/nu-command/src/commands/dataframe/show.rs +++ b/crates/nu-command/src/commands/dataframe/show.rs @@ -1,7 +1,7 @@ use crate::prelude::*; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{dataframe::PolarsData, Signature, SyntaxShape, UntaggedValue}; +use nu_protocol::{dataframe::NuDataFrame, Signature, SyntaxShape}; use nu_source::Tagged; @@ -35,12 +35,12 @@ impl WholeStreamCommand for DataFrame { vec![ Example { description: "Shows head rows from dataframe", - example: "[[a b]; [1 2] [3 4]] | pls convert | pls show", + example: "[[a 
b]; [1 2] [3 4]] | pls to-df | pls show", result: None, }, Example { description: "Shows tail rows from dataframe", - example: "[[a b]; [1 2] [3 4] [5 6]] | pls convert | pls show -t -n 1", + example: "[[a b]; [1 2] [3 4] [5 6]] | pls to-df | pls show -t -n 1", result: None, }, ] @@ -54,25 +54,9 @@ fn command(args: CommandArgs) -> Result { let rows: Option> = args.get_flag("n_rows")?; let tail: bool = args.has_flag("tail"); - match args.input.next() { - None => Err(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", - &tag, - )), - Some(value) => { - if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value { - let rows = rows.map(|v| v.item); - let values = if tail { df.tail(rows)? } else { df.head(rows)? }; + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; + let rows = rows.map(|v| v.item); + let values = if tail { df.tail(rows)? } else { df.head(rows)? }; - Ok(OutputStream::from_stream(values.into_iter())) - } else { - Err(ShellError::labeled_error( - "No dataframe in stream", - "no dataframe found in input stream", - &tag, - )) - } - } - } + Ok(OutputStream::from_stream(values.into_iter())) } diff --git a/crates/nu-command/src/commands/dataframe/slice.rs b/crates/nu-command/src/commands/dataframe/slice.rs index 7ba6fbf2e7..84cb016bed 100644 --- a/crates/nu-command/src/commands/dataframe/slice.rs +++ b/crates/nu-command/src/commands/dataframe/slice.rs @@ -1,10 +1,7 @@ use crate::prelude::*; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{ - dataframe::{NuDataFrame, PolarsData}, - Signature, SyntaxShape, UntaggedValue, Value, -}; +use nu_protocol::{dataframe::NuDataFrame, Signature, SyntaxShape}; use nu_source::Tagged; pub struct DataFrame; @@ -19,7 +16,7 @@ impl WholeStreamCommand for DataFrame { } fn signature(&self) -> Signature { - Signature::build("pls select") + Signature::build("pls slice") .required("offset", SyntaxShape::Number, "start of 
slice") .required("size", SyntaxShape::Number, "size of slice") } @@ -31,7 +28,7 @@ impl WholeStreamCommand for DataFrame { fn examples(&self) -> Vec { vec![Example { description: "Create new dataframe from a slice of the rows", - example: "[[a b]; [1 2] [3 4]] | pls convert | pls slice 0 1", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls slice 0 1", result: None, }] } @@ -44,31 +41,8 @@ fn command(args: CommandArgs) -> Result { let offset: Tagged = args.req(0)?; let size: Tagged = args.req(1)?; - match args.input.next() { - None => Err(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", - &tag, - )), - Some(value) => { - if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value { - let res = df.as_ref().slice(offset.item as i64, size.item); + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; + let res = df.as_ref().slice(offset.item as i64, size.item); - let value = Value { - value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( - res, - ))), - tag: tag.clone(), - }; - - Ok(OutputStream::one(value)) - } else { - Err(ShellError::labeled_error( - "No dataframe in stream", - "no dataframe found in input stream", - &tag, - )) - } - } - } + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) } diff --git a/crates/nu-command/src/commands/dataframe/sort.rs b/crates/nu-command/src/commands/dataframe/sort.rs new file mode 100644 index 0000000000..fadf1e1f78 --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/sort.rs @@ -0,0 +1,57 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{dataframe::NuDataFrame, Signature, SyntaxShape, Value}; + +use super::utils::{convert_columns, parse_polars_error}; +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls sort" + } + + fn usage(&self) -> &str { + "Creates new sorted dataframe" + } + + fn signature(&self) 
-> Signature { + Signature::build("pls sort") + .required( + "columns", + SyntaxShape::Table, + "column names to sort dataframe", + ) + .switch("reverse", "invert sort", Some('r')) + } + + fn run(&self, args: CommandArgs) -> Result { + command(args) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "Create new sorted dataframe", + example: "[[a b]; [3 4] [1 2]] | pls to-df | pls sort [a]", + result: None, + }] + } +} + +fn command(args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let mut args = args.evaluate_once()?; + let columns: Vec = args.req(0)?; + let reverse = args.has_flag("reverse"); + + let (col_string, col_span) = convert_columns(&columns, &tag)?; + + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; + + let res = df + .as_ref() + .sort(&col_string, reverse) + .map_err(|e| parse_polars_error::<&str>(&e, &col_span, None))?; + + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) +} diff --git a/crates/nu-command/src/commands/dataframe/tail.rs b/crates/nu-command/src/commands/dataframe/tail.rs index 88c8e142fa..9d6b7acdcb 100644 --- a/crates/nu-command/src/commands/dataframe/tail.rs +++ b/crates/nu-command/src/commands/dataframe/tail.rs @@ -1,10 +1,7 @@ use crate::prelude::*; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{ - dataframe::{NuDataFrame, PolarsData}, - Signature, SyntaxShape, UntaggedValue, Value, -}; +use nu_protocol::{dataframe::NuDataFrame, Signature, SyntaxShape}; use nu_source::Tagged; pub struct DataFrame; @@ -19,7 +16,7 @@ impl WholeStreamCommand for DataFrame { } fn signature(&self) -> Signature { - Signature::build("pls select").optional( + Signature::build("pls tail").optional( "n_rows", SyntaxShape::Number, "Number of rows for tail", @@ -33,7 +30,7 @@ impl WholeStreamCommand for DataFrame { fn examples(&self) -> Vec { vec![Example { description: "Create new dataframe with tail rows", - example: "[[a b]; [1 2] [3 4]] | pls 
convert | pls tail", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls tail", result: None, }] } @@ -49,31 +46,9 @@ fn command(args: CommandArgs) -> Result { None => 5, }; - match args.input.next() { - None => Err(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", - &tag, - )), - Some(value) => { - if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value { - let res = df.as_ref().tail(Some(rows)); + let df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; - let value = Value { - value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( - res, - ))), - tag: tag.clone(), - }; + let res = df.as_ref().tail(Some(rows)); - Ok(OutputStream::one(value)) - } else { - Err(ShellError::labeled_error( - "No dataframe in stream", - "no dataframe found in input stream", - &tag, - )) - } - } - } + Ok(OutputStream::one(NuDataFrame::dataframe_to_value(res, tag))) } diff --git a/crates/nu-command/src/commands/dataframe/to_csv.rs b/crates/nu-command/src/commands/dataframe/to_csv.rs index cff1e33691..a34d8f0934 100644 --- a/crates/nu-command/src/commands/dataframe/to_csv.rs +++ b/crates/nu-command/src/commands/dataframe/to_csv.rs @@ -4,9 +4,10 @@ use std::path::PathBuf; use crate::prelude::*; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; +use nu_protocol::dataframe::NuDataFrame; use nu_protocol::Primitive; use nu_protocol::Value; -use nu_protocol::{dataframe::PolarsData, Signature, SyntaxShape, UntaggedValue}; +use nu_protocol::{Signature, SyntaxShape, UntaggedValue}; use polars::prelude::{CsvWriter, SerWriter}; @@ -17,7 +18,7 @@ pub struct DataFrame; impl WholeStreamCommand for DataFrame { fn name(&self) -> &str { - "pls to_csv" + "pls to-csv" } fn usage(&self) -> &str { @@ -25,7 +26,7 @@ impl WholeStreamCommand for DataFrame { } fn signature(&self) -> Signature { - Signature::build("pls to_csv") + Signature::build("pls to-csv") .required("file", SyntaxShape::FilePath, "file 
path to save dataframe") .named( "delimiter", @@ -44,12 +45,12 @@ impl WholeStreamCommand for DataFrame { vec![ Example { description: "Saves dataframe to csv file", - example: "[[a b]; [1 2] [3 4]] | pls convert | pls to_csv test.csv", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls to_csv test.csv", result: None, }, Example { description: "Saves dataframe to csv file using other delimiter", - example: "[[a b]; [1 2] [3 4]] | pls convert | pls to_csv test.csv -d '|'", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls to-csv test.csv -d '|'", result: None, }, ] @@ -63,18 +64,7 @@ fn command(args: CommandArgs) -> Result { let delimiter: Option> = args.get_flag("delimiter")?; let no_header: bool = args.has_flag("no_header"); - let mut df = args - .input - .next() - .and_then(|value| match value.value { - UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) => Some(df), - _ => None, - }) - .ok_or(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", - &tag.span, - ))?; + let mut df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; let mut file = File::create(&file_name.item).map_err(|e| { ShellError::labeled_error( diff --git a/crates/nu-command/src/commands/dataframe/convert.rs b/crates/nu-command/src/commands/dataframe/to_df.rs similarity index 66% rename from crates/nu-command/src/commands/dataframe/convert.rs rename to crates/nu-command/src/commands/dataframe/to_df.rs index ed23993065..57b75d8914 100644 --- a/crates/nu-command/src/commands/dataframe/convert.rs +++ b/crates/nu-command/src/commands/dataframe/to_df.rs @@ -1,16 +1,13 @@ use crate::prelude::*; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{ - dataframe::{NuDataFrame, PolarsData}, - Signature, UntaggedValue, -}; +use nu_protocol::{dataframe::NuDataFrame, Signature}; pub struct DataFrame; impl WholeStreamCommand for DataFrame { fn name(&self) -> &str { - "pls convert" + "pls to-df" } fn usage(&self) -> &str { @@ 
-18,7 +15,7 @@ impl WholeStreamCommand for DataFrame { } fn signature(&self) -> Signature { - Signature::build("pls convert") + Signature::build("pls to-df") } fn run(&self, args: CommandArgs) -> Result { @@ -26,17 +23,14 @@ impl WholeStreamCommand for DataFrame { let args = args.evaluate_once()?; let df = NuDataFrame::try_from_iter(args.input, &tag)?; - let init = InputStream::one( - UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)).into_value(&tag), - ); - Ok(init.to_output_stream()) + Ok(InputStream::one(df.to_value(tag))) } fn examples(&self) -> Vec { vec![Example { description: "Takes an input stream and converts it to a polars dataframe", - example: "[[a b];[1 2] [3 4]] | pls convert", + example: "[[a b];[1 2] [3 4]] | pls to-df", result: None, }] } diff --git a/crates/nu-command/src/commands/dataframe/to_parquet.rs b/crates/nu-command/src/commands/dataframe/to_parquet.rs index 8acda895fd..0f827c0199 100644 --- a/crates/nu-command/src/commands/dataframe/to_parquet.rs +++ b/crates/nu-command/src/commands/dataframe/to_parquet.rs @@ -4,7 +4,8 @@ use std::path::PathBuf; use crate::prelude::*; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{dataframe::PolarsData, Primitive, Signature, SyntaxShape, UntaggedValue, Value}; +use nu_protocol::dataframe::NuDataFrame; +use nu_protocol::{Primitive, Signature, SyntaxShape, UntaggedValue, Value}; use polars::prelude::ParquetWriter; @@ -15,7 +16,7 @@ pub struct DataFrame; impl WholeStreamCommand for DataFrame { fn name(&self) -> &str { - "pls to_parquet" + "pls to-parquet" } fn usage(&self) -> &str { @@ -23,7 +24,7 @@ impl WholeStreamCommand for DataFrame { } fn signature(&self) -> Signature { - Signature::build("pls to_parquet").required( + Signature::build("pls to-parquet").required( "file", SyntaxShape::FilePath, "file path to save dataframe", @@ -37,7 +38,7 @@ impl WholeStreamCommand for DataFrame { fn examples(&self) -> Vec { vec![Example { description: "Saves dataframe to parquet 
file", - example: "[[a b]; [1 2] [3 4]] | pls convert | pls to_parquet test.parquet", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls to-parquet test.parquet", result: None, }] } @@ -48,18 +49,7 @@ fn command(args: CommandArgs) -> Result { let mut args = args.evaluate_once()?; let file_name: Tagged = args.req(0)?; - let mut df = args - .input - .next() - .and_then(|value| match value.value { - UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) => Some(df), - _ => None, - }) - .ok_or(ShellError::labeled_error( - "No input received", - "missing dataframe input from stream", - &tag.span, - ))?; + let mut df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; let file = File::create(&file_name.item).map_err(|e| { ShellError::labeled_error( diff --git a/crates/nu-command/src/commands/dataframe/to_series.rs b/crates/nu-command/src/commands/dataframe/to_series.rs new file mode 100644 index 0000000000..3143139154 --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/to_series.rs @@ -0,0 +1,45 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{dataframe::NuSeries, Signature, SyntaxShape}; +use nu_source::Tagged; + +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls to-series" + } + + fn usage(&self) -> &str { + "Converts a pipelined List into a polars series" + } + + fn signature(&self) -> Signature { + Signature::build("pls to-series").optional( + "name", + SyntaxShape::String, + "Optional series name", + ) + } + + fn run(&self, args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let args = args.evaluate_once()?; + + let name: Option> = args.opt(0)?; + let name = name.map(|v| v.item); + + let series = NuSeries::try_from_iter(args.input, name)?; + + Ok(InputStream::one(series.to_value(tag))) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "Takes an input stream and converts it to a polars series", + 
example: "[1 2 3 4] | pls to-series my-col", + result: None, + }] + } +} diff --git a/crates/nu-command/src/commands/dataframe/where_.rs b/crates/nu-command/src/commands/dataframe/where_.rs index 80db5307f9..4ac7a33fc3 100644 --- a/crates/nu-command/src/commands/dataframe/where_.rs +++ b/crates/nu-command/src/commands/dataframe/where_.rs @@ -2,9 +2,9 @@ use crate::prelude::*; use nu_engine::{evaluate_baseline_expr, EvaluatedCommandArgs, WholeStreamCommand}; use nu_errors::ShellError; use nu_protocol::{ - dataframe::{NuDataFrame, PolarsData}, + dataframe::NuDataFrame, hir::{CapturedBlock, ClassifiedCommand, Expression, Literal, Operator, SpannedExpression}, - Primitive, Signature, SyntaxShape, UnspannedPathMember, UntaggedValue, Value, + Primitive, Signature, SyntaxShape, UnspannedPathMember, UntaggedValue, }; use super::utils::parse_polars_error; @@ -36,7 +36,7 @@ impl WholeStreamCommand for DataFrame { fn examples(&self) -> Vec { vec![Example { description: "Filter dataframe based on column a", - example: "[[a b]; [1 2] [3 4]] | pls convert | pls where a == 1", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls where a == 1", result: None, }] } @@ -148,18 +148,8 @@ fn filter_dataframe( right_condition: &Primitive, operator: &SpannedExpression, ) -> Result { - let df = args - .input - .next() - .and_then(|value| match value.value { - UntaggedValue::DataFrame(PolarsData::EagerDataFrame(nu)) => Some(nu), - _ => None, - }) - .ok_or(ShellError::labeled_error( - "Incorrect stream input", - "Expected dataframe in stream", - &args.call_info.name_tag.span, - ))?; + let span = args.call_info.name_tag.span; + let df = NuDataFrame::try_from_stream(&mut args.input, &span)?; let col = df .as_ref() @@ -198,10 +188,8 @@ fn filter_dataframe( .filter(&mask) .map_err(|e| parse_polars_error::<&str>(&e, &args.call_info.name_tag.span, None))?; - let value = Value { - value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new(res))), - tag: 
args.call_info.name_tag.clone(), - }; - - Ok(OutputStream::one(value)) + Ok(OutputStream::one(NuDataFrame::dataframe_to_value( + res, + args.call_info.name_tag.clone(), + ))) } diff --git a/crates/nu-command/src/commands/dataframe/with_column.rs b/crates/nu-command/src/commands/dataframe/with_column.rs new file mode 100644 index 0000000000..65842be82d --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/with_column.rs @@ -0,0 +1,67 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{ + dataframe::{NuDataFrame, PolarsData}, + Signature, SyntaxShape, UntaggedValue, Value, +}; + +use super::utils::parse_polars_error; +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls with-column" + } + + fn usage(&self) -> &str { + "Adds a series to the dataframe" + } + + fn signature(&self) -> Signature { + Signature::build("pls with-column").required( + "series", + SyntaxShape::Any, + "series to be added", + ) + } + + fn run(&self, args: CommandArgs) -> Result { + command(args) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "Adds a series to the dataframe", + example: "[[a b]; [1 2] [3 4]] | pls to-df | pls with-column ([5 6] | pls to-series)", + result: None, + }] + } +} + +fn command(args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let mut args = args.evaluate_once()?; + let value: Value = args.req(0)?; + + let series = match value.value { + UntaggedValue::DataFrame(PolarsData::Series(series)) => Ok(series), + _ => Err(ShellError::labeled_error( + "Incorrect type", + "can only add a series to a dataframe", + value.tag.span, + )), + }?; + + let mut df = NuDataFrame::try_from_stream(&mut args.input, &tag.span)?; + + let res = df + .as_mut() + .with_column(series.series()) + .map_err(|e| parse_polars_error::<&str>(&e, &tag.span, None))?; + + Ok(OutputStream::one(NuDataFrame::dataframe_to_value( + res.clone(), + tag, + ))) 
+} diff --git a/crates/nu-command/src/commands/default_context.rs b/crates/nu-command/src/commands/default_context.rs index b0b902ff6f..2d1c6b8228 100644 --- a/crates/nu-command/src/commands/default_context.rs +++ b/crates/nu-command/src/commands/default_context.rs @@ -253,49 +253,39 @@ pub fn create_default_context(interactive: bool) -> Result Err((left.type_name(), right.type_name())), }, + #[cfg(feature = "dataframe")] + ( + UntaggedValue::DataFrame(PolarsData::Series(lhs)), + UntaggedValue::DataFrame(PolarsData::Series(rhs)), + ) => { + if lhs.as_ref().dtype() == rhs.as_ref().dtype() { + let result = match operator { + Operator::Plus => { + let mut res = lhs.as_ref() + rhs.as_ref(); + let name = format!("sum_{}_{}", lhs.as_ref().name(), rhs.as_ref().name()); + let res = res.rename(name.as_ref()); + Ok(res.clone()) + } + Operator::Minus => { + let mut res = lhs.as_ref() - rhs.as_ref(); + let name = format!("sub_{}_{}", lhs.as_ref().name(), rhs.as_ref().name()); + let res = res.rename(name.as_ref()); + Ok(res.clone()) + } + Operator::Multiply => { + let mut res = lhs.as_ref() * rhs.as_ref(); + let name = format!("mul_{}_{}", lhs.as_ref().name(), rhs.as_ref().name()); + let res = res.rename(name.as_ref()); + Ok(res.clone()) + } + Operator::Divide => { + let mut res = lhs.as_ref() / rhs.as_ref(); + let name = format!("div_{}_{}", lhs.as_ref().name(), rhs.as_ref().name()); + let res = res.rename(name.as_ref()); + Ok(res.clone()) + } + Operator::Modulo => { + let mut res = lhs.as_ref() % rhs.as_ref(); + let name = format!("mod_{}_{}", lhs.as_ref().name(), rhs.as_ref().name()); + let res = res.rename(name.as_ref()); + Ok(res.clone()) + } + _ => Err((left.type_name(), right.type_name())), + }?; + + Ok(NuSeries::series_to_untagged(result)) + } else { + Err((left.type_name(), right.type_name())) + } + } _ => Err((left.type_name(), right.type_name())), } } diff --git a/crates/nu-protocol/Cargo.toml b/crates/nu-protocol/Cargo.toml index 106b8913a5..a9b02e4fb6 100644 --- 
a/crates/nu-protocol/Cargo.toml +++ b/crates/nu-protocol/Cargo.toml @@ -31,8 +31,11 @@ serde_yaml = "0.8.16" toml = "0.5.8" [dependencies.polars] -version = "0.13.4" +git = "https://github.com/pola-rs/polars" +rev = "a5f17b0a6e3e05ff6be789aa24a7cae54fd400dd" +version = "0.14.0" optional = true +features = ["serde"] [features] dataframe = ["polars"] diff --git a/crates/nu-protocol/src/dataframe/mod.rs b/crates/nu-protocol/src/dataframe/mod.rs index 47767847c3..12ca7c9a96 100644 --- a/crates/nu-protocol/src/dataframe/mod.rs +++ b/crates/nu-protocol/src/dataframe/mod.rs @@ -1,12 +1,15 @@ pub mod nu_dataframe; pub mod nu_groupby; +pub mod nu_series; pub use nu_dataframe::NuDataFrame; pub use nu_groupby::NuGroupBy; +pub use nu_series::NuSeries; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)] pub enum PolarsData { EagerDataFrame(NuDataFrame), GroupBy(NuGroupBy), + Series(NuSeries), } diff --git a/crates/nu-protocol/src/dataframe/nu_dataframe.rs b/crates/nu-protocol/src/dataframe/nu_dataframe.rs index 6f0611a06a..319c811443 100644 --- a/crates/nu-protocol/src/dataframe/nu_dataframe.rs +++ b/crates/nu-protocol/src/dataframe/nu_dataframe.rs @@ -4,16 +4,15 @@ use std::{cmp::Ordering, collections::hash_map::Entry, collections::HashMap}; use bigdecimal::FromPrimitive; use chrono::{DateTime, FixedOffset, NaiveDateTime}; use nu_errors::ShellError; -use nu_source::Tag; +use nu_source::{Span, Tag}; use num_bigint::BigInt; use polars::prelude::{AnyValue, DataFrame, NamedFrom, Series, TimeUnit}; -use serde::de::{Deserialize, Deserializer, Visitor}; -use serde::Serialize; - -use std::fmt; +use serde::{Deserialize, Serialize}; use crate::{Dictionary, Primitive, UntaggedValue, Value}; +use super::PolarsData; + const SECS_PER_DAY: i64 = 86_400; #[derive(Debug)] @@ -40,26 +39,9 @@ impl Default for ColumnValues { type ColumnMap = HashMap; -// TODO. Using Option to help with deserialization. 
It will be better to find -// a way to use serde with dataframes -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct NuDataFrame { - #[serde(skip_serializing)] - pub dataframe: Option, -} - -impl Default for NuDataFrame { - fn default() -> Self { - NuDataFrame { dataframe: None } - } -} - -impl NuDataFrame { - pub fn new(df: polars::prelude::DataFrame) -> Self { - NuDataFrame { - dataframe: Some(df), - } - } + dataframe: DataFrame, } // TODO. Better definition of equality and comparison for a dataframe. @@ -88,30 +70,46 @@ impl Hash for NuDataFrame { fn hash(&self, _: &mut H) {} } -impl<'de> Visitor<'de> for NuDataFrame { - type Value = Self; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("an integer between -2^31 and 2^31") +impl AsRef for NuDataFrame { + fn as_ref(&self) -> &polars::prelude::DataFrame { + &self.dataframe } } -impl<'de> Deserialize<'de> for NuDataFrame { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - deserializer.deserialize_i32(NuDataFrame::default()) +impl AsMut for NuDataFrame { + fn as_mut(&mut self) -> &mut polars::prelude::DataFrame { + &mut self.dataframe } } impl NuDataFrame { + pub fn new(dataframe: polars::prelude::DataFrame) -> Self { + NuDataFrame { dataframe } + } + + pub fn try_from_stream(input: &mut T, span: &Span) -> Result + where + T: Iterator, + { + input + .next() + .and_then(|value| match value.value { + UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) => Some(df), + _ => None, + }) + .ok_or(ShellError::labeled_error( + "No dataframe in stream", + "no dataframe found in input stream", + span, + )) + } + pub fn try_from_iter(iter: T, tag: &Tag) -> Result where T: Iterator, { // Dictionary to store the columnar data extracted from - // the input. During the iteration we will sort if the values + // the input. 
During the iteration we check if the values // have different type let mut column_values: ColumnMap = HashMap::new(); @@ -120,10 +118,12 @@ impl NuDataFrame { UntaggedValue::Row(dictionary) => insert_row(&mut column_values, dictionary)?, UntaggedValue::Table(table) => insert_table(&mut column_values, table)?, _ => { - return Err(ShellError::labeled_error( + return Err(ShellError::labeled_error_with_secondary( "Format not supported", "Value not supported for conversion", &value.tag, + "Perhaps you want to use a List of Tables or a Dictionary", + &value.tag, )); } } @@ -132,26 +132,37 @@ impl NuDataFrame { from_parsed_columns(column_values, tag) } + pub fn to_value(self, tag: Tag) -> Value { + Value { + value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(self)), + tag, + } + } + + pub fn dataframe_to_value(df: DataFrame, tag: Tag) -> Value { + Value { + value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new(df))), + tag, + } + } + // Print is made out a head and if the dataframe is too large, then a tail pub fn print(&self) -> Result, ShellError> { - if let Some(df) = &self.dataframe { - let size: usize = 20; + let df = &self.as_ref(); + let size: usize = 20; - if df.height() > size { - let sample_size = size / 2; - let mut values = self.head(Some(sample_size))?; - add_separator(&mut values, df); - let remaining = df.height() - sample_size; - let tail_size = remaining.min(sample_size); - let mut tail_values = self.tail(Some(tail_size))?; - values.append(&mut tail_values); + if df.height() > size { + let sample_size = size / 2; + let mut values = self.head(Some(sample_size))?; + add_separator(&mut values, df); + let remaining = df.height() - sample_size; + let tail_size = remaining.min(sample_size); + let mut tail_values = self.tail(Some(tail_size))?; + values.append(&mut tail_values); - Ok(values) - } else { - Ok(self.head(Some(size))?) 
- } + Ok(values) } else { - unreachable!("No dataframe found in print command") + Ok(self.head(Some(size))?) } } @@ -163,71 +174,47 @@ impl NuDataFrame { } pub fn tail(&self, rows: Option) -> Result, ShellError> { - if let Some(df) = &self.dataframe { - let to_row = df.height(); - let size = rows.unwrap_or(5); - let from_row = to_row.saturating_sub(size); + let df = &self.as_ref(); + let to_row = df.height(); + let size = rows.unwrap_or(5); + let from_row = to_row.saturating_sub(size); - let values = self.to_rows(from_row, to_row)?; + let values = self.to_rows(from_row, to_row)?; - Ok(values) - } else { - unreachable!() - } + Ok(values) } pub fn to_rows(&self, from_row: usize, to_row: usize) -> Result, ShellError> { - if let Some(df) = &self.dataframe { - let column_names = df.get_column_names(); + let df = &self.as_ref(); + let column_names = df.get_column_names(); - let mut values: Vec = Vec::new(); + let mut values: Vec = Vec::new(); - let upper_row = to_row.min(df.height()); - for i in from_row..upper_row { - let row = df.get_row(i); - let mut dictionary_row = Dictionary::default(); + let upper_row = to_row.min(df.height()); + for i in from_row..upper_row { + let row = df.get_row(i); + let mut dictionary_row = Dictionary::default(); - for (val, name) in row.0.iter().zip(column_names.iter()) { - let untagged_val = anyvalue_to_untagged(val)?; + for (val, name) in row.0.iter().zip(column_names.iter()) { + let untagged_val = anyvalue_to_untagged(val)?; - let dict_val = Value { - value: untagged_val, - tag: Tag::unknown(), - }; - - dictionary_row.insert(name.to_string(), dict_val); - } - - let value = Value { - value: UntaggedValue::Row(dictionary_row), + let dict_val = Value { + value: untagged_val, tag: Tag::unknown(), }; - values.push(value); + dictionary_row.insert(name.to_string(), dict_val); } - Ok(values) - } else { - unreachable!() - } - } -} + let value = Value { + value: UntaggedValue::Row(dictionary_row), + tag: Tag::unknown(), + }; -impl AsRef for 
NuDataFrame { - fn as_ref(&self) -> &polars::prelude::DataFrame { - match &self.dataframe { - Some(df) => df, - None => unreachable!("Accessing ref to dataframe from nu_dataframe"), + values.push(value); } - } -} -impl AsMut for NuDataFrame { - fn as_mut(&mut self) -> &mut polars::prelude::DataFrame { - match &mut self.dataframe { - Some(df) => df, - None => unreachable!("Accessing mut ref to dataframe from nu_dataframe"), - } + Ok(values) } } @@ -391,10 +378,12 @@ fn insert_value( UntaggedValue::Primitive(Primitive::String(_)), ) => col_val.values.push(value), _ => { - return Err(ShellError::labeled_error( + return Err(ShellError::labeled_error_with_secondary( "Different values in column", "Value with different type", &value.tag, + "Perhaps you want to change it to this value type", + &prev_value.tag, )); } } @@ -418,7 +407,7 @@ fn from_parsed_columns(column_values: ColumnMap, tag: &Tag) -> Result { let series_values: Result, _> = - column.values.iter().map(|v| v.as_f32()).collect(); + column.values.iter().map(|v| v.as_i64()).collect(); let series = Series::new(&name, series_values?); df_series.push(series) } @@ -434,9 +423,7 @@ fn from_parsed_columns(column_values: ColumnMap, tag: &Tag) -> Result Ok(NuDataFrame { - dataframe: Some(df), - }), + Ok(df) => Ok(NuDataFrame::new(df)), Err(e) => { return Err(ShellError::labeled_error( "Error while creating dataframe", diff --git a/crates/nu-protocol/src/dataframe/nu_groupby.rs b/crates/nu-protocol/src/dataframe/nu_groupby.rs index 0f0ee91db0..8ec8df9ab5 100644 --- a/crates/nu-protocol/src/dataframe/nu_groupby.rs +++ b/crates/nu-protocol/src/dataframe/nu_groupby.rs @@ -1,11 +1,11 @@ -use nu_source::Tag; +use nu_source::{Span, Tag}; use polars::frame::groupby::{GroupBy, GroupTuples}; use serde::{Deserialize, Serialize}; -use super::NuDataFrame; +use super::{NuDataFrame, PolarsData}; use nu_errors::ShellError; -use crate::{TaggedDictBuilder, Value}; +use crate::{TaggedDictBuilder, UntaggedValue, Value}; #[derive(Debug, 
Clone, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)] pub struct NuGroupBy { @@ -23,11 +23,25 @@ impl NuGroupBy { } } + pub fn try_from_stream(input: &mut T, span: &Span) -> Result + where + T: Iterator, + { + input + .next() + .and_then(|value| match value.value { + UntaggedValue::DataFrame(PolarsData::GroupBy(group)) => Some(group), + _ => None, + }) + .ok_or(ShellError::labeled_error( + "No groupby object in stream", + "no groupby object found in input stream", + span, + )) + } + pub fn to_groupby(&self) -> Result { - let df = match &self.dataframe.dataframe { - Some(df) => df, - None => unreachable!("No dataframe in nu_dataframe"), - }; + let df = self.dataframe.as_ref(); let by = df.select_series(&self.by).map_err(|e| { ShellError::labeled_error("Error creating groupby", format!("{}", e), Tag::unknown()) @@ -50,9 +64,6 @@ impl NuGroupBy { impl AsRef for NuGroupBy { fn as_ref(&self) -> &polars::prelude::DataFrame { - match &self.dataframe.dataframe { - Some(df) => df, - None => unreachable!("Accessing reference to dataframe from nu_groupby"), - } + self.dataframe.as_ref() } } diff --git a/crates/nu-protocol/src/dataframe/nu_series.rs b/crates/nu-protocol/src/dataframe/nu_series.rs new file mode 100644 index 0000000000..c5e50ab5cc --- /dev/null +++ b/crates/nu-protocol/src/dataframe/nu_series.rs @@ -0,0 +1,330 @@ +use std::cmp::Ordering; +use std::hash::{Hash, Hasher}; +use std::vec; + +use nu_errors::ShellError; +use nu_source::{Span, Tag}; +use polars::prelude::{DataType, NamedFrom, Series}; +use serde::{Deserialize, Serialize}; + +use crate::{Dictionary, Primitive, UntaggedValue, Value}; + +use super::PolarsData; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NuSeries { + series: Series, + dtype: String, +} + +// TODO. Better definition of equality and comparison for a dataframe. 
+// Probably it make sense to have a name field and use it for comparisons +impl PartialEq for NuSeries { + fn eq(&self, _: &Self) -> bool { + false + } +} + +impl Eq for NuSeries {} + +impl PartialOrd for NuSeries { + fn partial_cmp(&self, _: &Self) -> Option { + Some(Ordering::Equal) + } +} + +impl Ord for NuSeries { + fn cmp(&self, _: &Self) -> Ordering { + Ordering::Equal + } +} + +impl Hash for NuSeries { + fn hash(&self, _: &mut H) {} +} + +impl NuSeries { + pub fn new(series: Series) -> Self { + let dtype = series.dtype().to_string(); + + NuSeries { series, dtype } + } + + pub fn try_from_stream(input: &mut T, span: &Span) -> Result + where + T: Iterator, + { + input + .next() + .and_then(|value| match value.value { + UntaggedValue::DataFrame(PolarsData::Series(series)) => Some(series), + _ => None, + }) + .ok_or(ShellError::labeled_error( + "No series in stream", + "no series found in input stream", + span, + )) + } + + pub fn try_from_iter(iter: T, name: Option) -> Result + where + T: Iterator, + { + let mut vec_values: Vec = Vec::new(); + + for value in iter { + match value.value { + UntaggedValue::Primitive(Primitive::Int(_)) + | UntaggedValue::Primitive(Primitive::Decimal(_)) + | UntaggedValue::Primitive(Primitive::String(_)) => { + insert_value(value, &mut vec_values)? 
+ } + _ => { + return Err(ShellError::labeled_error_with_secondary( + "Format not supported", + "Value not supported for conversion", + &value.tag.span, + "Perhaps you want to use a list of primitive values (int, decimal, string)", + &value.tag.span, + )); + } + } + } + + from_parsed_vector(vec_values, name) + } + + pub fn to_value(self, tag: Tag) -> Value { + Value { + value: UntaggedValue::DataFrame(PolarsData::Series(self)), + tag, + } + } + + pub fn series_to_value(series: Series, tag: Tag) -> Value { + Value { + value: UntaggedValue::DataFrame(PolarsData::Series(NuSeries::new(series))), + tag, + } + } + + pub fn series_to_untagged(series: Series) -> UntaggedValue { + UntaggedValue::DataFrame(PolarsData::Series(NuSeries::new(series))) + } + + pub fn dtype(&self) -> &str { + &self.dtype + } + + pub fn series(self) -> Series { + self.series + } +} + +impl AsRef for NuSeries { + fn as_ref(&self) -> &Series { + &self.series + } +} + +impl AsMut for NuSeries { + fn as_mut(&mut self) -> &mut Series { + &mut self.series + } +} + +macro_rules! 
series_to_chunked { + ($converter: expr, $self: expr) => {{ + let chunked_array = $converter.map_err(|e| { + ShellError::labeled_error("Parsing Error", format!("{}", e), Span::unknown()) + })?; + + let size = 20; + + let (head_size, skip, tail_size) = if $self.as_ref().len() > size { + let remaining = $self.as_ref().len() - (size / 2); + let skip = $self.as_ref().len() - remaining; + (size / 2, skip, remaining.min(size / 2)) + } else { + (size, 0, 0) + }; + + let head = chunked_array + .into_iter() + .take(head_size) + .map(|value| match value { + Some(v) => { + let mut dictionary_row = Dictionary::default(); + + let value = Value { + value: UntaggedValue::Primitive(v.into()), + tag: Tag::unknown(), + }; + + let header = format!("{} ({})", $self.as_ref().name(), $self.as_ref().dtype()); + dictionary_row.insert(header, value); + + Value { + value: UntaggedValue::Row(dictionary_row), + tag: Tag::unknown(), + } + } + None => Value { + value: UntaggedValue::Primitive(Primitive::Nothing), + tag: Tag::unknown(), + }, + }); + + let res = if $self.as_ref().len() < size { + head.collect::>() + } else { + let middle = std::iter::once({ + let mut dictionary_row = Dictionary::default(); + + let value = Value { + value: UntaggedValue::Primitive("...".into()), + tag: Tag::unknown(), + }; + + let header = format!("{} ({})", $self.as_ref().name(), $self.as_ref().dtype()); + dictionary_row.insert(header, value); + + Value { + value: UntaggedValue::Row(dictionary_row), + tag: Tag::unknown(), + } + }); + + let tail = + chunked_array + .into_iter() + .skip(skip) + .take(tail_size) + .map(|value| match value { + Some(v) => { + let mut dictionary_row = Dictionary::default(); + + let value = Value { + value: UntaggedValue::Primitive(v.into()), + tag: Tag::unknown(), + }; + + let header = format!("{} ({})", $self.as_ref().name(), $self.dtype()); + dictionary_row.insert(header, value); + + Value { + value: UntaggedValue::Row(dictionary_row), + tag: Tag::unknown(), + } + } + None => Value { 
+ value: UntaggedValue::Primitive(Primitive::Nothing), + tag: Tag::unknown(), + }, + }); + + head.chain(middle).chain(tail).collect::>() + }; + + Ok(res) + }}; +} + +impl NuSeries { + pub fn print(&self) -> Result, ShellError> { + match self.as_ref().dtype() { + DataType::Boolean => series_to_chunked!(self.as_ref().bool(), self), + DataType::UInt8 => series_to_chunked!(self.as_ref().u8(), self), + DataType::UInt16 => series_to_chunked!(self.as_ref().u16(), self), + DataType::UInt32 => series_to_chunked!(self.as_ref().u32(), self), + DataType::UInt64 => series_to_chunked!(self.as_ref().u64(), self), + DataType::Int8 => series_to_chunked!(self.as_ref().i8(), self), + DataType::Int16 => series_to_chunked!(self.as_ref().i16(), self), + DataType::Int32 => series_to_chunked!(self.as_ref().i32(), self), + DataType::Int64 => series_to_chunked!(self.as_ref().i64(), self), + DataType::Float32 => series_to_chunked!(self.as_ref().f32(), self), + DataType::Float64 => series_to_chunked!(self.as_ref().f64(), self), + DataType::Utf8 => series_to_chunked!(self.as_ref().utf8(), self), + DataType::Date32 => series_to_chunked!(self.as_ref().date32(), self), + DataType::Date64 => series_to_chunked!(self.as_ref().date64(), self), + DataType::Null => Ok(vec![Value { + value: UntaggedValue::Primitive(Primitive::Nothing), + tag: Tag::unknown(), + }]), + //DataType::List(_) => None, + //DataType::Time64(TimeUnit) => None, + //DataType::Duration(TimeUnit) => None, + // DataType::Categorical => None, + _ => unimplemented!(), + } + } +} + +fn insert_value(value: Value, vec_values: &mut Vec) -> Result<(), ShellError> { + // Checking that the type for the value is the same + // for the previous value in the column + if vec_values.is_empty() { + Ok(vec_values.push(value)) + } else { + let prev_value = &vec_values[vec_values.len() - 1]; + + match (&prev_value.value, &value.value) { + ( + UntaggedValue::Primitive(Primitive::Int(_)), + UntaggedValue::Primitive(Primitive::Int(_)), + ) + | ( + 
UntaggedValue::Primitive(Primitive::Decimal(_)), + UntaggedValue::Primitive(Primitive::Decimal(_)), + ) + | ( + UntaggedValue::Primitive(Primitive::String(_)), + UntaggedValue::Primitive(Primitive::String(_)), + ) => Ok(vec_values.push(value)), + _ => Err(ShellError::labeled_error_with_secondary( + "Different values in column", + "Value with different type", + &value.tag, + "Perhaps you want to change it to this value type", + &prev_value.tag, + )), + } + } +} + +fn from_parsed_vector( + vec_values: Vec, + name: Option, +) -> Result { + let series = match &vec_values[0].value { + UntaggedValue::Primitive(Primitive::Int(_)) => { + let series_values: Result, _> = vec_values.iter().map(|v| v.as_i64()).collect(); + let series_name = match &name { + Some(n) => n.as_ref(), + None => "int", + }; + Series::new(series_name, series_values?) + } + UntaggedValue::Primitive(Primitive::Decimal(_)) => { + let series_values: Result, _> = vec_values.iter().map(|v| v.as_f64()).collect(); + let series_name = match &name { + Some(n) => n.as_ref(), + None => "decimal", + }; + Series::new(series_name, series_values?) + } + UntaggedValue::Primitive(Primitive::String(_)) => { + let series_values: Result, _> = + vec_values.iter().map(|v| v.as_string()).collect(); + let series_name = match &name { + Some(n) => n.as_ref(), + None => "string", + }; + Series::new(series_name, series_values?) 
+ } + _ => unreachable!("The untagged type is checked while creating vec_values"), + }; + + Ok(NuSeries::new(series)) +} diff --git a/crates/nu-protocol/src/value.rs b/crates/nu-protocol/src/value.rs index f6cf0fcd7e..94c790bc5b 100644 --- a/crates/nu-protocol/src/value.rs +++ b/crates/nu-protocol/src/value.rs @@ -672,7 +672,11 @@ impl ShellTypeName for UntaggedValue { UntaggedValue::Error(_) => "error", UntaggedValue::Block(_) => "block", #[cfg(feature = "dataframe")] - UntaggedValue::DataFrame(_) => "dataframe", + UntaggedValue::DataFrame(PolarsData::EagerDataFrame(_)) => "dataframe", + #[cfg(feature = "dataframe")] + UntaggedValue::DataFrame(PolarsData::Series(_)) => "series", + #[cfg(feature = "dataframe")] + UntaggedValue::DataFrame(PolarsData::GroupBy(_)) => "groupby", } } }