Batch of moving commands off async_stream (#1916)

This commit is contained in:
Jonathan Turner 2020-05-30 11:36:04 +12:00 committed by GitHub
parent 3a6a3d7409
commit b84ff99e7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 552 additions and 579 deletions

View file

@ -913,7 +913,9 @@ async fn process_line(
raw_input: line.to_string(), raw_input: line.to_string(),
}; };
if let Ok(mut output_stream) = crate::commands::autoview::autoview(context) { if let Ok(mut output_stream) =
crate::commands::autoview::autoview(context).await
{
loop { loop {
match output_stream.try_next().await { match output_stream.try_next().await {
Ok(Some(ReturnSuccess::Value(Value { Ok(Some(ReturnSuccess::Value(Value {

View file

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry; use crate::context::CommandRegistry;
use crate::prelude::*; use crate::prelude::*;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value}; use nu_protocol::{Signature, SyntaxShape, UntaggedValue, Value};
#[derive(Deserialize)] #[derive(Deserialize)]
struct AppendArgs { struct AppendArgs {
@ -34,7 +34,11 @@ impl WholeStreamCommand for Append {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
append(args, registry) let (AppendArgs { row }, input) = args.process(registry).await?;
let eos = futures::stream::iter(vec![row]);
Ok(input.chain(eos).to_output_stream())
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -51,21 +55,6 @@ impl WholeStreamCommand for Append {
} }
} }
fn append(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (AppendArgs { row }, mut input) = args.process(&registry).await?;
while let Some(item) = input.next().await {
yield ReturnSuccess::value(item);
}
yield ReturnSuccess::value(row);
};
Ok(stream.to_output_stream())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::Append; use super::Append;

View file

@ -4,9 +4,12 @@ use crate::data::value::format_leaf;
use crate::prelude::*; use crate::prelude::*;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{hir, hir::Expression, hir::Literal, hir::SpannedExpression}; use nu_protocol::{hir, hir::Expression, hir::Literal, hir::SpannedExpression};
use nu_protocol::{Primitive, ReturnSuccess, Scope, Signature, UntaggedValue, Value}; use nu_protocol::{Primitive, Scope, Signature, UntaggedValue, Value};
use prettytable::format::{FormatBuilder, LinePosition, LineSeparator};
use prettytable::{color, Attr, Cell, Row, Table};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use textwrap::fill;
pub struct Autoview; pub struct Autoview;
@ -38,6 +41,7 @@ impl WholeStreamCommand for Autoview {
name: args.call_info.name_tag, name: args.call_info.name_tag,
raw_input: args.raw_input, raw_input: args.raw_input,
}) })
.await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -77,7 +81,7 @@ impl RunnableContextWithoutInput {
} }
} }
pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> { pub async fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
let binary = context.get_command("binaryview"); let binary = context.get_command("binaryview");
let text = context.get_command("textview"); let text = context.get_command("textview");
let table = context.get_command("table"); let table = context.get_command("table");
@ -101,11 +105,9 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
AutoPivotMode::Always AutoPivotMode::Always
}; };
Ok(OutputStream::new(async_stream! {
let (mut input_stream, context) = RunnableContextWithoutInput::convert(context); let (mut input_stream, context) = RunnableContextWithoutInput::convert(context);
match input_stream.next().await { if let Some(x) = input_stream.next().await {
Some(x) => {
match input_stream.next().await { match input_stream.next().await {
Some(y) => { Some(y) => {
let ctrl_c = context.ctrl_c.clone(); let ctrl_c = context.ctrl_c.clone();
@ -141,8 +143,11 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
} if anchor.is_some() => { } if anchor.is_some() => {
if let Some(text) = text { if let Some(text) = text {
let mut stream = VecDeque::new(); let mut stream = VecDeque::new();
stream.push_back(UntaggedValue::string(s).into_value(Tag { anchor, span })); stream.push_back(
let command_args = create_default_command_args(&context).with_input(stream); UntaggedValue::string(s).into_value(Tag { anchor, span }),
);
let command_args =
create_default_command_args(&context).with_input(stream);
let result = text.run(command_args, &context.registry).await; let result = text.run(command_args, &context.registry).await;
result.collect::<Vec<_>>().await; result.collect::<Vec<_>>().await;
} else { } else {
@ -161,8 +166,11 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
} if anchor.is_some() => { } if anchor.is_some() => {
if let Some(text) = text { if let Some(text) = text {
let mut stream = VecDeque::new(); let mut stream = VecDeque::new();
stream.push_back(UntaggedValue::string(s).into_value(Tag { anchor, span })); stream.push_back(
let command_args = create_default_command_args(&context).with_input(stream); UntaggedValue::string(s).into_value(Tag { anchor, span }),
);
let command_args =
create_default_command_args(&context).with_input(stream);
let result = text.run(command_args, &context.registry).await; let result = text.run(command_args, &context.registry).await;
result.collect::<Vec<_>>().await; result.collect::<Vec<_>>().await;
} else { } else {
@ -209,7 +217,7 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
out!("{}", b); out!("{}", b);
} }
Value { Value {
value: UntaggedValue::Primitive(Primitive::Duration(d)), value: UntaggedValue::Primitive(Primitive::Duration(_)),
.. ..
} => { } => {
let output = format_leaf(&x).plain_string(100_000); let output = format_leaf(&x).plain_string(100_000);
@ -229,11 +237,15 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
out!("{}", output); out!("{}", output);
} }
Value { value: UntaggedValue::Primitive(Primitive::Binary(ref b)), .. } => { Value {
value: UntaggedValue::Primitive(Primitive::Binary(ref b)),
..
} => {
if let Some(binary) = binary { if let Some(binary) = binary {
let mut stream = VecDeque::new(); let mut stream = VecDeque::new();
stream.push_back(x); stream.push_back(x);
let command_args = create_default_command_args(&context).with_input(stream); let command_args =
create_default_command_args(&context).with_input(stream);
let result = binary.run(command_args, &context.registry).await; let result = binary.run(command_args, &context.registry).await;
result.collect::<Vec<_>>().await; result.collect::<Vec<_>>().await;
} else { } else {
@ -242,24 +254,28 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
} }
} }
Value { value: UntaggedValue::Error(e), .. } => { Value {
yield Err(e); value: UntaggedValue::Error(e),
..
} => {
return Err(e);
} }
Value { value: UntaggedValue::Row(row), ..} Value {
if pivot_mode == AutoPivotMode::Always || value: UntaggedValue::Row(row),
(pivot_mode == AutoPivotMode::Auto && ..
(row.entries.iter().map(|(k,v)| v.convert_to_string()) } if pivot_mode == AutoPivotMode::Always
.collect::<Vec<_>>().iter() || (pivot_mode == AutoPivotMode::Auto
.fold(0, |acc, len| acc + len.len()) && (row
+ .entries
(row.entries.iter().map(|(k,_)| k.chars()).count() * 2)) .iter()
> textwrap::termwidth()) => { .map(|(_, v)| v.convert_to_string())
use prettytable::format::{FormatBuilder, LinePosition, LineSeparator}; .collect::<Vec<_>>()
use prettytable::{color, Attr, Cell, Row, Table}; .iter()
use crate::data::value::{format_leaf, style_leaf}; .fold(0usize, |acc, len| acc + len.len())
use textwrap::fill; + row.entries.iter().count() * 2)
> textwrap::termwidth()) =>
{
let termwidth = std::cmp::max(textwrap::termwidth(), 20); let termwidth = std::cmp::max(textwrap::termwidth(), 20);
enum TableMode { enum TableMode {
@ -283,8 +299,14 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
TableMode::Light => { TableMode::Light => {
table.set_format( table.set_format(
FormatBuilder::new() FormatBuilder::new()
.separator(LinePosition::Title, LineSeparator::new('─', '─', ' ', ' ')) .separator(
.separator(LinePosition::Bottom, LineSeparator::new(' ', ' ', ' ', ' ')) LinePosition::Title,
LineSeparator::new('─', '─', ' ', ' '),
)
.separator(
LinePosition::Bottom,
LineSeparator::new(' ', ' ', ' ', ' '),
)
.padding(1, 1) .padding(1, 1)
.build(), .build(),
); );
@ -293,9 +315,18 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
table.set_format( table.set_format(
FormatBuilder::new() FormatBuilder::new()
.column_separator('│') .column_separator('│')
.separator(LinePosition::Top, LineSeparator::new('─', '┬', ' ', ' ')) .separator(
.separator(LinePosition::Title, LineSeparator::new('─', '┼', ' ', ' ')) LinePosition::Top,
.separator(LinePosition::Bottom, LineSeparator::new('─', '┴', ' ', ' ')) LineSeparator::new('─', '┬', ' ', ' '),
)
.separator(
LinePosition::Title,
LineSeparator::new('─', '┼', ' ', ' '),
)
.separator(
LinePosition::Bottom,
LineSeparator::new('─', '┴', ' ', ' '),
)
.padding(1, 1) .padding(1, 1)
.build(), .build(),
); );
@ -307,15 +338,22 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
max_key_len = std::cmp::max(max_key_len, key.chars().count()); max_key_len = std::cmp::max(max_key_len, key.chars().count());
} }
if max_key_len > (termwidth/2 - 1) { if max_key_len > (termwidth / 2 - 1) {
max_key_len = termwidth/2 - 1; max_key_len = termwidth / 2 - 1;
} }
let max_val_len = termwidth - max_key_len - 5; let max_val_len = termwidth - max_key_len - 5;
for (key, value) in row.entries.iter() { for (key, value) in row.entries.iter() {
table.add_row(Row::new(vec![Cell::new(&fill(&key, max_key_len)).with_style(Attr::ForegroundColor(color::GREEN)).with_style(Attr::Bold), table.add_row(Row::new(vec![
Cell::new(&fill(&format_leaf(value).plain_string(100_000), max_val_len))])); Cell::new(&fill(&key, max_key_len))
.with_style(Attr::ForegroundColor(color::GREEN))
.with_style(Attr::Bold),
Cell::new(&fill(
&format_leaf(value).plain_string(100_000),
max_val_len,
)),
]));
} }
table.printstd(); table.printstd();
@ -324,11 +362,14 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
// .map_err(|_| ShellError::untagged_runtime_error("Internal error: could not print to terminal (for unix systems check to make sure TERM is set)"))?; // .map_err(|_| ShellError::untagged_runtime_error("Internal error: could not print to terminal (for unix systems check to make sure TERM is set)"))?;
} }
Value { value: ref item, .. } => { Value {
value: ref item, ..
} => {
if let Some(table) = table { if let Some(table) = table {
let mut stream = VecDeque::new(); let mut stream = VecDeque::new();
stream.push_back(x); stream.push_back(x);
let command_args = create_default_command_args(&context).with_input(stream); let command_args =
create_default_command_args(&context).with_input(stream);
let result = table.run(command_args, &context.registry).await; let result = table.run(command_args, &context.registry).await;
result.collect::<Vec<_>>().await; result.collect::<Vec<_>>().await;
} else { } else {
@ -339,16 +380,8 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
} }
} }
} }
_ => {
//out!("<no results>");
}
}
// Needed for async_stream to type check Ok(OutputStream::empty())
if false {
yield ReturnSuccess::value(UntaggedValue::nothing().into_untagged_value());
}
}))
} }
fn create_default_command_args(context: &RunnableContextWithoutInput) -> RawCommandArgs { fn create_default_command_args(context: &RunnableContextWithoutInput) -> RawCommandArgs {

View file

@ -32,7 +32,18 @@ impl WholeStreamCommand for BuildString {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
build_string(args, registry) let tag = args.call_info.name_tag.clone();
let (BuildStringArgs { rest }, _) = args.process(&registry).await?;
let mut output_string = String::new();
for r in rest {
output_string.push_str(&format_leaf(&r).plain_string(100_000))
}
Ok(OutputStream::one(ReturnSuccess::value(
UntaggedValue::string(output_string).into_value(tag),
)))
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -43,24 +54,3 @@ impl WholeStreamCommand for BuildString {
}] }]
} }
} }
pub fn build_string(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let tag = args.call_info.name_tag.clone();
let stream = async_stream! {
let (BuildStringArgs { rest }, mut input) = args.process(&registry).await?;
let mut output_string = String::new();
for r in rest {
output_string.push_str(&format_leaf(&r).plain_string(100_000))
}
yield Ok(ReturnSuccess::Value(UntaggedValue::string(&output_string).into_value(tag)));
};
Ok(stream.to_output_stream())
}

View file

@ -5,7 +5,7 @@ use nu_protocol::Dictionary;
use crate::commands::{command::EvaluatedWholeStreamCommandArgs, WholeStreamCommand}; use crate::commands::{command::EvaluatedWholeStreamCommandArgs, WholeStreamCommand};
use indexmap::IndexMap; use indexmap::IndexMap;
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value}; use nu_protocol::{Signature, SyntaxShape, UntaggedValue, Value};
pub struct Cal; pub struct Cal;
@ -42,7 +42,7 @@ impl WholeStreamCommand for Cal {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
cal(args, registry) cal(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -61,9 +61,11 @@ impl WholeStreamCommand for Cal {
} }
} }
pub fn cal(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { pub async fn cal(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?; let args = args.evaluate_once(&registry).await?;
let mut calendar_vec_deque = VecDeque::new(); let mut calendar_vec_deque = VecDeque::new();
let tag = args.call_info.name_tag.clone(); let tag = args.call_info.name_tag.clone();
@ -82,8 +84,7 @@ pub fn cal(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream
current_day_option = None current_day_option = None
} }
} else { } else {
yield Err(get_invalid_year_shell_error(&full_year_value.tag())); return Err(get_invalid_year_shell_error(&full_year_value.tag()));
return;
} }
} }
@ -103,16 +104,9 @@ pub fn cal(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream
); );
match add_months_of_year_to_table_result { match add_months_of_year_to_table_result {
Ok(()) => { Ok(()) => Ok(futures::stream::iter(calendar_vec_deque).to_output_stream()),
for item in calendar_vec_deque { Err(error) => Err(error),
yield ReturnSuccess::value(item);
} }
}
Err(error) => yield Err(error),
}
};
Ok(stream.to_output_stream())
} }
fn get_invalid_year_shell_error(year_tag: &Tag) -> ShellError { fn get_invalid_year_shell_error(year_tag: &Tag) -> ShellError {

View file

@ -20,7 +20,7 @@ impl WholeStreamCommand for Calc {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
calc(args, registry) calc(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -32,31 +32,33 @@ impl WholeStreamCommand for Calc {
} }
} }
pub fn calc(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> { pub async fn calc(
let stream = async_stream! { args: CommandArgs,
let mut input = args.input; _registry: &CommandRegistry,
let name = args.call_info.name_tag.clone(); ) -> Result<OutputStream, ShellError> {
while let Some(input) = input.next().await { let input = args.input;
let name = args.call_info.name_tag.span;
Ok(input
.map(move |input| {
if let Ok(string) = input.as_string() { if let Ok(string) = input.as_string() {
match parse(&string, &input.tag) { match parse(&string, &input.tag) {
Ok(value) => yield ReturnSuccess::value(value), Ok(value) => ReturnSuccess::value(value),
Err(err) => yield Err(ShellError::labeled_error( Err(err) => Err(ShellError::labeled_error(
"Calculation error", "Calculation error",
err, err,
&input.tag.span, &input.tag.span,
)), )),
} }
} else { } else {
yield Err(ShellError::labeled_error( Err(ShellError::labeled_error(
"Expected a string from pipeline", "Expected a string from pipeline",
"requires string input", "requires string input",
name.clone(), name,
)) ))
} }
} })
}; .to_output_stream())
Ok(stream.to_output_stream())
} }
pub fn parse(math_expression: &str, tag: impl Into<Tag>) -> Result<Value, String> { pub fn parse(math_expression: &str, tag: impl Into<Tag>) -> Result<Value, String> {

View file

@ -37,7 +37,10 @@ impl WholeStreamCommand for Cd {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
cd(args, registry) let name = args.call_info.name_tag.clone();
let shell_manager = args.shell_manager.clone();
let (args, _): (CdArgs, _) = args.process(&registry).await?;
shell_manager.cd(args, name)
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -66,22 +69,6 @@ impl WholeStreamCommand for Cd {
} }
} }
fn cd(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let shell_manager = args.shell_manager.clone();
let (args, _): (CdArgs, _) = args.process(&registry).await?;
let mut result = shell_manager.cd(args, name)?;
while let Some(item) = result.next().await {
yield item;
}
};
Ok(stream.to_output_stream())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::Cd; use super::Cd;

View file

@ -20,24 +20,7 @@ impl WholeStreamCommand for Clear {
"clears the terminal" "clears the terminal"
} }
async fn run( async fn run(&self, _: CommandArgs, _: &CommandRegistry) -> Result<OutputStream, ShellError> {
&self,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
clear(args, registry)
}
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Clear the screen",
example: "clear",
result: None,
}]
}
}
fn clear(_args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
if cfg!(windows) { if cfg!(windows) {
Command::new("cmd") Command::new("cmd")
.args(&["/C", "cls"]) .args(&["/C", "cls"])
@ -50,6 +33,15 @@ fn clear(_args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream
.expect("failed to execute process"); .expect("failed to execute process");
} }
Ok(OutputStream::empty()) Ok(OutputStream::empty())
}
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Clear the screen",
example: "clear",
result: None,
}]
}
} }
#[cfg(test)] #[cfg(test)]

View file

@ -3,7 +3,7 @@ use crate::context::CommandRegistry;
use crate::prelude::*; use crate::prelude::*;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ReturnValue, Signature, Value}; use nu_protocol::{Signature, Value};
use clipboard::{ClipboardContext, ClipboardProvider}; use clipboard::{ClipboardContext, ClipboardProvider};
@ -28,7 +28,7 @@ impl WholeStreamCommand for Clip {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
clip(args, registry) clip(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -40,31 +40,21 @@ impl WholeStreamCommand for Clip {
} }
} }
pub fn clip(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> { pub async fn clip(
let stream = async_stream! { args: CommandArgs,
let mut input = args.input; _registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let input = args.input;
let name = args.call_info.name_tag.clone(); let name = args.call_info.name_tag.clone();
let values: Vec<Value> = input.collect().await; let values: Vec<Value> = input.collect().await;
let mut clip_stream = inner_clip(values, name).await;
while let Some(value) = clip_stream.next().await {
yield value;
}
};
let stream: BoxStream<'static, ReturnValue> = stream.boxed();
Ok(OutputStream::from(stream))
}
async fn inner_clip(input: Vec<Value>, name: Tag) -> OutputStream {
if let Ok(clip_context) = ClipboardProvider::new() { if let Ok(clip_context) = ClipboardProvider::new() {
let mut clip_context: ClipboardContext = clip_context; let mut clip_context: ClipboardContext = clip_context;
let mut new_copy_data = String::new(); let mut new_copy_data = String::new();
if !input.is_empty() { if !values.is_empty() {
let mut first = true; let mut first = true;
for i in input.iter() { for i in values.iter() {
if !first { if !first {
new_copy_data.push_str("\n"); new_copy_data.push_str("\n");
} else { } else {
@ -74,11 +64,11 @@ async fn inner_clip(input: Vec<Value>, name: Tag) -> OutputStream {
let string: String = match i.as_string() { let string: String = match i.as_string() {
Ok(string) => string.to_string(), Ok(string) => string.to_string(),
Err(_) => { Err(_) => {
return OutputStream::one(Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Given non-string data", "Given non-string data",
"expected strings from pipeline", "expected strings from pipeline",
name, name,
))) ))
} }
}; };
@ -89,22 +79,21 @@ async fn inner_clip(input: Vec<Value>, name: Tag) -> OutputStream {
match clip_context.set_contents(new_copy_data) { match clip_context.set_contents(new_copy_data) {
Ok(_) => {} Ok(_) => {}
Err(_) => { Err(_) => {
return OutputStream::one(Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Could not set contents of clipboard", "Could not set contents of clipboard",
"could not set contents of clipboard", "could not set contents of clipboard",
name, name,
))); ));
} }
} }
OutputStream::empty()
} else { } else {
OutputStream::one(Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Could not open clipboard", "Could not open clipboard",
"could not open clipboard", "could not open clipboard",
name, name,
))) ));
} }
Ok(OutputStream::empty())
} }
#[cfg(test)] #[cfg(test)]

View file

@ -1,6 +1,7 @@
use crate::commands::WholeStreamCommand; use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry; use crate::context::CommandRegistry;
use crate::prelude::*; use crate::prelude::*;
use futures::future;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value}; use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value};
@ -32,7 +33,7 @@ impl WholeStreamCommand for Compact {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
compact(args, registry) compact(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -55,31 +56,40 @@ impl WholeStreamCommand for Compact {
} }
} }
pub fn compact(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { pub async fn compact(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (CompactArgs { rest: columns }, input) = args.process(&registry).await?;
let (CompactArgs { rest: columns }, mut input) = args.process(&registry).await?; Ok(input
while let Some(item) = input.next().await { .filter_map(move |item| {
if columns.is_empty() { future::ready(if columns.is_empty() {
if !item.is_empty() { if !item.is_empty() {
yield ReturnSuccess::value(item); Some(ReturnSuccess::value(item))
} else {
None
} }
} else { } else {
match item { match item {
Value { Value {
value: UntaggedValue::Row(ref r), value: UntaggedValue::Row(ref r),
.. ..
} => if columns } => {
if columns
.iter() .iter()
.all(|field| r.get_data(field).borrow().is_some()) { .all(|field| r.get_data(field).borrow().is_some())
yield ReturnSuccess::value(item); {
Some(ReturnSuccess::value(item))
} else {
None
} }
_ => {},
} }
}; _ => None,
} }
}; })
Ok(stream.to_output_stream()) })
.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View file

@ -71,7 +71,7 @@ impl WholeStreamCommand for Config {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
config(args, registry) config(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -115,13 +115,16 @@ impl WholeStreamCommand for Config {
} }
} }
pub fn config(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { pub async fn config(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let name_span = args.call_info.name_tag.clone(); let name_span = args.call_info.name_tag.clone();
let name = args.call_info.name_tag.clone(); let name = args.call_info.name_tag.clone();
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (
let (ConfigArgs { ConfigArgs {
load, load,
set, set,
set_into, set_into,
@ -129,7 +132,9 @@ pub fn config(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStr
clear, clear,
remove, remove,
path, path,
}, mut input) = args.process(&registry).await?; },
input,
) = args.process(&registry).await?;
let configuration = if let Some(supplied) = load { let configuration = if let Some(supplied) = load {
Some(supplied.item().clone()) Some(supplied.item().clone())
@ -139,7 +144,7 @@ pub fn config(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStr
let mut result = crate::data::config::read(name_span, &configuration)?; let mut result = crate::data::config::read(name_span, &configuration)?;
if let Some(v) = get { Ok(if let Some(v) = get {
let key = v.to_string(); let key = v.to_string();
let value = result let value = result
.get(&key) .get(&key)
@ -150,83 +155,96 @@ pub fn config(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStr
value: UntaggedValue::Table(list), value: UntaggedValue::Table(list),
.. ..
} => { } => {
for l in list { let list: Vec<_> = list
let value = l.clone(); .iter()
yield ReturnSuccess::value(l.clone()); .map(|x| ReturnSuccess::value(x.clone()))
.collect();
futures::stream::iter(list).to_output_stream()
}
x => {
let x = x.clone();
OutputStream::one(ReturnSuccess::value(x))
} }
} }
x => yield ReturnSuccess::value(x.clone()), } else if let Some((key, value)) = set {
}
}
else if let Some((key, value)) = set {
result.insert(key.to_string(), value.clone()); result.insert(key.to_string(), value.clone());
config::write(&result, &configuration)?; config::write(&result, &configuration)?;
yield ReturnSuccess::value(UntaggedValue::Row(result.into()).into_value(&value.tag)); OutputStream::one(ReturnSuccess::value(
} UntaggedValue::Row(result.into()).into_value(&value.tag),
else if let Some(v) = set_into { ))
} else if let Some(v) = set_into {
let rows: Vec<Value> = input.collect().await; let rows: Vec<Value> = input.collect().await;
let key = v.to_string(); let key = v.to_string();
if rows.len() == 0 { if rows.is_empty() {
yield Err(ShellError::labeled_error("No values given for set_into", "needs value(s) from pipeline", v.tag())); return Err(ShellError::labeled_error(
"No values given for set_into",
"needs value(s) from pipeline",
v.tag(),
));
} else if rows.len() == 1 { } else if rows.len() == 1 {
// A single value // A single value
let value = &rows[0]; let value = &rows[0];
result.insert(key.to_string(), value.clone()); result.insert(key, value.clone());
config::write(&result, &configuration)?; config::write(&result, &configuration)?;
yield ReturnSuccess::value(UntaggedValue::Row(result.into()).into_value(name)); OutputStream::one(ReturnSuccess::value(
UntaggedValue::Row(result.into()).into_value(name),
))
} else { } else {
// Take in the pipeline as a table // Take in the pipeline as a table
let value = UntaggedValue::Table(rows).into_value(name.clone()); let value = UntaggedValue::Table(rows).into_value(name.clone());
result.insert(key.to_string(), value.clone()); result.insert(key, value);
config::write(&result, &configuration)?; config::write(&result, &configuration)?;
yield ReturnSuccess::value(UntaggedValue::Row(result.into()).into_value(name)); OutputStream::one(ReturnSuccess::value(
UntaggedValue::Row(result.into()).into_value(name),
))
} }
} } else if let Tagged { item: true, tag } = clear {
else if let Tagged { item: true, tag } = clear {
result.clear(); result.clear();
config::write(&result, &configuration)?; config::write(&result, &configuration)?;
yield ReturnSuccess::value(UntaggedValue::Row(result.into()).into_value(tag)); OutputStream::one(ReturnSuccess::value(
UntaggedValue::Row(result.into()).into_value(tag),
return; ))
} } else if let Tagged { item: true, tag } = path {
else if let Tagged { item: true, tag } = path {
let path = config::default_path_for(&configuration)?; let path = config::default_path_for(&configuration)?;
yield ReturnSuccess::value(UntaggedValue::Primitive(Primitive::Path(path)).into_value(tag)); OutputStream::one(ReturnSuccess::value(
} UntaggedValue::Primitive(Primitive::Path(path)).into_value(tag),
else if let Some(v) = remove { ))
} else if let Some(v) = remove {
let key = v.to_string(); let key = v.to_string();
if result.contains_key(&key) { if result.contains_key(&key) {
result.swap_remove(&key); result.swap_remove(&key);
config::write(&result, &configuration)? config::write(&result, &configuration)?;
futures::stream::iter(vec![ReturnSuccess::value(
UntaggedValue::Row(result.into()).into_value(v.tag()),
)])
.to_output_stream()
} else { } else {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Key does not exist in config", "Key does not exist in config",
"key", "key",
v.tag(), v.tag(),
)); ));
} }
} else {
yield ReturnSuccess::value(UntaggedValue::Row(result.into()).into_value(v.tag())); futures::stream::iter(vec![ReturnSuccess::value(
} UntaggedValue::Row(result.into()).into_value(name),
else { )])
yield ReturnSuccess::value(UntaggedValue::Row(result.into()).into_value(name)); .to_output_stream()
} })
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View file

@ -3,7 +3,7 @@ use crate::context::CommandRegistry;
use crate::prelude::*; use crate::prelude::*;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, Signature, UntaggedValue, Value}; use nu_protocol::{Signature, UntaggedValue, Value};
pub struct Count; pub struct Count;
@ -24,9 +24,14 @@ impl WholeStreamCommand for Count {
async fn run( async fn run(
&self, &self,
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, _registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
count(args, registry) let name = args.call_info.name_tag.clone();
let rows: Vec<Value> = args.input.collect().await;
Ok(OutputStream::one(
UntaggedValue::int(rows.len()).into_value(name),
))
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -38,17 +43,6 @@ impl WholeStreamCommand for Count {
} }
} }
pub fn count(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let rows: Vec<Value> = args.input.collect().await;
yield ReturnSuccess::value(UntaggedValue::int(rows.len()).into_value(name))
};
Ok(stream.to_output_stream())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::Count; use super::Count;

View file

@ -41,7 +41,10 @@ impl WholeStreamCommand for Cpy {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
cp(args, registry) let shell_manager = args.shell_manager.clone();
let name = args.call_info.name_tag.clone();
let (args, _) = args.process(&registry).await?;
shell_manager.cp(args, name)
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -60,22 +63,6 @@ impl WholeStreamCommand for Cpy {
} }
} }
pub fn cp(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let shell_manager = args.shell_manager.clone();
let name = args.call_info.name_tag.clone();
let (args, _) = args.process(&registry).await?;
let mut result = shell_manager.cp(args, name)?;
while let Some(item) = result.next().await {
yield item;
}
};
Ok(stream.to_output_stream())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::Cpy; use super::Cpy;

View file

@ -7,7 +7,7 @@ use crate::commands::WholeStreamCommand;
use chrono::{Datelike, TimeZone, Timelike}; use chrono::{Datelike, TimeZone, Timelike};
use core::fmt::Display; use core::fmt::Display;
use indexmap::IndexMap; use indexmap::IndexMap;
use nu_protocol::{ReturnSuccess, Signature, UntaggedValue}; use nu_protocol::{Signature, UntaggedValue};
pub struct Date; pub struct Date;
@ -32,7 +32,7 @@ impl WholeStreamCommand for Date {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
date(args, registry) date(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -91,9 +91,11 @@ where
UntaggedValue::Row(Dictionary::from(indexmap)).into_value(&tag) UntaggedValue::Row(Dictionary::from(indexmap)).into_value(&tag)
} }
pub fn date(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { pub async fn date(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?; let args = args.evaluate_once(&registry).await?;
let tag = args.call_info.name_tag.clone(); let tag = args.call_info.name_tag.clone();
@ -106,10 +108,7 @@ pub fn date(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
date_to_value(local, tag) date_to_value(local, tag)
}; };
yield ReturnSuccess::value(value); Ok(OutputStream::one(value))
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View file

@ -65,7 +65,11 @@ impl WholeStreamCommand for Ls {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
ls(args, registry) let name = args.call_info.name_tag.clone();
let ctrl_c = args.ctrl_c.clone();
let shell_manager = args.shell_manager.clone();
let (args, _) = args.process(&registry).await?;
shell_manager.ls(args, name, ctrl_c)
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -89,23 +93,6 @@ impl WholeStreamCommand for Ls {
} }
} }
fn ls(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let ctrl_c = args.ctrl_c.clone();
let shell_manager = args.shell_manager.clone();
let (args, _) = args.process(&registry).await?;
let mut result = shell_manager.ls(args, name, ctrl_c)?;
while let Some(item) = result.next().await {
yield item;
}
};
Ok(stream.to_output_stream())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::Ls; use super::Ls;

View file

@ -1,6 +1,7 @@
use crate::prelude::*; use crate::prelude::*;
use futures::stream::iter; use futures::stream::iter;
use nu_protocol::{ReturnSuccess, ReturnValue, Value}; use nu_protocol::{ReturnSuccess, ReturnValue, Value};
use std::iter::IntoIterator;
pub struct OutputStream { pub struct OutputStream {
pub(crate) values: BoxStream<'static, ReturnValue>, pub(crate) values: BoxStream<'static, ReturnValue>,
@ -19,9 +20,8 @@ impl OutputStream {
} }
pub fn one(item: impl Into<ReturnValue>) -> OutputStream { pub fn one(item: impl Into<ReturnValue>) -> OutputStream {
let mut v: VecDeque<ReturnValue> = VecDeque::new(); let item = item.into();
v.push_back(item.into()); futures::stream::once(async move { item }).to_output_stream()
v.into()
} }
pub fn from_input(input: impl Stream<Item = Value> + Send + 'static) -> OutputStream { pub fn from_input(input: impl Stream<Item = Value> + Send + 'static) -> OutputStream {