Moving some commands off of async stream (#1934)

* Remove async_stream from rm

* Remove async_stream from sort_by

* Remove async_stream from split_by

* Remove dbg!() statement

* Remove async_stream from uniq

* Remove async_stream from mkdir

* Don't change functions from private to public

* Clippy fixes

* Peer-review updates
This commit is contained in:
Joseph T. Lyons 2020-06-04 04:42:23 -04:00 committed by GitHub
parent 5dd346094e
commit 012c99839c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 99 additions and 102 deletions

View file

@ -32,7 +32,7 @@ impl WholeStreamCommand for Mkdir {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
mkdir(args, registry) mkdir(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -44,20 +44,13 @@ impl WholeStreamCommand for Mkdir {
} }
} }
fn mkdir(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn mkdir(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let name = args.call_info.name_tag.clone();
let name = args.call_info.name_tag.clone(); let shell_manager = args.shell_manager.clone();
let shell_manager = args.shell_manager.clone(); let (args, _) = args.process(&registry).await?;
let (args, _) = args.process(&registry).await?;
let mut result = shell_manager.mkdir(args, name)?;
while let Some(item) = result.next().await { shell_manager.mkdir(args, name)
yield item;
}
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View file

@ -49,7 +49,7 @@ impl WholeStreamCommand for Remove {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
rm(args, registry) rm(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -73,27 +73,21 @@ impl WholeStreamCommand for Remove {
} }
} }
fn rm(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn rm(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let name = args.call_info.name_tag.clone();
let name = args.call_info.name_tag.clone(); let shell_manager = args.shell_manager.clone();
let shell_manager = args.shell_manager.clone(); let (args, _): (RemoveArgs, _) = args.process(&registry).await?;
let (args, _): (RemoveArgs, _) = args.process(&registry).await?;
let mut result = if args.trash.item && args.permanent.item {
OutputStream::one(Err(ShellError::labeled_error(
"only one of --permanent and --trash can be used",
"conflicting flags",
name
)))
} else {
shell_manager.rm(args, name)?
};
while let Some(item) = result.next().await {
yield item;
}
};
Ok(stream.to_output_stream()) if args.trash.item && args.permanent.item {
return Ok(OutputStream::one(Err(ShellError::labeled_error(
"only one of --permanent and --trash can be used",
"conflicting flags",
name,
))));
}
shell_manager.rm(args, name)
} }
#[cfg(test)] #[cfg(test)]

View file

@ -31,7 +31,7 @@ impl WholeStreamCommand for SortBy {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
sort_by(args, registry) sort_by(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -60,51 +60,59 @@ impl WholeStreamCommand for SortBy {
} }
} }
fn sort_by(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn sort_by(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let tag = args.call_info.name_tag.clone();
let (SortByArgs { rest }, mut input) = args.process(&registry).await?;
let mut vec = input.drain_vec().await;
if vec.is_empty() { let (SortByArgs { rest }, mut input) = args.process(&registry).await?;
return; let mut vec = input.drain_vec().await;
if vec.is_empty() {
return Err(ShellError::labeled_error(
"Error performing sort-by command",
"sort-by error",
tag,
));
}
for sort_arg in rest.iter() {
let match_test = get_data_by_key(&vec[0], sort_arg.borrow_spanned());
if match_test == None {
return Err(ShellError::labeled_error(
"Can not find column to sort by",
"invalid column",
sort_arg.borrow_spanned().span,
));
} }
}
for sort_arg in rest.iter() { match &vec[0] {
let match_test = get_data_by_key(&vec[0], sort_arg.borrow_spanned()); Value {
if match_test == None { value: UntaggedValue::Primitive(_),
yield Err(ShellError::labeled_error( ..
"Can not find column to sort by", } => {
"invalid column", vec.sort();
sort_arg.borrow_spanned().span,
));
return;
}
} }
_ => {
match &vec[0] { let calc_key = |item: &Value| {
Value { rest.iter()
value: UntaggedValue::Primitive(_), .map(|f| get_data_by_key(item, f.borrow_spanned()))
.. .collect::<Vec<Option<Value>>>()
} => { };
vec.sort(); vec.sort_by_cached_key(calc_key);
},
_ => {
let calc_key = |item: &Value| {
rest.iter()
.map(|f| get_data_by_key(item, f.borrow_spanned()))
.collect::<Vec<Option<Value>>>()
};
vec.sort_by_cached_key(calc_key);
},
};
for item in vec {
yield item.into();
} }
}; };
Ok(stream.to_output_stream()) let mut values_vec_deque: VecDeque<Value> = VecDeque::new();
for item in vec {
values_vec_deque.push_back(item);
}
Ok(futures::stream::iter(values_vec_deque).to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View file

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::prelude::*; use crate::prelude::*;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ use nu_protocol::{
ReturnSuccess, Signature, SpannedTypeName, SyntaxShape, TaggedDictBuilder, UntaggedValue, Value, Signature, SpannedTypeName, SyntaxShape, TaggedDictBuilder, UntaggedValue, Value,
}; };
use nu_source::Tagged; use nu_source::Tagged;
@ -36,32 +36,31 @@ impl WholeStreamCommand for SplitBy {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
split_by(args, registry) split_by(args, registry).await
} }
} }
pub fn split_by(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { pub async fn split_by(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let name = args.call_info.name_tag.clone();
let name = args.call_info.name_tag.clone(); let (SplitByArgs { column_name }, input) = args.process(&registry).await?;
let (SplitByArgs { column_name }, mut input) = args.process(&registry).await?; let values: Vec<Value> = input.collect().await;
let values: Vec<Value> = input.collect().await;
if values.len() > 1 || values.is_empty() { if values.len() > 1 || values.is_empty() {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected table from pipeline", "Expected table from pipeline",
"requires a table input", "requires a table input",
column_name.span() column_name.span(),
)) ));
} else { }
match split(&column_name, &values[0], name) {
Ok(split) => yield ReturnSuccess::value(split),
Err(err) => yield Err(err),
}
}
};
Ok(stream.to_output_stream()) match split(&column_name, &values[0], name) {
Ok(split) => Ok(OutputStream::one(split)),
Err(err) => Err(err),
}
} }
pub fn split( pub fn split(

View file

@ -26,21 +26,24 @@ impl WholeStreamCommand for Uniq {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
uniq(args, registry) uniq(args, registry).await
} }
} }
fn uniq(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn uniq(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let stream = async_stream! { let input = args.input;
let mut input = args.input; let uniq_values: IndexSet<_> = input.collect().await;
let uniq_values: IndexSet<_> = input.collect().await;
for item in uniq_values.iter().map(|row| ReturnSuccess::value(row.clone())) { let mut values_vec_deque = VecDeque::new();
yield item;
}
};
Ok(stream.to_output_stream()) for item in uniq_values
.iter()
.map(|row| ReturnSuccess::value(row.clone()))
{
values_vec_deque.push_back(item);
}
Ok(futures::stream::iter(values_vec_deque).to_output_stream())
} }
#[cfg(test)] #[cfg(test)]