From b84ff99e7f0d30c4736d9f25d577689f28905665 Mon Sep 17 00:00:00 2001 From: Jonathan Turner Date: Sat, 30 May 2020 11:36:04 +1200 Subject: [PATCH] Batch of moving commands off async_stream (#1916) --- crates/nu-cli/src/cli.rs | 4 +- crates/nu-cli/src/commands/append.rs | 23 +- crates/nu-cli/src/commands/autoview.rs | 495 +++++++++++---------- crates/nu-cli/src/commands/build_string.rs | 34 +- crates/nu-cli/src/commands/cal.rs | 82 ++-- crates/nu-cli/src/commands/calc.rs | 30 +- crates/nu-cli/src/commands/cd.rs | 21 +- crates/nu-cli/src/commands/clear.rs | 34 +- crates/nu-cli/src/commands/clip.rs | 47 +- crates/nu-cli/src/commands/compact.rs | 42 +- crates/nu-cli/src/commands/config.rs | 216 ++++----- crates/nu-cli/src/commands/count.rs | 22 +- crates/nu-cli/src/commands/cp.rs | 21 +- crates/nu-cli/src/commands/date.rs | 31 +- crates/nu-cli/src/commands/ls.rs | 23 +- crates/nu-cli/src/stream/output.rs | 6 +- 16 files changed, 552 insertions(+), 579 deletions(-) diff --git a/crates/nu-cli/src/cli.rs b/crates/nu-cli/src/cli.rs index decd4fc924..f93a248db1 100644 --- a/crates/nu-cli/src/cli.rs +++ b/crates/nu-cli/src/cli.rs @@ -913,7 +913,9 @@ async fn process_line( raw_input: line.to_string(), }; - if let Ok(mut output_stream) = crate::commands::autoview::autoview(context) { + if let Ok(mut output_stream) = + crate::commands::autoview::autoview(context).await + { loop { match output_stream.try_next().await { Ok(Some(ReturnSuccess::Value(Value { diff --git a/crates/nu-cli/src/commands/append.rs b/crates/nu-cli/src/commands/append.rs index a3246b5c1c..e78a0de497 100644 --- a/crates/nu-cli/src/commands/append.rs +++ b/crates/nu-cli/src/commands/append.rs @@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand; use crate::context::CommandRegistry; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value}; +use nu_protocol::{Signature, SyntaxShape, UntaggedValue, Value}; #[derive(Deserialize)] struct AppendArgs { @@ -34,7 +34,11 @@ impl WholeStreamCommand for Append { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - append(args, registry) + let (AppendArgs { row }, input) = args.process(registry).await?; + + let eos = futures::stream::iter(vec![row]); + + Ok(input.chain(eos).to_output_stream()) } fn examples(&self) -> Vec { @@ -51,21 +55,6 @@ impl WholeStreamCommand for Append { } } -fn append(args: CommandArgs, registry: &CommandRegistry) -> Result { - let registry = registry.clone(); - - let stream = async_stream! { - let (AppendArgs { row }, mut input) = args.process(®istry).await?; - - while let Some(item) = input.next().await { - yield ReturnSuccess::value(item); - } - yield ReturnSuccess::value(row); - }; - - Ok(stream.to_output_stream()) -} - #[cfg(test)] mod tests { use super::Append; diff --git a/crates/nu-cli/src/commands/autoview.rs b/crates/nu-cli/src/commands/autoview.rs index 5a00c5fa20..fd9a949abd 100644 --- a/crates/nu-cli/src/commands/autoview.rs +++ b/crates/nu-cli/src/commands/autoview.rs @@ -4,9 +4,12 @@ use crate::data::value::format_leaf; use crate::prelude::*; use nu_errors::ShellError; use nu_protocol::{hir, hir::Expression, hir::Literal, hir::SpannedExpression}; -use nu_protocol::{Primitive, ReturnSuccess, Scope, Signature, UntaggedValue, Value}; +use nu_protocol::{Primitive, Scope, Signature, UntaggedValue, Value}; +use prettytable::format::{FormatBuilder, LinePosition, LineSeparator}; +use prettytable::{color, Attr, Cell, Row, Table}; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; +use textwrap::fill; pub struct Autoview; @@ -38,6 +41,7 @@ impl WholeStreamCommand for Autoview { name: args.call_info.name_tag, raw_input: args.raw_input, }) + .await } fn examples(&self) -> Vec { @@ -77,7 +81,7 @@ impl RunnableContextWithoutInput { } } -pub fn autoview(context: RunnableContext) -> Result { +pub async fn autoview(context: RunnableContext) -> Result { let binary = context.get_command("binaryview"); let text = context.get_command("textview"); let table = context.get_command("table"); @@ -101,254 +105,283 @@ pub fn autoview(context: RunnableContext) -> Result { AutoPivotMode::Always }; - Ok(OutputStream::new(async_stream! { - let (mut input_stream, context) = RunnableContextWithoutInput::convert(context); + let (mut input_stream, context) = RunnableContextWithoutInput::convert(context); + if let Some(x) = input_stream.next().await { match input_stream.next().await { - Some(x) => { - match input_stream.next().await { - Some(y) => { - let ctrl_c = context.ctrl_c.clone(); - let stream = async_stream! { - yield Ok(x); - yield Ok(y); + Some(y) => { + let ctrl_c = context.ctrl_c.clone(); + let stream = async_stream! { + yield Ok(x); + yield Ok(y); - loop { - match input_stream.next().await { - Some(z) => { - if ctrl_c.load(Ordering::SeqCst) { - break; - } - yield Ok(z); - } - _ => break, + loop { + match input_stream.next().await { + Some(z) => { + if ctrl_c.load(Ordering::SeqCst) { + break; } + yield Ok(z); } - }; - let stream = stream.to_input_stream(); - - if let Some(table) = table { - let command_args = create_default_command_args(&context).with_input(stream); - let result = table.run(command_args, &context.registry).await; - result.collect::>().await; + _ => break, } } - _ => { - match x { - Value { - value: UntaggedValue::Primitive(Primitive::String(ref s)), - tag: Tag { anchor, span }, - } if anchor.is_some() => { - if let Some(text) = text { - let mut stream = VecDeque::new(); - stream.push_back(UntaggedValue::string(s).into_value(Tag { anchor, span })); - let command_args = create_default_command_args(&context).with_input(stream); - let result = text.run(command_args, &context.registry).await; - result.collect::>().await; - } else { - out!("{}", s); - } - } - Value { - value: UntaggedValue::Primitive(Primitive::String(s)), - .. - } => { - out!("{}", s); - } - Value { - value: UntaggedValue::Primitive(Primitive::Line(ref s)), - tag: Tag { anchor, span }, - } if anchor.is_some() => { - if let Some(text) = text { - let mut stream = VecDeque::new(); - stream.push_back(UntaggedValue::string(s).into_value(Tag { anchor, span })); - let command_args = create_default_command_args(&context).with_input(stream); - let result = text.run(command_args, &context.registry).await; - result.collect::>().await; - } else { - out!("{}\n", s); - } - } - Value { - value: UntaggedValue::Primitive(Primitive::Line(s)), - .. - } => { - out!("{}\n", s); - } - Value { - value: UntaggedValue::Primitive(Primitive::Path(s)), - .. - } => { - out!("{}", s.display()); - } - Value { - value: UntaggedValue::Primitive(Primitive::Int(n)), - .. - } => { - out!("{}", n); - } - Value { - value: UntaggedValue::Primitive(Primitive::Decimal(n)), - .. - } => { - // TODO: normalize decimal to remove trailing zeros. - // normalization will be available in next release of bigdecimal crate - let mut output = n.to_string(); - if output.contains('.') { - output = output.trim_end_matches('0').to_owned(); - } - if output.ends_with('.') { - output.push('0'); - } - out!("{}", output); - } - Value { - value: UntaggedValue::Primitive(Primitive::Boolean(b)), - .. - } => { - out!("{}", b); - } - Value { - value: UntaggedValue::Primitive(Primitive::Duration(d)), - .. - } => { - let output = format_leaf(&x).plain_string(100_000); - out!("{}", output); - } - Value { - value: UntaggedValue::Primitive(Primitive::Date(d)), - .. - } => { - out!("{}", d); - } - Value { - value: UntaggedValue::Primitive(Primitive::Range(_)), - .. - } => { - let output = format_leaf(&x).plain_string(100_000); - out!("{}", output); - } + }; + let stream = stream.to_input_stream(); - Value { value: UntaggedValue::Primitive(Primitive::Binary(ref b)), .. } => { - if let Some(binary) = binary { - let mut stream = VecDeque::new(); - stream.push_back(x); - let command_args = create_default_command_args(&context).with_input(stream); - let result = binary.run(command_args, &context.registry).await; - result.collect::>().await; - } else { - use pretty_hex::*; - out!("{:?}", b.hex_dump()); - } + if let Some(table) = table { + let command_args = create_default_command_args(&context).with_input(stream); + let result = table.run(command_args, &context.registry).await; + result.collect::>().await; + } + } + _ => { + match x { + Value { + value: UntaggedValue::Primitive(Primitive::String(ref s)), + tag: Tag { anchor, span }, + } if anchor.is_some() => { + if let Some(text) = text { + let mut stream = VecDeque::new(); + stream.push_back( + UntaggedValue::string(s).into_value(Tag { anchor, span }), + ); + let command_args = + create_default_command_args(&context).with_input(stream); + let result = text.run(command_args, &context.registry).await; + result.collect::>().await; + } else { + out!("{}", s); + } + } + Value { + value: UntaggedValue::Primitive(Primitive::String(s)), + .. + } => { + out!("{}", s); + } + Value { + value: UntaggedValue::Primitive(Primitive::Line(ref s)), + tag: Tag { anchor, span }, + } if anchor.is_some() => { + if let Some(text) = text { + let mut stream = VecDeque::new(); + stream.push_back( + UntaggedValue::string(s).into_value(Tag { anchor, span }), + ); + let command_args = + create_default_command_args(&context).with_input(stream); + let result = text.run(command_args, &context.registry).await; + result.collect::>().await; + } else { + out!("{}\n", s); + } + } + Value { + value: UntaggedValue::Primitive(Primitive::Line(s)), + .. + } => { + out!("{}\n", s); + } + Value { + value: UntaggedValue::Primitive(Primitive::Path(s)), + .. + } => { + out!("{}", s.display()); + } + Value { + value: UntaggedValue::Primitive(Primitive::Int(n)), + .. + } => { + out!("{}", n); + } + Value { + value: UntaggedValue::Primitive(Primitive::Decimal(n)), + .. + } => { + // TODO: normalize decimal to remove trailing zeros. + // normalization will be available in next release of bigdecimal crate + let mut output = n.to_string(); + if output.contains('.') { + output = output.trim_end_matches('0').to_owned(); + } + if output.ends_with('.') { + output.push('0'); + } + out!("{}", output); + } + Value { + value: UntaggedValue::Primitive(Primitive::Boolean(b)), + .. + } => { + out!("{}", b); + } + Value { + value: UntaggedValue::Primitive(Primitive::Duration(_)), + .. + } => { + let output = format_leaf(&x).plain_string(100_000); + out!("{}", output); + } + Value { + value: UntaggedValue::Primitive(Primitive::Date(d)), + .. + } => { + out!("{}", d); + } + Value { + value: UntaggedValue::Primitive(Primitive::Range(_)), + .. + } => { + let output = format_leaf(&x).plain_string(100_000); + out!("{}", output); + } + + Value { + value: UntaggedValue::Primitive(Primitive::Binary(ref b)), + .. + } => { + if let Some(binary) = binary { + let mut stream = VecDeque::new(); + stream.push_back(x); + let command_args = + create_default_command_args(&context).with_input(stream); + let result = binary.run(command_args, &context.registry).await; + result.collect::>().await; + } else { + use pretty_hex::*; + out!("{:?}", b.hex_dump()); + } + } + + Value { + value: UntaggedValue::Error(e), + .. + } => { + return Err(e); + } + + Value { + value: UntaggedValue::Row(row), + .. + } if pivot_mode == AutoPivotMode::Always + || (pivot_mode == AutoPivotMode::Auto + && (row + .entries + .iter() + .map(|(_, v)| v.convert_to_string()) + .collect::>() + .iter() + .fold(0usize, |acc, len| acc + len.len()) + + row.entries.iter().count() * 2) + > textwrap::termwidth()) => + { + let termwidth = std::cmp::max(textwrap::termwidth(), 20); + + enum TableMode { + Light, + Normal, + } + + let mut table = Table::new(); + let table_mode = crate::data::config::config(Tag::unknown()); + + let table_mode = if let Some(s) = table_mode?.get("table_mode") { + match s.as_string() { + Ok(typ) if typ == "light" => TableMode::Light, + _ => TableMode::Normal, } + } else { + TableMode::Normal + }; - Value { value: UntaggedValue::Error(e), .. } => { - yield Err(e); + match table_mode { + TableMode::Light => { + table.set_format( + FormatBuilder::new() + .separator( + LinePosition::Title, + LineSeparator::new('─', '─', ' ', ' '), + ) + .separator( + LinePosition::Bottom, + LineSeparator::new(' ', ' ', ' ', ' '), + ) + .padding(1, 1) + .build(), + ); } - - Value { value: UntaggedValue::Row(row), ..} - if pivot_mode == AutoPivotMode::Always || - (pivot_mode == AutoPivotMode::Auto && - (row.entries.iter().map(|(k,v)| v.convert_to_string()) - .collect::>().iter() - .fold(0, |acc, len| acc + len.len()) - + - (row.entries.iter().map(|(k,_)| k.chars()).count() * 2)) - > textwrap::termwidth()) => { - use prettytable::format::{FormatBuilder, LinePosition, LineSeparator}; - use prettytable::{color, Attr, Cell, Row, Table}; - use crate::data::value::{format_leaf, style_leaf}; - use textwrap::fill; - - let termwidth = std::cmp::max(textwrap::termwidth(), 20); - - enum TableMode { - Light, - Normal, - } - - let mut table = Table::new(); - let table_mode = crate::data::config::config(Tag::unknown()); - - let table_mode = if let Some(s) = table_mode?.get("table_mode") { - match s.as_string() { - Ok(typ) if typ == "light" => TableMode::Light, - _ => TableMode::Normal, - } - } else { - TableMode::Normal - }; - - match table_mode { - TableMode::Light => { - table.set_format( - FormatBuilder::new() - .separator(LinePosition::Title, LineSeparator::new('─', '─', ' ', ' ')) - .separator(LinePosition::Bottom, LineSeparator::new(' ', ' ', ' ', ' ')) - .padding(1, 1) - .build(), - ); - } - _ => { - table.set_format( - FormatBuilder::new() - .column_separator('│') - .separator(LinePosition::Top, LineSeparator::new('─', '┬', ' ', ' ')) - .separator(LinePosition::Title, LineSeparator::new('─', '┼', ' ', ' ')) - .separator(LinePosition::Bottom, LineSeparator::new('─', '┴', ' ', ' ')) - .padding(1, 1) - .build(), - ); - } - } - - let mut max_key_len = 0; - for (key, _) in row.entries.iter() { - max_key_len = std::cmp::max(max_key_len, key.chars().count()); - } - - if max_key_len > (termwidth/2 - 1) { - max_key_len = termwidth/2 - 1; - } - - let max_val_len = termwidth - max_key_len - 5; - - for (key, value) in row.entries.iter() { - table.add_row(Row::new(vec![Cell::new(&fill(&key, max_key_len)).with_style(Attr::ForegroundColor(color::GREEN)).with_style(Attr::Bold), - Cell::new(&fill(&format_leaf(value).plain_string(100_000), max_val_len))])); - } - - table.printstd(); - - // table.print_term(&mut *context.host.lock().out_terminal().ok_or_else(|| ShellError::untagged_runtime_error("Could not open terminal for output"))?) - // .map_err(|_| ShellError::untagged_runtime_error("Internal error: could not print to terminal (for unix systems check to make sure TERM is set)"))?; + _ => { + table.set_format( + FormatBuilder::new() + .column_separator('│') + .separator( + LinePosition::Top, + LineSeparator::new('─', '┬', ' ', ' '), + ) + .separator( + LinePosition::Title, + LineSeparator::new('─', '┼', ' ', ' '), + ) + .separator( + LinePosition::Bottom, + LineSeparator::new('─', '┴', ' ', ' '), + ) + .padding(1, 1) + .build(), + ); } + } - Value { value: ref item, .. } => { - if let Some(table) = table { - let mut stream = VecDeque::new(); - stream.push_back(x); - let command_args = create_default_command_args(&context).with_input(stream); - let result = table.run(command_args, &context.registry).await; - result.collect::>().await; - } else { - out!("{:?}", item); - } - } + let mut max_key_len = 0; + for (key, _) in row.entries.iter() { + max_key_len = std::cmp::max(max_key_len, key.chars().count()); + } + + if max_key_len > (termwidth / 2 - 1) { + max_key_len = termwidth / 2 - 1; + } + + let max_val_len = termwidth - max_key_len - 5; + + for (key, value) in row.entries.iter() { + table.add_row(Row::new(vec![ + Cell::new(&fill(&key, max_key_len)) + .with_style(Attr::ForegroundColor(color::GREEN)) + .with_style(Attr::Bold), + Cell::new(&fill( + &format_leaf(value).plain_string(100_000), + max_val_len, + )), + ])); + } + + table.printstd(); + + // table.print_term(&mut *context.host.lock().out_terminal().ok_or_else(|| ShellError::untagged_runtime_error("Could not open terminal for output"))?) + // .map_err(|_| ShellError::untagged_runtime_error("Internal error: could not print to terminal (for unix systems check to make sure TERM is set)"))?; + } + + Value { + value: ref item, .. + } => { + if let Some(table) = table { + let mut stream = VecDeque::new(); + stream.push_back(x); + let command_args = + create_default_command_args(&context).with_input(stream); + let result = table.run(command_args, &context.registry).await; + result.collect::>().await; + } else { + out!("{:?}", item); } } } } - _ => { - //out!(""); - } } + } - // Needed for async_stream to type check - if false { - yield ReturnSuccess::value(UntaggedValue::nothing().into_untagged_value()); - } - })) + Ok(OutputStream::empty()) } fn create_default_command_args(context: &RunnableContextWithoutInput) -> RawCommandArgs { diff --git a/crates/nu-cli/src/commands/build_string.rs b/crates/nu-cli/src/commands/build_string.rs index 22bffe821d..3420bb7cef 100644 --- a/crates/nu-cli/src/commands/build_string.rs +++ b/crates/nu-cli/src/commands/build_string.rs @@ -32,7 +32,18 @@ impl WholeStreamCommand for BuildString { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - build_string(args, registry) + let tag = args.call_info.name_tag.clone(); + let (BuildStringArgs { rest }, _) = args.process(®istry).await?; + + let mut output_string = String::new(); + + for r in rest { + output_string.push_str(&format_leaf(&r).plain_string(100_000)) + } + + Ok(OutputStream::one(ReturnSuccess::value( + UntaggedValue::string(output_string).into_value(tag), + ))) } fn examples(&self) -> Vec { @@ -43,24 +54,3 @@ impl WholeStreamCommand for BuildString { }] } } - -pub fn build_string( - args: CommandArgs, - registry: &CommandRegistry, -) -> Result { - let registry = registry.clone(); - let tag = args.call_info.name_tag.clone(); - let stream = async_stream! { - let (BuildStringArgs { rest }, mut input) = args.process(®istry).await?; - - let mut output_string = String::new(); - - for r in rest { - output_string.push_str(&format_leaf(&r).plain_string(100_000)) - } - - yield Ok(ReturnSuccess::Value(UntaggedValue::string(&output_string).into_value(tag))); - }; - - Ok(stream.to_output_stream()) -} diff --git a/crates/nu-cli/src/commands/cal.rs b/crates/nu-cli/src/commands/cal.rs index 0bfff43f94..a79dc2918e 100644 --- a/crates/nu-cli/src/commands/cal.rs +++ b/crates/nu-cli/src/commands/cal.rs @@ -5,7 +5,7 @@ use nu_protocol::Dictionary; use crate::commands::{command::EvaluatedWholeStreamCommandArgs, WholeStreamCommand}; use indexmap::IndexMap; -use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value}; +use nu_protocol::{Signature, SyntaxShape, UntaggedValue, Value}; pub struct Cal; @@ -42,7 +42,7 @@ impl WholeStreamCommand for Cal { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - cal(args, registry) + cal(args, registry).await } fn examples(&self) -> Vec { @@ -61,58 +61,52 @@ impl WholeStreamCommand for Cal { } } -pub fn cal(args: CommandArgs, registry: &CommandRegistry) -> Result { +pub async fn cal( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let mut calendar_vec_deque = VecDeque::new(); - let tag = args.call_info.name_tag.clone(); + let args = args.evaluate_once(®istry).await?; + let mut calendar_vec_deque = VecDeque::new(); + let tag = args.call_info.name_tag.clone(); - let (current_year, current_month, current_day) = get_current_date(); + let (current_year, current_month, current_day) = get_current_date(); - let mut selected_year: i32 = current_year; - let mut current_day_option: Option = Some(current_day); + let mut selected_year: i32 = current_year; + let mut current_day_option: Option = Some(current_day); - let month_range = if args.has("full-year") { - if let Some(full_year_value) = args.get("full-year") { - if let Ok(year_u64) = full_year_value.as_u64() { - selected_year = year_u64 as i32; + let month_range = if args.has("full-year") { + if let Some(full_year_value) = args.get("full-year") { + if let Ok(year_u64) = full_year_value.as_u64() { + selected_year = year_u64 as i32; - if selected_year != current_year { - current_day_option = None - } - } else { - yield Err(get_invalid_year_shell_error(&full_year_value.tag())); - return; + if selected_year != current_year { + current_day_option = None } + } else { + return Err(get_invalid_year_shell_error(&full_year_value.tag())); } - - (1, 12) - } else { - (current_month, current_month) - }; - - let add_months_of_year_to_table_result = add_months_of_year_to_table( - &args, - &mut calendar_vec_deque, - &tag, - selected_year, - month_range, - current_month, - current_day_option, - ); - - match add_months_of_year_to_table_result { - Ok(()) => { - for item in calendar_vec_deque { - yield ReturnSuccess::value(item); - } - } - Err(error) => yield Err(error), } + + (1, 12) + } else { + (current_month, current_month) }; - Ok(stream.to_output_stream()) + let add_months_of_year_to_table_result = add_months_of_year_to_table( + &args, + &mut calendar_vec_deque, + &tag, + selected_year, + month_range, + current_month, + current_day_option, + ); + + match add_months_of_year_to_table_result { + Ok(()) => Ok(futures::stream::iter(calendar_vec_deque).to_output_stream()), + Err(error) => Err(error), + } } fn get_invalid_year_shell_error(year_tag: &Tag) -> ShellError { diff --git a/crates/nu-cli/src/commands/calc.rs b/crates/nu-cli/src/commands/calc.rs index 235f7c5493..9337e04940 100644 --- a/crates/nu-cli/src/commands/calc.rs +++ b/crates/nu-cli/src/commands/calc.rs @@ -20,7 +20,7 @@ impl WholeStreamCommand for Calc { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - calc(args, registry) + calc(args, registry).await } fn examples(&self) -> Vec { @@ -32,31 +32,33 @@ impl WholeStreamCommand for Calc { } } -pub fn calc(args: CommandArgs, _registry: &CommandRegistry) -> Result { - let stream = async_stream! { - let mut input = args.input; - let name = args.call_info.name_tag.clone(); - while let Some(input) = input.next().await { +pub async fn calc( + args: CommandArgs, + _registry: &CommandRegistry, +) -> Result { + let input = args.input; + let name = args.call_info.name_tag.span; + + Ok(input + .map(move |input| { if let Ok(string) = input.as_string() { match parse(&string, &input.tag) { - Ok(value) => yield ReturnSuccess::value(value), - Err(err) => yield Err(ShellError::labeled_error( + Ok(value) => ReturnSuccess::value(value), + Err(err) => Err(ShellError::labeled_error( "Calculation error", err, &input.tag.span, )), } } else { - yield Err(ShellError::labeled_error( + Err(ShellError::labeled_error( "Expected a string from pipeline", "requires string input", - name.clone(), + name, )) } - } - }; - - Ok(stream.to_output_stream()) + }) + .to_output_stream()) } pub fn parse(math_expression: &str, tag: impl Into) -> Result { diff --git a/crates/nu-cli/src/commands/cd.rs b/crates/nu-cli/src/commands/cd.rs index a9a7af16e4..294c8bc0e7 100644 --- a/crates/nu-cli/src/commands/cd.rs +++ b/crates/nu-cli/src/commands/cd.rs @@ -37,7 +37,10 @@ impl WholeStreamCommand for Cd { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - cd(args, registry) + let name = args.call_info.name_tag.clone(); + let shell_manager = args.shell_manager.clone(); + let (args, _): (CdArgs, _) = args.process(®istry).await?; + shell_manager.cd(args, name) } fn examples(&self) -> Vec { @@ -66,22 +69,6 @@ impl WholeStreamCommand for Cd { } } -fn cd(args: CommandArgs, registry: &CommandRegistry) -> Result { - let registry = registry.clone(); - let stream = async_stream! { - let name = args.call_info.name_tag.clone(); - let shell_manager = args.shell_manager.clone(); - - let (args, _): (CdArgs, _) = args.process(®istry).await?; - let mut result = shell_manager.cd(args, name)?; - while let Some(item) = result.next().await { - yield item; - } - }; - - Ok(stream.to_output_stream()) -} - #[cfg(test)] mod tests { use super::Cd; diff --git a/crates/nu-cli/src/commands/clear.rs b/crates/nu-cli/src/commands/clear.rs index b8420da308..5d7647018b 100644 --- a/crates/nu-cli/src/commands/clear.rs +++ b/crates/nu-cli/src/commands/clear.rs @@ -20,12 +20,19 @@ impl WholeStreamCommand for Clear { "clears the terminal" } - async fn run( - &self, - args: CommandArgs, - registry: &CommandRegistry, - ) -> Result { - clear(args, registry) + async fn run(&self, _: CommandArgs, _: &CommandRegistry) -> Result { + if cfg!(windows) { + Command::new("cmd") + .args(&["/C", "cls"]) + .status() + .expect("failed to execute process"); + } else if cfg!(unix) { + Command::new("/bin/sh") + .args(&["-c", "clear"]) + .status() + .expect("failed to execute process"); + } + Ok(OutputStream::empty()) } fn examples(&self) -> Vec { @@ -37,21 +44,6 @@ impl WholeStreamCommand for Clear { } } -fn clear(_args: CommandArgs, _registry: &CommandRegistry) -> Result { - if cfg!(windows) { - Command::new("cmd") - .args(&["/C", "cls"]) - .status() - .expect("failed to execute process"); - } else if cfg!(unix) { - Command::new("/bin/sh") - .args(&["-c", "clear"]) - .status() - .expect("failed to execute process"); - } - Ok(OutputStream::empty()) -} - #[cfg(test)] mod tests { use super::Clear; diff --git a/crates/nu-cli/src/commands/clip.rs b/crates/nu-cli/src/commands/clip.rs index d0b6f50c26..3b0f466ab5 100644 --- a/crates/nu-cli/src/commands/clip.rs +++ b/crates/nu-cli/src/commands/clip.rs @@ -3,7 +3,7 @@ use crate::context::CommandRegistry; use crate::prelude::*; use futures::stream::StreamExt; use nu_errors::ShellError; -use nu_protocol::{ReturnValue, Signature, Value}; +use nu_protocol::{Signature, Value}; use clipboard::{ClipboardContext, ClipboardProvider}; @@ -28,7 +28,7 @@ impl WholeStreamCommand for Clip { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - clip(args, registry) + clip(args, registry).await } fn examples(&self) -> Vec { @@ -40,31 +40,21 @@ impl WholeStreamCommand for Clip { } } -pub fn clip(args: CommandArgs, _registry: &CommandRegistry) -> Result { - let stream = async_stream! { - let mut input = args.input; - let name = args.call_info.name_tag.clone(); - let values: Vec = input.collect().await; +pub async fn clip( + args: CommandArgs, + _registry: &CommandRegistry, +) -> Result { + let input = args.input; + let name = args.call_info.name_tag.clone(); + let values: Vec = input.collect().await; - let mut clip_stream = inner_clip(values, name).await; - while let Some(value) = clip_stream.next().await { - yield value; - } - }; - - let stream: BoxStream<'static, ReturnValue> = stream.boxed(); - - Ok(OutputStream::from(stream)) -} - -async fn inner_clip(input: Vec, name: Tag) -> OutputStream { if let Ok(clip_context) = ClipboardProvider::new() { let mut clip_context: ClipboardContext = clip_context; let mut new_copy_data = String::new(); - if !input.is_empty() { + if !values.is_empty() { let mut first = true; - for i in input.iter() { + for i in values.iter() { if !first { new_copy_data.push_str("\n"); } else { @@ -74,11 +64,11 @@ async fn inner_clip(input: Vec, name: Tag) -> OutputStream { let string: String = match i.as_string() { Ok(string) => string.to_string(), Err(_) => { - return OutputStream::one(Err(ShellError::labeled_error( + return Err(ShellError::labeled_error( "Given non-string data", "expected strings from pipeline", name, - ))) + )) } }; @@ -89,22 +79,21 @@ async fn inner_clip(input: Vec, name: Tag) -> OutputStream { match clip_context.set_contents(new_copy_data) { Ok(_) => {} Err(_) => { - return OutputStream::one(Err(ShellError::labeled_error( + return Err(ShellError::labeled_error( "Could not set contents of clipboard", "could not set contents of clipboard", name, - ))); + )); } } - - OutputStream::empty() } else { - OutputStream::one(Err(ShellError::labeled_error( + return Err(ShellError::labeled_error( "Could not open clipboard", "could not open clipboard", name, - ))) + )); } + Ok(OutputStream::empty()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/compact.rs b/crates/nu-cli/src/commands/compact.rs index 2385b9342d..499606da60 100644 --- a/crates/nu-cli/src/commands/compact.rs +++ b/crates/nu-cli/src/commands/compact.rs @@ -1,6 +1,7 @@ use crate::commands::WholeStreamCommand; use crate::context::CommandRegistry; use crate::prelude::*; +use futures::future; use futures::stream::StreamExt; use nu_errors::ShellError; use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value}; @@ -32,7 +33,7 @@ impl WholeStreamCommand for Compact { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - compact(args, registry) + compact(args, registry).await } fn examples(&self) -> Vec { @@ -55,31 +56,40 @@ impl WholeStreamCommand for Compact { } } -pub fn compact(args: CommandArgs, registry: &CommandRegistry) -> Result { +pub async fn compact( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (CompactArgs { rest: columns }, mut input) = args.process(®istry).await?; - while let Some(item) = input.next().await { - if columns.is_empty() { + let (CompactArgs { rest: columns }, input) = args.process(®istry).await?; + Ok(input + .filter_map(move |item| { + future::ready(if columns.is_empty() { if !item.is_empty() { - yield ReturnSuccess::value(item); + Some(ReturnSuccess::value(item)) + } else { + None } } else { match item { Value { value: UntaggedValue::Row(ref r), .. - } => if columns - .iter() - .all(|field| r.get_data(field).borrow().is_some()) { - yield ReturnSuccess::value(item); + } => { + if columns + .iter() + .all(|field| r.get_data(field).borrow().is_some()) + { + Some(ReturnSuccess::value(item)) + } else { + None } - _ => {}, + } + _ => None, } - }; - } - }; - Ok(stream.to_output_stream()) + }) + }) + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/config.rs b/crates/nu-cli/src/commands/config.rs index cd6764abbd..7a7c246c2f 100644 --- a/crates/nu-cli/src/commands/config.rs +++ b/crates/nu-cli/src/commands/config.rs @@ -71,7 +71,7 @@ impl WholeStreamCommand for Config { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - config(args, registry) + config(args, registry).await } fn examples(&self) -> Vec { @@ -115,13 +115,16 @@ impl WholeStreamCommand for Config { } } -pub fn config(args: CommandArgs, registry: &CommandRegistry) -> Result { +pub async fn config( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let name_span = args.call_info.name_tag.clone(); let name = args.call_info.name_tag.clone(); let registry = registry.clone(); - let stream = async_stream! { - let (ConfigArgs { + let ( + ConfigArgs { load, set, set_into, @@ -129,104 +132,119 @@ pub fn config(args: CommandArgs, registry: &CommandRegistry) -> Result { - for l in list { - let value = l.clone(); - yield ReturnSuccess::value(l.clone()); - } - } - x => yield ReturnSuccess::value(x.clone()), - } - } - else if let Some((key, value)) = set { - result.insert(key.to_string(), value.clone()); - - config::write(&result, &configuration)?; - - yield ReturnSuccess::value(UntaggedValue::Row(result.into()).into_value(&value.tag)); - } - else if let Some(v) = set_into { - let rows: Vec = input.collect().await; - let key = v.to_string(); - - if rows.len() == 0 { - yield Err(ShellError::labeled_error("No values given for set_into", "needs value(s) from pipeline", v.tag())); - } else if rows.len() == 1 { - // A single value - let value = &rows[0]; - - result.insert(key.to_string(), value.clone()); - - config::write(&result, &configuration)?; - - yield ReturnSuccess::value(UntaggedValue::Row(result.into()).into_value(name)); - } else { - // Take in the pipeline as a table - let value = UntaggedValue::Table(rows).into_value(name.clone()); - - result.insert(key.to_string(), value.clone()); - - config::write(&result, &configuration)?; - - yield ReturnSuccess::value(UntaggedValue::Row(result.into()).into_value(name)); - } - } - else if let Tagged { item: true, tag } = clear { - result.clear(); - - config::write(&result, &configuration)?; - - yield ReturnSuccess::value(UntaggedValue::Row(result.into()).into_value(tag)); - - return; - } - else if let Tagged { item: true, tag } = path { - let path = config::default_path_for(&configuration)?; - - yield ReturnSuccess::value(UntaggedValue::Primitive(Primitive::Path(path)).into_value(tag)); - } - else if let Some(v) = remove { - let key = v.to_string(); - - if result.contains_key(&key) { - result.swap_remove(&key); - config::write(&result, &configuration)? - } else { - yield Err(ShellError::labeled_error( - "Key does not exist in config", - "key", - v.tag(), - )); - } - - yield ReturnSuccess::value(UntaggedValue::Row(result.into()).into_value(v.tag())); - } - else { - yield ReturnSuccess::value(UntaggedValue::Row(result.into()).into_value(name)); - } + let configuration = if let Some(supplied) = load { + Some(supplied.item().clone()) + } else { + None }; - Ok(stream.to_output_stream()) + let mut result = crate::data::config::read(name_span, &configuration)?; + + Ok(if let Some(v) = get { + let key = v.to_string(); + let value = result + .get(&key) + .ok_or_else(|| ShellError::labeled_error("Missing key in config", "key", v.tag()))?; + + match value { + Value { + value: UntaggedValue::Table(list), + .. + } => { + let list: Vec<_> = list + .iter() + .map(|x| ReturnSuccess::value(x.clone())) + .collect(); + + futures::stream::iter(list).to_output_stream() + } + x => { + let x = x.clone(); + OutputStream::one(ReturnSuccess::value(x)) + } + } + } else if let Some((key, value)) = set { + result.insert(key.to_string(), value.clone()); + + config::write(&result, &configuration)?; + + OutputStream::one(ReturnSuccess::value( + UntaggedValue::Row(result.into()).into_value(&value.tag), + )) + } else if let Some(v) = set_into { + let rows: Vec = input.collect().await; + let key = v.to_string(); + + if rows.is_empty() { + return Err(ShellError::labeled_error( + "No values given for set_into", + "needs value(s) from pipeline", + v.tag(), + )); + } else if rows.len() == 1 { + // A single value + let value = &rows[0]; + + result.insert(key, value.clone()); + + config::write(&result, &configuration)?; + + OutputStream::one(ReturnSuccess::value( + UntaggedValue::Row(result.into()).into_value(name), + )) + } else { + // Take in the pipeline as a table + let value = UntaggedValue::Table(rows).into_value(name.clone()); + + result.insert(key, value); + + config::write(&result, &configuration)?; + + OutputStream::one(ReturnSuccess::value( + UntaggedValue::Row(result.into()).into_value(name), + )) + } + } else if let Tagged { item: true, tag } = clear { + result.clear(); + + config::write(&result, &configuration)?; + + OutputStream::one(ReturnSuccess::value( + UntaggedValue::Row(result.into()).into_value(tag), + )) + } else if let Tagged { item: true, tag } = path { + let path = config::default_path_for(&configuration)?; + + OutputStream::one(ReturnSuccess::value( + UntaggedValue::Primitive(Primitive::Path(path)).into_value(tag), + )) + } else if let Some(v) = remove { + let key = v.to_string(); + + if result.contains_key(&key) { + result.swap_remove(&key); + config::write(&result, &configuration)?; + futures::stream::iter(vec![ReturnSuccess::value( + UntaggedValue::Row(result.into()).into_value(v.tag()), + )]) + .to_output_stream() + } else { + return Err(ShellError::labeled_error( + "Key does not exist in config", + "key", + v.tag(), + )); + } + } else { + futures::stream::iter(vec![ReturnSuccess::value( + UntaggedValue::Row(result.into()).into_value(name), + )]) + .to_output_stream() + }) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/count.rs b/crates/nu-cli/src/commands/count.rs index fc8cb07dbb..7d0e846e70 100644 --- a/crates/nu-cli/src/commands/count.rs +++ b/crates/nu-cli/src/commands/count.rs @@ -3,7 +3,7 @@ use crate::context::CommandRegistry; use crate::prelude::*; use futures::stream::StreamExt; use nu_errors::ShellError; -use nu_protocol::{ReturnSuccess, Signature, UntaggedValue, Value}; +use nu_protocol::{Signature, UntaggedValue, Value}; pub struct Count; @@ -24,9 +24,14 @@ impl WholeStreamCommand for Count { async fn run( &self, args: CommandArgs, - registry: &CommandRegistry, + _registry: &CommandRegistry, ) -> Result { - count(args, registry) + let name = args.call_info.name_tag.clone(); + let rows: Vec = args.input.collect().await; + + Ok(OutputStream::one( + UntaggedValue::int(rows.len()).into_value(name), + )) } fn examples(&self) -> Vec { @@ -38,17 +43,6 @@ impl WholeStreamCommand for Count { } } -pub fn count(args: CommandArgs, _registry: &CommandRegistry) -> Result { - let stream = async_stream! { - let name = args.call_info.name_tag.clone(); - let rows: Vec = args.input.collect().await; - - yield ReturnSuccess::value(UntaggedValue::int(rows.len()).into_value(name)) - }; - - Ok(stream.to_output_stream()) -} - #[cfg(test)] mod tests { use super::Count; diff --git a/crates/nu-cli/src/commands/cp.rs b/crates/nu-cli/src/commands/cp.rs index a034140865..778f032a87 100644 --- a/crates/nu-cli/src/commands/cp.rs +++ b/crates/nu-cli/src/commands/cp.rs @@ -41,7 +41,10 @@ impl WholeStreamCommand for Cpy { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - cp(args, registry) + let shell_manager = args.shell_manager.clone(); + let name = args.call_info.name_tag.clone(); + let (args, _) = args.process(®istry).await?; + shell_manager.cp(args, name) } fn examples(&self) -> Vec { @@ -60,22 +63,6 @@ impl WholeStreamCommand for Cpy { } } -pub fn cp(args: CommandArgs, registry: &CommandRegistry) -> Result { - let registry = registry.clone(); - let stream = async_stream! { - let shell_manager = args.shell_manager.clone(); - let name = args.call_info.name_tag.clone(); - let (args, _) = args.process(®istry).await?; - let mut result = shell_manager.cp(args, name)?; - - while let Some(item) = result.next().await { - yield item; - } - }; - - Ok(stream.to_output_stream()) -} - #[cfg(test)] mod tests { use super::Cpy; diff --git a/crates/nu-cli/src/commands/date.rs b/crates/nu-cli/src/commands/date.rs index 49f126e0c3..ab59993935 100644 --- a/crates/nu-cli/src/commands/date.rs +++ b/crates/nu-cli/src/commands/date.rs @@ -7,7 +7,7 @@ use crate::commands::WholeStreamCommand; use chrono::{Datelike, TimeZone, Timelike}; use core::fmt::Display; use indexmap::IndexMap; -use nu_protocol::{ReturnSuccess, Signature, UntaggedValue}; +use nu_protocol::{Signature, UntaggedValue}; pub struct Date; @@ -32,7 +32,7 @@ impl WholeStreamCommand for Date { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - date(args, registry) + date(args, registry).await } fn examples(&self) -> Vec { @@ -91,25 +91,24 @@ where UntaggedValue::Row(Dictionary::from(indexmap)).into_value(&tag) } -pub fn date(args: CommandArgs, registry: &CommandRegistry) -> Result { +pub async fn date( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; + let args = args.evaluate_once(®istry).await?; - let tag = args.call_info.name_tag.clone(); + let tag = args.call_info.name_tag.clone(); - let value = if args.has("utc") { - let utc: DateTime = Utc::now(); - date_to_value(utc, tag) - } else { - let local: DateTime = Local::now(); - date_to_value(local, tag) - }; - - yield ReturnSuccess::value(value); + let value = if args.has("utc") { + let utc: DateTime = Utc::now(); + date_to_value(utc, tag) + } else { + let local: DateTime = Local::now(); + date_to_value(local, tag) }; - Ok(stream.to_output_stream()) + Ok(OutputStream::one(value)) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/ls.rs b/crates/nu-cli/src/commands/ls.rs index 3470eb32f5..cd25fa92ec 100644 --- a/crates/nu-cli/src/commands/ls.rs +++ b/crates/nu-cli/src/commands/ls.rs @@ -65,7 +65,11 @@ impl WholeStreamCommand for Ls { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - ls(args, registry) + let name = args.call_info.name_tag.clone(); + let ctrl_c = args.ctrl_c.clone(); + let shell_manager = args.shell_manager.clone(); + let (args, _) = args.process(®istry).await?; + shell_manager.ls(args, name, ctrl_c) } fn examples(&self) -> Vec { @@ -89,23 +93,6 @@ impl WholeStreamCommand for Ls { } } -fn ls(args: CommandArgs, registry: &CommandRegistry) -> Result { - let registry = registry.clone(); - let stream = async_stream! { - let name = args.call_info.name_tag.clone(); - let ctrl_c = args.ctrl_c.clone(); - let shell_manager = args.shell_manager.clone(); - let (args, _) = args.process(®istry).await?; - let mut result = shell_manager.ls(args, name, ctrl_c)?; - - while let Some(item) = result.next().await { - yield item; - } - }; - - Ok(stream.to_output_stream()) -} - #[cfg(test)] mod tests { use super::Ls; diff --git a/crates/nu-cli/src/stream/output.rs b/crates/nu-cli/src/stream/output.rs index e165f6aac4..1da51567d8 100644 --- a/crates/nu-cli/src/stream/output.rs +++ b/crates/nu-cli/src/stream/output.rs @@ -1,6 +1,7 @@ use crate::prelude::*; use futures::stream::iter; use nu_protocol::{ReturnSuccess, ReturnValue, Value}; +use std::iter::IntoIterator; pub struct OutputStream { pub(crate) values: BoxStream<'static, ReturnValue>, @@ -19,9 +20,8 @@ impl OutputStream { } pub fn one(item: impl Into) -> OutputStream { - let mut v: VecDeque = VecDeque::new(); - v.push_back(item.into()); - v.into() + let item = item.into(); + futures::stream::once(async move { item }).to_output_stream() } pub fn from_input(input: impl Stream + Send + 'static) -> OutputStream {