From 574d7f69367fe4a7176b41700380d876d2e5e4b4 Mon Sep 17 00:00:00 2001 From: JT <547158+jntrnr@users.noreply.github.com> Date: Fri, 3 Dec 2021 19:15:23 +1300 Subject: [PATCH] Add table streaming (#413) --- crates/nu-command/src/system/run_external.rs | 11 ++ crates/nu-command/src/viewers/table.rs | 121 ++++++++++++++++--- src/main.rs | 54 +++++++-- 3 files changed, 161 insertions(+), 25 deletions(-) diff --git a/crates/nu-command/src/system/run_external.rs b/crates/nu-command/src/system/run_external.rs index 3994e326a8..1e84d67cc7 100644 --- a/crates/nu-command/src/system/run_external.rs +++ b/crates/nu-command/src/system/run_external.rs @@ -226,6 +226,7 @@ impl ExternalCommand { let mut process = std::process::Command::new(&self.name.item); for arg in &self.args { + let arg = trim_enclosing_quotes(arg); process.arg(&arg); } @@ -275,6 +276,16 @@ fn shell_arg_escape(arg: &str) -> String { } } +fn trim_enclosing_quotes(input: &str) -> String { + let mut chars = input.chars(); + + match (chars.next(), chars.next_back()) { + (Some('"'), Some('"')) => chars.collect(), + (Some('\''), Some('\'')) => chars.collect(), + _ => input.to_string(), + } +} + // The piped data from stdout from the external command can be either String // or binary. We use this enum to pass the data from the spawned process #[derive(Debug)] diff --git a/crates/nu-command/src/viewers/table.rs b/crates/nu-command/src/viewers/table.rs index 3fbcf3af6b..29da046396 100644 --- a/crates/nu-command/src/viewers/table.rs +++ b/crates/nu-command/src/viewers/table.rs @@ -4,14 +4,18 @@ use lscolors::{LsColors, Style}; use nu_protocol::ast::{Call, PathMember}; use nu_protocol::engine::{Command, EngineState, Stack}; use nu_protocol::{ - Category, Config, DataSource, IntoPipelineData, PipelineData, PipelineMetadata, ShellError, - Signature, Span, Value, ValueStream, + Category, Config, DataSource, IntoInterruptiblePipelineData, IntoPipelineData, PipelineData, + PipelineMetadata, ShellError, Signature, Span, Value, ValueStream, }; use nu_table::{StyledString, TextStyle, Theme}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::time::Instant; use terminal_size::{Height, Width}; +const STREAM_PAGE_SIZE: usize = 1000; +const STREAM_TIMEOUT_CHECK_INTERVAL: usize = 100; + #[derive(Clone)] pub struct Table; @@ -48,7 +52,7 @@ impl Command for Table { match input { PipelineData::Value(Value::List { vals, .. }, ..) => { - let table = convert_to_table(vals, ctrlc, &config)?; + let table = convert_to_table(0, vals, ctrlc, &config)?; if let Some(table) = table { let result = nu_table::draw_table(&table, term_width, &color_hm, &config); @@ -140,19 +144,30 @@ impl Command for Table { _ => stream, }; - let table = convert_to_table(stream, ctrlc, &config)?; + let head = call.head; - if let Some(table) = table { - let result = nu_table::draw_table(&table, term_width, &color_hm, &config); - - Ok(Value::String { - val: result, - span: call.head, - } - .into_pipeline_data()) - } else { - Ok(PipelineData::new(call.head)) + Ok(PagingTableCreator { + row_offset: 0, + config, + ctrlc: ctrlc.clone(), + head, + stream, } + .into_pipeline_data(ctrlc)) + + // let table = convert_to_table(stream, ctrlc, &config)?; + + // if let Some(table) = table { + // let result = nu_table::draw_table(&table, term_width, &color_hm, &config); + + // Ok(Value::String { + // val: result, + // span: call.head, + // } + // .into_pipeline_data()) + // } else { + // Ok(PipelineData::new(call.head)) + // } } PipelineData::Value(Value::Record { cols, vals, .. }, ..) => { let mut output = vec![]; @@ -195,6 +210,7 @@ impl Command for Table { } fn convert_to_table( + row_offset: usize, iter: impl IntoIterator, ctrlc: Option>, config: &Config, @@ -221,7 +237,8 @@ fn convert_to_table( return Err(error); } // String1 = datatype, String2 = value as string - let mut row: Vec<(String, String)> = vec![("string".to_string(), row_num.to_string())]; + let mut row: Vec<(String, String)> = + vec![("string".to_string(), (row_num + row_offset).to_string())]; if headers.is_empty() { // if header row is empty, this is probably a list so format it that way @@ -294,6 +311,80 @@ fn convert_to_table( } } +struct PagingTableCreator { + head: Span, + stream: ValueStream, + ctrlc: Option>, + config: Config, + row_offset: usize, +} + +impl Iterator for PagingTableCreator { + type Item = Value; + + fn next(&mut self) -> Option { + let mut batch = vec![]; + + let start_time = Instant::now(); + + let mut idx = 0; + + // Pull from stream until time runs out or we have enough items + for item in self.stream.by_ref() { + batch.push(item); + idx += 1; + + if idx % STREAM_TIMEOUT_CHECK_INTERVAL == 0 { + let end_time = Instant::now(); + + // If we've been buffering over a second, go ahead and send out what we have so far + if (end_time - start_time).as_secs() >= 1 { + break; + } + } + + if idx == STREAM_PAGE_SIZE { + break; + } + + if let Some(ctrlc) = &self.ctrlc { + if ctrlc.load(Ordering::SeqCst) { + break; + } + } + } + + let color_hm = get_color_config(&self.config); + + let term_width = if let Some((Width(w), Height(_h))) = terminal_size::terminal_size() { + (w - 1) as usize + } else { + 80usize + }; + + let table = convert_to_table( + self.row_offset, + batch.into_iter(), + self.ctrlc.clone(), + &self.config, + ); + self.row_offset += idx; + + match table { + Ok(Some(table)) => { + let result = nu_table::draw_table(&table, term_width, &color_hm, &self.config); + + Some(Value::String { + val: result, + span: self.head, + }) + } + Err(err) => Some(Value::Error { error: err }), + _ => None, + } + } +} + fn load_theme_from_config(config: &Config) -> Theme { match config.table_mode.as_str() { "basic" => nu_table::Theme::basic(), diff --git a/src/main.rs b/src/main.rs index 9a84cf6923..653423a63f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -150,7 +150,16 @@ fn main() -> Result<()> { ) { Ok(pipeline_data) => { let config = stack.get_config()?; - println!("{}", pipeline_data.collect_string("\n", &config)); + for item in pipeline_data { + if let Value::Error { error } = item { + let working_set = StateWorkingSet::new(&engine_state); + + report_error(&working_set, &error); + + std::process::exit(1); + } + println!("{}", item.into_string("\n", &config)); + } } Err(err) => { let working_set = StateWorkingSet::new(&engine_state); @@ -314,21 +323,46 @@ fn print_pipeline_data( let config = stack.get_config()?; - let output = match engine_state.find_decl("table".as_bytes()) { + match engine_state.find_decl("table".as_bytes()) { Some(decl_id) => { let table = engine_state .get_decl(decl_id) .run(engine_state, stack, &Call::new(), input)?; - table.collect_string("\n", &config) - } - None => input.collect_string(", ", &config), - }; - let stdout = std::io::stdout(); - match stdout.lock().write_all(output.as_bytes()) { - Ok(_) => (), - Err(err) => eprintln!("{}", err), + for item in table { + let stdout = std::io::stdout(); + + if let Value::Error { error } = item { + return Err(error); + } + + let mut out = item.into_string("\n", &config); + out.push('\n'); + + match stdout.lock().write_all(out.as_bytes()) { + Ok(_) => (), + Err(err) => eprintln!("{}", err), + }; + } + } + None => { + for item in input { + let stdout = std::io::stdout(); + + if let Value::Error { error } = item { + return Err(error); + } + + let mut out = item.into_string("\n", &config); + out.push('\n'); + + match stdout.lock().write_all(out.as_bytes()) { + Ok(_) => (), + Err(err) => eprintln!("{}", err), + }; + } + } }; Ok(())