diff --git a/crates/nu-cli/src/commands/history/history_.rs b/crates/nu-cli/src/commands/history/history_.rs index 4572aaba35..8b0714216e 100644 --- a/crates/nu-cli/src/commands/history/history_.rs +++ b/crates/nu-cli/src/commands/history/history_.rs @@ -107,7 +107,7 @@ impl Command for History { file: history_path.display().to_string(), span: head, })? - .into_pipeline_data(ctrlc)), + .into_pipeline_data(head, ctrlc)), HistoryFileFormat::Sqlite => Ok(history_reader .and_then(|h| { h.search(SearchQuery::everything(SearchDirection::Forward, None)) @@ -122,7 +122,7 @@ impl Command for History { file: history_path.display().to_string(), span: head, })? - .into_pipeline_data(ctrlc)), + .into_pipeline_data(head, ctrlc)), } } } else { diff --git a/crates/nu-cmd-extra/src/extra/filters/each_while.rs b/crates/nu-cmd-extra/src/extra/filters/each_while.rs index 4e9741ef92..2dda815d43 100644 --- a/crates/nu-cmd-extra/src/extra/filters/each_while.rs +++ b/crates/nu-cmd-extra/src/extra/filters/each_while.rs @@ -87,7 +87,7 @@ impl Command for EachWhile { Err(_) => None, }) .fuse() - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(head, engine_state.ctrlc.clone())) } PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::empty()), PipelineData::ExternalStream { @@ -108,7 +108,7 @@ impl Command for EachWhile { } }) .fuse() - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(head, engine_state.ctrlc.clone())) } // This match allows non-iterables to be accepted, // which is currently considered undesirable (Nov 2022). diff --git a/crates/nu-cmd-extra/src/extra/filters/update_cells.rs b/crates/nu-cmd-extra/src/extra/filters/update_cells.rs index d117d7fad2..9fe9bfe389 100644 --- a/crates/nu-cmd-extra/src/extra/filters/update_cells.rs +++ b/crates/nu-cmd-extra/src/extra/filters/update_cells.rs @@ -108,7 +108,7 @@ impl Command for UpdateCells { columns, span: head, } - .into_pipeline_data(engine_state.ctrlc.clone()) + .into_pipeline_data(head, engine_state.ctrlc.clone()) .set_metadata(metadata)) } } diff --git a/crates/nu-cmd-extra/src/extra/strings/format/command.rs b/crates/nu-cmd-extra/src/extra/strings/format/command.rs index 437bbc14c9..932b5ccb7f 100644 --- a/crates/nu-cmd-extra/src/extra/strings/format/command.rs +++ b/crates/nu-cmd-extra/src/extra/strings/format/command.rs @@ -238,10 +238,7 @@ fn format( } } - Ok(PipelineData::ListStream( - ListStream::from_stream(list.into_iter(), None), - None, - )) + Ok(ListStream::new(list.into_iter(), head_span, engine_state.ctrlc.clone()).into()) } // Unwrapping this ShellError is a bit unfortunate. // Ideally, its Span would be preserved. diff --git a/crates/nu-cmd-lang/src/core_commands/do_.rs b/crates/nu-cmd-lang/src/core_commands/do_.rs index c2b630e08d..b057880cf3 100644 --- a/crates/nu-cmd-lang/src/core_commands/do_.rs +++ b/crates/nu-cmd-lang/src/core_commands/do_.rs @@ -62,6 +62,7 @@ impl Command for Do { call: &Call, input: PipelineData, ) -> Result { + let head = call.head; let block: Closure = call.req(engine_state, caller_stack, 0)?; let rest: Vec = call.rest(engine_state, caller_stack, 1)?; let ignore_all_errors = call.has_flag(engine_state, caller_stack, "ignore-errors")?; @@ -75,7 +76,7 @@ impl Command for Do { let mut callee_stack = caller_stack.captures_to_stack_preserve_out_dest(block.captures); let block = engine_state.get_block(block.block_id); - bind_args_to(&mut callee_stack, &block.signature, rest, call.head)?; + bind_args_to(&mut callee_stack, &block.signature, rest, head)?; let eval_block_with_early_return = get_eval_block_with_early_return(engine_state); let result = eval_block_with_early_return(engine_state, &mut callee_stack, block, input); @@ -117,7 +118,7 @@ impl Command for Do { None, ) }) - .err_span(call.head) + .err_span(head) }) .transpose()?; @@ -148,13 +149,9 @@ impl Command for Do { None }; - let mut exit_code_ctrlc = None; let exit_code: Vec = match exit_code { None => vec![], - Some(exit_code_stream) => { - exit_code_ctrlc.clone_from(&exit_code_stream.ctrlc); - exit_code_stream.into_iter().collect() - } + Some(exit_code_stream) => exit_code_stream.into_iter().collect(), }; if let Some(Value::Int { val: code, .. }) = exit_code.last() { if *code != 0 { @@ -174,10 +171,7 @@ impl Command for Do { span, None, )), - exit_code: Some(ListStream::from_stream( - exit_code.into_iter(), - exit_code_ctrlc, - )), + exit_code: Some(ListStream::new(exit_code.into_iter(), span, None)), span, metadata, trim_end_newline, @@ -205,21 +199,15 @@ impl Command for Do { Ok(PipelineData::Value(Value::Error { .. }, ..)) | Err(_) if ignore_shell_errors => { Ok(PipelineData::empty()) } - Ok(PipelineData::ListStream(ls, metadata)) if ignore_shell_errors => { - // check if there is a `Value::Error` in given list stream first. - let mut values = vec![]; - let ctrlc = ls.ctrlc.clone(); - for v in ls { - if let Value::Error { .. } = v { - values.push(Value::nothing(call.head)); + Ok(PipelineData::ListStream(stream, metadata)) if ignore_shell_errors => { + let stream = stream.map(move |value| { + if let Value::Error { .. } = value { + Value::nothing(head) } else { - values.push(v) + value } - } - Ok(PipelineData::ListStream( - ListStream::from_stream(values.into_iter(), ctrlc), - metadata, - )) + }); + Ok(PipelineData::ListStream(stream, metadata)) } r => r, } diff --git a/crates/nu-cmd-lang/src/core_commands/for_.rs b/crates/nu-cmd-lang/src/core_commands/for_.rs index e4e15e74dc..64e6c0a6ba 100644 --- a/crates/nu-cmd-lang/src/core_commands/for_.rs +++ b/crates/nu-cmd-lang/src/core_commands/for_.rs @@ -1,5 +1,4 @@ use nu_engine::{command_prelude::*, get_eval_block, get_eval_expression}; -use nu_protocol::ListStream; #[derive(Clone)] pub struct For; @@ -88,7 +87,11 @@ impl Command for For { let span = value.span(); match value { Value::List { vals, .. } => { - for (idx, x) in ListStream::from_stream(vals.into_iter(), ctrlc).enumerate() { + for (idx, x) in vals.into_iter().enumerate() { + if nu_utils::ctrl_c::was_pressed(&ctrlc) { + break; + } + // with_env() is used here to ensure that each iteration uses // a different set of environment variables. // Hence, a 'cd' in the first loop won't affect the next loop. diff --git a/crates/nu-cmd-lang/src/core_commands/scope/aliases.rs b/crates/nu-cmd-lang/src/core_commands/scope/aliases.rs index 780d8d5a13..0b8d2d0200 100644 --- a/crates/nu-cmd-lang/src/core_commands/scope/aliases.rs +++ b/crates/nu-cmd-lang/src/core_commands/scope/aliases.rs @@ -26,13 +26,10 @@ impl Command for ScopeAliases { call: &Call, _input: PipelineData, ) -> Result { - let span = call.head; - let ctrlc = engine_state.ctrlc.clone(); - + let head = call.head; let mut scope_data = ScopeData::new(engine_state, stack); scope_data.populate_decls(); - - Ok(scope_data.collect_aliases(span).into_pipeline_data(ctrlc)) + Ok(Value::list(scope_data.collect_aliases(head), head).into_pipeline_data()) } fn examples(&self) -> Vec { diff --git a/crates/nu-cmd-lang/src/core_commands/scope/commands.rs b/crates/nu-cmd-lang/src/core_commands/scope/commands.rs index 5feed1a9ee..6ce0448f4e 100644 --- a/crates/nu-cmd-lang/src/core_commands/scope/commands.rs +++ b/crates/nu-cmd-lang/src/core_commands/scope/commands.rs @@ -26,13 +26,10 @@ impl Command for ScopeCommands { call: &Call, _input: PipelineData, ) -> Result { - let span = call.head; - let ctrlc = engine_state.ctrlc.clone(); - + let head = call.head; let mut scope_data = ScopeData::new(engine_state, stack); scope_data.populate_decls(); - - Ok(scope_data.collect_commands(span).into_pipeline_data(ctrlc)) + Ok(Value::list(scope_data.collect_commands(head), head).into_pipeline_data()) } fn examples(&self) -> Vec { diff --git a/crates/nu-cmd-lang/src/core_commands/scope/externs.rs b/crates/nu-cmd-lang/src/core_commands/scope/externs.rs index 30cdc53f26..187f90206d 100644 --- a/crates/nu-cmd-lang/src/core_commands/scope/externs.rs +++ b/crates/nu-cmd-lang/src/core_commands/scope/externs.rs @@ -26,13 +26,10 @@ impl Command for ScopeExterns { call: &Call, _input: PipelineData, ) -> Result { - let span = call.head; - let ctrlc = engine_state.ctrlc.clone(); - + let head = call.head; let mut scope_data = ScopeData::new(engine_state, stack); scope_data.populate_decls(); - - Ok(scope_data.collect_externs(span).into_pipeline_data(ctrlc)) + Ok(Value::list(scope_data.collect_externs(head), head).into_pipeline_data()) } fn examples(&self) -> Vec { diff --git a/crates/nu-cmd-lang/src/core_commands/scope/modules.rs b/crates/nu-cmd-lang/src/core_commands/scope/modules.rs index 6031121847..9d78f2a061 100644 --- a/crates/nu-cmd-lang/src/core_commands/scope/modules.rs +++ b/crates/nu-cmd-lang/src/core_commands/scope/modules.rs @@ -26,13 +26,10 @@ impl Command for ScopeModules { call: &Call, _input: PipelineData, ) -> Result { - let span = call.head; - let ctrlc = engine_state.ctrlc.clone(); - + let head = call.head; let mut scope_data = ScopeData::new(engine_state, stack); scope_data.populate_modules(); - - Ok(scope_data.collect_modules(span).into_pipeline_data(ctrlc)) + Ok(Value::list(scope_data.collect_modules(head), head).into_pipeline_data()) } fn examples(&self) -> Vec { diff --git a/crates/nu-cmd-lang/src/core_commands/scope/variables.rs b/crates/nu-cmd-lang/src/core_commands/scope/variables.rs index 7f44289fb4..6c9a695c71 100644 --- a/crates/nu-cmd-lang/src/core_commands/scope/variables.rs +++ b/crates/nu-cmd-lang/src/core_commands/scope/variables.rs @@ -26,13 +26,10 @@ impl Command for ScopeVariables { call: &Call, _input: PipelineData, ) -> Result { - let span = call.head; - let ctrlc = engine_state.ctrlc.clone(); - + let head = call.head; let mut scope_data = ScopeData::new(engine_state, stack); scope_data.populate_vars(); - - Ok(scope_data.collect_vars(span).into_pipeline_data(ctrlc)) + Ok(Value::list(scope_data.collect_vars(head), head).into_pipeline_data()) } fn examples(&self) -> Vec { diff --git a/crates/nu-cmd-plugin/src/commands/plugin/list.rs b/crates/nu-cmd-plugin/src/commands/plugin/list.rs index 1d630b67c0..6b715a0001 100644 --- a/crates/nu-cmd-plugin/src/commands/plugin/list.rs +++ b/crates/nu-cmd-plugin/src/commands/plugin/list.rs @@ -69,35 +69,47 @@ impl Command for PluginList { call: &Call, _input: PipelineData, ) -> Result { - let span = call.span(); + let head = call.head; + // Group plugin decls by plugin identity let decls = engine_state.plugin_decls().into_group_map_by(|decl| { decl.plugin_identity() .expect("plugin decl should have identity") }); + // Build plugins list let list = engine_state.plugins().iter().map(|plugin| { // Find commands that belong to the plugin let commands = decls.get(plugin.identity()) .into_iter() .flat_map(|decls| { - decls.iter().map(|decl| Value::string(decl.name(), span)) + decls.iter().map(|decl| Value::string(decl.name(), head)) }) .collect(); - Value::record(record! { - "name" => Value::string(plugin.identity().name(), span), - "is_running" => Value::bool(plugin.is_running(), span), - "pid" => plugin.pid() - .map(|p| Value::int(p as i64, span)) - .unwrap_or(Value::nothing(span)), - "filename" => Value::string(plugin.identity().filename().to_string_lossy(), span), - "shell" => plugin.identity().shell() - .map(|s| Value::string(s.to_string_lossy(), span)) - .unwrap_or(Value::nothing(span)), - "commands" => Value::list(commands, span), - }, span) - }).collect::>(); - Ok(list.into_pipeline_data(engine_state.ctrlc.clone())) + let pid = plugin + .pid() + .map(|p| Value::int(p as i64, head)) + .unwrap_or(Value::nothing(head)); + + let shell = plugin + .identity() + .shell() + .map(|s| Value::string(s.to_string_lossy(), head)) + .unwrap_or(Value::nothing(head)); + + let record = record! { + "name" => Value::string(plugin.identity().name(), head), + "is_running" => Value::bool(plugin.is_running(), head), + "pid" => pid, + "filename" => Value::string(plugin.identity().filename().to_string_lossy(), head), + "shell" => shell, + "commands" => Value::list(commands, head), + }; + + Value::record(record, head) + }).collect(); + + Ok(Value::list(list, head).into_pipeline_data()) } } diff --git a/crates/nu-command/src/conversions/into/cell_path.rs b/crates/nu-command/src/conversions/into/cell_path.rs index 039656e9d6..4faa6e83d6 100644 --- a/crates/nu-command/src/conversions/into/cell_path.rs +++ b/crates/nu-command/src/conversions/into/cell_path.rs @@ -98,7 +98,7 @@ fn into_cell_path(call: &Call, input: PipelineData) -> Result Ok(value_to_cell_path(&value, head)?.into_pipeline_data()), PipelineData::ListStream(stream, ..) => { - let list: Vec<_> = stream.collect(); + let list: Vec<_> = stream.into_iter().collect(); Ok(list_to_cell_path(&list, head)?.into_pipeline_data()) } PipelineData::ExternalStream { span, .. } => Err(ShellError::OnlySupportsThisInputType { diff --git a/crates/nu-command/src/conversions/into/value.rs b/crates/nu-command/src/conversions/into/value.rs index 6021a4980a..4bf7e68f53 100644 --- a/crates/nu-command/src/conversions/into/value.rs +++ b/crates/nu-command/src/conversions/into/value.rs @@ -81,7 +81,7 @@ impl Command for IntoValue { display_as_filesizes, span, } - .into_pipeline_data(ctrlc) + .into_pipeline_data(span, ctrlc) .set_metadata(metadata)) } } diff --git a/crates/nu-command/src/database/commands/into_sqlite.rs b/crates/nu-command/src/database/commands/into_sqlite.rs index 7f61b6bc06..f3a2e3622a 100644 --- a/crates/nu-command/src/database/commands/into_sqlite.rs +++ b/crates/nu-command/src/database/commands/into_sqlite.rs @@ -203,8 +203,8 @@ fn action( ctrl_c: Option>, ) -> Result { match input { - PipelineData::ListStream(list_stream, _) => { - insert_in_transaction(list_stream.stream, span, table, ctrl_c) + PipelineData::ListStream(stream, _) => { + insert_in_transaction(stream.into_iter(), span, table, ctrl_c) } PipelineData::Value( Value::List { diff --git a/crates/nu-command/src/date/list_timezone.rs b/crates/nu-command/src/date/list_timezone.rs index bb56cb995d..6f9267947d 100644 --- a/crates/nu-command/src/date/list_timezone.rs +++ b/crates/nu-command/src/date/list_timezone.rs @@ -30,17 +30,17 @@ impl Command for SubCommand { call: &Call, _input: PipelineData, ) -> Result { - let span = call.head; + let head = call.head; Ok(TZ_VARIANTS .iter() .map(move |x| { Value::record( - record! { "timezone" => Value::string(x.name(), span) }, - span, + record! { "timezone" => Value::string(x.name(), head) }, + head, ) }) - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(head, engine_state.ctrlc.clone())) } fn examples(&self) -> Vec { diff --git a/crates/nu-command/src/debug/explain.rs b/crates/nu-command/src/debug/explain.rs index 0abd87b9a3..b451d6916a 100644 --- a/crates/nu-command/src/debug/explain.rs +++ b/crates/nu-command/src/debug/explain.rs @@ -34,16 +34,14 @@ impl Command for Explain { stack: &mut Stack, call: &Call, _input: PipelineData, - ) -> Result { + ) -> Result { + let head = call.head; // This was all delightfully stolen from benchmark :) let capture_block: Closure = call.req(engine_state, stack, 0)?; let block = engine_state.get_block(capture_block.block_id); - let ctrlc = engine_state.ctrlc.clone(); let mut stack = stack.captures_to_stack(capture_block.captures); - - let elements = get_pipeline_elements(engine_state, &mut stack, block, call.head); - - Ok(elements.into_pipeline_data(ctrlc)) + let elements = get_pipeline_elements(engine_state, &mut stack, block, head); + Ok(Value::list(elements, head).into_pipeline_data()) } fn examples(&self) -> Vec { diff --git a/crates/nu-command/src/debug/metadata_set.rs b/crates/nu-command/src/debug/metadata_set.rs index 29cb8ab0a2..6608827fda 100644 --- a/crates/nu-command/src/debug/metadata_set.rs +++ b/crates/nu-command/src/debug/metadata_set.rs @@ -47,13 +47,21 @@ impl Command for MetadataSet { let metadata = PipelineMetadata { data_source: DataSource::FilePath(path.into()), }; - Ok(input.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + Ok(input.into_pipeline_data_with_metadata( + head, + engine_state.ctrlc.clone(), + metadata, + )) } (None, true) => { let metadata = PipelineMetadata { data_source: DataSource::Ls, }; - Ok(input.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + Ok(input.into_pipeline_data_with_metadata( + head, + engine_state.ctrlc.clone(), + metadata, + )) } _ => Err(ShellError::IncorrectValue { msg: "Expected either --datasource-ls(-l) or --datasource-filepath(-f)".to_string(), diff --git a/crates/nu-command/src/filesystem/du.rs b/crates/nu-command/src/filesystem/du.rs index adf648f732..c48a1f7ac1 100644 --- a/crates/nu-command/src/filesystem/du.rs +++ b/crates/nu-command/src/filesystem/du.rs @@ -121,7 +121,7 @@ impl Command for Du { }; Ok( du_for_one_pattern(args, ¤t_dir, tag, engine_state.ctrlc.clone())? - .into_pipeline_data(engine_state.ctrlc.clone()), + .into_pipeline_data(tag, engine_state.ctrlc.clone()), ) } Some(paths) => { @@ -147,7 +147,7 @@ impl Command for Du { Ok(result_iters .into_iter() .flatten() - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(tag, engine_state.ctrlc.clone())) } } } diff --git a/crates/nu-command/src/filesystem/glob.rs b/crates/nu-command/src/filesystem/glob.rs index 909dc36837..c5f4bc08b4 100644 --- a/crates/nu-command/src/filesystem/glob.rs +++ b/crates/nu-command/src/filesystem/glob.rs @@ -199,7 +199,7 @@ impl Command for Glob { } }; - Ok(if !not_patterns.is_empty() { + let result = if !not_patterns.is_empty() { let np: Vec<&str> = not_patterns.iter().map(|s| s as &str).collect(); let glob_results = glob .walk_with_behavior( @@ -218,10 +218,7 @@ impl Command for Glob { inner: vec![], })? .flatten(); - let result = glob_to_value(ctrlc, glob_results, no_dirs, no_files, no_symlinks, span)?; - result - .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone()) + glob_to_value(ctrlc, glob_results, no_dirs, no_files, no_symlinks, span) } else { let glob_results = glob .walk_with_behavior( @@ -232,11 +229,12 @@ impl Command for Glob { }, ) .flatten(); - let result = glob_to_value(ctrlc, glob_results, no_dirs, no_files, no_symlinks, span)?; - result - .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone()) - }) + glob_to_value(ctrlc, glob_results, no_dirs, no_files, no_symlinks, span) + }?; + + Ok(result + .into_iter() + .into_pipeline_data(span, engine_state.ctrlc.clone())) } } diff --git a/crates/nu-command/src/filesystem/ls.rs b/crates/nu-command/src/filesystem/ls.rs index 9d8db0a8c6..dd1b9770d2 100644 --- a/crates/nu-command/src/filesystem/ls.rs +++ b/crates/nu-command/src/filesystem/ls.rs @@ -115,10 +115,11 @@ impl Command for Ls { match input_pattern_arg { None => Ok(ls_for_one_pattern(None, args, ctrl_c.clone(), cwd)? .into_pipeline_data_with_metadata( + call_span, + ctrl_c, PipelineMetadata { data_source: DataSource::Ls, }, - ctrl_c, )), Some(pattern) => { let mut result_iters = vec![]; @@ -137,10 +138,11 @@ impl Command for Ls { .into_iter() .flatten() .into_pipeline_data_with_metadata( + call_span, + ctrl_c, PipelineMetadata { data_source: DataSource::Ls, }, - ctrl_c, )) } } diff --git a/crates/nu-command/src/filesystem/open.rs b/crates/nu-command/src/filesystem/open.rs index 63b6aa8f04..23664bb576 100644 --- a/crates/nu-command/src/filesystem/open.rs +++ b/crates/nu-command/src/filesystem/open.rs @@ -209,7 +209,10 @@ impl Command for Open { } else if output.len() == 1 { Ok(output.remove(0)) } else { - Ok(output.into_iter().flatten().into_pipeline_data(ctrlc)) + Ok(output + .into_iter() + .flatten() + .into_pipeline_data(call_span, ctrlc)) } } diff --git a/crates/nu-command/src/filesystem/rm.rs b/crates/nu-command/src/filesystem/rm.rs index fe14253982..9b9e88b5ff 100644 --- a/crates/nu-command/src/filesystem/rm.rs +++ b/crates/nu-command/src/filesystem/rm.rs @@ -465,7 +465,7 @@ fn rm( } }) .filter(|x| !matches!(x.get_type(), Type::Nothing)) - .into_pipeline_data(ctrlc) + .into_pipeline_data(span, ctrlc) .print_not_formatted(engine_state, false, true)?; Ok(PipelineData::empty()) diff --git a/crates/nu-command/src/filters/append.rs b/crates/nu-command/src/filters/append.rs index 2ad5c2dbe4..af5bd49283 100644 --- a/crates/nu-command/src/filters/append.rs +++ b/crates/nu-command/src/filters/append.rs @@ -116,7 +116,7 @@ only unwrap the outer list, and leave the variable's contents untouched."# Ok(input .into_iter() .chain(other.into_pipeline_data()) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(call.head, engine_state.ctrlc.clone(), metadata)) } } diff --git a/crates/nu-command/src/filters/columns.rs b/crates/nu-command/src/filters/columns.rs index 44b713e793..b6e15af8df 100644 --- a/crates/nu-command/src/filters/columns.rs +++ b/crates/nu-command/src/filters/columns.rs @@ -62,73 +62,68 @@ impl Command for Columns { fn run( &self, - engine_state: &EngineState, + _engine_state: &EngineState, _stack: &mut Stack, call: &Call, input: PipelineData, ) -> Result { - let span = call.head; - getcol(engine_state, span, input) + getcol(call.head, input) } } -fn getcol( - engine_state: &EngineState, - head: Span, - input: PipelineData, -) -> Result { - let ctrlc = engine_state.ctrlc.clone(); +fn getcol(head: Span, input: PipelineData) -> Result { let metadata = input.metadata(); match input { PipelineData::Empty => Ok(PipelineData::Empty), PipelineData::Value(v, ..) => { let span = v.span(); - match v { + let cols = match v { Value::List { vals: input_vals, .. - } => { - let input_cols = get_columns(&input_vals); - Ok(input_cols - .into_iter() - .map(move |x| Value::string(x, span)) - .into_pipeline_data(ctrlc) - .set_metadata(metadata)) - } + } => get_columns(&input_vals) + .into_iter() + .map(move |x| Value::string(x, span)) + .collect(), Value::Custom { val, .. } => { // TODO: should we get CustomValue to expose columns in a more efficient way? // Would be nice to be able to get columns without generating the whole value let input_as_base_value = val.to_base_value(span)?; - let input_cols = get_columns(&[input_as_base_value]); - Ok(input_cols + get_columns(&[input_as_base_value]) .into_iter() .map(move |x| Value::string(x, span)) - .into_pipeline_data(ctrlc) - .set_metadata(metadata)) + .collect() } - Value::Record { val, .. } => Ok(val + Value::Record { val, .. } => val .into_owned() .into_iter() .map(move |(x, _)| Value::string(x, head)) - .into_pipeline_data(ctrlc) - .set_metadata(metadata)), + .collect(), // Propagate errors - Value::Error { error, .. } => Err(*error), - other => Err(ShellError::OnlySupportsThisInputType { - exp_input_type: "record or table".into(), - wrong_type: other.get_type().to_string(), - dst_span: head, - src_span: other.span(), - }), - } + Value::Error { error, .. } => return Err(*error), + other => { + return Err(ShellError::OnlySupportsThisInputType { + exp_input_type: "record or table".into(), + wrong_type: other.get_type().to_string(), + dst_span: head, + src_span: other.span(), + }) + } + }; + + Ok(Value::list(cols, head) + .into_pipeline_data() + .set_metadata(metadata)) } PipelineData::ListStream(stream, ..) => { - let v: Vec<_> = stream.into_iter().collect(); - let input_cols = get_columns(&v); - - Ok(input_cols + let values = stream.into_iter().collect::>(); + let cols = get_columns(&values) .into_iter() - .map(move |x| Value::string(x, head)) - .into_pipeline_data_with_metadata(metadata, ctrlc)) + .map(|s| Value::string(s, head)) + .collect(); + + Ok(Value::list(cols, head) + .into_pipeline_data() + .set_metadata(metadata)) } PipelineData::ExternalStream { .. } => Err(ShellError::OnlySupportsThisInputType { exp_input_type: "record or table".into(), diff --git a/crates/nu-command/src/filters/drop/column.rs b/crates/nu-command/src/filters/drop/column.rs index 3f527bcc14..3354492570 100644 --- a/crates/nu-command/src/filters/drop/column.rs +++ b/crates/nu-command/src/filters/drop/column.rs @@ -90,7 +90,8 @@ fn drop_cols( // is displayed farther to the right. let metadata = input.metadata(); match input { - PipelineData::ListStream(mut stream, ..) => { + PipelineData::ListStream(stream, ..) => { + let mut stream = stream.into_iter(); if let Some(mut first) = stream.next() { let drop_cols = drop_cols_set(&mut first, head, columns)?; @@ -101,7 +102,7 @@ fn drop_cols( Err(e) => Value::error(e, head), } })) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(head, engine_state.ctrlc.clone(), metadata)) } else { Ok(PipelineData::Empty) } diff --git a/crates/nu-command/src/filters/drop/nth.rs b/crates/nu-command/src/filters/drop/nth.rs index b0858b19cc..a530de5aa1 100644 --- a/crates/nu-command/src/filters/drop/nth.rs +++ b/crates/nu-command/src/filters/drop/nth.rs @@ -100,6 +100,7 @@ impl Command for DropNth { call: &Call, input: PipelineData, ) -> Result { + let head = call.head; let metadata = input.metadata(); let number_or_range = extract_int_or_range(engine_state, stack, call)?; @@ -115,7 +116,7 @@ impl Command for DropNth { return Err(ShellError::UnsupportedInput { msg: "float range".into(), input: "value originates from here".into(), - msg_span: call.head, + msg_span: head, input_span: number_or_range.span, }); } @@ -129,7 +130,7 @@ impl Command for DropNth { return Err(ShellError::UnsupportedInput { msg: "drop nth accepts only positive ints".into(), input: "value originates from here".into(), - msg_span: call.head, + msg_span: head, input_span: number_or_range.span, }); } @@ -139,7 +140,7 @@ impl Command for DropNth { msg: "The upper bound needs to be equal or larger to the lower bound" .into(), input: "value originates from here".into(), - msg_span: call.head, + msg_span: head, input_span: number_or_range.span, }); } @@ -154,8 +155,9 @@ impl Command for DropNth { .into_iter() .take(start) .into_pipeline_data_with_metadata( - metadata, + head, engine_state.ctrlc.clone(), + metadata, )) } }; @@ -175,7 +177,7 @@ impl Command for DropNth { rows, current: 0, } - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(head, engine_state.ctrlc.clone(), metadata)) } } diff --git a/crates/nu-command/src/filters/each.rs b/crates/nu-command/src/filters/each.rs index d0267b1fce..65d61fd3a8 100644 --- a/crates/nu-command/src/filters/each.rs +++ b/crates/nu-command/src/filters/each.rs @@ -138,7 +138,7 @@ with 'transpose' first."# } } }) - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(head, engine_state.ctrlc.clone())) } PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::empty()), PipelineData::ExternalStream { @@ -170,7 +170,7 @@ with 'transpose' first."# } } }) - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(head, engine_state.ctrlc.clone())) } // This match allows non-iterables to be accepted, // which is currently considered undesirable (Nov 2022). diff --git a/crates/nu-command/src/filters/empty.rs b/crates/nu-command/src/filters/empty.rs index e75700f26d..fd55921414 100644 --- a/crates/nu-command/src/filters/empty.rs +++ b/crates/nu-command/src/filters/empty.rs @@ -60,10 +60,11 @@ pub fn empty( } }, PipelineData::ListStream(s, ..) => { + let empty = s.into_iter().next().is_none(); if negate { - Ok(Value::bool(s.count() != 0, head).into_pipeline_data()) + Ok(Value::bool(!empty, head).into_pipeline_data()) } else { - Ok(Value::bool(s.count() == 0, head).into_pipeline_data()) + Ok(Value::bool(empty, head).into_pipeline_data()) } } PipelineData::Value(value, ..) => { diff --git a/crates/nu-command/src/filters/enumerate.rs b/crates/nu-command/src/filters/enumerate.rs index e46c873c4b..1034780657 100644 --- a/crates/nu-command/src/filters/enumerate.rs +++ b/crates/nu-command/src/filters/enumerate.rs @@ -50,9 +50,9 @@ impl Command for Enumerate { call: &Call, input: PipelineData, ) -> Result { + let head = call.head; let metadata = input.metadata(); let ctrlc = engine_state.ctrlc.clone(); - let span = call.head; Ok(input .into_iter() @@ -60,13 +60,13 @@ impl Command for Enumerate { .map(move |(idx, x)| { Value::record( record! { - "index" => Value::int(idx as i64, span), + "index" => Value::int(idx as i64, head), "item" => x, }, - span, + head, ) }) - .into_pipeline_data_with_metadata(metadata, ctrlc)) + .into_pipeline_data_with_metadata(head, ctrlc, metadata)) } } diff --git a/crates/nu-command/src/filters/every.rs b/crates/nu-command/src/filters/every.rs index a7fc4f1b02..1202b4f7c9 100644 --- a/crates/nu-command/src/filters/every.rs +++ b/crates/nu-command/src/filters/every.rs @@ -78,7 +78,7 @@ impl Command for Every { None } }) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(call.head, engine_state.ctrlc.clone(), metadata)) } } diff --git a/crates/nu-command/src/filters/filter.rs b/crates/nu-command/src/filters/filter.rs index 7fe936b4f4..b158dd3be3 100644 --- a/crates/nu-command/src/filters/filter.rs +++ b/crates/nu-command/src/filters/filter.rs @@ -67,7 +67,7 @@ a variable. On the other hand, the "row condition" syntax is not supported."# Some(Value::error(err, span)) } }) - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(head, engine_state.ctrlc.clone())) } PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::empty()), PipelineData::ExternalStream { @@ -92,7 +92,7 @@ a variable. On the other hand, the "row condition" syntax is not supported."# } } }) - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(head, engine_state.ctrlc.clone())) } // This match allows non-iterables to be accepted, // which is currently considered undesirable (Nov 2022). @@ -108,7 +108,7 @@ a variable. On the other hand, the "row condition" syntax is not supported."# Some(Value::error(err, span)) } } - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(head, engine_state.ctrlc.clone())) } } .map(|data| data.set_metadata(metadata)) diff --git a/crates/nu-command/src/filters/find.rs b/crates/nu-command/src/filters/find.rs index 6efda0d836..b45fe8d810 100644 --- a/crates/nu-command/src/filters/find.rs +++ b/crates/nu-command/src/filters/find.rs @@ -3,7 +3,7 @@ use fancy_regex::Regex; use nu_ansi_term::Style; use nu_color_config::StyleComputer; use nu_engine::command_prelude::*; -use nu_protocol::{Config, ListStream}; +use nu_protocol::Config; use nu_utils::IgnoreCaseExt; #[derive(Clone)] @@ -416,9 +416,9 @@ fn find_with_rest_and_highlight( }, ctrlc, ), - PipelineData::ListStream(stream, metadata) => Ok(ListStream::from_stream( - stream - .map(move |mut x| { + PipelineData::ListStream(stream, metadata) => { + let stream = stream.modify(|iter| { + iter.map(move |mut x| { let span = x.span(); match &mut x { Value::Record { val, .. } => highlight_terms_in_record_with_search_columns( @@ -442,10 +442,11 @@ fn find_with_rest_and_highlight( &cols_to_search_in_filter, invert, ) - }), - ctrlc.clone(), - ) - .into_pipeline_data_with_metadata(metadata, ctrlc)), + }) + }); + + Ok(PipelineData::ListStream(stream, metadata)) + } PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::empty()), PipelineData::ExternalStream { stdout: Some(stream), @@ -496,7 +497,7 @@ fn find_with_rest_and_highlight( Err(e) => return Err(e), }; } - Ok(output.into_pipeline_data(ctrlc)) + Ok(output.into_pipeline_data(span, ctrlc)) } } } diff --git a/crates/nu-command/src/filters/first.rs b/crates/nu-command/src/filters/first.rs index eabd370858..1bc51f2562 100644 --- a/crates/nu-command/src/filters/first.rs +++ b/crates/nu-command/src/filters/first.rs @@ -143,7 +143,7 @@ fn first_helper( } else { Ok(iter .take(rows) - .into_pipeline_data_with_metadata(metadata, ctrlc)) + .into_pipeline_data_with_metadata(span, ctrlc, metadata)) } } // Propagate errors by explicitly matching them before the final case. @@ -156,17 +156,18 @@ fn first_helper( }), } } - PipelineData::ListStream(mut ls, metadata) => { + PipelineData::ListStream(stream, metadata) => { if return_single_element { - if let Some(v) = ls.next() { + if let Some(v) = stream.into_iter().next() { Ok(v.into_pipeline_data()) } else { Err(ShellError::AccessEmptyContent { span: head }) } } else { - Ok(ls - .take(rows) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + Ok(PipelineData::ListStream( + stream.modify(|iter| iter.take(rows)), + metadata, + )) } } PipelineData::ExternalStream { span, .. } => Err(ShellError::OnlySupportsThisInputType { diff --git a/crates/nu-command/src/filters/get.rs b/crates/nu-command/src/filters/get.rs index 5772ec2f11..c6232b80fc 100644 --- a/crates/nu-command/src/filters/get.rs +++ b/crates/nu-command/src/filters/get.rs @@ -89,7 +89,7 @@ If multiple cell paths are given, this will produce a list of values."# output.push(val?); } - Ok(output.into_iter().into_pipeline_data(ctrlc)) + Ok(output.into_iter().into_pipeline_data(span, ctrlc)) } .map(|x| x.set_metadata(metadata)) } diff --git a/crates/nu-command/src/filters/group.rs b/crates/nu-command/src/filters/group.rs index 196d2f79c8..821f35f34e 100644 --- a/crates/nu-command/src/filters/group.rs +++ b/crates/nu-command/src/filters/group.rs @@ -1,4 +1,5 @@ use nu_engine::command_prelude::*; +use nu_protocol::ValueIterator; #[derive(Clone)] pub struct Group; @@ -52,6 +53,7 @@ impl Command for Group { call: &Call, input: PipelineData, ) -> Result { + let head = call.head; let group_size: Spanned = call.req(engine_state, stack, 0)?; let ctrlc = engine_state.ctrlc.clone(); let metadata = input.metadata(); @@ -61,16 +63,16 @@ impl Command for Group { let each_group_iterator = EachGroupIterator { group_size: group_size.item, input: Box::new(input.into_iter()), - span: call.head, + span: head, }; - Ok(each_group_iterator.into_pipeline_data_with_metadata(metadata, ctrlc)) + Ok(each_group_iterator.into_pipeline_data_with_metadata(head, ctrlc, metadata)) } } struct EachGroupIterator { group_size: usize, - input: Box + Send>, + input: ValueIterator, span: Span, } diff --git a/crates/nu-command/src/filters/insert.rs b/crates/nu-command/src/filters/insert.rs index 7814bcdb83..c87a2a78b9 100644 --- a/crates/nu-command/src/filters/insert.rs +++ b/crates/nu-command/src/filters/insert.rs @@ -159,7 +159,7 @@ fn insert( } Ok(value.into_pipeline_data_with_metadata(metadata)) } - PipelineData::ListStream(mut stream, metadata) => { + PipelineData::ListStream(stream, metadata) => { if let Some(( &PathMember::Int { val, @@ -169,6 +169,7 @@ fn insert( path, )) = cell_path.members.split_first() { + let mut stream = stream.into_iter(); let mut pre_elems = vec![]; for idx in 0..val { @@ -221,40 +222,39 @@ fn insert( Ok(pre_elems .into_iter() .chain(stream) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(head, engine_state.ctrlc.clone(), metadata)) } else if let Value::Closure { val, .. } = replacement { let mut closure = ClosureEval::new(engine_state, stack, val); - Ok(stream - .map(move |mut value| { - let err = insert_value_by_closure( - &mut value, - &mut closure, - head, - &cell_path.members, - false, - ); + let stream = stream.map(move |mut value| { + let err = insert_value_by_closure( + &mut value, + &mut closure, + head, + &cell_path.members, + false, + ); - if let Err(e) = err { - Value::error(e, head) - } else { - value - } - }) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + if let Err(e) = err { + Value::error(e, head) + } else { + value + } + }); + Ok(PipelineData::ListStream(stream, metadata)) } else { - Ok(stream - .map(move |mut value| { - if let Err(e) = value.insert_data_at_cell_path( - &cell_path.members, - replacement.clone(), - head, - ) { - Value::error(e, head) - } else { - value - } - }) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + let stream = stream.map(move |mut value| { + if let Err(e) = value.insert_data_at_cell_path( + &cell_path.members, + replacement.clone(), + head, + ) { + Value::error(e, head) + } else { + value + } + }); + + Ok(PipelineData::ListStream(stream, metadata)) } } PipelineData::Empty => Err(ShellError::IncompatiblePathAccess { diff --git a/crates/nu-command/src/filters/interleave.rs b/crates/nu-command/src/filters/interleave.rs index 3365b108db..85a92741a1 100644 --- a/crates/nu-command/src/filters/interleave.rs +++ b/crates/nu-command/src/filters/interleave.rs @@ -147,7 +147,7 @@ interleave // Now that threads are writing to the channel, we just return it as a stream Ok(rx .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(head, engine_state.ctrlc.clone())) } } diff --git a/crates/nu-command/src/filters/items.rs b/crates/nu-command/src/filters/items.rs index 5a97a5a3db..f0cba01888 100644 --- a/crates/nu-command/src/filters/items.rs +++ b/crates/nu-command/src/filters/items.rs @@ -66,7 +66,7 @@ impl Command for Items { } } }) - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(head, engine_state.ctrlc.clone())) } Value::Error { error, .. } => Err(*error), other => Err(ShellError::OnlySupportsThisInputType { diff --git a/crates/nu-command/src/filters/lines.rs b/crates/nu-command/src/filters/lines.rs index 0a07378afb..e3e0f5d9fd 100644 --- a/crates/nu-command/src/filters/lines.rs +++ b/crates/nu-command/src/filters/lines.rs @@ -52,9 +52,8 @@ impl Command for Lines { } PipelineData::Empty => Ok(PipelineData::Empty), PipelineData::ListStream(stream, metadata) => { - let iter = stream - .into_iter() - .filter_map(move |value| { + let stream = stream.modify(|iter| { + iter.filter_map(move |value| { let span = value.span(); if let Value::String { val, .. } = value { Some( @@ -72,11 +71,10 @@ impl Command for Lines { None } }) - .flatten(); + .flatten() + }); - Ok(iter - .into_pipeline_data(engine_state.ctrlc.clone()) - .set_metadata(metadata)) + Ok(PipelineData::ListStream(stream, metadata)) } PipelineData::Value(val, ..) => { match val { @@ -97,7 +95,7 @@ impl Command for Lines { .. } => Ok(RawStreamLinesAdapter::new(stream, head, skip_empty) .map(move |x| x.unwrap_or_else(|err| Value::error(err, head))) - .into_pipeline_data(ctrlc) + .into_pipeline_data(head, ctrlc) .set_metadata(metadata)), } } diff --git a/crates/nu-command/src/filters/merge.rs b/crates/nu-command/src/filters/merge.rs index 5af331bf0c..8a3e613c56 100644 --- a/crates/nu-command/src/filters/merge.rs +++ b/crates/nu-command/src/filters/merge.rs @@ -85,11 +85,10 @@ repeating this process with row 1, and so on."# call: &Call, input: PipelineData, ) -> Result { + let head = call.head; let merge_value: Value = call.req(engine_state, stack, 0)?; - let metadata = input.metadata(); let ctrlc = engine_state.ctrlc.clone(); - let call = call.clone(); match (&input, merge_value) { // table (list of records) @@ -104,29 +103,25 @@ repeating this process with row 1, and so on."# .into_iter() .map(move |inp| match (inp.as_record(), table_iter.next()) { (Ok(inp), Some(to_merge)) => match to_merge.as_record() { - Ok(to_merge) => Value::record(do_merge(inp, to_merge), call.head), - Err(error) => Value::error(error, call.head), + Ok(to_merge) => Value::record(do_merge(inp, to_merge), head), + Err(error) => Value::error(error, head), }, (_, None) => inp, - (Err(error), _) => Value::error(error, call.head), + (Err(error), _) => Value::error(error, head), }); - if let Some(md) = metadata { - Ok(res.into_pipeline_data_with_metadata(md, ctrlc)) - } else { - Ok(res.into_pipeline_data(ctrlc)) - } + Ok(res.into_pipeline_data_with_metadata(head, ctrlc, metadata)) } // record ( PipelineData::Value(Value::Record { val: inp, .. }, ..), Value::Record { val: to_merge, .. }, - ) => Ok(Value::record(do_merge(inp, &to_merge), call.head).into_pipeline_data()), + ) => Ok(Value::record(do_merge(inp, &to_merge), head).into_pipeline_data()), (PipelineData::Value(val, ..), ..) => { // Only point the "value originates here" arrow at the merge value // if it was generated from a block. Otherwise, point at the pipeline value. -Leon 2022-10-27 let span = if val.span() == Span::test_data() { - Span::new(call.head.start, call.head.start) + Span::new(head.start, head.start) } else { val.span() }; @@ -134,14 +129,14 @@ repeating this process with row 1, and so on."# Err(ShellError::PipelineMismatch { exp_input_type: "input, and argument, to be both record or both table" .to_string(), - dst_span: call.head, + dst_span: head, src_span: span, }) } _ => Err(ShellError::PipelineMismatch { exp_input_type: "input, and argument, to be both record or both table".to_string(), - dst_span: call.head, - src_span: Span::new(call.head.start, call.head.start), + dst_span: head, + src_span: Span::new(head.start, head.start), }), } } diff --git a/crates/nu-command/src/filters/move_.rs b/crates/nu-command/src/filters/move_.rs index ab2f8c55cd..d93368f291 100644 --- a/crates/nu-command/src/filters/move_.rs +++ b/crates/nu-command/src/filters/move_.rs @@ -109,6 +109,7 @@ impl Command for Move { call: &Call, input: PipelineData, ) -> Result { + let head = call.head; let columns: Vec = call.rest(engine_state, stack, 0)?; let after: Option = call.get_flag(engine_state, stack, "after")?; let before: Option = call.get_flag(engine_state, stack, "before")?; @@ -126,7 +127,7 @@ impl Command for Move { return Err(ShellError::GenericError { error: "Cannot move columns".into(), msg: "Use either --after, or --before, not both".into(), - span: Some(call.head), + span: Some(head), help: None, inner: vec![], }) @@ -135,7 +136,7 @@ impl Command for Move { return Err(ShellError::GenericError { error: "Cannot move columns".into(), msg: "Missing --after or --before flag".into(), - span: Some(call.head), + span: Some(head), help: None, inner: vec![], }) @@ -144,36 +145,29 @@ impl Command for Move { let metadata = input.metadata(); let ctrlc = engine_state.ctrlc.clone(); - let call = call.clone(); match input { PipelineData::Value(Value::List { .. }, ..) | PipelineData::ListStream { .. } => { let res = input.into_iter().map(move |x| match x.as_record() { Ok(record) => { - match move_record_columns(record, &columns, &before_or_after, call.head) { + match move_record_columns(record, &columns, &before_or_after, head) { Ok(val) => val, - Err(error) => Value::error(error, call.head), + Err(error) => Value::error(error, head), } } - Err(error) => Value::error(error, call.head), + Err(error) => Value::error(error, head), }); - if let Some(md) = metadata { - Ok(res.into_pipeline_data_with_metadata(md, ctrlc)) - } else { - Ok(res.into_pipeline_data(ctrlc)) - } + Ok(res.into_pipeline_data_with_metadata(head, ctrlc, metadata)) } PipelineData::Value(Value::Record { val, .. }, ..) => { - Ok( - move_record_columns(&val, &columns, &before_or_after, call.head)? - .into_pipeline_data(), - ) + Ok(move_record_columns(&val, &columns, &before_or_after, head)? + .into_pipeline_data()) } _ => Err(ShellError::PipelineMismatch { exp_input_type: "record or table".to_string(), - dst_span: call.head, - src_span: Span::new(call.head.start, call.head.start), + dst_span: head, + src_span: Span::new(head.start, head.start), }), } } diff --git a/crates/nu-command/src/filters/par_each.rs b/crates/nu-command/src/filters/par_each.rs index 4f0a07b300..52c3024270 100644 --- a/crates/nu-command/src/filters/par_each.rs +++ b/crates/nu-command/src/filters/par_each.rs @@ -159,7 +159,7 @@ impl Command for ParEach { }) .collect::>(); - apply_order(vec).into_pipeline_data(engine_state.ctrlc.clone()) + apply_order(vec).into_pipeline_data(span, engine_state.ctrlc.clone()) })), Value::Range { val, .. } => Ok(create_pool(max_threads)?.install(|| { let ctrlc = engine_state.ctrlc.clone(); @@ -186,7 +186,7 @@ impl Command for ParEach { }) .collect::>(); - apply_order(vec).into_pipeline_data(ctrlc) + apply_order(vec).into_pipeline_data(span, ctrlc) })), // This match allows non-iterables to be accepted, // which is currently considered undesirable (Nov 2022). @@ -197,6 +197,7 @@ impl Command for ParEach { } PipelineData::ListStream(stream, ..) => Ok(create_pool(max_threads)?.install(|| { let vec = stream + .into_iter() .enumerate() .par_bridge() .map(move |(index, value)| { @@ -216,7 +217,7 @@ impl Command for ParEach { }) .collect::>(); - apply_order(vec).into_pipeline_data(engine_state.ctrlc.clone()) + apply_order(vec).into_pipeline_data(head, engine_state.ctrlc.clone()) })), PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::empty()), PipelineData::ExternalStream { @@ -241,7 +242,7 @@ impl Command for ParEach { }) .collect::>(); - apply_order(vec).into_pipeline_data(engine_state.ctrlc.clone()) + apply_order(vec).into_pipeline_data(head, engine_state.ctrlc.clone()) })), } .and_then(|x| x.filter(|v| !v.is_nothing(), engine_state.ctrlc.clone())) diff --git a/crates/nu-command/src/filters/prepend.rs b/crates/nu-command/src/filters/prepend.rs index cef8ce8c1a..f017420595 100644 --- a/crates/nu-command/src/filters/prepend.rs +++ b/crates/nu-command/src/filters/prepend.rs @@ -117,7 +117,7 @@ only unwrap the outer list, and leave the variable's contents untouched."# .into_pipeline_data() .into_iter() .chain(input) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(call.head, engine_state.ctrlc.clone(), metadata)) } } diff --git a/crates/nu-command/src/filters/range.rs b/crates/nu-command/src/filters/range.rs index 08d1c0fe42..c89a0b11f2 100644 --- a/crates/nu-command/src/filters/range.rs +++ b/crates/nu-command/src/filters/range.rs @@ -64,6 +64,7 @@ impl Command for Range { call: &Call, input: PipelineData, ) -> Result { + let head = call.head; let metadata = input.metadata(); let rows: Spanned = call.req(engine_state, stack, 0)?; @@ -102,20 +103,20 @@ impl Command for Range { }; if from > to { - Ok(PipelineData::Value(Value::nothing(call.head), None)) + Ok(PipelineData::Value(Value::nothing(head), None)) } else { let iter = v.into_iter().skip(from).take(to - from + 1); - Ok(iter.into_pipeline_data(engine_state.ctrlc.clone())) + Ok(iter.into_pipeline_data(head, engine_state.ctrlc.clone())) } } else { let from = start as usize; let to = end as usize; if from > to { - Ok(PipelineData::Value(Value::nothing(call.head), None)) + Ok(PipelineData::Value(Value::nothing(head), None)) } else { let iter = input.into_iter().skip(from).take(to - from + 1); - Ok(iter.into_pipeline_data(engine_state.ctrlc.clone())) + Ok(iter.into_pipeline_data(head, engine_state.ctrlc.clone())) } } .map(|x| x.set_metadata(metadata)) diff --git a/crates/nu-command/src/filters/reverse.rs b/crates/nu-command/src/filters/reverse.rs index db39ee3bd5..f40e5f3be4 100644 --- a/crates/nu-command/src/filters/reverse.rs +++ b/crates/nu-command/src/filters/reverse.rs @@ -59,11 +59,11 @@ impl Command for Reverse { call: &Call, input: PipelineData, ) -> Result { + let head = call.head; let metadata = input.metadata(); - - let v: Vec<_> = input.into_iter_strict(call.head)?.collect(); - let iter = v.into_iter().rev(); - Ok(iter.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + let values = input.into_iter_strict(head)?.collect::>(); + let iter = values.into_iter().rev(); + Ok(iter.into_pipeline_data_with_metadata(head, engine_state.ctrlc.clone(), metadata)) } } diff --git a/crates/nu-command/src/filters/select.rs b/crates/nu-command/src/filters/select.rs index 5b8e9e0420..3514ac6be7 100644 --- a/crates/nu-command/src/filters/select.rs +++ b/crates/nu-command/src/filters/select.rs @@ -215,7 +215,7 @@ fn select( rows: unique_rows.into_iter().peekable(), current: 0, } - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()) + .into_pipeline_data_with_metadata(call_span, engine_state.ctrlc.clone(), metadata) } else { input }; @@ -253,9 +253,11 @@ fn select( } } - Ok(output - .into_iter() - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + Ok(output.into_iter().into_pipeline_data_with_metadata( + call_span, + engine_state.ctrlc.clone(), + metadata, + )) } _ => { if !columns.is_empty() { @@ -300,7 +302,11 @@ fn select( } } - Ok(values.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + Ok(values.into_pipeline_data_with_metadata( + call_span, + engine_state.ctrlc.clone(), + metadata, + )) } _ => Ok(PipelineData::empty()), } diff --git a/crates/nu-command/src/filters/shuffle.rs b/crates/nu-command/src/filters/shuffle.rs index 598a292e5d..9e023b86c6 100644 --- a/crates/nu-command/src/filters/shuffle.rs +++ b/crates/nu-command/src/filters/shuffle.rs @@ -30,10 +30,10 @@ impl Command for Shuffle { input: PipelineData, ) -> Result { let metadata = input.metadata(); - let mut v: Vec<_> = input.into_iter_strict(call.head)?.collect(); - v.shuffle(&mut thread_rng()); - let iter = v.into_iter(); - Ok(iter.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + let mut values = input.into_iter_strict(call.head)?.collect::>(); + values.shuffle(&mut thread_rng()); + let iter = values.into_iter(); + Ok(iter.into_pipeline_data_with_metadata(call.head, engine_state.ctrlc.clone(), metadata)) } fn examples(&self) -> Vec { diff --git a/crates/nu-command/src/filters/skip/skip_.rs b/crates/nu-command/src/filters/skip/skip_.rs index a76e2b706d..1919263aa3 100644 --- a/crates/nu-command/src/filters/skip/skip_.rs +++ b/crates/nu-command/src/filters/skip/skip_.rs @@ -101,7 +101,7 @@ impl Command for Skip { _ => Ok(input .into_iter_strict(call.head)? .skip(n) - .into_pipeline_data_with_metadata(metadata, ctrlc)), + .into_pipeline_data_with_metadata(input_span, ctrlc, metadata)), } } } diff --git a/crates/nu-command/src/filters/skip/skip_until.rs b/crates/nu-command/src/filters/skip/skip_until.rs index b0a4dd4cd9..74deeda84d 100644 --- a/crates/nu-command/src/filters/skip/skip_until.rs +++ b/crates/nu-command/src/filters/skip/skip_until.rs @@ -88,7 +88,7 @@ impl Command for SkipUntil { .map(|data| data.into_value(head).is_false()) .unwrap_or(false) }) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(head, engine_state.ctrlc.clone(), metadata)) } } diff --git a/crates/nu-command/src/filters/skip/skip_while.rs b/crates/nu-command/src/filters/skip/skip_while.rs index d72bbcd6fc..a832d8f7b1 100644 --- a/crates/nu-command/src/filters/skip/skip_while.rs +++ b/crates/nu-command/src/filters/skip/skip_while.rs @@ -93,7 +93,7 @@ impl Command for SkipWhile { .map(|data| data.into_value(head).is_true()) .unwrap_or(false) }) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(head, engine_state.ctrlc.clone(), metadata)) } } diff --git a/crates/nu-command/src/filters/sort.rs b/crates/nu-command/src/filters/sort.rs index cc9a72546d..965b997355 100644 --- a/crates/nu-command/src/filters/sort.rs +++ b/crates/nu-command/src/filters/sort.rs @@ -134,10 +134,11 @@ impl Command for Sort { call: &Call, input: PipelineData, ) -> Result { + let head = call.head; let reverse = call.has_flag(engine_state, stack, "reverse")?; let insensitive = call.has_flag(engine_state, stack, "ignore-case")?; let natural = call.has_flag(engine_state, stack, "natural")?; - let metadata = &input.metadata(); + let metadata = input.metadata(); let span = input.span().unwrap_or(call.head); match input { @@ -163,18 +164,18 @@ impl Command for Sort { pipe_data => { let mut vec: Vec<_> = pipe_data.into_iter().collect(); - sort(&mut vec, call.head, insensitive, natural)?; + sort(&mut vec, head, insensitive, natural)?; if reverse { vec.reverse() } let iter = vec.into_iter(); - match metadata { - Some(m) => Ok(iter - .into_pipeline_data_with_metadata(m.clone(), engine_state.ctrlc.clone())), - None => Ok(iter.into_pipeline_data(engine_state.ctrlc.clone())), - } + Ok(iter.into_pipeline_data_with_metadata( + head, + engine_state.ctrlc.clone(), + metadata, + )) } } } diff --git a/crates/nu-command/src/filters/sort_by.rs b/crates/nu-command/src/filters/sort_by.rs index 66c1f2b03f..f3f715bc9d 100644 --- a/crates/nu-command/src/filters/sort_by.rs +++ b/crates/nu-command/src/filters/sort_by.rs @@ -78,33 +78,29 @@ impl Command for SortBy { call: &Call, input: PipelineData, ) -> Result { + let head = call.head; let columns: Vec = call.rest(engine_state, stack, 0)?; let reverse = call.has_flag(engine_state, stack, "reverse")?; let insensitive = call.has_flag(engine_state, stack, "ignore-case")?; let natural = call.has_flag(engine_state, stack, "natural")?; - let metadata = &input.metadata(); - let mut vec: Vec<_> = input.into_iter_strict(call.head)?.collect(); + let metadata = input.metadata(); + let mut vec: Vec<_> = input.into_iter_strict(head)?.collect(); if columns.is_empty() { return Err(ShellError::MissingParameter { param_name: "columns".into(), - span: call.head, + span: head, }); } - crate::sort(&mut vec, columns, call.head, insensitive, natural)?; + crate::sort(&mut vec, columns, head, insensitive, natural)?; if reverse { vec.reverse() } let iter = vec.into_iter(); - match metadata { - Some(m) => { - Ok(iter.into_pipeline_data_with_metadata(m.clone(), engine_state.ctrlc.clone())) - } - None => Ok(iter.into_pipeline_data(engine_state.ctrlc.clone())), - } + Ok(iter.into_pipeline_data_with_metadata(head, engine_state.ctrlc.clone(), metadata)) } } diff --git a/crates/nu-command/src/filters/take/take_.rs b/crates/nu-command/src/filters/take/take_.rs index a5ffe25301..01700420b8 100644 --- a/crates/nu-command/src/filters/take/take_.rs +++ b/crates/nu-command/src/filters/take/take_.rs @@ -42,6 +42,7 @@ impl Command for Take { call: &Call, input: PipelineData, ) -> Result { + let head = call.head; let rows_desired: usize = call.req(engine_state, stack, 0)?; let ctrlc = engine_state.ctrlc.clone(); @@ -54,7 +55,7 @@ impl Command for Take { Value::List { vals, .. } => Ok(vals .into_iter() .take(rows_desired) - .into_pipeline_data_with_metadata(metadata, ctrlc)), + .into_pipeline_data_with_metadata(head, ctrlc, metadata)), Value::Binary { val, .. } => { let slice: Vec = val.into_iter().take(rows_desired).collect(); Ok(PipelineData::Value(Value::binary(slice, span), metadata)) @@ -62,33 +63,34 @@ impl Command for Take { Value::Range { val, .. } => Ok(val .into_range_iter(span, ctrlc.clone()) .take(rows_desired) - .into_pipeline_data_with_metadata(metadata, ctrlc)), + .into_pipeline_data_with_metadata(head, ctrlc, metadata)), // Propagate errors by explicitly matching them before the final case. Value::Error { error, .. } => Err(*error), other => Err(ShellError::OnlySupportsThisInputType { exp_input_type: "list, binary or range".into(), wrong_type: other.get_type().to_string(), - dst_span: call.head, + dst_span: head, src_span: other.span(), }), } } - PipelineData::ListStream(ls, metadata) => Ok(ls - .take(rows_desired) - .into_pipeline_data_with_metadata(metadata, ctrlc)), + PipelineData::ListStream(stream, metadata) => Ok(PipelineData::ListStream( + stream.modify(|iter| iter.take(rows_desired)), + metadata, + )), PipelineData::ExternalStream { span, .. } => { Err(ShellError::OnlySupportsThisInputType { exp_input_type: "list, binary or range".into(), wrong_type: "raw data".into(), - dst_span: call.head, + dst_span: head, src_span: span, }) } PipelineData::Empty => Err(ShellError::OnlySupportsThisInputType { exp_input_type: "list, binary or range".into(), wrong_type: "null".into(), - dst_span: call.head, - src_span: call.head, + dst_span: head, + src_span: head, }), } } diff --git a/crates/nu-command/src/filters/take/take_until.rs b/crates/nu-command/src/filters/take/take_until.rs index 40d2eee019..e3a2a37162 100644 --- a/crates/nu-command/src/filters/take/take_until.rs +++ b/crates/nu-command/src/filters/take/take_until.rs @@ -84,7 +84,7 @@ impl Command for TakeUntil { .map(|data| data.into_value(head).is_false()) .unwrap_or(false) }) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(head, engine_state.ctrlc.clone(), metadata)) } } diff --git a/crates/nu-command/src/filters/take/take_while.rs b/crates/nu-command/src/filters/take/take_while.rs index 1d2a98ee51..632c165847 100644 --- a/crates/nu-command/src/filters/take/take_while.rs +++ b/crates/nu-command/src/filters/take/take_while.rs @@ -84,7 +84,7 @@ impl Command for TakeWhile { .map(|data| data.into_value(head).is_true()) .unwrap_or(false) }) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(head, engine_state.ctrlc.clone(), metadata)) } } diff --git a/crates/nu-command/src/filters/tee.rs b/crates/nu-command/src/filters/tee.rs index 5287f44512..319f70905c 100644 --- a/crates/nu-command/src/filters/tee.rs +++ b/crates/nu-command/src/filters/tee.rs @@ -64,6 +64,7 @@ use it in your pipeline."# call: &Call, input: PipelineData, ) -> Result { + let head = call.head; let use_stderr = call.has_flag(engine_state, stack, "stderr")?; let Spanned { @@ -125,7 +126,7 @@ use it in your pipeline."# if use_stderr { let stderr = stderr .map(|stderr| { - let iter = tee(stderr.stream, with_stream).err_span(call.head)?; + let iter = tee(stderr.stream, with_stream).err_span(head)?; Ok::<_, ShellError>(RawStream::new( Box::new(iter.map(flatten_result)), stderr.ctrlc, @@ -145,7 +146,7 @@ use it in your pipeline."# } else { let stdout = stdout .map(|stdout| { - let iter = tee(stdout.stream, with_stream).err_span(call.head)?; + let iter = tee(stdout.stream, with_stream).err_span(head)?; Ok::<_, ShellError>(RawStream::new( Box::new(iter.map(flatten_result)), stdout.ctrlc, @@ -168,15 +169,16 @@ use it in your pipeline."# _ if use_stderr => Err(ShellError::UnsupportedInput { msg: "--stderr can only be used on external streams".into(), input: "the input to `tee` is not an external stream".into(), - msg_span: call.head, - input_span: input.span().unwrap_or(call.head), + msg_span: head, + input_span: input.span().unwrap_or(head), }), // Handle others with the plain iterator _ => { let teed = tee(input.into_iter(), move |rx| { let input_from_channel = rx.into_pipeline_data_with_metadata( - metadata_clone, + head, closure_engine_state.ctrlc.clone(), + metadata_clone, ); let result = eval_block_with_early_return( &closure_engine_state, @@ -187,9 +189,13 @@ use it in your pipeline."# // Make sure to drain any iterator produced to avoid unexpected behavior result.and_then(|data| data.drain()) }) - .err_span(call.head)? + .err_span(head)? .map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span))) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()); + .into_pipeline_data_with_metadata( + head, + engine_state.ctrlc.clone(), + metadata, + ); Ok(teed) } diff --git a/crates/nu-command/src/filters/transpose.rs b/crates/nu-command/src/filters/transpose.rs index 9a41457a8f..49caa56d18 100644 --- a/crates/nu-command/src/filters/transpose.rs +++ b/crates/nu-command/src/filters/transpose.rs @@ -284,7 +284,7 @@ pub fn transpose( metadata, )) } else { - Ok(result_data.into_pipeline_data_with_metadata(metadata, ctrlc)) + Ok(result_data.into_pipeline_data_with_metadata(name, ctrlc, metadata)) } } diff --git a/crates/nu-command/src/filters/update.rs b/crates/nu-command/src/filters/update.rs index 76e0674ad8..2cea7deead 100644 --- a/crates/nu-command/src/filters/update.rs +++ b/crates/nu-command/src/filters/update.rs @@ -143,7 +143,7 @@ fn update( } Ok(value.into_pipeline_data_with_metadata(metadata)) } - PipelineData::ListStream(mut stream, metadata) => { + PipelineData::ListStream(stream, metadata) => { if let Some(( &PathMember::Int { val, @@ -153,6 +153,7 @@ fn update( path, )) = cell_path.members.split_first() { + let mut stream = stream.into_iter(); let mut pre_elems = vec![]; for idx in 0..=val { @@ -186,38 +187,38 @@ fn update( Ok(pre_elems .into_iter() .chain(stream) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(head, engine_state.ctrlc.clone(), metadata)) } else if let Value::Closure { val, .. } = replacement { let mut closure = ClosureEval::new(engine_state, stack, val); - Ok(stream - .map(move |mut value| { - let err = update_value_by_closure( - &mut value, - &mut closure, - head, - &cell_path.members, - false, - ); + let stream = stream.map(move |mut value| { + let err = update_value_by_closure( + &mut value, + &mut closure, + head, + &cell_path.members, + false, + ); - if let Err(e) = err { - Value::error(e, head) - } else { - value - } - }) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + if let Err(e) = err { + Value::error(e, head) + } else { + value + } + }); + + Ok(PipelineData::ListStream(stream, metadata)) } else { - Ok(stream - .map(move |mut value| { - if let Err(e) = - value.update_data_at_cell_path(&cell_path.members, replacement.clone()) - { - Value::error(e, head) - } else { - value - } - }) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + let stream = stream.map(move |mut value| { + if let Err(e) = + value.update_data_at_cell_path(&cell_path.members, replacement.clone()) + { + Value::error(e, head) + } else { + value + } + }); + + Ok(PipelineData::ListStream(stream, metadata)) } } PipelineData::Empty => Err(ShellError::IncompatiblePathAccess { diff --git a/crates/nu-command/src/filters/upsert.rs b/crates/nu-command/src/filters/upsert.rs index e62239f562..b7b4a782f2 100644 --- a/crates/nu-command/src/filters/upsert.rs +++ b/crates/nu-command/src/filters/upsert.rs @@ -189,7 +189,7 @@ fn upsert( } Ok(value.into_pipeline_data_with_metadata(metadata)) } - PipelineData::ListStream(mut stream, metadata) => { + PipelineData::ListStream(stream, metadata) => { if let Some(( &PathMember::Int { val, @@ -199,6 +199,7 @@ fn upsert( path, )) = cell_path.members.split_first() { + let mut stream = stream.into_iter(); let mut pre_elems = vec![]; for idx in 0..val { @@ -246,38 +247,38 @@ fn upsert( Ok(pre_elems .into_iter() .chain(stream) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(head, engine_state.ctrlc.clone(), metadata)) } else if let Value::Closure { val, .. } = replacement { let mut closure = ClosureEval::new(engine_state, stack, val); - Ok(stream - .map(move |mut value| { - let err = upsert_value_by_closure( - &mut value, - &mut closure, - head, - &cell_path.members, - false, - ); + let stream = stream.map(move |mut value| { + let err = upsert_value_by_closure( + &mut value, + &mut closure, + head, + &cell_path.members, + false, + ); - if let Err(e) = err { - Value::error(e, head) - } else { - value - } - }) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + if let Err(e) = err { + Value::error(e, head) + } else { + value + } + }); + + Ok(PipelineData::ListStream(stream, metadata)) } else { - Ok(stream - .map(move |mut value| { - if let Err(e) = - value.upsert_data_at_cell_path(&cell_path.members, replacement.clone()) - { - Value::error(e, head) - } else { - value - } - }) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + let stream = stream.map(move |mut value| { + if let Err(e) = + value.upsert_data_at_cell_path(&cell_path.members, replacement.clone()) + { + Value::error(e, head) + } else { + value + } + }); + + Ok(PipelineData::ListStream(stream, metadata)) } } PipelineData::Empty => Err(ShellError::IncompatiblePathAccess { diff --git a/crates/nu-command/src/filters/values.rs b/crates/nu-command/src/filters/values.rs index 8b35352150..aa576de874 100644 --- a/crates/nu-command/src/filters/values.rs +++ b/crates/nu-command/src/filters/values.rs @@ -144,7 +144,7 @@ fn values( Value::List { vals, .. } => match get_values(&vals, head, span) { Ok(cols) => Ok(cols .into_iter() - .into_pipeline_data_with_metadata(metadata, ctrlc)), + .into_pipeline_data_with_metadata(head, ctrlc, metadata)), Err(err) => Err(err), }, Value::Custom { val, .. } => { @@ -152,7 +152,7 @@ fn values( match get_values(&[input_as_base_value], head, span) { Ok(cols) => Ok(cols .into_iter() - .into_pipeline_data_with_metadata(metadata, ctrlc)), + .into_pipeline_data_with_metadata(head, ctrlc, metadata)), Err(err) => Err(err), } } @@ -160,7 +160,7 @@ fn values( .values() .cloned() .collect::>() - .into_pipeline_data_with_metadata(metadata, ctrlc)), + .into_pipeline_data_with_metadata(head, ctrlc, metadata)), // Propagate errors Value::Error { error, .. } => Err(*error), other => Err(ShellError::OnlySupportsThisInputType { @@ -176,7 +176,7 @@ fn values( match get_values(&vals, head, head) { Ok(cols) => Ok(cols .into_iter() - .into_pipeline_data_with_metadata(metadata, ctrlc)), + .into_pipeline_data_with_metadata(head, ctrlc, metadata)), Err(err) => Err(err), } } diff --git a/crates/nu-command/src/filters/where_.rs b/crates/nu-command/src/filters/where_.rs index cb35bd8876..7507a7ede1 100644 --- a/crates/nu-command/src/filters/where_.rs +++ b/crates/nu-command/src/filters/where_.rs @@ -61,7 +61,7 @@ not supported."# Ok(data) => data.into_value(head).is_true().then_some(value), Err(err) => Some(Value::error(err, head)), }) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(head, engine_state.ctrlc.clone(), metadata)) } fn examples(&self) -> Vec { diff --git a/crates/nu-command/src/filters/window.rs b/crates/nu-command/src/filters/window.rs index 5c84ad7e98..5b386b9f84 100644 --- a/crates/nu-command/src/filters/window.rs +++ b/crates/nu-command/src/filters/window.rs @@ -1,4 +1,5 @@ use nu_engine::command_prelude::*; +use nu_protocol::ValueIterator; #[derive(Clone)] pub struct Window; @@ -110,6 +111,7 @@ impl Command for Window { call: &Call, input: PipelineData, ) -> Result { + let head = call.head; let group_size: Spanned = call.req(engine_state, stack, 0)?; let ctrlc = engine_state.ctrlc.clone(); let metadata = input.metadata(); @@ -123,19 +125,19 @@ impl Command for Window { let each_group_iterator = EachWindowIterator { group_size: group_size.item, input: Box::new(input.into_iter()), - span: call.head, + span: head, previous: None, stride, remainder, }; - Ok(each_group_iterator.into_pipeline_data_with_metadata(metadata, ctrlc)) + Ok(each_group_iterator.into_pipeline_data_with_metadata(head, ctrlc, metadata)) } } struct EachWindowIterator { group_size: usize, - input: Box + Send>, + input: ValueIterator, span: Span, previous: Option>, stride: usize, diff --git a/crates/nu-command/src/filters/wrap.rs b/crates/nu-command/src/filters/wrap.rs index a5ccf9eed2..24ce8e6821 100644 --- a/crates/nu-command/src/filters/wrap.rs +++ b/crates/nu-command/src/filters/wrap.rs @@ -42,9 +42,9 @@ impl Command for Wrap { | PipelineData::ListStream { .. } => Ok(input .into_iter() .map(move |x| Value::record(record! { name.clone() => x }, span)) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())), + .into_pipeline_data_with_metadata(span, engine_state.ctrlc.clone(), metadata)), PipelineData::ExternalStream { .. } => Ok(Value::record( - record! { name => input.into_value(call.head) }, + record! { name => input.into_value(span) }, span, ) .into_pipeline_data_with_metadata(metadata)), diff --git a/crates/nu-command/src/filters/zip.rs b/crates/nu-command/src/filters/zip.rs index 0a30450091..f4ee739f50 100644 --- a/crates/nu-command/src/filters/zip.rs +++ b/crates/nu-command/src/filters/zip.rs @@ -112,7 +112,7 @@ impl Command for Zip { .into_iter() .zip(other) .map(move |(x, y)| Value::list(vec![x, y], head)) - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + .into_pipeline_data_with_metadata(head, engine_state.ctrlc.clone(), metadata)) } } diff --git a/crates/nu-command/src/formats/from/json.rs b/crates/nu-command/src/formats/from/json.rs index 5baaea30dd..44f127152f 100644 --- a/crates/nu-command/src/formats/from/json.rs +++ b/crates/nu-command/src/formats/from/json.rs @@ -84,8 +84,11 @@ impl Command for FromJson { .collect() }; - Ok(converted_lines - .into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone())) + Ok(converted_lines.into_pipeline_data_with_metadata( + span, + engine_state.ctrlc.clone(), + metadata, + )) } else if strict { Ok(convert_string_to_value_strict(&string_input, span)? .into_pipeline_data_with_metadata(metadata)) diff --git a/crates/nu-command/src/formats/from/msgpack.rs b/crates/nu-command/src/formats/from/msgpack.rs index 0311ecfd1a..75f2be2056 100644 --- a/crates/nu-command/src/formats/from/msgpack.rs +++ b/crates/nu-command/src/formats/from/msgpack.rs @@ -109,10 +109,9 @@ MessagePack: https://msgpack.org/ call: &Call, input: PipelineData, ) -> Result { - let span = input.span().unwrap_or(call.head); let objects = call.has_flag(engine_state, stack, "objects")?; let opts = Opts { - span, + span: call.head, objects, ctrlc: engine_state.ctrlc.clone(), }; @@ -126,10 +125,10 @@ MessagePack: https://msgpack.org/ stdout: Some(raw_stream), .. } => read_msgpack(ReadRawStream::new(raw_stream), opts), - _ => Err(ShellError::PipelineMismatch { + input => Err(ShellError::PipelineMismatch { exp_input_type: "binary".into(), dst_span: call.head, - src_span: span, + src_span: input.span().unwrap_or(call.head), }), } } @@ -257,7 +256,7 @@ pub(crate) fn read_msgpack( None } }) - .into_pipeline_data(ctrlc)) + .into_pipeline_data(span, ctrlc)) } else { // Read a single value and then make sure it's EOF let result = read_value(&mut input, span, 0)?; diff --git a/crates/nu-command/src/formats/to/text.rs b/crates/nu-command/src/formats/to/text.rs index 2515ca5f92..7c12dc2821 100644 --- a/crates/nu-command/src/formats/to/text.rs +++ b/crates/nu-command/src/formats/to/text.rs @@ -1,6 +1,6 @@ use chrono_humanize::HumanTime; use nu_engine::command_prelude::*; -use nu_protocol::{format_duration, format_filesize_from_conf, Config, ListStream, RawStream}; +use nu_protocol::{format_duration, format_filesize_from_conf, Config, RawStream, ValueIterator}; #[derive(Clone)] pub struct ToText; @@ -41,7 +41,7 @@ impl Command for ToText { Ok(PipelineData::ExternalStream { stdout: Some(RawStream::new( Box::new(ListStreamIterator { - stream, + stream: stream.into_inner(), separator: line_ending.into(), config: config.clone(), }), @@ -86,7 +86,7 @@ impl Command for ToText { } struct ListStreamIterator { - stream: ListStream, + stream: ValueIterator, separator: String, config: Config, } diff --git a/crates/nu-command/src/generators/generate.rs b/crates/nu-command/src/generators/generate.rs index 96343fd093..0a01b79c08 100644 --- a/crates/nu-command/src/generators/generate.rs +++ b/crates/nu-command/src/generators/generate.rs @@ -183,7 +183,7 @@ used as the next argument to the closure, otherwise generation stops. Ok(iter .flatten() - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(call.head, engine_state.ctrlc.clone())) } } diff --git a/crates/nu-command/src/generators/seq.rs b/crates/nu-command/src/generators/seq.rs index f43454b691..359d479ca0 100644 --- a/crates/nu-command/src/generators/seq.rs +++ b/crates/nu-command/src/generators/seq.rs @@ -1,4 +1,5 @@ use nu_engine::command_prelude::*; +use nu_protocol::ListStream; #[derive(Clone)] pub struct Seq; @@ -119,36 +120,32 @@ pub fn run_seq( let step = if free.len() > 2 { free[1] } else { 1.0 }; let last = { free[free.len() - 1] }; - if !contains_decimals { - // integers only - Ok(PipelineData::ListStream( - nu_protocol::ListStream::from_stream( - IntSeq { - count: first as i64, - step: step as i64, - last: last as i64, - span, - }, - engine_state.ctrlc.clone(), - ), - None, - )) + let stream = if !contains_decimals { + ListStream::new( + IntSeq { + count: first as i64, + step: step as i64, + last: last as i64, + span, + }, + span, + engine_state.ctrlc.clone(), + ) } else { - // floats - Ok(PipelineData::ListStream( - nu_protocol::ListStream::from_stream( - FloatSeq { - first, - step, - last, - index: 0, - span, - }, - engine_state.ctrlc.clone(), - ), - None, - )) - } + ListStream::new( + FloatSeq { + first, + step, + last, + index: 0, + span, + }, + span, + engine_state.ctrlc.clone(), + ) + }; + + Ok(stream.into()) } struct FloatSeq { diff --git a/crates/nu-command/src/help/help_aliases.rs b/crates/nu-command/src/help/help_aliases.rs index f97084bb51..2cc7c7f073 100644 --- a/crates/nu-command/src/help/help_aliases.rs +++ b/crates/nu-command/src/help/help_aliases.rs @@ -92,17 +92,12 @@ pub fn help_aliases( &highlight_style, )?; - return Ok(found_cmds_vec - .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone())); + return Ok(Value::list(found_cmds_vec, head).into_pipeline_data()); } if rest.is_empty() { let found_cmds_vec = build_help_aliases(engine_state, stack, head); - - Ok(found_cmds_vec - .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone())) + Ok(Value::list(found_cmds_vec, head).into_pipeline_data()) } else { let mut name = String::new(); diff --git a/crates/nu-command/src/help/help_commands.rs b/crates/nu-command/src/help/help_commands.rs index 128c838efd..bc508b249b 100644 --- a/crates/nu-command/src/help/help_commands.rs +++ b/crates/nu-command/src/help/help_commands.rs @@ -72,17 +72,12 @@ pub fn help_commands( &highlight_style, )?; - return Ok(found_cmds_vec - .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone())); + return Ok(Value::list(found_cmds_vec, head).into_pipeline_data()); } if rest.is_empty() { let found_cmds_vec = build_help_commands(engine_state, head); - - Ok(found_cmds_vec - .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone())) + Ok(Value::list(found_cmds_vec, head).into_pipeline_data()) } else { let mut name = String::new(); diff --git a/crates/nu-command/src/help/help_escapes.rs b/crates/nu-command/src/help/help_escapes.rs index 73a83f1175..212cbb58bf 100644 --- a/crates/nu-command/src/help/help_escapes.rs +++ b/crates/nu-command/src/help/help_escapes.rs @@ -21,7 +21,7 @@ impl Command for HelpEscapes { fn run( &self, - engine_state: &EngineState, + _engine_state: &EngineState, _stack: &mut Stack, call: &Call, _input: PipelineData, @@ -40,9 +40,7 @@ impl Command for HelpEscapes { )); } - Ok(recs - .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone())) + Ok(Value::list(recs, call.head).into_pipeline_data()) } } diff --git a/crates/nu-command/src/help/help_externs.rs b/crates/nu-command/src/help/help_externs.rs index 3aad5b27fe..624a8d8060 100644 --- a/crates/nu-command/src/help/help_externs.rs +++ b/crates/nu-command/src/help/help_externs.rs @@ -92,17 +92,12 @@ pub fn help_externs( &highlight_style, )?; - return Ok(found_cmds_vec - .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone())); + return Ok(Value::list(found_cmds_vec, head).into_pipeline_data()); } if rest.is_empty() { let found_cmds_vec = build_help_externs(engine_state, stack, head); - - Ok(found_cmds_vec - .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone())) + Ok(Value::list(found_cmds_vec, head).into_pipeline_data()) } else { let mut name = String::new(); diff --git a/crates/nu-command/src/help/help_modules.rs b/crates/nu-command/src/help/help_modules.rs index e51b52154b..f2ddf55f1d 100644 --- a/crates/nu-command/src/help/help_modules.rs +++ b/crates/nu-command/src/help/help_modules.rs @@ -98,17 +98,12 @@ pub fn help_modules( &highlight_style, )?; - return Ok(found_cmds_vec - .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone())); + return Ok(Value::list(found_cmds_vec, head).into_pipeline_data()); } if rest.is_empty() { let found_cmds_vec = build_help_modules(engine_state, stack, head); - - Ok(found_cmds_vec - .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone())) + Ok(Value::list(found_cmds_vec, head).into_pipeline_data()) } else { let mut name = String::new(); diff --git a/crates/nu-command/src/help/help_operators.rs b/crates/nu-command/src/help/help_operators.rs index 72c9e752e9..a7beee3dd5 100644 --- a/crates/nu-command/src/help/help_operators.rs +++ b/crates/nu-command/src/help/help_operators.rs @@ -21,7 +21,7 @@ impl Command for HelpOperators { fn run( &self, - engine_state: &EngineState, + _engine_state: &EngineState, _stack: &mut Stack, call: &Call, _input: PipelineData, @@ -43,9 +43,7 @@ impl Command for HelpOperators { )); } - Ok(recs - .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone())) + Ok(Value::list(recs, head).into_pipeline_data()) } } diff --git a/crates/nu-command/src/math/utils.rs b/crates/nu-command/src/math/utils.rs index 9abcce06e9..9d2c15e15f 100644 --- a/crates/nu-command/src/math/utils.rs +++ b/crates/nu-command/src/math/utils.rs @@ -69,7 +69,7 @@ pub fn calculate( let span = values.span().unwrap_or(name); match values { PipelineData::ListStream(s, ..) => { - helper_for_tables(&s.collect::>(), span, name, mf) + helper_for_tables(&s.into_iter().collect::>(), span, name, mf) } PipelineData::Value(Value::List { ref vals, .. }, ..) => match &vals[..] { [Value::Record { .. }, _end @ ..] => helper_for_tables( diff --git a/crates/nu-command/src/platform/ansi/ansi_.rs b/crates/nu-command/src/platform/ansi/ansi_.rs index e4f2bca378..427c7c3a73 100644 --- a/crates/nu-command/src/platform/ansi/ansi_.rs +++ b/crates/nu-command/src/platform/ansi/ansi_.rs @@ -660,7 +660,7 @@ Operating system commands: let ctrlc = engine_state.ctrlc.clone(); if list { - return generate_ansi_code_list(ctrlc, call.head, use_ansi_coloring); + return Ok(generate_ansi_code_list(ctrlc, call.head, use_ansi_coloring)); } // The code can now be one of the ansi abbreviations like green_bold @@ -694,7 +694,7 @@ Operating system commands: let ctrlc = working_set.permanent().ctrlc.clone(); if list { - return generate_ansi_code_list(ctrlc, call.head, use_ansi_coloring); + return Ok(generate_ansi_code_list(ctrlc, call.head, use_ansi_coloring)); } // The code can now be one of the ansi abbreviations like green_bold @@ -833,8 +833,8 @@ fn generate_ansi_code_list( ctrlc: Option>, call_span: Span, use_ansi_coloring: bool, -) -> Result { - return Ok(CODE_LIST +) -> PipelineData { + CODE_LIST .iter() .enumerate() .map(move |(i, ansi_code)| { @@ -865,7 +865,7 @@ fn generate_ansi_code_list( Value::record(record, call_span) }) - .into_pipeline_data(ctrlc)); + .into_pipeline_data(call_span, ctrlc) } fn build_ansi_hashmap(v: &[AnsiCode]) -> HashMap<&str, &str> { diff --git a/crates/nu-command/src/random/dice.rs b/crates/nu-command/src/random/dice.rs index b0569c697e..2fb659ad1e 100644 --- a/crates/nu-command/src/random/dice.rs +++ b/crates/nu-command/src/random/dice.rs @@ -78,10 +78,7 @@ fn dice( Value::int(thread_rng.gen_range(1..sides + 1) as i64, span) }); - Ok(PipelineData::ListStream( - ListStream::from_stream(iter, engine_state.ctrlc.clone()), - None, - )) + Ok(ListStream::new(iter, span, engine_state.ctrlc.clone()).into()) } #[cfg(test)] diff --git a/crates/nu-command/src/strings/char_.rs b/crates/nu-command/src/strings/char_.rs index befab1a93d..e227f2667f 100644 --- a/crates/nu-command/src/strings/char_.rs +++ b/crates/nu-command/src/strings/char_.rs @@ -230,7 +230,7 @@ impl Command for Char { // handle -l flag if list { - return generate_character_list(ctrlc, call.head); + return Ok(generate_character_list(ctrlc, call.head)); } // handle -i flag @@ -265,7 +265,7 @@ impl Command for Char { // handle -l flag if list { - return generate_character_list(ctrlc, call_span); + return Ok(generate_character_list(ctrlc, call_span)); } // handle -i flag @@ -286,11 +286,8 @@ impl Command for Char { } } -fn generate_character_list( - ctrlc: Option>, - call_span: Span, -) -> Result { - Ok(CHAR_MAP +fn generate_character_list(ctrlc: Option>, call_span: Span) -> PipelineData { + CHAR_MAP .iter() .map(move |(name, s)| { let unicode = Value::string( @@ -308,7 +305,7 @@ fn generate_character_list( Value::record(record, call_span) }) - .into_pipeline_data(ctrlc)) + .into_pipeline_data(call_span, ctrlc) } fn handle_integer_flag( diff --git a/crates/nu-command/src/strings/detect_columns.rs b/crates/nu-command/src/strings/detect_columns.rs index 9c33ffa494..74bcf07820 100644 --- a/crates/nu-command/src/strings/detect_columns.rs +++ b/crates/nu-command/src/strings/detect_columns.rs @@ -159,7 +159,7 @@ fn guess_width( Err(e) => Value::error(e, input_span), } }) - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(input_span, engine_state.ctrlc.clone())) } else { let length = result[0].len(); let columns: Vec = (0..length).map(|n| format!("column{n}")).collect(); @@ -184,7 +184,7 @@ fn guess_width( Err(e) => Value::error(e, input_span), } }) - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(input_span, engine_state.ctrlc.clone())) } } @@ -278,7 +278,7 @@ fn detect_columns( None => Value::record(record, name_span), } }) - .into_pipeline_data(ctrlc)) + .into_pipeline_data(call.head, ctrlc)) } else { Ok(PipelineData::empty()) } diff --git a/crates/nu-command/src/strings/parse.rs b/crates/nu-command/src/strings/parse.rs index 658fc20361..8f6d35b0b3 100644 --- a/crates/nu-command/src/strings/parse.rs +++ b/crates/nu-command/src/strings/parse.rs @@ -1,6 +1,6 @@ use fancy_regex::Regex; use nu_engine::command_prelude::*; -use nu_protocol::ListStream; +use nu_protocol::{ListStream, ValueIterator}; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -187,44 +187,36 @@ fn operate( } } - Ok(PipelineData::ListStream( - ListStream::from_stream(parsed.into_iter(), ctrlc), - None, - )) + Ok(ListStream::new(parsed.into_iter(), head, ctrlc).into()) } - PipelineData::ListStream(stream, ..) => Ok(PipelineData::ListStream( - ListStream::from_stream( - ParseStreamer { - span: head, - excess: Vec::new(), - regex: regex_pattern, - columns, - stream: stream.stream, - ctrlc: ctrlc.clone(), - }, + PipelineData::ListStream(stream, ..) => Ok(stream + .modify(|stream| ParseStreamer { + span: head, + excess: Vec::new(), + regex: regex_pattern, + columns, + stream, ctrlc, - ), - None, - )), + }) + .into()), PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::Empty), PipelineData::ExternalStream { stdout: Some(stream), .. - } => Ok(PipelineData::ListStream( - ListStream::from_stream( - ParseStreamerExternal { - span: head, - excess: Vec::new(), - regex: regex_pattern, - columns, - stream: stream.stream, - }, - ctrlc, - ), - None, - )), + } => Ok(ListStream::new( + ParseStreamerExternal { + span: head, + excess: Vec::new(), + regex: regex_pattern, + columns, + stream: stream.stream, + }, + head, + ctrlc, + ) + .into()), } } @@ -299,7 +291,7 @@ pub struct ParseStreamer { excess: Vec, regex: Regex, columns: Vec, - stream: Box + Send + 'static>, + stream: ValueIterator, ctrlc: Option>, } diff --git a/crates/nu-command/src/strings/split/list.rs b/crates/nu-command/src/strings/split/list.rs index 08bd8fbc61..470ba12ec4 100644 --- a/crates/nu-command/src/strings/split/list.rs +++ b/crates/nu-command/src/strings/split/list.rs @@ -196,9 +196,12 @@ fn split_list( let mut temp_list = Vec::new(); let mut returned_list = Vec::new(); - let iter = input.into_interruptible_iter(engine_state.ctrlc.clone()); let matcher = Matcher::new(call.has_flag(engine_state, stack, "regex")?, separator)?; - for val in iter { + for val in input { + if nu_utils::ctrl_c::was_pressed(&engine_state.ctrlc) { + break; + } + if matcher.compare(&val)? { if !temp_list.is_empty() { returned_list.push(Value::list(temp_list.clone(), call.head)); diff --git a/crates/nu-command/src/system/complete.rs b/crates/nu-command/src/system/complete.rs index 80dae4d37a..c622c86f3c 100644 --- a/crates/nu-command/src/system/complete.rs +++ b/crates/nu-command/src/system/complete.rs @@ -88,7 +88,7 @@ impl Command for Complete { }; if let Some(exit_code) = exit_code { - let mut v: Vec<_> = exit_code.collect(); + let mut v: Vec<_> = exit_code.into_iter().collect(); if let Some(v) = v.pop() { record.push("exit_code", v); diff --git a/crates/nu-command/src/system/ps.rs b/crates/nu-command/src/system/ps.rs index 11eb66011d..922bf7915b 100644 --- a/crates/nu-command/src/system/ps.rs +++ b/crates/nu-command/src/system/ps.rs @@ -195,5 +195,5 @@ fn run_ps( Ok(output .into_iter() - .into_pipeline_data(engine_state.ctrlc.clone())) + .into_pipeline_data(span, engine_state.ctrlc.clone())) } diff --git a/crates/nu-command/src/system/registry_query.rs b/crates/nu-command/src/system/registry_query.rs index c926ad6aaf..1e2a328356 100644 --- a/crates/nu-command/src/system/registry_query.rs +++ b/crates/nu-command/src/system/registry_query.rs @@ -106,7 +106,7 @@ fn registry_query( *registry_key_span, )) } - Ok(reg_values.into_pipeline_data(engine_state.ctrlc.clone())) + Ok(reg_values.into_pipeline_data(call_span, engine_state.ctrlc.clone())) } else { match registry_value { Some(value) => { diff --git a/crates/nu-command/src/system/run_external.rs b/crates/nu-command/src/system/run_external.rs index 734488fd81..e73bd4ab50 100644 --- a/crates/nu-command/src/system/run_external.rs +++ b/crates/nu-command/src/system/run_external.rs @@ -163,8 +163,6 @@ impl ExternalCommand { ) -> Result { let head = self.name.span; - let ctrlc = engine_state.ctrlc.clone(); - #[allow(unused_mut)] let (cmd, mut reader) = self.create_process(&input, false, head)?; @@ -431,7 +429,7 @@ impl ExternalCommand { ( Some(RawStream::new( Box::new(ByteLines::new(combined)), - ctrlc.clone(), + engine_state.ctrlc.clone(), head, None, )), @@ -439,11 +437,21 @@ impl ExternalCommand { ) } else { let stdout = child.as_mut().stdout.take().map(|out| { - RawStream::new(Box::new(ByteLines::new(out)), ctrlc.clone(), head, None) + RawStream::new( + Box::new(ByteLines::new(out)), + engine_state.ctrlc.clone(), + head, + None, + ) }); let stderr = child.as_mut().stderr.take().map(|err| { - RawStream::new(Box::new(ByteLines::new(err)), ctrlc.clone(), head, None) + RawStream::new( + Box::new(ByteLines::new(err)), + engine_state.ctrlc.clone(), + head, + None, + ) }); if matches!(self.err, OutDest::Pipe) { @@ -505,15 +513,16 @@ impl ExternalCommand { }) .err_span(head)?; - let exit_code_receiver = ValueReceiver::new(exit_code_rx); + let exit_code = Some(ListStream::new( + ValueReceiver::new(exit_code_rx), + head, + None, + )); Ok(PipelineData::ExternalStream { stdout, stderr, - exit_code: Some(ListStream::from_stream( - Box::new(exit_code_receiver), - ctrlc.clone(), - )), + exit_code, span: head, metadata: None, trim_end_newline: true, diff --git a/crates/nu-command/src/system/which_.rs b/crates/nu-command/src/system/which_.rs index f593bcf9fe..1244a57d99 100644 --- a/crates/nu-command/src/system/which_.rs +++ b/crates/nu-command/src/system/which_.rs @@ -214,6 +214,7 @@ fn which( stack: &mut Stack, call: &Call, ) -> Result { + let head = call.head; let which_args = WhichArgs { applications: call.rest(engine_state, stack, 0)?, all: call.has_flag(engine_state, stack, "all")?, @@ -223,7 +224,7 @@ fn which( if which_args.applications.is_empty() { return Err(ShellError::MissingParameter { param_name: "application".into(), - span: call.head, + span: head, }); } @@ -231,7 +232,7 @@ fn which( #[allow(deprecated)] let cwd = env::current_dir_str(engine_state, stack)?; - let paths = env::path_str(engine_state, stack, call.head)?; + let paths = env::path_str(engine_state, stack, head)?; for app in which_args.applications { let values = which_single( @@ -244,7 +245,7 @@ fn which( output.extend(values); } - Ok(output.into_iter().into_pipeline_data(ctrlc)) + Ok(output.into_iter().into_pipeline_data(head, ctrlc)) } #[cfg(test)] diff --git a/crates/nu-command/src/viewers/table.rs b/crates/nu-command/src/viewers/table.rs index e7d2774a5e..f4df2e03bc 100644 --- a/crates/nu-command/src/viewers/table.rs +++ b/crates/nu-command/src/viewers/table.rs @@ -5,7 +5,9 @@ use lscolors::{LsColors, Style}; use nu_color_config::{color_from_hex, StyleComputer, TextStyle}; use nu_engine::{command_prelude::*, env::get_config, env_to_string}; -use nu_protocol::{Config, DataSource, ListStream, PipelineMetadata, RawStream, TableMode}; +use nu_protocol::{ + Config, DataSource, ListStream, PipelineMetadata, RawStream, TableMode, ValueIterator, +}; use nu_table::{ common::create_nu_table_config, CollapsedTable, ExpandedTable, JustTable, NuTable, NuTableCell, StringResult, TableOpts, TableOutput, @@ -381,7 +383,7 @@ fn handle_table_command( // None of these two receive a StyleComputer because handle_row_stream() can produce it by itself using engine_state and stack. PipelineData::Value(Value::List { vals, .. }, metadata) => { let ctrlc = input.engine_state.ctrlc.clone(); - let stream = ListStream::from_stream(vals.into_iter(), ctrlc); + let stream = ListStream::new(vals.into_iter(), span, ctrlc); input.data = PipelineData::Empty; handle_row_stream(input, cfg, stream, metadata) @@ -405,7 +407,7 @@ fn handle_table_command( } PipelineData::Value(Value::Range { val, .. }, metadata) => { let ctrlc = input.engine_state.ctrlc.clone(); - let stream = ListStream::from_stream(val.into_range_iter(span, ctrlc.clone()), ctrlc); + let stream = ListStream::new(val.into_range_iter(span, ctrlc), span, None); input.data = PipelineData::Empty; handle_row_stream(input, cfg, stream, metadata) } @@ -537,7 +539,6 @@ fn handle_row_stream( data_source: DataSource::Ls, }) => { let config = get_config(input.engine_state, input.stack); - let ctrlc = ctrlc.clone(); let ls_colors_env_str = match input.stack.get_env_var(input.engine_state, "LS_COLORS") { Some(v) => Some(env_to_string( "LS_COLORS", @@ -549,67 +550,55 @@ fn handle_row_stream( }; let ls_colors = get_ls_colors(ls_colors_env_str); - ListStream::from_stream( - stream.map(move |mut x| match &mut x { - Value::Record { val: record, .. } => { - // Only the name column gets special colors, for now - if let Some(value) = record.to_mut().get_mut("name") { - let span = value.span(); - if let Value::String { val, .. } = value { - if let Some(val) = render_path_name(val, &config, &ls_colors, span) - { - *value = val; - } + stream.map(move |mut value| { + if let Value::Record { val: record, .. } = &mut value { + // Only the name column gets special colors, for now + if let Some(value) = record.to_mut().get_mut("name") { + let span = value.span(); + if let Value::String { val, .. } = value { + if let Some(val) = render_path_name(val, &config, &ls_colors, span) { + *value = val; } } - - x } - _ => x, - }), - ctrlc, - ) + } + value + }) } // Next, `to html -l` sources: Some(PipelineMetadata { data_source: DataSource::HtmlThemes, }) => { - let ctrlc = ctrlc.clone(); - - ListStream::from_stream( - stream.map(move |mut x| match &mut x { - Value::Record { val: record, .. } => { - for (rec_col, rec_val) in record.to_mut().iter_mut() { - // Every column in the HTML theme table except 'name' is colored - if rec_col != "name" { - continue; - } - // Simple routine to grab the hex code, convert to a style, - // then place it in a new Value::String. - - let span = rec_val.span(); - if let Value::String { val, .. } = rec_val { - let s = match color_from_hex(val) { - Ok(c) => match c { - // .normal() just sets the text foreground color. - Some(c) => c.normal(), - None => nu_ansi_term::Style::default(), - }, - Err(_) => nu_ansi_term::Style::default(), - }; - *rec_val = Value::string( - // Apply the style (ANSI codes) to the string - s.paint(&*val).to_string(), - span, - ); - } + stream.map(|mut value| { + if let Value::Record { val: record, .. } = &mut value { + for (rec_col, rec_val) in record.to_mut().iter_mut() { + // Every column in the HTML theme table except 'name' is colored + if rec_col != "name" { + continue; + } + // Simple routine to grab the hex code, convert to a style, + // then place it in a new Value::String. + + let span = rec_val.span(); + if let Value::String { val, .. } = rec_val { + let s = match color_from_hex(val) { + Ok(c) => match c { + // .normal() just sets the text foreground color. + Some(c) => c.normal(), + None => nu_ansi_term::Style::default(), + }, + Err(_) => nu_ansi_term::Style::default(), + }; + *rec_val = Value::string( + // Apply the style (ANSI codes) to the string + s.paint(&*val).to_string(), + span, + ); } - x } - _ => x, - }), - ctrlc, - ) + } + value + }) } _ => stream, }; @@ -662,7 +651,7 @@ fn make_clickable_link( struct PagingTableCreator { head: Span, - stream: ListStream, + stream: ValueIterator, engine_state: EngineState, stack: Stack, ctrlc: Option>, @@ -683,7 +672,7 @@ impl PagingTableCreator { ) -> Self { PagingTableCreator { head, - stream, + stream: stream.into_inner(), engine_state, stack, ctrlc, @@ -822,7 +811,7 @@ impl Iterator for PagingTableCreator { } fn stream_collect( - stream: &mut ListStream, + stream: impl Iterator, size: usize, ctrlc: Option>, ) -> (Vec, bool) { @@ -830,7 +819,7 @@ fn stream_collect( let mut end = true; let mut batch = Vec::with_capacity(size); - for (i, item) in stream.by_ref().enumerate() { + for (i, item) in stream.enumerate() { batch.push(item); // If we've been buffering over a second, go ahead and send out what we have so far @@ -853,7 +842,7 @@ fn stream_collect( } fn stream_collect_abbriviated( - stream: &mut ListStream, + stream: impl Iterator, size: usize, ctrlc: Option>, ) -> (Vec, usize, bool) { @@ -866,7 +855,7 @@ fn stream_collect_abbriviated( return (vec![], 0, false); } - for item in stream.by_ref() { + for item in stream { read += 1; if read <= size { diff --git a/crates/nu-explore/src/nu_common/value.rs b/crates/nu-explore/src/nu_common/value.rs index c61533770b..17b277cac5 100644 --- a/crates/nu-explore/src/nu_common/value.rs +++ b/crates/nu-explore/src/nu_common/value.rs @@ -22,9 +22,9 @@ pub fn collect_pipeline(input: PipelineData) -> Result<(Vec, Vec (Vec, Vec>) { +fn collect_list_stream(stream: ListStream) -> (Vec, Vec>) { let mut records = vec![]; - for item in stream.by_ref() { + for item in stream { records.push(item); } @@ -70,7 +70,7 @@ fn collect_external_stream( data.push(value); } if let Some(exit_code) = exit_code { - let list = exit_code.collect::>(); + let list = exit_code.into_iter().collect::>(); let val = Value::list(list, span); columns.push(String::from("exit_code")); diff --git a/crates/nu-plugin-core/src/interface/mod.rs b/crates/nu-plugin-core/src/interface/mod.rs index 4124e83bfb..3fb86aee36 100644 --- a/crates/nu-plugin-core/src/interface/mod.rs +++ b/crates/nu-plugin-core/src/interface/mod.rs @@ -183,7 +183,7 @@ pub trait InterfaceManager { PipelineDataHeader::ListStream(info) => { let handle = self.stream_manager().get_handle(); let reader = handle.read_stream(info.id, self.get_interface())?; - PipelineData::ListStream(ListStream::from_stream(reader, ctrlc.cloned()), None) + ListStream::new(reader, info.span, ctrlc.cloned()).into() } PipelineDataHeader::ExternalStream(info) => { let handle = self.stream_manager().get_handle(); @@ -203,7 +203,7 @@ pub trait InterfaceManager { .map(|list_info| { handle .read_stream(list_info.id, self.get_interface()) - .map(|reader| ListStream::from_stream(reader, ctrlc.cloned())) + .map(|reader| ListStream::new(reader, info.span, ctrlc.cloned())) }) .transpose()?, span: info.span, @@ -278,7 +278,10 @@ pub trait Interface: Clone + Send { PipelineData::ListStream(stream, _) => { let (id, writer) = new_stream(LIST_STREAM_HIGH_PRESSURE)?; Ok(( - PipelineDataHeader::ListStream(ListStreamInfo { id }), + PipelineDataHeader::ListStream(ListStreamInfo { + id, + span: stream.span(), + }), PipelineDataWriter::ListStream(writer, stream), )) } @@ -316,7 +319,7 @@ pub trait Interface: Clone + Send { .map(|(stream, (id, _))| RawStreamInfo::new(*id, stream)), exit_code: exit_code_stream .as_ref() - .map(|&(id, _)| ListStreamInfo { id }), + .map(|&(id, _)| ListStreamInfo { id, span }), trim_end_newline, }); // Collect the writers diff --git a/crates/nu-plugin-core/src/interface/tests.rs b/crates/nu-plugin-core/src/interface/tests.rs index 33b6ef27ae..ce7be52f30 100644 --- a/crates/nu-plugin-core/src/interface/tests.rs +++ b/crates/nu-plugin-core/src/interface/tests.rs @@ -161,7 +161,10 @@ fn read_pipeline_data_list_stream() -> Result<(), ShellError> { } test.add(StreamMessage::End(7)); - let header = PipelineDataHeader::ListStream(ListStreamInfo { id: 7 }); + let header = PipelineDataHeader::ListStream(ListStreamInfo { + id: 7, + span: Span::test_data(), + }); let pipe = manager.read_pipeline_data(header, None)?; assert!( @@ -221,7 +224,10 @@ fn read_pipeline_data_external_stream() -> Result<(), ShellError> { is_binary: true, known_size: None, }), - exit_code: Some(ListStreamInfo { id: 14 }), + exit_code: Some(ListStreamInfo { + id: 14, + span: Span::test_data(), + }), trim_end_newline: true, }); @@ -273,7 +279,10 @@ fn read_pipeline_data_external_stream() -> Result<(), ShellError> { } assert_eq!(iterations, count, "stderr length"); - assert_eq!(vec![Value::test_int(1)], exit_code.collect::>()); + assert_eq!( + vec![Value::test_int(1)], + exit_code.into_iter().collect::>() + ); } _ => panic!("unexpected PipelineData: {pipe:?}"), } @@ -284,30 +293,13 @@ fn read_pipeline_data_external_stream() -> Result<(), ShellError> { Ok(()) } -#[test] -fn read_pipeline_data_ctrlc() -> Result<(), ShellError> { - let manager = TestInterfaceManager::new(&TestCase::new()); - let header = PipelineDataHeader::ListStream(ListStreamInfo { id: 0 }); - let ctrlc = Default::default(); - match manager.read_pipeline_data(header, Some(&ctrlc))? { - PipelineData::ListStream( - ListStream { - ctrlc: stream_ctrlc, - .. - }, - _, - ) => { - assert!(Arc::ptr_eq(&ctrlc, &stream_ctrlc.expect("ctrlc not set"))); - Ok(()) - } - _ => panic!("Unexpected PipelineData, should have been ListStream"), - } -} - #[test] fn read_pipeline_data_prepared_properly() -> Result<(), ShellError> { let manager = TestInterfaceManager::new(&TestCase::new()); - let header = PipelineDataHeader::ListStream(ListStreamInfo { id: 0 }); + let header = PipelineDataHeader::ListStream(ListStreamInfo { + id: 0, + span: Span::test_data(), + }); match manager.read_pipeline_data(header, None)? { PipelineData::ListStream(_, meta) => match meta { Some(PipelineMetadata { data_source }) => match data_source { @@ -404,7 +396,7 @@ fn write_pipeline_data_list_stream() -> Result<(), ShellError> { // Set up pipeline data for a list stream let pipe = PipelineData::ListStream( - ListStream::from_stream(values.clone().into_iter(), None), + ListStream::new(values.clone().into_iter(), Span::test_data(), None), None, ); @@ -474,8 +466,9 @@ fn write_pipeline_data_external_stream() -> Result<(), ShellError> { span, None, )), - exit_code: Some(ListStream::from_stream( + exit_code: Some(ListStream::new( std::iter::once(exit_code.clone()), + Span::test_data(), None, )), span, diff --git a/crates/nu-plugin-engine/src/interface/mod.rs b/crates/nu-plugin-engine/src/interface/mod.rs index be14dbdc2d..3447d6a907 100644 --- a/crates/nu-plugin-engine/src/interface/mod.rs +++ b/crates/nu-plugin-engine/src/interface/mod.rs @@ -11,8 +11,8 @@ use nu_plugin_protocol::{ PluginOutput, ProtocolInfo, StreamId, StreamMessage, }; use nu_protocol::{ - ast::Operator, CustomValue, IntoInterruptiblePipelineData, IntoSpanned, ListStream, - PipelineData, PluginSignature, ShellError, Span, Spanned, Value, + ast::Operator, CustomValue, IntoSpanned, PipelineData, PluginSignature, ShellError, Span, + Spanned, Value, }; use std::{ collections::{btree_map, BTreeMap}, @@ -592,14 +592,15 @@ impl InterfaceManager for PluginInterfaceManager { })?; Ok(data) } - PipelineData::ListStream(ListStream { stream, ctrlc, .. }, meta) => { + PipelineData::ListStream(stream, meta) => { let source = self.state.source.clone(); - Ok(stream - .map(move |mut value| { + Ok(PipelineData::ListStream( + stream.map(move |mut value| { let _ = PluginCustomValueWithSource::add_source_in(&mut value, &source); value - }) - .into_pipeline_data_with_metadata(meta, ctrlc)) + }), + meta, + )) } PipelineData::Empty | PipelineData::ExternalStream { .. } => Ok(data), } @@ -1076,18 +1077,19 @@ impl Interface for PluginInterface { state.prepare_value(&mut value, &self.state.source)?; Ok(PipelineData::Value(value, meta)) } - PipelineData::ListStream(ListStream { stream, ctrlc, .. }, meta) => { + PipelineData::ListStream(stream, meta) => { let source = self.state.source.clone(); let state = state.clone(); - Ok(stream - .map(move |mut value| { + Ok(PipelineData::ListStream( + stream.map(move |mut value| { match state.prepare_value(&mut value, &source) { Ok(()) => value, // Put the error in the stream instead Err(err) => Value::error(err, value.span()), } - }) - .into_pipeline_data_with_metadata(meta, ctrlc)) + }), + meta, + )) } PipelineData::Empty | PipelineData::ExternalStream { .. } => Ok(data), } diff --git a/crates/nu-plugin-engine/src/interface/tests.rs b/crates/nu-plugin-engine/src/interface/tests.rs index 007568d726..7548703191 100644 --- a/crates/nu-plugin-engine/src/interface/tests.rs +++ b/crates/nu-plugin-engine/src/interface/tests.rs @@ -52,7 +52,10 @@ fn manager_consume_all_exits_after_streams_and_interfaces_are_dropped() -> Resul // Create a stream... let stream = manager.read_pipeline_data( - PipelineDataHeader::ListStream(ListStreamInfo { id: 0 }), + PipelineDataHeader::ListStream(ListStreamInfo { + id: 0, + span: Span::test_data(), + }), None, )?; @@ -105,7 +108,10 @@ fn manager_consume_all_propagates_io_error_to_readers() -> Result<(), ShellError test.set_read_error(test_io_error()); let stream = manager.read_pipeline_data( - PipelineDataHeader::ListStream(ListStreamInfo { id: 0 }), + PipelineDataHeader::ListStream(ListStreamInfo { + id: 0, + span: Span::test_data(), + }), None, )?; @@ -331,7 +337,10 @@ fn manager_consume_call_response_forwards_to_subscriber_with_pipeline_data( manager.consume(PluginOutput::CallResponse( 0, - PluginCallResponse::PipelineData(PipelineDataHeader::ListStream(ListStreamInfo { id: 0 })), + PluginCallResponse::PipelineData(PipelineDataHeader::ListStream(ListStreamInfo { + id: 0, + span: Span::test_data(), + })), ))?; for i in 0..2 { @@ -372,7 +381,10 @@ fn manager_consume_call_response_registers_streams() -> Result<(), ShellError> { // Check list streams, external streams manager.consume(PluginOutput::CallResponse( 0, - PluginCallResponse::PipelineData(PipelineDataHeader::ListStream(ListStreamInfo { id: 0 })), + PluginCallResponse::PipelineData(PipelineDataHeader::ListStream(ListStreamInfo { + id: 0, + span: Span::test_data(), + })), ))?; manager.consume(PluginOutput::CallResponse( 1, @@ -388,7 +400,10 @@ fn manager_consume_call_response_registers_streams() -> Result<(), ShellError> { is_binary: false, known_size: None, }), - exit_code: Some(ListStreamInfo { id: 3 }), + exit_code: Some(ListStreamInfo { + id: 3, + span: Span::test_data(), + }), trim_end_newline: false, })), ))?; @@ -448,7 +463,10 @@ fn manager_consume_engine_call_forwards_to_subscriber_with_pipeline_data() -> Re span: Span::test_data(), }, positional: vec![], - input: PipelineDataHeader::ListStream(ListStreamInfo { id: 2 }), + input: PipelineDataHeader::ListStream(ListStreamInfo { + id: 2, + span: Span::test_data(), + }), redirect_stdout: false, redirect_stderr: false, }, @@ -681,7 +699,7 @@ fn manager_prepare_pipeline_data_adds_source_to_list_streams() -> Result<(), She [Value::test_custom_value(Box::new( test_plugin_custom_value(), ))] - .into_pipeline_data(None), + .into_pipeline_data(Span::test_data(), None), )?; let value = data @@ -855,7 +873,7 @@ fn interface_write_plugin_call_writes_run_with_stream_input() -> Result<(), Shel positional: vec![], named: vec![], }, - input: values.clone().into_pipeline_data(None), + input: values.clone().into_pipeline_data(Span::test_data(), None), }), None, )?; @@ -1131,7 +1149,10 @@ fn interface_prepare_pipeline_data_accepts_normal_streams() -> Result<(), ShellE let interface = TestCase::new().plugin("test").get_interface(); let values = normal_values(&interface); let state = CurrentCallState::default(); - let data = interface.prepare_pipeline_data(values.clone().into_pipeline_data(None), &state)?; + let data = interface.prepare_pipeline_data( + values.clone().into_pipeline_data(Span::test_data(), None), + &state, + )?; let mut count = 0; for (expected_value, actual_value) in values.iter().zip(data) { @@ -1191,7 +1212,10 @@ fn interface_prepare_pipeline_data_rejects_bad_custom_value_in_a_stream() -> Res let interface = TestCase::new().plugin("test").get_interface(); let values = bad_custom_values(); let state = CurrentCallState::default(); - let data = interface.prepare_pipeline_data(values.clone().into_pipeline_data(None), &state)?; + let data = interface.prepare_pipeline_data( + values.clone().into_pipeline_data(Span::test_data(), None), + &state, + )?; let mut count = 0; for value in data { diff --git a/crates/nu-plugin-protocol/src/lib.rs b/crates/nu-plugin-protocol/src/lib.rs index 57bec28327..e40136ca56 100644 --- a/crates/nu-plugin-protocol/src/lib.rs +++ b/crates/nu-plugin-protocol/src/lib.rs @@ -116,6 +116,7 @@ impl PipelineDataHeader { #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] pub struct ListStreamInfo { pub id: StreamId, + pub span: Span, } /// Additional information about external streams diff --git a/crates/nu-plugin-test-support/src/lib.rs b/crates/nu-plugin-test-support/src/lib.rs index dd301d4d2b..8aa675fd1f 100644 --- a/crates/nu-plugin-test-support/src/lib.rs +++ b/crates/nu-plugin-test-support/src/lib.rs @@ -79,7 +79,7 @@ //! //! // #[test] //! fn test_lowercase() -> Result<(), ShellError> { -//! let input = vec![Value::test_string("FooBar")].into_pipeline_data(None); +//! let input = vec![Value::test_string("FooBar")].into_pipeline_data(Span::test_data(), None); //! let output = PluginTest::new("lowercase", LowercasePlugin.into())? //! .eval_with("lowercase", input)? //! .into_value(Span::test_data()); diff --git a/crates/nu-plugin-test-support/src/plugin_test.rs b/crates/nu-plugin-test-support/src/plugin_test.rs index 16eb62cf2e..18eee356ef 100644 --- a/crates/nu-plugin-test-support/src/plugin_test.rs +++ b/crates/nu-plugin-test-support/src/plugin_test.rs @@ -91,7 +91,7 @@ impl PluginTest { /// let result = PluginTest::new("my_plugin", MyPlugin.into())? /// .eval_with( /// "my-command", - /// vec![Value::test_int(42)].into_pipeline_data(None) + /// vec![Value::test_int(42)].into_pipeline_data(Span::test_data(), None) /// )? /// .into_value(Span::test_data()); /// assert_eq!(Value::test_string("42"), result); diff --git a/crates/nu-plugin-test-support/tests/lowercase/mod.rs b/crates/nu-plugin-test-support/tests/lowercase/mod.rs index 25a6063dc0..33446cea86 100644 --- a/crates/nu-plugin-test-support/tests/lowercase/mod.rs +++ b/crates/nu-plugin-test-support/tests/lowercase/mod.rs @@ -68,7 +68,7 @@ impl Plugin for LowercasePlugin { fn test_lowercase_using_eval_with() -> Result<(), ShellError> { let result = PluginTest::new("lowercase", LowercasePlugin.into())?.eval_with( "lowercase", - vec![Value::test_string("HeLlO wOrLd")].into_pipeline_data(None), + vec![Value::test_string("HeLlO wOrLd")].into_pipeline_data(Span::test_data(), None), )?; assert_eq!( diff --git a/crates/nu-plugin/src/plugin/interface/mod.rs b/crates/nu-plugin/src/plugin/interface/mod.rs index 630c6b2979..70e143ece5 100644 --- a/crates/nu-plugin/src/plugin/interface/mod.rs +++ b/crates/nu-plugin/src/plugin/interface/mod.rs @@ -11,8 +11,8 @@ use nu_plugin_protocol::{ ProtocolInfo, }; use nu_protocol::{ - engine::Closure, Config, IntoInterruptiblePipelineData, LabeledError, ListStream, PipelineData, - PluginSignature, ShellError, Span, Spanned, Value, + engine::Closure, Config, LabeledError, PipelineData, PluginSignature, ShellError, Span, + Spanned, Value, }; use std::{ collections::{btree_map, BTreeMap, HashMap}, @@ -336,14 +336,15 @@ impl InterfaceManager for EngineInterfaceManager { PluginCustomValue::deserialize_custom_values_in(value)?; Ok(data) } - PipelineData::ListStream(ListStream { stream, ctrlc, .. }, meta) => Ok(stream - .map(|mut value| { + PipelineData::ListStream(stream, meta) => { + let stream = stream.map(|mut value| { let span = value.span(); PluginCustomValue::deserialize_custom_values_in(&mut value) .map(|()| value) .unwrap_or_else(|err| Value::error(err, span)) - }) - .into_pipeline_data_with_metadata(meta, ctrlc)), + }); + Ok(PipelineData::ListStream(stream, meta)) + } PipelineData::Empty | PipelineData::ExternalStream { .. } => Ok(data), } } @@ -910,14 +911,15 @@ impl Interface for EngineInterface { PluginCustomValue::serialize_custom_values_in(value)?; Ok(data) } - PipelineData::ListStream(ListStream { stream, ctrlc, .. }, meta) => Ok(stream - .map(|mut value| { + PipelineData::ListStream(stream, meta) => { + let stream = stream.map(|mut value| { let span = value.span(); PluginCustomValue::serialize_custom_values_in(&mut value) .map(|_| value) .unwrap_or_else(|err| Value::error(err, span)) - }) - .into_pipeline_data_with_metadata(meta, ctrlc)), + }); + Ok(PipelineData::ListStream(stream, meta)) + } PipelineData::Empty | PipelineData::ExternalStream { .. } => Ok(data), } } diff --git a/crates/nu-plugin/src/plugin/interface/tests.rs b/crates/nu-plugin/src/plugin/interface/tests.rs index 7fc7bf06e4..17018cbc00 100644 --- a/crates/nu-plugin/src/plugin/interface/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/tests.rs @@ -56,7 +56,10 @@ fn manager_consume_all_exits_after_streams_and_interfaces_are_dropped() -> Resul // Create a stream... let stream = manager.read_pipeline_data( - PipelineDataHeader::ListStream(ListStreamInfo { id: 0 }), + PipelineDataHeader::ListStream(ListStreamInfo { + id: 0, + span: Span::test_data(), + }), None, )?; @@ -109,7 +112,10 @@ fn manager_consume_all_propagates_io_error_to_readers() -> Result<(), ShellError test.set_read_error(test_io_error()); let stream = manager.read_pipeline_data( - PipelineDataHeader::ListStream(ListStreamInfo { id: 0 }), + PipelineDataHeader::ListStream(ListStreamInfo { + id: 0, + span: Span::test_data(), + }), None, )?; @@ -395,7 +401,10 @@ fn manager_consume_call_run_forwards_to_receiver_with_pipeline_data() -> Result< positional: vec![], named: vec![], }, - input: PipelineDataHeader::ListStream(ListStreamInfo { id: 6 }), + input: PipelineDataHeader::ListStream(ListStreamInfo { + id: 6, + span: Span::test_data(), + }), }), ))?; @@ -534,7 +543,10 @@ fn manager_consume_engine_call_response_forwards_to_subscriber_with_pipeline_dat manager.consume(PluginInput::EngineCallResponse( 0, - EngineCallResponse::PipelineData(PipelineDataHeader::ListStream(ListStreamInfo { id: 0 })), + EngineCallResponse::PipelineData(PipelineDataHeader::ListStream(ListStreamInfo { + id: 0, + span: Span::test_data(), + })), ))?; for i in 0..2 { @@ -590,7 +602,7 @@ fn manager_prepare_pipeline_data_deserializes_custom_values_in_streams() -> Resu [Value::test_custom_value(Box::new( test_plugin_custom_value(), ))] - .into_pipeline_data(None), + .into_pipeline_data(Span::test_data(), None), )?; let value = data @@ -621,7 +633,8 @@ fn manager_prepare_pipeline_data_embeds_deserialization_errors_in_streams() -> R let span = Span::new(20, 30); let data = manager.prepare_pipeline_data( - [Value::custom(Box::new(invalid_custom_value), span)].into_pipeline_data(None), + [Value::custom(Box::new(invalid_custom_value), span)] + .into_pipeline_data(Span::test_data(), None), )?; let value = data @@ -703,7 +716,8 @@ fn interface_write_response_with_stream() -> Result<(), ShellError> { interface .write_response(Ok::<_, ShellError>( - [Value::test_int(3), Value::test_int(4), Value::test_int(5)].into_pipeline_data(None), + [Value::test_int(3), Value::test_int(4), Value::test_int(5)] + .into_pipeline_data(Span::test_data(), None), ))? .write()?; @@ -1105,7 +1119,7 @@ fn interface_prepare_pipeline_data_serializes_custom_values_in_streams() -> Resu [Value::test_custom_value(Box::new( expected_test_custom_value(), ))] - .into_pipeline_data(None), + .into_pipeline_data(Span::test_data(), None), &(), )?; @@ -1163,7 +1177,8 @@ fn interface_prepare_pipeline_data_embeds_serialization_errors_in_streams() -> R let span = Span::new(40, 60); let data = interface.prepare_pipeline_data( - [Value::custom(Box::new(CantSerialize::BadVariant), span)].into_pipeline_data(None), + [Value::custom(Box::new(CantSerialize::BadVariant), span)] + .into_pipeline_data(Span::test_data(), None), &(), )?; diff --git a/crates/nu-protocol/src/eval_base.rs b/crates/nu-protocol/src/eval_base.rs index f211f48560..6a1c0d302f 100644 --- a/crates/nu-protocol/src/eval_base.rs +++ b/crates/nu-protocol/src/eval_base.rs @@ -4,8 +4,7 @@ use crate::{ ExternalArgument, ListItem, Math, Operator, RecordItem, }, debugger::DebugContext, - Config, IntoInterruptiblePipelineData, Range, Record, ShellError, Span, Value, VarId, - ENV_VARIABLE_ID, + Config, Range, Record, ShellError, Span, Value, VarId, ENV_VARIABLE_ID, }; use std::{borrow::Cow, collections::HashMap}; @@ -278,18 +277,13 @@ pub trait Eval { Self::eval_row_condition_or_closure(state, mut_state, *block_id, expr.span) } Expr::StringInterpolation(exprs) => { - let mut parts = vec![]; - for expr in exprs { - parts.push(Self::eval::(state, mut_state, expr)?); - } - let config = Self::get_config(state, mut_state); + let str = exprs + .iter() + .map(|expr| Self::eval::(state, mut_state, expr).map(|v| v.to_expanded_string(", ", &config))) + .collect::>()?; - parts - .into_iter() - .into_pipeline_data(None) - .collect_string("", &config) - .map(|x| Value::string(x, expr.span)) + Ok(Value::string(str, expr.span)) } Expr::Overlay(_) => Self::eval_overlay(state, expr.span), Expr::GlobPattern(pattern, quoted) => { diff --git a/crates/nu-protocol/src/pipeline_data/list_stream.rs b/crates/nu-protocol/src/pipeline_data/list_stream.rs new file mode 100644 index 0000000000..117c264219 --- /dev/null +++ b/crates/nu-protocol/src/pipeline_data/list_stream.rs @@ -0,0 +1,156 @@ +use crate::{Config, PipelineData, ShellError, Span, Value}; +use std::{ + fmt::Debug, + sync::{atomic::AtomicBool, Arc}, +}; + +pub type ValueIterator = Box + Send + 'static>; + +/// A potentially infinite, interruptible stream of [`Value`]s. +/// +/// In practice, a "stream" here means anything which can be iterated and produces Values. +/// Like other iterators in Rust, observing values from this stream will drain the items +/// as you view them and the stream cannot be replayed. +pub struct ListStream { + stream: ValueIterator, + span: Span, +} + +impl ListStream { + /// Create a new [`ListStream`] from a [`Value`] `Iterator`. + pub fn new( + iter: impl Iterator + Send + 'static, + span: Span, + interrupt: Option>, + ) -> Self { + Self { + stream: Box::new(Interrupt::new(iter, interrupt)), + span, + } + } + + /// Returns the [`Span`] associated with this [`ListStream`]. + pub fn span(&self) -> Span { + self.span + } + + /// Convert a [`ListStream`] into its inner [`Value`] `Iterator`. + pub fn into_inner(self) -> ValueIterator { + self.stream + } + + /// Converts each value in a [`ListStream`] into a string and then joins the strings together + /// using the given separator. + pub fn into_string(self, separator: &str, config: &Config) -> String { + self.into_iter() + .map(|val| val.to_expanded_string(", ", config)) + .collect::>() + .join(separator) + } + + /// Collect the values of a [`ListStream`] into a list [`Value`]. + pub fn into_value(self) -> Value { + Value::list(self.stream.collect(), self.span) + } + + /// Consume all values in the stream, returning an error if any of the values is a `Value::Error`. + pub fn drain(self) -> Result<(), ShellError> { + for next in self { + if let Value::Error { error, .. } = next { + return Err(*error); + } + } + Ok(()) + } + + /// Modify the inner iterator of a [`ListStream`] using a function. + /// + /// This can be used to call any number of standard iterator functions on the [`ListStream`]. + /// E.g., `take`, `filter`, `step_by`, and more. + /// + /// ``` + /// use nu_protocol::{ListStream, Span, Value}; + /// + /// let span = Span::unknown(); + /// let stream = ListStream::new(std::iter::repeat(Value::int(0, span)), span, None); + /// let new_stream = stream.modify(|iter| iter.take(100)); + /// ``` + pub fn modify(self, f: impl FnOnce(ValueIterator) -> I) -> Self + where + I: Iterator + Send + 'static, + { + Self { + stream: Box::new(f(self.stream)), + span: self.span, + } + } + + /// Create a new [`ListStream`] whose values are the results of applying the given function + /// to each of the values in the original [`ListStream`]. + pub fn map(self, mapping: impl FnMut(Value) -> Value + Send + 'static) -> Self { + self.modify(|iter| iter.map(mapping)) + } +} + +impl Debug for ListStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ListStream").finish() + } +} + +impl IntoIterator for ListStream { + type Item = Value; + + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + IntoIter { + stream: self.into_inner(), + } + } +} + +impl From for PipelineData { + fn from(stream: ListStream) -> Self { + Self::ListStream(stream, None) + } +} + +pub struct IntoIter { + stream: ValueIterator, +} + +impl Iterator for IntoIter { + type Item = Value; + + fn next(&mut self) -> Option { + self.stream.next() + } +} + +struct Interrupt { + iter: I, + interrupt: Option>, +} + +impl Interrupt { + fn new(iter: I, interrupt: Option>) -> Self { + Self { iter, interrupt } + } +} + +impl Iterator for Interrupt { + type Item = ::Item; + + fn next(&mut self) -> Option { + if nu_utils::ctrl_c::was_pressed(&self.interrupt) { + None + } else { + self.iter.next() + } + } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } +} diff --git a/crates/nu-protocol/src/pipeline_data/mod.rs b/crates/nu-protocol/src/pipeline_data/mod.rs index 94096c31a4..71c667fa70 100644 --- a/crates/nu-protocol/src/pipeline_data/mod.rs +++ b/crates/nu-protocol/src/pipeline_data/mod.rs @@ -1,10 +1,12 @@ +pub mod list_stream; mod metadata; mod out_dest; -mod stream; +mod raw_stream; +pub use list_stream::{ListStream, ValueIterator}; pub use metadata::*; pub use out_dest::*; -pub use stream::*; +pub use raw_stream::*; use crate::{ ast::{Call, PathMember}, @@ -76,8 +78,9 @@ impl PipelineData { PipelineData::ExternalStream { stdout: None, stderr: None, - exit_code: Some(ListStream::from_stream( + exit_code: Some(ListStream::new( [Value::int(exit_code, Span::unknown())].into_iter(), + Span::unknown(), None, )), span: Span::unknown(), @@ -118,7 +121,7 @@ impl PipelineData { /// PipelineData doesn't always have a Span, but we can try! pub fn span(&self) -> Option { match self { - PipelineData::ListStream(..) => None, + PipelineData::ListStream(stream, ..) => Some(stream.span()), PipelineData::ExternalStream { span, .. } => Some(*span), PipelineData::Value(v, _) => Some(v.span()), PipelineData::Empty => None, @@ -131,7 +134,7 @@ impl PipelineData { PipelineData::Value(Value::Nothing { .. }, ..) => Value::nothing(span), PipelineData::Value(v, ..) => v.with_span(span), PipelineData::ListStream(s, ..) => Value::list( - s.collect(), + s.into_iter().collect(), span, // FIXME? ), PipelineData::ExternalStream { @@ -403,51 +406,54 @@ impl PipelineData { /// /// It returns Err if the `self` cannot be converted to an iterator. pub fn into_iter_strict(self, span: Span) -> Result { - match self { - PipelineData::Value(value, metadata) => match value { - Value::List { vals, .. } => Ok(PipelineIterator(PipelineData::ListStream( - ListStream::from_stream(vals.into_iter(), None), - metadata, - ))), - Value::Binary { val, .. } => Ok(PipelineIterator(PipelineData::ListStream( - ListStream::from_stream( - val.into_iter().map(move |x| Value::int(x as i64, span)), - None, + Ok(PipelineIterator(match self { + PipelineData::Value(value, ..) => { + let val_span = value.span(); + match value { + Value::List { vals, .. } => PipelineIteratorInner::ListStream( + ListStream::new(vals.into_iter(), val_span, None).into_iter(), ), - metadata, - ))), - Value::Range { val, .. } => Ok(PipelineIterator(PipelineData::ListStream( - ListStream::from_stream(val.into_range_iter(value.span(), None), None), - metadata, - ))) - , - // Propagate errors by explicitly matching them before the final case. - Value::Error { error, .. } => Err(*error), - other => Err(ShellError::OnlySupportsThisInputType { + Value::Binary { val, .. } => PipelineIteratorInner::ListStream( + ListStream::new( + val.into_iter().map(move |x| Value::int(x as i64, val_span)), + val_span, + None, + ) + .into_iter(), + ), + Value::Range { val, .. } => PipelineIteratorInner::ListStream( + ListStream::new(val.into_range_iter(value.span(), None), val_span, None) + .into_iter(), + ), + // Propagate errors by explicitly matching them before the final case. + Value::Error { error, .. } => return Err(*error), + other => { + return Err(ShellError::OnlySupportsThisInputType { + exp_input_type: "list, binary, raw data or range".into(), + wrong_type: other.get_type().to_string(), + dst_span: span, + src_span: val_span, + }) + } + } + } + PipelineData::ListStream(stream, ..) => { + PipelineIteratorInner::ListStream(stream.into_iter()) + } + PipelineData::Empty => { + return Err(ShellError::OnlySupportsThisInputType { exp_input_type: "list, binary, raw data or range".into(), - wrong_type: other.get_type().to_string(), + wrong_type: "null".into(), dst_span: span, - src_span: other.span(), - }), - }, - PipelineData::Empty => Err(ShellError::OnlySupportsThisInputType { - exp_input_type: "list, binary, raw data or range".into(), - wrong_type: "null".into(), - dst_span: span, - src_span: span, - }), - other => Ok(PipelineIterator(other)), - } - } - - pub fn into_interruptible_iter(self, ctrlc: Option>) -> PipelineIterator { - let mut iter = self.into_iter(); - - if let PipelineIterator(PipelineData::ListStream(s, ..)) = &mut iter { - s.ctrlc = ctrlc; - } - - iter + src_span: span, + }) + } + PipelineData::ExternalStream { + stdout: Some(stdout), + .. + } => PipelineIteratorInner::ExternalStream(stdout), + PipelineData::ExternalStream { stdout: None, .. } => PipelineIteratorInner::Empty, + })) } pub fn collect_string(self, separator: &str, config: &Config) -> Result { @@ -516,9 +522,8 @@ impl PipelineData { ) -> Result { match self { // FIXME: there are probably better ways of doing this - PipelineData::ListStream(stream, ..) => { - Value::list(stream.collect(), head).follow_cell_path(cell_path, insensitive) - } + PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head) + .follow_cell_path(cell_path, insensitive), PipelineData::Value(v, ..) => v.follow_cell_path(cell_path, insensitive), PipelineData::Empty => Err(ShellError::IncompatiblePathAccess { type_name: "empty pipeline".to_string(), @@ -531,22 +536,6 @@ impl PipelineData { } } - pub fn upsert_cell_path( - &mut self, - cell_path: &[PathMember], - callback: Box Value>, - head: Span, - ) -> Result<(), ShellError> { - match self { - // FIXME: there are probably better ways of doing this - PipelineData::ListStream(stream, ..) => { - Value::list(stream.collect(), head).upsert_cell_path(cell_path, callback) - } - PipelineData::Value(v, ..) => v.upsert_cell_path(cell_path, callback), - _ => Ok(()), - } - } - /// Simplified mapper to help with simple values also. For full iterator support use `.into_iter()` instead pub fn map( self, @@ -562,12 +551,12 @@ impl PipelineData { let span = value.span(); match value { Value::List { vals, .. } => { - Ok(vals.into_iter().map(f).into_pipeline_data(ctrlc)) + Ok(vals.into_iter().map(f).into_pipeline_data(span, ctrlc)) } Value::Range { val, .. } => Ok(val .into_range_iter(span, ctrlc.clone()) .map(f) - .into_pipeline_data(ctrlc)), + .into_pipeline_data(span, ctrlc)), value => match f(value) { Value::Error { error, .. } => Err(*error), v => Ok(v.into_pipeline_data()), @@ -575,7 +564,9 @@ impl PipelineData { } } PipelineData::Empty => Ok(PipelineData::Empty), - PipelineData::ListStream(stream, ..) => Ok(stream.map(f).into_pipeline_data(ctrlc)), + PipelineData::ListStream(stream, ..) => { + Ok(PipelineData::ListStream(stream.map(f), None)) + } PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::empty()), PipelineData::ExternalStream { stdout: Some(stream), @@ -614,21 +605,22 @@ impl PipelineData { let span = value.span(); match value { Value::List { vals, .. } => { - Ok(vals.into_iter().flat_map(f).into_pipeline_data(ctrlc)) + Ok(vals.into_iter().flat_map(f).into_pipeline_data(span, ctrlc)) } Value::Range { val, .. } => Ok(val .into_range_iter(span, ctrlc.clone()) .flat_map(f) - .into_pipeline_data(ctrlc)), - value => Ok(f(value).into_iter().into_pipeline_data(ctrlc)), + .into_pipeline_data(span, ctrlc)), + value => Ok(f(value).into_iter().into_pipeline_data(span, ctrlc)), } } PipelineData::ListStream(stream, ..) => { - Ok(stream.flat_map(f).into_pipeline_data(ctrlc)) + Ok(stream.modify(|iter| iter.flat_map(f)).into()) } PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::Empty), PipelineData::ExternalStream { stdout: Some(stream), + span, trim_end_newline, .. } => { @@ -640,11 +632,11 @@ impl PipelineData { } Ok(f(Value::string(st, collected.span)) .into_iter() - .into_pipeline_data(ctrlc)) + .into_pipeline_data(span, ctrlc)) } else { Ok(f(Value::binary(collected.item, collected.span)) .into_iter() - .into_pipeline_data(ctrlc)) + .into_pipeline_data(span, ctrlc)) } } } @@ -665,12 +657,12 @@ impl PipelineData { let span = value.span(); match value { Value::List { vals, .. } => { - Ok(vals.into_iter().filter(f).into_pipeline_data(ctrlc)) + Ok(vals.into_iter().filter(f).into_pipeline_data(span, ctrlc)) } Value::Range { val, .. } => Ok(val .into_range_iter(span, ctrlc.clone()) .filter(f) - .into_pipeline_data(ctrlc)), + .into_pipeline_data(span, ctrlc)), value => { if f(&value) { Ok(value.into_pipeline_data()) @@ -680,7 +672,7 @@ impl PipelineData { } } } - PipelineData::ListStream(stream, ..) => Ok(stream.filter(f).into_pipeline_data(ctrlc)), + PipelineData::ListStream(stream, ..) => Ok(stream.modify(|iter| iter.filter(f)).into()), PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::Empty), PipelineData::ExternalStream { stdout: Some(stream), @@ -764,7 +756,6 @@ impl PipelineData { match exit_code { Some(exit_code_stream) => { - let ctrlc = exit_code_stream.ctrlc.clone(); let exit_code: Vec = exit_code_stream.into_iter().collect(); if let Some(Value::Int { val: code, .. }) = exit_code.last() { // if exit_code is not 0, it indicates error occurred, return back Err. @@ -776,7 +767,7 @@ impl PipelineData { PipelineData::ExternalStream { stdout: None, stderr, - exit_code: Some(ListStream::from_stream(exit_code.into_iter(), ctrlc)), + exit_code: Some(ListStream::new(exit_code.into_iter(), span, None)), span, metadata, trim_end_newline, @@ -947,7 +938,14 @@ impl PipelineData { } } -pub struct PipelineIterator(PipelineData); +enum PipelineIteratorInner { + Empty, + Value(Value), + ListStream(list_stream::IntoIter), + ExternalStream(RawStream), +} + +pub struct PipelineIterator(PipelineIteratorInner); impl IntoIterator for PipelineData { type Item = Value; @@ -955,23 +953,29 @@ impl IntoIterator for PipelineData { type IntoIter = PipelineIterator; fn into_iter(self) -> Self::IntoIter { - match self { - PipelineData::Value(value, metadata) => { + PipelineIterator(match self { + PipelineData::Value(value, ..) => { let span = value.span(); match value { - Value::List { vals, .. } => PipelineIterator(PipelineData::ListStream( - ListStream::from_stream(vals.into_iter(), None), - metadata, - )), - Value::Range { val, .. } => PipelineIterator(PipelineData::ListStream( - ListStream::from_stream(val.into_range_iter(span, None), None), - metadata, - )), - x => PipelineIterator(PipelineData::Value(x, metadata)), + Value::List { vals, .. } => PipelineIteratorInner::ListStream( + ListStream::new(vals.into_iter(), span, None).into_iter(), + ), + Value::Range { val, .. } => PipelineIteratorInner::ListStream( + ListStream::new(val.into_range_iter(span, None), span, None).into_iter(), + ), + x => PipelineIteratorInner::Value(x), } } - x => PipelineIterator(x), - } + PipelineData::ListStream(stream, ..) => { + PipelineIteratorInner::ListStream(stream.into_iter()) + } + PipelineData::ExternalStream { + stdout: Some(stdout), + .. + } => PipelineIteratorInner::ExternalStream(stdout), + PipelineData::ExternalStream { stdout: None, .. } => PipelineIteratorInner::Empty, + PipelineData::Empty => PipelineIteratorInner::Empty, + }) } } @@ -1075,15 +1079,11 @@ impl Iterator for PipelineIterator { fn next(&mut self) -> Option { match &mut self.0 { - PipelineData::Empty => None, - PipelineData::Value(Value::Nothing { .. }, ..) => None, - PipelineData::Value(v, ..) => Some(std::mem::take(v)), - PipelineData::ListStream(stream, ..) => stream.next(), - PipelineData::ExternalStream { stdout: None, .. } => None, - PipelineData::ExternalStream { - stdout: Some(stream), - .. - } => stream.next().map(|x| match x { + PipelineIteratorInner::Empty => None, + PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None, + PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)), + PipelineIteratorInner::ListStream(stream, ..) => stream.next(), + PipelineIteratorInner::ExternalStream(stream) => stream.next().map(|x| match x { Ok(x) => x, Err(err) => Value::error( err, @@ -1120,11 +1120,12 @@ where } pub trait IntoInterruptiblePipelineData { - fn into_pipeline_data(self, ctrlc: Option>) -> PipelineData; + fn into_pipeline_data(self, span: Span, ctrlc: Option>) -> PipelineData; fn into_pipeline_data_with_metadata( self, - metadata: impl Into>, + span: Span, ctrlc: Option>, + metadata: impl Into>, ) -> PipelineData; } @@ -1134,20 +1135,18 @@ where I::IntoIter: Send + 'static, ::Item: Into, { - fn into_pipeline_data(self, ctrlc: Option>) -> PipelineData { - PipelineData::ListStream( - ListStream::from_stream(self.into_iter().map(Into::into), ctrlc), - None, - ) + fn into_pipeline_data(self, span: Span, ctrlc: Option>) -> PipelineData { + ListStream::new(self.into_iter().map(Into::into), span, ctrlc).into() } fn into_pipeline_data_with_metadata( self, - metadata: impl Into>, + span: Span, ctrlc: Option>, + metadata: impl Into>, ) -> PipelineData { PipelineData::ListStream( - ListStream::from_stream(self.into_iter().map(Into::into), ctrlc), + ListStream::new(self.into_iter().map(Into::into), span, ctrlc), metadata.into(), ) } diff --git a/crates/nu-protocol/src/pipeline_data/stream.rs b/crates/nu-protocol/src/pipeline_data/raw_stream.rs similarity index 74% rename from crates/nu-protocol/src/pipeline_data/stream.rs rename to crates/nu-protocol/src/pipeline_data/raw_stream.rs index 58e759229b..846cdd772b 100644 --- a/crates/nu-protocol/src/pipeline_data/stream.rs +++ b/crates/nu-protocol/src/pipeline_data/raw_stream.rs @@ -174,72 +174,3 @@ impl Iterator for RawStream { } } } - -/// A potentially infinite stream of values, optionally with a mean to send a Ctrl-C signal to stop -/// the stream from continuing. -/// -/// In practice, a "stream" here means anything which can be iterated and produce Values as it iterates. -/// Like other iterators in Rust, observing values from this stream will drain the items as you view them -/// and the stream cannot be replayed. -pub struct ListStream { - pub stream: Box + Send + 'static>, - pub ctrlc: Option>, - first_guard: bool, -} - -impl ListStream { - pub fn into_string(self, separator: &str, config: &Config) -> String { - self.map(|x: Value| x.to_expanded_string(", ", config)) - .collect::>() - .join(separator) - } - - pub fn drain(self) -> Result<(), ShellError> { - for next in self { - if let Value::Error { error, .. } = next { - return Err(*error); - } - } - Ok(()) - } - - pub fn from_stream( - input: impl Iterator + Send + 'static, - ctrlc: Option>, - ) -> ListStream { - ListStream { - stream: Box::new(input), - ctrlc, - first_guard: true, - } - } -} - -impl Debug for ListStream { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ListStream").finish() - } -} - -impl Iterator for ListStream { - type Item = Value; - - fn next(&mut self) -> Option { - // We need to check `first_guard` to guarantee that it always have something to return in - // underlying stream. - // - // A realworld example is running an external commands, which have an `exit_code` - // ListStream. - // When we press ctrl-c, the external command receives the signal too, if we don't have - // `first_guard`, the `exit_code` ListStream will return Nothing, which is not expected - if self.first_guard { - self.first_guard = false; - return self.stream.next(); - } - if nu_utils::ctrl_c::was_pressed(&self.ctrlc) { - None - } else { - self.stream.next() - } - } -} diff --git a/crates/nu_plugin_example/src/commands/generate.rs b/crates/nu_plugin_example/src/commands/generate.rs index 67d30dcb9c..9938518f4c 100644 --- a/crates/nu_plugin_example/src/commands/generate.rs +++ b/crates/nu_plugin_example/src/commands/generate.rs @@ -60,8 +60,8 @@ impl PluginCommand for Generate { call: &EvaluatedCall, _input: PipelineData, ) -> Result { + let head = call.head; let engine = engine.clone(); - let call = call.clone(); let initial: Value = call.req(0)?; let closure = call.req(1)?; @@ -83,9 +83,9 @@ impl PluginCommand for Generate { }) .transpose() }) - .map(|result| result.unwrap_or_else(|err| Value::error(err, call.head))) + .map(|result| result.unwrap_or_else(|err| Value::error(err, head))) }) - .into_pipeline_data(None)) + .into_pipeline_data(head, None)) } } diff --git a/crates/nu_plugin_example/src/commands/seq.rs b/crates/nu_plugin_example/src/commands/seq.rs index 52a21a646a..5652b3ef9a 100644 --- a/crates/nu_plugin_example/src/commands/seq.rs +++ b/crates/nu_plugin_example/src/commands/seq.rs @@ -50,12 +50,11 @@ impl PluginCommand for Seq { call: &EvaluatedCall, _input: PipelineData, ) -> Result { + let head = call.head; let first: i64 = call.req(0)?; let last: i64 = call.req(1)?; - let span = call.head; - let iter = (first..=last).map(move |number| Value::int(number, span)); - let list_stream = ListStream::from_stream(iter, None); - Ok(PipelineData::ListStream(list_stream, None)) + let iter = (first..=last).map(move |number| Value::int(number, head)); + Ok(ListStream::new(iter, head, None).into()) } }