Make tee work more nicely with non-collections (#13652)

# Description

This changes the behavior of `tee` to be more transparent when given a
value that isn't a list or range. Previously, anything that wasn't a
byte stream would be converted to a list stream using the iterator
implementation, which led to some surprising results. Instead, now, if
the value is a string or binary, it will be treated the same way a byte
stream is, and the output of `tee` is a byte stream instead of the
original value. This is done so that we can synchronize with the other
thread on collect, and potentially capture any error produced by the
closure.

For values that can't be converted to streams, the closure is instead just
run with a clone of the value on another thread. Because we can't
wait for the other thread, there is no way to send an error back to the
original thread, so instead it's just written to stderr using
`report_error_new()`.

There are a couple of follow up edge cases I see where byte streams
aren't necessarily treated exactly the same way strings are, but this
should mostly be a good experience.

Fixes #13489.

# User-Facing Changes

Breaking change.

- `tee` now outputs and sends a string/binary stream for string/binary
input.
- `tee` now outputs and sends the original value for any other input
that isn't a list/range.

# Tests + Formatting

Added for new behavior.

# After Submitting

- [ ] release notes: breaking change, command change
This commit is contained in:
Devyn Cairns 2024-09-01 10:03:46 -07:00 committed by GitHub
parent ee997ef3dd
commit 39bda8986e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 116 additions and 18 deletions

View file

@ -1,11 +1,14 @@
use nu_engine::{command_prelude::*, get_eval_block_with_early_return}; use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
use nu_protocol::{ use nu_protocol::{
byte_stream::copy_with_signals, engine::Closure, process::ChildPipe, ByteStream, byte_stream::copy_with_signals, engine::Closure, process::ChildPipe, report_error_new,
ByteStreamSource, OutDest, PipelineMetadata, Signals, ByteStream, ByteStreamSource, OutDest, PipelineMetadata, Signals,
}; };
use std::{ use std::{
io::{self, Read, Write}, io::{self, Read, Write},
sync::mpsc::{self, Sender}, sync::{
mpsc::{self, Sender},
Arc,
},
thread::{self, JoinHandle}, thread::{self, JoinHandle},
}; };
@ -61,6 +64,11 @@ use it in your pipeline."#
description: "Print numbers and their sum", description: "Print numbers and their sum",
result: None, result: None,
}, },
Example {
example: "10000 | tee { 1..$in | print } | $in * 5",
description: "Do something with a value on another thread, while also passing through the value",
result: Some(Value::test_int(50000)),
}
] ]
} }
@ -78,8 +86,10 @@ use it in your pipeline."#
let closure_span = closure.span; let closure_span = closure.span;
let closure = closure.item; let closure = closure.item;
let engine_state_arc = Arc::new(engine_state.clone());
let mut eval_block = { let mut eval_block = {
let closure_engine_state = engine_state.clone(); let closure_engine_state = engine_state_arc.clone();
let mut closure_stack = stack let mut closure_stack = stack
.captures_to_stack_preserve_out_dest(closure.captures) .captures_to_stack_preserve_out_dest(closure.captures)
.reset_pipes(); .reset_pipes();
@ -97,8 +107,15 @@ use it in your pipeline."#
} }
}; };
// Convert values that can be represented as streams into streams. Streams can pass errors
// through later, so if we treat string/binary/list as a stream instead, it's likely that
// we can get the error back to the original thread.
let span = input.span().unwrap_or(head);
let input = input
.try_into_stream(engine_state)
.unwrap_or_else(|original_input| original_input);
if let PipelineData::ByteStream(stream, metadata) = input { if let PipelineData::ByteStream(stream, metadata) = input {
let span = stream.span();
let type_ = stream.type_(); let type_ = stream.type_();
let info = StreamInfo { let info = StreamInfo {
@ -228,9 +245,13 @@ use it in your pipeline."#
return stderr_misuse(input.span().unwrap_or(head), head); return stderr_misuse(input.span().unwrap_or(head), head);
} }
let span = input.span().unwrap_or(head);
let metadata = input.metadata(); let metadata = input.metadata();
let metadata_clone = metadata.clone(); let metadata_clone = metadata.clone();
if matches!(input, PipelineData::ListStream(..)) {
// Only use the iterator implementation on lists / list streams. We want to be able
// to preserve errors as much as possible, and only the stream implementations can
// really do that
let signals = engine_state.signals().clone(); let signals = engine_state.signals().clone();
Ok(tee(input.into_iter(), move |rx| { Ok(tee(input.into_iter(), move |rx| {
@ -244,6 +265,17 @@ use it in your pipeline."#
engine_state.signals().clone(), engine_state.signals().clone(),
metadata, metadata,
)) ))
} else {
// Otherwise, we can spawn a thread with the input value, but we have nowhere to
// send an error to other than just trying to print it to stderr.
let value = input.into_value(span)?;
let value_clone = value.clone();
tee_once(engine_state_arc, move || {
eval_block(value_clone.into_pipeline_data_with_metadata(metadata_clone))
})
.err_span(call.head)?;
Ok(value.into_pipeline_data_with_metadata(metadata))
}
} }
} }
@ -314,6 +346,18 @@ where
})) }))
} }
/// Single-value counterpart of "tee": there is no stream to manage, so this only launches a
/// thread named "tee" that runs `on_thread`.
///
/// If `on_thread` returns an error, it is reported from inside the spawned thread with
/// `report_error_new()`, using the shared `engine_state`.
///
/// Returns the handle of the spawned thread, or the I/O error produced while spawning it.
fn tee_once(
    engine_state: Arc<EngineState>,
    on_thread: impl FnOnce() -> Result<(), ShellError> + Send + 'static,
) -> Result<JoinHandle<()>, std::io::Error> {
    let worker = move || match on_thread() {
        Ok(()) => {}
        Err(failure) => {
            // The thread itself returns `()`, so the error is reported here rather than
            // being handed back to whoever holds the join handle.
            report_error_new(&engine_state, &failure);
        }
    };

    thread::Builder::new().name("tee".into()).spawn(worker)
}
fn stderr_misuse<T>(span: Span, head: Span) -> Result<T, ShellError> { fn stderr_misuse<T>(span: Span, head: Span) -> Result<T, ShellError> {
Err(ShellError::UnsupportedInput { Err(ShellError::UnsupportedInput {
msg: "--stderr can only be used on external commands".into(), msg: "--stderr can only be used on external commands".into(),

View file

@ -47,3 +47,23 @@ fn tee_save_stderr_to_file() {
assert_eq!("teststring\n", file_contents(dirs.test().join("copy.txt"))); assert_eq!("teststring\n", file_contents(dirs.test().join("copy.txt")));
}) })
} }
/// A string piped through `tee` should reach both the closure (printed to stderr) and the
/// rest of the pipeline (printed to stdout).
#[test]
fn tee_single_value_streamable() {
    let outcome = nu!("'Hello, world!' | tee { print -e } | print");

    assert!(outcome.status.success());

    // What the downstream `print` wrote to stdout.
    assert_eq!("Hello, world!", outcome.out);

    // What the closure's `print -e` wrote to stderr.
    //
    // FIXME: note the lack of newline: this is a consequence of converting the string to a
    // stream for now, but most likely the printer should be checking whether a string stream
    // ends with a newline and adding it unless no_newline is true
    assert_eq!("Hello, world!", outcome.err);
}
/// An integer piped through `tee` should be handed to the closure (printed to stderr) and
/// also passed through unchanged to the rest of the pipeline (printed to stdout).
#[test]
fn tee_single_value_non_streamable() {
    // Non-streamable values don't have any synchronization point, so the script sleeps at
    // the end to give the closure's thread time to print before the process exits.
    let outcome = nu!("500 | tee { print -e } | print; sleep 1sec");

    assert!(outcome.status.success());

    // Passed-through value on stdout, closure output on stderr.
    assert_eq!("500", outcome.out);
    assert_eq!("500\n", outcome.err);
}

View file

@ -142,6 +142,40 @@ impl PipelineData {
} }
} }
/// Turns pipeline data holding a streamable `Value` into the matching stream variant.
///
/// - Lists and ranges become a `ListStream`.
/// - Strings and binary values become a `ByteStream`.
/// - Data that is already a `ListStream` or `ByteStream` is returned unchanged.
///
/// The span of the input (or `Span::unknown()` if it has none) and a clone of the engine's
/// signals are attached to any newly created stream; existing metadata is carried over.
///
/// # Errors
///
/// Returns `Err` containing the original, untouched pipeline data when it cannot be
/// represented as a stream.
pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
    let span = self.span().unwrap_or(Span::unknown());
    // Only cloned on the arms that actually build a new stream.
    let signals = || engine_state.signals().clone();

    let streamed = match self {
        // Already a stream: nothing to convert.
        PipelineData::ListStream(..) | PipelineData::ByteStream(..) => self,
        PipelineData::Value(Value::String { val, .. }, metadata) => {
            let bytes = ByteStream::read_string(val, span, signals());
            PipelineData::ByteStream(bytes, metadata)
        }
        PipelineData::Value(Value::Binary { val, .. }, metadata) => {
            let bytes = ByteStream::read_binary(val, span, signals());
            PipelineData::ByteStream(bytes, metadata)
        }
        PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
            // Copy the metadata out first so `self` can be consumed by `into_iter()`.
            let metadata = metadata.clone();
            let items = ListStream::new(self.into_iter(), span, signals());
            PipelineData::ListStream(items, metadata)
        }
        // Anything else has no stream representation; hand it back as-is.
        not_streamable => return Err(not_streamable),
    };

    Ok(streamed)
}
/// Writes all values or redirects all output to the current [`OutDest`]s in `stack`. /// Writes all values or redirects all output to the current [`OutDest`]s in `stack`.
/// ///
/// For [`OutDest::Pipe`] and [`OutDest::Capture`], this will return the `PipelineData` as is /// For [`OutDest::Pipe`] and [`OutDest::Capture`], this will return the `PipelineData` as is