From ba6370621fdec2ea26e4a5f805462efd715aacac Mon Sep 17 00:00:00 2001 From: "Joseph T. Lyons" Date: Sat, 6 Jun 2020 03:42:06 -0400 Subject: [PATCH] Removing async_stream! from some commands (#1940) * Removing async_stream! from some commands * Revert row.rs code * Simplify logic for first.rs and skip.rs --- crates/nu-cli/src/commands/first.rs | 29 +++------ crates/nu-cli/src/commands/from_bson.rs | 41 ++++++------ crates/nu-cli/src/commands/from_csv.rs | 65 ++++++++++--------- .../src/commands/from_delimited_data.rs | 55 +++++++--------- crates/nu-cli/src/commands/from_ini.rs | 56 ++++++++-------- crates/nu-cli/src/commands/from_sqlite.rs | 63 +++++++++--------- crates/nu-cli/src/commands/from_tsv.rs | 18 ++--- crates/nu-cli/src/commands/from_url.rs | 53 +++++++-------- crates/nu-cli/src/commands/from_vcf.rs | 43 +++++++----- crates/nu-cli/src/commands/from_yaml.rs | 58 ++++++++--------- crates/nu-cli/src/commands/last.rs | 42 ++++++------ crates/nu-cli/src/commands/pwd.rs | 21 +++--- crates/nu-cli/src/commands/reduce_by.rs | 43 ++++++------ crates/nu-cli/src/commands/skip.rs | 29 +++------ 14 files changed, 286 insertions(+), 330 deletions(-) diff --git a/crates/nu-cli/src/commands/first.rs b/crates/nu-cli/src/commands/first.rs index 774f3aeffb..b96cfc4a49 100644 --- a/crates/nu-cli/src/commands/first.rs +++ b/crates/nu-cli/src/commands/first.rs @@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand; use crate::context::CommandRegistry; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue}; +use nu_protocol::{Signature, SyntaxShape, UntaggedValue}; use nu_source::Tagged; pub struct First; @@ -35,7 +35,7 @@ impl WholeStreamCommand for First { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - first(args, registry) + first(args, registry).await } fn examples(&self) -> Vec { @@ -57,27 +57,16 @@ impl WholeStreamCommand for First { } } -fn first(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn first(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (FirstArgs { rows }, mut input) = args.process(®istry).await?; - let mut rows_desired = if let Some(quantity) = rows { - *quantity - } else { - 1 - }; - - while let Some(input) = input.next().await { - if rows_desired > 0 { - yield ReturnSuccess::value(input); - rows_desired -= 1; - } else { - break; - } - } + let (FirstArgs { rows }, input) = args.process(®istry).await?; + let rows_desired = if let Some(quantity) = rows { + *quantity + } else { + 1 }; - Ok(stream.to_output_stream()) + Ok(input.take(rows_desired).to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/from_bson.rs b/crates/nu-cli/src/commands/from_bson.rs index e9f6e17471..8c7b2d3cef 100644 --- a/crates/nu-cli/src/commands/from_bson.rs +++ b/crates/nu-cli/src/commands/from_bson.rs @@ -27,7 +27,7 @@ impl WholeStreamCommand for FromBSON { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_bson(args, registry) + from_bson(args, registry).await } fn examples(&self) -> Vec { @@ -208,30 +208,27 @@ pub fn from_bson_bytes_to_value(bytes: Vec, tag: impl Into) -> Result Result { +async fn from_bson( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let tag = args.name_tag(); - let input = args.input; + let args = args.evaluate_once(®istry).await?; + let tag = args.name_tag(); + let input = args.input; - let bytes = input.collect_binary(tag.clone()).await?; + let bytes = input.collect_binary(tag.clone()).await?; - match from_bson_bytes_to_value(bytes.item, tag.clone()) { - Ok(x) => yield ReturnSuccess::value(x), - Err(_) => { - yield Err(ShellError::labeled_error_with_secondary( - "Could not parse as BSON", - "input cannot be parsed as BSON", - tag.clone(), - "value originates from here", - bytes.tag, - )) - } - } - }; - - Ok(stream.to_output_stream()) + match from_bson_bytes_to_value(bytes.item, tag.clone()) { + Ok(x) => Ok(OutputStream::one(ReturnSuccess::value(x))), + Err(_) => Err(ShellError::labeled_error_with_secondary( + "Could not parse as BSON", + "input cannot be parsed as BSON", + tag.clone(), + "value originates from here", + bytes.tag, + )), + } } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/from_csv.rs b/crates/nu-cli/src/commands/from_csv.rs index 9a52eb09a1..a4051bc637 100644 --- a/crates/nu-cli/src/commands/from_csv.rs +++ b/crates/nu-cli/src/commands/from_csv.rs @@ -42,7 +42,7 @@ impl WholeStreamCommand for FromCSV { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_csv(args, registry) + from_csv(args, registry).await } fn examples(&self) -> Vec { @@ -66,43 +66,44 @@ impl WholeStreamCommand for FromCSV { } } -fn from_csv(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn from_csv( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); let name = args.call_info.name_tag.clone(); - let stream = async_stream! { - let (FromCSVArgs { headerless, separator }, mut input) = args.process(®istry).await?; - let sep = match separator { - Some(Value { - value: UntaggedValue::Primitive(Primitive::String(s)), - tag, - .. - }) => { - if s == r"\t" { - '\t' - } else { - let vec_s: Vec = s.chars().collect(); - if vec_s.len() != 1 { - yield Err(ShellError::labeled_error( - "Expected a single separator char from --separator", - "requires a single character string input", - tag, - )); - return; - }; - vec_s[0] - } + + let ( + FromCSVArgs { + headerless, + separator, + }, + input, + ) = args.process(®istry).await?; + let sep = match separator { + Some(Value { + value: UntaggedValue::Primitive(Primitive::String(s)), + tag, + .. + }) => { + if s == r"\t" { + '\t' + } else { + let vec_s: Vec = s.chars().collect(); + if vec_s.len() != 1 { + return Err(ShellError::labeled_error( + "Expected a single separator char from --separator", + "requires a single character string input", + tag, + )); + }; + vec_s[0] } - _ => ',', - }; - - let mut result = from_delimited_data(headerless, sep, "CSV", input, name)?; - while let Some(item) = result.next().await { - yield item; } - + _ => ',', }; - Ok(stream.to_output_stream()) + from_delimited_data(headerless, sep, "CSV", input, name).await } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/from_delimited_data.rs b/crates/nu-cli/src/commands/from_delimited_data.rs index eaa9ed4421..152c7750ad 100644 --- a/crates/nu-cli/src/commands/from_delimited_data.rs +++ b/crates/nu-cli/src/commands/from_delimited_data.rs @@ -1,7 +1,7 @@ use crate::prelude::*; use csv::{ErrorKind, ReaderBuilder}; use nu_errors::ShellError; -use nu_protocol::{ReturnSuccess, TaggedDictBuilder, UntaggedValue, Value}; +use nu_protocol::{TaggedDictBuilder, UntaggedValue, Value}; fn from_delimited_string_to_value( s: String, @@ -41,7 +41,7 @@ fn from_delimited_string_to_value( Ok(UntaggedValue::Table(rows).into_value(&tag)) } -pub fn from_delimited_data( +pub async fn from_delimited_data( headerless: bool, sep: char, format_name: &'static str, @@ -49,37 +49,32 @@ pub fn from_delimited_data( name: Tag, ) -> Result { let name_tag = name; + let concat_string = input.collect_string(name_tag.clone()).await?; - let stream = async_stream! { - let concat_string = input.collect_string(name_tag.clone()).await?; + match from_delimited_string_to_value(concat_string.item, headerless, sep, name_tag.clone()) { + Ok(x) => match x { + Value { + value: UntaggedValue::Table(list), + .. + } => Ok(futures::stream::iter(list).to_output_stream()), + x => Ok(OutputStream::one(x)), + }, + Err(err) => { + let line_one = match pretty_csv_error(err) { + Some(pretty) => format!("Could not parse as {} ({})", format_name, pretty), + None => format!("Could not parse as {}", format_name), + }; + let line_two = format!("input cannot be parsed as {}", format_name); - match from_delimited_string_to_value(concat_string.item, headerless, sep, name_tag.clone()) { - Ok(x) => match x { - Value { value: UntaggedValue::Table(list), .. } => { - for l in list { - yield ReturnSuccess::value(l); - } - } - x => yield ReturnSuccess::value(x), - }, - Err(err) => { - let line_one = match pretty_csv_error(err) { - Some(pretty) => format!("Could not parse as {} ({})", format_name,pretty), - None => format!("Could not parse as {}", format_name), - }; - let line_two = format!("input cannot be parsed as {}", format_name); - yield Err(ShellError::labeled_error_with_secondary( - line_one, - line_two, - name_tag.clone(), - "value originates from here", - concat_string.tag, - )) - } , + Err(ShellError::labeled_error_with_secondary( + line_one, + line_two, + name_tag.clone(), + "value originates from here", + concat_string.tag, + )) } - }; - - Ok(stream.to_output_stream()) + } } fn pretty_csv_error(err: csv::Error) -> Option { diff --git a/crates/nu-cli/src/commands/from_ini.rs b/crates/nu-cli/src/commands/from_ini.rs index d0f7dd7b11..2a9a9799e3 100644 --- a/crates/nu-cli/src/commands/from_ini.rs +++ b/crates/nu-cli/src/commands/from_ini.rs @@ -1,7 +1,7 @@ use crate::commands::WholeStreamCommand; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{Primitive, ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue, Value}; +use nu_protocol::{Primitive, Signature, TaggedDictBuilder, UntaggedValue, Value}; use std::collections::HashMap; pub struct FromINI; @@ -25,7 +25,7 @@ impl WholeStreamCommand for FromINI { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_ini(args, registry) + from_ini(args, registry).await } } @@ -64,36 +64,32 @@ pub fn from_ini_string_to_value( Ok(convert_ini_top_to_nu_value(&v, tag)) } -fn from_ini(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn from_ini( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let tag = args.name_tag(); - let input = args.input; - let concat_string = input.collect_string(tag.clone()).await?; + let args = args.evaluate_once(®istry).await?; + let tag = args.name_tag(); + let input = args.input; + let concat_string = input.collect_string(tag.clone()).await?; - match from_ini_string_to_value(concat_string.item, tag.clone()) { - Ok(x) => match x { - Value { value: UntaggedValue::Table(list), .. } => { - for l in list { - yield ReturnSuccess::value(l); - } - } - x => yield ReturnSuccess::value(x), - }, - Err(_) => { - yield Err(ShellError::labeled_error_with_secondary( - "Could not parse as INI", - "input cannot be parsed as INI", - &tag, - "value originates from here", - concat_string.tag, - )) - } - } - }; - - Ok(stream.to_output_stream()) + match from_ini_string_to_value(concat_string.item, tag.clone()) { + Ok(x) => match x { + Value { + value: UntaggedValue::Table(list), + .. + } => Ok(futures::stream::iter(list).to_output_stream()), + x => Ok(OutputStream::one(x)), + }, + Err(_) => Err(ShellError::labeled_error_with_secondary( + "Could not parse as INI", + "input cannot be parsed as INI", + &tag, + "value originates from here", + concat_string.tag, + )), + } } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/from_sqlite.rs b/crates/nu-cli/src/commands/from_sqlite.rs index 72bf2b0615..c74fabd392 100644 --- a/crates/nu-cli/src/commands/from_sqlite.rs +++ b/crates/nu-cli/src/commands/from_sqlite.rs @@ -1,7 +1,7 @@ use crate::commands::WholeStreamCommand; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{Primitive, ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue, Value}; +use nu_protocol::{Primitive, Signature, TaggedDictBuilder, UntaggedValue, Value}; use rusqlite::{types::ValueRef, Connection, Row, NO_PARAMS}; use std::io::Write; use std::path::Path; @@ -27,7 +27,7 @@ impl WholeStreamCommand for FromSQLite { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_sqlite(args, registry) + from_sqlite(args, registry).await } } @@ -52,7 +52,7 @@ impl WholeStreamCommand for FromDB { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_sqlite(args, registry) + from_sqlite(args, registry).await } } @@ -65,6 +65,7 @@ pub fn convert_sqlite_file_to_nu_value( let mut meta_out = Vec::new(); let mut meta_stmt = conn.prepare("select name from sqlite_master where type='table'")?; let mut meta_rows = meta_stmt.query(NO_PARAMS)?; + while let Some(meta_row) = meta_rows.next()? { let table_name: String = meta_row.get(0)?; let mut meta_dict = TaggedDictBuilder::new(tag.clone()); @@ -134,37 +135,37 @@ pub fn from_sqlite_bytes_to_value( } } -fn from_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn from_sqlite( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let tag = args.name_tag(); - let input = args.input; + let args = args.evaluate_once(®istry).await?; + let tag = args.name_tag(); + let input = args.input; - let bytes = input.collect_binary(tag.clone()).await?; - match from_sqlite_bytes_to_value(bytes.item, tag.clone()) { - Ok(x) => match x { - Value { value: UntaggedValue::Table(list), .. } => { - for l in list { - yield ReturnSuccess::value(l); - } - } - _ => yield ReturnSuccess::value(x), - } - Err(err) => { - println!("{:?}", err); - yield Err(ShellError::labeled_error_with_secondary( - "Could not parse as SQLite", - "input cannot be parsed as SQLite", - &tag, - "value originates from here", - bytes.tag, - )) - } + let bytes = input.collect_binary(tag.clone()).await?; + + match from_sqlite_bytes_to_value(bytes.item, tag.clone()) { + Ok(x) => match x { + Value { + value: UntaggedValue::Table(list), + .. + } => Ok(futures::stream::iter(list).to_output_stream()), + _ => Ok(OutputStream::one(x)), + }, + Err(err) => { + println!("{:?}", err); + + Err(ShellError::labeled_error_with_secondary( + "Could not parse as SQLite", + "input cannot be parsed as SQLite", + &tag, + "value originates from here", + bytes.tag, + )) } - }; - - Ok(stream.to_output_stream()) + } } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/from_tsv.rs b/crates/nu-cli/src/commands/from_tsv.rs index 38fa8a8ebb..98046d7cd3 100644 --- a/crates/nu-cli/src/commands/from_tsv.rs +++ b/crates/nu-cli/src/commands/from_tsv.rs @@ -34,23 +34,19 @@ impl WholeStreamCommand for FromTSV { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_tsv(args, registry) + from_tsv(args, registry).await } } -fn from_tsv(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn from_tsv( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); let name = args.call_info.name_tag.clone(); - let stream = async_stream! { - let (FromTSVArgs { headerless }, mut input) = args.process(®istry).await?; - let mut result = from_delimited_data(headerless, '\t', "TSV", input, name)?; + let (FromTSVArgs { headerless }, input) = args.process(®istry).await?; - while let Some(output) = result.next().await { - yield output; - } - }; - - Ok(stream.to_output_stream()) + from_delimited_data(headerless, '\t', "TSV", input, name).await } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/from_url.rs b/crates/nu-cli/src/commands/from_url.rs index cf0ae30328..fa48eb550f 100644 --- a/crates/nu-cli/src/commands/from_url.rs +++ b/crates/nu-cli/src/commands/from_url.rs @@ -24,44 +24,41 @@ impl WholeStreamCommand for FromURL { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_url(args, registry) + from_url(args, registry).await } } -fn from_url(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn from_url( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let tag = args.name_tag(); - let input = args.input; + let args = args.evaluate_once(®istry).await?; + let tag = args.name_tag(); + let input = args.input; - let concat_string = input.collect_string(tag.clone()).await?; + let concat_string = input.collect_string(tag.clone()).await?; - let result = serde_urlencoded::from_str::>(&concat_string.item); + let result = serde_urlencoded::from_str::>(&concat_string.item); - match result { - Ok(result) => { - let mut row = TaggedDictBuilder::new(tag); + match result { + Ok(result) => { + let mut row = TaggedDictBuilder::new(tag); - for (k,v) in result { - row.insert_untagged(k, UntaggedValue::string(v)); - } - - yield ReturnSuccess::value(row.into_value()); - } - _ => { - yield Err(ShellError::labeled_error_with_secondary( - "String not compatible with url-encoding", - "input not url-encoded", - tag, - "value originates from here", - concat_string.tag, - )); + for (k, v) in result { + row.insert_untagged(k, UntaggedValue::string(v)); } + + Ok(OutputStream::one(ReturnSuccess::value(row.into_value()))) } - }; - - Ok(stream.to_output_stream()) + _ => Err(ShellError::labeled_error_with_secondary( + "String not compatible with url-encoding", + "input not url-encoded", + tag, + "value originates from here", + concat_string.tag, + )), + } } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/from_vcf.rs b/crates/nu-cli/src/commands/from_vcf.rs index d3c83f13dc..1b55f4c0f4 100644 --- a/crates/nu-cli/src/commands/from_vcf.rs +++ b/crates/nu-cli/src/commands/from_vcf.rs @@ -28,35 +28,42 @@ impl WholeStreamCommand for FromVcf { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_vcf(args, registry) + from_vcf(args, registry).await } } -fn from_vcf(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn from_vcf( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let tag = args.name_tag(); - let input = args.input; + let args = args.evaluate_once(®istry).await?; + let tag = args.name_tag(); + let input = args.input; - let input_string = input.collect_string(tag.clone()).await?.item; - let input_bytes = input_string.as_bytes(); - let buf_reader = BufReader::new(input_bytes); - let parser = ical::VcardParser::new(buf_reader); + let input_string = input.collect_string(tag.clone()).await?.item; + let input_bytes = input_string.as_bytes(); + let buf_reader = BufReader::new(input_bytes); + let parser = ical::VcardParser::new(buf_reader); - for contact in parser { - match contact { - Ok(c) => yield ReturnSuccess::value(contact_to_value(c, tag.clone())), - Err(_) => yield Err(ShellError::labeled_error( + let mut values_vec_deque = VecDeque::new(); + + for contact in parser { + match contact { + Ok(c) => { + values_vec_deque.push_back(ReturnSuccess::value(contact_to_value(c, tag.clone()))) + } + Err(_) => { + return Err(ShellError::labeled_error( "Could not parse as .vcf", "input cannot be parsed as .vcf", - tag.clone() - )), + tag.clone(), + )) } } - }; + } - Ok(stream.to_output_stream()) + Ok(futures::stream::iter(values_vec_deque).to_output_stream()) } fn contact_to_value(contact: VcardContact, tag: Tag) -> Value { diff --git a/crates/nu-cli/src/commands/from_yaml.rs b/crates/nu-cli/src/commands/from_yaml.rs index 7d1b1c6f86..f28e004482 100644 --- a/crates/nu-cli/src/commands/from_yaml.rs +++ b/crates/nu-cli/src/commands/from_yaml.rs @@ -1,7 +1,7 @@ use crate::commands::WholeStreamCommand; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{Primitive, ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue, Value}; +use nu_protocol::{Primitive, Signature, TaggedDictBuilder, UntaggedValue, Value}; pub struct FromYAML; @@ -24,7 +24,7 @@ impl WholeStreamCommand for FromYAML { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_yaml(args, registry) + from_yaml(args, registry).await } } @@ -49,7 +49,7 @@ impl WholeStreamCommand for FromYML { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_yaml(args, registry) + from_yaml(args, registry).await } } @@ -120,37 +120,33 @@ pub fn from_yaml_string_to_value(s: String, tag: impl Into) -> Result Result { +async fn from_yaml( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let tag = args.name_tag(); - let input = args.input; + let args = args.evaluate_once(®istry).await?; + let tag = args.name_tag(); + let input = args.input; - let concat_string = input.collect_string(tag.clone()).await?; + let concat_string = input.collect_string(tag.clone()).await?; - match from_yaml_string_to_value(concat_string.item, tag.clone()) { - Ok(x) => match x { - Value { value: UntaggedValue::Table(list), .. } => { - for l in list { - yield ReturnSuccess::value(l); - } - } - x => yield ReturnSuccess::value(x), - }, - Err(_) => { - yield Err(ShellError::labeled_error_with_secondary( - "Could not parse as YAML", - "input cannot be parsed as YAML", - &tag, - "value originates from here", - &concat_string.tag, - )) - } - } - }; - - Ok(stream.to_output_stream()) + match from_yaml_string_to_value(concat_string.item, tag.clone()) { + Ok(x) => match x { + Value { + value: UntaggedValue::Table(list), + .. + } => Ok(futures::stream::iter(list).to_output_stream()), + x => Ok(OutputStream::one(x)), + }, + Err(_) => Err(ShellError::labeled_error_with_secondary( + "Could not parse as YAML", + "input cannot be parsed as YAML", + &tag, + "value originates from here", + &concat_string.tag, + )), + } } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/last.rs b/crates/nu-cli/src/commands/last.rs index 052885f61c..1e3365cac3 100644 --- a/crates/nu-cli/src/commands/last.rs +++ b/crates/nu-cli/src/commands/last.rs @@ -35,7 +35,7 @@ impl WholeStreamCommand for Last { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - last(args, registry) + last(args, registry).await } fn examples(&self) -> Vec { @@ -58,28 +58,30 @@ impl WholeStreamCommand for Last { } } -fn last(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn last(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (LastArgs { rows }, mut input) = args.process(®istry).await?; - let v: Vec<_> = input.into_vec().await; + let (LastArgs { rows }, input) = args.process(®istry).await?; + let v: Vec<_> = input.into_vec().await; - let rows_desired = if let Some(quantity) = rows { - *quantity - } else { - 1 - }; - - let count = (rows_desired as usize); - if count < v.len() { - let k = v.len() - count; - for x in v[k..].iter() { - let y: Value = x.clone(); - yield ReturnSuccess::value(y) - } - } + let rows_desired = if let Some(quantity) = rows { + *quantity + } else { + 1 }; - Ok(stream.to_output_stream()) + + let mut values_vec_deque = VecDeque::new(); + + let count = rows_desired as usize; + + if count < v.len() { + let k = v.len() - count; + + for x in v[k..].iter() { + values_vec_deque.push_back(ReturnSuccess::value(x.clone())); + } + } + + Ok(futures::stream::iter(values_vec_deque).to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/pwd.rs b/crates/nu-cli/src/commands/pwd.rs index 69739ca2e8..b1d27f27ec 100644 --- a/crates/nu-cli/src/commands/pwd.rs +++ b/crates/nu-cli/src/commands/pwd.rs @@ -24,7 +24,7 @@ impl WholeStreamCommand for Pwd { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - pwd(args, registry) + pwd(args, registry).await } fn examples(&self) -> Vec { @@ -36,20 +36,15 @@ impl WholeStreamCommand for Pwd { } } -pub fn pwd(args: CommandArgs, registry: &CommandRegistry) -> Result { +pub async fn pwd( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); + let shell_manager = args.shell_manager.clone(); + let args = args.evaluate_once(®istry).await?; - let stream = async_stream! { - let shell_manager = args.shell_manager.clone(); - let args = args.evaluate_once(®istry).await?; - let mut out = shell_manager.pwd(args)?; - - while let Some(l) = out.next().await { - yield l; - } - }; - - Ok(stream.to_output_stream()) + shell_manager.pwd(args) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/reduce_by.rs b/crates/nu-cli/src/commands/reduce_by.rs index 8c826ba945..404b7adbb4 100644 --- a/crates/nu-cli/src/commands/reduce_by.rs +++ b/crates/nu-cli/src/commands/reduce_by.rs @@ -37,42 +37,37 @@ impl WholeStreamCommand for ReduceBy { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - reduce_by(args, registry) + reduce_by(args, registry).await } } -pub fn reduce_by( +pub async fn reduce_by( args: CommandArgs, registry: &CommandRegistry, ) -> Result { let registry = registry.clone(); let name = args.call_info.name_tag.clone(); - let stream = async_stream! { - let (ReduceByArgs { reduce_with }, mut input) = args.process(®istry).await?; - let values: Vec = input.collect().await; + let (ReduceByArgs { reduce_with }, mut input) = args.process(®istry).await?; + let values: Vec = input.collect().await; - if values.is_empty() { - yield Err(ShellError::labeled_error( - "Expected table from pipeline", - "requires a table input", - name - )) - } else { + if values.is_empty() { + return Err(ShellError::labeled_error( + "Expected table from pipeline", + "requires a table input", + name, + )); + } - let reduce_with = if let Some(reducer) = reduce_with { - Some(reducer.item().clone()) - } else { - None - }; - - match reduce(&values[0], reduce_with, name) { - Ok(reduced) => yield ReturnSuccess::value(reduced), - Err(err) => yield Err(err) - } - } + let reduce_with = if let Some(reducer) = reduce_with { + Some(reducer.item().clone()) + } else { + None }; - Ok(stream.to_output_stream()) + match reduce(&values[0], reduce_with, name) { + Ok(reduced) => Ok(OutputStream::one(ReturnSuccess::value(reduced))), + Err(err) => Err(err), + } } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/skip.rs b/crates/nu-cli/src/commands/skip.rs index 9fb4b214c8..2008f60d83 100644 --- a/crates/nu-cli/src/commands/skip.rs +++ b/crates/nu-cli/src/commands/skip.rs @@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand; use crate::context::CommandRegistry; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue}; +use nu_protocol::{Signature, SyntaxShape, UntaggedValue}; use nu_source::Tagged; pub struct Skip; @@ -31,7 +31,7 @@ impl WholeStreamCommand for Skip { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - skip(args, registry) + skip(args, registry).await } fn examples(&self) -> Vec { @@ -46,27 +46,16 @@ impl WholeStreamCommand for Skip { } } -fn skip(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn skip(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (SkipArgs { rows }, mut input) = args.process(®istry).await?; - let mut rows_desired = if let Some(quantity) = rows { - *quantity - } else { - 1 - }; - - while let Some(input) = input.next().await { - if rows_desired == 0 { - yield ReturnSuccess::value(input); - } - if rows_desired > 0{ - rows_desired -= 1; - } - } + let (SkipArgs { rows }, input) = args.process(®istry).await?; + let rows_desired = if let Some(quantity) = rows { + *quantity + } else { + 1 }; - Ok(stream.to_output_stream()) + Ok(input.skip(rows_desired).to_output_stream()) } #[cfg(test)]