Another batch of removing async_stream (#1970)

This commit is contained in:
Jonathan Turner 2020-06-12 01:34:41 -07:00 committed by GitHub
parent 731aa6bbdd
commit 935a5f6b9e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 217 additions and 204 deletions

View file

@ -45,8 +45,7 @@ impl WholeStreamCommand for Alias {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
// args.process(registry, alias)?.run() alias(args, registry).await
alias(args, registry)
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -65,14 +64,21 @@ impl WholeStreamCommand for Alias {
} }
} }
// <<<<<<< HEAD pub async fn alias(
// pub fn alias(alias_args: AliasArgs, ctx: RunnableContext) -> Result<OutputStream, ShellError> { args: CommandArgs,
// ======= registry: &CommandRegistry,
pub fn alias(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! {
let mut raw_input = args.raw_input.clone(); let mut raw_input = args.raw_input.clone();
let (AliasArgs { name, args: list, block, save}, ctx) = args.process(&registry).await?; let (
AliasArgs {
name,
args: list,
block,
save,
},
_ctx,
) = args.process(&registry).await?;
let mut processed_args: Vec<String> = vec![]; let mut processed_args: Vec<String> = vec![];
if let Some(true) = save { if let Some(true) = save {
@ -80,14 +86,18 @@ pub fn alias(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
// process the alias to remove the --save flag // process the alias to remove the --save flag
let left_brace = raw_input.find('{').unwrap_or(0); let left_brace = raw_input.find('{').unwrap_or(0);
let right_brace = raw_input.rfind('}').unwrap_or(raw_input.len()); let right_brace = raw_input.rfind('}').unwrap_or_else(|| raw_input.len());
let mut left = raw_input[..left_brace].replace("--save", "").replace("-s", ""); let left = raw_input[..left_brace]
let mut right = raw_input[right_brace..].replace("--save", "").replace("-s", ""); .replace("--save", "")
.replace("-s", "");
let right = raw_input[right_brace..]
.replace("--save", "")
.replace("-s", "");
raw_input = format!("{}{}{}", left, &raw_input[left_brace..right_brace], right); raw_input = format!("{}{}{}", left, &raw_input[left_brace..right_brace], right);
// create a value from raw_input alias // create a value from raw_input alias
let alias: Value = raw_input.trim().to_string().into(); let alias: Value = raw_input.trim().to_string().into();
let alias_start = raw_input.find("[").unwrap_or(0); // used to check if the same alias already exists let alias_start = raw_input.find('[').unwrap_or(0); // used to check if the same alias already exists
// add to startup if alias doesn't exist and replce if it does // add to startup if alias doesn't exist and replce if it does
match result.get_mut("startup") { match result.get_mut("startup") {
@ -104,7 +114,7 @@ pub fn alias(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
} }
} }
None => { None => {
let mut table = UntaggedValue::table(&[alias]); let table = UntaggedValue::table(&[alias]);
result.insert("startup".to_string(), table.into_value(Tag::default())); result.insert("startup".to_string(), table.into_value(Tag::default()));
} }
} }
@ -115,13 +125,17 @@ pub fn alias(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
if let Ok(string) = item.as_string() { if let Ok(string) = item.as_string() {
processed_args.push(format!("${}", string)); processed_args.push(format!("${}", string));
} else { } else {
yield Err(ShellError::labeled_error("Expected a string", "expected a string", item.tag())); return Err(ShellError::labeled_error(
"Expected a string",
"expected a string",
item.tag(),
));
} }
} }
yield ReturnSuccess::action(CommandAction::AddAlias(name.to_string(), processed_args, block.clone()))
};
Ok(stream.to_output_stream()) Ok(OutputStream::one(ReturnSuccess::action(
CommandAction::AddAlias(name.to_string(), processed_args, block),
)))
} }
#[cfg(test)] #[cfg(test)]

View file

@ -4,9 +4,7 @@ use crate::utils::data_processing::{reducer_for, Reduce};
use bigdecimal::FromPrimitive; use bigdecimal::FromPrimitive;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::hir::{convert_number_to_u64, Number, Operator}; use nu_protocol::hir::{convert_number_to_u64, Number, Operator};
use nu_protocol::{ use nu_protocol::{Dictionary, Primitive, ReturnSuccess, Signature, UntaggedValue, Value};
Dictionary, Primitive, ReturnSuccess, ReturnValue, Signature, UntaggedValue, Value,
};
use num_traits::identities::Zero; use num_traits::identities::Zero;
use indexmap::map::IndexMap; use indexmap::map::IndexMap;
@ -42,6 +40,7 @@ impl WholeStreamCommand for Average {
name: args.call_info.name_tag, name: args.call_info.name_tag,
raw_input: args.raw_input, raw_input: args.raw_input,
}) })
.await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -53,34 +52,29 @@ impl WholeStreamCommand for Average {
} }
} }
fn average( async fn average(
RunnableContext { RunnableContext {
mut input, name, .. mut input, name, ..
}: RunnableContext, }: RunnableContext,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let stream = async_stream! { let values: Vec<Value> = input.drain_vec().await;
let mut values: Vec<Value> = input.drain_vec().await;
let action = reducer_for(Reduce::Sum);
if values.iter().all(|v| if let UntaggedValue::Primitive(_) = v.value {true} else {false}) { if values.iter().all(|v| v.is_primitive()) {
match avg(&values, name) { match avg(&values, name) {
Ok(result) => yield ReturnSuccess::value(result), Ok(result) => Ok(OutputStream::one(ReturnSuccess::value(result))),
Err(err) => yield Err(err), Err(err) => Err(err),
} }
} else { } else {
let mut column_values = IndexMap::new(); let mut column_values = IndexMap::new();
for value in values { for value in values {
match value.value { if let UntaggedValue::Row(row_dict) = value.value {
UntaggedValue::Row(row_dict) => {
for (key, value) in row_dict.entries.iter() { for (key, value) in row_dict.entries.iter() {
column_values column_values
.entry(key.clone()) .entry(key.clone())
.and_modify(|v: &mut Vec<Value>| v.push(value.clone())) .and_modify(|v: &mut Vec<Value>| v.push(value.clone()))
.or_insert(vec![value.clone()]); .or_insert(vec![value.clone()]);
} }
}, }
table => {},
};
} }
let mut column_totals = IndexMap::new(); let mut column_totals = IndexMap::new();
@ -89,17 +83,17 @@ fn average(
Ok(result) => { Ok(result) => {
column_totals.insert(col_name, result); column_totals.insert(col_name, result);
} }
Err(err) => yield Err(err), Err(err) => return Err(err),
} }
} }
yield ReturnSuccess::value(
UntaggedValue::Row(Dictionary {entries: column_totals}).into_untagged_value())
}
};
let stream: BoxStream<'static, ReturnValue> = stream.boxed(); Ok(OutputStream::one(ReturnSuccess::value(
UntaggedValue::Row(Dictionary {
Ok(stream.to_output_stream()) entries: column_totals,
})
.into_untagged_value(),
)))
}
} }
fn avg(values: &[Value], name: impl Into<Tag>) -> Result<Value, ShellError> { fn avg(values: &[Value], name: impl Into<Tag>) -> Result<Value, ShellError> {

View file

@ -54,7 +54,7 @@ documentation link at https://docs.rs/encoding_rs/0.8.23/encoding_rs/#statics"#
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
enter(args, registry) enter(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -78,9 +78,11 @@ documentation link at https://docs.rs/encoding_rs/0.8.23/encoding_rs/#statics"#
} }
} }
fn enter(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn enter(
raw_args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! {
let scope = raw_args.call_info.scope.clone(); let scope = raw_args.call_info.scope.clone();
let shell_manager = raw_args.shell_manager.clone(); let shell_manager = raw_args.shell_manager.clone();
let head = raw_args.call_info.args.head.clone(); let head = raw_args.call_info.args.head.clone();
@ -99,35 +101,36 @@ fn enter(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
let (_, command) = (spec[0], spec[1]); let (_, command) = (spec[0], spec[1]);
if registry.has(command) { if registry.has(command) {
yield Ok(ReturnSuccess::Action(CommandAction::EnterHelpShell( return Ok(OutputStream::one(ReturnSuccess::action(
CommandAction::EnterHelpShell(
UntaggedValue::string(command).into_value(Tag::unknown()), UntaggedValue::string(command).into_value(Tag::unknown()),
),
))); )));
return;
} }
} }
yield Ok(ReturnSuccess::Action(CommandAction::EnterHelpShell( Ok(OutputStream::one(ReturnSuccess::action(
UntaggedValue::nothing().into_value(Tag::unknown()), CommandAction::EnterHelpShell(UntaggedValue::nothing().into_value(Tag::unknown())),
))); )))
} else if location.is_dir() { } else if location.is_dir() {
yield Ok(ReturnSuccess::Action(CommandAction::EnterShell( Ok(OutputStream::one(ReturnSuccess::action(
location_clone, CommandAction::EnterShell(location_clone),
))); )))
} else { } else {
// If it's a file, attempt to open the file as a value and enter it // If it's a file, attempt to open the file as a value and enter it
let cwd = shell_manager.path(); let cwd = shell_manager.path();
let full_path = std::path::PathBuf::from(cwd); let full_path = std::path::PathBuf::from(cwd);
let (file_extension, contents, contents_tag) = let (file_extension, contents, contents_tag) = crate::commands::open::fetch(
crate::commands::open::fetch(
&full_path, &full_path,
&PathBuf::from(location_clone), &PathBuf::from(location_clone),
tag.span, tag.span,
match encoding { match encoding {
Some(e) => e.to_string(), Some(e) => e.to_string(),
_ => "".to_string() _ => "".to_string(),
} },
).await?; )
.await?;
match contents { match contents {
UntaggedValue::Primitive(Primitive::String(_)) => { UntaggedValue::Primitive(Primitive::String(_)) => {
@ -135,9 +138,7 @@ fn enter(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
if let Some(extension) = file_extension { if let Some(extension) = file_extension {
let command_name = format!("from {}", extension); let command_name = format!("from {}", extension);
if let Some(converter) = if let Some(converter) = registry.get_command(&command_name) {
registry.get_command(&command_name)
{
let new_args = RawCommandArgs { let new_args = RawCommandArgs {
host, host,
ctrl_c, ctrl_c,
@ -152,47 +153,47 @@ fn enter(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
is_last: false, is_last: false,
}, },
name_tag: tag.clone(), name_tag: tag.clone(),
scope: scope.clone() scope: scope.clone(),
}, },
}; };
let mut result = converter.run( let mut result = converter
new_args.with_input(vec![tagged_contents]), .run(new_args.with_input(vec![tagged_contents]), &registry)
&registry, .await;
).await;
let result_vec: Vec<Result<ReturnSuccess, ShellError>> = let result_vec: Vec<Result<ReturnSuccess, ShellError>> =
result.drain_vec().await; result.drain_vec().await;
for res in result_vec {
match res { Ok(futures::stream::iter(result_vec.into_iter().map(
Ok(ReturnSuccess::Value(Value { move |res| match res {
value, Ok(ReturnSuccess::Value(Value { value, .. })) => Ok(
.. ReturnSuccess::Action(CommandAction::EnterValueShell(Value {
})) => {
yield Ok(ReturnSuccess::Action(CommandAction::EnterValueShell(
Value {
value, value,
tag: contents_tag.clone(), tag: contents_tag.clone(),
}))); })),
} ),
x => yield x, x => x,
} },
))
.to_output_stream())
} else {
Ok(OutputStream::one(ReturnSuccess::action(
CommandAction::EnterValueShell(tagged_contents),
)))
} }
} else { } else {
yield Ok(ReturnSuccess::Action(CommandAction::EnterValueShell(tagged_contents))); Ok(OutputStream::one(ReturnSuccess::action(
} CommandAction::EnterValueShell(tagged_contents),
} else { )))
yield Ok(ReturnSuccess::Action(CommandAction::EnterValueShell(tagged_contents)));
} }
} }
_ => { _ => {
let tagged_contents = contents.into_value(contents_tag); let tagged_contents = contents.into_value(contents_tag);
yield Ok(ReturnSuccess::Action(CommandAction::EnterValueShell(tagged_contents))); Ok(OutputStream::one(ReturnSuccess::action(
CommandAction::EnterValueShell(tagged_contents),
)))
} }
} }
} }
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View file

@ -25,14 +25,10 @@ impl WholeStreamCommand for From {
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { Ok(OutputStream::one(ReturnSuccess::value(
yield Ok(ReturnSuccess::Value(
UntaggedValue::string(crate::commands::help::get_help(&From, &registry)) UntaggedValue::string(crate::commands::help::get_help(&From, &registry))
.into_value(Tag::unknown()), .into_value(Tag::unknown()),
)); )))
};
Ok(stream.to_output_stream())
} }
} }

View file

@ -339,6 +339,14 @@ impl Value {
} }
} }
/// View the Value as a Primitive value, if possible
pub fn is_primitive(&self) -> bool {
match &self.value {
UntaggedValue::Primitive(_) => true,
_ => false,
}
}
/// View the Value as unsigned 64-bit, if possible /// View the Value as unsigned 64-bit, if possible
pub fn as_u64(&self) -> Result<u64, ShellError> { pub fn as_u64(&self) -> Result<u64, ShellError> {
match &self.value { match &self.value {