diff --git a/src/cli.rs b/src/cli.rs index 2441ebb195..db275ff30e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -607,8 +607,7 @@ async fn process_line(readline: Result, ctx: &mut Context return LineResult::Error(line.to_string(), err); } - let input = InputStream::empty(); - match run_pipeline(pipeline, ctx, input, line).await { + match run_pipeline(pipeline, ctx, None, line).await { Ok(_) => LineResult::Success(line.to_string()), Err(err) => LineResult::Error(line.to_string(), err), } diff --git a/src/commands/classified/external.rs b/src/commands/classified/external.rs index 4f92523883..bf0a131acf 100644 --- a/src/commands/classified/external.rs +++ b/src/commands/classified/external.rs @@ -51,9 +51,9 @@ impl Decoder for LinesCodec { pub(crate) async fn run_external_command( command: ExternalCommand, context: &mut Context, - input: InputStream, + input: Option, is_last: bool, -) -> Result { +) -> Result, ShellError> { trace!(target: "nu::run::external", "-> {}", command.name); let has_it_arg = command.args.iter().any(|arg| arg.contains("$it")); @@ -67,13 +67,13 @@ pub(crate) async fn run_external_command( async fn run_with_iterator_arg( command: ExternalCommand, context: &mut Context, - input: InputStream, + input: Option, is_last: bool, -) -> Result { +) -> Result, ShellError> { let name = command.name; let args = command.args; let name_tag = command.name_tag; - let inputs = input.into_vec().await; + let inputs = input.unwrap_or_else(InputStream::empty).into_vec().await; trace!(target: "nu::run::external", "inputs = {:?}", inputs); @@ -137,7 +137,7 @@ async fn run_with_iterator_arg( if let Ok(mut popen) = popen { if is_last { let _ = popen.wait(); - Ok(InputStream::empty()) + Ok(None) } else { let stdout = popen.stdout.take().ok_or_else(|| { ShellError::untagged_runtime_error("Can't redirect the stdout for external command") @@ -148,7 +148,7 @@ async fn run_with_iterator_arg( line.expect("Internal error: could not read lines of text from stdin") .into_value(&name_tag) }); - Ok(stream.boxed().into()) + Ok(Some(stream.boxed().into())) } } else { Err(ShellError::labeled_error( @@ -162,9 +162,9 @@ async fn run_with_iterator_arg( async fn run_with_stdin( command: ExternalCommand, context: &mut Context, - mut input: InputStream, + input: Option, is_last: bool, -) -> Result { +) -> Result, ShellError> { let name_tag = command.name_tag; let home_dir = dirs::home_dir(); @@ -192,61 +192,65 @@ async fn run_with_stdin( trace!(target: "nu::run::external", "set up stdout pipe"); } - process = process.stdin(subprocess::Redirection::Pipe); - trace!(target: "nu::run::external", "set up stdin pipe"); + if input.is_some() { + process = process.stdin(subprocess::Redirection::Pipe); + trace!(target: "nu::run::external", "set up stdin pipe"); + } trace!(target: "nu::run::external", "built process {:?}", process); let popen = process.detached().popen(); if let Ok(mut popen) = popen { - let mut stdin_write = popen - .stdin - .take() - .expect("Internal error: could not get stdin pipe for external command"); - let stream = async_stream! { - while let Some(item) = input.next().await { - match item.value { - UntaggedValue::Primitive(Primitive::Nothing) => { - // If first in a pipeline, will receive Nothing. This is not an error. - }, + if let Some(mut input) = input { + let mut stdin_write = popen + .stdin + .take() + .expect("Internal error: could not get stdin pipe for external command"); - UntaggedValue::Primitive(Primitive::String(s)) | - UntaggedValue::Primitive(Primitive::Line(s)) => - { - if let Err(e) = stdin_write.write(s.as_bytes()) { - let message = format!("Unable to write to stdin (error = {})", e); + while let Some(item) = input.next().await { + match item.value { + UntaggedValue::Primitive(Primitive::Nothing) => { + // If first in a pipeline, will receive Nothing. This is not an error. + }, + + UntaggedValue::Primitive(Primitive::String(s)) | + UntaggedValue::Primitive(Primitive::Line(s)) => + { + if let Err(e) = stdin_write.write(s.as_bytes()) { + let message = format!("Unable to write to stdin (error = {})", e); + yield Ok(Value { + value: UntaggedValue::Error(ShellError::labeled_error( + message, + "unable to write to stdin", + &name_tag, + )), + tag: name_tag, + }); + return; + } + }, + + // TODO serialize other primitives? https://github.com/nushell/nushell/issues/778 + + v => { + let message = format!("Received unexpected type from pipeline ({})", v.type_name()); yield Ok(Value { value: UntaggedValue::Error(ShellError::labeled_error( message, - "unable to write to stdin", + "expected a string", &name_tag, )), tag: name_tag, }); return; - } - }, - - // TODO serialize other primitives? https://github.com/nushell/nushell/issues/778 - - v => { - let message = format!("Received unexpected type from pipeline ({})", v.type_name()); - yield Ok(Value { - value: UntaggedValue::Error(ShellError::labeled_error( - message, - "expected a string", - &name_tag, - )), - tag: name_tag, - }); - return; - }, + }, + } } - } - // Close stdin, which informs the external process that there's no more input - drop(stdin_write); + // Close stdin, which informs the external process that there's no more input + drop(stdin_write); + } if !is_last { let stdout = if let Some(stdout) = popen.stdout.take() { @@ -302,7 +306,7 @@ async fn run_with_stdin( }; }; - Ok(stream.to_input_stream()) + Ok(Some(stream.to_input_stream())) } else { Err(ShellError::labeled_error( "Command not found", diff --git a/src/commands/classified/internal.rs b/src/commands/classified/internal.rs index 643246ebec..0f781ff244 100644 --- a/src/commands/classified/internal.rs +++ b/src/commands/classified/internal.rs @@ -8,16 +8,20 @@ use nu_protocol::{CommandAction, Primitive, ReturnSuccess, UntaggedValue, Value} pub(crate) async fn run_internal_command( command: InternalCommand, context: &mut Context, - input: InputStream, + input: Option, source: Text, -) -> Result { +) -> Result, ShellError> { if log_enabled!(log::Level::Trace) { trace!(target: "nu::run::internal", "->"); trace!(target: "nu::run::internal", "{}", command.name); trace!(target: "nu::run::internal", "{}", command.args.debug(&source)); } - let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", "input" = input); + let objects: InputStream = if let Some(input) = input { + trace_stream!(target: "nu::trace_stream::internal", "input" = input) + } else { + InputStream::empty() + }; let internal_command = context.expect_command(&command.name); @@ -167,5 +171,5 @@ pub(crate) async fn run_internal_command( } }; - Ok(stream.to_input_stream()) + Ok(Some(stream.to_input_stream())) } diff --git a/src/commands/classified/pipeline.rs b/src/commands/classified/pipeline.rs index e067a90bd0..645e46a1e3 100644 --- a/src/commands/classified/pipeline.rs +++ b/src/commands/classified/pipeline.rs @@ -11,7 +11,7 @@ use std::sync::atomic::Ordering; pub(crate) async fn run_pipeline( pipeline: ClassifiedPipeline, ctx: &mut Context, - mut input: InputStream, + mut input: Option, line: &str, ) -> Result<(), ShellError> { let mut iter = pipeline.commands.list.into_iter().peekable(); @@ -46,21 +46,23 @@ pub(crate) async fn run_pipeline( } use futures::stream::TryStreamExt; - let mut output_stream: OutputStream = input.into(); - loop { - match output_stream.try_next().await { - Ok(Some(ReturnSuccess::Value(Value { - value: UntaggedValue::Error(e), - .. - }))) => return Err(e), - Ok(Some(_item)) => { - if ctx.ctrl_c.load(Ordering::SeqCst) { + if let Some(input) = input { + let mut output_stream: OutputStream = input.into(); + loop { + match output_stream.try_next().await { + Ok(Some(ReturnSuccess::Value(Value { + value: UntaggedValue::Error(e), + .. + }))) => return Err(e), + Ok(Some(_item)) => { + if ctx.ctrl_c.load(Ordering::SeqCst) { + break; + } + } + _ => { break; } } - _ => { - break; - } } }