Add table streaming (#413)

This commit is contained in:
JT 2021-12-03 19:15:23 +13:00 committed by GitHub
parent 3d8394a909
commit 574d7f6936
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 161 additions and 25 deletions

View file

@ -226,6 +226,7 @@ impl ExternalCommand {
let mut process = std::process::Command::new(&self.name.item);
for arg in &self.args {
let arg = trim_enclosing_quotes(arg);
process.arg(&arg);
}
@ -275,6 +276,16 @@ fn shell_arg_escape(arg: &str) -> String {
}
}
fn trim_enclosing_quotes(input: &str) -> String {
let mut chars = input.chars();
match (chars.next(), chars.next_back()) {
(Some('"'), Some('"')) => chars.collect(),
(Some('\''), Some('\'')) => chars.collect(),
_ => input.to_string(),
}
}
// The piped data from stdout from the external command can be either String
// or binary. We use this enum to pass the data from the spawned process
#[derive(Debug)]

View file

@ -4,14 +4,18 @@ use lscolors::{LsColors, Style};
use nu_protocol::ast::{Call, PathMember};
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
Category, Config, DataSource, IntoPipelineData, PipelineData, PipelineMetadata, ShellError,
Signature, Span, Value, ValueStream,
Category, Config, DataSource, IntoInterruptiblePipelineData, IntoPipelineData, PipelineData,
PipelineMetadata, ShellError, Signature, Span, Value, ValueStream,
};
use nu_table::{StyledString, TextStyle, Theme};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
use terminal_size::{Height, Width};
const STREAM_PAGE_SIZE: usize = 1000;
const STREAM_TIMEOUT_CHECK_INTERVAL: usize = 100;
#[derive(Clone)]
pub struct Table;
@ -48,7 +52,7 @@ impl Command for Table {
match input {
PipelineData::Value(Value::List { vals, .. }, ..) => {
let table = convert_to_table(vals, ctrlc, &config)?;
let table = convert_to_table(0, vals, ctrlc, &config)?;
if let Some(table) = table {
let result = nu_table::draw_table(&table, term_width, &color_hm, &config);
@ -140,19 +144,30 @@ impl Command for Table {
_ => stream,
};
let table = convert_to_table(stream, ctrlc, &config)?;
let head = call.head;
if let Some(table) = table {
let result = nu_table::draw_table(&table, term_width, &color_hm, &config);
Ok(Value::String {
val: result,
span: call.head,
}
.into_pipeline_data())
} else {
Ok(PipelineData::new(call.head))
Ok(PagingTableCreator {
row_offset: 0,
config,
ctrlc: ctrlc.clone(),
head,
stream,
}
.into_pipeline_data(ctrlc))
// let table = convert_to_table(stream, ctrlc, &config)?;
// if let Some(table) = table {
// let result = nu_table::draw_table(&table, term_width, &color_hm, &config);
// Ok(Value::String {
// val: result,
// span: call.head,
// }
// .into_pipeline_data())
// } else {
// Ok(PipelineData::new(call.head))
// }
}
PipelineData::Value(Value::Record { cols, vals, .. }, ..) => {
let mut output = vec![];
@ -195,6 +210,7 @@ impl Command for Table {
}
fn convert_to_table(
row_offset: usize,
iter: impl IntoIterator<Item = Value>,
ctrlc: Option<Arc<AtomicBool>>,
config: &Config,
@ -221,7 +237,8 @@ fn convert_to_table(
return Err(error);
}
// String1 = datatype, String2 = value as string
let mut row: Vec<(String, String)> = vec![("string".to_string(), row_num.to_string())];
let mut row: Vec<(String, String)> =
vec![("string".to_string(), (row_num + row_offset).to_string())];
if headers.is_empty() {
// if header row is empty, this is probably a list so format it that way
@ -294,6 +311,80 @@ fn convert_to_table(
}
}
struct PagingTableCreator {
head: Span,
stream: ValueStream,
ctrlc: Option<Arc<AtomicBool>>,
config: Config,
row_offset: usize,
}
impl Iterator for PagingTableCreator {
type Item = Value;
fn next(&mut self) -> Option<Self::Item> {
let mut batch = vec![];
let start_time = Instant::now();
let mut idx = 0;
// Pull from stream until time runs out or we have enough items
for item in self.stream.by_ref() {
batch.push(item);
idx += 1;
if idx % STREAM_TIMEOUT_CHECK_INTERVAL == 0 {
let end_time = Instant::now();
// If we've been buffering over a second, go ahead and send out what we have so far
if (end_time - start_time).as_secs() >= 1 {
break;
}
}
if idx == STREAM_PAGE_SIZE {
break;
}
if let Some(ctrlc) = &self.ctrlc {
if ctrlc.load(Ordering::SeqCst) {
break;
}
}
}
let color_hm = get_color_config(&self.config);
let term_width = if let Some((Width(w), Height(_h))) = terminal_size::terminal_size() {
(w - 1) as usize
} else {
80usize
};
let table = convert_to_table(
self.row_offset,
batch.into_iter(),
self.ctrlc.clone(),
&self.config,
);
self.row_offset += idx;
match table {
Ok(Some(table)) => {
let result = nu_table::draw_table(&table, term_width, &color_hm, &self.config);
Some(Value::String {
val: result,
span: self.head,
})
}
Err(err) => Some(Value::Error { error: err }),
_ => None,
}
}
}
fn load_theme_from_config(config: &Config) -> Theme {
match config.table_mode.as_str() {
"basic" => nu_table::Theme::basic(),

View file

@ -150,7 +150,16 @@ fn main() -> Result<()> {
) {
Ok(pipeline_data) => {
let config = stack.get_config()?;
println!("{}", pipeline_data.collect_string("\n", &config));
for item in pipeline_data {
if let Value::Error { error } = item {
let working_set = StateWorkingSet::new(&engine_state);
report_error(&working_set, &error);
std::process::exit(1);
}
println!("{}", item.into_string("\n", &config));
}
}
Err(err) => {
let working_set = StateWorkingSet::new(&engine_state);
@ -314,21 +323,46 @@ fn print_pipeline_data(
let config = stack.get_config()?;
let output = match engine_state.find_decl("table".as_bytes()) {
match engine_state.find_decl("table".as_bytes()) {
Some(decl_id) => {
let table =
engine_state
.get_decl(decl_id)
.run(engine_state, stack, &Call::new(), input)?;
table.collect_string("\n", &config)
}
None => input.collect_string(", ", &config),
};
let stdout = std::io::stdout();
match stdout.lock().write_all(output.as_bytes()) {
Ok(_) => (),
Err(err) => eprintln!("{}", err),
for item in table {
let stdout = std::io::stdout();
if let Value::Error { error } = item {
return Err(error);
}
let mut out = item.into_string("\n", &config);
out.push('\n');
match stdout.lock().write_all(out.as_bytes()) {
Ok(_) => (),
Err(err) => eprintln!("{}", err),
};
}
}
None => {
for item in input {
let stdout = std::io::stdout();
if let Value::Error { error } = item {
return Err(error);
}
let mut out = item.into_string("\n", &config);
out.push('\n');
match stdout.lock().write_all(out.as_bytes()) {
Ok(_) => (),
Err(err) => eprintln!("{}", err),
};
}
}
};
Ok(())