Upgrading to polars 0.35 (#11241)

Co-authored-by: Jack Wright <jack.wright@disqo.com>
This commit is contained in:
Jack Wright 2023-12-05 16:09:34 -08:00 committed by GitHub
parent 05d7d6d6ad
commit 31146a7591
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 532 additions and 352 deletions

756
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -19,12 +19,16 @@ nu-protocol = { path = "../nu-protocol", version = "0.87.2" }
# Potential dependencies for extras # Potential dependencies for extras
chrono = { version = "0.4", features = ["std", "unstable-locales"], default-features = false } chrono = { version = "0.4", features = ["std", "unstable-locales"], default-features = false }
fancy-regex = "0.11" chrono-tz = "0.8"
fancy-regex = "0.12"
indexmap = { version = "2.1" } indexmap = { version = "2.1" }
num = { version = "0.4", optional = true } num = { version = "0.4", optional = true }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
sqlparser = { version = "0.36.1", optional = true } sqlparser = { version = "0.39", optional = true }
polars-io = { version = "0.33", features = ["avro"], optional = true } polars-io = { version = "0.35", features = ["avro"], optional = true }
polars-arrow = "0.35"
polars-ops = "0.35"
polars-plan = "0.35"
[dependencies.polars] [dependencies.polars]
features = [ features = [
@ -58,7 +62,7 @@ features = [
"to_dummies", "to_dummies",
] ]
optional = true optional = true
version = "0.33" version = "0.35"
[features] [features]
dataframe = ["num", "polars", "polars-io", "sqlparser"] dataframe = ["num", "polars", "polars-io", "sqlparser"]

View file

@ -150,6 +150,7 @@ fn from_parquet(
low_memory: false, low_memory: false,
cloud_options: None, cloud_options: None,
use_statistics: false, use_statistics: false,
hive_partitioning: false,
}; };
let df: NuLazyFrame = LazyFrame::scan_parquet(file, args) let df: NuLazyFrame = LazyFrame::scan_parquet(file, args)
@ -411,7 +412,7 @@ fn from_csv(
Some(d) => d as u8, Some(d) => d as u8,
None => unreachable!(), None => unreachable!(),
}; };
csv_reader.with_delimiter(delimiter) csv_reader.with_separator(delimiter)
} }
} }
}; };
@ -472,7 +473,7 @@ fn from_csv(
Some(d) => d as u8, Some(d) => d as u8,
None => unreachable!(), None => unreachable!(),
}; };
csv_reader.with_delimiter(delimiter) csv_reader.with_separator(delimiter)
} }
} }
}; };

View file

@ -4,6 +4,8 @@ use nu_protocol::{
engine::{Command, EngineState, Stack}, engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Type, Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Type,
}; };
use polars::prelude::NamedFrom;
use polars::series::Series;
use super::super::values::NuDataFrame; use super::super::values::NuDataFrame;
@ -81,7 +83,7 @@ fn command(
call: &Call, call: &Call,
input: PipelineData, input: PipelineData,
) -> Result<PipelineData, ShellError> { ) -> Result<PipelineData, ShellError> {
let rows: Option<Spanned<usize>> = call.get_flag(engine_state, stack, "n-rows")?; let rows: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "n-rows")?;
let fraction: Option<Spanned<f64>> = call.get_flag(engine_state, stack, "fraction")?; let fraction: Option<Spanned<f64>> = call.get_flag(engine_state, stack, "fraction")?;
let seed: Option<u64> = call let seed: Option<u64> = call
.get_flag::<i64>(engine_state, stack, "seed")? .get_flag::<i64>(engine_state, stack, "seed")?
@ -94,7 +96,7 @@ fn command(
match (rows, fraction) { match (rows, fraction) {
(Some(rows), None) => df (Some(rows), None) => df
.as_ref() .as_ref()
.sample_n(rows.item, replace, shuffle, seed) .sample_n(&Series::new("s", &[rows.item]), replace, shuffle, seed)
.map_err(|e| { .map_err(|e| {
ShellError::GenericError( ShellError::GenericError(
"Error creating sample".into(), "Error creating sample".into(),
@ -106,7 +108,7 @@ fn command(
}), }),
(None, Some(frac)) => df (None, Some(frac)) => df
.as_ref() .as_ref()
.sample_frac(frac.item, replace, shuffle, seed) .sample_frac(&Series::new("frac", &[frac.item]), replace, shuffle, seed)
.map_err(|e| { .map_err(|e| {
ShellError::GenericError( ShellError::GenericError(
"Error creating sample".into(), "Error creating sample".into(),

View file

@ -2,7 +2,8 @@ use crate::dataframe::eager::sql_expr::parse_sql_expr;
use polars::error::{ErrString, PolarsError}; use polars::error::{ErrString, PolarsError};
use polars::prelude::{col, DataFrame, DataType, IntoLazy, LazyFrame}; use polars::prelude::{col, DataFrame, DataType, IntoLazy, LazyFrame};
use sqlparser::ast::{ use sqlparser::ast::{
Expr as SqlExpr, Select, SelectItem, SetExpr, Statement, TableFactor, Value as SQLValue, Expr as SqlExpr, GroupByExpr, Select, SelectItem, SetExpr, Statement, TableFactor,
Value as SQLValue,
}; };
use sqlparser::dialect::GenericDialect; use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser; use sqlparser::parser::Parser;
@ -96,8 +97,13 @@ impl SQLContext {
.collect::<Result<Vec<_>, PolarsError>>()?; .collect::<Result<Vec<_>, PolarsError>>()?;
// Check for group by // Check for group by
// After projection since there might be number. // After projection since there might be number.
let group_by = select_stmt let group_by = match &select_stmt.group_by {
.group_by GroupByExpr::All =>
Err(
PolarsError::ComputeError("Group-By Error: Only positive number or expression are supported, not all".into())
)?,
GroupByExpr::Expressions(expressions) => expressions
}
.iter() .iter()
.map( .map(
|e|match e { |e|match e {

View file

@ -2,8 +2,8 @@ use polars::error::PolarsError;
use polars::prelude::{col, lit, DataType, Expr, LiteralValue, PolarsResult as Result, TimeUnit}; use polars::prelude::{col, lit, DataType, Expr, LiteralValue, PolarsResult as Result, TimeUnit};
use sqlparser::ast::{ use sqlparser::ast::{
BinaryOperator as SQLBinaryOperator, DataType as SQLDataType, Expr as SqlExpr, ArrayElemTypeDef, BinaryOperator as SQLBinaryOperator, DataType as SQLDataType,
Function as SQLFunction, Value as SqlValue, WindowType, Expr as SqlExpr, Function as SQLFunction, Value as SqlValue, WindowType,
}; };
fn map_sql_polars_datatype(data_type: &SQLDataType) -> Result<DataType> { fn map_sql_polars_datatype(data_type: &SQLDataType) -> Result<DataType> {
@ -13,7 +13,7 @@ fn map_sql_polars_datatype(data_type: &SQLDataType) -> Result<DataType> {
| SQLDataType::Uuid | SQLDataType::Uuid
| SQLDataType::Clob(_) | SQLDataType::Clob(_)
| SQLDataType::Text | SQLDataType::Text
| SQLDataType::String => DataType::Utf8, | SQLDataType::String(_) => DataType::Utf8,
SQLDataType::Float(_) => DataType::Float32, SQLDataType::Float(_) => DataType::Float32,
SQLDataType::Real => DataType::Float32, SQLDataType::Real => DataType::Float32,
SQLDataType::Double => DataType::Float64, SQLDataType::Double => DataType::Float64,
@ -31,9 +31,12 @@ fn map_sql_polars_datatype(data_type: &SQLDataType) -> Result<DataType> {
SQLDataType::Time(_, _) => DataType::Time, SQLDataType::Time(_, _) => DataType::Time,
SQLDataType::Timestamp(_, _) => DataType::Datetime(TimeUnit::Microseconds, None), SQLDataType::Timestamp(_, _) => DataType::Datetime(TimeUnit::Microseconds, None),
SQLDataType::Interval => DataType::Duration(TimeUnit::Microseconds), SQLDataType::Interval => DataType::Duration(TimeUnit::Microseconds),
SQLDataType::Array(inner_type) => match inner_type { SQLDataType::Array(array_type_def) => match array_type_def {
Some(inner_type) => DataType::List(Box::new(map_sql_polars_datatype(inner_type)?)), ArrayElemTypeDef::AngleBracket(inner_type)
None => { | ArrayElemTypeDef::SquareBracket(inner_type) => {
DataType::List(Box::new(map_sql_polars_datatype(inner_type)?))
}
_ => {
return Err(PolarsError::ComputeError( return Err(PolarsError::ComputeError(
"SQL Datatype Array(None) was not supported in polars-sql yet!".into(), "SQL Datatype Array(None) was not supported in polars-sql yet!".into(),
)) ))
@ -114,7 +117,11 @@ pub fn parse_sql_expr(expr: &SqlExpr) -> Result<Expr> {
binary_op_(left, right, op)? binary_op_(left, right, op)?
} }
SqlExpr::Function(sql_function) => parse_sql_function(sql_function)?, SqlExpr::Function(sql_function) => parse_sql_function(sql_function)?,
SqlExpr::Cast { expr, data_type } => cast_(parse_sql_expr(expr)?, data_type)?, SqlExpr::Cast {
expr,
data_type,
format: _,
} => cast_(parse_sql_expr(expr)?, data_type)?,
SqlExpr::Nested(expr) => parse_sql_expr(expr)?, SqlExpr::Nested(expr) => parse_sql_expr(expr)?,
SqlExpr::Value(value) => literal_expr(value)?, SqlExpr::Value(value) => literal_expr(value)?,
_ => { _ => {

View file

@ -87,9 +87,9 @@ fn command(
let writer = CsvWriter::new(&mut file); let writer = CsvWriter::new(&mut file);
let writer = if no_header { let writer = if no_header {
writer.has_header(false) writer.include_header(false)
} else { } else {
writer.has_header(true) writer.include_header(true)
}; };
let mut writer = match delimiter { let mut writer = match delimiter {
@ -109,7 +109,7 @@ fn command(
None => unreachable!(), None => unreachable!(),
}; };
writer.with_delimiter(delimiter) writer.with_separator(delimiter)
} }
} }
}; };

View file

@ -171,7 +171,7 @@ fn get_col_name(expr: &Expr) -> Option<String> {
| Expr::Slice { input: expr, .. } | Expr::Slice { input: expr, .. }
| Expr::Cast { expr, .. } | Expr::Cast { expr, .. }
| Expr::Sort { expr, .. } | Expr::Sort { expr, .. }
| Expr::Take { expr, .. } | Expr::Gather { expr, .. }
| Expr::SortBy { expr, .. } | Expr::SortBy { expr, .. }
| Expr::Exclude(expr, _) | Expr::Exclude(expr, _)
| Expr::Alias(expr, _) | Expr::Alias(expr, _)
@ -189,6 +189,7 @@ fn get_col_name(expr: &Expr) -> Option<String> {
| Expr::RenameAlias { .. } | Expr::RenameAlias { .. }
| Expr::Count | Expr::Count
| Expr::Nth(_) | Expr::Nth(_)
| Expr::SubPlan(_, _)
| Expr::Selector(_) => None, | Expr::Selector(_) => None,
} }
} }

View file

@ -8,6 +8,7 @@ use nu_protocol::{
Value, Value,
}; };
use polars::prelude::{DataType, IntoSeries}; use polars::prelude::{DataType, IntoSeries};
use polars_ops::prelude::{cum_max, cum_min, cum_sum};
enum CumType { enum CumType {
Min, Min,
@ -119,10 +120,19 @@ fn command(
let cum_type = CumType::from_str(&cum_type.item, cum_type.span)?; let cum_type = CumType::from_str(&cum_type.item, cum_type.span)?;
let mut res = match cum_type { let mut res = match cum_type {
CumType::Max => series.cummax(reverse), CumType::Max => cum_max(&series, reverse),
CumType::Min => series.cummin(reverse), CumType::Min => cum_min(&series, reverse),
CumType::Sum => series.cumsum(reverse), CumType::Sum => cum_sum(&series, reverse),
}; }
.map_err(|e| {
ShellError::GenericError(
"Error creating cumulative".into(),
e.to_string(),
Some(call.head),
None,
Vec::new(),
)
})?;
let name = format!("{}_{}", series.name(), cum_type.to_str()); let name = format!("{}_{}", series.name(), cum_type.to_str());
res.rename(&name); res.rename(&name);

View file

@ -9,6 +9,8 @@ use nu_protocol::{
Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Type, Value, Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Type, Value,
}; };
use polars_plan::prelude::lit;
#[derive(Clone)] #[derive(Clone)]
pub struct Shift; pub struct Shift;
@ -98,7 +100,7 @@ fn command_lazy(
let lazy: NuLazyFrame = match fill { let lazy: NuLazyFrame = match fill {
Some(fill) => { Some(fill) => {
let expr = NuExpression::try_from_value(fill)?.into_polars(); let expr = NuExpression::try_from_value(fill)?.into_polars();
lazy.shift_and_fill(shift, expr).into() lazy.shift_and_fill(lit(shift), expr).into()
} }
None => lazy.shift(shift).into(), None => lazy.shift(shift).into(),
}; };

View file

@ -73,7 +73,7 @@ fn command(
) )
})?; })?;
let res = chunked.as_ref().str_lengths().into_series(); let res = chunked.as_ref().str_len_bytes().into_series();
NuDataFrame::try_from_series(vec![res], call.head) NuDataFrame::try_from_series(vec![res], call.head)
.map(|df| PipelineData::Value(NuDataFrame::into_value(df, call.head), None)) .map(|df| PipelineData::Value(NuDataFrame::into_value(df, call.head), None))

View file

@ -85,15 +85,7 @@ fn command(
) )
})?; })?;
let mut res = chunked.str_slice(start, length).map_err(|e| { let mut res = chunked.str_slice(start, length);
ShellError::GenericError(
"Error slicing series".into(),
e.to_string(),
Some(call.head),
None,
Vec::new(),
)
})?;
res.rename(series.name()); res.rename(series.name());
NuDataFrame::try_from_series(vec![res.into_series()], call.head) NuDataFrame::try_from_series(vec![res.into_series()], call.head)

View file

@ -9,6 +9,7 @@ pub use operations::Axis;
use indexmap::map::IndexMap; use indexmap::map::IndexMap;
use nu_protocol::{did_you_mean, PipelineData, Record, ShellError, Span, Value}; use nu_protocol::{did_you_mean, PipelineData, Record, ShellError, Span, Value};
use polars::prelude::{DataFrame, DataType, IntoLazy, LazyFrame, PolarsObject, Series}; use polars::prelude::{DataFrame, DataType, IntoLazy, LazyFrame, PolarsObject, Series};
use polars_arrow::util::total_ord::TotalEq;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{cmp::Ordering, fmt::Display, hash::Hasher}; use std::{cmp::Ordering, fmt::Display, hash::Hasher};
@ -61,6 +62,12 @@ impl std::hash::Hash for DataFrameValue {
} }
} }
impl TotalEq for DataFrameValue {
fn tot_eq(&self, other: &Self) -> bool {
self == other
}
}
impl PolarsObject for DataFrameValue { impl PolarsObject for DataFrameValue {
fn type_name() -> &'static str { fn type_name() -> &'static str {
"object" "object"

View file

@ -299,7 +299,11 @@ pub fn expr_to_value(expr: &Expr, span: Span) -> Result<Value, ShellError> {
}, },
span, span,
)), )),
Expr::Take { expr, idx } => Ok(Value::record( Expr::Gather {
expr,
idx,
returns_scalar: _,
} => Ok(Value::record(
record! { record! {
"expr" => expr_to_value(expr.as_ref(), span)?, "expr" => expr_to_value(expr.as_ref(), span)?,
"idx" => expr_to_value(idx.as_ref(), span)?, "idx" => expr_to_value(idx.as_ref(), span)?,
@ -401,7 +405,6 @@ pub fn expr_to_value(expr: &Expr, span: Span) -> Result<Value, ShellError> {
Expr::Window { Expr::Window {
function, function,
partition_by, partition_by,
order_by,
options, options,
} => { } => {
let partition_by: Result<Vec<Value>, ShellError> = partition_by let partition_by: Result<Vec<Value>, ShellError> = partition_by
@ -409,22 +412,21 @@ pub fn expr_to_value(expr: &Expr, span: Span) -> Result<Value, ShellError> {
.map(|e| expr_to_value(e, span)) .map(|e| expr_to_value(e, span))
.collect(); .collect();
let order_by = order_by
.as_ref()
.map(|e| expr_to_value(e.as_ref(), span))
.transpose()?
.unwrap_or_else(|| Value::nothing(span));
Ok(Value::record( Ok(Value::record(
record! { record! {
"function" => expr_to_value(function, span)?, "function" => expr_to_value(function, span)?,
"partition_by" => Value::list(partition_by?, span), "partition_by" => Value::list(partition_by?, span),
"order_by" => order_by,
"options" => Value::string(format!("{options:?}"), span), "options" => Value::string(format!("{options:?}"), span),
}, },
span, span,
)) ))
} }
Expr::SubPlan(_, _) => Err(ShellError::UnsupportedInput {
msg: "Expressions of type SubPlan are not yet supported".to_string(),
input: format!("Expression is {expr:?}"),
msg_span: span,
input_span: Span::unknown(),
}),
// the parameter polars_plan::dsl::selector::Selector is not publicly exposed. // the parameter polars_plan::dsl::selector::Selector is not publicly exposed.
// I am not sure what we can meaningfully do with this at this time. // I am not sure what we can meaningfully do with this at this time.
Expr::Selector(_) => Err(ShellError::UnsupportedInput { Expr::Selector(_) => Err(ShellError::UnsupportedInput {