diff --git a/src/commands.rs b/src/commands.rs index ee70534640..c238b451d8 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -1,6 +1,8 @@ #[macro_use] pub(crate) mod macros; +mod from_structured_data; + pub(crate) mod append; pub(crate) mod args; pub(crate) mod autoview; diff --git a/src/commands/from_csv.rs b/src/commands/from_csv.rs index cd29b625a6..4bada42dfb 100644 --- a/src/commands/from_csv.rs +++ b/src/commands/from_csv.rs @@ -1,7 +1,7 @@ +use crate::commands::from_structured_data::from_structured_data; use crate::commands::WholeStreamCommand; -use crate::data::{Primitive, TaggedDictBuilder, Value}; +use crate::data::{Primitive, Value}; use crate::prelude::*; -use csv::ReaderBuilder; pub struct FromCSV; @@ -39,49 +39,13 @@ impl WholeStreamCommand for FromCSV { } } -pub fn from_csv_string_to_value( - s: String, - headerless: bool, - separator: char, - tag: impl Into, -) -> Result, csv::Error> { - let mut reader = ReaderBuilder::new() - .has_headers(!headerless) - .delimiter(separator as u8) - .from_reader(s.as_bytes()); - let tag = tag.into(); - - let headers = if headerless { - (1..=reader.headers()?.len()) - .map(|i| format!("Column{}", i)) - .collect::>() - } else { - reader.headers()?.iter().map(String::from).collect() - }; - - let mut rows = vec![]; - for row in reader.records() { - let mut tagged_row = TaggedDictBuilder::new(&tag); - for (value, header) in row?.iter().zip(headers.iter()) { - tagged_row.insert_tagged( - header, - Value::Primitive(Primitive::String(String::from(value))).tagged(&tag), - ) - } - rows.push(tagged_row.into_tagged_value()); - } - - Ok(Value::Table(rows).tagged(&tag)) -} - fn from_csv( FromCSVArgs { headerless, separator, }: FromCSVArgs, - RunnableContext { input, name, .. }: RunnableContext, + runnable_context: RunnableContext, ) -> Result { - let name_tag = name; let sep = match separator { Some(Tagged { item: Value::Primitive(Primitive::String(s)), @@ -101,51 +65,5 @@ fn from_csv( _ => ',', }; - let stream = async_stream! { - let values: Vec> = input.values.collect().await; - - let mut concat_string = String::new(); - let mut latest_tag: Option = None; - - for value in values { - let value_tag = value.tag(); - latest_tag = Some(value_tag.clone()); - match value.item { - Value::Primitive(Primitive::String(s)) => { - concat_string.push_str(&s); - concat_string.push_str("\n"); - } - _ => yield Err(ShellError::labeled_error_with_secondary( - "Expected a string from pipeline", - "requires string input", - name_tag.clone(), - "value originates from here", - value_tag.clone(), - )), - - } - } - - match from_csv_string_to_value(concat_string, headerless, sep, name_tag.clone()) { - Ok(x) => match x { - Tagged { item: Value::Table(list), .. } => { - for l in list { - yield ReturnSuccess::value(l); - } - } - x => yield ReturnSuccess::value(x), - }, - Err(_) => if let Some(last_tag) = latest_tag { - yield Err(ShellError::labeled_error_with_secondary( - "Could not parse as CSV", - "input cannot be parsed as CSV", - name_tag.clone(), - "value originates from here", - last_tag.clone(), - )) - } , - } - }; - - Ok(stream.to_output_stream()) + from_structured_data(headerless, sep, "CSV", runnable_context) } diff --git a/src/commands/from_structured_data.rs b/src/commands/from_structured_data.rs new file mode 100644 index 0000000000..4799a40993 --- /dev/null +++ b/src/commands/from_structured_data.rs @@ -0,0 +1,97 @@ +use crate::data::{Primitive, TaggedDictBuilder, Value}; +use crate::prelude::*; +use csv::ReaderBuilder; + +fn from_stuctured_string_to_value( + s: String, + headerless: bool, + separator: char, + tag: impl Into, +) -> Result, csv::Error> { + let mut reader = ReaderBuilder::new() + .has_headers(!headerless) + .delimiter(separator as u8) + .from_reader(s.as_bytes()); + let tag = tag.into(); + + let headers = if headerless { + (1..=reader.headers()?.len()) + .map(|i| format!("Column{}", i)) + .collect::>() + } else { + reader.headers()?.iter().map(String::from).collect() + }; + + let mut rows = vec![]; + for row in reader.records() { + let mut tagged_row = TaggedDictBuilder::new(&tag); + for (value, header) in row?.iter().zip(headers.iter()) { + tagged_row.insert_tagged( + header, + Value::Primitive(Primitive::String(String::from(value))).tagged(&tag), + ) + } + rows.push(tagged_row.into_tagged_value()); + } + + Ok(Value::Table(rows).tagged(&tag)) +} + +pub fn from_structured_data( + headerless: bool, + sep: char, + format_name: &'static str, + RunnableContext { input, name, .. }: RunnableContext, +) -> Result { + let name_tag = name; + + let stream = async_stream! { + let values: Vec> = input.values.collect().await; + + let mut concat_string = String::new(); + let mut latest_tag: Option = None; + + for value in values { + let value_tag = value.tag(); + latest_tag = Some(value_tag.clone()); + match value.item { + Value::Primitive(Primitive::String(s)) => { + concat_string.push_str(&s); + concat_string.push_str("\n"); + } + _ => yield Err(ShellError::labeled_error_with_secondary( + "Expected a string from pipeline", + "requires string input", + name_tag.clone(), + "value originates from here", + value_tag.clone(), + )), + + } + } + + match from_stuctured_string_to_value(concat_string, headerless, sep, name_tag.clone()) { + Ok(x) => match x { + Tagged { item: Value::Table(list), .. } => { + for l in list { + yield ReturnSuccess::value(l); + } + } + x => yield ReturnSuccess::value(x), + }, + Err(_) => if let Some(last_tag) = latest_tag { + let line_one = format!("Could not parse as {}", format_name); + let line_two = format!("input cannot be parsed as {}", format_name); + yield Err(ShellError::labeled_error_with_secondary( + line_one, + line_two, + name_tag.clone(), + "value originates from here", + last_tag.clone(), + )) + } , + } + }; + + Ok(stream.to_output_stream()) +} diff --git a/src/commands/from_tsv.rs b/src/commands/from_tsv.rs index 24841b91c1..7931b8ef38 100644 --- a/src/commands/from_tsv.rs +++ b/src/commands/from_tsv.rs @@ -1,7 +1,6 @@ +use crate::commands::from_structured_data::from_structured_data; use crate::commands::WholeStreamCommand; -use crate::data::{Primitive, TaggedDictBuilder, Value}; use crate::prelude::*; -use csv::ReaderBuilder; pub struct FromTSV; @@ -33,91 +32,9 @@ impl WholeStreamCommand for FromTSV { } } -pub fn from_tsv_string_to_value( - s: String, - headerless: bool, - tag: impl Into, -) -> Result, csv::Error> { - let mut reader = ReaderBuilder::new() - .has_headers(!headerless) - .delimiter(b'\t') - .from_reader(s.as_bytes()); - let tag = tag.into(); - - let headers = if headerless { - (1..=reader.headers()?.len()) - .map(|i| format!("Column{}", i)) - .collect::>() - } else { - reader.headers()?.iter().map(String::from).collect() - }; - - let mut rows = vec![]; - for row in reader.records() { - let mut tagged_row = TaggedDictBuilder::new(&tag); - for (value, header) in row?.iter().zip(headers.iter()) { - tagged_row.insert_tagged( - header, - Value::Primitive(Primitive::String(String::from(value))).tagged(&tag), - ) - } - rows.push(tagged_row.into_tagged_value()); - } - - Ok(Value::Table(rows).tagged(&tag)) -} - fn from_tsv( FromTSVArgs { headerless }: FromTSVArgs, - RunnableContext { input, name, .. }: RunnableContext, + runnable_context: RunnableContext, ) -> Result { - let name_tag = name; - - let stream = async_stream! { - let values: Vec> = input.values.collect().await; - - let mut concat_string = String::new(); - let mut latest_tag: Option = None; - - for value in values { - let value_tag = value.tag(); - latest_tag = Some(value_tag.clone()); - match value.item { - Value::Primitive(Primitive::String(s)) => { - concat_string.push_str(&s); - concat_string.push_str("\n"); - } - _ => yield Err(ShellError::labeled_error_with_secondary( - "Expected a string from pipeline", - "requires string input", - &name_tag, - "value originates from here", - &value_tag, - )), - - } - } - - match from_tsv_string_to_value(concat_string, headerless, name_tag.clone()) { - Ok(x) => match x { - Tagged { item: Value::Table(list), .. } => { - for l in list { - yield ReturnSuccess::value(l); - } - } - x => yield ReturnSuccess::value(x), - }, - Err(_) => if let Some(last_tag) = latest_tag { - yield Err(ShellError::labeled_error_with_secondary( - "Could not parse as TSV", - "input cannot be parsed as TSV", - &name_tag, - "value originates from here", - &last_tag, - )) - } , - } - }; - - Ok(stream.to_output_stream()) + from_structured_data(headerless, '\t', "TSV", runnable_context) }