Added polars concat to allow concatenation of multiple dataframes (#13879)

# Description
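Adds a `polars concat` command that concatenates two or more dataframes (optionally including the dataframe received from the pipeline) into a single lazy dataframe.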

# User-Facing Changes
- Introduces new command `polars concat`
This commit is contained in:
Jack Wright 2024-09-23 04:43:43 -07:00 committed by GitHub
parent ee877607fb
commit 2541a712e4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 164 additions and 0 deletions

View file

@ -0,0 +1,162 @@
use crate::{
values::{CustomValueSupport, NuLazyFrame},
PolarsPlugin,
};
use crate::values::NuDataFrame;
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, SyntaxShape, Type,
Value,
};
use polars::{
df,
prelude::{LazyFrame, UnionArgs},
};
/// Stateless marker type for the `polars concat` plugin command.
///
/// It is a unit struct with no fields; all behaviour lives in its
/// `PluginCommand` implementation.
#[derive(Clone)]
pub struct ConcatDF;
/// Registers `polars concat` with the plugin: its name, help text, signature,
/// examples, and the entry point that forwards to `command_lazy`.
impl PluginCommand for ConcatDF {
    type Plugin = PolarsPlugin;

    /// Name under which the command is invoked.
    fn name(&self) -> &str {
        "polars concat"
    }

    /// One-line help text for the command.
    fn description(&self) -> &str {
        "Concatenate two or more dataframes."
    }

    /// Declares five boolean switches and a rest parameter holding the
    /// dataframes to concatenate.
    ///
    /// The switches are read back in `command_lazy` to fill a `UnionArgs`
    /// value; `no-parallel` is negated there to obtain `parallel`.
    fn signature(&self) -> Signature {
        let with_switches = Signature::build(self.name())
            .switch("no-parallel", "Disable parallel execution", None)
            .switch("rechunk", "Rechunk the resulting dataframe", None)
            .switch("to-supertypes", "Cast to supertypes", None)
            .switch("diagonal", "Concatenate dataframes diagonally", None)
            .switch(
                "from-partitioned-ds",
                "Concatenate dataframes from a partitioned dataset",
                None,
            );

        let with_positionals = with_switches.rest(
            "dataframes",
            SyntaxShape::Any,
            "The dataframes to concatenate",
        );

        with_positionals
            .input_output_type(Type::Any, Type::Custom("dataframe".into()))
            .category(Category::Custom("dataframe".into()))
    }

    /// Usage examples; both produce the same three-row dataframe.
    fn examples(&self) -> Vec<Example> {
        // Both examples expect identical output, so one builder closure creates
        // a fresh expected value for each of them.
        let expected_rows = || {
            NuDataFrame::from(
                df!(
                    "a" => [1, 3, 5],
                    "b" => [2, 4, 6],
                )
                .expect("simple df for test should not fail"),
            )
            .into_value(Span::test_data())
        };

        // NOTE: the continuation lines of the multi-line example strings stay
        // flush-left on purpose; indenting them would change the literal text.
        vec![
            Example {
                description: "Concatenates two dataframes with the dataframe in the pipeline.",
                example: "[[a b]; [1 2]] | polars into-df
| polars concat ([[a b]; [3 4]] | polars into-df) ([[a b]; [5 6]] | polars into-df)
| polars collect
| polars sort-by [a b]",
                result: Some(expected_rows()),
            },
            Example {
                description: "Concatenates three dataframes together",
                example: "polars concat ([[a b]; [1 2]] | polars into-df) ([[a b]; [3 4]] | polars into-df) ([[a b]; [5 6]] | polars into-df)
| polars collect
| polars sort-by [a b]",
                result: Some(expected_rows()),
            },
        ]
    }

    /// Entry point called by the plugin engine.
    ///
    /// The pipeline input is optional: if it cannot be coerced into a lazy
    /// frame the coercion error is discarded and the command proceeds with the
    /// positional dataframes only.
    fn run(
        &self,
        plugin: &Self::Plugin,
        engine: &EngineInterface,
        call: &EvaluatedCall,
        input: PipelineData,
    ) -> Result<PipelineData, LabeledError> {
        let pipeline_frame = NuLazyFrame::try_from_pipeline_coerce(plugin, input, call.head).ok();
        let outcome = command_lazy(plugin, engine, call, pipeline_frame);
        outcome.map_err(LabeledError::from)
    }
}
/// Concatenates the dataframes given as positional arguments into one lazy
/// frame and returns it as pipeline data.
///
/// * `maybe_lazy` - the dataframe received from the pipeline, if any; when
///   present it is placed ahead of the positional dataframes.
///
/// # Errors
///
/// Returns a `ShellError` when a flag cannot be read, when a positional value
/// cannot be coerced into a lazy frame, when no positional dataframe was
/// supplied, or when polars fails to concatenate the frames.
fn command_lazy(
    plugin: &PolarsPlugin,
    engine: &EngineInterface,
    call: &EvaluatedCall,
    maybe_lazy: Option<NuLazyFrame>,
) -> Result<PipelineData, ShellError> {
    // Field initialisers run top to bottom, so the flags are read in the same
    // order as they are listed here.
    let union_args = UnionArgs {
        // The user-facing switch disables parallelism, hence the negation.
        parallel: !call.has_flag("no-parallel")?,
        rechunk: call.has_flag("rechunk")?,
        to_supertypes: call.has_flag("to-supertypes")?,
        diagonal: call.has_flag("diagonal")?,
        from_partitioned_ds: call.has_flag("from-partitioned-ds")?,
    };

    // Coerce every positional argument; the first failure aborts the command.
    let mut frames: Vec<LazyFrame> = Vec::new();
    for value in call.rest::<Value>(0)?.iter() {
        let lazy = NuLazyFrame::try_from_value_coerce(plugin, value)?;
        frames.push(lazy.to_polars());
    }

    // The pipeline dataframe alone is not enough: at least one positional
    // dataframe is required.
    if frames.is_empty() {
        return Err(ShellError::GenericError {
            error: "At least one other dataframe must be provided".into(),
            msg: "".into(),
            span: Some(call.head),
            help: None,
            inner: vec![],
        });
    }

    // The pipeline dataframe, when present, goes first.
    if let Some(piped) = &maybe_lazy {
        frames.insert(0, piped.to_polars());
    }

    let concatenated: NuLazyFrame = match polars::prelude::concat(&frames, union_args) {
        Ok(lazy_frame) => lazy_frame.into(),
        Err(e) => {
            return Err(ShellError::GenericError {
                error: format!("Failed to concatenate dataframes: {e}"),
                msg: "".into(),
                span: Some(call.head),
                help: None,
                inner: vec![],
            })
        }
    };

    concatenated.to_pipeline_data(plugin, engine, call.head)
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::test::test_polars_plugin_command;

    /// Passes `ConcatDF` to the crate's shared plugin test helper.
    ///
    /// NOTE(review): the helper presumably runs the command's declared
    /// examples and checks their results; confirm against
    /// `test_polars_plugin_command`.
    #[test]
    fn test_examples() -> Result<(), ShellError> {
        test_polars_plugin_command(&ConcatDF)
    }
}

View file

@ -4,6 +4,7 @@ mod arg_where;
mod cast; mod cast;
mod col; mod col;
mod collect; mod collect;
mod concat;
mod drop; mod drop;
mod drop_duplicates; mod drop_duplicates;
mod drop_nulls; mod drop_nulls;
@ -74,6 +75,7 @@ pub(crate) fn data_commands() -> Vec<Box<dyn PluginCommand<Plugin = PolarsPlugin
Box::new(AppendDF), Box::new(AppendDF),
Box::new(CastDF), Box::new(CastDF),
Box::new(DropDF), Box::new(DropDF),
Box::new(concat::ConcatDF),
Box::new(DropDuplicates), Box::new(DropDuplicates),
Box::new(DropNulls), Box::new(DropNulls),
Box::new(Dummies), Box::new(Dummies),