Another batch of removing async_stream (#1978)

This commit is contained in:
Jonathan Turner 2020-06-13 12:13:36 -07:00 committed by GitHub
parent bcfb084d4c
commit 40673e4599
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 554 additions and 568 deletions

60
TODO.md
View file

@ -1,60 +0,0 @@
This pattern is extremely repetitive and can be abstracted:
```rs
let args = args.evaluate_once(registry)?;
let tag = args.name_tag();
let input = args.input;
let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await;
let mut concat_string = String::new();
let mut latest_tag: Option<Tag> = None;
for value in values {
latest_tag = Some(value_tag.clone());
let value_span = value.tag.span;
match &value.value {
UntaggedValue::Primitive(Primitive::String(s)) => {
concat_string.push_str(&s);
concat_string.push_str("\n");
}
_ => yield Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
name_span,
"value originates from here",
value_span,
)),
}
}
```
Mandatory and Optional in parse_command
trace_remaining?
select_fields and select_fields take unnecessary Tag
Value#value should be Value#untagged
Unify dictionary building, probably around a macro
sys plugin in own crate
textview in own crate
Combine atomic and atomic_parse in parser
at_end_possible_ws needs to be comment and separator sensitive
Eliminate unnecessary `nodes` parser
#[derive(HasSpan)]
Figure out a solution for the duplication in stuff like NumberShape vs. NumberExpressionShape
use `struct Expander` from signature.rs

View file

@ -9,7 +9,6 @@ use parking_lot::Mutex;
use prettytable::format::{FormatBuilder, LinePosition, LineSeparator};
use prettytable::{color, Attr, Cell, Row, Table};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use textwrap::fill;
pub struct Autoview;
@ -115,23 +114,12 @@ pub async fn autoview(context: RunnableContext) -> Result<OutputStream, ShellErr
match input_stream.next().await {
Some(y) => {
let ctrl_c = context.ctrl_c.clone();
let stream = async_stream! {
yield Ok(x);
yield Ok(y);
let xy = vec![x, y];
let xy_stream = futures::stream::iter(xy)
.chain(input_stream)
.interruptible(ctrl_c);
loop {
match input_stream.next().await {
Some(z) => {
if ctrl_c.load(Ordering::SeqCst) {
break;
}
yield Ok(z);
}
_ => break,
}
}
};
let stream = stream.to_input_stream();
let stream = InputStream::from_stream(xy_stream);
if let Some(table) = table {
let command_args = create_default_command_args(&context).with_input(stream);

View file

@ -389,42 +389,44 @@ impl WholeStreamCommand for FnFilterCommand {
ctrl_c,
shell_manager,
call_info,
mut input,
input,
..
}: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let host: Arc<parking_lot::Mutex<dyn Host>> = host.clone();
let registry: CommandRegistry = registry.clone();
let registry = Arc::new(registry.clone());
let func = self.func;
let stream = async_stream! {
while let Some(it) = input.next().await {
Ok(input
.then(move |it| {
let host = host.clone();
let registry = registry.clone();
let call_info = match call_info.clone().evaluate_with_new_it(&registry, &it).await {
Err(err) => { yield Err(err); return; },
Ok(args) => args,
};
let args = EvaluatedFilterCommandArgs::new(
host.clone(),
ctrl_c.clone(),
shell_manager.clone(),
call_info,
);
match func(args) {
Err(err) => yield Err(err),
Ok(mut stream) => {
while let Some(value) = stream.values.next().await {
yield value;
let ctrl_c = ctrl_c.clone();
let shell_manager = shell_manager.clone();
let call_info = call_info.clone();
async move {
let call_info = match call_info.evaluate_with_new_it(&*registry, &it).await {
Err(err) => {
return OutputStream::one(Err(err));
}
Ok(args) => args,
};
let args = EvaluatedFilterCommandArgs::new(
host.clone(),
ctrl_c.clone(),
shell_manager.clone(),
call_info,
);
match func(args) {
Err(err) => return OutputStream::one(Err(err)),
Ok(stream) => stream,
}
}
}
};
Ok(stream.to_output_stream())
})
.flatten()
.to_output_stream())
}
}

View file

@ -32,7 +32,7 @@ impl WholeStreamCommand for Echo {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
echo(args, registry)
echo(args, registry).await
}
fn examples(&self) -> Vec<Example> {
@ -51,67 +51,62 @@ impl WholeStreamCommand for Echo {
}
}
fn echo(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn echo(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (args, _): (EchoArgs, _) = args.process(&registry).await?;
let (args, _): (EchoArgs, _) = args.process(&registry).await?;
for i in args.rest {
match i.as_string() {
Ok(s) => {
yield Ok(ReturnSuccess::Value(
UntaggedValue::string(s).into_value(i.tag.clone()),
));
}
_ => match i {
Value {
value: UntaggedValue::Table(table),
..
} => {
for value in table {
yield Ok(ReturnSuccess::Value(value.clone()));
}
}
Value {
value: UntaggedValue::Primitive(Primitive::Range(range)),
tag
} => {
let mut current = range.from.0.item;
while current != range.to.0.item {
yield Ok(ReturnSuccess::Value(UntaggedValue::Primitive(current.clone()).into_value(&tag)));
current = match crate::data::value::compute_values(Operator::Plus, &UntaggedValue::Primitive(current), &UntaggedValue::int(1)) {
Ok(result) => match result {
UntaggedValue::Primitive(p) => p,
_ => {
yield Err(ShellError::unimplemented("Internal error: expected a primitive result from increment"));
return;
}
},
Err((left_type, right_type)) => {
yield Err(ShellError::coerce_error(
left_type.spanned(tag.span),
right_type.spanned(tag.span),
));
return;
}
}
}
match range.to.1 {
RangeInclusion::Inclusive => {
yield Ok(ReturnSuccess::Value(UntaggedValue::Primitive(current.clone()).into_value(&tag)));
}
_ => {}
}
}
_ => {
yield Ok(ReturnSuccess::Value(i.clone()));
}
},
let stream = args.rest.into_iter().map(|i| {
match i.as_string() {
Ok(s) => {
OutputStream::one(Ok(ReturnSuccess::Value(
UntaggedValue::string(s).into_value(i.tag.clone()),
)))
}
}
};
_ => match i {
Value {
value: UntaggedValue::Table(table),
..
} => {
futures::stream::iter(table.into_iter().map(ReturnSuccess::value)).to_output_stream()
}
Value {
value: UntaggedValue::Primitive(Primitive::Range(range)),
tag
} => {
let mut output_vec = vec![];
Ok(stream.to_output_stream())
let mut current = range.from.0.item;
while current != range.to.0.item {
output_vec.push(Ok(ReturnSuccess::Value(UntaggedValue::Primitive(current.clone()).into_value(&tag))));
current = match crate::data::value::compute_values(Operator::Plus, &UntaggedValue::Primitive(current), &UntaggedValue::int(1)) {
Ok(result) => match result {
UntaggedValue::Primitive(p) => p,
_ => {
return OutputStream::one(Err(ShellError::unimplemented("Internal error: expected a primitive result from increment")));
}
},
Err((left_type, right_type)) => {
return OutputStream::one(Err(ShellError::coerce_error(
left_type.spanned(tag.span),
right_type.spanned(tag.span),
)));
}
}
}
if let RangeInclusion::Inclusive = range.to.1 {
output_vec.push(Ok(ReturnSuccess::Value(UntaggedValue::Primitive(current).into_value(&tag))));
}
futures::stream::iter(output_vec.into_iter()).to_output_stream()
}
_ => {
OutputStream::one(Ok(ReturnSuccess::Value(i.clone())))
}
},
}
});
Ok(futures::stream::iter(stream).flatten().to_output_stream())
}
#[cfg(test)]

View file

@ -36,62 +36,66 @@ impl WholeStreamCommand for FromXLSX {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
from_xlsx(args, registry)
from_xlsx(args, registry).await
}
}
fn from_xlsx(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn from_xlsx(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone();
let registry = registry.clone();
let stream = async_stream! {
let (FromXLSXArgs { headerless: _headerless }, mut input) = args.process(&registry).await?;
let value = input.collect_binary(tag.clone()).await?;
let (
FromXLSXArgs {
headerless: _headerless,
},
input,
) = args.process(&registry).await?;
let value = input.collect_binary(tag.clone()).await?;
let mut buf: Cursor<Vec<u8>> = Cursor::new(value.item);
let mut xls = Xlsx::<_>::new(buf).map_err(|_| {
ShellError::labeled_error("Could not load xlsx file", "could not load xlsx file", &tag)
})?;
let buf: Cursor<Vec<u8>> = Cursor::new(value.item);
let mut xls = Xlsx::<_>::new(buf).map_err(|_| {
ShellError::labeled_error("Could not load xlsx file", "could not load xlsx file", &tag)
})?;
let mut dict = TaggedDictBuilder::new(&tag);
let mut dict = TaggedDictBuilder::new(&tag);
let sheet_names = xls.sheet_names().to_owned();
let sheet_names = xls.sheet_names().to_owned();
for sheet_name in &sheet_names {
let mut sheet_output = TaggedListBuilder::new(&tag);
for sheet_name in &sheet_names {
let mut sheet_output = TaggedListBuilder::new(&tag);
if let Some(Ok(current_sheet)) = xls.worksheet_range(sheet_name) {
for row in current_sheet.rows() {
let mut row_output = TaggedDictBuilder::new(&tag);
for (i, cell) in row.iter().enumerate() {
let value = match cell {
DataType::Empty => UntaggedValue::nothing(),
DataType::String(s) => UntaggedValue::string(s),
DataType::Float(f) => UntaggedValue::decimal(*f),
DataType::Int(i) => UntaggedValue::int(*i),
DataType::Bool(b) => UntaggedValue::boolean(*b),
_ => UntaggedValue::nothing(),
};
if let Some(Ok(current_sheet)) = xls.worksheet_range(sheet_name) {
for row in current_sheet.rows() {
let mut row_output = TaggedDictBuilder::new(&tag);
for (i, cell) in row.iter().enumerate() {
let value = match cell {
DataType::Empty => UntaggedValue::nothing(),
DataType::String(s) => UntaggedValue::string(s),
DataType::Float(f) => UntaggedValue::decimal(*f),
DataType::Int(i) => UntaggedValue::int(*i),
DataType::Bool(b) => UntaggedValue::boolean(*b),
_ => UntaggedValue::nothing(),
};
row_output.insert_untagged(&format!("Column{}", i), value);
}
sheet_output.push_untagged(row_output.into_untagged_value());
row_output.insert_untagged(&format!("Column{}", i), value);
}
dict.insert_untagged(sheet_name, sheet_output.into_untagged_value());
} else {
yield Err(ShellError::labeled_error(
"Could not load sheet",
"could not load sheet",
&tag,
));
sheet_output.push_untagged(row_output.into_untagged_value());
}
dict.insert_untagged(sheet_name, sheet_output.into_untagged_value());
} else {
return Err(ShellError::labeled_error(
"Could not load sheet",
"could not load sheet",
&tag,
));
}
}
yield ReturnSuccess::value(dict.into_value());
};
Ok(stream.to_output_stream())
Ok(OutputStream::one(ReturnSuccess::value(dict.into_value())))
}
#[cfg(test)]

View file

@ -43,16 +43,27 @@ impl WholeStreamCommand for SubCommand {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
split_column(args, registry)
split_column(args, registry).await
}
}
fn split_column(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn split_column(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let name_span = args.call_info.name_tag.span;
let registry = registry.clone();
let stream = async_stream! {
let (SplitColumnArgs { separator, rest, collapse_empty }, mut input) = args.process(&registry).await?;
while let Some(v) = input.next().await {
let (
SplitColumnArgs {
separator,
rest,
collapse_empty,
},
input,
) = args.process(&registry).await?;
Ok(input
.map(move |v| {
if let Ok(s) = v.as_string() {
let splitter = separator.replace("\\n", "\n");
trace!("splitting with {:?}", splitter);
@ -79,7 +90,7 @@ fn split_column(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputS
dict.insert_untagged(v.clone(), Primitive::String(k.into()));
}
yield ReturnSuccess::value(dict.into_value());
ReturnSuccess::value(dict.into_value())
} else {
let mut dict = TaggedDictBuilder::new(&v.tag);
for (&k, v) in split_result.iter().zip(positional.iter()) {
@ -88,21 +99,19 @@ fn split_column(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputS
UntaggedValue::Primitive(Primitive::String(k.into())),
);
}
yield ReturnSuccess::value(dict.into_value());
ReturnSuccess::value(dict.into_value())
}
} else {
yield Err(ShellError::labeled_error_with_secondary(
Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
name_span,
"value originates from here",
v.tag.span,
));
))
}
}
};
Ok(stream.to_output_stream())
})
.to_output_stream())
}
#[cfg(test)]

View file

@ -47,7 +47,7 @@ impl WholeStreamCommand for SubCommand {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
operate(args, registry)
operate(args, registry).await
}
fn examples(&self) -> Vec<Example> {
@ -62,54 +62,53 @@ impl WholeStreamCommand for SubCommand {
#[derive(Clone)]
struct DatetimeFormat(String);
fn operate(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn operate(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (Arguments { format, rest }, mut input) = args.process(&registry).await?;
let (Arguments { format, rest }, input) = args.process(&registry).await?;
let column_paths: Vec<_> = rest.iter().map(|x| x.clone()).collect();
let column_paths: Vec<_> = rest;
let options = if let Some(Tagged { item: fmt, tag }) = format {
DatetimeFormat(fmt)
} else {
DatetimeFormat(String::from("%d.%m.%Y %H:%M %P %z"))
};
let options = if let Some(Tagged { item: fmt, .. }) = format {
DatetimeFormat(fmt)
} else {
DatetimeFormat(String::from("%d.%m.%Y %H:%M %P %z"))
};
while let Some(v) = input.next().await {
Ok(input
.map(move |v| {
if column_paths.is_empty() {
match action(&v, &options, v.tag()) {
Ok(out) => yield ReturnSuccess::value(out),
Err(err) => {
yield Err(err);
return;
}
Ok(out) => ReturnSuccess::value(out),
Err(err) => Err(err),
}
} else {
let mut ret = v.clone();
let mut ret = v;
for path in &column_paths {
let options = options.clone();
let swapping = ret.swap_data_by_column_path(path, Box::new(move |old| action(old, &options, old.tag())));
let swapping = ret.swap_data_by_column_path(
path,
Box::new(move |old| action(old, &options, old.tag())),
);
match swapping {
Ok(new_value) => {
ret = new_value;
}
Err(err) => {
yield Err(err);
return;
return Err(err);
}
}
}
yield ReturnSuccess::value(ret);
ReturnSuccess::value(ret)
}
}
};
Ok(stream.to_output_stream())
})
.to_output_stream())
}
fn action(

View file

@ -57,36 +57,47 @@ impl WholeStreamCommand for TSortBy {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
t_sort_by(args, registry)
t_sort_by(args, registry).await
}
}
fn t_sort_by(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn t_sort_by(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let (TSortByArgs { show_columns, group_by, ..}, mut input) = args.process(&registry).await?;
let values: Vec<Value> = input.collect().await;
let name = args.call_info.name_tag.clone();
let (
TSortByArgs {
show_columns,
group_by,
..
},
mut input,
) = args.process(&registry).await?;
let values: Vec<Value> = input.collect().await;
let column_grouped_by_name = if let Some(grouped_by) = group_by {
Some(grouped_by.item().clone())
} else {
None
};
if show_columns {
for label in columns_sorted(column_grouped_by_name, &values[0], &name).into_iter() {
yield ReturnSuccess::value(UntaggedValue::string(label.item).into_value(label.tag));
}
} else {
match t_sort(column_grouped_by_name, None, &values[0], name) {
Ok(sorted) => yield ReturnSuccess::value(sorted),
Err(err) => yield Err(err)
}
}
let column_grouped_by_name = if let Some(grouped_by) = group_by {
Some(grouped_by.item().clone())
} else {
None
};
Ok(stream.to_output_stream())
if show_columns {
Ok(futures::stream::iter(
columns_sorted(column_grouped_by_name, &values[0], &name)
.into_iter()
.map(move |label| {
ReturnSuccess::value(UntaggedValue::string(label.item).into_value(label.tag))
}),
)
.to_output_stream())
} else {
match t_sort(column_grouped_by_name, None, &values[0], name) {
Ok(sorted) => Ok(OutputStream::one(ReturnSuccess::value(sorted))),
Err(err) => Ok(OutputStream::one(Err(err))),
}
}
}
#[cfg(test)]

View file

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::format::TableView;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Primitive, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value};
use nu_protocol::{Primitive, Signature, SyntaxShape, UntaggedValue, Value};
use std::time::Instant;
const STREAM_PAGE_SIZE: usize = 1000;
@ -34,100 +34,97 @@ impl WholeStreamCommand for Table {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
table(args, registry)
table(args, registry).await
}
}
fn table(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn table(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let mut args = args.evaluate_once(&registry).await?;
let mut finished = false;
let mut args = args.evaluate_once(&registry).await?;
let mut finished = false;
let host = args.host.clone();
let mut start_number = match args.get("start_number") {
Some(Value { value: UntaggedValue::Primitive(Primitive::Int(i)), .. }) => {
if let Some(num) = i.to_usize() {
num
} else {
yield Err(ShellError::labeled_error("Expected a row number", "expected a row number", &args.args.call_info.name_tag));
0
}
let host = args.host.clone();
let mut start_number = match args.get("start_number") {
Some(Value {
value: UntaggedValue::Primitive(Primitive::Int(i)),
..
}) => {
if let Some(num) = i.to_usize() {
num
} else {
return Err(ShellError::labeled_error(
"Expected a row number",
"expected a row number",
&args.args.call_info.name_tag,
));
}
_ => {
0
}
};
}
_ => 0,
};
let mut delay_slot = None;
let mut delay_slot = None;
while !finished {
let mut new_input: VecDeque<Value> = VecDeque::new();
while !finished {
let mut new_input: VecDeque<Value> = VecDeque::new();
let start_time = Instant::now();
for idx in 0..STREAM_PAGE_SIZE {
if let Some(val) = delay_slot {
new_input.push_back(val);
delay_slot = None;
} else {
match args.input.next().await {
Some(a) => {
if !new_input.is_empty() {
if let Some(descs) = new_input.get(0) {
let descs = descs.data_descriptors();
let compare = a.data_descriptors();
if descs != compare {
delay_slot = Some(a);
break;
} else {
new_input.push_back(a);
}
let start_time = Instant::now();
for idx in 0..STREAM_PAGE_SIZE {
if let Some(val) = delay_slot {
new_input.push_back(val);
delay_slot = None;
} else {
match args.input.next().await {
Some(a) => {
if !new_input.is_empty() {
if let Some(descs) = new_input.get(0) {
let descs = descs.data_descriptors();
let compare = a.data_descriptors();
if descs != compare {
delay_slot = Some(a);
break;
} else {
new_input.push_back(a);
}
} else {
new_input.push_back(a);
}
}
_ => {
finished = true;
break;
} else {
new_input.push_back(a);
}
}
_ => {
finished = true;
break;
}
}
// Check if we've gone over our buffering threshold
if (idx + 1) % STREAM_TIMEOUT_CHECK_INTERVAL == 0 {
let end_time = Instant::now();
// Check if we've gone over our buffering threshold
if (idx + 1) % STREAM_TIMEOUT_CHECK_INTERVAL == 0 {
let end_time = Instant::now();
// If we've been buffering over a second, go ahead and send out what we have so far
if (end_time - start_time).as_secs() >= 1 {
break;
}
// If we've been buffering over a second, go ahead and send out what we have so far
if (end_time - start_time).as_secs() >= 1 {
break;
}
}
}
}
let input: Vec<Value> = new_input.into();
let input: Vec<Value> = new_input.into();
if input.len() > 0 {
let mut host = host.lock();
let view = TableView::from_list(&input, start_number);
if !input.is_empty() {
let mut host = host.lock();
let view = TableView::from_list(&input, start_number);
if let Some(view) = view {
handle_unexpected(&mut *host, |host| crate::format::print_view(&view, host));
}
if let Some(view) = view {
handle_unexpected(&mut *host, |host| crate::format::print_view(&view, host));
}
start_number += input.len();
}
// Needed for async_stream to type check
if false {
yield ReturnSuccess::value(UntaggedValue::nothing().into_value(Tag::unknown()));
}
};
start_number += input.len();
}
Ok(OutputStream::new(stream))
Ok(OutputStream::empty())
}
#[cfg(test)]

View file

@ -29,7 +29,7 @@ impl WholeStreamCommand for ToBSON {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
to_bson(args, registry)
to_bson(args, registry).await
}
fn is_binary(&self) -> bool {
@ -261,51 +261,53 @@ fn bson_value_to_bytes(bson: Bson, tag: Tag) -> Result<Vec<u8>, ShellError> {
Ok(out)
}
fn to_bson(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn to_bson(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let name_tag = args.name_tag();
let name_span = name_tag.span;
let args = args.evaluate_once(&registry).await?;
let name_tag = args.name_tag();
let name_span = name_tag.span;
let input: Vec<Value> = args.input.collect().await;
let input: Vec<Value> = args.input.collect().await;
let to_process_input = if input.len() > 1 {
let to_process_input = match input.len() {
x if x > 1 => {
let tag = input[0].tag.clone();
vec![Value { value: UntaggedValue::Table(input), tag } ]
} else if input.len() == 1 {
input
} else {
vec![]
};
for value in to_process_input {
match value_to_bson_value(&value) {
Ok(bson_value) => {
let value_span = value.tag.span;
match bson_value_to_bytes(bson_value, name_tag.clone()) {
Ok(x) => yield ReturnSuccess::value(
UntaggedValue::binary(x).into_value(&name_tag),
),
_ => yield Err(ShellError::labeled_error_with_secondary(
"Expected a table with BSON-compatible structure from pipeline",
"requires BSON-compatible input",
name_span,
"originates from here".to_string(),
value_span,
)),
}
}
_ => yield Err(ShellError::labeled_error(
"Expected a table with BSON-compatible structure from pipeline",
"requires BSON-compatible input",
&name_tag))
}
vec![Value {
value: UntaggedValue::Table(input),
tag,
}]
}
1 => input,
_ => vec![],
};
Ok(stream.to_output_stream())
Ok(futures::stream::iter(to_process_input.into_iter().map(
move |value| match value_to_bson_value(&value) {
Ok(bson_value) => {
let value_span = value.tag.span;
match bson_value_to_bytes(bson_value, name_tag.clone()) {
Ok(x) => ReturnSuccess::value(UntaggedValue::binary(x).into_value(&name_tag)),
_ => Err(ShellError::labeled_error_with_secondary(
"Expected a table with BSON-compatible structure from pipeline",
"requires BSON-compatible input",
name_span,
"originates from here".to_string(),
value_span,
)),
}
}
_ => Err(ShellError::labeled_error(
"Expected a table with BSON-compatible structure from pipeline",
"requires BSON-compatible input",
&name_tag,
)),
},
))
.to_output_stream())
}
#[cfg(test)]

View file

@ -42,47 +42,44 @@ impl WholeStreamCommand for ToCSV {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
to_csv(args, registry)
to_csv(args, registry).await
}
}
fn to_csv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn to_csv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let (ToCSVArgs { separator, headerless }, mut input) = args.process(&registry).await?;
let sep = match separator {
Some(Value {
value: UntaggedValue::Primitive(Primitive::String(s)),
tag,
..
}) => {
if s == r"\t" {
'\t'
} else {
let vec_s: Vec<char> = s.chars().collect();
if vec_s.len() != 1 {
yield Err(ShellError::labeled_error(
"Expected a single separator char from --separator",
"requires a single character string input",
tag,
));
return;
};
vec_s[0]
}
let name = args.call_info.name_tag.clone();
let (
ToCSVArgs {
separator,
headerless,
},
input,
) = args.process(&registry).await?;
let sep = match separator {
Some(Value {
value: UntaggedValue::Primitive(Primitive::String(s)),
tag,
..
}) => {
if s == r"\t" {
'\t'
} else {
let vec_s: Vec<char> = s.chars().collect();
if vec_s.len() != 1 {
return Err(ShellError::labeled_error(
"Expected a single separator char from --separator",
"requires a single character string input",
tag,
));
};
vec_s[0]
}
_ => ',',
};
let mut result = to_delimited_data(headerless, sep, "CSV", input, name)?;
while let Some(item) = result.next().await {
yield item;
}
_ => ',',
};
Ok(stream.to_output_stream())
to_delimited_data(headerless, sep, "CSV", input, name).await
}
#[cfg(test)]

View file

@ -165,7 +165,7 @@ fn merge_descriptors(values: &[Value]) -> Vec<Spanned<String>> {
ret
}
pub fn to_delimited_data(
pub async fn to_delimited_data(
headerless: bool,
sep: char,
format_name: &'static str,
@ -175,33 +175,41 @@ pub fn to_delimited_data(
let name_tag = name;
let name_span = name_tag.span;
let stream = async_stream! {
let input: Vec<Value> = input.collect().await;
let input: Vec<Value> = input.collect().await;
let to_process_input = if input.len() > 1 {
let to_process_input = match input.len() {
x if x > 1 => {
let tag = input[0].tag.clone();
vec![Value { value: UntaggedValue::Table(input), tag } ]
} else if input.len() == 1 {
input
} else {
vec![]
};
vec![Value {
value: UntaggedValue::Table(input),
tag,
}]
}
1 => input,
_ => vec![],
};
for value in to_process_input {
Ok(
futures::stream::iter(to_process_input.into_iter().map(move |value| {
match from_value_to_delimited_string(&clone_tagged_value(&value), sep) {
Ok(mut x) => {
if headerless {
x.find('\n').map(|second_line|{
if let Some(second_line) = x.find('\n') {
let start = second_line + 1;
x.replace_range(0..start, "");
});
}
}
yield ReturnSuccess::value(UntaggedValue::Primitive(Primitive::String(x)).into_value(&name_tag))
ReturnSuccess::value(
UntaggedValue::Primitive(Primitive::String(x)).into_value(&name_tag),
)
}
Err(x) => {
let expected = format!("Expected a table with {}-compatible structure from pipeline", format_name);
Err(_) => {
let expected = format!(
"Expected a table with {}-compatible structure from pipeline",
format_name
);
let requires = format!("requires {}-compatible input", format_name);
yield Err(ShellError::labeled_error_with_secondary(
Err(ShellError::labeled_error_with_secondary(
expected,
requires,
name_span,
@ -210,8 +218,7 @@ pub fn to_delimited_data(
))
}
}
}
};
Ok(stream.to_output_stream())
}))
.to_output_stream(),
)
}

View file

@ -38,7 +38,7 @@ impl WholeStreamCommand for ToJSON {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
to_json(args, registry)
to_json(args, registry).await
}
fn examples(&self) -> Vec<Example> {
@ -163,78 +163,103 @@ fn json_list(input: &[Value]) -> Result<Vec<serde_json::Value>, ShellError> {
Ok(out)
}
fn to_json(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn to_json(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name_tag = args.call_info.name_tag.clone();
let (ToJSONArgs { pretty }, mut input) = args.process(&registry).await?;
let name_span = name_tag.span;
let input: Vec<Value> = input.collect().await;
let name_tag = args.call_info.name_tag.clone();
let (ToJSONArgs { pretty }, input) = args.process(&registry).await?;
let name_span = name_tag.span;
let input: Vec<Value> = input.collect().await;
let to_process_input = if input.len() > 1 {
let to_process_input = match input.len() {
x if x > 1 => {
let tag = input[0].tag.clone();
vec![Value { value: UntaggedValue::Table(input), tag } ]
} else if input.len() == 1 {
input
} else {
vec![]
};
vec![Value {
value: UntaggedValue::Table(input),
tag,
}]
}
1 => input,
_ => vec![],
};
for value in to_process_input {
match value_to_json_value(&value) {
Ok(json_value) => {
let value_span = value.tag.span;
Ok(futures::stream::iter(to_process_input.into_iter().map(
move |value| match value_to_json_value(&value) {
Ok(json_value) => {
let value_span = value.tag.span;
match serde_json::to_string(&json_value) {
Ok(mut serde_json_string) => {
if let Some(pretty_value) = &pretty {
let mut pretty_format_failed = true;
match serde_json::to_string(&json_value) {
Ok(mut serde_json_string) => {
if let Some(pretty_value) = &pretty {
let mut pretty_format_failed = true;
if let Ok(pretty_u64) = pretty_value.as_u64() {
if let Ok(serde_json_value) = serde_json::from_str::<serde_json::Value>(serde_json_string.as_str()) {
let indentation_string = std::iter::repeat(" ").take(pretty_u64 as usize).collect::<String>();
let serde_formatter = serde_json::ser::PrettyFormatter::with_indent(indentation_string.as_bytes());
let serde_buffer = Vec::new();
let mut serde_serializer = serde_json::Serializer::with_formatter(serde_buffer, serde_formatter);
let serde_json_object = json!(serde_json_value);
if let Ok(pretty_u64) = pretty_value.as_u64() {
if let Ok(serde_json_value) =
serde_json::from_str::<serde_json::Value>(
serde_json_string.as_str(),
)
{
let indentation_string = std::iter::repeat(" ")
.take(pretty_u64 as usize)
.collect::<String>();
let serde_formatter =
serde_json::ser::PrettyFormatter::with_indent(
indentation_string.as_bytes(),
);
let serde_buffer = Vec::new();
let mut serde_serializer =
serde_json::Serializer::with_formatter(
serde_buffer,
serde_formatter,
);
let serde_json_object = json!(serde_json_value);
if let Ok(()) = serde_json_object.serialize(&mut serde_serializer) {
if let Ok(ser_json_string) = String::from_utf8(serde_serializer.into_inner()) {
pretty_format_failed = false;
serde_json_string = ser_json_string
}
if let Ok(()) =
serde_json_object.serialize(&mut serde_serializer)
{
if let Ok(ser_json_string) =
String::from_utf8(serde_serializer.into_inner())
{
pretty_format_failed = false;
serde_json_string = ser_json_string
}
}
}
if pretty_format_failed {
yield Err(ShellError::labeled_error("Pretty formatting failed", "failed", pretty_value.tag()));
return;
}
}
yield ReturnSuccess::value(
UntaggedValue::Primitive(Primitive::String(serde_json_string)).into_value(&name_tag),
)
},
_ => yield Err(ShellError::labeled_error_with_secondary(
"Expected a table with JSON-compatible structure.tag() from pipeline",
"requires JSON-compatible input",
name_span,
"originates from here".to_string(),
value_span,
)),
}
}
_ => yield Err(ShellError::labeled_error(
"Expected a table with JSON-compatible structure from pipeline",
"requires JSON-compatible input",
&name_tag))
}
}
};
if pretty_format_failed {
return Err(ShellError::labeled_error(
"Pretty formatting failed",
"failed",
pretty_value.tag(),
));
}
}
Ok(stream.to_output_stream())
ReturnSuccess::value(
UntaggedValue::Primitive(Primitive::String(serde_json_string))
.into_value(&name_tag),
)
}
_ => Err(ShellError::labeled_error_with_secondary(
"Expected a table with JSON-compatible structure.tag() from pipeline",
"requires JSON-compatible input",
name_span,
"originates from here".to_string(),
value_span,
)),
}
}
_ => Err(ShellError::labeled_error(
"Expected a table with JSON-compatible structure from pipeline",
"requires JSON-compatible input",
&name_tag,
)),
},
))
.to_output_stream())
}
#[cfg(test)]

View file

@ -24,7 +24,7 @@ impl WholeStreamCommand for ToTOML {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
to_toml(args, registry)
to_toml(args, registry).await
}
// TODO: add an example here. What commands to run to get a Row(Dictionary)?
// fn examples(&self) -> Vec<Example> {
@ -135,49 +135,53 @@ fn collect_values(input: &[Value]) -> Result<Vec<toml::Value>, ShellError> {
Ok(out)
}
fn to_toml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn to_toml(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let name_tag = args.name_tag();
let name_span = name_tag.span;
let input: Vec<Value> = args.input.collect().await;
let args = args.evaluate_once(&registry).await?;
let name_tag = args.name_tag();
let name_span = name_tag.span;
let input: Vec<Value> = args.input.collect().await;
let to_process_input = if input.len() > 1 {
let to_process_input = match input.len() {
x if x > 1 => {
let tag = input[0].tag.clone();
vec![Value { value: UntaggedValue::Table(input), tag } ]
} else if input.len() == 1 {
input
} else {
vec![]
};
for value in to_process_input {
let value_span = value.tag.span;
match value_to_toml_value(&value) {
Ok(toml_value) => {
match toml::to_string(&toml_value) {
Ok(x) => yield ReturnSuccess::value(
UntaggedValue::Primitive(Primitive::String(x)).into_value(&name_tag),
),
_ => yield Err(ShellError::labeled_error_with_secondary(
"Expected a table with TOML-compatible structure.tag() from pipeline",
"requires TOML-compatible input",
name_span,
"originates from here".to_string(),
value_span,
)),
}
}
_ => yield Err(ShellError::labeled_error(
"Expected a table with TOML-compatible structure from pipeline",
"requires TOML-compatible input",
&name_tag))
}
vec![Value {
value: UntaggedValue::Table(input),
tag,
}]
}
1 => input,
_ => vec![],
};
Ok(stream.to_output_stream())
Ok(
futures::stream::iter(to_process_input.into_iter().map(move |value| {
let value_span = value.tag.span;
match value_to_toml_value(&value) {
Ok(toml_value) => match toml::to_string(&toml_value) {
Ok(x) => ReturnSuccess::value(
UntaggedValue::Primitive(Primitive::String(x)).into_value(&name_tag),
),
_ => Err(ShellError::labeled_error_with_secondary(
"Expected a table with TOML-compatible structure.tag() from pipeline",
"requires TOML-compatible input",
name_span,
"originates from here".to_string(),
value_span,
)),
},
_ => Err(ShellError::labeled_error(
"Expected a table with TOML-compatible structure from pipeline",
"requires TOML-compatible input",
&name_tag,
)),
}
}))
.to_output_stream(),
)
}
#[cfg(test)]

View file

@ -43,7 +43,7 @@ async fn to_tsv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputS
let name = args.call_info.name_tag.clone();
let (ToTSVArgs { headerless }, input) = args.process(&registry).await?;
to_delimited_data(headerless, '\t', "TSV", input, name)
to_delimited_data(headerless, '\t', "TSV", input, name).await
}
#[cfg(test)]

View file

@ -35,7 +35,7 @@ impl WholeStreamCommand for Where {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
where_command(args, registry)
where_command(args, registry).await
}
fn examples(&self) -> Vec<Example> {
@ -63,65 +63,71 @@ impl WholeStreamCommand for Where {
]
}
}
fn where_command(
async fn where_command(
raw_args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let scope = raw_args.call_info.scope.clone();
let registry = Arc::new(registry.clone());
let scope = Arc::new(raw_args.call_info.scope.clone());
let tag = raw_args.call_info.name_tag.clone();
let stream = async_stream! {
let (WhereArgs { block }, mut input) = raw_args.process(&registry).await?;
let condition = {
if block.block.len() != 1 {
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
match block.block[0].list.get(0) {
Some(item) => match item {
ClassifiedCommand::Expr(expr) => expr.clone(),
_ => {
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
},
None => {
yield Err(ShellError::labeled_error(
let (WhereArgs { block }, input) = raw_args.process(&registry).await?;
let condition = {
if block.block.len() != 1 {
return Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
}
match block.block[0].list.get(0) {
Some(item) => match item {
ClassifiedCommand::Expr(expr) => expr.clone(),
_ => {
return Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
},
None => {
return Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
}
};
let mut input = input;
while let Some(input) = input.next().await {
//FIXME: should we use the scope that's brought in as well?
let condition = evaluate_baseline_expr(&condition, &registry, &input, &scope.vars, &scope.env).await?;
match condition.as_bool() {
Ok(b) => {
if b {
yield Ok(ReturnSuccess::Value(input));
}
}
Err(e) => yield Err(e),
};
}
};
Ok(stream.to_output_stream())
Ok(input
.filter_map(move |input| {
let condition = condition.clone();
let registry = registry.clone();
let scope = scope.clone();
async move {
//FIXME: should we use the scope that's brought in as well?
let condition =
evaluate_baseline_expr(&condition, &*registry, &input, &scope.vars, &scope.env)
.await;
match condition {
Ok(condition) => match condition.as_bool() {
Ok(b) => {
if b {
Some(Ok(ReturnSuccess::Value(input)))
} else {
None
}
}
Err(e) => Some(Err(e)),
},
Err(e) => Some(Err(e)),
}
}
})
.to_output_stream())
}
#[cfg(test)]