Revert "Polars lazy refactor (#12669)" (#12962)

This reverts commit 68adc4657f.

# Description

Reverts the lazyframe refactor (#12669) for the next release, since
there are still a few lingering issues. This temporarily solves #12863
and #12828. After the release, the lazyframes can be added back and
cleaned up.
This commit is contained in:
Ian Manske 2024-05-24 23:09:26 +00:00 committed by GitHub
parent 7d11c28eea
commit 84b7a99adf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
50 changed files with 1210 additions and 511 deletions

1
Cargo.lock generated
View file

@ -3399,7 +3399,6 @@ dependencies = [
"polars",
"polars-arrow",
"polars-io",
"polars-lazy",
"polars-ops",
"polars-plan",
"polars-utils",

View file

@ -34,7 +34,6 @@ polars-arrow = { version = "0.39"}
polars-ops = { version = "0.39"}
polars-plan = { version = "0.39", features = ["regex"]}
polars-utils = { version = "0.39"}
polars-lazy = { version = "0.39"}
typetag = "0.2"
uuid = { version = "1.7", features = ["v4", "serde"] }

View file

@ -4,6 +4,7 @@ use crate::{
PolarsPlugin,
};
use super::super::values::NuDataFrame;
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
record, Category, Example, LabeledError, PipelineData, ShellError, Signature, Span,
@ -66,7 +67,7 @@ impl PluginCommand for CastDF {
Example {
description: "Cast a column in a lazy dataframe to a different dtype",
example:
"[[a b]; [1 2] [3 4]] | polars into-df | polars cast u8 a | polars schema",
"[[a b]; [1 2] [3 4]] | polars into-df | polars into-lazy | polars cast u8 a | polars schema",
result: Some(Value::record(
record! {
"a" => Value::string("u8", Span::test_data()),
@ -98,7 +99,7 @@ impl PluginCommand for CastDF {
}
PolarsPluginObject::NuDataFrame(df) => {
let (dtype, column_nm) = df_args(call)?;
command_lazy(plugin, engine, call, column_nm, dtype, df.lazy())
command_eager(plugin, engine, call, column_nm, dtype, df)
}
PolarsPluginObject::NuExpression(expr) => {
let dtype: String = call.req(0)?;
@ -143,10 +144,51 @@ fn command_lazy(
) -> Result<PipelineData, ShellError> {
let column = col(&column_nm).cast(dtype);
let lazy = lazy.to_polars().with_columns(&[column]);
let lazy = NuLazyFrame::new(lazy);
let lazy = NuLazyFrame::new(false, lazy);
lazy.to_pipeline_data(plugin, engine, call.head)
}
fn command_eager(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
column_nm: String,
dtype: DataType,
nu_df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let mut df = (*nu_df.df).clone();
let column = df
.column(&column_nm)
.map_err(|e| ShellError::GenericError {
error: format!("{e}"),
msg: "".into(),
span: Some(call.head),
help: None,
inner: vec![],
})?;
let casted = column.cast(&dtype).map_err(|e| ShellError::GenericError {
error: format!("{e}"),
msg: "".into(),
span: Some(call.head),
help: None,
inner: vec![],
})?;
let _ = df
.with_column(casted)
.map_err(|e| ShellError::GenericError {
error: format!("{e}"),
msg: "".into(),
span: Some(call.head),
help: None,
inner: vec![],
})?;
let df = NuDataFrame::new(false, df);
df.to_pipeline_data(plugin, engine, call.head)
}
#[cfg(test)]
mod test {

View file

@ -4,7 +4,7 @@ use nu_protocol::{
Value,
};
use crate::values::{CustomValueSupport, NuLazyFrame};
use crate::values::CustomValueSupport;
use crate::PolarsPlugin;
use super::super::values::utils::convert_columns;
@ -37,7 +37,7 @@ impl PluginCommand for DropDF {
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "drop column a",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars drop a | polars collect",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars drop a",
result: Some(
NuDataFrame::try_from_columns(
vec![Column::new(
@ -70,11 +70,46 @@ fn command(
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let columns: Vec<Value> = call.rest(0)?;
let (col_string, _col_span) = convert_columns(columns, call.head)?;
let (col_string, col_span) = convert_columns(columns, call.head)?;
let df = NuLazyFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let polars_df = df.to_polars().drop(col_string.iter().map(|s| &s.item));
let final_df = NuLazyFrame::new(polars_df);
let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let new_df = col_string
.first()
.ok_or_else(|| ShellError::GenericError {
error: "Empty names list".into(),
msg: "No column names were found".into(),
span: Some(col_span),
help: None,
inner: vec![],
})
.and_then(|col| {
df.as_ref()
.drop(&col.item)
.map_err(|e| ShellError::GenericError {
error: "Error dropping column".into(),
msg: e.to_string(),
span: Some(col.span),
help: None,
inner: vec![],
})
})?;
// If there are more columns in the drop selection list, these
// are added from the resulting dataframe
let polars_df = col_string.iter().skip(1).try_fold(new_df, |new_df, col| {
new_df
.drop(&col.item)
.map_err(|e| ShellError::GenericError {
error: "Error dropping column".into(),
msg: e.to_string(),
span: Some(col.span),
help: None,
inner: vec![],
})
})?;
let final_df = NuDataFrame::new(df.from_lazy, polars_df);
final_df.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -5,11 +5,11 @@ use nu_protocol::{
};
use polars::prelude::UniqueKeepStrategy;
use crate::values::{CustomValueSupport, NuDataFrame};
use crate::values::CustomValueSupport;
use crate::PolarsPlugin;
use super::super::values::utils::convert_columns_string;
use super::super::values::{Column, NuLazyFrame};
use super::super::values::{Column, NuDataFrame};
#[derive(Clone)]
pub struct DropDuplicates;
@ -48,7 +48,7 @@ impl PluginCommand for DropDuplicates {
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "drop duplicates",
example: "[[a b]; [1 2] [3 4] [1 2]] | polars into-df | polars drop-duplicates | polars collect",
example: "[[a b]; [1 2] [3 4] [1 2]] | polars into-df | polars drop-duplicates",
result: Some(
NuDataFrame::try_from_columns(
vec![
@ -87,7 +87,7 @@ fn command(
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let columns: Option<Vec<Value>> = call.opt(0)?;
let (subset, _col_span) = match columns {
let (subset, col_span) = match columns {
Some(cols) => {
let (agg_string, col_span) = convert_columns_string(cols, call.head)?;
(Some(agg_string), col_span)
@ -95,7 +95,9 @@ fn command(
None => (None, call.head),
};
let df = NuLazyFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let subset_slice = subset.as_ref().map(|cols| &cols[..]);
let keep_strategy = if call.has_flag("last")? {
UniqueKeepStrategy::Last
@ -103,9 +105,18 @@ fn command(
UniqueKeepStrategy::First
};
let polars_df = df.to_polars().unique(subset, keep_strategy);
let polars_df = df
.as_ref()
.unique(subset_slice, keep_strategy, None)
.map_err(|e| ShellError::GenericError {
error: "Error dropping duplicates".into(),
msg: e.to_string(),
span: Some(col_span),
help: None,
inner: vec![],
})?;
let df = NuLazyFrame::new(polars_df);
let df = NuDataFrame::new(df.from_lazy, polars_df);
df.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -4,13 +4,11 @@ use nu_protocol::{
Value,
};
use polars_lazy::dsl::col;
use crate::values::{CustomValueSupport, NuDataFrame};
use crate::values::CustomValueSupport;
use crate::PolarsPlugin;
use super::super::values::utils::convert_columns_string;
use super::super::values::{Column, NuLazyFrame};
use super::super::values::{Column, NuDataFrame};
#[derive(Clone)]
pub struct DropNulls;
@ -45,7 +43,8 @@ impl PluginCommand for DropNulls {
Example {
description: "drop null values in dataframe",
example: r#"let df = ([[a b]; [1 2] [3 0] [1 2]] | polars into-df);
let a = ($df | polars with-column [((polars col b) / (polars col b) | polars as res)]);
let res = ($df.b / $df.b);
let a = ($df | polars with-column $res --name res);
$a | polars drop-nulls"#,
result: Some(
NuDataFrame::try_from_columns(
@ -110,20 +109,31 @@ fn command(
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let df = NuLazyFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let columns: Option<Vec<Value>> = call.opt(0)?;
let (subset, _col_span) = match columns {
let (subset, col_span) = match columns {
Some(cols) => {
let (agg_string, col_span) = convert_columns_string(cols, call.head)?;
let agg_expr = agg_string.iter().map(|s| col(s)).collect();
(Some(agg_expr), col_span)
(Some(agg_string), col_span)
}
None => (None, call.head),
};
let polars_df = df.to_polars().drop_nulls(subset);
let df = NuLazyFrame::new(polars_df);
let subset_slice = subset.as_ref().map(|cols| &cols[..]);
let polars_df = df
.as_ref()
.drop_nulls(subset_slice)
.map_err(|e| ShellError::GenericError {
error: "Error dropping nulls".into(),
msg: e.to_string(),
span: Some(col_span),
help: None,
inner: vec![],
})?;
let df = NuDataFrame::new(df.from_lazy, polars_df);
df.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -0,0 +1,162 @@
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, SyntaxShape, Type,
Value,
};
use polars::prelude::LazyFrame;
use crate::{
dataframe::values::{NuExpression, NuLazyFrame},
values::{cant_convert_err, CustomValueSupport, PolarsPluginObject, PolarsPluginType},
PolarsPlugin,
};
use super::super::values::{Column, NuDataFrame};
#[derive(Clone)]
pub struct FilterWith;
impl PluginCommand for FilterWith {
type Plugin = PolarsPlugin;
fn name(&self) -> &str {
"polars filter-with"
}
fn usage(&self) -> &str {
"Filters dataframe using a mask or expression as reference."
}
fn signature(&self) -> Signature {
Signature::build(self.name())
.required(
"mask or expression",
SyntaxShape::Any,
"boolean mask used to filter data",
)
.input_output_type(
Type::Custom("dataframe".into()),
Type::Custom("dataframe".into()),
)
.category(Category::Custom("dataframe or lazyframe".into()))
}
fn examples(&self) -> Vec<Example> {
vec![
Example {
description: "Filter dataframe using a bool mask",
example: r#"let mask = ([true false] | polars into-df);
[[a b]; [1 2] [3 4]] | polars into-df | polars filter-with $mask"#,
result: Some(
NuDataFrame::try_from_columns(
vec![
Column::new("a".to_string(), vec![Value::test_int(1)]),
Column::new("b".to_string(), vec![Value::test_int(2)]),
],
None,
)
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
},
Example {
description: "Filter dataframe using an expression",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars filter-with ((polars col a) > 1)",
result: Some(
NuDataFrame::try_from_columns(
vec![
Column::new("a".to_string(), vec![Value::test_int(3)]),
Column::new("b".to_string(), vec![Value::test_int(4)]),
],
None,
)
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
},
]
}
fn run(
&self,
plugin: &Self::Plugin,
engine: &EngineInterface,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, LabeledError> {
let value = input.into_value(call.head)?;
match PolarsPluginObject::try_from_value(plugin, &value)? {
PolarsPluginObject::NuDataFrame(df) => command_eager(plugin, engine, call, df),
PolarsPluginObject::NuLazyFrame(lazy) => command_lazy(plugin, engine, call, lazy),
_ => Err(cant_convert_err(
&value,
&[PolarsPluginType::NuDataFrame, PolarsPluginType::NuLazyFrame],
)),
}
.map_err(LabeledError::from)
}
}
fn command_eager(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let mask_value: Value = call.req(0)?;
let mask_span = mask_value.span();
if NuExpression::can_downcast(&mask_value) {
let expression = NuExpression::try_from_value(plugin, &mask_value)?;
let lazy = df.lazy();
let lazy = lazy.apply_with_expr(expression, LazyFrame::filter);
lazy.to_pipeline_data(plugin, engine, call.head)
} else {
let mask = NuDataFrame::try_from_value_coerce(plugin, &mask_value, mask_span)?
.as_series(mask_span)?;
let mask = mask.bool().map_err(|e| ShellError::GenericError {
error: "Error casting to bool".into(),
msg: e.to_string(),
span: Some(mask_span),
help: Some("Perhaps you want to use a series with booleans as mask".into()),
inner: vec![],
})?;
let polars_df = df
.as_ref()
.filter(mask)
.map_err(|e| ShellError::GenericError {
error: "Error filtering dataframe".into(),
msg: e.to_string(),
span: Some(call.head),
help: Some("The only allowed column types for dummies are String or Int".into()),
inner: vec![],
})?;
let df = NuDataFrame::new(df.from_lazy, polars_df);
df.to_pipeline_data(plugin, engine, call.head)
}
}
fn command_lazy(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
lazy: NuLazyFrame,
) -> Result<PipelineData, ShellError> {
let expr: Value = call.req(0)?;
let expr = NuExpression::try_from_value(plugin, &expr)?;
let lazy = lazy.apply_with_expr(expr, LazyFrame::filter);
lazy.to_pipeline_data(plugin, engine, call.head)
}
#[cfg(test)]
mod test {
use super::*;
use crate::test::test_polars_plugin_command;
#[test]
fn test_examples() -> Result<(), ShellError> {
test_polars_plugin_command(&FilterWith)
}
}

View file

@ -48,7 +48,7 @@ impl PluginCommand for FirstDF {
vec![
Example {
description: "Return the first row of a dataframe",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars first | polars collect",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars first",
result: Some(
NuDataFrame::try_from_columns(
vec![
@ -63,7 +63,7 @@ impl PluginCommand for FirstDF {
},
Example {
description: "Return the first two rows of a dataframe",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars first 2 | polars collect",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars first 2",
result: Some(
NuDataFrame::try_from_columns(
vec![
@ -98,12 +98,13 @@ impl PluginCommand for FirstDF {
input: PipelineData,
) -> Result<PipelineData, LabeledError> {
let value = input.into_value(call.head)?;
if NuLazyFrame::can_downcast(&value) || NuDataFrame::can_downcast(&value) {
let lazy = NuLazyFrame::try_from_value_coerce(plugin, &value)?;
command(plugin, engine, call, lazy).map_err(LabeledError::from)
if NuDataFrame::can_downcast(&value) || NuLazyFrame::can_downcast(&value) {
let df = NuDataFrame::try_from_value_coerce(plugin, &value, call.head)?;
command(plugin, engine, call, df).map_err(|e| e.into())
} else {
let expr = NuExpression::try_from_value(plugin, &value)?;
let expr: NuExpression = expr.into_polars().first().into();
expr.to_pipeline_data(plugin, engine, call.head)
.map_err(LabeledError::from)
}
@ -114,13 +115,13 @@ fn command(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
df: NuLazyFrame,
df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let rows: Option<usize> = call.opt(0)?;
let rows = rows.unwrap_or(1);
let res = df.to_polars().slice(0, rows as u32);
let res: NuLazyFrame = res.into();
let res = df.as_ref().head(Some(rows));
let res = NuDataFrame::new(false, res);
res.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -5,13 +5,10 @@ use nu_protocol::{
};
use crate::{
dataframe::values::utils::convert_columns_string,
values::{CustomValueSupport, NuDataFrame},
PolarsPlugin,
dataframe::values::utils::convert_columns_string, values::CustomValueSupport, PolarsPlugin,
};
use super::super::values::{Column, NuLazyFrame};
use polars::prelude::{col, Expr};
use super::super::values::{Column, NuDataFrame};
#[derive(Clone)]
pub struct GetDF;
@ -40,7 +37,7 @@ impl PluginCommand for GetDF {
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Returns the selected column",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars get a | polars collect",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars get a",
result: Some(
NuDataFrame::try_from_columns(
vec![Column::new(
@ -73,13 +70,21 @@ fn command(
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let columns: Vec<Value> = call.rest(0)?;
let (col_string, _col_span) = convert_columns_string(columns, call.head)?;
let col_expr: Vec<Expr> = col_string.iter().map(|s| col(s)).collect();
let (col_string, col_span) = convert_columns_string(columns, call.head)?;
let df = NuLazyFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let df = df.to_polars().select(col_expr);
let df = NuLazyFrame::new(df);
let df = df
.as_ref()
.select(col_string)
.map_err(|e| ShellError::GenericError {
error: "Error selecting columns".into(),
msg: e.to_string(),
span: Some(col_span),
help: None,
inner: vec![],
})?;
let df = NuDataFrame::new(false, df);
df.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -3,7 +3,7 @@ use crate::{
PolarsPlugin,
};
use super::super::values::{NuDataFrame, NuExpression};
use super::super::values::{utils::DEFAULT_ROWS, NuDataFrame, NuExpression};
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, SyntaxShape, Type,
@ -44,7 +44,7 @@ impl PluginCommand for LastDF {
vec![
Example {
description: "Create new dataframe with last rows",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars last | polars collect",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars last 1",
result: Some(
NuDataFrame::try_from_columns(
vec![
@ -74,7 +74,7 @@ impl PluginCommand for LastDF {
) -> Result<PipelineData, LabeledError> {
let value = input.into_value(call.head)?;
if NuDataFrame::can_downcast(&value) || NuLazyFrame::can_downcast(&value) {
let df = NuLazyFrame::try_from_value_coerce(plugin, &value)?;
let df = NuDataFrame::try_from_value_coerce(plugin, &value, call.head)?;
command(plugin, engine, call, df).map_err(|e| e.into())
} else {
let expr = NuExpression::try_from_value(plugin, &value)?;
@ -90,13 +90,13 @@ fn command(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
df: NuLazyFrame,
df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let rows: Option<usize> = call.opt(0)?;
let rows = rows.unwrap_or(1);
let rows = rows.unwrap_or(DEFAULT_ROWS);
let res = df.to_polars().tail(rows as u32);
let res = NuLazyFrame::new(res);
let res = df.as_ref().tail(Some(rows));
let res = NuDataFrame::new(false, res);
res.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -1,14 +1,11 @@
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, SyntaxShape, Type,
Value,
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, Spanned,
SyntaxShape, Type, Value,
};
use polars::frame::explode::MeltArgs;
use crate::{
dataframe::values::utils::convert_columns_string,
values::{CustomValueSupport, NuLazyFrame},
PolarsPlugin,
dataframe::values::utils::convert_columns_string, values::CustomValueSupport, PolarsPlugin,
};
use super::super::values::{Column, NuDataFrame};
@ -53,11 +50,6 @@ impl PluginCommand for MeltDF {
"optional name for value column",
Some('l'),
)
.switch(
"streamable",
"Use polar's streaming engine. Results will not have a stable ordering.",
Some('s'),
)
.input_output_type(
Type::Custom("dataframe".into()),
Type::Custom("dataframe".into()),
@ -69,7 +61,7 @@ impl PluginCommand for MeltDF {
vec![Example {
description: "melt dataframe",
example:
"[[a b c d]; [x 1 4 a] [y 2 5 b] [z 3 6 c]] | polars into-df | polars melt -c [b c] -v [a d] | polars collect",
"[[a b c d]; [x 1 4 a] [y 2 5 b] [z 3 6 c]] | polars into-df | polars melt -c [b c] -v [a d]",
result: Some(
NuDataFrame::try_from_columns(vec![
Column::new(
@ -143,31 +135,111 @@ fn command(
let id_col: Vec<Value> = call.get_flag("columns")?.expect("required value");
let val_col: Vec<Value> = call.get_flag("values")?.expect("required value");
let value_name = call.get_flag("value-name")?.map(|v: String| v.into());
let variable_name = call.get_flag("variable-name")?.map(|v: String| v.into());
let streamable = call.has_flag("streamable")?;
let value_name: Option<Spanned<String>> = call.get_flag("value-name")?;
let variable_name: Option<Spanned<String>> = call.get_flag("variable-name")?;
let (id_vars, _id_col_span) = convert_columns_string(id_col, call.head)?;
let id_vars = id_vars.into_iter().map(Into::into).collect();
let (value_vars, _val_col_span) = convert_columns_string(val_col, call.head)?;
let value_vars = value_vars.into_iter().map(Into::into).collect();
let (id_col_string, id_col_span) = convert_columns_string(id_col, call.head)?;
let (val_col_string, val_col_span) = convert_columns_string(val_col, call.head)?;
let df = NuLazyFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let polars_df = df.to_polars();
let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let args = MeltArgs {
id_vars,
value_vars,
variable_name,
value_name,
streamable,
};
check_column_datatypes(df.as_ref(), &id_col_string, id_col_span)?;
check_column_datatypes(df.as_ref(), &val_col_string, val_col_span)?;
let res = polars_df.melt(args);
let res = NuLazyFrame::new(res);
let mut res = df
.as_ref()
.melt(&id_col_string, &val_col_string)
.map_err(|e| ShellError::GenericError {
error: "Error calculating melt".into(),
msg: e.to_string(),
span: Some(call.head),
help: None,
inner: vec![],
})?;
if let Some(name) = &variable_name {
res.rename("variable", &name.item)
.map_err(|e| ShellError::GenericError {
error: "Error renaming column".into(),
msg: e.to_string(),
span: Some(name.span),
help: None,
inner: vec![],
})?;
}
if let Some(name) = &value_name {
res.rename("value", &name.item)
.map_err(|e| ShellError::GenericError {
error: "Error renaming column".into(),
msg: e.to_string(),
span: Some(name.span),
help: None,
inner: vec![],
})?;
}
let res = NuDataFrame::new(false, res);
res.to_pipeline_data(plugin, engine, call.head)
}
fn check_column_datatypes<T: AsRef<str>>(
df: &polars::prelude::DataFrame,
cols: &[T],
col_span: Span,
) -> Result<(), ShellError> {
if cols.is_empty() {
return Err(ShellError::GenericError {
error: "Merge error".into(),
msg: "empty column list".into(),
span: Some(col_span),
help: None,
inner: vec![],
});
}
// Checking if they are same type
if cols.len() > 1 {
for w in cols.windows(2) {
let l_series = df
.column(w[0].as_ref())
.map_err(|e| ShellError::GenericError {
error: "Error selecting columns".into(),
msg: e.to_string(),
span: Some(col_span),
help: None,
inner: vec![],
})?;
let r_series = df
.column(w[1].as_ref())
.map_err(|e| ShellError::GenericError {
error: "Error selecting columns".into(),
msg: e.to_string(),
span: Some(col_span),
help: None,
inner: vec![],
})?;
if l_series.dtype() != r_series.dtype() {
return Err(ShellError::GenericError {
error: "Merge error".into(),
msg: "found different column types in list".into(),
span: Some(col_span),
help: Some(format!(
"datatypes {} and {} are incompatible",
l_series.dtype(),
r_series.dtype()
)),
inner: vec![],
});
}
}
}
Ok(())
}
#[cfg(test)]
mod test {
use crate::test::test_polars_plugin_command;

View file

@ -1,11 +1,22 @@
mod append;
mod cast;
mod columns;
mod drop;
mod drop_duplicates;
mod drop_nulls;
mod dummies;
mod filter_with;
mod first;
mod get;
mod last;
mod melt;
mod open;
mod query_df;
mod rename;
mod sample;
mod schema;
mod shape;
mod slice;
mod sql_context;
mod sql_expr;
mod summary;
@ -17,18 +28,30 @@ mod to_df;
mod to_json_lines;
mod to_nu;
mod to_parquet;
mod with_column;
use crate::PolarsPlugin;
pub use self::open::OpenDataFrame;
pub use append::AppendDF;
pub use cast::CastDF;
pub use columns::ColumnsDF;
pub use drop::DropDF;
pub use drop_duplicates::DropDuplicates;
pub use drop_nulls::DropNulls;
pub use dummies::Dummies;
pub use filter_with::FilterWith;
pub use first::FirstDF;
pub use get::GetDF;
pub use last::LastDF;
pub use melt::MeltDF;
use nu_plugin::PluginCommand;
pub use query_df::QueryDf;
pub use rename::RenameDF;
pub use sample::SampleDF;
pub use schema::SchemaCmd;
pub use shape::ShapeDF;
pub use slice::SliceDF;
pub use sql_context::SQLContext;
pub use summary::Summary;
pub use take::TakeDF;
@ -39,16 +62,28 @@ pub use to_df::ToDataFrame;
pub use to_json_lines::ToJsonLines;
pub use to_nu::ToNu;
pub use to_parquet::ToParquet;
pub use with_column::WithColumn;
pub(crate) fn eager_commands() -> Vec<Box<dyn PluginCommand<Plugin = PolarsPlugin>>> {
vec![
Box::new(AppendDF),
Box::new(CastDF),
Box::new(ColumnsDF),
Box::new(DropDF),
Box::new(DropDuplicates),
Box::new(DropNulls),
Box::new(Dummies),
Box::new(FilterWith),
Box::new(GetDF),
Box::new(OpenDataFrame),
Box::new(MeltDF),
Box::new(Summary),
Box::new(FirstDF),
Box::new(LastDF),
Box::new(RenameDF),
Box::new(SampleDF),
Box::new(ShapeDF),
Box::new(SliceDF),
Box::new(SchemaCmd),
Box::new(TakeDF),
Box::new(ToNu),
@ -59,5 +94,6 @@ pub(crate) fn eager_commands() -> Vec<Box<dyn PluginCommand<Plugin = PolarsPlugi
Box::new(ToJsonLines),
Box::new(ToParquet),
Box::new(QueryDf),
Box::new(WithColumn),
]
}

View file

@ -4,7 +4,6 @@ use crate::{
PolarsPlugin,
};
use nu_path::expand_path_with;
use polars_lazy::frame::LazyJsonLineReader;
use super::super::values::NuDataFrame;
use nu_plugin::PluginCommand;
@ -20,8 +19,8 @@ use std::{
};
use polars::prelude::{
col, Expr, JsonReader, LazyCsvReader, LazyFileListReader, LazyFrame, ScanArgsIpc,
ScanArgsParquet, SerReader,
CsvEncoding, CsvReader, IpcReader, JsonFormat, JsonReader, LazyCsvReader, LazyFileListReader,
LazyFrame, ParquetReader, ScanArgsIpc, ScanArgsParquet, SerReader,
};
use polars_io::{avro::AvroReader, prelude::ParallelStrategy, HiveOptions};
@ -47,6 +46,7 @@ impl PluginCommand for OpenDataFrame {
SyntaxShape::Filepath,
"file path to load values from",
)
.switch("lazy", "creates a lazy dataframe", Some('l'))
.named(
"type",
SyntaxShape::String,
@ -161,39 +161,63 @@ fn from_parquet(
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
_file_span: Span,
file_span: Span,
) -> Result<Value, ShellError> {
let args = ScanArgsParquet {
n_rows: None,
cache: true,
parallel: ParallelStrategy::Auto,
rechunk: false,
row_index: None,
low_memory: false,
cloud_options: None,
use_statistics: false,
hive_options: HiveOptions::default(),
};
if call.has_flag("lazy")? {
let file: String = call.req(0)?;
let args = ScanArgsParquet {
n_rows: None,
cache: true,
parallel: ParallelStrategy::Auto,
rechunk: false,
row_index: None,
low_memory: false,
cloud_options: None,
use_statistics: false,
hive_options: HiveOptions::default(),
};
let maybe_columns: Option<Vec<Expr>> = call
.get_flag::<Vec<String>>("columns")?
.map(|cols| cols.iter().map(|s| col(s)).collect());
let df: NuLazyFrame = LazyFrame::scan_parquet(file, args)
.map_err(|e| ShellError::GenericError {
error: "Parquet reader error".into(),
msg: format!("{e:?}"),
span: Some(call.head),
help: None,
inner: vec![],
})?
.into();
let mut polars_df =
LazyFrame::scan_parquet(file_path, args).map_err(|e| ShellError::GenericError {
error: "Parquet reader error".into(),
msg: format!("{e:?}"),
span: Some(call.head),
df.cache_and_to_value(plugin, engine, call.head)
} else {
let columns: Option<Vec<String>> = call.get_flag("columns")?;
let r = File::open(file_path).map_err(|e| ShellError::GenericError {
error: "Error opening file".into(),
msg: e.to_string(),
span: Some(file_span),
help: None,
inner: vec![],
})?;
let reader = ParquetReader::new(r);
if let Some(columns) = maybe_columns {
polars_df = polars_df.select(columns);
let reader = match columns {
None => reader,
Some(columns) => reader.with_columns(Some(columns)),
};
let df: NuDataFrame = reader
.finish()
.map_err(|e| ShellError::GenericError {
error: "Parquet reader error".into(),
msg: format!("{e:?}"),
span: Some(call.head),
help: None,
inner: vec![],
})?
.into();
df.cache_and_to_value(plugin, engine, call.head)
}
let df: NuLazyFrame = polars_df.into();
df.cache_and_to_value(plugin, engine, call.head)
}
fn from_avro(
@ -238,36 +262,60 @@ fn from_ipc(
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
_file_span: Span,
file_span: Span,
) -> Result<Value, ShellError> {
let args = ScanArgsIpc {
n_rows: None,
cache: true,
rechunk: false,
row_index: None,
memory_map: true,
cloud_options: None,
};
if call.has_flag("lazy")? {
let file: String = call.req(0)?;
let args = ScanArgsIpc {
n_rows: None,
cache: true,
rechunk: false,
row_index: None,
memory_map: true,
cloud_options: None,
};
let maybe_columns: Option<Vec<Expr>> = call
.get_flag::<Vec<String>>("columns")?
.map(|cols| cols.iter().map(|s| col(s)).collect());
let df: NuLazyFrame = LazyFrame::scan_ipc(file, args)
.map_err(|e| ShellError::GenericError {
error: "IPC reader error".into(),
msg: format!("{e:?}"),
span: Some(call.head),
help: None,
inner: vec![],
})?
.into();
let mut polars_df =
LazyFrame::scan_ipc(file_path, args).map_err(|e| ShellError::GenericError {
error: "IPC reader error".into(),
msg: format!("{e:?}"),
span: Some(call.head),
df.cache_and_to_value(plugin, engine, call.head)
} else {
let columns: Option<Vec<String>> = call.get_flag("columns")?;
let r = File::open(file_path).map_err(|e| ShellError::GenericError {
error: "Error opening file".into(),
msg: e.to_string(),
span: Some(file_span),
help: None,
inner: vec![],
})?;
let reader = IpcReader::new(r);
if let Some(columns) = maybe_columns {
polars_df = polars_df.select(columns);
let reader = match columns {
None => reader,
Some(columns) => reader.with_columns(Some(columns)),
};
let df: NuDataFrame = reader
.finish()
.map_err(|e| ShellError::GenericError {
error: "IPC reader error".into(),
msg: format!("{e:?}"),
span: Some(call.head),
help: None,
inner: vec![],
})?
.into();
df.cache_and_to_value(plugin, engine, call.head)
}
let df: NuLazyFrame = polars_df.into();
df.cache_and_to_value(plugin, engine, call.head)
}
fn from_json(
@ -316,21 +364,32 @@ fn from_jsonl(
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
_file_span: Span,
file_span: Span,
) -> Result<Value, ShellError> {
let infer_schema: Option<usize> = call.get_flag("infer-schema")?;
let maybe_schema = call
.get_flag("schema")?
.map(|schema| NuSchema::try_from(&schema))
.transpose()?;
let file = File::open(file_path).map_err(|e| ShellError::GenericError {
error: "Error opening file".into(),
msg: e.to_string(),
span: Some(file_span),
help: None,
inner: vec![],
})?;
let maybe_columns: Option<Vec<Expr>> = call
.get_flag::<Vec<String>>("columns")?
.map(|cols| cols.iter().map(|s| col(s)).collect());
let buf_reader = BufReader::new(file);
let reader = JsonReader::new(buf_reader)
.with_json_format(JsonFormat::JsonLines)
.infer_schema_len(infer_schema);
let mut polars_df = LazyJsonLineReader::new(file_path)
.with_infer_schema_length(infer_schema)
.with_schema(maybe_schema.map(|s| s.into()))
let reader = match maybe_schema {
Some(schema) => reader.with_schema(schema.into()),
None => reader,
};
let df: NuDataFrame = reader
.finish()
.map_err(|e| ShellError::GenericError {
error: "Json lines reader error".into(),
@ -338,13 +397,9 @@ fn from_jsonl(
span: Some(call.head),
help: None,
inner: vec![],
})?;
})?
.into();
if let Some(columns) = maybe_columns {
polars_df = polars_df.select(columns);
}
let df: NuLazyFrame = polars_df.into();
df.cache_and_to_value(plugin, engine, call.head)
}
@ -353,73 +408,137 @@ fn from_csv(
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
_file_span: Span,
file_span: Span,
) -> Result<Value, ShellError> {
let delimiter: Option<Spanned<String>> = call.get_flag("delimiter")?;
let no_header: bool = call.has_flag("no-header")?;
let infer_schema: Option<usize> = call.get_flag("infer-schema")?;
let skip_rows: Option<usize> = call.get_flag("skip-rows")?;
let maybe_columns: Option<Vec<Expr>> = call
.get_flag::<Vec<String>>("columns")?
.map(|cols| cols.iter().map(|s| col(s)).collect());
let columns: Option<Vec<String>> = call.get_flag("columns")?;
let maybe_schema = call
.get_flag("schema")?
.map(|schema| NuSchema::try_from(&schema))
.transpose()?;
let csv_reader = LazyCsvReader::new(file_path);
if call.has_flag("lazy")? {
let csv_reader = LazyCsvReader::new(file_path);
let csv_reader = match delimiter {
None => csv_reader,
Some(d) => {
if d.item.len() != 1 {
return Err(ShellError::GenericError {
error: "Incorrect delimiter".into(),
msg: "Delimiter has to be one character".into(),
span: Some(d.span),
help: None,
inner: vec![],
});
} else {
let delimiter = match d.item.chars().next() {
Some(d) => d as u8,
None => unreachable!(),
};
csv_reader.with_separator(delimiter)
let csv_reader = match delimiter {
None => csv_reader,
Some(d) => {
if d.item.len() != 1 {
return Err(ShellError::GenericError {
error: "Incorrect delimiter".into(),
msg: "Delimiter has to be one character".into(),
span: Some(d.span),
help: None,
inner: vec![],
});
} else {
let delimiter = match d.item.chars().next() {
Some(d) => d as u8,
None => unreachable!(),
};
csv_reader.with_separator(delimiter)
}
}
}
};
};
let csv_reader = csv_reader.has_header(!no_header);
let csv_reader = csv_reader.has_header(!no_header);
let csv_reader = match maybe_schema {
Some(schema) => csv_reader.with_schema(Some(schema.into())),
None => csv_reader,
};
let csv_reader = match maybe_schema {
Some(schema) => csv_reader.with_schema(Some(schema.into())),
None => csv_reader,
};
let csv_reader = match infer_schema {
None => csv_reader,
Some(r) => csv_reader.with_infer_schema_length(Some(r)),
};
let csv_reader = match infer_schema {
None => csv_reader,
Some(r) => csv_reader.with_infer_schema_length(Some(r)),
};
let csv_reader = match skip_rows {
None => csv_reader,
Some(r) => csv_reader.with_skip_rows(r),
};
let csv_reader = match skip_rows {
None => csv_reader,
Some(r) => csv_reader.with_skip_rows(r),
};
let mut polars_df = csv_reader.finish().map_err(|e| ShellError::GenericError {
error: "CSV reader error".into(),
msg: format!("{e:?}"),
span: Some(call.head),
help: None,
inner: vec![],
})?;
let df: NuLazyFrame = csv_reader
.finish()
.map_err(|e| ShellError::GenericError {
error: "Parquet reader error".into(),
msg: format!("{e:?}"),
span: Some(call.head),
help: None,
inner: vec![],
})?
.into();
if let Some(columns) = maybe_columns {
polars_df = polars_df.select(columns);
df.cache_and_to_value(plugin, engine, call.head)
} else {
let csv_reader = CsvReader::from_path(file_path)
.map_err(|e| ShellError::GenericError {
error: "Error creating CSV reader".into(),
msg: e.to_string(),
span: Some(file_span),
help: None,
inner: vec![],
})?
.with_encoding(CsvEncoding::LossyUtf8);
let csv_reader = match delimiter {
None => csv_reader,
Some(d) => {
if d.item.len() != 1 {
return Err(ShellError::GenericError {
error: "Incorrect delimiter".into(),
msg: "Delimiter has to be one character".into(),
span: Some(d.span),
help: None,
inner: vec![],
});
} else {
let delimiter = match d.item.chars().next() {
Some(d) => d as u8,
None => unreachable!(),
};
csv_reader.with_separator(delimiter)
}
}
};
let csv_reader = csv_reader.has_header(!no_header);
let csv_reader = match maybe_schema {
Some(schema) => csv_reader.with_schema(Some(schema.into())),
None => csv_reader,
};
let csv_reader = match infer_schema {
None => csv_reader,
Some(r) => csv_reader.infer_schema(Some(r)),
};
let csv_reader = match skip_rows {
None => csv_reader,
Some(r) => csv_reader.with_skip_rows(r),
};
let csv_reader = match columns {
None => csv_reader,
Some(columns) => csv_reader.with_columns(Some(columns)),
};
let df: NuDataFrame = csv_reader
.finish()
.map_err(|e| ShellError::GenericError {
error: "Parquet reader error".into(),
msg: format!("{e:?}"),
span: Some(call.head),
help: None,
inner: vec![],
})?
.into();
df.cache_and_to_value(plugin, engine, call.head)
}
let df: NuLazyFrame = polars_df.into();
df.cache_and_to_value(plugin, engine, call.head)
}

View file

@ -91,7 +91,7 @@ fn command(
help: None,
inner: vec![],
})?;
let lazy = NuLazyFrame::new(df_sql);
let lazy = NuLazyFrame::new(!df.from_lazy, df_sql);
lazy.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -6,7 +6,7 @@ use nu_protocol::{
use crate::{
dataframe::{utils::extract_strings, values::NuLazyFrame},
values::CustomValueSupport,
values::{CustomValueSupport, PolarsPluginObject},
PolarsPlugin,
};
@ -49,7 +49,7 @@ impl PluginCommand for RenameDF {
vec![
Example {
description: "Renames a series",
example: "[5 6 7 8] | polars into-df | polars rename '0' new_name | polars collect",
example: "[5 6 7 8] | polars into-df | polars rename '0' new_name",
result: Some(
NuDataFrame::try_from_columns(
vec![Column::new(
@ -69,7 +69,7 @@ impl PluginCommand for RenameDF {
},
Example {
description: "Renames a dataframe column",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars rename a a_new | polars collect",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars rename a a_new",
result: Some(
NuDataFrame::try_from_columns(
vec![
@ -91,7 +91,7 @@ impl PluginCommand for RenameDF {
Example {
description: "Renames two dataframe columns",
example:
"[[a b]; [1 2] [3 4]] | polars into-df | polars rename [a b] [a_new b_new] | polars collect",
"[[a b]; [1 2] [3 4]] | polars into-df | polars rename [a b] [a_new b_new]",
result: Some(
NuDataFrame::try_from_columns(
vec![
@ -121,11 +121,49 @@ impl PluginCommand for RenameDF {
input: PipelineData,
) -> Result<PipelineData, LabeledError> {
let value = input.into_value(call.head)?;
let lazy = NuLazyFrame::try_from_value_coerce(plugin, &value)?;
command_lazy(plugin, engine, call, lazy).map_err(LabeledError::from)
match PolarsPluginObject::try_from_value(plugin, &value).map_err(LabeledError::from)? {
PolarsPluginObject::NuDataFrame(df) => {
command_eager(plugin, engine, call, df).map_err(LabeledError::from)
}
PolarsPluginObject::NuLazyFrame(lazy) => {
command_lazy(plugin, engine, call, lazy).map_err(LabeledError::from)
}
_ => Err(LabeledError::new(format!("Unsupported type: {value:?}"))
.with_label("Unsupported Type", call.head)),
}
}
}
fn command_eager(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let columns: Value = call.req(0)?;
let columns = extract_strings(columns)?;
let new_names: Value = call.req(1)?;
let new_names = extract_strings(new_names)?;
let mut polars_df = df.to_polars();
for (from, to) in columns.iter().zip(new_names.iter()) {
polars_df
.rename(from, to)
.map_err(|e| ShellError::GenericError {
error: "Error renaming".into(),
msg: e.to_string(),
span: Some(call.head),
help: None,
inner: vec![],
})?;
}
let df = NuDataFrame::new(false, polars_df);
df.to_pipeline_data(plugin, engine, call.head)
}
fn command_lazy(
plugin: &PolarsPlugin,
engine: &EngineInterface,

View file

@ -147,7 +147,7 @@ fn command(
inner: vec![],
}),
};
let df = NuDataFrame::new(df?);
let df = NuDataFrame::new(false, df?);
df.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -4,11 +4,7 @@ use nu_protocol::{
Value,
};
use crate::{
dataframe::values::Column,
values::{CustomValueSupport, NuLazyFrame},
PolarsPlugin,
};
use crate::{dataframe::values::Column, values::CustomValueSupport, PolarsPlugin};
use super::super::values::NuDataFrame;
@ -40,7 +36,7 @@ impl PluginCommand for SliceDF {
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Create new dataframe from a slice of the rows",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars slice 0 1 | polars collect",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars slice 0 1",
result: Some(
NuDataFrame::try_from_columns(
vec![
@ -73,12 +69,12 @@ fn command(
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let offset: i64 = call.req(0)?;
let size: i64 = call.req(1)?;
let size: usize = call.req(1)?;
let df = NuLazyFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let res = df.to_polars().slice(offset, size as u32);
let res = NuLazyFrame::new(res);
let res = df.as_ref().slice(offset, size);
let res = NuDataFrame::new(false, res);
res.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -272,7 +272,7 @@ fn command(
inner: vec![],
})?;
let df = NuDataFrame::new(polars_df);
let df = NuDataFrame::new(df.from_lazy, polars_df);
df.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -143,7 +143,7 @@ fn command(
inner: vec![],
})?;
let df = NuDataFrame::new(polars_df);
let df = NuDataFrame::new(df.from_lazy, polars_df);
df.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -0,0 +1,196 @@
use super::super::values::{Column, NuDataFrame};
use crate::{
dataframe::values::{NuExpression, NuLazyFrame},
values::{CustomValueSupport, PolarsPluginObject},
PolarsPlugin,
};
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, SyntaxShape, Type,
Value,
};
#[derive(Clone)]
pub struct WithColumn;
impl PluginCommand for WithColumn {
type Plugin = PolarsPlugin;
fn name(&self) -> &str {
"polars with-column"
}
fn usage(&self) -> &str {
"Adds a series to the dataframe."
}
fn signature(&self) -> Signature {
Signature::build(self.name())
.named("name", SyntaxShape::String, "new column name", Some('n'))
.rest(
"series or expressions",
SyntaxShape::Any,
"series to be added or expressions used to define the new columns",
)
.input_output_type(
Type::Custom("dataframe".into()),
Type::Custom("dataframe".into()),
)
.category(Category::Custom("dataframe or lazyframe".into()))
}
fn examples(&self) -> Vec<Example> {
vec![
Example {
description: "Adds a series to the dataframe",
example: r#"[[a b]; [1 2] [3 4]]
| polars into-df
| polars with-column ([5 6] | polars into-df) --name c"#,
result: Some(
NuDataFrame::try_from_columns(
vec![
Column::new(
"a".to_string(),
vec![Value::test_int(1), Value::test_int(3)],
),
Column::new(
"b".to_string(),
vec![Value::test_int(2), Value::test_int(4)],
),
Column::new(
"c".to_string(),
vec![Value::test_int(5), Value::test_int(6)],
),
],
None,
)
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
},
Example {
description: "Adds a series to the dataframe",
example: r#"[[a b]; [1 2] [3 4]]
| polars into-lazy
| polars with-column [
((polars col a) * 2 | polars as "c")
((polars col a) * 3 | polars as "d")
]
| polars collect"#,
result: Some(
NuDataFrame::try_from_columns(
vec![
Column::new(
"a".to_string(),
vec![Value::test_int(1), Value::test_int(3)],
),
Column::new(
"b".to_string(),
vec![Value::test_int(2), Value::test_int(4)],
),
Column::new(
"c".to_string(),
vec![Value::test_int(2), Value::test_int(6)],
),
Column::new(
"d".to_string(),
vec![Value::test_int(3), Value::test_int(9)],
),
],
None,
)
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
},
]
}
fn run(
&self,
plugin: &Self::Plugin,
engine: &EngineInterface,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, LabeledError> {
let value = input.into_value(call.head)?;
match PolarsPluginObject::try_from_value(plugin, &value)? {
PolarsPluginObject::NuDataFrame(df) => command_eager(plugin, engine, call, df),
PolarsPluginObject::NuLazyFrame(lazy) => command_lazy(plugin, engine, call, lazy),
_ => Err(ShellError::CantConvert {
to_type: "lazy or eager dataframe".into(),
from_type: value.get_type().to_string(),
span: value.span(),
help: None,
}),
}
.map_err(LabeledError::from)
}
}
fn command_eager(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let new_column: Value = call.req(0)?;
let column_span = new_column.span();
if NuExpression::can_downcast(&new_column) {
let vals: Vec<Value> = call.rest(0)?;
let value = Value::list(vals, call.head);
let expressions = NuExpression::extract_exprs(plugin, value)?;
let lazy = NuLazyFrame::new(true, df.lazy().to_polars().with_columns(&expressions));
let df = lazy.collect(call.head)?;
df.to_pipeline_data(plugin, engine, call.head)
} else {
let mut other = NuDataFrame::try_from_value_coerce(plugin, &new_column, call.head)?
.as_series(column_span)?;
let name = match call.get_flag::<String>("name")? {
Some(name) => name,
None => other.name().to_string(),
};
let series = other.rename(&name).clone();
let mut polars_df = df.to_polars();
polars_df
.with_column(series)
.map_err(|e| ShellError::GenericError {
error: "Error adding column to dataframe".into(),
msg: e.to_string(),
span: Some(column_span),
help: None,
inner: vec![],
})?;
let df = NuDataFrame::new(df.from_lazy, polars_df);
df.to_pipeline_data(plugin, engine, call.head)
}
}
fn command_lazy(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
lazy: NuLazyFrame,
) -> Result<PipelineData, ShellError> {
let vals: Vec<Value> = call.rest(0)?;
let value = Value::list(vals, call.head);
let expressions = NuExpression::extract_exprs(plugin, value)?;
let lazy: NuLazyFrame = lazy.to_polars().with_columns(&expressions).into();
lazy.to_pipeline_data(plugin, engine, call.head)
}
#[cfg(test)]
mod test {
use super::*;
use crate::test::test_polars_plugin_command;
#[test]
fn test_examples() -> Result<(), ShellError> {
test_polars_plugin_command(&WithColumn)
}
}

View file

@ -164,6 +164,7 @@ macro_rules! lazy_expr_command {
let lazy = NuLazyFrame::try_from_value_coerce(plugin, &value)
.map_err(LabeledError::from)?;
let lazy = NuLazyFrame::new(
lazy.from_eager,
lazy.to_polars()
.$func()
.map_err(|e| ShellError::GenericError {
@ -244,6 +245,7 @@ macro_rules! lazy_expr_command {
let lazy = NuLazyFrame::try_from_value_coerce(plugin, &value)
.map_err(LabeledError::from)?;
let lazy = NuLazyFrame::new(
lazy.from_eager,
lazy.to_polars()
.$func($ddof)
.map_err(|e| ShellError::GenericError {

View file

@ -180,7 +180,8 @@ fn command_df(
res.rename("is_in");
let new_df = NuDataFrame::try_from_series_vec(vec![res.into_series()], call.head)?;
let mut new_df = NuDataFrame::try_from_series_vec(vec![res.into_series()], call.head)?;
new_df.from_lazy = df.from_lazy;
new_df.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -48,7 +48,7 @@ impl PluginCommand for ExprOtherwise {
Example {
description: "Create a new column for the dataframe",
example: r#"[[a b]; [6 2] [1 4] [4 1]]
| polars into-df
| polars into-lazy
| polars with-column (
polars when ((polars col a) > 2) 4 | polars otherwise 5 | polars as c
)

View file

@ -57,7 +57,7 @@ impl PluginCommand for ExprWhen {
Example {
description: "Create a new column for the dataframe",
example: r#"[[a b]; [6 2] [1 4] [4 1]]
| polars into-df
| polars into-lazy
| polars with-column (
polars when ((polars col a) > 2) 4 | polars otherwise 5 | polars as c
)

View file

@ -80,7 +80,7 @@ impl PluginCommand for LazyAggregate {
Example {
description: "Group by and perform an aggregation",
example: r#"[[a b]; [1 2] [1 4] [2 6] [2 4]]
| polars into-df
| polars into-lazy
| polars group-by a
| polars agg [
(polars col b | polars min | polars as "b_min")
@ -147,7 +147,7 @@ impl PluginCommand for LazyAggregate {
}
let polars = group_by.to_polars();
let lazy = NuLazyFrame::new(polars.agg(&expressions));
let lazy = NuLazyFrame::new(false, polars.agg(&expressions));
lazy.to_pipeline_data(plugin, engine, call.head)
.map_err(LabeledError::from)
}

View file

@ -32,8 +32,8 @@ impl PluginCommand for LazyCollect {
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "collect a lazy dataframe",
example: "[[a b]; [1 2] [3 4]] | polars into-df | polars select [(polars col a) (polars col b)] | polars collect",
description: "drop duplicates",
example: "[[a b]; [1 2] [3 4]] | polars into-lazy | polars collect",
result: Some(
NuDataFrame::try_from_columns(
vec![
@ -64,7 +64,9 @@ impl PluginCommand for LazyCollect {
let value = input.into_value(call.head)?;
match PolarsPluginObject::try_from_value(plugin, &value)? {
PolarsPluginObject::NuLazyFrame(lazy) => {
let eager = lazy.collect(call.head)?;
let mut eager = lazy.collect(call.head)?;
// We don't want this converted back to a lazy frame
eager.from_lazy = true;
Ok(PipelineData::Value(
eager
.cache(plugin, engine, call.head)?

View file

@ -70,7 +70,7 @@ impl PluginCommand for LazyFetch {
let value = input.into_value(call.head)?;
let lazy = NuLazyFrame::try_from_value_coerce(plugin, &value)?;
let eager: NuDataFrame = lazy
let mut eager: NuDataFrame = lazy
.to_polars()
.fetch(rows as usize)
.map_err(|e| ShellError::GenericError {
@ -82,6 +82,8 @@ impl PluginCommand for LazyFetch {
})?
.into();
// mark this as not from lazy so it doesn't get converted back to a lazy frame
eager.from_lazy = false;
eager
.to_pipeline_data(plugin, engine, call.head)
.map_err(LabeledError::from)

View file

@ -40,7 +40,7 @@ impl PluginCommand for LazyFillNull {
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Fills the null values by 0",
example: "[1 2 2 3 3] | polars into-df | polars shift 2 | polars fill-null 0 | polars collect",
example: "[1 2 2 3 3] | polars into-df | polars shift 2 | polars fill-null 0",
result: Some(
NuDataFrame::try_from_columns(
vec![Column::new(
@ -96,7 +96,7 @@ fn cmd_lazy(
fill: Value,
) -> Result<PipelineData, ShellError> {
let expr = NuExpression::try_from_value(plugin, &fill)?.into_polars();
let lazy = NuLazyFrame::new(lazy.to_polars().fill_null(expr));
let lazy = NuLazyFrame::new(lazy.from_eager, lazy.to_polars().fill_null(expr));
lazy.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -42,7 +42,7 @@ impl PluginCommand for LazyFilter {
vec![Example {
description: "Filter dataframe using an expression",
example:
"[[a b]; [6 2] [4 2] [2 2]] | polars into-df | polars filter ((polars col a) >= 4) | polars collect",
"[[a b]; [6 2] [4 2] [2 2]] | polars into-df | polars filter ((polars col a) >= 4)",
result: Some(
NuDataFrame::try_from_columns(
vec![
@ -85,7 +85,10 @@ fn command(
lazy: NuLazyFrame,
filter_expr: NuExpression,
) -> Result<PipelineData, ShellError> {
let lazy = NuLazyFrame::new(lazy.to_polars().filter(filter_expr.into_polars()));
let lazy = NuLazyFrame::new(
lazy.from_eager,
lazy.to_polars().filter(filter_expr.into_polars()),
);
lazy.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -1,97 +0,0 @@
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, SyntaxShape, Type,
Value,
};
use polars::prelude::LazyFrame;
use crate::{
dataframe::values::{NuExpression, NuLazyFrame},
values::CustomValueSupport,
PolarsPlugin,
};
use super::super::values::{Column, NuDataFrame};
#[derive(Clone)]
pub struct FilterWith;
impl PluginCommand for FilterWith {
type Plugin = PolarsPlugin;
fn name(&self) -> &str {
"polars filter-with"
}
fn usage(&self) -> &str {
"Filters dataframe using an expression."
}
fn signature(&self) -> Signature {
Signature::build(self.name())
.required(
"filter expression",
SyntaxShape::Any,
"filter expression used to filter dataframe",
)
.input_output_type(
Type::Custom("dataframe".into()),
Type::Custom("dataframe".into()),
)
.category(Category::Custom("dataframe or lazyframe".into()))
}
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Filter dataframe using an expression",
example:
"[[a b]; [1 2] [3 4]] | polars into-df | polars filter-with ((polars col a) > 1)",
result: Some(
NuDataFrame::try_from_columns(
vec![
Column::new("a".to_string(), vec![Value::test_int(3)]),
Column::new("b".to_string(), vec![Value::test_int(4)]),
],
None,
)
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
}]
}
fn run(
&self,
plugin: &Self::Plugin,
engine: &EngineInterface,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, LabeledError> {
let value = input.into_value(call.head)?;
let lazy = NuLazyFrame::try_from_value_coerce(plugin, &value)?;
command_lazy(plugin, engine, call, lazy).map_err(LabeledError::from)
}
}
fn command_lazy(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
lazy: NuLazyFrame,
) -> Result<PipelineData, ShellError> {
let expr: Value = call.req(0)?;
let expr = NuExpression::try_from_value(plugin, &expr)?;
let lazy = lazy.apply_with_expr(expr, LazyFrame::filter);
lazy.to_pipeline_data(plugin, engine, call.head)
}
#[cfg(test)]
mod test {
use super::*;
use crate::test::test_polars_plugin_command;
#[test]
fn test_examples() -> Result<(), ShellError> {
test_polars_plugin_command(&FilterWith)
}
}

View file

@ -79,7 +79,7 @@ impl PluginCommand for ToLazyGroupBy {
Example {
description: "Group by and perform an aggregation",
example: r#"[[a b]; [1 2] [1 4] [2 6] [2 4]]
| polars into-df
| polars into-lazy
| polars group-by a
| polars agg [
(polars col b | polars min | polars as "b_min")
@ -152,7 +152,7 @@ fn command(
expressions: Vec<Expr>,
) -> Result<PipelineData, ShellError> {
let group_by = lazy.to_polars().group_by(expressions);
let group_by = NuLazyGroupBy::new(group_by, lazy.schema()?);
let group_by = NuLazyGroupBy::new(group_by, lazy.from_eager, lazy.schema()?);
group_by.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -54,8 +54,8 @@ impl PluginCommand for LazyJoin {
vec![
Example {
description: "Join two lazy dataframes",
example: r#"let df_a = ([[a b c];[1 "a" 0] [2 "b" 1] [1 "c" 2] [1 "c" 3]] | polars into-df);
let df_b = ([["foo" "bar" "ham"];[1 "a" "let"] [2 "c" "var"] [3 "c" "const"]] | polars into-df);
example: r#"let df_a = ([[a b c];[1 "a" 0] [2 "b" 1] [1 "c" 2] [1 "c" 3]] | polars into-lazy);
let df_b = ([["foo" "bar" "ham"];[1 "a" "let"] [2 "c" "var"] [3 "c" "const"]] | polars into-lazy);
$df_a | polars join $df_b a foo | polars collect"#,
result: Some(
NuDataFrame::try_from_columns(
@ -115,7 +115,7 @@ impl PluginCommand for LazyJoin {
Example {
description: "Join one eager dataframe with a lazy dataframe",
example: r#"let df_a = ([[a b c];[1 "a" 0] [2 "b" 1] [1 "c" 2] [1 "c" 3]] | polars into-df);
let df_b = ([["foo" "bar" "ham"];[1 "a" "let"] [2 "c" "var"] [3 "c" "const"]] | polars into-df);
let df_b = ([["foo" "bar" "ham"];[1 "a" "let"] [2 "c" "var"] [3 "c" "const"]] | polars into-lazy);
$df_a | polars join $df_b a foo"#,
result: Some(
NuDataFrame::try_from_columns(
@ -230,6 +230,7 @@ impl PluginCommand for LazyJoin {
let value = input.into_value(call.head)?;
let lazy = NuLazyFrame::try_from_value_coerce(plugin, &value)?;
let from_eager = lazy.from_eager;
let lazy = lazy.to_polars();
let lazy = lazy
@ -242,7 +243,7 @@ impl PluginCommand for LazyJoin {
.suffix(suffix)
.finish();
let lazy = NuLazyFrame::new(lazy);
let lazy = NuLazyFrame::new(from_eager, lazy);
lazy.to_pipeline_data(plugin, engine, call.head)
.map_err(LabeledError::from)
}

View file

@ -46,7 +46,7 @@ macro_rules! lazy_command {
) -> Result<PipelineData, LabeledError> {
let lazy = NuLazyFrame::try_from_pipeline_coerce(plugin, input, call.head)
.map_err(LabeledError::from)?;
let lazy = NuLazyFrame::new(lazy.to_polars().$func());
let lazy = NuLazyFrame::new(lazy.from_eager, lazy.to_polars().$func());
lazy.to_pipeline_data(plugin, engine, call.head)
.map_err(LabeledError::from)
}

View file

@ -126,7 +126,7 @@ fn command(
span: None,
inner: vec![],
})?;
let lazy = NuLazyFrame::new(polars_lazy);
let lazy = NuLazyFrame::new(lazy.from_eager, polars_lazy);
lazy.to_pipeline_data(plugin, engine, call.head)
}

View file

@ -1,30 +1,19 @@
mod aggregate;
mod cast;
mod collect;
mod drop;
mod drop_duplicates;
mod drop_nulls;
mod explode;
mod fetch;
mod fill_nan;
mod fill_null;
mod filter;
mod filter_with;
mod first;
mod flatten;
mod get;
pub mod groupby;
mod join;
mod last;
mod macro_commands;
mod median;
mod melt;
mod quantile;
mod rename;
mod select;
mod slice;
mod sort_by_expr;
mod with_column;
mod to_lazy;
use nu_plugin::PluginCommand;
@ -40,20 +29,13 @@ pub(crate) use crate::dataframe::lazy::macro_commands::*;
use crate::dataframe::lazy::quantile::LazyQuantile;
pub(crate) use crate::dataframe::lazy::select::LazySelect;
use crate::dataframe::lazy::sort_by_expr::LazySortBy;
pub use crate::dataframe::lazy::to_lazy::ToLazyFrame;
use crate::PolarsPlugin;
pub use explode::LazyExplode;
pub use flatten::LazyFlatten;
pub(crate) fn lazy_commands() -> Vec<Box<dyn PluginCommand<Plugin = PolarsPlugin>>> {
vec![
Box::new(cast::CastDF),
Box::new(drop::DropDF),
Box::new(drop_duplicates::DropDuplicates),
Box::new(drop_nulls::DropNulls),
Box::new(filter_with::FilterWith),
Box::new(first::FirstDF),
Box::new(get::GetDF),
Box::new(last::LastDF),
Box::new(LazyAggregate),
Box::new(LazyCache),
Box::new(LazyCollect),
@ -65,14 +47,11 @@ pub(crate) fn lazy_commands() -> Vec<Box<dyn PluginCommand<Plugin = PolarsPlugin
Box::new(LazyFlatten),
Box::new(LazyJoin),
Box::new(median::LazyMedian),
Box::new(melt::MeltDF),
Box::new(LazyReverse),
Box::new(LazySelect),
Box::new(LazySortBy),
Box::new(LazyQuantile),
Box::new(rename::RenameDF),
Box::new(slice::SliceDF),
Box::new(ToLazyFrame),
Box::new(ToLazyGroupBy),
Box::new(with_column::WithColumn),
]
}

View file

@ -132,6 +132,7 @@ fn command(
quantile: f64,
) -> Result<PipelineData, ShellError> {
let lazy = NuLazyFrame::new(
lazy.from_eager,
lazy.to_polars()
.quantile(lit(quantile), QuantileInterpolOptions::default())
.map_err(|e| ShellError::GenericError {

View file

@ -67,7 +67,7 @@ impl PluginCommand for LazySelect {
let pipeline_value = input.into_value(call.head)?;
let lazy = NuLazyFrame::try_from_value_coerce(plugin, &pipeline_value)?;
let lazy = NuLazyFrame::new(lazy.to_polars().select(&expressions));
let lazy = NuLazyFrame::new(lazy.from_eager, lazy.to_polars().select(&expressions));
lazy.to_pipeline_data(plugin, engine, call.head)
.map_err(LabeledError::from)
}

View file

@ -147,7 +147,10 @@ impl PluginCommand for LazySortBy {
let pipeline_value = input.into_value(call.head)?;
let lazy = NuLazyFrame::try_from_value_coerce(plugin, &pipeline_value)?;
let lazy = NuLazyFrame::new(lazy.to_polars().sort_by_exprs(&expressions, sort_options));
let lazy = NuLazyFrame::new(
lazy.from_eager,
lazy.to_polars().sort_by_exprs(&expressions, sort_options),
);
lazy.to_pipeline_data(plugin, engine, call.head)
.map_err(LabeledError::from)
}

View file

@ -0,0 +1,90 @@
use crate::{dataframe::values::NuSchema, values::CustomValueSupport, Cacheable, PolarsPlugin};
use super::super::values::{NuDataFrame, NuLazyFrame};
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{Category, Example, LabeledError, PipelineData, Signature, SyntaxShape, Type};
#[derive(Clone)]
pub struct ToLazyFrame;
impl PluginCommand for ToLazyFrame {
type Plugin = PolarsPlugin;
fn name(&self) -> &str {
"polars into-lazy"
}
fn usage(&self) -> &str {
"Converts a dataframe into a lazy dataframe."
}
fn signature(&self) -> Signature {
Signature::build(self.name())
.named(
"schema",
SyntaxShape::Record(vec![]),
r#"Polars Schema in format [{name: str}]. CSV, JSON, and JSONL files"#,
Some('s'),
)
.input_output_type(Type::Any, Type::Custom("dataframe".into()))
.category(Category::Custom("lazyframe".into()))
}
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Takes a table and creates a lazyframe",
example: "[[a b];[1 2] [3 4]] | polars into-lazy",
result: None,
},
Example {
description: "Takes a table, creates a lazyframe, assigns column 'b' type str, displays the schema",
example: "[[a b];[1 2] [3 4]] | polars into-lazy --schema {b: str} | polars schema",
result: None
},
]
}
fn run(
&self,
plugin: &Self::Plugin,
engine: &EngineInterface,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, LabeledError> {
let maybe_schema = call
.get_flag("schema")?
.map(|schema| NuSchema::try_from(&schema))
.transpose()?;
let df = NuDataFrame::try_from_iter(plugin, input.into_iter(), maybe_schema)?;
let mut lazy = NuLazyFrame::from_dataframe(df);
// We don't want this converted back to an eager dataframe at some point
lazy.from_eager = false;
Ok(PipelineData::Value(
lazy.cache(plugin, engine, call.head)?.into_value(call.head),
None,
))
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use nu_plugin_test_support::PluginTest;
use nu_protocol::{ShellError, Span};
use super::*;
#[test]
fn test_to_lazy() -> Result<(), ShellError> {
let plugin: Arc<PolarsPlugin> = PolarsPlugin::new_test_mode().into();
let mut plugin_test = PluginTest::new("polars", Arc::clone(&plugin))?;
let pipeline_data = plugin_test.eval("[[a b]; [6 2] [1 4] [4 1]] | polars into-lazy")?;
let value = pipeline_data.into_value(Span::test_data())?;
let df = NuLazyFrame::try_from_value(&plugin, &value)?;
assert!(!df.from_eager);
Ok(())
}
}

View file

@ -1,114 +0,0 @@
use super::super::values::{Column, NuDataFrame};
use crate::{
dataframe::values::{NuExpression, NuLazyFrame},
values::CustomValueSupport,
PolarsPlugin,
};
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, SyntaxShape, Type,
Value,
};
#[derive(Clone)]
pub struct WithColumn;
impl PluginCommand for WithColumn {
type Plugin = PolarsPlugin;
fn name(&self) -> &str {
"polars with-column"
}
fn usage(&self) -> &str {
"Adds a series to the dataframe."
}
fn signature(&self) -> Signature {
Signature::build(self.name())
.rest(
"series or expressions",
SyntaxShape::Any,
"series to be added or expressions used to define the new columns",
)
.input_output_type(
Type::Custom("dataframe".into()),
Type::Custom("dataframe".into()),
)
.category(Category::Custom("dataframe or lazyframe".into()))
}
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Adds a series to the dataframe",
example: r#"[[a b]; [1 2] [3 4]]
| polars into-df
| polars with-column [
((polars col a) * 2 | polars as "c")
((polars col a) * 3 | polars as "d")
]
| polars collect"#,
result: Some(
NuDataFrame::try_from_columns(
vec![
Column::new(
"a".to_string(),
vec![Value::test_int(1), Value::test_int(3)],
),
Column::new(
"b".to_string(),
vec![Value::test_int(2), Value::test_int(4)],
),
Column::new(
"c".to_string(),
vec![Value::test_int(2), Value::test_int(6)],
),
Column::new(
"d".to_string(),
vec![Value::test_int(3), Value::test_int(9)],
),
],
None,
)
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
}]
}
fn run(
&self,
plugin: &Self::Plugin,
engine: &EngineInterface,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, LabeledError> {
let value = input.into_value(call.head)?;
let lazy = NuLazyFrame::try_from_value_coerce(plugin, &value)?;
command_lazy(plugin, engine, call, lazy).map_err(LabeledError::from)
}
}
fn command_lazy(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
lazy: NuLazyFrame,
) -> Result<PipelineData, ShellError> {
let vals: Vec<Value> = call.rest(0)?;
let value = Value::list(vals, call.head);
let expressions = NuExpression::extract_exprs(plugin, value)?;
let lazy: NuLazyFrame = lazy.to_polars().with_columns(&expressions).into();
lazy.to_pipeline_data(plugin, engine, call.head)
}
#[cfg(test)]
mod test {
use super::*;
use crate::test::test_polars_plugin_command;
#[test]
fn test_examples() -> Result<(), ShellError> {
test_polars_plugin_command(&WithColumn)
}
}

View file

@ -1,6 +1,6 @@
use crate::{
dataframe::values::{NuExpression, NuLazyFrame},
values::CustomValueSupport,
values::{cant_convert_err, CustomValueSupport, PolarsPluginObject, PolarsPluginType},
PolarsPlugin,
};
@ -63,7 +63,8 @@ impl PluginCommand for Shift {
},
Example {
description: "Shifts the values by a given period, fill absent values with 0",
example: "[1 2 2 3 3] | polars into-df | polars shift 2 --fill 0 | polars collect",
example:
"[1 2 2 3 3] | polars into-lazy | polars shift 2 --fill 0 | polars collect",
result: Some(
NuDataFrame::try_from_columns(
vec![Column::new(
@ -93,11 +94,35 @@ impl PluginCommand for Shift {
input: PipelineData,
) -> Result<PipelineData, LabeledError> {
let value = input.into_value(call.head)?;
let lazy = NuLazyFrame::try_from_value_coerce(plugin, &value)?;
command_lazy(plugin, engine, call, lazy).map_err(LabeledError::from)
match PolarsPluginObject::try_from_value(plugin, &value)? {
PolarsPluginObject::NuDataFrame(df) => command_eager(plugin, engine, call, df),
PolarsPluginObject::NuLazyFrame(lazy) => command_lazy(plugin, engine, call, lazy),
_ => Err(cant_convert_err(
&value,
&[
PolarsPluginType::NuDataFrame,
PolarsPluginType::NuLazyGroupBy,
],
)),
}
.map_err(LabeledError::from)
}
}
fn command_eager(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let period: i64 = call.req(0)?;
let series = df.as_series(call.head)?.shift(period);
let df = NuDataFrame::try_from_series_vec(vec![series], call.head)?;
df.to_pipeline_data(plugin, engine, call.head)
}
fn command_lazy(
plugin: &PolarsPlugin,
engine: &EngineInterface,

View file

@ -1,6 +1,6 @@
use crate::{
dataframe::{utils::extract_strings, values::NuLazyFrame},
values::CustomValueSupport,
values::{cant_convert_err, CustomValueSupport, PolarsPluginObject, PolarsPluginType},
PolarsPlugin,
};
@ -11,7 +11,7 @@ use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, SyntaxShape, Type,
Value,
};
use polars::prelude::UniqueKeepStrategy;
use polars::prelude::{IntoSeries, UniqueKeepStrategy};
#[derive(Clone)]
pub struct Unique;
@ -68,7 +68,7 @@ impl PluginCommand for Unique {
},
Example {
description: "Returns unique values in a subset of lazyframe columns",
example: "[[a b c]; [1 2 1] [2 2 2] [3 2 1]] | polars into-df | polars unique --subset [b c] | polars collect",
example: "[[a b c]; [1 2 1] [2 2 2] [3 2 1]] | polars into-lazy | polars unique --subset [b c] | polars collect",
result: Some(
NuDataFrame::try_from_columns(
vec![
@ -94,7 +94,7 @@ impl PluginCommand for Unique {
Example {
description: "Returns unique values in a subset of lazyframe columns",
example: r#"[[a b c]; [1 2 1] [2 2 2] [3 2 1]]
| polars into-df
| polars into-lazy
| polars unique --subset [b c] --last
| polars collect"#,
result: Some(
@ -135,11 +135,42 @@ impl PluginCommand for Unique {
input: PipelineData,
) -> Result<PipelineData, LabeledError> {
let value = input.into_value(call.head)?;
let df = NuLazyFrame::try_from_value_coerce(plugin, &value)?;
command_lazy(plugin, engine, call, df).map_err(LabeledError::from)
match PolarsPluginObject::try_from_value(plugin, &value)? {
PolarsPluginObject::NuDataFrame(df) => command_eager(plugin, engine, call, df),
PolarsPluginObject::NuLazyFrame(lazy) => command_lazy(plugin, engine, call, lazy),
_ => Err(cant_convert_err(
&value,
&[
PolarsPluginType::NuDataFrame,
PolarsPluginType::NuLazyGroupBy,
],
)),
}
.map_err(LabeledError::from)
}
}
fn command_eager(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let series = df.as_series(call.head)?;
let res = series.unique().map_err(|e| ShellError::GenericError {
error: "Error calculating unique values".into(),
msg: e.to_string(),
span: Some(call.head),
help: Some("The str-slice command can only be used with string columns".into()),
inner: vec![],
})?;
let df = NuDataFrame::try_from_series_vec(vec![res.into_series()], call.head)?;
df.to_pipeline_data(plugin, engine, call.head)
}
fn command_lazy(
plugin: &PolarsPlugin,
engine: &EngineInterface,

View file

@ -323,7 +323,19 @@ pub trait CustomValueSupport: Cacheable {
engine: &EngineInterface,
span: Span,
) -> Result<Value, ShellError> {
Ok(self.cache(plugin, engine, span)?.into_value(span))
match self.to_cache_value()? {
// if it was from a lazy value, make it lazy again
PolarsPluginObject::NuDataFrame(df) if df.from_lazy => {
let df = df.lazy();
Ok(df.cache(plugin, engine, span)?.into_value(span))
}
// if it was from an eager value, make it eager again
PolarsPluginObject::NuLazyFrame(lf) if lf.from_eager => {
let lf = lf.collect(span)?;
Ok(lf.cache(plugin, engine, span)?.into_value(span))
}
_ => Ok(self.cache(plugin, engine, span)?.into_value(span)),
}
}
/// Caches the object, converts it to a it's CustomValue counterpart

View file

@ -472,7 +472,7 @@ pub fn from_parsed_columns(column_values: ColumnMap) -> Result<NuDataFrame, Shel
}
DataFrame::new(df_series)
.map(NuDataFrame::new)
.map(|df| NuDataFrame::new(false, df))
.map_err(|e| ShellError::GenericError {
error: "Error creating dataframe".into(),
msg: e.to_string(),

View file

@ -104,6 +104,7 @@ impl PolarsObject for DataFrameValue {
pub struct NuDataFrame {
pub id: Uuid,
pub df: Arc<DataFrame>,
pub from_lazy: bool,
}
impl AsRef<DataFrame> for NuDataFrame {
@ -114,16 +115,17 @@ impl AsRef<DataFrame> for NuDataFrame {
impl From<DataFrame> for NuDataFrame {
fn from(df: DataFrame) -> Self {
Self::new(df)
Self::new(false, df)
}
}
impl NuDataFrame {
pub fn new(df: DataFrame) -> Self {
pub fn new(from_lazy: bool, df: DataFrame) -> Self {
let id = Uuid::new_v4();
Self {
id,
df: Arc::new(df),
from_lazy,
}
}
@ -132,12 +134,12 @@ impl NuDataFrame {
}
pub fn lazy(&self) -> NuLazyFrame {
NuLazyFrame::new(self.to_polars().lazy())
NuLazyFrame::new(true, self.to_polars().lazy())
}
pub fn try_from_series(series: Series, span: Span) -> Result<Self, ShellError> {
match DataFrame::new(vec![series]) {
Ok(dataframe) => Ok(NuDataFrame::new(dataframe)),
Ok(dataframe) => Ok(NuDataFrame::new(false, dataframe)),
Err(e) => Err(ShellError::GenericError {
error: "Error creating dataframe".into(),
msg: e.to_string(),
@ -200,7 +202,7 @@ impl NuDataFrame {
inner: vec![],
})?;
Ok(Self::new(dataframe))
Ok(Self::new(false, dataframe))
}
pub fn try_from_columns(
@ -274,7 +276,7 @@ impl NuDataFrame {
inner: vec![],
})?;
Ok(Self::new(df))
Ok(Self::new(false, df))
}
pub fn is_series(&self) -> bool {

View file

@ -147,7 +147,7 @@ impl NuDataFrame {
inner: vec![],
})?;
Ok(NuDataFrame::new(df_new))
Ok(NuDataFrame::new(false, df_new))
}
Axis::Column => {
if self.df.width() != other.df.width() {
@ -205,7 +205,7 @@ impl NuDataFrame {
inner: vec![],
})?;
Ok(NuDataFrame::new(df_new))
Ok(NuDataFrame::new(false, df_new))
}
}
}

View file

@ -63,16 +63,45 @@ fn compute_with_value(
op: Span,
right: &Value,
) -> Result<Value, ShellError> {
let rhs = NuExpression::try_from_value(plugin, right)?;
with_operator(
(plugin, engine),
operator,
left,
&rhs,
lhs_span,
right.span(),
op,
)
let rhs_span = right.span();
match right {
Value::Custom { val: rhs, .. } => {
let rhs = rhs.as_any().downcast_ref::<NuExpression>().ok_or_else(|| {
ShellError::TypeMismatch {
err_message: "Right hand side not a dataframe expression".into(),
span: rhs_span,
}
})?;
match rhs.as_ref() {
polars::prelude::Expr::Literal(..) => with_operator(
(plugin, engine),
operator,
left,
rhs,
lhs_span,
right.span(),
op,
),
_ => Err(ShellError::TypeMismatch {
err_message: "Only literal expressions or number".into(),
span: right.span(),
}),
}
}
_ => {
let rhs = NuExpression::try_from_value(plugin, right)?;
with_operator(
(plugin, engine),
operator,
left,
&rhs,
lhs_span,
right.span(),
op,
)
}
}
}
fn with_operator(

View file

@ -21,6 +21,7 @@ pub use custom_value::NuLazyFrameCustomValue;
pub struct NuLazyFrame {
pub id: Uuid,
pub lazy: Arc<LazyFrame>,
pub from_eager: bool,
}
impl fmt::Debug for NuLazyFrame {
@ -31,21 +32,22 @@ impl fmt::Debug for NuLazyFrame {
impl From<LazyFrame> for NuLazyFrame {
fn from(lazy_frame: LazyFrame) -> Self {
NuLazyFrame::new(lazy_frame)
NuLazyFrame::new(false, lazy_frame)
}
}
impl NuLazyFrame {
pub fn new(lazy: LazyFrame) -> Self {
pub fn new(from_eager: bool, lazy: LazyFrame) -> Self {
Self {
id: Uuid::new_v4(),
lazy: Arc::new(lazy),
from_eager,
}
}
pub fn from_dataframe(df: NuDataFrame) -> Self {
let lazy = df.as_ref().clone().lazy();
NuLazyFrame::new(lazy)
NuLazyFrame::new(true, lazy)
}
pub fn to_polars(&self) -> LazyFrame {
@ -62,7 +64,7 @@ impl NuLazyFrame {
help: None,
inner: vec![],
})
.map(NuDataFrame::new)
.map(|df| NuDataFrame::new(true, df))
}
pub fn apply_with_expr<F>(self, expr: NuExpression, f: F) -> Self
@ -72,7 +74,7 @@ impl NuLazyFrame {
let df = self.to_polars();
let expr = expr.into_polars();
let new_frame = f(df, expr);
Self::new(new_frame)
Self::new(self.from_eager, new_frame)
}
pub fn schema(&self) -> Result<NuSchema, ShellError> {

View file

@ -20,6 +20,7 @@ pub struct NuLazyGroupBy {
pub id: Uuid,
pub group_by: Arc<LazyGroupBy>,
pub schema: NuSchema,
pub from_eager: bool,
}
impl fmt::Debug for NuLazyGroupBy {
@ -29,10 +30,11 @@ impl fmt::Debug for NuLazyGroupBy {
}
impl NuLazyGroupBy {
pub fn new(group_by: LazyGroupBy, schema: NuSchema) -> Self {
pub fn new(group_by: LazyGroupBy, from_eager: bool, schema: NuSchema) -> Self {
NuLazyGroupBy {
id: Uuid::new_v4(),
group_by: Arc::new(group_by),
from_eager,
schema,
}
}

View file

@ -48,6 +48,8 @@ pub(crate) fn convert_columns(
// Converts a Vec<Value> to a Vec<String> with a Span marking the whole
// location of the columns for error referencing
// todo - fix
#[allow(dead_code)]
pub(crate) fn convert_columns_string(
columns: Vec<Value>,
span: Span,