Allow the table command to stream (#1278)

This commit is contained in:
Jonathan Turner 2020-01-25 16:13:12 +13:00 committed by GitHub
parent a5e1372bc2
commit cdbfdf282f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 131 additions and 150 deletions

View file

@ -1,15 +1,11 @@
use crate::commands::{RawCommandArgs, WholeStreamCommand};
use crate::prelude::*;
use futures::stream::TryStreamExt;
use nu_errors::ShellError;
use nu_parser::hir::{Expression, NamedArguments};
use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue, Value};
use std::sync::atomic::Ordering;
pub struct Autoview;
const STREAM_PAGE_SIZE: u64 = 50;
#[derive(Deserialize)]
pub struct AutoviewArgs {}
@ -45,22 +41,23 @@ pub fn autoview(
let table = context.get_command("table");
Ok(OutputStream::new(async_stream! {
let mut output_stream: OutputStream = context.input.into();
//let mut output_stream: OutputStream = context.input.into();
//let next = output_stream.try_next().await;
let next = output_stream.try_next().await;
let mut input_stream = context.input;
match next {
Ok(Some(x)) => {
match output_stream.try_next().await {
Ok(Some(y)) => {
match input_stream.next().await {
Some(x) => {
match input_stream.next().await {
Some(y) => {
let ctrl_c = context.ctrl_c.clone();
let stream = async_stream! {
yield Ok(x);
yield Ok(y);
loop {
match output_stream.try_next().await {
Ok(Some(z)) => {
match input_stream.next().await {
Some(z) => {
if ctrl_c.load(Ordering::SeqCst) {
break;
}
@ -70,142 +67,95 @@ pub fn autoview(
}
}
};
let stream = stream.to_input_stream();
if let Some(table) = table {
let mut new_output_stream: OutputStream = stream.to_output_stream();
let mut finished = false;
let mut current_idx = 0;
loop {
let mut new_input = VecDeque::new();
for _ in 0..STREAM_PAGE_SIZE {
match new_output_stream.try_next().await {
Ok(Some(a)) => {
if let ReturnSuccess::Value(v) = a {
new_input.push_back(v);
}
}
_ => {
finished = true;
break;
}
}
}
let raw = raw.clone();
let input: Vec<Value> = new_input.into();
if input.len() > 0 && input.iter().all(|value| value.value.is_error()) {
let first = &input[0];
let mut host = context.host.clone();
let host = host.lock();
crate::cli::print_err(first.value.expect_error(), &*host, &context.source);
return;
}
let mut command_args = raw.with_input(input);
let mut named_args = NamedArguments::new();
named_args.insert_optional("start_number", Some(Expression::number(current_idx).into_expr(Span::unknown())));
command_args.call_info.args.named = Some(named_args);
let result = table.run(command_args, &context.commands);
result.collect::<Vec<_>>().await;
if finished {
break;
} else {
current_idx += STREAM_PAGE_SIZE;
}
}
let mut command_args = raw.with_input(stream);
let result = table.run(command_args, &context.commands);
result.collect::<Vec<_>>().await;
}
}
_ => {
if let ReturnSuccess::Value(x) = x {
match x {
Value {
value: UntaggedValue::Primitive(Primitive::String(ref s)),
tag: Tag { anchor, span },
} if anchor.is_some() => {
if let Some(text) = text {
let mut stream = VecDeque::new();
stream.push_back(UntaggedValue::string(s).into_value(Tag { anchor, span }));
let result = text.run(raw.with_input(stream.into()), &context.commands);
result.collect::<Vec<_>>().await;
} else {
outln!("{}", s);
}
}
Value {
value: UntaggedValue::Primitive(Primitive::String(s)),
..
} => {
match x {
Value {
value: UntaggedValue::Primitive(Primitive::String(ref s)),
tag: Tag { anchor, span },
} if anchor.is_some() => {
if let Some(text) = text {
let mut stream = VecDeque::new();
stream.push_back(UntaggedValue::string(s).into_value(Tag { anchor, span }));
let result = text.run(raw.with_input(stream), &context.commands);
result.collect::<Vec<_>>().await;
} else {
outln!("{}", s);
}
Value {
value: UntaggedValue::Primitive(Primitive::Line(ref s)),
tag: Tag { anchor, span },
} if anchor.is_some() => {
if let Some(text) = text {
let mut stream = VecDeque::new();
stream.push_back(UntaggedValue::string(s).into_value(Tag { anchor, span }));
let result = text.run(raw.with_input(stream.into()), &context.commands);
result.collect::<Vec<_>>().await;
} else {
outln!("{}\n", s);
}
}
Value {
value: UntaggedValue::Primitive(Primitive::Line(s)),
..
} => {
}
Value {
value: UntaggedValue::Primitive(Primitive::String(s)),
..
} => {
outln!("{}", s);
}
Value {
value: UntaggedValue::Primitive(Primitive::Line(ref s)),
tag: Tag { anchor, span },
} if anchor.is_some() => {
if let Some(text) = text {
let mut stream = VecDeque::new();
stream.push_back(UntaggedValue::string(s).into_value(Tag { anchor, span }));
let result = text.run(raw.with_input(stream), &context.commands);
result.collect::<Vec<_>>().await;
} else {
outln!("{}\n", s);
}
Value {
value: UntaggedValue::Primitive(Primitive::Path(s)),
..
} => {
outln!("{}", s.display());
}
Value {
value: UntaggedValue::Primitive(Primitive::Int(n)),
..
} => {
outln!("{}", n);
}
Value {
value: UntaggedValue::Primitive(Primitive::Decimal(n)),
..
} => {
outln!("{}", n);
}
}
Value {
value: UntaggedValue::Primitive(Primitive::Line(s)),
..
} => {
outln!("{}\n", s);
}
Value {
value: UntaggedValue::Primitive(Primitive::Path(s)),
..
} => {
outln!("{}", s.display());
}
Value {
value: UntaggedValue::Primitive(Primitive::Int(n)),
..
} => {
outln!("{}", n);
}
Value {
value: UntaggedValue::Primitive(Primitive::Decimal(n)),
..
} => {
outln!("{}", n);
}
Value { value: UntaggedValue::Primitive(Primitive::Binary(ref b)), .. } => {
if let Some(binary) = binary {
let mut stream = VecDeque::new();
stream.push_back(x);
let result = binary.run(raw.with_input(stream.into()), &context.commands);
result.collect::<Vec<_>>().await;
} else {
use pretty_hex::*;
outln!("{:?}", b.hex_dump());
}
Value { value: UntaggedValue::Primitive(Primitive::Binary(ref b)), .. } => {
if let Some(binary) = binary {
let mut stream = VecDeque::new();
stream.push_back(x);
let result = binary.run(raw.with_input(stream), &context.commands);
result.collect::<Vec<_>>().await;
} else {
use pretty_hex::*;
outln!("{:?}", b.hex_dump());
}
}
Value { value: UntaggedValue::Error(e), .. } => {
yield Err(e);
}
Value { value: ref item, .. } => {
if let Some(table) = table {
let mut stream = VecDeque::new();
stream.push_back(x);
let result = table.run(raw.with_input(stream.into()), &context.commands);
result.collect::<Vec<_>>().await;
} else {
outln!("{:?}", item);
}
Value { value: UntaggedValue::Error(e), .. } => {
yield Err(e);
}
Value { value: ref item, .. } => {
if let Some(table) = table {
let mut stream = VecDeque::new();
stream.push_back(x);
let result = table.run(raw.with_input(stream), &context.commands);
result.collect::<Vec<_>>().await;
} else {
outln!("{:?}", item);
}
}
}

View file

@ -88,7 +88,7 @@ pub struct RawCommandArgs {
}
impl RawCommandArgs {
pub fn with_input(self, input: Vec<Value>) -> CommandArgs {
pub fn with_input(self, input: impl Into<InputStream>) -> CommandArgs {
CommandArgs {
host: self.host,
ctrl_c: self.ctrl_c,

View file

@ -106,10 +106,14 @@ fn from_json(
match from_json_string_to_value(json_str.to_string(), &name_tag) {
Ok(x) =>
yield ReturnSuccess::value(x),
Err(_) => {
Err(e) => {
if let Some(ref last_tag) = latest_tag {
let mut message = "Could not parse as JSON (".to_string();
message.push_str(&e.to_string());
message.push_str(")");
yield Err(ShellError::labeled_error_with_secondary(
"Could nnot parse as JSON",
message,
"input cannot be parsed as JSON",
&name_tag,
"value originates from here",
@ -129,10 +133,14 @@ fn from_json(
}
x => yield ReturnSuccess::value(x),
}
Err(_) => {
Err(e) => {
if let Some(last_tag) = latest_tag {
let mut message = "Could not parse as JSON (".to_string();
message.push_str(&e.to_string());
message.push_str(")");
yield Err(ShellError::labeled_error_with_secondary(
"Could not parse as JSON",
message,
"input cannot be parsed as JSON",
name_tag,
"value originates from here",

View file

@ -4,6 +4,8 @@ use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Primitive, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value};
const STREAM_PAGE_SIZE: usize = 100;
pub struct Table;
impl WholeStreamCommand for Table {
@ -33,11 +35,12 @@ impl WholeStreamCommand for Table {
}
fn table(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let mut args = args.evaluate_once(registry)?;
let mut finished = false;
let stream = async_stream! {
let host = args.host.clone();
let start_number = match args.get("start_number") {
let mut start_number = match args.get("start_number") {
Some(Value { value: UntaggedValue::Primitive(Primitive::Int(i)), .. }) => {
if let Some(num) = i.to_usize() {
num
@ -51,15 +54,35 @@ fn table(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream,
}
};
let input: Vec<Value> = args.input.into_vec().await;
if input.len() > 0 {
let mut host = host.lock();
let view = TableView::from_list(&input, start_number);
while !finished {
let mut new_input = VecDeque::new();
if let Some(view) = view {
handle_unexpected(&mut *host, |host| crate::format::print_view(&view, host));
for _ in 0..STREAM_PAGE_SIZE {
match args.input.next().await {
Some(a) => {
new_input.push_back(a);
}
_ => {
finished = true;
break;
}
}
}
let input: Vec<Value> = new_input.into();
if input.len() > 0 {
let mut host = host.lock();
let view = TableView::from_list(&input, start_number);
if let Some(view) = view {
handle_unexpected(&mut *host, |host| crate::format::print_view(&view, host));
}
}
start_number += STREAM_PAGE_SIZE;
}
// Needed for async_stream to type check
if false {
yield ReturnSuccess::value(UntaggedValue::nothing().into_value(Tag::unknown()));