From 9d8d305e9d0233274ceb961ab96a0d9ae33a1760 Mon Sep 17 00:00:00 2001 From: Fernando Herrera Date: Sun, 14 Aug 2022 14:06:31 +0200 Subject: [PATCH] lazy dataframe reader (#6321) * lazy dataframe reader * correct space for polars dependencies --- crates/nu-command/Cargo.toml | 29 +- crates/nu-command/src/dataframe/eager/open.rs | 283 ++++++++++++------ 2 files changed, 214 insertions(+), 98 deletions(-) diff --git a/crates/nu-command/Cargo.toml b/crates/nu-command/Cargo.toml index 8275277190..41eaab670c 100644 --- a/crates/nu-command/Cargo.toml +++ b/crates/nu-command/Cargo.toml @@ -103,11 +103,30 @@ optional = true version = "0.23.2" optional = true features = [ - "default", "to_dummies", "parquet", "json", "serde", "serde-lazy", - "object", "checked_arithmetic", "strings", "cum_agg", "is_in", - "rolling_window", "strings", "rows", "random", - "dtype-datetime", "dtype-struct", "lazy", "cross_join", - "dynamic_groupby", "dtype-categorical", "concat_str", "arg_where" + "arg_where", + "checked_arithmetic", + "concat_str", + "cross_join", + "csv-file", + "cum_agg", + "default", + "dtype-datetime", + "dtype-struct", + "dtype-categorical", + "dynamic_groupby", + "is_in", + "json", + "lazy", + "object", + "parquet", + "random", + "rolling_window", + "rows", + "serde", + "serde-lazy", + "strings", + "strings", + "to_dummies", ] [target.'cfg(windows)'.dependencies.windows] diff --git a/crates/nu-command/src/dataframe/eager/open.rs b/crates/nu-command/src/dataframe/eager/open.rs index b69439634d..de20048009 100644 --- a/crates/nu-command/src/dataframe/eager/open.rs +++ b/crates/nu-command/src/dataframe/eager/open.rs @@ -1,14 +1,17 @@ -use super::super::values::NuDataFrame; +use super::super::values::{NuDataFrame, NuLazyFrame}; use nu_engine::CallExt; use nu_protocol::{ ast::Call, engine::{Command, EngineState, Stack}, - Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Type, + Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Type, Value, }; use std::{fs::File, io::BufReader, path::PathBuf}; -use polars::prelude::{CsvEncoding, CsvReader, JsonReader, ParquetReader, SerReader}; +use polars::prelude::{ + CsvEncoding, CsvReader, JsonReader, LazyCsvReader, LazyFrame, ParallelStrategy, ParquetReader, + ScanArgsParquet, SerReader, +}; #[derive(Clone)] pub struct OpenDataFrame; @@ -29,6 +32,7 @@ impl Command for OpenDataFrame { SyntaxShape::Filepath, "file path to load values from", ) + .switch("lazy", "creates a lazy dataframe", Some('l')) .named( "delimiter", SyntaxShape::String, @@ -87,7 +91,6 @@ fn command( stack: &mut Stack, call: &Call, ) -> Result { - let span = call.head; let file: Spanned = call.req(engine_state, stack, 0)?; match file.item.extension() { @@ -105,49 +108,80 @@ fn command( file.span, )), } - .map(|df| PipelineData::Value(NuDataFrame::dataframe_into_value(df, span), None)) + .map(|value| PipelineData::Value(value, None)) } fn from_parquet( engine_state: &EngineState, stack: &mut Stack, call: &Call, -) -> Result { - let file: Spanned = call.req(engine_state, stack, 0)?; - let columns: Option> = call.get_flag(engine_state, stack, "columns")?; +) -> Result { + if call.has_flag("lazy") { + let file: String = call.req(engine_state, stack, 0)?; + let args = ScanArgsParquet { + n_rows: None, + cache: true, + parallel: ParallelStrategy::Auto, + rechunk: false, + row_count: None, + low_memory: false, + }; - let r = File::open(&file.item).map_err(|e| { - ShellError::GenericError( - "Error opening file".into(), - e.to_string(), - Some(file.span), - None, - Vec::new(), - ) - })?; - let reader = ParquetReader::new(r); + let df: NuLazyFrame = LazyFrame::scan_parquet(file, args) + .map_err(|e| { + ShellError::GenericError( + "Parquet reader error".into(), + format!("{:?}", e), + Some(call.head), + None, + Vec::new(), + ) + })? + .into(); - let reader = match columns { - None => reader, - Some(columns) => reader.with_columns(Some(columns)), - }; + df.into_value(call.head) + } else { + let file: Spanned = call.req(engine_state, stack, 0)?; + let columns: Option> = call.get_flag(engine_state, stack, "columns")?; - reader.finish().map_err(|e| { - ShellError::GenericError( - "Parquet reader error".into(), - format!("{:?}", e), - Some(call.head), - None, - Vec::new(), - ) - }) + let r = File::open(&file.item).map_err(|e| { + ShellError::GenericError( + "Error opening file".into(), + e.to_string(), + Some(file.span), + None, + Vec::new(), + ) + })?; + let reader = ParquetReader::new(r); + + let reader = match columns { + None => reader, + Some(columns) => reader.with_columns(Some(columns)), + }; + + let df: NuDataFrame = reader + .finish() + .map_err(|e| { + ShellError::GenericError( + "Parquet reader error".into(), + format!("{:?}", e), + Some(call.head), + None, + Vec::new(), + ) + })? + .into(); + + Ok(df.into_value(call.head)) + } } fn from_json( engine_state: &EngineState, stack: &mut Stack, call: &Call, -) -> Result { +) -> Result { let file: Spanned = call.req(engine_state, stack, 0)?; let file = File::open(&file.item).map_err(|e| { ShellError::GenericError( @@ -162,86 +196,149 @@ fn from_json( let buf_reader = BufReader::new(file); let reader = JsonReader::new(buf_reader); - reader.finish().map_err(|e| { - ShellError::GenericError( - "Json reader error".into(), - format!("{:?}", e), - Some(call.head), - None, - Vec::new(), - ) - }) + let df: NuDataFrame = reader + .finish() + .map_err(|e| { + ShellError::GenericError( + "Json reader error".into(), + format!("{:?}", e), + Some(call.head), + None, + Vec::new(), + ) + })? + .into(); + + Ok(df.into_value(call.head)) } fn from_csv( engine_state: &EngineState, stack: &mut Stack, call: &Call, -) -> Result { - let file: Spanned = call.req(engine_state, stack, 0)?; +) -> Result { let delimiter: Option> = call.get_flag(engine_state, stack, "delimiter")?; let no_header: bool = call.has_flag("no-header"); let infer_schema: Option = call.get_flag(engine_state, stack, "infer-schema")?; let skip_rows: Option = call.get_flag(engine_state, stack, "skip-rows")?; let columns: Option> = call.get_flag(engine_state, stack, "columns")?; - let csv_reader = CsvReader::from_path(&file.item) - .map_err(|e| { - ShellError::GenericError( - "Error creating CSV reader".into(), - e.to_string(), - Some(file.span), - None, - Vec::new(), - ) - })? - .with_encoding(CsvEncoding::LossyUtf8); + if call.has_flag("lazy") { + let file: String = call.req(engine_state, stack, 0)?; + let csv_reader = LazyCsvReader::new(file); - let csv_reader = match delimiter { - None => csv_reader, - Some(d) => { - if d.item.len() != 1 { - return Err(ShellError::GenericError( - "Incorrect delimiter".into(), - "Delimiter has to be one character".into(), - Some(d.span), + let csv_reader = match delimiter { + None => csv_reader, + Some(d) => { + if d.item.len() != 1 { + return Err(ShellError::GenericError( + "Incorrect delimiter".into(), + "Delimiter has to be one character".into(), + Some(d.span), + None, + Vec::new(), + )); + } else { + let delimiter = match d.item.chars().next() { + Some(d) => d as u8, + None => unreachable!(), + }; + csv_reader.with_delimiter(delimiter) + } + } + }; + + let csv_reader = csv_reader.has_header(!no_header); + + let csv_reader = match infer_schema { + None => csv_reader, + Some(r) => csv_reader.with_infer_schema_length(Some(r)), + }; + + let csv_reader = match skip_rows { + None => csv_reader, + Some(r) => csv_reader.with_skip_rows(r), + }; + + let df: NuLazyFrame = csv_reader + .finish() + .map_err(|e| { + ShellError::GenericError( + "Parquet reader error".into(), + format!("{:?}", e), + Some(call.head), None, Vec::new(), - )); - } else { - let delimiter = match d.item.chars().next() { - Some(d) => d as u8, - None => unreachable!(), - }; - csv_reader.with_delimiter(delimiter) + ) + })? + .into(); + + df.into_value(call.head) + } else { + let file: Spanned = call.req(engine_state, stack, 0)?; + let csv_reader = CsvReader::from_path(&file.item) + .map_err(|e| { + ShellError::GenericError( + "Error creating CSV reader".into(), + e.to_string(), + Some(file.span), + None, + Vec::new(), + ) + })? + .with_encoding(CsvEncoding::LossyUtf8); + + let csv_reader = match delimiter { + None => csv_reader, + Some(d) => { + if d.item.len() != 1 { + return Err(ShellError::GenericError( + "Incorrect delimiter".into(), + "Delimiter has to be one character".into(), + Some(d.span), + None, + Vec::new(), + )); + } else { + let delimiter = match d.item.chars().next() { + Some(d) => d as u8, + None => unreachable!(), + }; + csv_reader.with_delimiter(delimiter) + } } - } - }; + }; - let csv_reader = csv_reader.has_header(!no_header); + let csv_reader = csv_reader.has_header(!no_header); - let csv_reader = match infer_schema { - None => csv_reader, - Some(r) => csv_reader.infer_schema(Some(r)), - }; + let csv_reader = match infer_schema { + None => csv_reader, + Some(r) => csv_reader.infer_schema(Some(r)), + }; - let csv_reader = match skip_rows { - None => csv_reader, - Some(r) => csv_reader.with_skip_rows(r), - }; + let csv_reader = match skip_rows { + None => csv_reader, + Some(r) => csv_reader.with_skip_rows(r), + }; - let csv_reader = match columns { - None => csv_reader, - Some(columns) => csv_reader.with_columns(Some(columns)), - }; + let csv_reader = match columns { + None => csv_reader, + Some(columns) => csv_reader.with_columns(Some(columns)), + }; - csv_reader.finish().map_err(|e| { - ShellError::GenericError( - "Parquet reader error".into(), - format!("{:?}", e), - Some(call.head), - None, - Vec::new(), - ) - }) + let df: NuDataFrame = csv_reader + .finish() + .map_err(|e| { + ShellError::GenericError( + "Parquet reader error".into(), + format!("{:?}", e), + Some(call.head), + None, + Vec::new(), + ) + })? + .into(); + + Ok(df.into_value(call.head)) + } }