Add tee command for operating on copies of streams (#11928)

<!--
if this PR closes one or more issues, you can automatically link the PR
with
them by using one of the [*linking
keywords*](https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword),
e.g.
- this PR should close #xxxx
- fixes #xxxx

you can also mention related issues, PRs or discussions!
-->

[Related conversation on
Discord](https://discord.com/channels/601130461678272522/615329862395101194/1209951539901366292)

# Description
<!--
Thank you for improving Nushell. Please, check our [contributing
guide](../CONTRIBUTING.md) and talk to the core team before making major
changes.

Description of your pull request goes here. **Provide examples and/or
screenshots** if your changes affect the user experience.
-->

This is inspired by the Unix tee command, but significantly more
powerful. Rather than just writing to a file, you can do any kind of
stream operation that Nushell supports within the closure.

The equivalent of Unix `tee -a file.txt` would be, for example, `command
| tee { save -a file.txt }` - but of course this is Nushell, and you can
do the same with structured data to JSON objects, or even just run any
other command on the system with it.

A `--stderr` flag is provided for operating on the stderr stream from
external programs. This may produce unexpected results if the stderr
stream is not then also printed by something else - nushell currently
doesn't. See #11929 for the fix for that.

# User-Facing Changes
<!-- List of all changes that impact the user experience here. This
helps us keep track of breaking changes. -->

If someone was using the system `tee` command, they might be surprised
to find that it's different.

# Tests + Formatting
<!--
Don't forget to add tests that cover your changes.

Make sure you've run and fixed any issues with these commands:

- `cargo fmt --all -- --check` to check standard code formatting (`cargo
fmt --all` applies these changes)
- `cargo clippy --workspace -- -D warnings -D clippy::unwrap_used` to
check that you're using the standard code style
- `cargo test --workspace` to check that all tests pass (on Windows make
sure to [enable developer
mode](https://learn.microsoft.com/en-us/windows/apps/get-started/developer-mode-features-and-debugging))
- `cargo run -- -c "use std testing; testing run-tests --path
crates/nu-std"` to run the tests for the standard library

> **Note**
> from `nushell` you can also use the `toolkit` as follows
> ```bash
> use toolkit.nu # or use an `env_change` hook to activate it
automatically
> toolkit check pr
> ```
-->
- 🟢 `toolkit fmt`
- 🟢 `toolkit clippy`
- 🟢 `toolkit test`
- 🟢 `toolkit test stdlib`


# After Submitting
<!-- If your PR had any user-facing changes, update [the
documentation](https://github.com/nushell/nushell.github.io) after the
PR is merged, if necessary. This will help us keep the docs up to date.
-->
This commit is contained in:
Devyn Cairns 2024-02-28 15:08:31 -08:00 committed by GitHub
parent 0126620c19
commit e69a02d379
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 397 additions and 2 deletions

View file

@ -79,6 +79,7 @@ pub fn add_shell_command_context(mut engine_state: EngineState) -> EngineState {
Sort, Sort,
SortBy, SortBy,
SplitList, SplitList,
Tee,
Transpose, Transpose,
Uniq, Uniq,
UniqBy, UniqBy,

View file

@ -10,8 +10,8 @@ pub fn test_examples(cmd: impl Command + 'static) {
mod test_examples { mod test_examples {
use super::super::{ use super::super::{
Ansi, Date, Enumerate, Filter, First, Flatten, From, Get, Into, IntoDatetime, IntoString, Ansi, Date, Enumerate, Filter, First, Flatten, From, Get, Into, IntoDatetime, IntoString,
Math, MathRound, ParEach, Path, PathParse, Random, Seq, Sort, SortBy, Split, SplitColumn, Math, MathRound, MathSum, ParEach, Path, PathParse, Random, Seq, Sort, SortBy, Split,
SplitRow, Str, StrJoin, StrLength, StrReplace, Update, Url, Values, Wrap, SplitColumn, SplitRow, Str, StrJoin, StrLength, StrReplace, Update, Url, Values, Wrap,
}; };
use crate::{Default, Each, To}; use crate::{Default, Each, To};
use nu_cmd_lang::example_support::{ use nu_cmd_lang::example_support::{
@ -83,6 +83,7 @@ mod test_examples {
working_set.add_decl(Box::new(Let)); working_set.add_decl(Box::new(Let));
working_set.add_decl(Box::new(Math)); working_set.add_decl(Box::new(Math));
working_set.add_decl(Box::new(MathRound)); working_set.add_decl(Box::new(MathRound));
working_set.add_decl(Box::new(MathSum));
working_set.add_decl(Box::new(Mut)); working_set.add_decl(Box::new(Mut));
working_set.add_decl(Box::new(Path)); working_set.add_decl(Box::new(Path));
working_set.add_decl(Box::new(PathParse)); working_set.add_decl(Box::new(PathParse));

View file

@ -39,6 +39,7 @@ mod sort;
mod sort_by; mod sort_by;
mod split_by; mod split_by;
mod take; mod take;
mod tee;
mod transpose; mod transpose;
mod uniq; mod uniq;
mod uniq_by; mod uniq_by;
@ -92,6 +93,7 @@ pub use sort::Sort;
pub use sort_by::SortBy; pub use sort_by::SortBy;
pub use split_by::SplitBy; pub use split_by::SplitBy;
pub use take::*; pub use take::*;
pub use tee::Tee;
pub use transpose::Transpose; pub use transpose::Transpose;
pub use uniq::*; pub use uniq::*;
pub use uniq_by::UniqBy; pub use uniq_by::UniqBy;

View file

@ -0,0 +1,341 @@
use std::{sync::mpsc, thread};
use nu_engine::{eval_block_with_early_return, CallExt};
use nu_protocol::{
ast::Call,
engine::{Closure, Command, EngineState, Stack},
Category, Example, IntoInterruptiblePipelineData, PipelineData, RawStream, ShellError,
Signature, Spanned, SyntaxShape, Type, Value,
};
#[derive(Clone)]
pub struct Tee;
impl Command for Tee {
fn name(&self) -> &str {
"tee"
}
fn usage(&self) -> &str {
"Copy a stream to another command in parallel."
}
fn extra_usage(&self) -> &str {
r#"This is useful for doing something else with a stream while still continuing to
use it in your pipeline."#
}
fn signature(&self) -> Signature {
Signature::build("tee")
.input_output_type(Type::Any, Type::Any)
.switch(
"stderr",
"For external commands: copy the standard error stream instead.",
Some('e'),
)
.required(
"closure",
SyntaxShape::Closure(None),
"The other command to send the stream to.",
)
.category(Category::Filters)
}
fn examples(&self) -> Vec<Example> {
vec![
Example {
example: "http get http://example.org/ | tee { save example.html }",
description: "Save a webpage to a file while also printing it",
result: None,
},
Example {
example: "do { nu --commands 'print -e error; print ok' } | \
tee --stderr { save error.log } | complete",
description: "Save error messages from an external command to a file without \
redirecting them",
result: None,
},
Example {
example: "1..100 | tee { each { print } } | math sum | wrap sum",
description: "Print numbers and their sum",
result: None,
},
]
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let use_stderr = call.has_flag(engine_state, stack, "stderr")?;
let Spanned {
item: Closure { block_id, captures },
span: closure_span,
} = call.req(engine_state, stack, 0)?;
let closure_engine_state = engine_state.clone();
let mut closure_stack = stack.captures_to_stack(captures);
let metadata = input.metadata();
let metadata_clone = metadata.clone();
match input {
// Handle external streams specially, to make sure they pass through
PipelineData::ExternalStream {
stdout,
stderr,
exit_code,
span,
metadata,
trim_end_newline,
} => {
let known_size = if use_stderr {
stderr.as_ref().and_then(|s| s.known_size)
} else {
stdout.as_ref().and_then(|s| s.known_size)
};
let with_stream = move |rx: mpsc::Receiver<Result<Vec<u8>, ShellError>>| {
let iter = rx.into_iter();
let input_from_channel = PipelineData::ExternalStream {
stdout: Some(RawStream::new(
Box::new(iter),
closure_engine_state.ctrlc.clone(),
span,
known_size,
)),
stderr: None,
exit_code: None,
span,
metadata: metadata_clone,
trim_end_newline,
};
let result = eval_block_with_early_return(
&closure_engine_state,
&mut closure_stack,
closure_engine_state.get_block(block_id),
input_from_channel,
false,
false,
);
// Make sure to drain any iterator produced to avoid unexpected behavior
result.and_then(|data| data.drain())
};
if use_stderr {
if let Some(stderr) = stderr {
let raw_stream = RawStream::new(
Box::new(tee(stderr.stream, with_stream).map(flatten_result)),
stderr.ctrlc,
stderr.span,
stderr.known_size,
);
Ok(PipelineData::ExternalStream {
stdout,
stderr: Some(raw_stream),
exit_code,
span,
metadata,
trim_end_newline,
})
} else {
// Throw an error if the stream doesn't have stderr. This is probably the
// user's mistake (e.g., forgetting to use `do`)
Err(ShellError::GenericError {
error: "Stream passed to `tee --stderr` does not have stderr".into(),
msg: "this stream does not contain stderr".into(),
span: Some(span),
help: Some(
"if this is an external command, you probably need to wrap \
it in `do { ... }`"
.into(),
),
inner: vec![],
})
}
} else {
let stdout = stdout.map(|stdout| {
RawStream::new(
Box::new(tee(stdout.stream, with_stream).map(flatten_result)),
stdout.ctrlc,
stdout.span,
stdout.known_size,
)
});
Ok(PipelineData::ExternalStream {
stdout,
stderr,
exit_code,
span,
metadata,
trim_end_newline,
})
}
}
// --stderr is not allowed if the input is not an external stream
_ if use_stderr => Err(ShellError::UnsupportedInput {
msg: "--stderr can only be used on external streams".into(),
input: "the input to `tee` is not an external stream".into(),
msg_span: call.head,
input_span: input.span().unwrap_or(call.head),
}),
// Handle others with the plain iterator
_ => {
let teed = tee(input.into_iter(), move |rx| {
let input_from_channel = rx.into_pipeline_data_with_metadata(
metadata_clone,
closure_engine_state.ctrlc.clone(),
);
let result = eval_block_with_early_return(
&closure_engine_state,
&mut closure_stack,
closure_engine_state.get_block(block_id),
input_from_channel,
false,
false,
);
// Make sure to drain any iterator produced to avoid unexpected behavior
result.and_then(|data| data.drain())
})
.map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span)))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone());
Ok(teed)
}
}
}
}
fn panic_error() -> ShellError {
ShellError::NushellFailed {
msg: "A panic occurred on a thread spawned by `tee`".into(),
}
}
fn flatten_result<T, E>(result: Result<Result<T, E>, E>) -> Result<T, E> {
result.unwrap_or_else(Err)
}
/// Copies the iterator to a channel on another thread. If an error is produced on that thread,
/// it is embedded in the resulting iterator as an `Err` as soon as possible. When the iterator
/// finishes, it waits for the other thread to finish, also handling any error produced at that
/// point.
fn tee<T>(
input: impl Iterator<Item = T>,
with_cloned_stream: impl FnOnce(mpsc::Receiver<T>) -> Result<(), ShellError> + Send + 'static,
) -> impl Iterator<Item = Result<T, ShellError>>
where
T: Clone + Send + 'static,
{
// For sending the values to the other thread
let (tx, rx) = mpsc::channel();
let mut thread = Some(
thread::Builder::new()
.name("stderr consumer".into())
.spawn(move || with_cloned_stream(rx))
.expect("could not create thread"),
);
let mut iter = input.into_iter();
let mut tx = Some(tx);
std::iter::from_fn(move || {
if thread.as_ref().is_some_and(|t| t.is_finished()) {
// Check for an error from the other thread
let result = thread
.take()
.expect("thread was taken early")
.join()
.unwrap_or_else(|_| Err(panic_error()));
if let Err(err) = result {
// Embed the error early
return Some(Err(err));
}
}
// Get a value from the iterator
if let Some(value) = iter.next() {
// Send a copy, ignoring any error if the channel is closed
let _ = tx.as_ref().map(|tx| tx.send(value.clone()));
Some(Ok(value))
} else {
// Close the channel so the stream ends for the other thread
drop(tx.take());
// Wait for the other thread, and embed any error produced
thread.take().and_then(|t| {
t.join()
.unwrap_or_else(|_| Err(panic_error()))
.err()
.map(Err)
})
}
})
}
#[test]
fn tee_copies_values_to_other_thread_and_passes_them_through() {
let (tx, rx) = mpsc::channel();
let expected_values = vec![1, 2, 3, 4];
let my_result = tee(expected_values.clone().into_iter(), move |rx| {
for val in rx {
let _ = tx.send(val);
}
Ok(())
})
.collect::<Result<Vec<i32>, ShellError>>()
.expect("should not produce error");
assert_eq!(expected_values, my_result);
let other_threads_result = rx.into_iter().collect::<Vec<_>>();
assert_eq!(expected_values, other_threads_result);
}
#[test]
fn tee_forwards_errors_back_immediately() {
use std::time::Duration;
let slow_input = (0..100).inspect(|_| std::thread::sleep(Duration::from_millis(1)));
let iter = tee(slow_input, |_| {
Err(ShellError::IOError { msg: "test".into() })
});
for result in iter {
if let Ok(val) = result {
// should not make it to the end
assert!(val < 99, "the error did not come early enough");
} else {
// got the error
return;
}
}
panic!("never received the error");
}
#[test]
fn tee_waits_for_the_other_thread() {
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::time::Duration;
let waited = Arc::new(AtomicBool::new(false));
let waited_clone = waited.clone();
let iter = tee(0..100, move |_| {
std::thread::sleep(Duration::from_millis(10));
waited_clone.store(true, Ordering::Relaxed);
Err(ShellError::IOError { msg: "test".into() })
});
let last = iter.last();
assert!(waited.load(Ordering::Relaxed), "failed to wait");
assert!(
last.is_some_and(|res| res.is_err()),
"failed to return error from wait"
);
}

View file

@ -104,6 +104,7 @@ mod split_row;
mod str_; mod str_;
mod table; mod table;
mod take; mod take;
mod tee;
mod terminal; mod terminal;
mod to_text; mod to_text;
mod touch; mod touch;

View file

@ -0,0 +1,49 @@
use nu_test_support::{fs::file_contents, nu, playground::Playground};
#[test]
fn tee_save_values_to_file() {
Playground::setup("tee_save_values_to_file_test", |dirs, _sandbox| {
let output = nu!(
cwd: dirs.test(),
r#"1..5 | tee { save copy.txt } | to text"#
);
assert_eq!("12345", output.out);
assert_eq!(
"1\n2\n3\n4\n5\n",
file_contents(dirs.test().join("copy.txt"))
);
})
}
#[test]
fn tee_save_stdout_to_file() {
Playground::setup("tee_save_stdout_to_file_test", |dirs, _sandbox| {
let output = nu!(
cwd: dirs.test(),
r#"
$env.FOO = "teststring"
nu --testbin echo_env FOO | tee { save copy.txt }
"#
);
assert_eq!("teststring", output.out);
assert_eq!("teststring\n", file_contents(dirs.test().join("copy.txt")));
})
}
#[test]
fn tee_save_stderr_to_file() {
Playground::setup("tee_save_stderr_to_file_test", |dirs, _sandbox| {
let output = nu!(
cwd: dirs.test(),
"\
$env.FOO = \"teststring\"; \
do { nu --testbin echo_env_stderr FOO } | \
tee --stderr { save copy.txt } | \
complete | \
get stderr
"
);
assert_eq!("teststring", output.out);
assert_eq!("teststring\n", file_contents(dirs.test().join("copy.txt")));
})
}