diff --git a/crates/nu-command/src/filters/tee.rs b/crates/nu-command/src/filters/tee.rs index 4e801bfcc8..db29d31e5f 100644 --- a/crates/nu-command/src/filters/tee.rs +++ b/crates/nu-command/src/filters/tee.rs @@ -1,11 +1,14 @@ use nu_engine::{command_prelude::*, get_eval_block_with_early_return}; use nu_protocol::{ - byte_stream::copy_with_signals, engine::Closure, process::ChildPipe, ByteStream, - ByteStreamSource, OutDest, PipelineMetadata, Signals, + byte_stream::copy_with_signals, engine::Closure, process::ChildPipe, report_error_new, + ByteStream, ByteStreamSource, OutDest, PipelineMetadata, Signals, }; use std::{ io::{self, Read, Write}, - sync::mpsc::{self, Sender}, + sync::{ + mpsc::{self, Sender}, + Arc, + }, thread::{self, JoinHandle}, }; @@ -61,6 +64,11 @@ use it in your pipeline."# description: "Print numbers and their sum", result: None, }, + Example { + example: "10000 | tee { 1..$in | print } | $in * 5", + description: "Do something with a value on another thread, while also passing through the value", + result: Some(Value::test_int(50000)), + } ] } @@ -78,8 +86,10 @@ use it in your pipeline."# let closure_span = closure.span; let closure = closure.item; + let engine_state_arc = Arc::new(engine_state.clone()); + let mut eval_block = { - let closure_engine_state = engine_state.clone(); + let closure_engine_state = engine_state_arc.clone(); let mut closure_stack = stack .captures_to_stack_preserve_out_dest(closure.captures) .reset_pipes(); @@ -97,8 +107,15 @@ use it in your pipeline."# } }; + // Convert values that can be represented as streams into streams. Streams can pass errors + // through later, so if we treat string/binary/list as a stream instead, it's likely that + // we can get the error back to the original thread. + let span = input.span().unwrap_or(head); + let input = input + .try_into_stream(engine_state) + .unwrap_or_else(|original_input| original_input); + if let PipelineData::ByteStream(stream, metadata) = input { - let span = stream.span(); let type_ = stream.type_(); let info = StreamInfo { @@ -228,22 +245,37 @@ use it in your pipeline."# return stderr_misuse(input.span().unwrap_or(head), head); } - let span = input.span().unwrap_or(head); let metadata = input.metadata(); let metadata_clone = metadata.clone(); - let signals = engine_state.signals().clone(); - Ok(tee(input.into_iter(), move |rx| { - let input = rx.into_pipeline_data_with_metadata(span, signals, metadata_clone); - eval_block(input) - }) - .err_span(call.head)? - .map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span))) - .into_pipeline_data_with_metadata( - span, - engine_state.signals().clone(), - metadata, - )) + if matches!(input, PipelineData::ListStream(..)) { + // Only use the iterator implementation on lists / list streams. We want to be able + // to preserve errors as much as possible, and only the stream implementations can + // really do that + let signals = engine_state.signals().clone(); + + Ok(tee(input.into_iter(), move |rx| { + let input = rx.into_pipeline_data_with_metadata(span, signals, metadata_clone); + eval_block(input) + }) + .err_span(call.head)? + .map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span))) + .into_pipeline_data_with_metadata( + span, + engine_state.signals().clone(), + metadata, + )) + } else { + // Otherwise, we can spawn a thread with the input value, but we have nowhere to + // send an error to other than just trying to print it to stderr. + let value = input.into_value(span)?; + let value_clone = value.clone(); + tee_once(engine_state_arc, move || { + eval_block(value_clone.into_pipeline_data_with_metadata(metadata_clone)) + }) + .err_span(call.head)?; + Ok(value.into_pipeline_data_with_metadata(metadata)) + } } } @@ -314,6 +346,18 @@ where })) } +/// "tee" for a single value. No stream handling, just spawns a thread, printing any resulting error +fn tee_once( + engine_state: Arc, + on_thread: impl FnOnce() -> Result<(), ShellError> + Send + 'static, +) -> Result, std::io::Error> { + thread::Builder::new().name("tee".into()).spawn(move || { + if let Err(err) = on_thread() { + report_error_new(&engine_state, &err); + } + }) +} + fn stderr_misuse(span: Span, head: Span) -> Result { Err(ShellError::UnsupportedInput { msg: "--stderr can only be used on external commands".into(), diff --git a/crates/nu-command/tests/commands/tee.rs b/crates/nu-command/tests/commands/tee.rs index 6a69d7fe6d..3f6ca38709 100644 --- a/crates/nu-command/tests/commands/tee.rs +++ b/crates/nu-command/tests/commands/tee.rs @@ -47,3 +47,23 @@ fn tee_save_stderr_to_file() { assert_eq!("teststring\n", file_contents(dirs.test().join("copy.txt"))); }) } + +#[test] +fn tee_single_value_streamable() { + let actual = nu!("'Hello, world!' | tee { print -e } | print"); + assert!(actual.status.success()); + assert_eq!("Hello, world!", actual.out); + // FIXME: note the lack of newline: this is a consequence of converting the string to a stream + // for now, but most likely the printer should be checking whether a string stream ends with a + // newline and adding it unless no_newline is true + assert_eq!("Hello, world!", actual.err); +} + +#[test] +fn tee_single_value_non_streamable() { + // Non-streamable values don't have any synchronization point, so we have to wait. + let actual = nu!("500 | tee { print -e } | print; sleep 1sec"); + assert!(actual.status.success()); + assert_eq!("500", actual.out); + assert_eq!("500\n", actual.err); +} diff --git a/crates/nu-protocol/src/pipeline/pipeline_data.rs b/crates/nu-protocol/src/pipeline/pipeline_data.rs index 33e932ecaf..85789b4f37 100644 --- a/crates/nu-protocol/src/pipeline/pipeline_data.rs +++ b/crates/nu-protocol/src/pipeline/pipeline_data.rs @@ -142,6 +142,40 @@ impl PipelineData { } } + /// Converts any `Value` variant that can be represented as a stream into its stream variant. + /// + /// This means that lists and ranges are converted into list streams, and strings and binary are + /// converted into byte streams. + /// + /// Returns an `Err` with the original stream if the variant couldn't be converted to a stream + /// variant. If the variant is already a stream variant, it is returned as-is. + pub fn try_into_stream(self, engine_state: &EngineState) -> Result { + let span = self.span().unwrap_or(Span::unknown()); + match self { + PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self), + PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => { + let metadata = metadata.clone(); + Ok(PipelineData::ListStream( + ListStream::new(self.into_iter(), span, engine_state.signals().clone()), + metadata, + )) + } + PipelineData::Value(Value::String { val, .. }, metadata) => { + Ok(PipelineData::ByteStream( + ByteStream::read_string(val, span, engine_state.signals().clone()), + metadata, + )) + } + PipelineData::Value(Value::Binary { val, .. }, metadata) => { + Ok(PipelineData::ByteStream( + ByteStream::read_binary(val, span, engine_state.signals().clone()), + metadata, + )) + } + _ => Err(self), + } + } + /// Writes all values or redirects all output to the current [`OutDest`]s in `stack`. /// /// For [`OutDest::Pipe`] and [`OutDest::Capture`], this will return the `PipelineData` as is