From 339ec46961e97268e589753783dac31360082d48 Mon Sep 17 00:00:00 2001 From: Jason Gedge Date: Sun, 24 Nov 2019 17:19:12 -0500 Subject: [PATCH 1/4] Refactor classified.rs into separate modules. Adds modules for internal, external, and dynamic commands, as well as the pipeline functionality. These are exported as their old names from the classified module so as to keep its "interface" the same. --- src/commands/classified.rs | 453 ---------------------------- src/commands/classified/dynamic.rs | 7 + src/commands/classified/external.rs | 224 ++++++++++++++ src/commands/classified/internal.rs | 151 ++++++++++ src/commands/classified/mod.rs | 74 +++++ src/commands/classified/pipeline.rs | 24 ++ 6 files changed, 480 insertions(+), 453 deletions(-) delete mode 100644 src/commands/classified.rs create mode 100644 src/commands/classified/dynamic.rs create mode 100644 src/commands/classified/external.rs create mode 100644 src/commands/classified/internal.rs create mode 100644 src/commands/classified/mod.rs create mode 100644 src/commands/classified/pipeline.rs diff --git a/src/commands/classified.rs b/src/commands/classified.rs deleted file mode 100644 index 7dd72af4fa..0000000000 --- a/src/commands/classified.rs +++ /dev/null @@ -1,453 +0,0 @@ -use crate::parser::{hir, TokenNode}; -use crate::prelude::*; -use bytes::{BufMut, BytesMut}; -use derive_new::new; -use futures::stream::StreamExt; -use futures_codec::{Decoder, Encoder, Framed}; -use itertools::Itertools; -use log::{log_enabled, trace}; -use std::fmt; -use std::io::{Error, ErrorKind}; -use subprocess::Exec; - -/// A simple `Codec` implementation that splits up data into lines. -pub struct LinesCodec {} - -impl Encoder for LinesCodec { - type Item = String; - type Error = Error; - - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - dst.put(item); - Ok(()) - } -} - -impl Decoder for LinesCodec { - type Item = String; - type Error = Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - match src.iter().position(|b| b == &b'\n') { - Some(pos) if !src.is_empty() => { - let buf = src.split_to(pos + 1); - String::from_utf8(buf.to_vec()) - .map(Some) - .map_err(|e| Error::new(ErrorKind::InvalidData, e)) - } - _ if !src.is_empty() => { - let drained = src.take(); - String::from_utf8(drained.to_vec()) - .map(Some) - .map_err(|e| Error::new(ErrorKind::InvalidData, e)) - } - _ => Ok(None), - } - } -} - -pub(crate) struct ClassifiedInputStream { - pub(crate) objects: InputStream, - pub(crate) stdin: Option, -} - -impl ClassifiedInputStream { - pub(crate) fn new() -> ClassifiedInputStream { - ClassifiedInputStream { - objects: vec![Value::nothing().tagged(Tag::unknown())].into(), - stdin: None, - } - } - - pub(crate) fn from_input_stream(stream: impl Into) -> ClassifiedInputStream { - ClassifiedInputStream { - objects: stream.into(), - stdin: None, - } - } - - pub(crate) fn from_stdout(stdout: std::fs::File) -> ClassifiedInputStream { - ClassifiedInputStream { - objects: VecDeque::new().into(), - stdin: Some(stdout), - } - } -} - -#[derive(Debug, Clone)] -pub(crate) struct ClassifiedPipeline { - pub(crate) commands: Spanned>, -} - -impl FormatDebug for ClassifiedPipeline { - fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { - f.say_str( - "classified pipeline", - self.commands.iter().map(|c| c.debug(source)).join(" | "), - ) - } -} - -impl HasSpan for ClassifiedPipeline { - fn span(&self) -> Span { - self.commands.span - } -} - -#[derive(Debug, Clone, Eq, PartialEq)] -pub(crate) enum ClassifiedCommand { - #[allow(unused)] - Expr(TokenNode), - Internal(InternalCommand), - #[allow(unused)] - Dynamic(Spanned), - External(ExternalCommand), -} - -impl FormatDebug for ClassifiedCommand { - fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { - match self { - ClassifiedCommand::Expr(expr) => expr.fmt_debug(f, source), - ClassifiedCommand::Internal(internal) => internal.fmt_debug(f, source), - ClassifiedCommand::Dynamic(dynamic) => dynamic.fmt_debug(f, source), - ClassifiedCommand::External(external) => external.fmt_debug(f, source), - } - } -} - -impl HasSpan for ClassifiedCommand { - fn span(&self) -> Span { - match self { - ClassifiedCommand::Expr(node) => node.span(), - ClassifiedCommand::Internal(command) => command.span(), - ClassifiedCommand::Dynamic(call) => call.span, - ClassifiedCommand::External(command) => command.span(), - } - } -} - -#[derive(new, Debug, Clone, Eq, PartialEq)] -pub(crate) struct InternalCommand { - pub(crate) name: String, - pub(crate) name_tag: Tag, - pub(crate) args: Spanned, -} - -impl HasSpan for InternalCommand { - fn span(&self) -> Span { - let start = self.name_tag.span; - - start.until(self.args.span) - } -} - -impl FormatDebug for InternalCommand { - fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { - f.say("internal", self.args.debug(source)) - } -} - -#[derive(new, Debug, Eq, PartialEq)] -pub(crate) struct DynamicCommand { - pub(crate) args: hir::Call, -} - -impl InternalCommand { - pub(crate) fn run( - self, - context: &mut Context, - input: ClassifiedInputStream, - source: Text, - ) -> Result { - if log_enabled!(log::Level::Trace) { - trace!(target: "nu::run::internal", "->"); - trace!(target: "nu::run::internal", "{}", self.name); - trace!(target: "nu::run::internal", "{}", self.args.debug(&source)); - } - - let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", source: source, "input" = input.objects); - - let command = context.expect_command(&self.name); - - let result = { - context.run_command( - command, - self.name_tag.clone(), - self.args.item, - &source, - objects, - ) - }; - - let result = trace_out_stream!(target: "nu::trace_stream::internal", source: source, "output" = result); - let mut result = result.values; - let mut context = context.clone(); - - let stream = async_stream! { - let mut soft_errs: Vec = vec![]; - let mut yielded = false; - - while let Some(item) = result.next().await { - match item { - Ok(ReturnSuccess::Action(action)) => match action { - CommandAction::ChangePath(path) => { - context.shell_manager.set_path(path); - } - CommandAction::Exit => std::process::exit(0), // TODO: save history.txt - CommandAction::Error(err) => { - context.error(err); - break; - } - CommandAction::EnterHelpShell(value) => { - match value { - Tagged { - item: Value::Primitive(Primitive::String(cmd)), - tag, - } => { - context.shell_manager.insert_at_current(Box::new( - HelpShell::for_command( - Value::string(cmd).tagged(tag), - &context.registry(), - ).unwrap(), - )); - } - _ => { - context.shell_manager.insert_at_current(Box::new( - HelpShell::index(&context.registry()).unwrap(), - )); - } - } - } - CommandAction::EnterValueShell(value) => { - context - .shell_manager - .insert_at_current(Box::new(ValueShell::new(value))); - } - CommandAction::EnterShell(location) => { - context.shell_manager.insert_at_current(Box::new( - FilesystemShell::with_location(location, context.registry().clone()).unwrap(), - )); - } - CommandAction::PreviousShell => { - context.shell_manager.prev(); - } - CommandAction::NextShell => { - context.shell_manager.next(); - } - CommandAction::LeaveShell => { - context.shell_manager.remove_at_current(); - if context.shell_manager.is_empty() { - std::process::exit(0); // TODO: save history.txt - } - } - }, - - Ok(ReturnSuccess::Value(v)) => { - yielded = true; - yield Ok(v); - } - - Ok(ReturnSuccess::DebugValue(v)) => { - yielded = true; - - let doc = v.item.pretty_doc(); - let mut buffer = termcolor::Buffer::ansi(); - - doc.render_raw( - context.with_host(|host| host.width() - 5), - &mut crate::parser::debug::TermColored::new(&mut buffer), - ).unwrap(); - - let value = String::from_utf8_lossy(buffer.as_slice()); - - yield Ok(Value::string(value).tagged_unknown()) - } - - Err(err) => { - context.error(err); - break; - } - } - } - }; - - Ok(stream.to_input_stream()) - } -} - -#[derive(Debug, Clone, Eq, PartialEq)] -pub(crate) struct ExternalCommand { - pub(crate) name: String, - - pub(crate) name_tag: Tag, - pub(crate) args: Spanned>>, -} - -impl FormatDebug for ExternalCommand { - fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { - write!(f, "{}", self.name)?; - - if self.args.item.len() > 0 { - write!(f, " ")?; - write!(f, "{}", self.args.iter().map(|i| i.debug(source)).join(" "))?; - } - - Ok(()) - } -} - -impl HasSpan for ExternalCommand { - fn span(&self) -> Span { - self.name_tag.span.until(self.args.span) - } -} - -#[derive(Debug)] -pub(crate) enum StreamNext { - Last, - External, - Internal, -} - -impl ExternalCommand { - pub(crate) async fn run( - self, - context: &mut Context, - input: ClassifiedInputStream, - stream_next: StreamNext, - ) -> Result { - let stdin = input.stdin; - let inputs: Vec> = input.objects.into_vec().await; - - trace!(target: "nu::run::external", "-> {}", self.name); - trace!(target: "nu::run::external", "inputs = {:?}", inputs); - - let mut arg_string = format!("{}", self.name); - for arg in &self.args.item { - arg_string.push_str(&arg); - } - - trace!(target: "nu::run::external", "command = {:?}", self.name); - - let mut process; - if arg_string.contains("$it") { - let input_strings = inputs - .iter() - .map(|i| { - i.as_string().map_err(|_| { - let arg = self.args.iter().find(|arg| arg.item.contains("$it")); - if let Some(arg) = arg { - ShellError::labeled_error( - "External $it needs string data", - "given row instead of string data", - arg.tag(), - ) - } else { - ShellError::labeled_error( - "$it needs string data", - "given something else", - self.name_tag.clone(), - ) - } - }) - }) - .collect::, ShellError>>()?; - - let commands = input_strings.iter().map(|i| { - let args = self.args.iter().filter_map(|arg| { - if arg.chars().all(|c| c.is_whitespace()) { - None - } else { - Some(arg.replace("$it", &i)) - } - }); - - format!("{} {}", self.name, itertools::join(args, " ")) - }); - - process = Exec::shell(itertools::join(commands, " && ")) - } else { - process = Exec::cmd(&self.name); - for arg in &self.args.item { - let arg_chars: Vec<_> = arg.chars().collect(); - if arg_chars.len() > 1 - && arg_chars[0] == '"' - && arg_chars[arg_chars.len() - 1] == '"' - { - // quoted string - let new_arg: String = arg_chars[1..arg_chars.len() - 1].iter().collect(); - process = process.arg(new_arg); - } else { - process = process.arg(arg.item.clone()); - } - } - } - - process = process.cwd(context.shell_manager.path()); - - trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path()); - - let mut process = match stream_next { - StreamNext::Last => process, - StreamNext::External | StreamNext::Internal => { - process.stdout(subprocess::Redirection::Pipe) - } - }; - - trace!(target: "nu::run::external", "set up stdout pipe"); - - if let Some(stdin) = stdin { - process = process.stdin(stdin); - } - - trace!(target: "nu::run::external", "set up stdin pipe"); - trace!(target: "nu::run::external", "built process {:?}", process); - - let popen = process.popen(); - - trace!(target: "nu::run::external", "next = {:?}", stream_next); - - let name_tag = self.name_tag.clone(); - if let Ok(mut popen) = popen { - match stream_next { - StreamNext::Last => { - let _ = popen.detach(); - loop { - match popen.poll() { - None => { - let _ = std::thread::sleep(std::time::Duration::new(0, 100000000)); - } - _ => { - let _ = popen.terminate(); - break; - } - } - } - Ok(ClassifiedInputStream::new()) - } - StreamNext::External => { - let _ = popen.detach(); - let stdout = popen.stdout.take().unwrap(); - Ok(ClassifiedInputStream::from_stdout(stdout)) - } - StreamNext::Internal => { - let _ = popen.detach(); - let stdout = popen.stdout.take().unwrap(); - let file = futures::io::AllowStdIo::new(stdout); - let stream = Framed::new(file, LinesCodec {}); - let stream = - stream.map(move |line| Value::string(line.unwrap()).tagged(&name_tag)); - Ok(ClassifiedInputStream::from_input_stream( - stream.boxed() as BoxStream<'static, Tagged> - )) - } - } - } else { - return Err(ShellError::labeled_error( - "Command not found", - "command not found", - name_tag, - )); - } - } -} diff --git a/src/commands/classified/dynamic.rs b/src/commands/classified/dynamic.rs new file mode 100644 index 0000000000..8e6e7d6510 --- /dev/null +++ b/src/commands/classified/dynamic.rs @@ -0,0 +1,7 @@ +use crate::parser::hir; +use derive_new::new; + +#[derive(new, Debug, Eq, PartialEq)] +pub(crate) struct Command { + pub(crate) args: hir::Call, +} diff --git a/src/commands/classified/external.rs b/src/commands/classified/external.rs new file mode 100644 index 0000000000..653e205639 --- /dev/null +++ b/src/commands/classified/external.rs @@ -0,0 +1,224 @@ +use crate::prelude::*; +use bytes::{BufMut, BytesMut}; +use futures::stream::StreamExt; +use futures_codec::{Decoder, Encoder, Framed}; +use itertools::Itertools; +use log::trace; +use std::fmt; +use std::io::{Error, ErrorKind}; +use subprocess::Exec; + +use super::ClassifiedInputStream; + +/// A simple `Codec` implementation that splits up data into lines. +pub struct LinesCodec {} + +impl Encoder for LinesCodec { + type Item = String; + type Error = Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + dst.put(item); + Ok(()) + } +} + +impl Decoder for LinesCodec { + type Item = String; + type Error = Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match src.iter().position(|b| b == &b'\n') { + Some(pos) if !src.is_empty() => { + let buf = src.split_to(pos + 1); + String::from_utf8(buf.to_vec()) + .map(Some) + .map_err(|e| Error::new(ErrorKind::InvalidData, e)) + } + _ if !src.is_empty() => { + let drained = src.take(); + String::from_utf8(drained.to_vec()) + .map(Some) + .map_err(|e| Error::new(ErrorKind::InvalidData, e)) + } + _ => Ok(None), + } + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct Command { + pub(crate) name: String, + + pub(crate) name_tag: Tag, + pub(crate) args: Spanned>>, +} + +impl FormatDebug for Command { + fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { + write!(f, "{}", self.name)?; + + if self.args.item.len() > 0 { + write!(f, " ")?; + write!(f, "{}", self.args.iter().map(|i| i.debug(source)).join(" "))?; + } + + Ok(()) + } +} + +impl HasSpan for Command { + fn span(&self) -> Span { + self.name_tag.span.until(self.args.span) + } +} + +#[derive(Debug)] +pub(crate) enum StreamNext { + Last, + External, + Internal, +} + +impl Command { + pub(crate) async fn run( + self, + context: &mut Context, + input: ClassifiedInputStream, + stream_next: StreamNext, + ) -> Result { + let stdin = input.stdin; + let inputs: Vec> = input.objects.into_vec().await; + + trace!(target: "nu::run::external", "-> {}", self.name); + trace!(target: "nu::run::external", "inputs = {:?}", inputs); + + let mut arg_string = format!("{}", self.name); + for arg in &self.args.item { + arg_string.push_str(&arg); + } + + trace!(target: "nu::run::external", "command = {:?}", self.name); + + let mut process; + if arg_string.contains("$it") { + let input_strings = inputs + .iter() + .map(|i| { + i.as_string().map_err(|_| { + let arg = self.args.iter().find(|arg| arg.item.contains("$it")); + if let Some(arg) = arg { + ShellError::labeled_error( + "External $it needs string data", + "given row instead of string data", + arg.tag(), + ) + } else { + ShellError::labeled_error( + "$it needs string data", + "given something else", + self.name_tag.clone(), + ) + } + }) + }) + .collect::, ShellError>>()?; + + let commands = input_strings.iter().map(|i| { + let args = self.args.iter().filter_map(|arg| { + if arg.chars().all(|c| c.is_whitespace()) { + None + } else { + Some(arg.replace("$it", &i)) + } + }); + + format!("{} {}", self.name, itertools::join(args, " ")) + }); + + process = Exec::shell(itertools::join(commands, " && ")) + } else { + process = Exec::cmd(&self.name); + for arg in &self.args.item { + let arg_chars: Vec<_> = arg.chars().collect(); + if arg_chars.len() > 1 + && arg_chars[0] == '"' + && arg_chars[arg_chars.len() - 1] == '"' + { + // quoted string + let new_arg: String = arg_chars[1..arg_chars.len() - 1].iter().collect(); + process = process.arg(new_arg); + } else { + process = process.arg(arg.item.clone()); + } + } + } + + process = process.cwd(context.shell_manager.path()); + + trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path()); + + let mut process = match stream_next { + StreamNext::Last => process, + StreamNext::External | StreamNext::Internal => { + process.stdout(subprocess::Redirection::Pipe) + } + }; + + trace!(target: "nu::run::external", "set up stdout pipe"); + + if let Some(stdin) = stdin { + process = process.stdin(stdin); + } + + trace!(target: "nu::run::external", "set up stdin pipe"); + trace!(target: "nu::run::external", "built process {:?}", process); + + let popen = process.popen(); + + trace!(target: "nu::run::external", "next = {:?}", stream_next); + + let name_tag = self.name_tag.clone(); + if let Ok(mut popen) = popen { + match stream_next { + StreamNext::Last => { + let _ = popen.detach(); + loop { + match popen.poll() { + None => { + let _ = std::thread::sleep(std::time::Duration::new(0, 100000000)); + } + _ => { + let _ = popen.terminate(); + break; + } + } + } + Ok(ClassifiedInputStream::new()) + } + StreamNext::External => { + let _ = popen.detach(); + let stdout = popen.stdout.take().unwrap(); + Ok(ClassifiedInputStream::from_stdout(stdout)) + } + StreamNext::Internal => { + let _ = popen.detach(); + let stdout = popen.stdout.take().unwrap(); + let file = futures::io::AllowStdIo::new(stdout); + let stream = Framed::new(file, LinesCodec {}); + let stream = + stream.map(move |line| Value::string(line.unwrap()).tagged(&name_tag)); + Ok(ClassifiedInputStream::from_input_stream( + stream.boxed() as BoxStream<'static, Tagged> + )) + } + } + } else { + return Err(ShellError::labeled_error( + "Command not found", + "command not found", + name_tag, + )); + } + } +} diff --git a/src/commands/classified/internal.rs b/src/commands/classified/internal.rs new file mode 100644 index 0000000000..51f14e8f5a --- /dev/null +++ b/src/commands/classified/internal.rs @@ -0,0 +1,151 @@ +use crate::parser::hir; +use crate::prelude::*; +use derive_new::new; +use log::{log_enabled, trace}; +use std::fmt; + +use super::ClassifiedInputStream; + +#[derive(new, Debug, Clone, Eq, PartialEq)] +pub(crate) struct Command { + pub(crate) name: String, + pub(crate) name_tag: Tag, + pub(crate) args: Spanned, +} + +impl HasSpan for Command { + fn span(&self) -> Span { + let start = self.name_tag.span; + + start.until(self.args.span) + } +} + +impl FormatDebug for Command { + fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { + f.say("internal", self.args.debug(source)) + } +} + +impl Command { + pub(crate) fn run( + self, + context: &mut Context, + input: ClassifiedInputStream, + source: Text, + ) -> Result { + if log_enabled!(log::Level::Trace) { + trace!(target: "nu::run::internal", "->"); + trace!(target: "nu::run::internal", "{}", self.name); + trace!(target: "nu::run::internal", "{}", self.args.debug(&source)); + } + + let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", source: source, "input" = input.objects); + + let command = context.expect_command(&self.name); + + let result = { + context.run_command( + command, + self.name_tag.clone(), + self.args.item, + &source, + objects, + ) + }; + + let result = trace_out_stream!(target: "nu::trace_stream::internal", source: source, "output" = result); + let mut result = result.values; + let mut context = context.clone(); + + let stream = async_stream! { + let mut soft_errs: Vec = vec![]; + let mut yielded = false; + + while let Some(item) = result.next().await { + match item { + Ok(ReturnSuccess::Action(action)) => match action { + CommandAction::ChangePath(path) => { + context.shell_manager.set_path(path); + } + CommandAction::Exit => std::process::exit(0), // TODO: save history.txt + CommandAction::Error(err) => { + context.error(err); + break; + } + CommandAction::EnterHelpShell(value) => { + match value { + Tagged { + item: Value::Primitive(Primitive::String(cmd)), + tag, + } => { + context.shell_manager.insert_at_current(Box::new( + HelpShell::for_command( + Value::string(cmd).tagged(tag), + &context.registry(), + ).unwrap(), + )); + } + _ => { + context.shell_manager.insert_at_current(Box::new( + HelpShell::index(&context.registry()).unwrap(), + )); + } + } + } + CommandAction::EnterValueShell(value) => { + context + .shell_manager + .insert_at_current(Box::new(ValueShell::new(value))); + } + CommandAction::EnterShell(location) => { + context.shell_manager.insert_at_current(Box::new( + FilesystemShell::with_location(location, context.registry().clone()).unwrap(), + )); + } + CommandAction::PreviousShell => { + context.shell_manager.prev(); + } + CommandAction::NextShell => { + context.shell_manager.next(); + } + CommandAction::LeaveShell => { + context.shell_manager.remove_at_current(); + if context.shell_manager.is_empty() { + std::process::exit(0); // TODO: save history.txt + } + } + }, + + Ok(ReturnSuccess::Value(v)) => { + yielded = true; + yield Ok(v); + } + + Ok(ReturnSuccess::DebugValue(v)) => { + yielded = true; + + let doc = v.item.pretty_doc(); + let mut buffer = termcolor::Buffer::ansi(); + + doc.render_raw( + context.with_host(|host| host.width() - 5), + &mut crate::parser::debug::TermColored::new(&mut buffer), + ).unwrap(); + + let value = String::from_utf8_lossy(buffer.as_slice()); + + yield Ok(Value::string(value).tagged_unknown()) + } + + Err(err) => { + context.error(err); + break; + } + } + } + }; + + Ok(stream.to_input_stream()) + } +} diff --git a/src/commands/classified/mod.rs b/src/commands/classified/mod.rs new file mode 100644 index 0000000000..1675e113db --- /dev/null +++ b/src/commands/classified/mod.rs @@ -0,0 +1,74 @@ +use crate::parser::{hir, TokenNode}; +use crate::prelude::*; +use std::fmt; + +mod dynamic; +mod external; +mod internal; +mod pipeline; + +#[allow(unused_imports)] +pub(crate) use dynamic::Command as DynamicCommand; +pub(crate) use external::{Command as ExternalCommand, StreamNext}; +pub(crate) use internal::Command as InternalCommand; +pub(crate) use pipeline::Pipeline as ClassifiedPipeline; + +pub(crate) struct ClassifiedInputStream { + pub(crate) objects: InputStream, + pub(crate) stdin: Option, +} + +impl ClassifiedInputStream { + pub(crate) fn new() -> ClassifiedInputStream { + ClassifiedInputStream { + objects: vec![Value::nothing().tagged(Tag::unknown())].into(), + stdin: None, + } + } + + pub(crate) fn from_input_stream(stream: impl Into) -> ClassifiedInputStream { + ClassifiedInputStream { + objects: stream.into(), + stdin: None, + } + } + + pub(crate) fn from_stdout(stdout: std::fs::File) -> ClassifiedInputStream { + ClassifiedInputStream { + objects: VecDeque::new().into(), + stdin: Some(stdout), + } + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) enum ClassifiedCommand { + #[allow(unused)] + Expr(TokenNode), + Internal(InternalCommand), + #[allow(unused)] + Dynamic(Spanned), + External(ExternalCommand), +} + +impl FormatDebug for ClassifiedCommand { + fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { + match self { + ClassifiedCommand::Expr(expr) => expr.fmt_debug(f, source), + ClassifiedCommand::Internal(internal) => internal.fmt_debug(f, source), + ClassifiedCommand::Dynamic(dynamic) => dynamic.fmt_debug(f, source), + ClassifiedCommand::External(external) => external.fmt_debug(f, source), + } + } +} + +impl HasSpan for ClassifiedCommand { + fn span(&self) -> Span { + match self { + ClassifiedCommand::Expr(node) => node.span(), + ClassifiedCommand::Internal(command) => command.span(), + ClassifiedCommand::Dynamic(call) => call.span, + ClassifiedCommand::External(command) => command.span(), + } + } +} diff --git a/src/commands/classified/pipeline.rs b/src/commands/classified/pipeline.rs new file mode 100644 index 0000000000..3c78705d57 --- /dev/null +++ b/src/commands/classified/pipeline.rs @@ -0,0 +1,24 @@ +use crate::prelude::*; +use std::fmt; + +use super::ClassifiedCommand; + +#[derive(Debug, Clone)] +pub(crate) struct Pipeline { + pub(crate) commands: Spanned>, +} + +impl FormatDebug for Pipeline { + fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { + f.say_str( + "classified pipeline", + self.commands.iter().map(|c| c.debug(source)).join(" | "), + ) + } +} + +impl HasSpan for Pipeline { + fn span(&self) -> Span { + self.commands.span + } +} From 71e7eb7cfc5d572e821476a341d8e936fcafd0bb Mon Sep 17 00:00:00 2001 From: Jason Gedge Date: Sun, 24 Nov 2019 18:16:45 -0500 Subject: [PATCH 2/4] Move all pipeline execution code from cli to classified::pipeline --- src/cli.rs | 108 ++-------------------------- src/commands/classified/external.rs | 3 +- src/commands/classified/mod.rs | 1 + src/commands/classified/pipeline.rs | 71 +++++++++++++++++- 4 files changed, 76 insertions(+), 107 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 64d562f97e..a6720106b7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,6 +1,5 @@ use crate::commands::classified::{ ClassifiedCommand, ClassifiedInputStream, ClassifiedPipeline, ExternalCommand, InternalCommand, - StreamNext, }; use crate::commands::plugin::JsonRpc; use crate::commands::plugin::{PluginCommand, PluginSink}; @@ -600,114 +599,17 @@ async fn process_line(readline: Result, ctx: &mut Context })), } - let mut input = ClassifiedInputStream::new(); - let mut iter = pipeline.commands.item.into_iter().peekable(); - // Check the config to see if we need to update the path // TODO: make sure config is cached so we don't path this load every call set_env_from_config(); - loop { - let item: Option = iter.next(); - let next: Option<&ClassifiedCommand> = iter.peek(); - - input = match (item, next) { - (None, _) => break, - - (Some(ClassifiedCommand::Dynamic(_)), _) - | (_, Some(ClassifiedCommand::Dynamic(_))) => { - return LineResult::Error( - line.to_string(), - ShellError::unimplemented("Dynamic commands"), - ) - } - - (Some(ClassifiedCommand::Expr(_)), _) => { - return LineResult::Error( - line.to_string(), - ShellError::unimplemented("Expression-only commands"), - ) - } - - (_, Some(ClassifiedCommand::Expr(_))) => { - return LineResult::Error( - line.to_string(), - ShellError::unimplemented("Expression-only commands"), - ) - } - - ( - Some(ClassifiedCommand::Internal(left)), - Some(ClassifiedCommand::External(_)), - ) => match left.run(ctx, input, Text::from(line)) { - Ok(val) => ClassifiedInputStream::from_input_stream(val), - Err(err) => return LineResult::Error(line.to_string(), err), - }, - - (Some(ClassifiedCommand::Internal(left)), Some(_)) => { - match left.run(ctx, input, Text::from(line)) { - Ok(val) => ClassifiedInputStream::from_input_stream(val), - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - - (Some(ClassifiedCommand::Internal(left)), None) => { - match left.run(ctx, input, Text::from(line)) { - Ok(val) => { - use futures::stream::TryStreamExt; - - let mut output_stream: OutputStream = val.into(); - loop { - match output_stream.try_next().await { - Ok(Some(ReturnSuccess::Value(Tagged { - item: Value::Error(e), - .. - }))) => { - return LineResult::Error(line.to_string(), e); - } - Ok(Some(_item)) => { - if ctx.ctrl_c.load(Ordering::SeqCst) { - break; - } - } - _ => { - break; - } - } - } - - return LineResult::Success(line.to_string()); - } - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - - ( - Some(ClassifiedCommand::External(left)), - Some(ClassifiedCommand::External(_)), - ) => match left.run(ctx, input, StreamNext::External).await { - Ok(val) => val, - Err(err) => return LineResult::Error(line.to_string(), err), - }, - - (Some(ClassifiedCommand::External(left)), Some(_)) => { - match left.run(ctx, input, StreamNext::Internal).await { - Ok(val) => val, - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - - (Some(ClassifiedCommand::External(left)), None) => { - match left.run(ctx, input, StreamNext::Last).await { - Ok(val) => val, - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - }; + let input = ClassifiedInputStream::new(); + match pipeline.run(ctx, input, line).await { + Ok(_) => LineResult::Success(line.to_string()), + Err(err) => LineResult::Error(line.to_string(), err), } - - LineResult::Success(line.to_string()) } + Err(ReadlineError::Interrupted) => LineResult::CtrlC, Err(ReadlineError::Eof) => LineResult::Break, Err(err) => { diff --git a/src/commands/classified/external.rs b/src/commands/classified/external.rs index 653e205639..c6dfe6c1f8 100644 --- a/src/commands/classified/external.rs +++ b/src/commands/classified/external.rs @@ -1,3 +1,4 @@ +use super::ClassifiedInputStream; use crate::prelude::*; use bytes::{BufMut, BytesMut}; use futures::stream::StreamExt; @@ -8,8 +9,6 @@ use std::fmt; use std::io::{Error, ErrorKind}; use subprocess::Exec; -use super::ClassifiedInputStream; - /// A simple `Codec` implementation that splits up data into lines. pub struct LinesCodec {} diff --git a/src/commands/classified/mod.rs b/src/commands/classified/mod.rs index 1675e113db..b0b320823f 100644 --- a/src/commands/classified/mod.rs +++ b/src/commands/classified/mod.rs @@ -9,6 +9,7 @@ mod pipeline; #[allow(unused_imports)] pub(crate) use dynamic::Command as DynamicCommand; +#[allow(unused_imports)] pub(crate) use external::{Command as ExternalCommand, StreamNext}; pub(crate) use internal::Command as InternalCommand; pub(crate) use pipeline::Pipeline as ClassifiedPipeline; diff --git a/src/commands/classified/pipeline.rs b/src/commands/classified/pipeline.rs index 3c78705d57..037ba5c122 100644 --- a/src/commands/classified/pipeline.rs +++ b/src/commands/classified/pipeline.rs @@ -1,7 +1,7 @@ +use super::{ClassifiedCommand, ClassifiedInputStream, StreamNext}; use crate::prelude::*; use std::fmt; - -use super::ClassifiedCommand; +use std::sync::atomic::Ordering; #[derive(Debug, Clone)] pub(crate) struct Pipeline { @@ -22,3 +22,70 @@ impl HasSpan for Pipeline { self.commands.span } } + +impl Pipeline { + pub(crate) async fn run( + self, + ctx: &mut Context, + mut input: ClassifiedInputStream, + line: &str, + ) -> Result<(), ShellError> { + let mut iter = self.commands.item.into_iter().peekable(); + + loop { + let item: Option = iter.next(); + let next: Option<&ClassifiedCommand> = iter.peek(); + + input = match (item, next) { + (Some(ClassifiedCommand::Dynamic(_)), _) + | (_, Some(ClassifiedCommand::Dynamic(_))) => { + return Err(ShellError::unimplemented("Dynamic commands")) + } + + (Some(ClassifiedCommand::Expr(_)), _) | (_, Some(ClassifiedCommand::Expr(_))) => { + return Err(ShellError::unimplemented("Expression-only commands")) + } + + (Some(ClassifiedCommand::Internal(left)), _) => { + let stream = left.run(ctx, input, Text::from(line))?; + ClassifiedInputStream::from_input_stream(stream) + } + + (Some(ClassifiedCommand::External(left)), Some(ClassifiedCommand::External(_))) => { + left.run(ctx, input, StreamNext::External).await? + } + + (Some(ClassifiedCommand::External(left)), Some(_)) => { + left.run(ctx, input, StreamNext::Internal).await? + } + + (Some(ClassifiedCommand::External(left)), None) => { + left.run(ctx, input, StreamNext::Last).await? + } + + (None, _) => break, + }; + } + + use futures::stream::TryStreamExt; + let mut output_stream: OutputStream = input.objects.into(); + loop { + match output_stream.try_next().await { + Ok(Some(ReturnSuccess::Value(Tagged { + item: Value::Error(e), + .. + }))) => return Err(e), + Ok(Some(_item)) => { + if ctx.ctrl_c.load(Ordering::SeqCst) { + break; + } + } + _ => { + break; + } + } + } + + Ok(()) + } +} From fbc6f01cfb566c41b674492724ef9a60b882bae3 Mon Sep 17 00:00:00 2001 From: Belhorma Bendebiche Date: Thu, 21 Nov 2019 12:18:00 -0500 Subject: [PATCH 3/4] Add `=~` and `!~` operators on strings `left =~ right` return true if left contains right, using Rust's `String::contains`. `!~` is the negated version. A new `apply_operator` function is added which decouples evaluation from `Value::compare`. This returns a `Value` and opens the door to implementing `+` for example, though it wouldn't be useful immediately. The `operator!` macro had to be changed slightly as it would choke on `~` in arguments. --- src/evaluate/evaluator.rs | 5 +- src/evaluate/mod.rs | 1 + src/evaluate/operator.rs | 33 +++++++++++ src/parser/parse/operator.rs | 6 ++ src/parser/parse/parser.rs | 30 +++++++--- tests/filter_where_tests.rs | 112 +++++++++++++++++++++++++++++++++++ 6 files changed, 176 insertions(+), 11 deletions(-) create mode 100644 src/evaluate/operator.rs create mode 100644 tests/filter_where_tests.rs diff --git a/src/evaluate/evaluator.rs b/src/evaluate/evaluator.rs index 1930b64c82..8c5f23f1a5 100644 --- a/src/evaluate/evaluator.rs +++ b/src/evaluate/evaluator.rs @@ -1,5 +1,6 @@ use crate::data::base::Block; use crate::errors::ArgumentError; +use crate::evaluate::operator::apply_operator; use crate::parser::hir::path::{ColumnPath, RawPathMember}; use crate::parser::{ hir::{self, Expression, RawExpression}, @@ -79,8 +80,8 @@ pub(crate) fn evaluate_baseline_expr( trace!("left={:?} right={:?}", left.item, right.item); - match left.compare(binary.op(), &*right) { - Ok(result) => Ok(Value::boolean(result).tagged(tag)), + match apply_operator(binary.op(), &*left, &*right) { + Ok(result) => Ok(result.tagged(tag)), Err((left_type, right_type)) => Err(ShellError::coerce_error( left_type.spanned(binary.left().span), right_type.spanned(binary.right().span), diff --git a/src/evaluate/mod.rs b/src/evaluate/mod.rs index 21a3b369d8..f8133808e0 100644 --- a/src/evaluate/mod.rs +++ b/src/evaluate/mod.rs @@ -1,3 +1,4 @@ pub(crate) mod evaluator; +pub(crate) mod operator; pub(crate) use evaluator::{evaluate_baseline_expr, Scope}; diff --git a/src/evaluate/operator.rs b/src/evaluate/operator.rs new file mode 100644 index 0000000000..d73e122bc9 --- /dev/null +++ b/src/evaluate/operator.rs @@ -0,0 +1,33 @@ +use crate::data::Primitive; +use crate::data::Value; +use crate::parser::Operator; +use crate::traits::ShellTypeName; +use std::ops::Not; + +pub fn apply_operator( + op: &Operator, + left: &Value, + right: &Value, +) -> Result { + match *op { + Operator::Equal + | Operator::NotEqual + | Operator::LessThan + | Operator::GreaterThan + | Operator::LessThanOrEqual + | Operator::GreaterThanOrEqual => left.compare(op, right).map(Value::boolean), + Operator::Dot => Ok(Value::boolean(false)), + Operator::Contains => contains(left, right).map(Value::boolean), + Operator::NotContains => contains(left, right).map(Not::not).map(Value::boolean), + } +} + +fn contains(left: &Value, right: &Value) -> Result { + if let (Value::Primitive(Primitive::String(l)), Value::Primitive(Primitive::String(r))) = + (left, right) + { + Ok(l.contains(r)) + } else { + Err((left.type_name(), right.type_name())) + } +} diff --git a/src/parser/parse/operator.rs b/src/parser/parse/operator.rs index 47c63075af..0a596e5897 100644 --- a/src/parser/parse/operator.rs +++ b/src/parser/parse/operator.rs @@ -12,6 +12,8 @@ pub enum Operator { LessThanOrEqual, GreaterThanOrEqual, Dot, + Contains, + NotContains, } impl FormatDebug for Operator { @@ -34,6 +36,8 @@ impl Operator { Operator::LessThanOrEqual => "<=", Operator::GreaterThanOrEqual => ">=", Operator::Dot => ".", + Operator::Contains => "=~", + Operator::NotContains => "!~", } } } @@ -55,6 +59,8 @@ impl FromStr for Operator { "<=" => Ok(Operator::LessThanOrEqual), ">=" => Ok(Operator::GreaterThanOrEqual), "." => Ok(Operator::Dot), + "=~" => Ok(Operator::Contains), + "!~" => Ok(Operator::NotContains), _ => Err(()), } } diff --git a/src/parser/parse/parser.rs b/src/parser/parse/parser.rs index e5fe60559a..c352cc4cf9 100644 --- a/src/parser/parse/parser.rs +++ b/src/parser/parse/parser.rs @@ -59,7 +59,7 @@ macro_rules! operator { #[tracable_parser] pub fn $name(input: NomSpan) -> IResult { let start = input.offset; - let (input, tag) = tag(stringify!($token))(input)?; + let (input, tag) = tag($token)(input)?; let end = input.offset; Ok(( @@ -70,13 +70,15 @@ macro_rules! operator { }; } -operator! { gt: > } -operator! { lt: < } -operator! { gte: >= } -operator! { lte: <= } -operator! { eq: == } -operator! { neq: != } -operator! { dot: . } +operator! { gt: ">" } +operator! { lt: "<" } +operator! { gte: ">=" } +operator! { lte: "<=" } +operator! { eq: "==" } +operator! { neq: "!=" } +operator! { dot: "." } +operator! { cont: "=~" } +operator! { ncont: "!~" } #[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize)] pub enum Number { @@ -228,7 +230,7 @@ pub fn raw_number(input: NomSpan) -> IResult> { #[tracable_parser] pub fn operator(input: NomSpan) -> IResult { - let (input, operator) = alt((gte, lte, neq, gt, lt, eq))(input)?; + let (input, operator) = alt((gte, lte, neq, gt, lt, eq, cont, ncont))(input)?; Ok((input, operator)) } @@ -830,6 +832,16 @@ mod tests { "!=" -> b::token_list(vec![b::op("!=")]) } + + equal_tokens! { + + "=~" -> b::token_list(vec![b::op("=~")]) + } + + equal_tokens! { + + "!~" -> b::token_list(vec![b::op("!~")]) + } } #[test] diff --git a/tests/filter_where_tests.rs b/tests/filter_where_tests.rs new file mode 100644 index 0000000000..e802607d85 --- /dev/null +++ b/tests/filter_where_tests.rs @@ -0,0 +1,112 @@ +mod helpers; + +use helpers as h; + +#[test] +fn test_compare() { + let actual = nu!( + cwd: "tests/fixtures/formats", h::pipeline( + r#" + open sample.db + | where table_name == ints + | get table_values + | first 4 + | where z > 4200 + | get z + | echo $it + "# + )); + + assert_eq!(actual, "4253"); + + let actual = nu!( + cwd: "tests/fixtures/formats", h::pipeline( + r#" + open sample.db + | where table_name == ints + | get table_values + | first 4 + | where z >= 4253 + | get z + | echo $it + "# + )); + + assert_eq!(actual, "4253"); + + let actual = nu!( + cwd: "tests/fixtures/formats", h::pipeline( + r#" + open sample.db + | where table_name == ints + | get table_values + | first 4 + | where z < 10 + | get z + | echo $it + "# + )); + + assert_eq!(actual, "1"); + + let actual = nu!( + cwd: "tests/fixtures/formats", h::pipeline( + r#" + open sample.db + | where table_name == ints + | get table_values + | first 4 + | where z <= 1 + | get z + | echo $it + "# + )); + + assert_eq!(actual, "1"); + + let actual = nu!( + cwd: "tests/fixtures/formats", h::pipeline( + r#" + open sample.db + | where table_name == ints + | get table_values + | where z != 1 + | first 1 + | get z + | echo $it + "# + )); + + assert_eq!(actual, "42"); +} + +#[test] +fn test_contains() { + let actual = nu!( + cwd: "tests/fixtures/formats", h::pipeline( + r#" + open sample.db + | where table_name == strings + | get table_values + | where x =~ ell + | count + | echo $it + "# + )); + + assert_eq!(actual, "4"); + + let actual = nu!( + cwd: "tests/fixtures/formats", h::pipeline( + r#" + open sample.db + | where table_name == strings + | get table_values + | where x !~ ell + | count + | echo $it + "# + )); + + assert_eq!(actual, "2"); +} From d320ffe742b9a8bb418845947685579d6d8aff2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20N=2E=20Robalino?= Date: Mon, 25 Nov 2019 17:10:09 -0500 Subject: [PATCH 4/4] nth can select more than one row at a time. --- README.md | 2 +- src/commands/nth.rs | 50 ++++++++++++++++++++++++++++++++---------- tests/commands_test.rs | 39 ++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 5baa3e5ce0..1e804d22c1 100644 --- a/README.md +++ b/README.md @@ -281,7 +281,7 @@ Nu adheres closely to a set of goals that make up its design philosophy. As feat | inc (column-or-column-path) | Increment a value or version. Optionally use the column of a table | | insert column-or-column-path value | Insert a new column to the table | | last amount | Show only the last number of rows | -| nth row-number | Return only the selected row | +| nth ...row-numbers | Return only the selected rows | | pick ...columns | Down-select table to only these columns | | pivot --header-row | Pivot the tables, making columns into rows and vice versa | | prepend row-data | Prepend a row to the beginning of the table | diff --git a/src/commands/nth.rs b/src/commands/nth.rs index bcd3057879..6e495b25aa 100644 --- a/src/commands/nth.rs +++ b/src/commands/nth.rs @@ -5,7 +5,8 @@ use crate::prelude::*; #[derive(Deserialize)] struct NthArgs { - amount: Tagged, + row_number: Tagged, + rest: Vec>, } pub struct Nth; @@ -16,15 +17,17 @@ impl WholeStreamCommand for Nth { } fn signature(&self) -> Signature { - Signature::build("nth").required( - "row number", - SyntaxShape::Any, - "the number of the row to return", - ) + Signature::build("nth") + .required( + "row number", + SyntaxShape::Any, + "the number of the row to return", + ) + .rest(SyntaxShape::Any, "Optionally return more rows") } fn usage(&self) -> &str { - "Return only the selected row" + "Return only the selected rows" } fn run( @@ -37,10 +40,35 @@ impl WholeStreamCommand for Nth { } fn nth( - NthArgs { amount }: NthArgs, + NthArgs { + row_number, + rest: and_rows, + }: NthArgs, RunnableContext { input, .. }: RunnableContext, ) -> Result { - Ok(OutputStream::from_input( - input.values.skip(amount.item as u64).take(1), - )) + let stream = input + .values + .enumerate() + .map(move |(idx, item)| { + let row_number = vec![row_number.clone()]; + + let row_numbers = vec![&row_number, &and_rows] + .into_iter() + .flatten() + .collect::>>(); + + let mut result = VecDeque::new(); + + if row_numbers + .iter() + .any(|requested| requested.item == idx as u64) + { + result.push_back(ReturnSuccess::value(item.clone())); + } + + result + }) + .flatten(); + + Ok(stream.to_output_stream()) } diff --git a/tests/commands_test.rs b/tests/commands_test.rs index 59621539db..89ab430b36 100644 --- a/tests/commands_test.rs +++ b/tests/commands_test.rs @@ -3,6 +3,45 @@ mod helpers; use helpers as h; use helpers::{Playground, Stub::*}; +#[test] +fn nth_selects_a_row() { + Playground::setup("nth_test_1", |dirs, sandbox| { + sandbox.with_files(vec![EmptyFile("notes.txt"), EmptyFile("arepas.txt")]); + + let actual = nu!( + cwd: dirs.test(), h::pipeline( + r#" + ls + | sort-by name + | nth 0 + | get name + | echo $it + "# + )); + + assert_eq!(actual, "arepas.txt"); + }); +} + +#[test] +fn nth_selects_many_rows() { + Playground::setup("nth_test_2", |dirs, sandbox| { + sandbox.with_files(vec![EmptyFile("notes.txt"), EmptyFile("arepas.txt")]); + + let actual = nu!( + cwd: dirs.test(), h::pipeline( + r#" + ls + | get name + | nth 1 0 + | count + | echo $it + "# + )); + + assert_eq!(actual, "2"); + }); +} #[test] fn default_row_data_if_column_missing() { Playground::setup("default_test_1", |dirs, sandbox| {