diff --git a/src/cli.rs b/src/cli.rs index 64d562f97e..a6720106b7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,6 +1,5 @@ use crate::commands::classified::{ ClassifiedCommand, ClassifiedInputStream, ClassifiedPipeline, ExternalCommand, InternalCommand, - StreamNext, }; use crate::commands::plugin::JsonRpc; use crate::commands::plugin::{PluginCommand, PluginSink}; @@ -600,114 +599,17 @@ async fn process_line(readline: Result, ctx: &mut Context })), } - let mut input = ClassifiedInputStream::new(); - let mut iter = pipeline.commands.item.into_iter().peekable(); - // Check the config to see if we need to update the path // TODO: make sure config is cached so we don't path this load every call set_env_from_config(); - loop { - let item: Option = iter.next(); - let next: Option<&ClassifiedCommand> = iter.peek(); - - input = match (item, next) { - (None, _) => break, - - (Some(ClassifiedCommand::Dynamic(_)), _) - | (_, Some(ClassifiedCommand::Dynamic(_))) => { - return LineResult::Error( - line.to_string(), - ShellError::unimplemented("Dynamic commands"), - ) - } - - (Some(ClassifiedCommand::Expr(_)), _) => { - return LineResult::Error( - line.to_string(), - ShellError::unimplemented("Expression-only commands"), - ) - } - - (_, Some(ClassifiedCommand::Expr(_))) => { - return LineResult::Error( - line.to_string(), - ShellError::unimplemented("Expression-only commands"), - ) - } - - ( - Some(ClassifiedCommand::Internal(left)), - Some(ClassifiedCommand::External(_)), - ) => match left.run(ctx, input, Text::from(line)) { - Ok(val) => ClassifiedInputStream::from_input_stream(val), - Err(err) => return LineResult::Error(line.to_string(), err), - }, - - (Some(ClassifiedCommand::Internal(left)), Some(_)) => { - match left.run(ctx, input, Text::from(line)) { - Ok(val) => ClassifiedInputStream::from_input_stream(val), - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - - (Some(ClassifiedCommand::Internal(left)), None) => { - match left.run(ctx, input, Text::from(line)) { - Ok(val) => { - use futures::stream::TryStreamExt; - - let mut output_stream: OutputStream = val.into(); - loop { - match output_stream.try_next().await { - Ok(Some(ReturnSuccess::Value(Tagged { - item: Value::Error(e), - .. - }))) => { - return LineResult::Error(line.to_string(), e); - } - Ok(Some(_item)) => { - if ctx.ctrl_c.load(Ordering::SeqCst) { - break; - } - } - _ => { - break; - } - } - } - - return LineResult::Success(line.to_string()); - } - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - - ( - Some(ClassifiedCommand::External(left)), - Some(ClassifiedCommand::External(_)), - ) => match left.run(ctx, input, StreamNext::External).await { - Ok(val) => val, - Err(err) => return LineResult::Error(line.to_string(), err), - }, - - (Some(ClassifiedCommand::External(left)), Some(_)) => { - match left.run(ctx, input, StreamNext::Internal).await { - Ok(val) => val, - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - - (Some(ClassifiedCommand::External(left)), None) => { - match left.run(ctx, input, StreamNext::Last).await { - Ok(val) => val, - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - }; + let input = ClassifiedInputStream::new(); + match pipeline.run(ctx, input, line).await { + Ok(_) => LineResult::Success(line.to_string()), + Err(err) => LineResult::Error(line.to_string(), err), } - - LineResult::Success(line.to_string()) } + Err(ReadlineError::Interrupted) => LineResult::CtrlC, Err(ReadlineError::Eof) => LineResult::Break, Err(err) => { diff --git a/src/commands/classified/external.rs b/src/commands/classified/external.rs index 653e205639..c6dfe6c1f8 100644 --- a/src/commands/classified/external.rs +++ b/src/commands/classified/external.rs @@ -1,3 +1,4 @@ +use super::ClassifiedInputStream; use crate::prelude::*; use bytes::{BufMut, BytesMut}; use futures::stream::StreamExt; @@ -8,8 +9,6 @@ use std::fmt; use std::io::{Error, ErrorKind}; use subprocess::Exec; -use super::ClassifiedInputStream; - /// A simple `Codec` implementation that splits up data into lines. pub struct LinesCodec {} diff --git a/src/commands/classified/mod.rs b/src/commands/classified/mod.rs index 1675e113db..b0b320823f 100644 --- a/src/commands/classified/mod.rs +++ b/src/commands/classified/mod.rs @@ -9,6 +9,7 @@ mod pipeline; #[allow(unused_imports)] pub(crate) use dynamic::Command as DynamicCommand; +#[allow(unused_imports)] pub(crate) use external::{Command as ExternalCommand, StreamNext}; pub(crate) use internal::Command as InternalCommand; pub(crate) use pipeline::Pipeline as ClassifiedPipeline; diff --git a/src/commands/classified/pipeline.rs b/src/commands/classified/pipeline.rs index 3c78705d57..037ba5c122 100644 --- a/src/commands/classified/pipeline.rs +++ b/src/commands/classified/pipeline.rs @@ -1,7 +1,7 @@ +use super::{ClassifiedCommand, ClassifiedInputStream, StreamNext}; use crate::prelude::*; use std::fmt; - -use super::ClassifiedCommand; +use std::sync::atomic::Ordering; #[derive(Debug, Clone)] pub(crate) struct Pipeline { @@ -22,3 +22,70 @@ impl HasSpan for Pipeline { self.commands.span } } + +impl Pipeline { + pub(crate) async fn run( + self, + ctx: &mut Context, + mut input: ClassifiedInputStream, + line: &str, + ) -> Result<(), ShellError> { + let mut iter = self.commands.item.into_iter().peekable(); + + loop { + let item: Option = iter.next(); + let next: Option<&ClassifiedCommand> = iter.peek(); + + input = match (item, next) { + (Some(ClassifiedCommand::Dynamic(_)), _) + | (_, Some(ClassifiedCommand::Dynamic(_))) => { + return Err(ShellError::unimplemented("Dynamic commands")) + } + + (Some(ClassifiedCommand::Expr(_)), _) | (_, Some(ClassifiedCommand::Expr(_))) => { + return Err(ShellError::unimplemented("Expression-only commands")) + } + + (Some(ClassifiedCommand::Internal(left)), _) => { + let stream = left.run(ctx, input, Text::from(line))?; + ClassifiedInputStream::from_input_stream(stream) + } + + (Some(ClassifiedCommand::External(left)), Some(ClassifiedCommand::External(_))) => { + left.run(ctx, input, StreamNext::External).await? + } + + (Some(ClassifiedCommand::External(left)), Some(_)) => { + left.run(ctx, input, StreamNext::Internal).await? + } + + (Some(ClassifiedCommand::External(left)), None) => { + left.run(ctx, input, StreamNext::Last).await? + } + + (None, _) => break, + }; + } + + use futures::stream::TryStreamExt; + let mut output_stream: OutputStream = input.objects.into(); + loop { + match output_stream.try_next().await { + Ok(Some(ReturnSuccess::Value(Tagged { + item: Value::Error(e), + .. + }))) => return Err(e), + Ok(Some(_item)) => { + if ctx.ctrl_c.load(Ordering::SeqCst) { + break; + } + } + _ => { + break; + } + } + } + + Ok(()) + } +}