Refactor classified.rs into separate modules.

Adds modules for internal, external, and dynamic commands, as well as
the pipeline functionality. These are exported as their old names from
the classified module so as to keep its "interface" the same.
This commit is contained in:
Jason Gedge 2019-11-24 17:19:12 -05:00 committed by Yehuda Katz
parent 8f9dd6516e
commit 4e9afd6698
6 changed files with 232 additions and 355 deletions

View file

@ -1,6 +1,6 @@
use crate::commands::classified::{ use crate::commands::classified::external::{run_external_command, StreamNext};
run_external_command, run_internal_command, ClassifiedInputStream, StreamNext, use crate::commands::classified::internal::run_internal_command;
}; use crate::commands::classified::ClassifiedInputStream;
use crate::commands::plugin::JsonRpc; use crate::commands::plugin::JsonRpc;
use crate::commands::plugin::{PluginCommand, PluginSink}; use crate::commands::plugin::{PluginCommand, PluginSink};
use crate::commands::whole_stream_command; use crate::commands::whole_stream_command;

View file

@ -1,15 +1,16 @@
use super::ClassifiedInputStream;
use crate::data::value;
use crate::prelude::*; use crate::prelude::*;
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, BytesMut};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use futures_codec::{Decoder, Encoder, Framed}; use futures_codec::{Decoder, Encoder, Framed};
use log::trace; use log::trace;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_parser::ExternalCommand;
use nu_protocol::Value; use nu_protocol::Value;
use std::io::{Error, ErrorKind}; use std::io::{Error, ErrorKind};
use subprocess::Exec; use subprocess::Exec;
use super::ClassifiedInputStream;
/// A simple `Codec` implementation that splits up data into lines. /// A simple `Codec` implementation that splits up data into lines.
pub struct LinesCodec {} pub struct LinesCodec {}
@ -46,36 +47,6 @@ impl Decoder for LinesCodec {
} }
} }
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Command {
pub(crate) name: String,
pub(crate) name_tag: Tag,
pub(crate) args: ExternalArgs,
}
impl HasSpan for Command {
fn span(&self) -> Span {
self.name_tag.span.until(self.args.span)
}
}
impl PrettyDebug for Command {
fn pretty(&self) -> DebugDocBuilder {
b::typed(
"external command",
b::description(&self.name)
+ b::preceded(
b::space(),
b::intersperse(
self.args.iter().map(|a| b::primitive(format!("{}", a.arg))),
b::space(),
),
),
)
}
}
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum StreamNext { pub(crate) enum StreamNext {
Last, Last,
@ -83,179 +54,140 @@ pub(crate) enum StreamNext {
Internal, Internal,
} }
impl Command { pub(crate) async fn run_external_command(
pub(crate) async fn run( command: ExternalCommand,
self, context: &mut Context,
context: &mut Context, input: ClassifiedInputStream,
input: ClassifiedInputStream, stream_next: StreamNext,
stream_next: StreamNext, ) -> Result<ClassifiedInputStream, ShellError> {
) -> Result<ClassifiedInputStream, ShellError> { let stdin = input.stdin;
let stdin = input.stdin; let inputs: Vec<Value> = input.objects.into_vec().await;
let inputs: Vec<Value> = input.objects.into_vec().await;
trace!(target: "nu::run::external", "-> {}", self.name); trace!(target: "nu::run::external", "-> {}", command.name);
trace!(target: "nu::run::external", "inputs = {:?}", inputs); trace!(target: "nu::run::external", "inputs = {:?}", inputs);
let mut arg_string = format!("{}", self.name); let mut arg_string = format!("{}", command.name);
for arg in &self.args.list { for arg in command.args.iter() {
arg_string.push_str(&arg); arg_string.push_str(&arg);
} }
trace!(target: "nu::run::external", "command = {:?}", self.name); trace!(target: "nu::run::external", "command = {:?}", command.name);
let mut process; let mut process;
if arg_string.contains("$it") { if arg_string.contains("$it") {
let input_strings = inputs let input_strings = inputs
.iter() .iter()
.map(|i| { .map(|i| {
i.as_string().map(|s| s.to_string()).map_err(|_| { i.as_string().map(|s| s.to_string()).map_err(|_| {
let arg = self.args.iter().find(|arg| arg.arg.contains("$it")); let arg = command.args.iter().find(|arg| arg.contains("$it"));
if let Some(arg) = arg { if let Some(arg) = arg {
ShellError::labeled_error( ShellError::labeled_error(
"External $it needs string data", "External $it needs string data",
"given row instead of string data", "given row instead of string data",
&arg.tag, &arg.tag,
) )
} else {
ShellError::labeled_error(
"$it needs string data",
"given something else",
self.name_tag.clone(),
)
}
})
})
.collect::<Result<Vec<String>, ShellError>>()?;
let commands = input_strings.iter().map(|i| {
let args = self.args.iter().filter_map(|arg| {
if arg.chars().all(|c| c.is_whitespace()) {
None
} else { } else {
Some(arg.replace("$it", &i)) ShellError::labeled_error(
"$it needs string data",
"given something else",
command.name_tag.clone(),
)
} }
}); })
})
.collect::<Result<Vec<String>, ShellError>>()?;
format!("{} {}", self.name, itertools::join(args, " ")) let commands = input_strings.iter().map(|i| {
let args = command.args.iter().filter_map(|arg| {
if arg.chars().all(|c| c.is_whitespace()) {
None
} else {
Some(arg.replace("$it", &i))
}
}); });
process = Exec::shell(itertools::join(commands, " && ")) format!("{} {}", command.name, itertools::join(args, " "))
} else { });
process = Exec::cmd(&self.name);
for arg in &self.args.list { process = Exec::shell(itertools::join(commands, " && "))
let arg_chars: Vec<_> = arg.chars().collect(); } else {
if arg_chars.len() > 1 process = Exec::cmd(&command.name);
&& arg_chars[0] == '"' for arg in command.args.iter() {
&& arg_chars[arg_chars.len() - 1] == '"' let arg_chars: Vec<_> = arg.chars().collect();
{ if arg_chars.len() > 1 && arg_chars[0] == '"' && arg_chars[arg_chars.len() - 1] == '"' {
// quoted string // quoted string
let new_arg: String = arg_chars[1..arg_chars.len() - 1].iter().collect(); let new_arg: String = arg_chars[1..arg_chars.len() - 1].iter().collect();
process = process.arg(new_arg); process = process.arg(new_arg);
} else { } else {
process = process.arg(arg.arg.clone()); process = process.arg(arg.arg.clone());
}
} }
} }
}
process = process.cwd(context.shell_manager.path()); process = process.cwd(context.shell_manager.path());
trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path()); trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path());
let mut process = match stream_next { let mut process = match stream_next {
StreamNext::Last => process, StreamNext::Last => process,
StreamNext::External | StreamNext::Internal => { StreamNext::External | StreamNext::Internal => {
process.stdout(subprocess::Redirection::Pipe) process.stdout(subprocess::Redirection::Pipe)
}
};
trace!(target: "nu::run::external", "set up stdout pipe");
if let Some(stdin) = stdin {
process = process.stdin(stdin);
} }
};
trace!(target: "nu::run::external", "set up stdin pipe"); trace!(target: "nu::run::external", "set up stdout pipe");
trace!(target: "nu::run::external", "built process {:?}", process);
let popen = process.popen(); if let Some(stdin) = stdin {
process = process.stdin(stdin);
}
trace!(target: "nu::run::external", "next = {:?}", stream_next); trace!(target: "nu::run::external", "set up stdin pipe");
trace!(target: "nu::run::external", "built process {:?}", process);
let name_tag = self.name_tag.clone(); let popen = process.popen();
if let Ok(mut popen) = popen {
match stream_next { trace!(target: "nu::run::external", "next = {:?}", stream_next);
StreamNext::Last => {
let _ = popen.detach(); let name_tag = command.name_tag.clone();
loop { if let Ok(mut popen) = popen {
match popen.poll() { match stream_next {
None => { StreamNext::Last => {
let _ = std::thread::sleep(std::time::Duration::new(0, 100000000)); let _ = popen.detach();
} loop {
_ => { match popen.poll() {
let _ = popen.terminate(); None => {
break; let _ = std::thread::sleep(std::time::Duration::new(0, 100000000));
} }
_ => {
let _ = popen.terminate();
break;
} }
} }
Ok(ClassifiedInputStream::new())
}
StreamNext::External => {
let _ = popen.detach();
let stdout = popen.stdout.take().unwrap();
Ok(ClassifiedInputStream::from_stdout(stdout))
}
StreamNext::Internal => {
let _ = popen.detach();
let stdout = popen.stdout.take().unwrap();
let file = futures::io::AllowStdIo::new(stdout);
let stream = Framed::new(file, LinesCodec {});
let stream =
stream.map(move |line| value::string(line.unwrap()).into_value(&name_tag));
Ok(ClassifiedInputStream::from_input_stream(
stream.boxed() as BoxStream<'static, Value>
))
} }
Ok(ClassifiedInputStream::new())
}
StreamNext::External => {
let _ = popen.detach();
let stdout = popen.stdout.take().unwrap();
Ok(ClassifiedInputStream::from_stdout(stdout))
}
StreamNext::Internal => {
let _ = popen.detach();
let stdout = popen.stdout.take().unwrap();
let file = futures::io::AllowStdIo::new(stdout);
let stream = Framed::new(file, LinesCodec {});
let stream =
stream.map(move |line| value::string(line.unwrap()).into_value(&name_tag));
Ok(ClassifiedInputStream::from_input_stream(
stream.boxed() as BoxStream<'static, Value>
))
} }
} else {
return Err(ShellError::labeled_error(
"Command not found",
"command not found",
name_tag,
));
} }
} } else {
} return Err(ShellError::labeled_error(
"Command not found",
#[derive(Debug, Clone, Eq, PartialEq)] "command not found",
pub struct ExternalArg { name_tag,
pub arg: String, ));
pub tag: Tag,
}
impl std::ops::Deref for ExternalArg {
type Target = str;
fn deref(&self) -> &str {
&self.arg
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ExternalArgs {
pub list: Vec<ExternalArg>,
pub span: Span,
}
impl ExternalArgs {
pub fn iter(&self) -> impl Iterator<Item = &ExternalArg> {
self.list.iter()
}
}
impl std::ops::Deref for ExternalArgs {
type Target = [ExternalArg];
fn deref(&self) -> &[ExternalArg] {
&self.list
} }
} }

View file

@ -1,150 +1,129 @@
use crate::data::value;
use crate::prelude::*; use crate::prelude::*;
use derive_new::new;
use log::{log_enabled, trace}; use log::{log_enabled, trace};
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_parser::hir; use nu_parser::InternalCommand;
use nu_protocol::{CommandAction, Primitive, ReturnSuccess, UntaggedValue, Value}; use nu_protocol::{CommandAction, Primitive, ReturnSuccess, UntaggedValue, Value};
use super::ClassifiedInputStream; use super::ClassifiedInputStream;
#[derive(new, Debug, Clone, Eq, PartialEq)] pub(crate) async fn run_internal_command(
pub struct Command { command: InternalCommand,
pub(crate) name: String, context: &mut Context,
pub(crate) name_tag: Tag, input: ClassifiedInputStream,
pub(crate) args: hir::Call, source: Text,
} ) -> Result<InputStream, ShellError> {
if log_enabled!(log::Level::Trace) {
impl HasSpan for Command { trace!(target: "nu::run::internal", "->");
fn span(&self) -> Span { trace!(target: "nu::run::internal", "{}", command.name);
let start = self.name_tag.span; trace!(target: "nu::run::internal", "{}", command.args.debug(&source));
start.until(self.args.span)
} }
}
impl PrettyDebugWithSource for Command { let objects: InputStream =
fn pretty_debug(&self, source: &str) -> DebugDocBuilder { trace_stream!(target: "nu::trace_stream::internal", "input" = input.objects);
b::typed(
"internal command", let internal_command = context.expect_command(&command.name);
b::description(&self.name) + b::space() + self.args.pretty_debug(source),
let result = {
context.run_command(
internal_command,
command.name_tag.clone(),
command.args,
&source,
objects,
) )
} };
}
impl Command { let result = trace_out_stream!(target: "nu::trace_stream::internal", "output" = result);
pub(crate) fn run( let mut result = result.values;
self, let mut context = context.clone();
context: &mut Context,
input: ClassifiedInputStream,
source: Text,
) -> Result<InputStream, ShellError> {
if log_enabled!(log::Level::Trace) {
trace!(target: "nu::run::internal", "->");
trace!(target: "nu::run::internal", "{}", self.name);
trace!(target: "nu::run::internal", "{}", self.args.debug(&source));
}
let objects: InputStream = let stream = async_stream! {
trace_stream!(target: "nu::trace_stream::internal", "input" = input.objects); let mut soft_errs: Vec<ShellError> = vec![];
let mut yielded = false;
let command = context.expect_command(&self.name); while let Some(item) = result.next().await {
match item {
let result = Ok(ReturnSuccess::Action(action)) => match action {
{ context.run_command(command, self.name_tag.clone(), self.args, &source, objects) }; CommandAction::ChangePath(path) => {
context.shell_manager.set_path(path);
let result = trace_out_stream!(target: "nu::trace_stream::internal", "output" = result);
let mut result = result.values;
let mut context = context.clone();
let stream = async_stream! {
let mut soft_errs: Vec<ShellError> = vec![];
let mut yielded = false;
while let Some(item) = result.next().await {
match item {
Ok(ReturnSuccess::Action(action)) => match action {
CommandAction::ChangePath(path) => {
context.shell_manager.set_path(path);
}
CommandAction::Exit => std::process::exit(0), // TODO: save history.txt
CommandAction::Error(err) => {
context.error(err);
break;
}
CommandAction::EnterHelpShell(value) => {
match value {
Value {
value: UntaggedValue::Primitive(Primitive::String(cmd)),
tag,
} => {
context.shell_manager.insert_at_current(Box::new(
HelpShell::for_command(
value::string(cmd).into_value(tag),
&context.registry(),
).unwrap(),
));
}
_ => {
context.shell_manager.insert_at_current(Box::new(
HelpShell::index(&context.registry()).unwrap(),
));
}
}
}
CommandAction::EnterValueShell(value) => {
context
.shell_manager
.insert_at_current(Box::new(ValueShell::new(value)));
}
CommandAction::EnterShell(location) => {
context.shell_manager.insert_at_current(Box::new(
FilesystemShell::with_location(location, context.registry().clone()).unwrap(),
));
}
CommandAction::PreviousShell => {
context.shell_manager.prev();
}
CommandAction::NextShell => {
context.shell_manager.next();
}
CommandAction::LeaveShell => {
context.shell_manager.remove_at_current();
if context.shell_manager.is_empty() {
std::process::exit(0); // TODO: save history.txt
}
}
},
Ok(ReturnSuccess::Value(v)) => {
yielded = true;
yield Ok(v);
} }
CommandAction::Exit => std::process::exit(0), // TODO: save history.txt
Ok(ReturnSuccess::DebugValue(v)) => { CommandAction::Error(err) => {
yielded = true;
let doc = PrettyDebug::pretty_doc(&v);
let mut buffer = termcolor::Buffer::ansi();
doc.render_raw(
context.with_host(|host| host.width() - 5),
&mut nu_source::TermColored::new(&mut buffer),
).unwrap();
let value = String::from_utf8_lossy(buffer.as_slice());
yield Ok(value::string(value).into_untagged_value())
}
Err(err) => {
context.error(err); context.error(err);
break; break;
} }
CommandAction::EnterHelpShell(value) => {
match value {
Value {
value: UntaggedValue::Primitive(Primitive::String(cmd)),
tag,
} => {
context.shell_manager.insert_at_current(Box::new(
HelpShell::for_command(
value::string(cmd).into_value(tag),
&context.registry(),
).unwrap(),
));
}
_ => {
context.shell_manager.insert_at_current(Box::new(
HelpShell::index(&context.registry()).unwrap(),
));
}
}
}
CommandAction::EnterValueShell(value) => {
context
.shell_manager
.insert_at_current(Box::new(ValueShell::new(value)));
}
CommandAction::EnterShell(location) => {
context.shell_manager.insert_at_current(Box::new(
FilesystemShell::with_location(location, context.registry().clone()).unwrap(),
));
}
CommandAction::PreviousShell => {
context.shell_manager.prev();
}
CommandAction::NextShell => {
context.shell_manager.next();
}
CommandAction::LeaveShell => {
context.shell_manager.remove_at_current();
if context.shell_manager.is_empty() {
std::process::exit(0); // TODO: save history.txt
}
}
},
Ok(ReturnSuccess::Value(v)) => {
yielded = true;
yield Ok(v);
}
Ok(ReturnSuccess::DebugValue(v)) => {
yielded = true;
let doc = PrettyDebug::pretty_doc(&v);
let mut buffer = termcolor::Buffer::ansi();
doc.render_raw(
context.with_host(|host| host.width() - 5),
&mut nu_source::TermColored::new(&mut buffer),
).unwrap();
let value = String::from_utf8_lossy(buffer.as_slice());
yield Ok(value::string(value).into_untagged_value())
}
Err(err) => {
context.error(err);
break;
} }
} }
}; }
};
Ok(stream.to_input_stream()) Ok(stream.to_input_stream())
}
} }

View file

@ -1,16 +1,12 @@
use crate::data::value;
use crate::prelude::*; use crate::prelude::*;
use nu_parser::{hir, TokenNode};
mod dynamic; mod dynamic;
mod external; pub(crate) mod external;
mod internal; pub(crate) mod internal;
mod pipeline;
#[allow(unused_imports)] #[allow(unused_imports)]
pub(crate) use dynamic::Command as DynamicCommand; pub(crate) use dynamic::Command as DynamicCommand;
#[allow(unused_imports)]
pub(crate) use external::{Command as ExternalCommand, ExternalArg, ExternalArgs, StreamNext};
pub(crate) use internal::Command as InternalCommand;
pub(crate) struct ClassifiedInputStream { pub(crate) struct ClassifiedInputStream {
pub(crate) objects: InputStream, pub(crate) objects: InputStream,
@ -20,7 +16,7 @@ pub(crate) struct ClassifiedInputStream {
impl ClassifiedInputStream { impl ClassifiedInputStream {
pub(crate) fn new() -> ClassifiedInputStream { pub(crate) fn new() -> ClassifiedInputStream {
ClassifiedInputStream { ClassifiedInputStream {
objects: vec![crate::data::value::nothing().into_untagged_value()].into(), objects: vec![value::nothing().into_value(Tag::unknown())].into(),
stdin: None, stdin: None,
} }
} }
@ -39,35 +35,3 @@ impl ClassifiedInputStream {
} }
} }
} }
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ClassifiedCommand {
#[allow(unused)]
Expr(TokenNode),
#[allow(unused)]
Dynamic(hir::Call),
Internal(InternalCommand),
External(ExternalCommand),
}
impl PrettyDebugWithSource for ClassifiedCommand {
fn pretty_debug(&self, source: &str) -> DebugDocBuilder {
match self {
ClassifiedCommand::Expr(token) => b::typed("command", token.pretty_debug(source)),
ClassifiedCommand::Dynamic(call) => b::typed("command", call.pretty_debug(source)),
ClassifiedCommand::Internal(internal) => internal.pretty_debug(source),
ClassifiedCommand::External(external) => external.pretty_debug(source),
}
}
}
impl HasSpan for ClassifiedCommand {
fn span(&self) -> Span {
match self {
ClassifiedCommand::Expr(node) => node.span(),
ClassifiedCommand::Internal(command) => command.span(),
ClassifiedCommand::Dynamic(call) => call.span,
ClassifiedCommand::External(command) => command.span(),
}
}
}

View file

@ -14,7 +14,9 @@ pub fn apply_operator(
| Operator::LessThan | Operator::LessThan
| Operator::GreaterThan | Operator::GreaterThan
| Operator::LessThanOrEqual | Operator::LessThanOrEqual
| Operator::GreaterThanOrEqual => left.compare(op, right).map(value::boolean), | Operator::GreaterThanOrEqual => {
value::compare_values(op, left, right).map(value::boolean)
}
Operator::Dot => Ok(value::boolean(false)), Operator::Dot => Ok(value::boolean(false)),
Operator::Contains => contains(left, right).map(value::boolean), Operator::Contains => contains(left, right).map(value::boolean),
Operator::NotContains => contains(left, right).map(Not::not).map(value::boolean), Operator::NotContains => contains(left, right).map(Not::not).map(value::boolean),