diff --git a/Cargo.lock b/Cargo.lock index 3cc8903b9d..5e79b6c45b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -128,27 +128,6 @@ dependencies = [ "slab", ] -[[package]] -name = "async-stream" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22068c0c19514942eefcfd4daf8976ef1aad84e61539f95cd200c35202f80af5" -dependencies = [ - "async-stream-impl", - "futures-core", -] - -[[package]] -name = "async-stream-impl" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25f9db3b38af870bf7e5cc649167533b493928e50744e2c30ae350230b414670" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "async-task" version = "1.3.1" @@ -2227,7 +2206,6 @@ dependencies = [ "ansi_term 0.12.1", "app_dirs", "async-recursion", - "async-stream", "async-trait", "base64 0.12.1", "bigdecimal", diff --git a/crates/nu-cli/Cargo.toml b/crates/nu-cli/Cargo.toml index dcd062a7f0..7f75306cdf 100644 --- a/crates/nu-cli/Cargo.toml +++ b/crates/nu-cli/Cargo.toml @@ -23,7 +23,6 @@ app_dirs = "1.2.1" async-recursion = "0.3.1" async-trait = "0.1.31" directories = "2.0.2" -async-stream = "0.2" base64 = "0.12.1" bigdecimal = { version = "0.1.2", features = ["serde"] } bson = { version = "0.14.1", features = ["decimal128"] } diff --git a/crates/nu-cli/src/commands/classified/internal.rs b/crates/nu-cli/src/commands/classified/internal.rs index cbf2e544e4..6a51f86edf 100644 --- a/crates/nu-cli/src/commands/classified/internal.rs +++ b/crates/nu-cli/src/commands/classified/internal.rs @@ -28,7 +28,7 @@ pub(crate) async fn run_internal_command( let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", "input" = input); let internal_command = context.expect_command(&command.name); - let mut result = { + let result = { context .run_command( internal_command?, @@ -40,156 +40,202 @@ pub(crate) async fn run_internal_command( .await }; - let mut context = context.clone(); + let head = Arc::new(command.args.head.clone()); + //let context = Arc::new(context.clone()); + let context = context.clone(); + let command = Arc::new(command); + let scope = Arc::new(scope); // let scope = scope.clone(); - let stream = async_stream! { - let mut soft_errs: Vec = vec![]; - let mut yielded = false; + Ok(InputStream::from_stream( + result + .then(move |item| { + let head = head.clone(); + let command = command.clone(); + let mut context = context.clone(); + let scope = scope.clone(); + async move { + match item { + Ok(ReturnSuccess::Action(action)) => match action { + CommandAction::ChangePath(path) => { + context.shell_manager.set_path(path); + InputStream::from_stream(futures::stream::iter(vec![])) + } + CommandAction::Exit => std::process::exit(0), // TODO: save history.txt + CommandAction::Error(err) => { + context.error(err.clone()); + InputStream::one(UntaggedValue::Error(err).into_untagged_value()) + } + CommandAction::AutoConvert(tagged_contents, extension) => { + let contents_tag = tagged_contents.tag.clone(); + let command_name = format!("from {}", extension); + let command = command.clone(); + if let Some(converter) = context.registry.get_command(&command_name) + { + let new_args = RawCommandArgs { + host: context.host.clone(), + ctrl_c: context.ctrl_c.clone(), + current_errors: context.current_errors.clone(), + shell_manager: context.shell_manager.clone(), + call_info: UnevaluatedCallInfo { + args: nu_protocol::hir::Call { + head: (&*head).clone(), + positional: None, + named: None, + span: Span::unknown(), + is_last: false, + }, + name_tag: Tag::unknown_anchor(command.name_span), + scope: (&*scope).clone(), + }, + }; + let mut result = converter + .run( + new_args.with_input(vec![tagged_contents]), + &context.registry, + ) + .await; + let result_vec: Vec> = + result.drain_vec().await; - while let Some(item) = result.next().await { - match item { - Ok(ReturnSuccess::Action(action)) => match action { - CommandAction::ChangePath(path) => { - context.shell_manager.set_path(path); - } - CommandAction::Exit => std::process::exit(0), // TODO: save history.txt - CommandAction::Error(err) => { - context.error(err); - break; - } - CommandAction::AutoConvert(tagged_contents, extension) => { - let contents_tag = tagged_contents.tag.clone(); - let command_name = format!("from {}", extension); - let command = command.clone(); - if let Some(converter) = context.registry.get_command(&command_name) { - let new_args = RawCommandArgs { - host: context.host.clone(), - ctrl_c: context.ctrl_c.clone(), - current_errors: context.current_errors.clone(), - shell_manager: context.shell_manager.clone(), - call_info: UnevaluatedCallInfo { - args: nu_protocol::hir::Call { - head: command.args.head, - positional: None, - named: None, - span: Span::unknown(), - is_last: false, - }, - name_tag: Tag::unknown_anchor(command.name_span), - scope: scope.clone(), - } - }; - let mut result = converter.run(new_args.with_input(vec![tagged_contents]), &context.registry).await; - let result_vec: Vec> = result.drain_vec().await; - for res in result_vec { - match res { - Ok(ReturnSuccess::Value(Value { value: UntaggedValue::Table(list), ..})) => { - for l in list { - yield Ok(l); + let mut output = vec![]; + for res in result_vec { + match res { + Ok(ReturnSuccess::Value(Value { + value: UntaggedValue::Table(list), + .. + })) => { + for l in list { + output.push(Ok(l)); + } + } + Ok(ReturnSuccess::Value(Value { value, .. })) => { + output.push(Ok( + value.into_value(contents_tag.clone()) + )); + } + Err(e) => output.push(Err(e)), + _ => {} } } - Ok(ReturnSuccess::Value(Value { value, .. })) => { - yield Ok(value.into_value(contents_tag.clone())); - } - Err(e) => yield Err(e), - _ => {} + + futures::stream::iter(output).to_input_stream() + } else { + InputStream::one(tagged_contents) } } - } else { - yield Ok(tagged_contents) - } - } - CommandAction::EnterHelpShell(value) => { - match value { - Value { - value: UntaggedValue::Primitive(Primitive::String(cmd)), - tag, - } => { - context.shell_manager.insert_at_current(Box::new( - HelpShell::for_command( - UntaggedValue::string(cmd).into_value(tag), - &context.registry(), - )?, - )); + CommandAction::EnterHelpShell(value) => match value { + Value { + value: UntaggedValue::Primitive(Primitive::String(cmd)), + tag, + } => { + context.shell_manager.insert_at_current(Box::new( + match HelpShell::for_command( + UntaggedValue::string(cmd).into_value(tag), + &context.registry(), + ) { + Ok(v) => v, + Err(err) => { + return InputStream::one( + UntaggedValue::Error(err).into_untagged_value(), + ) + } + }, + )); + InputStream::from_stream(futures::stream::iter(vec![])) + } + _ => { + context.shell_manager.insert_at_current(Box::new( + match HelpShell::index(&context.registry()) { + Ok(v) => v, + Err(err) => { + return InputStream::one( + UntaggedValue::Error(err).into_untagged_value(), + ) + } + }, + )); + InputStream::from_stream(futures::stream::iter(vec![])) + } + }, + CommandAction::EnterValueShell(value) => { + context + .shell_manager + .insert_at_current(Box::new(ValueShell::new(value))); + InputStream::from_stream(futures::stream::iter(vec![])) } - _ => { + CommandAction::EnterShell(location) => { context.shell_manager.insert_at_current(Box::new( - HelpShell::index(&context.registry())?, + match FilesystemShell::with_location( + location, + context.registry().clone(), + ) { + Ok(v) => v, + Err(err) => { + return InputStream::one( + UntaggedValue::Error(err.into()) + .into_untagged_value(), + ) + } + }, )); + InputStream::from_stream(futures::stream::iter(vec![])) } + CommandAction::AddAlias(name, args, block) => { + context.add_commands(vec![whole_stream_command( + AliasCommand::new(name, args, block), + )]); + InputStream::from_stream(futures::stream::iter(vec![])) + } + CommandAction::PreviousShell => { + context.shell_manager.prev(); + InputStream::from_stream(futures::stream::iter(vec![])) + } + CommandAction::NextShell => { + context.shell_manager.next(); + InputStream::from_stream(futures::stream::iter(vec![])) + } + CommandAction::LeaveShell => { + context.shell_manager.remove_at_current(); + if context.shell_manager.is_empty() { + std::process::exit(0); // TODO: save history.txt + } + InputStream::from_stream(futures::stream::iter(vec![])) + } + }, + + Ok(ReturnSuccess::Value(Value { + value: UntaggedValue::Error(err), + tag, + })) => { + context.error(err.clone()); + InputStream::one(UntaggedValue::Error(err).into_value(tag)) + } + + Ok(ReturnSuccess::Value(v)) => InputStream::one(v), + + Ok(ReturnSuccess::DebugValue(v)) => { + let doc = PrettyDebug::pretty_doc(&v); + let mut buffer = termcolor::Buffer::ansi(); + + let _ = doc.render_raw( + context.with_host(|host| host.width() - 5), + &mut nu_source::TermColored::new(&mut buffer), + ); + + let value = String::from_utf8_lossy(buffer.as_slice()); + + InputStream::one(UntaggedValue::string(value).into_untagged_value()) + } + + Err(err) => { + context.error(err.clone()); + InputStream::one(UntaggedValue::Error(err).into_untagged_value()) } } - CommandAction::EnterValueShell(value) => { - context - .shell_manager - .insert_at_current(Box::new(ValueShell::new(value))); - } - CommandAction::EnterShell(location) => { - context.shell_manager.insert_at_current(Box::new( - FilesystemShell::with_location(location, context.registry().clone())?, - )); - } - CommandAction::AddAlias(name, args, block) => { - context.add_commands(vec![ - whole_stream_command(AliasCommand::new( - name, - args, - block, - )) - ]); - } - CommandAction::PreviousShell => { - context.shell_manager.prev(); - } - CommandAction::NextShell => { - context.shell_manager.next(); - } - CommandAction::LeaveShell => { - context.shell_manager.remove_at_current(); - if context.shell_manager.is_empty() { - std::process::exit(0); // TODO: save history.txt - } - } - }, - - Ok(ReturnSuccess::Value(Value { - value: UntaggedValue::Error(err), - .. - })) => { - context.error(err.clone()); - yield Err(err); - break; } - - Ok(ReturnSuccess::Value(v)) => { - yielded = true; - yield Ok(v); - } - - Ok(ReturnSuccess::DebugValue(v)) => { - yielded = true; - - let doc = PrettyDebug::pretty_doc(&v); - let mut buffer = termcolor::Buffer::ansi(); - - let _ = doc.render_raw( - context.with_host(|host| host.width() - 5), - &mut nu_source::TermColored::new(&mut buffer), - ); - - let value = String::from_utf8_lossy(buffer.as_slice()); - - yield Ok(UntaggedValue::string(value).into_untagged_value()) - } - - Err(err) => { - context.error(err); - break; - } - } - } - }; - - Ok(stream.to_input_stream()) + }) + .flatten() + .take_while(|x| futures::future::ready(!x.is_error())), + )) } diff --git a/crates/nu-cli/src/commands/lines.rs b/crates/nu-cli/src/commands/lines.rs index 3ddd028327..4af00a7258 100644 --- a/crates/nu-cli/src/commands/lines.rs +++ b/crates/nu-cli/src/commands/lines.rs @@ -24,7 +24,7 @@ impl WholeStreamCommand for Lines { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - lines(args, registry) + lines(args, registry).await } fn examples(&self) -> Vec { @@ -46,82 +46,121 @@ fn ends_with_line_ending(st: &str) -> bool { } } -fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result { - let mut leftover = vec![]; - let mut leftover_string = String::new(); +async fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result { + let leftover = Arc::new(vec![]); + let leftover_string = Arc::new(String::new()); let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await.unwrap(); - let tag = args.name_tag(); - let name_span = tag.span; - let mut input = args.input; - loop { - match input.next().await { - Some(Value { value: UntaggedValue::Primitive(Primitive::String(st)), ..}) => { - let mut st = leftover_string.clone() + &st; - leftover.clear(); + let args = args.evaluate_once(®istry).await?; + let tag = args.name_tag(); + let name_span = tag.span; + + let eos = futures::stream::iter(vec![ + UntaggedValue::Primitive(Primitive::EndOfStream).into_untagged_value() + ]); + + Ok(args + .input + .chain(eos) + .map(move |item| { + let mut leftover = leftover.clone(); + let mut leftover_string = leftover_string.clone(); + + match item { + Value { + value: UntaggedValue::Primitive(Primitive::String(st)), + .. + } => { + let st = (&*leftover_string).clone() + &st; + if let Some(leftover) = Arc::get_mut(&mut leftover) { + leftover.clear(); + } let mut lines: Vec = st.lines().map(|x| x.to_string()).collect(); if !ends_with_line_ending(&st) { if let Some(last) = lines.pop() { - leftover_string = last; - } else { + if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) { + leftover_string.clear(); + leftover_string.push_str(&last); + } + } else if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) { leftover_string.clear(); } - } else { + } else if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) { leftover_string.clear(); } - let success_lines: Vec<_> = lines.iter().map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value())).collect(); + let success_lines: Vec<_> = lines + .iter() + .map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value())) + .collect(); - yield futures::stream::iter(success_lines) + futures::stream::iter(success_lines) } - Some(Value { value: UntaggedValue::Primitive(Primitive::Line(st)), ..}) => { - let mut st = leftover_string.clone() + &st; - leftover.clear(); - + Value { + value: UntaggedValue::Primitive(Primitive::Line(st)), + .. + } => { + let st = (&*leftover_string).clone() + &st; + if let Some(leftover) = Arc::get_mut(&mut leftover) { + leftover.clear(); + } let mut lines: Vec = st.lines().map(|x| x.to_string()).collect(); if !ends_with_line_ending(&st) { if let Some(last) = lines.pop() { - leftover_string = last; - } else { + if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) { + leftover_string.clear(); + leftover_string.push_str(&last); + } + } else if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) { leftover_string.clear(); } - } else { + } else if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) { leftover_string.clear(); } - let success_lines: Vec<_> = lines.iter().map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value())).collect(); - yield futures::stream::iter(success_lines) + let success_lines: Vec<_> = lines + .iter() + .map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value())) + .collect(); + futures::stream::iter(success_lines) } - Some( Value { tag: value_span, ..}) => { - yield futures::stream::iter(vec![Err(ShellError::labeled_error_with_secondary( - "Expected a string from pipeline", - "requires string input", - name_span, - "value originates from here", - value_span, - ))]); - } - None => { + Value { + value: UntaggedValue::Primitive(Primitive::EndOfStream), + .. + } => { if !leftover.is_empty() { - let mut st = leftover_string.clone(); - if let Ok(extra) = String::from_utf8(leftover) { + let mut st = (&*leftover_string).clone(); + if let Ok(extra) = String::from_utf8((&*leftover).clone()) { st.push_str(&extra); } - yield futures::stream::iter(vec![ReturnSuccess::value(UntaggedValue::string(st).into_untagged_value())]) + // futures::stream::iter(vec![ReturnSuccess::value( + // UntaggedValue::string(st).into_untagged_value(), + // )]) + if !st.is_empty() { + futures::stream::iter(vec![ReturnSuccess::value( + UntaggedValue::string(&*leftover_string).into_untagged_value(), + )]) + } else { + futures::stream::iter(vec![]) + } + } else { + futures::stream::iter(vec![]) } - break; } + Value { + tag: value_span, .. + } => futures::stream::iter(vec![Err(ShellError::labeled_error_with_secondary( + "Expected a string from pipeline", + "requires string input", + name_span, + "value originates from here", + value_span, + ))]), } - } - if !leftover_string.is_empty() { - yield futures::stream::iter(vec![ReturnSuccess::value(UntaggedValue::string(leftover_string).into_untagged_value())]); - } - } - .flatten(); - Ok(stream.to_output_stream()) + }) + .flatten() + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/plugin.rs b/crates/nu-cli/src/commands/plugin.rs index 067f6dd001..033ba0e493 100644 --- a/crates/nu-cli/src/commands/plugin.rs +++ b/crates/nu-cli/src/commands/plugin.rs @@ -3,7 +3,7 @@ use crate::prelude::*; use derive_new::new; use log::trace; use nu_errors::ShellError; -use nu_protocol::{ReturnValue, Signature, Value}; +use nu_protocol::{Primitive, ReturnValue, Signature, UntaggedValue, Value}; use serde::{self, Deserialize, Serialize}; use std::io::prelude::*; use std::io::BufReader; @@ -61,11 +61,11 @@ impl WholeStreamCommand for PluginCommand { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - filter_plugin(self.path.clone(), args, registry) + filter_plugin(self.path.clone(), args, registry).await } } -pub fn filter_plugin( +pub async fn filter_plugin( path: String, args: CommandArgs, registry: &CommandRegistry, @@ -75,207 +75,216 @@ pub fn filter_plugin( let scope = args.call_info.scope.clone(); - let stream = async_stream! { - let mut args = args.evaluate_once_with_scope(®istry, &scope).await?; + let bos = futures::stream::iter(vec![ + UntaggedValue::Primitive(Primitive::BeginningOfStream).into_untagged_value() + ]); + let eos = futures::stream::iter(vec![ + UntaggedValue::Primitive(Primitive::EndOfStream).into_untagged_value() + ]); - let mut child = std::process::Command::new(path) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .spawn() - .expect("Failed to spawn child process"); + let args = args.evaluate_once_with_scope(®istry, &scope).await?; - let call_info = args.call_info.clone(); + let mut child = std::process::Command::new(path) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .spawn() + .expect("Failed to spawn child process"); - trace!("filtering :: {:?}", call_info); + let call_info = args.call_info.clone(); - // Beginning of the stream - { - let stdin = child.stdin.as_mut().expect("Failed to open stdin"); - let stdout = child.stdout.as_mut().expect("Failed to open stdout"); + trace!("filtering :: {:?}", call_info); - let mut reader = BufReader::new(stdout); + Ok(bos + .chain(args.input) + .chain(eos) + .map(move |item| { + match item { + Value { + value: UntaggedValue::Primitive(Primitive::BeginningOfStream), + .. + } => { + // Beginning of the stream + let stdin = child.stdin.as_mut().expect("Failed to open stdin"); + let stdout = child.stdout.as_mut().expect("Failed to open stdout"); - let request = JsonRpc::new("begin_filter", call_info.clone()); - let request_raw = serde_json::to_string(&request); + let mut reader = BufReader::new(stdout); - match request_raw { - Err(_) => { - yield Err(ShellError::labeled_error( - "Could not load json from plugin", - "could not load json from plugin", - &call_info.name_tag, - )); - } - Ok(request_raw) => match stdin.write(format!("{}\n", request_raw).as_bytes()) { - Ok(_) => {} - Err(err) => { - yield Err(ShellError::unexpected(format!("{}", err))); - } - }, - } + let request = JsonRpc::new("begin_filter", call_info.clone()); + let request_raw = serde_json::to_string(&request); - let mut input = String::new(); - match reader.read_line(&mut input) { - Ok(_) => { - let response = serde_json::from_str::(&input); - match response { - Ok(NuResult::response { params }) => match params { - Ok(params) => for param in params { yield param }, - Err(e) => { - yield ReturnValue::Err(e); + match request_raw { + Err(_) => { + return OutputStream::one(Err(ShellError::labeled_error( + "Could not load json from plugin", + "could not load json from plugin", + &call_info.name_tag, + ))); + } + Ok(request_raw) => { + match stdin.write(format!("{}\n", request_raw).as_bytes()) { + Ok(_) => {} + Err(err) => { + return OutputStream::one(Err(ShellError::unexpected( + format!("{}", err), + ))); + } } - }, + } + } + + let mut input = String::new(); + match reader.read_line(&mut input) { + Ok(_) => { + let response = serde_json::from_str::(&input); + match response { + Ok(NuResult::response { params }) => match params { + Ok(params) => futures::stream::iter(params).to_output_stream(), + Err(e) => futures::stream::iter(vec![ReturnValue::Err(e)]) + .to_output_stream(), + }, + + Err(e) => OutputStream::one(Err( + ShellError::untagged_runtime_error(format!( + "Error while processing begin_filter response: {:?} {}", + e, input + )), + )), + } + } + Err(e) => OutputStream::one(Err(ShellError::untagged_runtime_error( + format!("Error while reading begin_filter response: {:?}", e), + ))), + } + } + Value { + value: UntaggedValue::Primitive(Primitive::EndOfStream), + .. + } => { + // post stream contents + let stdin = child.stdin.as_mut().expect("Failed to open stdin"); + let stdout = child.stdout.as_mut().expect("Failed to open stdout"); + + let mut reader = BufReader::new(stdout); + + let request: JsonRpc> = JsonRpc::new("end_filter", vec![]); + let request_raw = serde_json::to_string(&request); + + match request_raw { + Err(_) => { + return OutputStream::one(Err(ShellError::labeled_error( + "Could not load json from plugin", + "could not load json from plugin", + &call_info.name_tag, + ))); + } + Ok(request_raw) => { + match stdin.write(format!("{}\n", request_raw).as_bytes()) { + Ok(_) => {} + Err(err) => { + return OutputStream::one(Err(ShellError::unexpected( + format!("{}", err), + ))); + } + } + } + } + + let mut input = String::new(); + let stream = match reader.read_line(&mut input) { + Ok(_) => { + let response = serde_json::from_str::(&input); + match response { + Ok(NuResult::response { params }) => match params { + Ok(params) => futures::stream::iter(params).to_output_stream(), + Err(e) => futures::stream::iter(vec![ReturnValue::Err(e)]) + .to_output_stream(), + }, + Err(e) => futures::stream::iter(vec![Err( + ShellError::untagged_runtime_error(format!( + "Error while processing end_filter response: {:?} {}", + e, input + )), + )]) + .to_output_stream(), + } + } Err(e) => { - yield Err(ShellError::untagged_runtime_error(format!( - "Error while processing begin_filter response: {:?} {}", - e, input + futures::stream::iter(vec![Err(ShellError::untagged_runtime_error( + format!("Error while reading end_filter response: {:?}", e), + ))]) + .to_output_stream() + } + }; + + let stdin = child.stdin.as_mut().expect("Failed to open stdin"); + + let request: JsonRpc> = JsonRpc::new("quit", vec![]); + let request_raw = serde_json::to_string(&request); + + match request_raw { + Ok(request_raw) => { + let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); + // TODO: Handle error + } + Err(e) => { + return OutputStream::one(Err(ShellError::untagged_runtime_error( + format!("Error while processing quit response: {:?}", e), ))); } } + let _ = child.wait(); + + stream } - Err(e) => { - yield Err(ShellError::untagged_runtime_error(format!( - "Error while reading begin_filter response: {:?}", - e - ))); - } - } - } - // Stream contents - { - while let Some(v) = args.input.next().await { - let stdin = child.stdin.as_mut().expect("Failed to open stdin"); - let stdout = child.stdout.as_mut().expect("Failed to open stdout"); + v => { + // Stream contents + let stdin = child.stdin.as_mut().expect("Failed to open stdin"); + let stdout = child.stdout.as_mut().expect("Failed to open stdout"); - let mut reader = BufReader::new(stdout); + let mut reader = BufReader::new(stdout); - let request = JsonRpc::new("filter", v); - let request_raw = serde_json::to_string(&request); - match request_raw { - Ok(request_raw) => { - let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); // TODO: Handle error + let request = JsonRpc::new("filter", v); + let request_raw = serde_json::to_string(&request); + match request_raw { + Ok(request_raw) => { + let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); + // TODO: Handle error + } + Err(e) => { + return OutputStream::one(Err(ShellError::untagged_runtime_error( + format!("Error while processing filter response: {:?}", e), + ))); + } } - Err(e) => { - yield Err(ShellError::untagged_runtime_error(format!( - "Error while processing filter response: {:?}", - e - ))); - } - } - let mut input = String::new(); - match reader.read_line(&mut input) { - Ok(_) => { - let response = serde_json::from_str::(&input); - match response { - Ok(NuResult::response { params }) => match params { - Ok(params) => for param in params { yield param }, - Err(e) => { - yield ReturnValue::Err(e); - } - }, - Err(e) => { - yield Err(ShellError::untagged_runtime_error(format!( + let mut input = String::new(); + match reader.read_line(&mut input) { + Ok(_) => { + let response = serde_json::from_str::(&input); + match response { + Ok(NuResult::response { params }) => match params { + Ok(params) => futures::stream::iter(params).to_output_stream(), + Err(e) => futures::stream::iter(vec![ReturnValue::Err(e)]) + .to_output_stream(), + }, + Err(e) => OutputStream::one(Err( + ShellError::untagged_runtime_error(format!( "Error while processing filter response: {:?}\n== input ==\n{}", e, input - ))); + )), + )), } } - } - Err(e) => { - yield Err(ShellError::untagged_runtime_error(format!( - "Error while reading filter response: {:?}", - e - ))); + Err(e) => OutputStream::one(Err(ShellError::untagged_runtime_error( + format!("Error while reading filter response: {:?}", e), + ))), } } - } - } - - // post stream contents - { - let stdin = child.stdin.as_mut().expect("Failed to open stdin"); - let stdout = child.stdout.as_mut().expect("Failed to open stdout"); - - let mut reader = BufReader::new(stdout); - - let request: JsonRpc> = JsonRpc::new("end_filter", vec![]); - let request_raw = serde_json::to_string(&request); - - match request_raw { - Err(_) => { - yield Err(ShellError::labeled_error( - "Could not load json from plugin", - "could not load json from plugin", - &call_info.name_tag, - )); - } - Ok(request_raw) => match stdin.write(format!("{}\n", request_raw).as_bytes()) { - Ok(_) => {} - Err(err) => { - yield Err(ShellError::unexpected(format!("{}", err))); - } - }, - } - - let mut input = String::new(); - match reader.read_line(&mut input) { - Ok(_) => { - let response = serde_json::from_str::(&input); - match response { - Ok(NuResult::response { params }) => match params { - Ok(params) => for param in params { yield param }, - Err(e) => { - yield ReturnValue::Err(e); - } - }, - Err(e) => { - yield Err(ShellError::untagged_runtime_error(format!( - "Error while processing end_filter response: {:?} {}", - e, input - ))); - } - } - } - Err(e) => { - yield Err(ShellError::untagged_runtime_error(format!( - "Error while reading end_filter response: {:?}", - e - ))); - } - } - } - - // End of the stream - { - let stdin = child.stdin.as_mut().expect("Failed to open stdin"); - let stdout = child.stdout.as_mut().expect("Failed to open stdout"); - - let mut reader = BufReader::new(stdout); - - let request: JsonRpc> = JsonRpc::new("quit", vec![]); - let request_raw = serde_json::to_string(&request); - - match request_raw { - Ok(request_raw) => { - let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); // TODO: Handle error - } - Err(e) => { - yield Err(ShellError::untagged_runtime_error(format!( - "Error while processing quit response: {:?}", - e - ))); - return; - } - } - } - - let _ = child.wait(); - }; - - Ok(stream.to_output_stream()) + }) + .flatten() + .to_output_stream()) } #[derive(new)] diff --git a/crates/nu-cli/src/prelude.rs b/crates/nu-cli/src/prelude.rs index f960ebf63d..e8df6041a1 100644 --- a/crates/nu-cli/src/prelude.rs +++ b/crates/nu-cli/src/prelude.rs @@ -84,7 +84,6 @@ pub(crate) use crate::shell::help_shell::HelpShell; pub(crate) use crate::shell::shell_manager::ShellManager; pub(crate) use crate::shell::value_shell::ValueShell; pub(crate) use crate::stream::{InputStream, InterruptibleStream, OutputStream}; -pub(crate) use async_stream::stream as async_stream; pub(crate) use bigdecimal::BigDecimal; pub(crate) use futures::stream::BoxStream; pub(crate) use futures::{Stream, StreamExt}; diff --git a/crates/nu-cli/src/shell/filesystem_shell.rs b/crates/nu-cli/src/shell/filesystem_shell.rs index 4001e9b001..874cb41e7c 100644 --- a/crates/nu-cli/src/shell/filesystem_shell.rs +++ b/crates/nu-cli/src/shell/filesystem_shell.rs @@ -147,40 +147,44 @@ impl Shell for FilesystemShell { } // Generated stream: impl Stream - let stream = async_stream::try_stream! { - for path in paths { - let path = path.map_err(|e| ShellError::from(e.into_error()))?; - if !all && is_hidden_dir(&path) { - continue; - } + Ok(futures::stream::iter(paths.filter_map(move |path| { + let path = match path.map_err(|e| ShellError::from(e.into_error())) { + Ok(path) => path, + Err(err) => return Some(Err(err)), + }; - let metadata = match std::fs::symlink_metadata(&path) { - Ok(metadata) => Ok(Some(metadata)), - Err(e) => if let PermissionDenied = e.kind() { - Ok(None) - } else { - Err(e) - }, - }?; - - let entry = dir_entry_dict( - &path, - metadata.as_ref(), - name_tag.clone(), - full, - short_names, - with_symlink_targets, - du, - ctrl_c.clone() - ) - .map(|entry| ReturnSuccess::Value(entry.into()))?; - - yield entry; + if !all && is_hidden_dir(&path) { + return None; } - }; - Ok(stream.interruptible(ctrl_c_copy).to_output_stream()) + let metadata = match std::fs::symlink_metadata(&path) { + Ok(metadata) => Some(metadata), + Err(e) => { + if e.kind() == std::io::ErrorKind::PermissionDenied { + None + } else { + return Some(Err(e.into())); + } + } + }; + + let entry = dir_entry_dict( + &path, + metadata.as_ref(), + name_tag.clone(), + full, + short_names, + with_symlink_targets, + du, + ctrl_c.clone(), + ) + .map(ReturnSuccess::Value); + + Some(entry) + })) + .interruptible(ctrl_c_copy) + .to_output_stream()) } fn cd(&self, args: CdArgs, name: Tag) -> Result {