mirror of
https://github.com/nushell/nushell
synced 2024-12-27 05:23:11 +00:00
Add interleave command for reading multiple streams in parallel (#11955)
<!-- if this PR closes one or more issues, you can automatically link the PR with them by using one of the [*linking keywords*](https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword), e.g. - this PR should close #xxxx - fixes #xxxx you can also mention related issues, PRs or discussions! --> # Description <!-- Thank you for improving Nushell. Please, check our [contributing guide](../CONTRIBUTING.md) and talk to the core team before making major changes. Description of your pull request goes here. **Provide examples and/or screenshots** if your changes affect the user experience. --> This command mixes input from multiple sources and sends items to the final stream as soon as they're available. It can be called as part of a pipeline with input, or it can take multiple closures and mix them that way. See `crates/nu-command/tests/commands/interleave.rs` for a practical example. I imagine this will be most often used to run multiple commands in parallel and print their outputs line-by-line. A stdlib command could potentially use `interleave` to make this particular use case easier. It's quite common to wish that nushell had a command for running things in the background, and instead of providing job control, this provides an alternative to some use cases for that by just allowing multiple commands to run simultaneously and direct their output to the same place. This enables certain things that are not possible with `par-each` - for example, you may wish to run `make` across several projects in parallel: ```nushell (ls projects).name | par-each { |project| cd $project; make } ``` This works well enough, but the output will only be available after each `make` command finishes. `interleave` allows you to get each line: ```nushell interleave ...( (ls projects).name | each { |project| { cd $project make | lines | each { |line| {project: $project, out: $line} } } } ) ``` The result of this is a stream that you could process further - for example, by saving to a text file. Note that the closures themselves are not run in parallel. The initial execution happens serially, and then the streams are consumed in parallel. # User-Facing Changes <!-- List of all changes that impact the user experience here. This helps us keep track of breaking changes. --> Adds a new command. # Tests + Formatting <!-- Don't forget to add tests that cover your changes. Make sure you've run and fixed any issues with these commands: - `cargo fmt --all -- --check` to check standard code formatting (`cargo fmt --all` applies these changes) - `cargo clippy --workspace -- -D warnings -D clippy::unwrap_used` to check that you're using the standard code style - `cargo test --workspace` to check that all tests pass (on Windows make sure to [enable developer mode](https://learn.microsoft.com/en-us/windows/apps/get-started/developer-mode-features-and-debugging)) - `cargo run -- -c "use std testing; testing run-tests --path crates/nu-std"` to run the tests for the standard library > **Note** > from `nushell` you can also use the `toolkit` as follows > ```bash > use toolkit.nu # or use an `env_change` hook to activate it automatically > toolkit check pr > ``` --> - 🟢 `toolkit fmt` - 🟢 `toolkit clippy` - 🟢 `toolkit test` - 🟢 `toolkit test stdlib` # After Submitting <!-- If your PR had any user-facing changes, update [the documentation](https://github.com/nushell/nushell.github.io) after the PR is merged, if necessary. This will help us keep the docs up to date. -->
This commit is contained in:
parent
8948c350d4
commit
872aa78373
6 changed files with 199 additions and 2 deletions
|
@ -54,6 +54,7 @@ pub fn add_shell_command_context(mut engine_state: EngineState) -> EngineState {
|
||||||
Insert,
|
Insert,
|
||||||
IsEmpty,
|
IsEmpty,
|
||||||
IsNotEmpty,
|
IsNotEmpty,
|
||||||
|
Interleave,
|
||||||
Items,
|
Items,
|
||||||
Join,
|
Join,
|
||||||
SplitBy,
|
SplitBy,
|
||||||
|
|
|
@ -10,8 +10,9 @@ pub fn test_examples(cmd: impl Command + 'static) {
|
||||||
mod test_examples {
|
mod test_examples {
|
||||||
use super::super::{
|
use super::super::{
|
||||||
Ansi, Date, Enumerate, Filter, First, Flatten, From, Get, Into, IntoDatetime, IntoString,
|
Ansi, Date, Enumerate, Filter, First, Flatten, From, Get, Into, IntoDatetime, IntoString,
|
||||||
Math, MathRound, MathSum, ParEach, Path, PathParse, Random, Seq, Sort, SortBy, Split,
|
Lines, Math, MathRound, MathSum, ParEach, Path, PathParse, Random, Seq, Sort, SortBy,
|
||||||
SplitColumn, SplitRow, Str, StrJoin, StrLength, StrReplace, Update, Url, Values, Wrap,
|
Split, SplitColumn, SplitRow, Str, StrJoin, StrLength, StrReplace, Update, Url, Values,
|
||||||
|
Wrap,
|
||||||
};
|
};
|
||||||
use crate::{Default, Each, To};
|
use crate::{Default, Each, To};
|
||||||
use nu_cmd_lang::example_support::{
|
use nu_cmd_lang::example_support::{
|
||||||
|
@ -81,6 +82,7 @@ mod test_examples {
|
||||||
working_set.add_decl(Box::new(IntoString));
|
working_set.add_decl(Box::new(IntoString));
|
||||||
working_set.add_decl(Box::new(IntoDatetime));
|
working_set.add_decl(Box::new(IntoDatetime));
|
||||||
working_set.add_decl(Box::new(Let));
|
working_set.add_decl(Box::new(Let));
|
||||||
|
working_set.add_decl(Box::new(Lines));
|
||||||
working_set.add_decl(Box::new(Math));
|
working_set.add_decl(Box::new(Math));
|
||||||
working_set.add_decl(Box::new(MathRound));
|
working_set.add_decl(Box::new(MathRound));
|
||||||
working_set.add_decl(Box::new(MathSum));
|
working_set.add_decl(Box::new(MathSum));
|
||||||
|
|
178
crates/nu-command/src/filters/interleave.rs
Normal file
178
crates/nu-command/src/filters/interleave.rs
Normal file
|
@ -0,0 +1,178 @@
|
||||||
|
use std::{sync::mpsc, thread};
|
||||||
|
|
||||||
|
use nu_engine::{eval_block_with_early_return, CallExt};
|
||||||
|
use nu_protocol::{
|
||||||
|
ast::Call,
|
||||||
|
engine::{Closure, Command, EngineState, Stack},
|
||||||
|
Category, Example, IntoInterruptiblePipelineData, PipelineData, ShellError, Signature,
|
||||||
|
SyntaxShape, Type, Value,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Interleave;
|
||||||
|
|
||||||
|
impl Command for Interleave {
|
||||||
|
fn name(&self) -> &str {
|
||||||
|
"interleave"
|
||||||
|
}
|
||||||
|
|
||||||
|
fn usage(&self) -> &str {
|
||||||
|
"Read multiple streams in parallel and combine them into one stream."
|
||||||
|
}
|
||||||
|
|
||||||
|
fn extra_usage(&self) -> &str {
|
||||||
|
r#"This combinator is useful for reading output from multiple commands.
|
||||||
|
|
||||||
|
If input is provided to `interleave`, the input will be combined with the
|
||||||
|
output of the closures. This enables `interleave` to be used at any position
|
||||||
|
within a pipeline.
|
||||||
|
|
||||||
|
Because items from each stream will be inserted into the final stream as soon
|
||||||
|
as they are available, there is no guarantee of how the final output will be
|
||||||
|
ordered. However, the order of items from any given stream is guaranteed to be
|
||||||
|
preserved as they were in that stream.
|
||||||
|
|
||||||
|
If interleaving streams in a fair (round-robin) manner is desired, consider
|
||||||
|
using `zip { ... } | flatten` instead."#
|
||||||
|
}
|
||||||
|
|
||||||
|
fn signature(&self) -> Signature {
|
||||||
|
Signature::build("interleave")
|
||||||
|
.input_output_types(vec![
|
||||||
|
(Type::List(Type::Any.into()), Type::List(Type::Any.into())),
|
||||||
|
(Type::Nothing, Type::List(Type::Any.into())),
|
||||||
|
])
|
||||||
|
.named(
|
||||||
|
"buffer-size",
|
||||||
|
SyntaxShape::Int,
|
||||||
|
"Number of items to buffer from the streams. Increases memory usage, but can help \
|
||||||
|
performance when lots of output is produced.",
|
||||||
|
Some('b'),
|
||||||
|
)
|
||||||
|
.rest(
|
||||||
|
"closures",
|
||||||
|
SyntaxShape::Closure(None),
|
||||||
|
"The closures that will generate streams to be combined.",
|
||||||
|
)
|
||||||
|
.allow_variants_without_examples(true)
|
||||||
|
.category(Category::Filters)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn examples(&self) -> Vec<Example> {
|
||||||
|
vec![
|
||||||
|
Example {
|
||||||
|
example: "seq 1 50 | wrap a | interleave { seq 1 50 | wrap b }",
|
||||||
|
description: r#"Read two sequences of numbers into separate columns of a table.
|
||||||
|
Note that the order of rows with 'a' columns and rows with 'b' columns is arbitrary."#,
|
||||||
|
result: None,
|
||||||
|
},
|
||||||
|
Example {
|
||||||
|
example: "seq 1 3 | interleave { seq 4 6 } | sort",
|
||||||
|
description: "Read two sequences of numbers, one from input. Sort for consistency.",
|
||||||
|
result: Some(Value::test_list(vec![
|
||||||
|
Value::test_int(1),
|
||||||
|
Value::test_int(2),
|
||||||
|
Value::test_int(3),
|
||||||
|
Value::test_int(4),
|
||||||
|
Value::test_int(5),
|
||||||
|
Value::test_int(6),
|
||||||
|
])),
|
||||||
|
},
|
||||||
|
Example {
|
||||||
|
example: r#"interleave { "foo\nbar\n" | lines } { "baz\nquux\n" | lines } | sort"#,
|
||||||
|
description: "Read two sequences, but without any input. Sort for consistency.",
|
||||||
|
result: Some(Value::test_list(vec![
|
||||||
|
Value::test_string("bar"),
|
||||||
|
Value::test_string("baz"),
|
||||||
|
Value::test_string("foo"),
|
||||||
|
Value::test_string("quux"),
|
||||||
|
])),
|
||||||
|
},
|
||||||
|
Example {
|
||||||
|
example: r#"(
|
||||||
|
interleave
|
||||||
|
{ nu -c "print hello; print world" | lines | each { "greeter: " ++ $in } }
|
||||||
|
{ nu -c "print nushell; print rocks" | lines | each { "evangelist: " ++ $in } }
|
||||||
|
)"#,
|
||||||
|
description: "Run two commands in parallel and annotate their output.",
|
||||||
|
result: None,
|
||||||
|
},
|
||||||
|
Example {
|
||||||
|
example: "seq 1 20000 | interleave --buffer-size 16 { seq 1 20000 } | math sum",
|
||||||
|
description: "Use a buffer to increase the performance of high-volume streams.",
|
||||||
|
result: None,
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run(
|
||||||
|
&self,
|
||||||
|
engine_state: &EngineState,
|
||||||
|
stack: &mut Stack,
|
||||||
|
call: &Call,
|
||||||
|
input: PipelineData,
|
||||||
|
) -> Result<PipelineData, ShellError> {
|
||||||
|
let buffer_size: usize = call
|
||||||
|
.get_flag(engine_state, stack, "buffer-size")?
|
||||||
|
.unwrap_or(0);
|
||||||
|
let (tx, rx) = mpsc::sync_channel(buffer_size);
|
||||||
|
|
||||||
|
let closures: Vec<Closure> = call.rest(engine_state, stack, 0)?;
|
||||||
|
|
||||||
|
// Spawn the threads for the input and closure outputs
|
||||||
|
(!input.is_nothing())
|
||||||
|
.then(|| Ok(input))
|
||||||
|
.into_iter()
|
||||||
|
.chain(closures.into_iter().map(|closure| {
|
||||||
|
// Evaluate the closure on this thread
|
||||||
|
let block = engine_state.get_block(closure.block_id);
|
||||||
|
let mut stack = stack.captures_to_stack(closure.captures);
|
||||||
|
eval_block_with_early_return(
|
||||||
|
engine_state,
|
||||||
|
&mut stack,
|
||||||
|
block,
|
||||||
|
PipelineData::Empty,
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
}))
|
||||||
|
.try_for_each(|stream| {
|
||||||
|
stream.and_then(|stream| {
|
||||||
|
// Then take the stream and spawn a thread to send it to our channel
|
||||||
|
let tx = tx.clone();
|
||||||
|
thread::Builder::new()
|
||||||
|
.name("interleave consumer".into())
|
||||||
|
.spawn(move || {
|
||||||
|
for value in stream {
|
||||||
|
if tx.send(value).is_err() {
|
||||||
|
// Stop sending if the channel is dropped
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.map(|_| ())
|
||||||
|
.map_err(|err| ShellError::IOErrorSpanned {
|
||||||
|
msg: err.to_string(),
|
||||||
|
span: call.head,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// Now that threads are writing to the channel, we just return it as a stream
|
||||||
|
Ok(rx
|
||||||
|
.into_iter()
|
||||||
|
.into_pipeline_data(engine_state.ctrlc.clone()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_examples() {
|
||||||
|
use crate::test_examples;
|
||||||
|
|
||||||
|
test_examples(Interleave {})
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,6 +18,7 @@ mod group;
|
||||||
mod group_by;
|
mod group_by;
|
||||||
mod headers;
|
mod headers;
|
||||||
mod insert;
|
mod insert;
|
||||||
|
mod interleave;
|
||||||
mod is_empty;
|
mod is_empty;
|
||||||
mod is_not_empty;
|
mod is_not_empty;
|
||||||
mod items;
|
mod items;
|
||||||
|
@ -74,6 +75,7 @@ pub use group::Group;
|
||||||
pub use group_by::GroupBy;
|
pub use group_by::GroupBy;
|
||||||
pub use headers::Headers;
|
pub use headers::Headers;
|
||||||
pub use insert::Insert;
|
pub use insert::Insert;
|
||||||
|
pub use interleave::Interleave;
|
||||||
pub use is_empty::IsEmpty;
|
pub use is_empty::IsEmpty;
|
||||||
pub use is_not_empty::IsNotEmpty;
|
pub use is_not_empty::IsNotEmpty;
|
||||||
pub use items::Items;
|
pub use items::Items;
|
||||||
|
|
13
crates/nu-command/tests/commands/interleave.rs
Normal file
13
crates/nu-command/tests/commands/interleave.rs
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
use nu_test_support::nu;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn interleave_external_commands() {
|
||||||
|
let result = nu!("interleave \
|
||||||
|
{ nu -c 'print hello; print world' | lines | each { 'greeter: ' ++ $in } } \
|
||||||
|
{ nu -c 'print nushell; print rocks' | lines | each { 'evangelist: ' ++ $in } } | \
|
||||||
|
each { print }; null");
|
||||||
|
assert!(result.out.contains("greeter: hello"), "{}", result.out);
|
||||||
|
assert!(result.out.contains("greeter: world"), "{}", result.out);
|
||||||
|
assert!(result.out.contains("evangelist: nushell"), "{}", result.out);
|
||||||
|
assert!(result.out.contains("evangelist: rocks"), "{}", result.out);
|
||||||
|
}
|
|
@ -47,6 +47,7 @@ mod help;
|
||||||
mod histogram;
|
mod histogram;
|
||||||
mod insert;
|
mod insert;
|
||||||
mod inspect;
|
mod inspect;
|
||||||
|
mod interleave;
|
||||||
mod into_datetime;
|
mod into_datetime;
|
||||||
mod into_filesize;
|
mod into_filesize;
|
||||||
mod into_int;
|
mod into_int;
|
||||||
|
|
Loading…
Reference in a new issue