diff --git a/src/commands/autoview.rs b/src/commands/autoview.rs index e715491e91..c3702b3f13 100644 --- a/src/commands/autoview.rs +++ b/src/commands/autoview.rs @@ -1,7 +1,6 @@ use crate::commands::{RawCommandArgs, WholeStreamCommand}; use crate::errors::ShellError; use crate::prelude::*; -use futures_async_stream::async_stream_block; pub struct Autoview; @@ -35,7 +34,7 @@ pub fn autoview( mut context: RunnableContext, raw: RawCommandArgs, ) -> Result { - Ok(OutputStream::new(async_stream_block! { + Ok(OutputStream::new(async_stream! { let input = context.input.drain_vec().await; if input.len() > 0 { @@ -89,6 +88,11 @@ pub fn autoview( result.collect::>().await; } } + + // Needed for async_stream to type check + if false { + yield ReturnSuccess::value(Value::nothing().tagged_unknown()); + } })) } diff --git a/src/commands/clip.rs b/src/commands/clip.rs index 2ef5bfac1d..ac3ded1d4b 100644 --- a/src/commands/clip.rs +++ b/src/commands/clip.rs @@ -5,7 +5,6 @@ pub mod clipboard { use crate::errors::ShellError; use crate::prelude::*; use futures::stream::StreamExt; - use futures_async_stream::async_stream_block; use clipboard::{ClipboardContext, ClipboardProvider}; @@ -40,10 +39,13 @@ pub mod clipboard { ClipArgs {}: ClipArgs, RunnableContext { input, name, .. }: RunnableContext, ) -> Result { - let stream = async_stream_block! { + let stream = async_stream! { let values: Vec> = input.values.collect().await; - inner_clip(values, name).await; + let mut clip_stream = inner_clip(values, name).await; + while let Some(value) = clip_stream.next().await { + yield value; + } }; let stream: BoxStream<'static, ReturnValue> = stream.boxed(); diff --git a/src/commands/plugin.rs b/src/commands/plugin.rs index 4e30b68f45..e769a7b5c7 100644 --- a/src/commands/plugin.rs +++ b/src/commands/plugin.rs @@ -3,7 +3,6 @@ use crate::errors::ShellError; use crate::parser::registry; use crate::prelude::*; use derive_new::new; -use futures_async_stream::async_stream_block; use log::trace; use serde::{self, Deserialize, Serialize}; use std::io::prelude::*; @@ -298,7 +297,7 @@ pub fn sink_plugin( let args = args.evaluate_once(registry)?; let call_info = args.call_info.clone(); - let stream = async_stream_block! { + let stream = async_stream! { let input: Vec> = args.input.values.collect().await; let request = JsonRpc::new("sink", (call_info.clone(), input)); @@ -313,6 +312,11 @@ pub fn sink_plugin( .expect("Failed to spawn child process"); let _ = child.wait(); + + // Needed for async_stream to type check + if false { + yield ReturnSuccess::value(Value::nothing().tagged_unknown()); + } }; Ok(OutputStream::new(stream)) } diff --git a/src/commands/save.rs b/src/commands/save.rs index 982e578a8a..e41116d682 100644 --- a/src/commands/save.rs +++ b/src/commands/save.rs @@ -2,13 +2,12 @@ use crate::commands::{UnevaluatedCallInfo, WholeStreamCommand}; use crate::data::Value; use crate::errors::ShellError; use crate::prelude::*; -use futures_async_stream::async_stream_block; use std::path::{Path, PathBuf}; pub struct Save; macro_rules! process_string { - ($input:ident, $name_tag:ident) => {{ + ($scope:tt, $input:ident, $name_tag:ident) => {{ let mut result_string = String::new(); for res in $input { match res { @@ -19,11 +18,11 @@ macro_rules! process_string { result_string.push_str(&s); } _ => { - yield core::task::Poll::Ready(Err(ShellError::labeled_error( + break $scope Err(ShellError::labeled_error( "Save could not successfully save", "unexpected data during save", $name_tag, - ))); + )); } } } @@ -32,7 +31,7 @@ macro_rules! process_string { } macro_rules! process_string_return_success { - ($result_vec:ident, $name_tag:ident) => {{ + ($scope:tt, $result_vec:ident, $name_tag:ident) => {{ let mut result_string = String::new(); for res in $result_vec { match res { @@ -43,11 +42,11 @@ macro_rules! process_string_return_success { result_string.push_str(&s); } _ => { - yield core::task::Poll::Ready(Err(ShellError::labeled_error( + break $scope Err(ShellError::labeled_error( "Save could not successfully save", "unexpected data during text save", $name_tag, - ))); + )); } } } @@ -56,7 +55,7 @@ macro_rules! process_string_return_success { } macro_rules! process_binary_return_success { - ($result_vec:ident, $name_tag:ident) => {{ + ($scope:tt, $result_vec:ident, $name_tag:ident) => {{ let mut result_binary: Vec = Vec::new(); for res in $result_vec { match res { @@ -69,11 +68,11 @@ macro_rules! process_binary_return_success { } } _ => { - yield core::task::Poll::Ready(Err(ShellError::labeled_error( + break $scope Err(ShellError::labeled_error( "Save could not successfully save", "unexpected data during binary save", $name_tag, - ))); + )); } } } @@ -131,7 +130,7 @@ fn save( let name_tag = name; let source_map = source_map.clone(); - let stream = async_stream_block! { + let stream = async_stream! { let input: Vec> = input.values.collect().await; if path.is_none() { // If there is no filename, check the metadata for the origin filename @@ -171,39 +170,43 @@ fn save( } } - let content : Result, ShellError> = if !save_raw { - if let Some(extension) = full_path.extension() { - let command_name = format!("to-{}", extension.to_str().unwrap()); - if let Some(converter) = registry.get_command(&command_name) { - let new_args = RawCommandArgs { - host, - shell_manager, - call_info: UnevaluatedCallInfo { - args: crate::parser::hir::Call { - head: raw_args.call_info.args.head, - positional: None, - named: None - }, - source: raw_args.call_info.source, - source_map: raw_args.call_info.source_map, - name_tag: raw_args.call_info.name_tag, + // TODO use label_break_value once it is stable: + // https://github.com/rust-lang/rust/issues/48594 + let content : Result, ShellError> = 'scope: loop { + break if !save_raw { + if let Some(extension) = full_path.extension() { + let command_name = format!("to-{}", extension.to_str().unwrap()); + if let Some(converter) = registry.get_command(&command_name) { + let new_args = RawCommandArgs { + host, + shell_manager, + call_info: UnevaluatedCallInfo { + args: crate::parser::hir::Call { + head: raw_args.call_info.args.head, + positional: None, + named: None + }, + source: raw_args.call_info.source, + source_map: raw_args.call_info.source_map, + name_tag: raw_args.call_info.name_tag, + } + }; + let mut result = converter.run(new_args.with_input(input), ®istry, false); + let result_vec: Vec> = result.drain_vec().await; + if converter.is_binary() { + process_binary_return_success!('scope, result_vec, name_tag) + } else { + process_string_return_success!('scope, result_vec, name_tag) } - }; - let mut result = converter.run(new_args.with_input(input), ®istry, false); - let result_vec: Vec> = result.drain_vec().await; - if converter.is_binary() { - process_binary_return_success!(result_vec, name_tag) } else { - process_string_return_success!(result_vec, name_tag) + process_string!('scope, input, name_tag) } } else { - process_string!(input, name_tag) + process_string!('scope, input, name_tag) } } else { - process_string!(input, name_tag) - } - } else { - Ok(string_from(&input).into_bytes()) + Ok(string_from(&input).into_bytes()) + }; }; match content { diff --git a/src/commands/table.rs b/src/commands/table.rs index 4efd6f821f..e9fbe35f2e 100644 --- a/src/commands/table.rs +++ b/src/commands/table.rs @@ -2,7 +2,6 @@ use crate::commands::WholeStreamCommand; use crate::errors::ShellError; use crate::format::TableView; use crate::prelude::*; -use futures_async_stream::async_stream_block; pub struct Table; @@ -32,7 +31,7 @@ impl WholeStreamCommand for Table { } pub fn table(_args: TableArgs, context: RunnableContext) -> Result { - let stream = async_stream_block! { + let stream = async_stream! { let input: Vec> = context.input.into_vec().await; if input.len() > 0 { let mut host = context.host.lock().unwrap(); @@ -41,6 +40,10 @@ pub fn table(_args: TableArgs, context: RunnableContext) -> Result