diff --git a/crates/nu_plugin_polars/Cargo.toml b/crates/nu_plugin_polars/Cargo.toml
index 3c916286af..9636ce98a4 100644
--- a/crates/nu_plugin_polars/Cargo.toml
+++ b/crates/nu_plugin_polars/Cargo.toml
@@ -70,6 +70,7 @@ features = [
     "serde",
     "serde-lazy",
     "strings",
+    "streaming",
     "to_dummies",
 ]
 optional = false
@@ -81,4 +82,4 @@ nu-engine = { path = "../nu-engine", version = "0.96.2" }
 nu-parser = { path = "../nu-parser", version = "0.96.2" }
 nu-command = { path = "../nu-command", version = "0.96.2" }
 nu-plugin-test-support = { path = "../nu-plugin-test-support", version = "0.96.2" }
-tempfile.workspace = true
\ No newline at end of file
+tempfile.workspace = true
diff --git a/crates/nu_plugin_polars/src/dataframe/lazy/mod.rs b/crates/nu_plugin_polars/src/dataframe/lazy/mod.rs
index e70143e6ce..e1944e06a1 100644
--- a/crates/nu_plugin_polars/src/dataframe/lazy/mod.rs
+++ b/crates/nu_plugin_polars/src/dataframe/lazy/mod.rs
@@ -12,6 +12,7 @@ mod macro_commands;
 mod median;
 mod quantile;
 mod select;
+mod sink;
 mod sort_by_expr;
 mod to_lazy;
 
@@ -53,5 +54,6 @@ pub(crate) fn lazy_commands() -> Vec<Box<dyn PluginCommand<Plugin = PolarsPlugin>>> {
         Box::new(LazySelect),
         Box::new(LazySortBy),
+        Box::new(Sink),
         Box::new(ToLazyFrame),
     ]
 }
diff --git a/crates/nu_plugin_polars/src/dataframe/lazy/sink.rs b/crates/nu_plugin_polars/src/dataframe/lazy/sink.rs
new file mode 100644
--- /dev/null
+++ b/crates/nu_plugin_polars/src/dataframe/lazy/sink.rs
+use crate::{
+    values::{cant_convert_err, NuLazyFrame, PolarsPluginObject, PolarsPluginType},
+    PolarsPlugin,
+};
+use nu_path::expand_path_with;
+use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
+use nu_protocol::{
+    Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, Spanned,
+    SyntaxShape, Type, Value,
+};
+use polars::error::PolarsError;
+use polars::prelude::{CsvWriterOptions, IpcWriterOptions, JsonWriterOptions, ParquetWriteOptions};
+
+#[derive(Clone)]
+pub struct Sink;
+
+impl PluginCommand for Sink {
+    type Plugin = PolarsPlugin;
+
+    fn name(&self) -> &str {
+        "polars sink"
+    }
+
+    fn usage(&self) -> &str {
+        "Streams the result of a lazy frame collect to a file. This is useful if the result is too large to fit in memory. Supports parquet, ipc/arrow, csv, and json formats."
+    }
+
+    fn signature(&self) -> Signature {
+        Signature::build(self.name())
+            .required("path", SyntaxShape::Filepath, "Path to write to.")
+            .named(
+                "type",
+                SyntaxShape::String,
+                "File type: csv, json, parquet, arrow/ipc. If omitted, the type is derived from the file extension.",
+                Some('t'),
+            )
+            .input_output_type(Type::Any, Type::String)
+            .category(Category::Custom("lazyframe".into()))
+    }
+
+    fn examples(&self) -> Vec<Example> {
+        vec![Example {
+            description: "Collect and save the output to the specified file",
+            example: "[[a b];[1 2] [3 4]] | polars into-lazy | polars sink /tmp/foo.parquet",
+            result: None,
+        }]
+    }
+
+    fn run(
+        &self,
+        plugin: &Self::Plugin,
+        engine: &EngineInterface,
+        call: &EvaluatedCall,
+        input: PipelineData,
+    ) -> Result<PipelineData, LabeledError> {
+        let value = input.into_value(call.head)?;
+
+        match PolarsPluginObject::try_from_value(plugin, &value)? {
+            PolarsPluginObject::NuDataFrame(df) => command(plugin, engine, call, df.lazy()),
+            PolarsPluginObject::NuLazyFrame(lazy) => command(plugin, engine, call, lazy),
+            _ => Err(cant_convert_err(
+                &value,
+                &[PolarsPluginType::NuDataFrame, PolarsPluginType::NuLazyFrame],
+            )),
+        }
+        .map_err(LabeledError::from)
+    }
+}
+
+fn command(
+    _plugin: &PolarsPlugin,
+    engine: &EngineInterface,
+    call: &EvaluatedCall,
+    lazy: NuLazyFrame,
+) -> Result<PipelineData, ShellError> {
+    let spanned_file: Spanned<String> = call.req(0)?;
+    let file_path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true);
+    let file_span = spanned_file.span;
+    let type_option: Option<Spanned<String>> = call.get_flag("type")?;
+    let type_id = match &type_option {
+        Some(ref t) => Some((t.item.to_owned(), "Invalid type", t.span)),
+        None => file_path.extension().map(|e| {
+            (
+                e.to_string_lossy().into_owned(),
+                "Invalid extension",
+                spanned_file.span,
+            )
+        }),
+    };
+
+    let polars_df = lazy.to_polars();
+
+    match type_id {
+        Some((e, msg, blamed)) => match e.as_str() {
+            "parquet" | "parq" => polars_df
+                .sink_parquet(&file_path, ParquetWriteOptions::default())
+                .map_err(|e| file_save_error(e, file_span))?,
+            "csv" => polars_df
+                .sink_csv(&file_path, CsvWriterOptions::default())
+                .map_err(|e| file_save_error(e, file_span))?,
+            "ipc" | "arrow" => polars_df
+                .sink_ipc(&file_path, IpcWriterOptions::default())
+                .map_err(|e| file_save_error(e, file_span))?,
+            "json" | "jsonl" | "ndjson" => polars_df
+                .sink_json(&file_path, JsonWriterOptions::default())
+                .map_err(|e| file_save_error(e, file_span))?,
+            _ => Err(ShellError::FileNotFoundCustom {
+                msg: format!(
+                    "{msg}. Supported values: csv, parquet, parq, ipc, arrow, json, jsonl, ndjson"
+                ),
+                span: blamed,
+            })?,
+        },
+        None => Err(ShellError::FileNotFoundCustom {
+            msg: "File without extension".into(),
+            span: spanned_file.span,
+        })?,
+    };
+    let file_value = Value::string(format!("saved {:?}", &file_path), file_span);
+
+    Ok(PipelineData::Value(
+        Value::list(vec![file_value], call.head),
+        None,
+    ))
+}
+
+fn file_save_error(e: PolarsError, span: Span) -> ShellError {
+    ShellError::GenericError {
+        error: "Error saving file".into(),
+        msg: e.to_string(),
+        span: Some(span),
+        help: None,
+        inner: vec![],
+    }
+}
+
+#[cfg(test)]
+pub mod test {
+    use nu_plugin_test_support::PluginTest;
+    use nu_protocol::{Span, Value};
+    use uuid::Uuid;
+
+    use crate::PolarsPlugin;
+
+    pub fn test_sink(extension: &str) -> Result<(), Box<dyn std::error::Error>> {
+        let tmp_dir = tempfile::tempdir()?;
+        let mut tmp_file = tmp_dir.path().to_owned();
+        tmp_file.push(format!("{}.{}", Uuid::new_v4(), extension));
+        let tmp_file_str = tmp_file.to_str().expect("should be able to get file path");
+
+        let cmd = format!(
+            "[[a b]; [1 2] [3 4]] | polars into-lazy | polars sink {}",
+            tmp_file_str
+        );
+        let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?;
+        plugin_test.engine_state_mut().add_env_var(
+            "PWD".to_string(),
+            Value::string(
+                tmp_dir
+                    .path()
+                    .to_str()
+                    .expect("should be able to get path")
+                    .to_owned(),
+                Span::test_data(),
+            ),
+        );
+        let pipeline_data = plugin_test.eval(&cmd)?;
+
+        assert!(tmp_file.exists());
+
+        let value = pipeline_data.into_value(Span::test_data())?;
+        let list = value.as_list()?;
+        assert_eq!(list.len(), 1);
+        let msg = list.first().expect("should have a value").as_str()?;
+        assert!(msg.contains("saved"));
+
+        Ok(())
+    }
+
+    #[test]
+    pub fn test_to_parquet() -> Result<(), Box<dyn std::error::Error>> {
+        test_sink("parquet")
+    }
+
+    #[test]
+    pub fn test_to_ipc() -> Result<(), Box<dyn std::error::Error>> {
+        test_sink("ipc")
+    }
+
+    #[test]
+    pub fn test_to_csv() -> Result<(), Box<dyn std::error::Error>> {
+        test_sink("csv")
+    }
+
+    #[test]
+    pub fn test_to_json() -> Result<(), Box<dyn std::error::Error>> {
+        test_sink("ndjson")
+    }
+}
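
Note (not part of the diff above): a minimal sketch of the polars API that `polars sink` delegates to, which is why the `streaming` cargo feature is enabled. This is illustrative only; it assumes the plugin's polars version and feature set (`lazy`, `parquet`, `streaming`), and the data and output path are placeholders.

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // A small frame stands in for a lazy query whose result would be too large to collect.
    let lf = df!("a" => [1i64, 3], "b" => [2i64, 4])?.lazy();

    // Instead of .collect(), stream the result straight to disk. This mirrors the
    // sink_parquet call the new command makes for *.parquet paths.
    lf.sink_parquet(
        &std::path::PathBuf::from("/tmp/foo.parquet"),
        ParquetWriteOptions::default(),
    )?;
    Ok(())
}
```

Because the plan is executed by the streaming engine and written out incrementally, the full result never has to be materialized in memory, which is exactly the case the command's help text describes.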