From 86b316e93000e6acfacd442dd4399d45277d3bdc Mon Sep 17 00:00:00 2001 From: Jonathan Turner Date: Sat, 13 Jun 2020 15:01:44 -0700 Subject: [PATCH] Another batch of removing async_stream (#1979) * Another batch of removing async_stream * Another batch of removing async_stream * Another batch of removing async_stream --- crates/nu-cli/src/commands/format.rs | 73 ++++++----- crates/nu-cli/src/commands/from_ics.rs | 47 ++++--- crates/nu-cli/src/commands/get.rs | 50 +++---- crates/nu-cli/src/commands/help.rs | 98 ++++++++------ crates/nu-cli/src/commands/is_empty.rs | 48 +++---- crates/nu-cli/src/commands/plugin.rs | 56 ++++---- crates/nu-cli/src/commands/select.rs | 174 ++++++++++++------------- crates/nu-cli/src/commands/with_env.rs | 57 +++----- 8 files changed, 312 insertions(+), 291 deletions(-) diff --git a/crates/nu-cli/src/commands/format.rs b/crates/nu-cli/src/commands/format.rs index 8cf614dae5..a03d2b7b9a 100644 --- a/crates/nu-cli/src/commands/format.rs +++ b/crates/nu-cli/src/commands/format.rs @@ -37,7 +37,7 @@ impl WholeStreamCommand for Format { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - format_command(args, registry) + format_command(args, registry).await } fn examples(&self) -> Vec { @@ -49,49 +49,60 @@ impl WholeStreamCommand for Format { } } -fn format_command( +async fn format_command( args: CommandArgs, registry: &CommandRegistry, ) -> Result { - let registry = registry.clone(); - let stream = async_stream! { - let scope = args.call_info.scope.clone(); - let (FormatArgs { pattern }, mut input) = args.process(®istry).await?; - let pattern_tag = pattern.tag.clone(); + let registry = Arc::new(registry.clone()); + let scope = Arc::new(args.call_info.scope.clone()); + let (FormatArgs { pattern }, input) = args.process(®istry).await?; - let format_pattern = format(&pattern); - let commands = format_pattern; + let format_pattern = format(&pattern); + let commands = Arc::new(format_pattern); - while let Some(value) = input.next().await { + Ok(input + .then(move |value| { let mut output = String::new(); + let commands = commands.clone(); + let registry = registry.clone(); + let scope = scope.clone(); - for command in &commands { - match command { - FormatCommand::Text(s) => { - output.push_str(&s); - } - FormatCommand::Column(c) => { - // FIXME: use the correct spans - let full_column_path = nu_parser::parse_full_column_path(&(c.to_string()).spanned(Span::unknown()), ®istry); + async move { + for command in &*commands { + match command { + FormatCommand::Text(s) => { + output.push_str(&s); + } + FormatCommand::Column(c) => { + // FIXME: use the correct spans + let full_column_path = nu_parser::parse_full_column_path( + &(c.to_string()).spanned(Span::unknown()), + &*registry, + ); - let result = evaluate_baseline_expr(&full_column_path.0, ®istry, &value, &scope.vars, &scope.env).await; + let result = evaluate_baseline_expr( + &full_column_path.0, + ®istry, + &value, + &scope.vars, + &scope.env, + ) + .await; - if let Ok(c) = result { - output - .push_str(&value::format_leaf(c.borrow()).plain_string(100_000)) - } else { - // That column doesn't match, so don't emit anything + if let Ok(c) = result { + output + .push_str(&value::format_leaf(c.borrow()).plain_string(100_000)) + } else { + // That column doesn't match, so don't emit anything + } } } } + + ReturnSuccess::value(UntaggedValue::string(output).into_untagged_value()) } - - yield ReturnSuccess::value( - UntaggedValue::string(output).into_untagged_value()) - } - }; - - Ok(stream.to_output_stream()) + }) + .to_output_stream()) } #[derive(Debug)] diff --git a/crates/nu-cli/src/commands/from_ics.rs b/crates/nu-cli/src/commands/from_ics.rs index bbe06bce93..e61ca93e6e 100644 --- a/crates/nu-cli/src/commands/from_ics.rs +++ b/crates/nu-cli/src/commands/from_ics.rs @@ -28,35 +28,40 @@ impl WholeStreamCommand for FromIcs { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_ics(args, registry) + from_ics(args, registry).await } } -fn from_ics(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn from_ics( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let tag = args.name_tag(); - let input = args.input; + let args = args.evaluate_once(®istry).await?; + let tag = args.name_tag(); + let input = args.input; - let input_string = input.collect_string(tag.clone()).await?.item; - let input_bytes = input_string.as_bytes(); - let buf_reader = BufReader::new(input_bytes); - let parser = ical::IcalParser::new(buf_reader); + let input_string = input.collect_string(tag.clone()).await?.item; + let input_bytes = input_string.as_bytes(); + let buf_reader = BufReader::new(input_bytes); + let parser = ical::IcalParser::new(buf_reader); - for calendar in parser { - match calendar { - Ok(c) => yield ReturnSuccess::value(calendar_to_value(c, tag.clone())), - Err(_) => yield Err(ShellError::labeled_error( - "Could not parse as .ics", - "input cannot be parsed as .ics", - tag.clone() - )), - } + // TODO: it should be possible to make this a stream, but the some of the lifetime requirements make this tricky. + // Pre-computing for now + let mut output = vec![]; + + for calendar in parser { + match calendar { + Ok(c) => output.push(ReturnSuccess::value(calendar_to_value(c, tag.clone()))), + Err(_) => output.push(Err(ShellError::labeled_error( + "Could not parse as .ics", + "input cannot be parsed as .ics", + tag.clone(), + ))), } - }; + } - Ok(stream.to_output_stream()) + Ok(futures::stream::iter(output).to_output_stream()) } fn calendar_to_value(calendar: IcalCalendar, tag: Tag) -> Value { diff --git a/crates/nu-cli/src/commands/get.rs b/crates/nu-cli/src/commands/get.rs index 1d318ca32e..c5bee9d3bb 100644 --- a/crates/nu-cli/src/commands/get.rs +++ b/crates/nu-cli/src/commands/get.rs @@ -39,7 +39,7 @@ impl WholeStreamCommand for Get { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - get(args, registry) + get(args, registry).await } fn examples(&self) -> Vec { @@ -192,21 +192,24 @@ pub fn get_column_path(path: &ColumnPath, obj: &Value) -> Result Result { +pub async fn get( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (GetArgs { rest: mut fields }, mut input) = args.process(®istry).await?; - if fields.is_empty() { - let mut vec = input.drain_vec().await; + let (GetArgs { rest: mut fields }, mut input) = args.process(®istry).await?; + if fields.is_empty() { + let vec = input.drain_vec().await; - let descs = nu_protocol::merge_descriptors(&vec); - for desc in descs { - yield ReturnSuccess::value(desc); - } - } else { - let member = fields.remove(0); - trace!("get {:?} {:?}", member, fields); - while let Some(item) = input.next().await { + let descs = nu_protocol::merge_descriptors(&vec); + + Ok(futures::stream::iter(descs.into_iter().map(ReturnSuccess::value)).to_output_stream()) + } else { + let member = fields.remove(0); + trace!("get {:?} {:?}", member, fields); + + Ok(input + .map(move |item| { let member = vec![member.clone()]; let column_paths = vec![&member, &fields] @@ -214,6 +217,7 @@ pub fn get(args: CommandArgs, registry: &CommandRegistry) -> Result>(); + let mut output = vec![]; for path in column_paths { let res = get_column_path(&path, &item); @@ -224,24 +228,26 @@ pub fn get(args: CommandArgs, registry: &CommandRegistry) -> Result { for item in rows { - yield ReturnSuccess::value(item.clone()); + output.push(ReturnSuccess::value(item.clone())); } } Value { value: UntaggedValue::Primitive(Primitive::Nothing), .. } => {} - other => yield ReturnSuccess::value(other.clone()), + other => output.push(ReturnSuccess::value(other.clone())), }, - Err(reason) => yield ReturnSuccess::value( + Err(reason) => output.push(ReturnSuccess::value( UntaggedValue::Error(reason).into_untagged_value(), - ), + )), } } - } - } - }; - Ok(stream.to_output_stream()) + + futures::stream::iter(output) + }) + .flatten() + .to_output_stream()) + } } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/help.rs b/crates/nu-cli/src/commands/help.rs index dcd5711398..a90f7a8d58 100644 --- a/crates/nu-cli/src/commands/help.rs +++ b/crates/nu-cli/src/commands/help.rs @@ -36,70 +36,93 @@ impl WholeStreamCommand for Help { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - help(args, registry) + help(args, registry).await } } -fn help(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn help(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); let name = args.call_info.name_tag.clone(); - let stream = async_stream! { - let (HelpArgs { rest }, mut input) = args.process(®istry).await?; - if let Some(document) = rest.get(0) { - if document.item == "commands" { - let mut sorted_names = registry.names(); - sorted_names.sort(); - for cmd in sorted_names { + let (HelpArgs { rest }, ..) = args.process(®istry).await?; + + if !rest.is_empty() { + if rest[0].item == "commands" { + let mut sorted_names = registry.names(); + sorted_names.sort(); + + Ok( + futures::stream::iter(sorted_names.into_iter().filter_map(move |cmd| { // If it's a subcommand, don't list it during the commands list if cmd.contains(' ') { - continue; + return None; } let mut short_desc = TaggedDictBuilder::new(name.clone()); - let document_tag = document.tag.clone(); + let document_tag = rest[0].tag.clone(); let value = command_dict( - registry.get_command(&cmd).ok_or_else(|| { + match registry.get_command(&cmd).ok_or_else(|| { ShellError::labeled_error( format!("Could not load {}", cmd), "could not load command", document_tag, ) - })?, + }) { + Ok(ok) => ok, + Err(err) => return Some(Err(err)), + }, name.clone(), ); short_desc.insert_untagged("name", cmd); short_desc.insert_untagged( "description", - get_data_by_key(&value, "usage".spanned_unknown()) - .ok_or_else(|| { + match match get_data_by_key(&value, "usage".spanned_unknown()).ok_or_else( + || { ShellError::labeled_error( "Expected a usage key", "expected a 'usage' key", &value.tag, ) - })? - .as_string()?, + }, + ) { + Ok(ok) => ok, + Err(err) => return Some(Err(err)), + } + .as_string() + { + Ok(ok) => ok, + Err(err) => return Some(Err(err)), + }, ); - yield ReturnSuccess::value(short_desc.into_value()); - } - } else if rest.len() == 2 { - // Check for a subcommand - let command_name = format!("{} {}", rest[0].item, rest[1].item); - if let Some(command) = registry.get_command(&command_name) { - yield Ok(ReturnSuccess::Value(UntaggedValue::string(get_help(command.stream_command(), ®istry)).into_value(Tag::unknown()))); - } - } else if let Some(command) = registry.get_command(&document.item) { - yield Ok(ReturnSuccess::Value(UntaggedValue::string(get_help(command.stream_command(), ®istry)).into_value(Tag::unknown()))); + Some(ReturnSuccess::value(short_desc.into_value())) + })) + .to_output_stream(), + ) + } else if rest.len() == 2 { + // Check for a subcommand + let command_name = format!("{} {}", rest[0].item, rest[1].item); + if let Some(command) = registry.get_command(&command_name) { + Ok(OutputStream::one(ReturnSuccess::value( + UntaggedValue::string(get_help(command.stream_command(), ®istry)) + .into_value(Tag::unknown()), + ))) } else { - yield Err(ShellError::labeled_error( - "Can't find command (use 'help commands' for full list)", - "can't find command", - document.tag.span, - )); + Ok(OutputStream::empty()) } + } else if let Some(command) = registry.get_command(&rest[0].item) { + Ok(OutputStream::one(ReturnSuccess::value( + UntaggedValue::string(get_help(command.stream_command(), ®istry)) + .into_value(Tag::unknown()), + ))) } else { - let msg = r#"Welcome to Nushell. + Err(ShellError::labeled_error( + "Can't find command (use 'help commands' for full list)", + "can't find command", + rest[0].tag.span, + )) + } + } else { + let msg = r#"Welcome to Nushell. Here are some tips to help you get started. * help commands - list all available commands @@ -121,11 +144,10 @@ Get the processes on your system actively using CPU: You can also learn more at https://www.nushell.sh/book/"#; - yield Ok(ReturnSuccess::Value(UntaggedValue::string(msg).into_value(Tag::unknown()))); - } - }; - - Ok(stream.to_output_stream()) + Ok(OutputStream::one(ReturnSuccess::value( + UntaggedValue::string(msg).into_value(Tag::unknown()), + ))) + } } #[allow(clippy::cognitive_complexity)] diff --git a/crates/nu-cli/src/commands/is_empty.rs b/crates/nu-cli/src/commands/is_empty.rs index a16819178c..79e9ebe2df 100644 --- a/crates/nu-cli/src/commands/is_empty.rs +++ b/crates/nu-cli/src/commands/is_empty.rs @@ -42,15 +42,19 @@ impl WholeStreamCommand for IsEmpty { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - is_empty(args, registry) + is_empty(args, registry).await } } -fn is_empty(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn is_empty( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (IsEmptyArgs { rest }, mut input) = args.process(®istry).await?; - while let Some(value) = input.next().await { + let (IsEmptyArgs { rest }, input) = args.process(®istry).await?; + + Ok(input + .map(move |value| { let value_tag = value.tag(); let action = if rest.len() <= 2 { @@ -85,15 +89,14 @@ fn is_empty(args: CommandArgs, registry: &CommandRegistry) -> Result yield Ok(ReturnSuccess::Value( + IsEmptyFor::Value => Ok(ReturnSuccess::Value( UntaggedValue::boolean(value.is_empty()).into_value(value_tag), )), IsEmptyFor::RowWithFieldsAndFallback(fields, default) => { let mut out = value; for field in fields.iter() { - let val = - crate::commands::get::get_column_path(&field, &out)?; + let val = crate::commands::get::get_column_path(&field, &out)?; let emptiness_value = match out { obj @@ -125,11 +128,10 @@ fn is_empty(args: CommandArgs, registry: &CommandRegistry) -> Result { - let val = - crate::commands::get::get_column_path(&field, &value)?; + let val = crate::commands::get::get_column_path(&field, &value)?; match &value { obj @@ -143,18 +145,18 @@ fn is_empty(args: CommandArgs, registry: &CommandRegistry) -> Result yield Ok(ReturnSuccess::Value(v)), - None => yield Err(ShellError::labeled_error( + Some(v) => Ok(ReturnSuccess::Value(v)), + None => Err(ShellError::labeled_error( "empty? could not find place to check emptiness", "column name", &field.tag, )), } } else { - yield Ok(ReturnSuccess::Value(value)) + Ok(ReturnSuccess::Value(value)) } } - _ => yield Err(ShellError::labeled_error( + _ => Err(ShellError::labeled_error( "Unrecognized type in stream", "original value", &value_tag, @@ -162,8 +164,7 @@ fn is_empty(args: CommandArgs, registry: &CommandRegistry) -> Result { - let val = - crate::commands::get::get_column_path(&field, &value)?; + let val = crate::commands::get::get_column_path(&field, &value)?; match &value { obj @@ -174,18 +175,18 @@ fn is_empty(args: CommandArgs, registry: &CommandRegistry) -> Result { if val.is_empty() { match obj.replace_data_at_column_path(&field, default) { - Some(v) => yield Ok(ReturnSuccess::Value(v)), - None => yield Err(ShellError::labeled_error( + Some(v) => Ok(ReturnSuccess::Value(v)), + None => Err(ShellError::labeled_error( "empty? could not find place to check emptiness", "column name", &field.tag, )), } } else { - yield Ok(ReturnSuccess::Value(value)) + Ok(ReturnSuccess::Value(value)) } } - _ => yield Err(ShellError::labeled_error( + _ => Err(ShellError::labeled_error( "Unrecognized type in stream", "original value", &value_tag, @@ -193,9 +194,8 @@ fn is_empty(args: CommandArgs, registry: &CommandRegistry) -> Result Result { - sink_plugin(self.path.clone(), args, registry) + sink_plugin(self.path.clone(), args, registry).await } } -pub fn sink_plugin( +pub async fn sink_plugin( path: String, args: CommandArgs, registry: &CommandRegistry, ) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let call_info = args.call_info.clone(); + let args = args.evaluate_once(®istry).await?; + let call_info = args.call_info.clone(); - let input: Vec = args.input.collect().await; + let input: Vec = args.input.collect().await; - let request = JsonRpc::new("sink", (call_info.clone(), input)); - let request_raw = serde_json::to_string(&request); - if let Ok(request_raw) = request_raw { - if let Ok(mut tmpfile) = tempfile::NamedTempFile::new() { - let _ = writeln!(tmpfile, "{}", request_raw); - let _ = tmpfile.flush(); + let request = JsonRpc::new("sink", (call_info.clone(), input)); + let request_raw = serde_json::to_string(&request); + if let Ok(request_raw) = request_raw { + if let Ok(mut tmpfile) = tempfile::NamedTempFile::new() { + let _ = writeln!(tmpfile, "{}", request_raw); + let _ = tmpfile.flush(); - let mut child = std::process::Command::new(path) - .arg(tmpfile.path()) - .spawn(); + let child = std::process::Command::new(path).arg(tmpfile.path()).spawn(); - if let Ok(mut child) = child { - let _ = child.wait(); + if let Ok(mut child) = child { + let _ = child.wait(); - // Needed for async_stream to type check - if false { - yield ReturnSuccess::value(UntaggedValue::nothing().into_untagged_value()); - } - } else { - yield Err(ShellError::untagged_runtime_error("Could not create process for sink command")); - } + Ok(OutputStream::empty()) } else { - yield Err(ShellError::untagged_runtime_error("Could not open file to send sink command message")); + Err(ShellError::untagged_runtime_error( + "Could not create process for sink command", + )) } } else { - yield Err(ShellError::untagged_runtime_error("Could not create message to sink command")); + Err(ShellError::untagged_runtime_error( + "Could not open file to send sink command message", + )) } - }; - Ok(OutputStream::new(stream)) + } else { + Err(ShellError::untagged_runtime_error( + "Could not create message to sink command", + )) + } } diff --git a/crates/nu-cli/src/commands/select.rs b/crates/nu-cli/src/commands/select.rs index 911dc53816..c6920a9aba 100644 --- a/crates/nu-cli/src/commands/select.rs +++ b/crates/nu-cli/src/commands/select.rs @@ -6,7 +6,6 @@ use nu_protocol::{ ColumnPath, PathMember, Primitive, ReturnSuccess, Signature, SyntaxShape, TaggedDictBuilder, UnspannedPathMember, UntaggedValue, Value, }; -use nu_source::span_for_spanned_list; use nu_value_ext::{as_string, get_data_by_column_path}; #[derive(Deserialize)] @@ -38,7 +37,7 @@ impl WholeStreamCommand for Select { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - select(args, registry) + select(args, registry).await } fn examples(&self) -> Vec { @@ -57,123 +56,120 @@ impl WholeStreamCommand for Select { } } -fn select(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn select(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); let name = args.call_info.name_tag.clone(); - let stream = async_stream! { - let (SelectArgs { rest: mut fields }, mut input) = args.process(®istry).await?; - if fields.is_empty() { - yield Err(ShellError::labeled_error( - "Select requires columns to select", - "needs parameter", - name, - )); - return; - } + let (SelectArgs { rest: mut fields }, mut input) = args.process(®istry).await?; + if fields.is_empty() { + return Err(ShellError::labeled_error( + "Select requires columns to select", + "needs parameter", + name, + )); + } - let member = fields.remove(0); - let member = vec![member]; + let member = fields.remove(0); + let member = vec![member]; - let column_paths = vec![&member, &fields] - .into_iter() - .flatten() - .cloned() - .collect::>(); - let mut empty = true; - let mut bring_back: indexmap::IndexMap> = indexmap::IndexMap::new(); + let column_paths = vec![&member, &fields] + .into_iter() + .flatten() + .cloned() + .collect::>(); + let mut bring_back: indexmap::IndexMap> = indexmap::IndexMap::new(); - while let Some(value) = input.next().await { - for path in &column_paths { - let path_members_span = span_for_spanned_list(path.members().iter().map(|p| p.span)); - - let fetcher = get_data_by_column_path(&value, &path, Box::new(move |(obj_source, path_member_tried, error)| { - if let PathMember { unspanned: UnspannedPathMember::String(column), .. } = path_member_tried { + while let Some(value) = input.next().await { + for path in &column_paths { + let fetcher = get_data_by_column_path( + &value, + &path, + Box::new(move |(obj_source, path_member_tried, error)| { + if let PathMember { + unspanned: UnspannedPathMember::String(column), + .. + } = path_member_tried + { return ShellError::labeled_error_with_secondary( "No data to fetch.", format!("Couldn't select column \"{}\"", column), path_member_tried.span, - format!("How about exploring it with \"get\"? Check the input is appropriate originating from here"), - obj_source.tag.span) + "How about exploring it with \"get\"? Check the input is appropriate originating from here", + obj_source.tag.span); } error - })); + }), + ); + let field = path.clone(); + let key = as_string( + &UntaggedValue::Primitive(Primitive::ColumnPath(field.clone())) + .into_untagged_value(), + )?; - let field = path.clone(); - let key = as_string(&UntaggedValue::Primitive(Primitive::ColumnPath(field.clone())).into_untagged_value())?; - - match fetcher { - Ok(results) => { - match results.value { - UntaggedValue::Table(records) => { - for x in records { - let mut out = TaggedDictBuilder::new(name.clone()); - out.insert_untagged(&key, x.value.clone()); - let group = bring_back.entry(key.clone()).or_insert(vec![]); - group.push(out.into_value()); - } - }, - x => { - let mut out = TaggedDictBuilder::new(name.clone()); - out.insert_untagged(&key, x.clone()); - let group = bring_back.entry(key.clone()).or_insert(vec![]); - group.push(out.into_value()); - } - + match fetcher { + Ok(results) => match results.value { + UntaggedValue::Table(records) => { + for x in records { + let mut out = TaggedDictBuilder::new(name.clone()); + out.insert_untagged(&key, x.value.clone()); + let group = bring_back.entry(key.clone()).or_insert(vec![]); + group.push(out.into_value()); } } - Err(reason) => { - // At the moment, we can't add switches, named flags - // and the like while already using .rest since it - // breaks the parser. - // - // We allow flexibility for now and skip the error - // if a given column isn't present. - let strict: Option = None; - - if strict.is_some() { - yield Err(reason); - return; - } - - bring_back.entry(key.clone()).or_insert(vec![]); + x => { + let mut out = TaggedDictBuilder::new(name.clone()); + out.insert_untagged(&key, x.clone()); + let group = bring_back.entry(key.clone()).or_insert(vec![]); + group.push(out.into_value()); } + }, + Err(reason) => { + // At the moment, we can't add switches, named flags + // and the like while already using .rest since it + // breaks the parser. + // + // We allow flexibility for now and skip the error + // if a given column isn't present. + let strict: Option = None; + + if strict.is_some() { + return Err(reason); + } + + bring_back.entry(key.clone()).or_insert(vec![]); } } } + } - let mut max = 0; + let mut max = 0; - if let Some(max_column) = bring_back.values().max() { - max = max_column.len(); - } + if let Some(max_column) = bring_back.values().max() { + max = max_column.len(); + } - let keys = bring_back.keys().map(|x| x.clone()).collect::>(); + let keys = bring_back.keys().cloned().collect::>(); - for mut current in 0..max { - let mut out = TaggedDictBuilder::new(name.clone()); + Ok(futures::stream::iter((0..max).map(move |current| { + let mut out = TaggedDictBuilder::new(name.clone()); - for k in &keys { - let nothing = UntaggedValue::Primitive(Primitive::Nothing).into_untagged_value(); - let subsets = bring_back.get(k); + for k in &keys { + let nothing = UntaggedValue::Primitive(Primitive::Nothing).into_untagged_value(); + let subsets = bring_back.get(k); - match subsets { - Some(set) => { - match set.get(current) { - Some(row) => out.insert_untagged(k, row.get_data(k).borrow().clone()), - None => out.insert_untagged(k, nothing.clone()), - } - } + match subsets { + Some(set) => match set.get(current) { + Some(row) => out.insert_untagged(k, row.get_data(k).borrow().clone()), None => out.insert_untagged(k, nothing.clone()), - } + }, + None => out.insert_untagged(k, nothing.clone()), } - - yield ReturnSuccess::value(out.into_value()); } - }; - Ok(stream.to_output_stream()) + ReturnSuccess::value(out.into_value()) + })) + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/with_env.rs b/crates/nu-cli/src/commands/with_env.rs index 2e57096f49..af60f9f62d 100644 --- a/crates/nu-cli/src/commands/with_env.rs +++ b/crates/nu-cli/src/commands/with_env.rs @@ -2,7 +2,7 @@ use crate::commands::classified::block::run_block; use crate::commands::WholeStreamCommand; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{hir::Block, ReturnSuccess, Signature, SyntaxShape, Value}; +use nu_protocol::{hir::Block, Signature, SyntaxShape, Value}; use nu_source::Tagged; pub struct WithEnv; @@ -42,7 +42,7 @@ impl WholeStreamCommand for WithEnv { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - with_env(args, registry) + with_env(args, registry).await } fn examples(&self) -> Vec { @@ -54,46 +54,29 @@ impl WholeStreamCommand for WithEnv { } } -fn with_env(raw_args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn with_env( + raw_args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let mut context = Context::from_raw(&raw_args, ®istry); - let mut scope = raw_args - .call_info - .scope - .clone(); - let (WithEnvArgs { variable, block }, mut input) = raw_args.process(®istry).await?; + let mut context = Context::from_raw(&raw_args, ®istry); + let mut scope = raw_args.call_info.scope.clone(); + let (WithEnvArgs { variable, block }, input) = raw_args.process(®istry).await?; - scope.env.insert(variable.0.item, variable.1.item); + scope.env.insert(variable.0.item, variable.1.item); - let result = run_block( - &block, - &mut context, - input, - &scope.it, - &scope.vars, - &scope.env, - ).await; + let result = run_block( + &block, + &mut context, + input, + &scope.it, + &scope.vars, + &scope.env, + ) + .await; - match result { - Ok(mut stream) => { - while let Some(result) = stream.next().await { - yield Ok(ReturnSuccess::Value(result)); - } - - let errors = context.get_errors(); - if let Some(error) = errors.first() { - yield Err(error.clone()); - } - } - Err(e) => { - yield Err(e); - } - } - }; - - Ok(stream.to_output_stream()) + result.map(|x| x.to_output_stream()) } #[cfg(test)]