Removing async_stream! from some commands (#1940)

* Removing async_stream! from some commands

* Revert row.rs code

* Simplify logic for first.rs and skip.rs
This commit is contained in:
Joseph T. Lyons 2020-06-06 03:42:06 -04:00 committed by GitHub
parent 2a8ea88413
commit ba6370621f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 286 additions and 330 deletions

View file

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue};
use nu_protocol::{Signature, SyntaxShape, UntaggedValue};
use nu_source::Tagged;
pub struct First;
@ -35,7 +35,7 @@ impl WholeStreamCommand for First {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
first(args, registry)
first(args, registry).await
}
fn examples(&self) -> Vec<Example> {
@ -57,27 +57,16 @@ impl WholeStreamCommand for First {
}
}
fn first(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn first(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (FirstArgs { rows }, mut input) = args.process(&registry).await?;
let mut rows_desired = if let Some(quantity) = rows {
let (FirstArgs { rows }, input) = args.process(&registry).await?;
let rows_desired = if let Some(quantity) = rows {
*quantity
} else {
1
};
while let Some(input) = input.next().await {
if rows_desired > 0 {
yield ReturnSuccess::value(input);
rows_desired -= 1;
} else {
break;
}
}
};
Ok(stream.to_output_stream())
Ok(input.take(rows_desired).to_output_stream())
}
#[cfg(test)]

View file

@ -27,7 +27,7 @@ impl WholeStreamCommand for FromBSON {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
from_bson(args, registry)
from_bson(args, registry).await
}
fn examples(&self) -> Vec<Example> {
@ -208,9 +208,11 @@ pub fn from_bson_bytes_to_value(bytes: Vec<u8>, tag: impl Into<Tag>) -> Result<V
convert_bson_value_to_nu_value(&Bson::Array(docs), tag)
}
fn from_bson(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn from_bson(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
@ -218,21 +220,16 @@ fn from_bson(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
let bytes = input.collect_binary(tag.clone()).await?;
match from_bson_bytes_to_value(bytes.item, tag.clone()) {
Ok(x) => yield ReturnSuccess::value(x),
Err(_) => {
yield Err(ShellError::labeled_error_with_secondary(
Ok(x) => Ok(OutputStream::one(ReturnSuccess::value(x))),
Err(_) => Err(ShellError::labeled_error_with_secondary(
"Could not parse as BSON",
"input cannot be parsed as BSON",
tag.clone(),
"value originates from here",
bytes.tag,
))
)),
}
}
};
Ok(stream.to_output_stream())
}
#[cfg(test)]
mod tests {

View file

@ -42,7 +42,7 @@ impl WholeStreamCommand for FromCSV {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
from_csv(args, registry)
from_csv(args, registry).await
}
fn examples(&self) -> Vec<Example> {
@ -66,11 +66,20 @@ impl WholeStreamCommand for FromCSV {
}
}
fn from_csv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn from_csv(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let (FromCSVArgs { headerless, separator }, mut input) = args.process(&registry).await?;
let (
FromCSVArgs {
headerless,
separator,
},
input,
) = args.process(&registry).await?;
let sep = match separator {
Some(Value {
value: UntaggedValue::Primitive(Primitive::String(s)),
@ -82,12 +91,11 @@ fn from_csv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
} else {
let vec_s: Vec<char> = s.chars().collect();
if vec_s.len() != 1 {
yield Err(ShellError::labeled_error(
return Err(ShellError::labeled_error(
"Expected a single separator char from --separator",
"requires a single character string input",
tag,
));
return;
};
vec_s[0]
}
@ -95,14 +103,7 @@ fn from_csv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
_ => ',',
};
let mut result = from_delimited_data(headerless, sep, "CSV", input, name)?;
while let Some(item) = result.next().await {
yield item;
}
};
Ok(stream.to_output_stream())
from_delimited_data(headerless, sep, "CSV", input, name).await
}
#[cfg(test)]

View file

@ -1,7 +1,7 @@
use crate::prelude::*;
use csv::{ErrorKind, ReaderBuilder};
use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, TaggedDictBuilder, UntaggedValue, Value};
use nu_protocol::{TaggedDictBuilder, UntaggedValue, Value};
fn from_delimited_string_to_value(
s: String,
@ -41,7 +41,7 @@ fn from_delimited_string_to_value(
Ok(UntaggedValue::Table(rows).into_value(&tag))
}
pub fn from_delimited_data(
pub async fn from_delimited_data(
headerless: bool,
sep: char,
format_name: &'static str,
@ -49,18 +49,15 @@ pub fn from_delimited_data(
name: Tag,
) -> Result<OutputStream, ShellError> {
let name_tag = name;
let stream = async_stream! {
let concat_string = input.collect_string(name_tag.clone()).await?;
match from_delimited_string_to_value(concat_string.item, headerless, sep, name_tag.clone()) {
Ok(x) => match x {
Value { value: UntaggedValue::Table(list), .. } => {
for l in list {
yield ReturnSuccess::value(l);
}
}
x => yield ReturnSuccess::value(x),
Value {
value: UntaggedValue::Table(list),
..
} => Ok(futures::stream::iter(list).to_output_stream()),
x => Ok(OutputStream::one(x)),
},
Err(err) => {
let line_one = match pretty_csv_error(err) {
@ -68,18 +65,16 @@ pub fn from_delimited_data(
None => format!("Could not parse as {}", format_name),
};
let line_two = format!("input cannot be parsed as {}", format_name);
yield Err(ShellError::labeled_error_with_secondary(
Err(ShellError::labeled_error_with_secondary(
line_one,
line_two,
name_tag.clone(),
"value originates from here",
concat_string.tag,
))
} ,
}
};
Ok(stream.to_output_stream())
}
}
fn pretty_csv_error(err: csv::Error) -> Option<String> {

View file

@ -1,7 +1,7 @@
use crate::commands::WholeStreamCommand;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Primitive, ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue, Value};
use nu_protocol::{Primitive, Signature, TaggedDictBuilder, UntaggedValue, Value};
use std::collections::HashMap;
pub struct FromINI;
@ -25,7 +25,7 @@ impl WholeStreamCommand for FromINI {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
from_ini(args, registry)
from_ini(args, registry).await
}
}
@ -64,9 +64,11 @@ pub fn from_ini_string_to_value(
Ok(convert_ini_top_to_nu_value(&v, tag))
}
fn from_ini(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn from_ini(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
@ -74,27 +76,21 @@ fn from_ini(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
match from_ini_string_to_value(concat_string.item, tag.clone()) {
Ok(x) => match x {
Value { value: UntaggedValue::Table(list), .. } => {
for l in list {
yield ReturnSuccess::value(l);
}
}
x => yield ReturnSuccess::value(x),
Value {
value: UntaggedValue::Table(list),
..
} => Ok(futures::stream::iter(list).to_output_stream()),
x => Ok(OutputStream::one(x)),
},
Err(_) => {
yield Err(ShellError::labeled_error_with_secondary(
Err(_) => Err(ShellError::labeled_error_with_secondary(
"Could not parse as INI",
"input cannot be parsed as INI",
&tag,
"value originates from here",
concat_string.tag,
))
)),
}
}
};
Ok(stream.to_output_stream())
}
#[cfg(test)]
mod tests {

View file

@ -1,7 +1,7 @@
use crate::commands::WholeStreamCommand;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Primitive, ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue, Value};
use nu_protocol::{Primitive, Signature, TaggedDictBuilder, UntaggedValue, Value};
use rusqlite::{types::ValueRef, Connection, Row, NO_PARAMS};
use std::io::Write;
use std::path::Path;
@ -27,7 +27,7 @@ impl WholeStreamCommand for FromSQLite {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
from_sqlite(args, registry)
from_sqlite(args, registry).await
}
}
@ -52,7 +52,7 @@ impl WholeStreamCommand for FromDB {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
from_sqlite(args, registry)
from_sqlite(args, registry).await
}
}
@ -65,6 +65,7 @@ pub fn convert_sqlite_file_to_nu_value(
let mut meta_out = Vec::new();
let mut meta_stmt = conn.prepare("select name from sqlite_master where type='table'")?;
let mut meta_rows = meta_stmt.query(NO_PARAMS)?;
while let Some(meta_row) = meta_rows.next()? {
let table_name: String = meta_row.get(0)?;
let mut meta_dict = TaggedDictBuilder::new(tag.clone());
@ -134,26 +135,29 @@ pub fn from_sqlite_bytes_to_value(
}
}
fn from_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn from_sqlite(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
let bytes = input.collect_binary(tag.clone()).await?;
match from_sqlite_bytes_to_value(bytes.item, tag.clone()) {
Ok(x) => match x {
Value { value: UntaggedValue::Table(list), .. } => {
for l in list {
yield ReturnSuccess::value(l);
}
}
_ => yield ReturnSuccess::value(x),
}
Value {
value: UntaggedValue::Table(list),
..
} => Ok(futures::stream::iter(list).to_output_stream()),
_ => Ok(OutputStream::one(x)),
},
Err(err) => {
println!("{:?}", err);
yield Err(ShellError::labeled_error_with_secondary(
Err(ShellError::labeled_error_with_secondary(
"Could not parse as SQLite",
"input cannot be parsed as SQLite",
&tag,
@ -162,9 +166,6 @@ fn from_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputSt
))
}
}
};
Ok(stream.to_output_stream())
}
#[cfg(test)]

View file

@ -34,23 +34,19 @@ impl WholeStreamCommand for FromTSV {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
from_tsv(args, registry)
from_tsv(args, registry).await
}
}
fn from_tsv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn from_tsv(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let (FromTSVArgs { headerless }, mut input) = args.process(&registry).await?;
let mut result = from_delimited_data(headerless, '\t', "TSV", input, name)?;
let (FromTSVArgs { headerless }, input) = args.process(&registry).await?;
while let Some(output) = result.next().await {
yield output;
}
};
Ok(stream.to_output_stream())
from_delimited_data(headerless, '\t', "TSV", input, name).await
}
#[cfg(test)]

View file

@ -24,13 +24,15 @@ impl WholeStreamCommand for FromURL {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
from_url(args, registry)
from_url(args, registry).await
}
}
fn from_url(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn from_url(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
@ -47,22 +49,17 @@ fn from_url(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
row.insert_untagged(k, UntaggedValue::string(v));
}
yield ReturnSuccess::value(row.into_value());
Ok(OutputStream::one(ReturnSuccess::value(row.into_value())))
}
_ => {
yield Err(ShellError::labeled_error_with_secondary(
_ => Err(ShellError::labeled_error_with_secondary(
"String not compatible with url-encoding",
"input not url-encoded",
tag,
"value originates from here",
concat_string.tag,
));
)),
}
}
};
Ok(stream.to_output_stream())
}
#[cfg(test)]
mod tests {

View file

@ -28,13 +28,15 @@ impl WholeStreamCommand for FromVcf {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
from_vcf(args, registry)
from_vcf(args, registry).await
}
}
fn from_vcf(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn from_vcf(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
@ -44,19 +46,24 @@ fn from_vcf(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
let buf_reader = BufReader::new(input_bytes);
let parser = ical::VcardParser::new(buf_reader);
let mut values_vec_deque = VecDeque::new();
for contact in parser {
match contact {
Ok(c) => yield ReturnSuccess::value(contact_to_value(c, tag.clone())),
Err(_) => yield Err(ShellError::labeled_error(
Ok(c) => {
values_vec_deque.push_back(ReturnSuccess::value(contact_to_value(c, tag.clone())))
}
Err(_) => {
return Err(ShellError::labeled_error(
"Could not parse as .vcf",
"input cannot be parsed as .vcf",
tag.clone()
)),
tag.clone(),
))
}
}
}
};
Ok(stream.to_output_stream())
Ok(futures::stream::iter(values_vec_deque).to_output_stream())
}
fn contact_to_value(contact: VcardContact, tag: Tag) -> Value {

View file

@ -1,7 +1,7 @@
use crate::commands::WholeStreamCommand;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Primitive, ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue, Value};
use nu_protocol::{Primitive, Signature, TaggedDictBuilder, UntaggedValue, Value};
pub struct FromYAML;
@ -24,7 +24,7 @@ impl WholeStreamCommand for FromYAML {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
from_yaml(args, registry)
from_yaml(args, registry).await
}
}
@ -49,7 +49,7 @@ impl WholeStreamCommand for FromYML {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
from_yaml(args, registry)
from_yaml(args, registry).await
}
}
@ -120,9 +120,11 @@ pub fn from_yaml_string_to_value(s: String, tag: impl Into<Tag>) -> Result<Value
Ok(convert_yaml_value_to_nu_value(&v, tag)?)
}
fn from_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn from_yaml(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
@ -131,27 +133,21 @@ fn from_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
match from_yaml_string_to_value(concat_string.item, tag.clone()) {
Ok(x) => match x {
Value { value: UntaggedValue::Table(list), .. } => {
for l in list {
yield ReturnSuccess::value(l);
}
}
x => yield ReturnSuccess::value(x),
Value {
value: UntaggedValue::Table(list),
..
} => Ok(futures::stream::iter(list).to_output_stream()),
x => Ok(OutputStream::one(x)),
},
Err(_) => {
yield Err(ShellError::labeled_error_with_secondary(
Err(_) => Err(ShellError::labeled_error_with_secondary(
"Could not parse as YAML",
"input cannot be parsed as YAML",
&tag,
"value originates from here",
&concat_string.tag,
))
)),
}
}
};
Ok(stream.to_output_stream())
}
#[cfg(test)]
mod tests {

View file

@ -35,7 +35,7 @@ impl WholeStreamCommand for Last {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
last(args, registry)
last(args, registry).await
}
fn examples(&self) -> Vec<Example> {
@ -58,10 +58,9 @@ impl WholeStreamCommand for Last {
}
}
fn last(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn last(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (LastArgs { rows }, mut input) = args.process(&registry).await?;
let (LastArgs { rows }, input) = args.process(&registry).await?;
let v: Vec<_> = input.into_vec().await;
let rows_desired = if let Some(quantity) = rows {
@ -70,16 +69,19 @@ fn last(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, S
1
};
let count = (rows_desired as usize);
let mut values_vec_deque = VecDeque::new();
let count = rows_desired as usize;
if count < v.len() {
let k = v.len() - count;
for x in v[k..].iter() {
let y: Value = x.clone();
yield ReturnSuccess::value(y)
values_vec_deque.push_back(ReturnSuccess::value(x.clone()));
}
}
};
Ok(stream.to_output_stream())
Ok(futures::stream::iter(values_vec_deque).to_output_stream())
}
#[cfg(test)]

View file

@ -24,7 +24,7 @@ impl WholeStreamCommand for Pwd {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
pwd(args, registry)
pwd(args, registry).await
}
fn examples(&self) -> Vec<Example> {
@ -36,20 +36,15 @@ impl WholeStreamCommand for Pwd {
}
}
pub fn pwd(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
pub async fn pwd(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let shell_manager = args.shell_manager.clone();
let args = args.evaluate_once(&registry).await?;
let mut out = shell_manager.pwd(args)?;
while let Some(l) = out.next().await {
yield l;
}
};
Ok(stream.to_output_stream())
shell_manager.pwd(args)
}
#[cfg(test)]

View file

@ -37,27 +37,26 @@ impl WholeStreamCommand for ReduceBy {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
reduce_by(args, registry)
reduce_by(args, registry).await
}
}
pub fn reduce_by(
pub async fn reduce_by(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let (ReduceByArgs { reduce_with }, mut input) = args.process(&registry).await?;
let values: Vec<Value> = input.collect().await;
if values.is_empty() {
yield Err(ShellError::labeled_error(
return Err(ShellError::labeled_error(
"Expected table from pipeline",
"requires a table input",
name
))
} else {
name,
));
}
let reduce_with = if let Some(reducer) = reduce_with {
Some(reducer.item().clone())
@ -66,14 +65,10 @@ pub fn reduce_by(
};
match reduce(&values[0], reduce_with, name) {
Ok(reduced) => yield ReturnSuccess::value(reduced),
Err(err) => yield Err(err)
Ok(reduced) => Ok(OutputStream::one(ReturnSuccess::value(reduced))),
Err(err) => Err(err),
}
}
};
Ok(stream.to_output_stream())
}
#[cfg(test)]
mod tests {

View file

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue};
use nu_protocol::{Signature, SyntaxShape, UntaggedValue};
use nu_source::Tagged;
pub struct Skip;
@ -31,7 +31,7 @@ impl WholeStreamCommand for Skip {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
skip(args, registry)
skip(args, registry).await
}
fn examples(&self) -> Vec<Example> {
@ -46,27 +46,16 @@ impl WholeStreamCommand for Skip {
}
}
fn skip(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn skip(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (SkipArgs { rows }, mut input) = args.process(&registry).await?;
let mut rows_desired = if let Some(quantity) = rows {
let (SkipArgs { rows }, input) = args.process(&registry).await?;
let rows_desired = if let Some(quantity) = rows {
*quantity
} else {
1
};
while let Some(input) = input.next().await {
if rows_desired == 0 {
yield ReturnSuccess::value(input);
}
if rows_desired > 0{
rows_desired -= 1;
}
}
};
Ok(stream.to_output_stream())
Ok(input.skip(rows_desired).to_output_stream())
}
#[cfg(test)]