Handle relative paths correctly on polars to-(parquet|jsonl|arrow|etc) commands (#12486)

# Description

All polars commands that output a file were not handling relative paths
correctly.

A command like
`[[a b]; [6 2] [1 4] [4 1]] | polars into-df | polars to-parquet foo.json`
was writing `foo.json` to the directory of the plugin executable.

This pull request pulls in nu-path and uses it to resolve the file paths against the engine's current directory.

Related discussion
https://discord.com/channels/601130461678272522/1227612017171501136/1227889870358183966

# User-Facing Changes
None

# Tests + Formatting
Done, added tests for each of the polars to-* commands.

---------

Co-authored-by: Jack Wright <jack.wright@disqo.com>
This commit is contained in:
Jack Wright 2024-04-12 17:30:37 -07:00 committed by GitHub
parent b7fb0af967
commit f975c9923a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 297 additions and 46 deletions

2
Cargo.lock generated
View file

@ -3246,6 +3246,7 @@ dependencies = [
"nu-command",
"nu-engine",
"nu-parser",
"nu-path",
"nu-plugin",
"nu-plugin-test-support",
"nu-protocol",
@ -3258,6 +3259,7 @@ dependencies = [
"polars-utils",
"serde",
"sqlparser 0.43.1",
"tempfile",
"typetag",
"uuid",
]

View file

@ -19,6 +19,7 @@ bench = false
[dependencies]
nu-protocol = { path = "../nu-protocol", version = "0.92.3" }
nu-plugin = { path = "../nu-plugin", version = "0.92.3" }
nu-path = { path = "../nu-path", version = "0.92.3" }
# Potential dependencies for extras
chrono = { workspace = true, features = ["std", "unstable-locales"], default-features = false }
@ -76,3 +77,4 @@ nu-engine = { path = "../nu-engine", version = "0.92.3" }
nu-parser = { path = "../nu-parser", version = "0.92.3" }
nu-command = { path = "../nu-command", version = "0.92.3" }
nu-plugin-test-support = { path = "../nu-plugin-test-support", version = "0.92.3" }
tempfile.workspace = true

View file

@ -3,15 +3,20 @@ use crate::{
values::{cache_and_to_value, NuLazyFrame},
PolarsPlugin,
};
use nu_path::expand_path_with;
use super::super::values::NuDataFrame;
use nu_plugin::PluginCommand;
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape,
Type, Value,
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, Spanned,
SyntaxShape, Type, Value,
};
use std::{fs::File, io::BufReader, path::PathBuf};
use std::{
fs::File,
io::BufReader,
path::{Path, PathBuf},
};
use polars::prelude::{
CsvEncoding, CsvReader, IpcReader, JsonFormat, JsonReader, LazyCsvReader, LazyFileListReader,
@ -111,29 +116,31 @@ fn command(
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
) -> Result<PipelineData, ShellError> {
let file: Spanned<PathBuf> = call.req(0)?;
let spanned_file: Spanned<PathBuf> = call.req(0)?;
let file_path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true);
let file_span = spanned_file.span;
let type_option: Option<Spanned<String>> = call.get_flag("type")?;
let type_id = match &type_option {
Some(ref t) => Some((t.item.to_owned(), "Invalid type", t.span)),
None => file.item.extension().map(|e| {
None => file_path.extension().map(|e| {
(
e.to_string_lossy().into_owned(),
"Invalid extension",
file.span,
spanned_file.span,
)
}),
};
match type_id {
Some((e, msg, blamed)) => match e.as_str() {
"csv" | "tsv" => from_csv(plugin, engine, call),
"parquet" | "parq" => from_parquet(plugin, engine, call),
"ipc" | "arrow" => from_ipc(plugin, engine, call),
"json" => from_json(plugin, engine, call),
"jsonl" => from_jsonl(plugin, engine, call),
"avro" => from_avro(plugin, engine, call),
"csv" | "tsv" => from_csv(plugin, engine, call, &file_path, file_span),
"parquet" | "parq" => from_parquet(plugin, engine, call, &file_path, file_span),
"ipc" | "arrow" => from_ipc(plugin, engine, call, &file_path, file_span),
"json" => from_json(plugin, engine, call, &file_path, file_span),
"jsonl" => from_jsonl(plugin, engine, call, &file_path, file_span),
"avro" => from_avro(plugin, engine, call, &file_path, file_span),
_ => Err(ShellError::FileNotFoundCustom {
msg: format!(
"{msg}. Supported values: csv, tsv, parquet, ipc, arrow, json, jsonl, avro"
@ -143,7 +150,7 @@ fn command(
},
None => Err(ShellError::FileNotFoundCustom {
msg: "File without extension".into(),
span: file.span,
span: spanned_file.span,
}),
}
.map(|value| PipelineData::Value(value, None))
@ -153,6 +160,8 @@ fn from_parquet(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
file_span: Span,
) -> Result<Value, ShellError> {
if call.has_flag("lazy")? {
let file: String = call.req(0)?;
@ -180,13 +189,12 @@ fn from_parquet(
cache_and_to_value(plugin, engine, call.head, df)
} else {
let file: Spanned<PathBuf> = call.req(0)?;
let columns: Option<Vec<String>> = call.get_flag("columns")?;
let r = File::open(&file.item).map_err(|e| ShellError::GenericError {
let r = File::open(file_path).map_err(|e| ShellError::GenericError {
error: "Error opening file".into(),
msg: e.to_string(),
span: Some(file.span),
span: Some(file_span),
help: None,
inner: vec![],
})?;
@ -216,14 +224,15 @@ fn from_avro(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
file_span: Span,
) -> Result<Value, ShellError> {
let file: Spanned<PathBuf> = call.req(0)?;
let columns: Option<Vec<String>> = call.get_flag("columns")?;
let r = File::open(&file.item).map_err(|e| ShellError::GenericError {
let r = File::open(file_path).map_err(|e| ShellError::GenericError {
error: "Error opening file".into(),
msg: e.to_string(),
span: Some(file.span),
span: Some(file_span),
help: None,
inner: vec![],
})?;
@ -252,6 +261,8 @@ fn from_ipc(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
file_span: Span,
) -> Result<Value, ShellError> {
if call.has_flag("lazy")? {
let file: String = call.req(0)?;
@ -275,13 +286,12 @@ fn from_ipc(
cache_and_to_value(plugin, engine, call.head, df)
} else {
let file: Spanned<PathBuf> = call.req(0)?;
let columns: Option<Vec<String>> = call.get_flag("columns")?;
let r = File::open(&file.item).map_err(|e| ShellError::GenericError {
let r = File::open(file_path).map_err(|e| ShellError::GenericError {
error: "Error opening file".into(),
msg: e.to_string(),
span: Some(file.span),
span: Some(file_span),
help: None,
inner: vec![],
})?;
@ -311,12 +321,13 @@ fn from_json(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
file_span: Span,
) -> Result<Value, ShellError> {
let file: Spanned<PathBuf> = call.req(0)?;
let file = File::open(&file.item).map_err(|e| ShellError::GenericError {
let file = File::open(file_path).map_err(|e| ShellError::GenericError {
error: "Error opening file".into(),
msg: e.to_string(),
span: Some(file.span),
span: Some(file_span),
help: None,
inner: vec![],
})?;
@ -351,17 +362,18 @@ fn from_jsonl(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
file_span: Span,
) -> Result<Value, ShellError> {
let infer_schema: Option<usize> = call.get_flag("infer-schema")?;
let maybe_schema = call
.get_flag("schema")?
.map(|schema| NuSchema::try_from(&schema))
.transpose()?;
let file: Spanned<PathBuf> = call.req(0)?;
let file = File::open(&file.item).map_err(|e| ShellError::GenericError {
let file = File::open(file_path).map_err(|e| ShellError::GenericError {
error: "Error opening file".into(),
msg: e.to_string(),
span: Some(file.span),
span: Some(file_span),
help: None,
inner: vec![],
})?;
@ -394,6 +406,8 @@ fn from_csv(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
file_path: &Path,
file_span: Span,
) -> Result<Value, ShellError> {
let delimiter: Option<Spanned<String>> = call.get_flag("delimiter")?;
let no_header: bool = call.has_flag("no-header")?;
@ -407,8 +421,7 @@ fn from_csv(
.transpose()?;
if call.has_flag("lazy")? {
let file: String = call.req(0)?;
let csv_reader = LazyCsvReader::new(file);
let csv_reader = LazyCsvReader::new(file_path);
let csv_reader = match delimiter {
None => csv_reader,
@ -461,12 +474,11 @@ fn from_csv(
cache_and_to_value(plugin, engine, call.head, df)
} else {
let file: Spanned<PathBuf> = call.req(0)?;
let csv_reader = CsvReader::from_path(&file.item)
let csv_reader = CsvReader::from_path(file_path)
.map_err(|e| ShellError::GenericError {
error: "Error creating CSV reader".into(),
msg: e.to_string(),
span: Some(file.span),
span: Some(file_span),
help: None,
inner: vec![],
})?

View file

@ -1,5 +1,6 @@
use std::{fs::File, path::PathBuf};
use nu_path::expand_path_with;
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape,
@ -43,24 +44,26 @@ impl PluginCommand for ToArrow {
fn run(
&self,
plugin: &Self::Plugin,
_engine: &EngineInterface,
engine: &EngineInterface,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, LabeledError> {
command(plugin, call, input).map_err(|e| e.into())
command(plugin, engine, call, input).map_err(|e| e.into())
}
}
fn command(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let file_name: Spanned<PathBuf> = call.req(0)?;
let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true);
let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let mut file = File::create(&file_name.item).map_err(|e| ShellError::GenericError {
let mut file = File::create(file_path).map_err(|e| ShellError::GenericError {
error: "Error with file name".into(),
msg: e.to_string(),
span: Some(file_name.span),
@ -85,3 +88,47 @@ fn command(
None,
))
}
#[cfg(test)]
pub mod test {
    use nu_plugin_test_support::PluginTest;
    use nu_protocol::{Span, Value};
    use uuid::Uuid;

    use crate::PolarsPlugin;

    /// Runs `polars to-arrow` against a uniquely named `.arrow` file inside a fresh
    /// temporary directory, with `PWD` pointing at that same directory.
    ///
    /// Passes when the file exists afterwards and the command returns a one-element
    /// list whose only entry is a string containing "saved".
    #[test]
    pub fn test_to_arrow() -> Result<(), Box<dyn std::error::Error>> {
        // Unique output file under a throw-away directory.
        let sandbox = tempfile::tempdir()?;
        let target = sandbox.path().join(format!("{}.arrow", Uuid::new_v4()));
        let target_str = target.to_str().expect("should be able to get file path");

        let cmd = format!(
            "[[a b]; [1 2] [3 4]] | polars into-df | polars to-arrow {}",
            target_str
        );

        let mut harness = PluginTest::new("polars", PolarsPlugin::default().into())?;

        // Make the temporary directory the engine's working directory.
        let pwd = sandbox
            .path()
            .to_str()
            .expect("should be able to get path")
            .to_owned();
        harness
            .engine_state_mut()
            .add_env_var("PWD".to_string(), Value::string(pwd, Span::test_data()));

        let output = harness.eval(&cmd)?;
        assert!(target.exists());

        // The command reports its result as a single-element list of strings.
        let result = output.into_value(Span::test_data());
        let entries = result.as_list()?;
        assert_eq!(entries.len(), 1);

        let first = entries.first().expect("should have a value");
        let msg = first.as_str()?;
        assert!(msg.contains("saved"));

        Ok(())
    }
}

View file

@ -1,5 +1,6 @@
use std::{fs::File, path::PathBuf};
use nu_path::expand_path_with;
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape,
@ -80,16 +81,17 @@ fn get_compression(call: &EvaluatedCall) -> Result<Option<AvroCompression>, Shel
fn command(
plugin: &PolarsPlugin,
_engine: &EngineInterface,
engine: &EngineInterface,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let file_name: Spanned<PathBuf> = call.req(0)?;
let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true);
let compression = get_compression(call)?;
let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let file = File::create(&file_name.item).map_err(|e| ShellError::GenericError {
let file = File::create(file_path).map_err(|e| ShellError::GenericError {
error: "Error with file name".into(),
msg: e.to_string(),
span: Some(file_name.span),
@ -115,3 +117,47 @@ fn command(
None,
))
}
#[cfg(test)]
pub mod test {
    use nu_plugin_test_support::PluginTest;
    use nu_protocol::{Span, Value};
    use uuid::Uuid;

    use crate::PolarsPlugin;

    /// Runs `polars to-avro` against a uniquely named `.avro` file inside a fresh
    /// temporary directory, with `PWD` pointing at that same directory.
    ///
    /// Passes when the file exists afterwards and the command returns a one-element
    /// list whose only entry is a string containing "saved".
    #[test]
    pub fn test_to_avro() -> Result<(), Box<dyn std::error::Error>> {
        // Unique output file under a throw-away directory.
        let sandbox = tempfile::tempdir()?;
        let target = sandbox.path().join(format!("{}.avro", Uuid::new_v4()));
        let target_str = target.to_str().expect("should be able to get file path");

        let cmd = format!(
            "[[a b]; [1 2] [3 4]] | polars into-df | polars to-avro {}",
            target_str
        );

        let mut harness = PluginTest::new("polars", PolarsPlugin::default().into())?;

        // Make the temporary directory the engine's working directory.
        let pwd = sandbox
            .path()
            .to_str()
            .expect("should be able to get path")
            .to_owned();
        harness
            .engine_state_mut()
            .add_env_var("PWD".to_string(), Value::string(pwd, Span::test_data()));

        let output = harness.eval(&cmd)?;
        assert!(target.exists());

        // The command reports its result as a single-element list of strings.
        let result = output.into_value(Span::test_data());
        let entries = result.as_list()?;
        assert_eq!(entries.len(), 1);

        let first = entries.first().expect("should have a value");
        let msg = first.as_str()?;
        assert!(msg.contains("saved"));

        Ok(())
    }
}

View file

@ -1,5 +1,6 @@
use std::{fs::File, path::PathBuf};
use nu_path::expand_path_with;
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape,
@ -57,26 +58,28 @@ impl PluginCommand for ToCSV {
fn run(
&self,
plugin: &Self::Plugin,
_engine: &EngineInterface,
engine: &EngineInterface,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, LabeledError> {
command(plugin, call, input).map_err(|e| e.into())
command(plugin, engine, call, input).map_err(|e| e.into())
}
}
fn command(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let file_name: Spanned<PathBuf> = call.req(0)?;
let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true);
let delimiter: Option<Spanned<String>> = call.get_flag("delimiter")?;
let no_header: bool = call.has_flag("no-header")?;
let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let mut file = File::create(&file_name.item).map_err(|e| ShellError::GenericError {
let mut file = File::create(file_path).map_err(|e| ShellError::GenericError {
error: "Error with file name".into(),
msg: e.to_string(),
span: Some(file_name.span),
@ -131,3 +134,48 @@ fn command(
None,
))
}
#[cfg(test)]
pub mod test {
    use nu_plugin_test_support::PluginTest;
    use nu_protocol::{Span, Value};
    use uuid::Uuid;

    use crate::PolarsPlugin;

    /// Runs `polars to-csv` against a uniquely named `.csv` file inside a fresh
    /// temporary directory, with `PWD` pointing at that same directory.
    ///
    /// Passes when the file exists afterwards and the command returns a one-element
    /// list whose only entry is a string containing "saved".
    #[test]
    pub fn test_to_csv() -> Result<(), Box<dyn std::error::Error>> {
        // Unique output file under a throw-away directory.
        let sandbox = tempfile::tempdir()?;
        let target = sandbox.path().join(format!("{}.csv", Uuid::new_v4()));
        let target_str = target.to_str().expect("should be able to get file path");

        let cmd = format!(
            "[[a b]; [1 2] [3 4]] | polars into-df | polars to-csv {}",
            target_str
        );
        // Echo the generated pipeline to the test's stdout.
        println!("cmd: {}", cmd);

        let mut harness = PluginTest::new("polars", PolarsPlugin::default().into())?;

        // Make the temporary directory the engine's working directory.
        let pwd = sandbox
            .path()
            .to_str()
            .expect("should be able to get path")
            .to_owned();
        harness
            .engine_state_mut()
            .add_env_var("PWD".to_string(), Value::string(pwd, Span::test_data()));

        let output = harness.eval(&cmd)?;
        assert!(target.exists());

        // The command reports its result as a single-element list of strings.
        let result = output.into_value(Span::test_data());
        let entries = result.as_list()?;
        assert_eq!(entries.len(), 1);

        let first = entries.first().expect("should have a value");
        let msg = first.as_str()?;
        assert!(msg.contains("saved"));

        Ok(())
    }
}

View file

@ -1,5 +1,6 @@
use std::{fs::File, io::BufWriter, path::PathBuf};
use nu_path::expand_path_with;
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape,
@ -53,15 +54,16 @@ impl PluginCommand for ToJsonLines {
fn command(
plugin: &PolarsPlugin,
_engine: &EngineInterface,
engine: &EngineInterface,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let file_name: Spanned<PathBuf> = call.req(0)?;
let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true);
let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let file = File::create(&file_name.item).map_err(|e| ShellError::GenericError {
let file = File::create(file_path).map_err(|e| ShellError::GenericError {
error: "Error with file name".into(),
msg: e.to_string(),
span: Some(file_name.span),
@ -87,3 +89,47 @@ fn command(
None,
))
}
#[cfg(test)]
pub mod test {
    use nu_plugin_test_support::PluginTest;
    use nu_protocol::{Span, Value};
    use uuid::Uuid;

    use crate::PolarsPlugin;

    /// Runs `polars to-jsonl` against a uniquely named `.jsonl` file inside a fresh
    /// temporary directory, with `PWD` pointing at that same directory.
    ///
    /// Passes when the file exists afterwards and the command returns a one-element
    /// list whose only entry is a string containing "saved".
    #[test]
    pub fn test_to_jsonl() -> Result<(), Box<dyn std::error::Error>> {
        // Unique output file under a throw-away directory.
        let sandbox = tempfile::tempdir()?;
        let target = sandbox.path().join(format!("{}.jsonl", Uuid::new_v4()));
        let target_str = target.to_str().expect("should be able to get file path");

        let cmd = format!(
            "[[a b]; [1 2] [3 4]] | polars into-df | polars to-jsonl {}",
            target_str
        );

        let mut harness = PluginTest::new("polars", PolarsPlugin::default().into())?;

        // Make the temporary directory the engine's working directory.
        let pwd = sandbox
            .path()
            .to_str()
            .expect("should be able to get path")
            .to_owned();
        harness
            .engine_state_mut()
            .add_env_var("PWD".to_string(), Value::string(pwd, Span::test_data()));

        let output = harness.eval(&cmd)?;
        assert!(target.exists());

        // The command reports its result as a single-element list of strings.
        let result = output.into_value(Span::test_data());
        let entries = result.as_list()?;
        assert_eq!(entries.len(), 1);

        let first = entries.first().expect("should have a value");
        let msg = first.as_str()?;
        assert!(msg.contains("saved"));

        Ok(())
    }
}

View file

@ -1,5 +1,6 @@
use std::{fs::File, path::PathBuf};
use nu_path::expand_path_with;
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape,
@ -43,24 +44,26 @@ impl PluginCommand for ToParquet {
fn run(
&self,
plugin: &Self::Plugin,
_engine: &EngineInterface,
engine: &EngineInterface,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, LabeledError> {
command(plugin, call, input).map_err(LabeledError::from)
command(plugin, engine, call, input).map_err(LabeledError::from)
}
}
fn command(
plugin: &PolarsPlugin,
engine: &EngineInterface,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let file_name: Spanned<PathBuf> = call.req(0)?;
let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true);
let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
let file = File::create(&file_name.item).map_err(|e| ShellError::GenericError {
let file = File::create(file_path).map_err(|e| ShellError::GenericError {
error: "Error with file name".into(),
msg: e.to_string(),
span: Some(file_name.span),
@ -85,3 +88,48 @@ fn command(
None,
))
}
#[cfg(test)]
pub mod test {
    use nu_plugin_test_support::PluginTest;
    use nu_protocol::{Span, Value};
    use uuid::Uuid;

    use crate::PolarsPlugin;

    /// Runs `polars to-parquet` against a uniquely named `.parquet` file inside a
    /// fresh temporary directory, with `PWD` pointing at that same directory.
    ///
    /// Passes when the file exists afterwards and the command returns a one-element
    /// list whose only entry is a string containing "saved".
    #[test]
    pub fn test_to_parquet() -> Result<(), Box<dyn std::error::Error>> {
        // Unique output file under a throw-away directory.
        let sandbox = tempfile::tempdir()?;
        let target = sandbox.path().join(format!("{}.parquet", Uuid::new_v4()));
        let target_str = target.to_str().expect("should be able to get file path");

        let cmd = format!(
            "[[a b]; [1 2] [3 4]] | polars into-df | polars to-parquet {}",
            target_str
        );

        let mut harness = PluginTest::new("polars", PolarsPlugin::default().into())?;

        // Make the temporary directory the engine's working directory.
        let pwd = sandbox
            .path()
            .to_str()
            .expect("should be able to get path")
            .to_owned();
        harness
            .engine_state_mut()
            .add_env_var("PWD".to_string(), Value::string(pwd, Span::test_data()));

        let output = harness.eval(&cmd)?;
        assert!(target.exists());

        // The command reports its result as a single-element list of strings.
        let result = output.into_value(Span::test_data());
        let entries = result.as_list()?;
        assert_eq!(entries.len(), 1);

        let first = entries.first().expect("should have a value");
        let msg = first.as_str()?;
        assert!(msg.contains("saved"));

        Ok(())
    }
}