Keep order for par-each (#10249)

# Description
This PR adds new flag `--keep-order/-k` for the `par_each` filter. This
flag keeps sequence of output same as the order of input.

Output without the flag:
```nu
> 1..6 | par-each {|n| $n * 2 }
╭────╮
│  4 │
│ 10 │
│  2 │
│  8 │
│ 12 │
│  6 │
╰────╯
```

Output with the `--keep-order` flag:
```nu
> 1..6 | par-each --keep-order {|n| $n * 2 }
╭────╮
│  2 │
│  4 │
│  6 │
│  8 │
│ 10 │
│ 12 │
╰────╯
```

I think the presence of this flag is justified, since:
- Much easier to use than `.. | enumerate | par-each {|p| update item
..} | sort-by index | get item`
- Faster, as it uses internally parallel sorting in the same thread pool

A note about naming: it may conflict with `--keep-empty/-k` flag of the
`each` filter if the same feature will be used in `par-each`, so maybe
it needs some other name.
This commit is contained in:
Nano 2023-09-11 23:42:09 +12:00 committed by GitHub
parent eddff46155
commit 7b89fab327
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -37,6 +37,11 @@ impl Command for ParEach {
"the number of threads to use", "the number of threads to use",
Some('t'), Some('t'),
) )
.switch(
"keep-order",
"keep sequence of output same as the order of input",
Some('k'),
)
.required( .required(
"closure", "closure",
SyntaxShape::Closure(Some(vec![SyntaxShape::Any, SyntaxShape::Int])), SyntaxShape::Closure(Some(vec![SyntaxShape::Any, SyntaxShape::Int])),
@ -49,39 +54,43 @@ impl Command for ParEach {
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
vec![ vec![
Example { Example {
example: "[1 2 3] | par-each {|| 2 * $in }", example: "[1 2 3] | par-each {|e| $e * 2 }",
description: description:
"Multiplies each number. Note that the list will become arbitrarily disordered.", "Multiplies each number. Note that the list will become arbitrarily disordered.",
result: None, result: None,
}, },
Example { Example {
example: r#"[foo bar baz] | par-each {|e| $e + '!' } | sort"#, example: r#"[1 2 3] | par-each --keep-order {|e| $e * 2 }"#,
description: "Output can still be sorted afterward", description: "Multiplies each number, keeping an original order",
result: Some(Value::list( result: Some(Value::test_list(vec![
vec![ Value::test_int(2),
Value::test_string("bar!"), Value::test_int(4),
Value::test_string("baz!"), Value::test_int(6),
Value::test_string("foo!"), ])),
],
Span::test_data(),
)),
}, },
Example { Example {
example: r#"1..3 | enumerate | par-each {|p| update item ($p.item * 2)} | sort-by item | get item"#, example: r#"1..3 | enumerate | par-each {|p| update item ($p.item * 2)} | sort-by item | get item"#,
description: "Enumerate and sort-by can be used to reconstruct the original order", description: "Enumerate and sort-by can be used to reconstruct the original order",
result: Some(Value::list( result: Some(Value::test_list(vec![
vec![Value::test_int(2), Value::test_int(4), Value::test_int(6)], Value::test_int(2),
Span::test_data(), Value::test_int(4),
)), Value::test_int(6),
])),
},
Example {
example: r#"[foo bar baz] | par-each {|e| $e + '!' } | sort"#,
description: "Output can still be sorted afterward",
result: Some(Value::test_list(vec![
Value::test_string("bar!"),
Value::test_string("baz!"),
Value::test_string("foo!"),
])),
}, },
Example { Example {
example: r#"[1 2 3] | enumerate | par-each { |e| if $e.item == 2 { $"found 2 at ($e.index)!"} }"#, example: r#"[1 2 3] | enumerate | par-each { |e| if $e.item == 2 { $"found 2 at ($e.index)!"} }"#,
description: description:
"Iterate over each element, producing a list showing indexes of any 2s", "Iterate over each element, producing a list showing indexes of any 2s",
result: Some(Value::list( result: Some(Value::test_list(vec![Value::test_string("found 2 at 1!")])),
vec![Value::test_string("found 2 at 1!")],
Span::test_data(),
)),
}, },
] ]
} }
@ -114,6 +123,7 @@ impl Command for ParEach {
let capture_block: Closure = call.req(engine_state, stack, 0)?; let capture_block: Closure = call.req(engine_state, stack, 0)?;
let threads: Option<usize> = call.get_flag(engine_state, stack, "threads")?; let threads: Option<usize> = call.get_flag(engine_state, stack, "threads")?;
let max_threads = threads.unwrap_or(0); let max_threads = threads.unwrap_or(0);
let keep_order = call.has_flag("keep-order");
let metadata = input.metadata(); let metadata = input.metadata();
let ctrlc = engine_state.ctrlc.clone(); let ctrlc = engine_state.ctrlc.clone();
let outer_ctrlc = engine_state.ctrlc.clone(); let outer_ctrlc = engine_state.ctrlc.clone();
@ -123,14 +133,27 @@ impl Command for ParEach {
let redirect_stdout = call.redirect_stdout; let redirect_stdout = call.redirect_stdout;
let redirect_stderr = call.redirect_stderr; let redirect_stderr = call.redirect_stderr;
// A helper function sorts the output if needed
let apply_order = |mut vec: Vec<(usize, Value)>| {
if keep_order {
// It runs inside the rayon's thread pool so parallel sorting can be used.
// There are no identical indexes, so unstable sorting can be used.
vec.par_sort_unstable_by_key(|(index, _)| *index);
}
vec.into_iter().map(|(_, val)| val)
};
match input { match input {
PipelineData::Empty => Ok(PipelineData::Empty), PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::Value(Value::Range { val, .. }, ..) => Ok(create_pool(max_threads)? PipelineData::Value(Value::Range { val, .. }, ..) => Ok(create_pool(max_threads)?
.install(|| { .install(|| {
val.into_range_iter(ctrlc.clone()) let vec = val
.into_range_iter(ctrlc.clone())
.expect("unable to create a range iterator") .expect("unable to create a range iterator")
.enumerate()
.par_bridge() .par_bridge()
.map(move |x| { .map(move |(index, x)| {
let block = engine_state.get_block(block_id); let block = engine_state.get_block(block_id);
let mut stack = stack.clone(); let mut stack = stack.clone();
@ -144,7 +167,7 @@ impl Command for ParEach {
let val_span = x.span(); let val_span = x.span();
let x_is_error = x.is_error(); let x_is_error = x.is_error();
match eval_block_with_early_return( let val = match eval_block_with_early_return(
engine_state, engine_state,
&mut stack, &mut stack,
block, block,
@ -153,21 +176,24 @@ impl Command for ParEach {
redirect_stderr, redirect_stderr,
) { ) {
Ok(v) => v.into_value(span), Ok(v) => v.into_value(span),
Err(error) => Value::error( Err(error) => Value::error(
chain_error_with_input(error, x_is_error, val_span), chain_error_with_input(error, x_is_error, val_span),
val_span, val_span,
), ),
} };
(index, val)
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>();
.into_iter()
.into_pipeline_data(ctrlc) apply_order(vec).into_pipeline_data(ctrlc)
})), })),
PipelineData::Value(Value::List { vals: val, .. }, ..) => Ok(create_pool(max_threads)? PipelineData::Value(Value::List { vals: val, .. }, ..) => Ok(create_pool(max_threads)?
.install(|| { .install(|| {
val.par_iter() let vec = val
.map(move |x| { .par_iter()
.enumerate()
.map(move |(index, x)| {
let block = engine_state.get_block(block_id); let block = engine_state.get_block(block_id);
let mut stack = stack.clone(); let mut stack = stack.clone();
@ -181,7 +207,7 @@ impl Command for ParEach {
let val_span = x.span(); let val_span = x.span();
let x_is_error = x.is_error(); let x_is_error = x.is_error();
match eval_block_with_early_return( let val = match eval_block_with_early_return(
engine_state, engine_state,
&mut stack, &mut stack,
block, block,
@ -194,16 +220,19 @@ impl Command for ParEach {
chain_error_with_input(error, x_is_error, val_span), chain_error_with_input(error, x_is_error, val_span),
val_span, val_span,
), ),
} };
(index, val)
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>();
.into_iter()
.into_pipeline_data(ctrlc) apply_order(vec).into_pipeline_data(ctrlc)
})), })),
PipelineData::ListStream(stream, ..) => Ok(create_pool(max_threads)?.install(|| { PipelineData::ListStream(stream, ..) => Ok(create_pool(max_threads)?.install(|| {
stream let vec = stream
.enumerate()
.par_bridge() .par_bridge()
.map(move |x| { .map(move |(index, x)| {
let block = engine_state.get_block(block_id); let block = engine_state.get_block(block_id);
let mut stack = stack.clone(); let mut stack = stack.clone();
@ -217,7 +246,7 @@ impl Command for ParEach {
let val_span = x.span(); let val_span = x.span();
let x_is_error = x.is_error(); let x_is_error = x.is_error();
match eval_block_with_early_return( let val = match eval_block_with_early_return(
engine_state, engine_state,
&mut stack, &mut stack,
block, block,
@ -230,23 +259,26 @@ impl Command for ParEach {
chain_error_with_input(error, x_is_error, val_span), chain_error_with_input(error, x_is_error, val_span),
val_span, val_span,
), ),
} };
(index, val)
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>();
.into_iter()
.into_pipeline_data(ctrlc) apply_order(vec).into_pipeline_data(ctrlc)
})), })),
PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::empty()), PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::empty()),
PipelineData::ExternalStream { PipelineData::ExternalStream {
stdout: Some(stream), stdout: Some(stream),
.. ..
} => Ok(create_pool(max_threads)?.install(|| { } => Ok(create_pool(max_threads)?.install(|| {
stream let vec = stream
.enumerate()
.par_bridge() .par_bridge()
.map(move |x| { .map(move |(index, x)| {
let x = match x { let x = match x {
Ok(x) => x, Ok(x) => x,
Err(err) => return Value::error(err, span), Err(err) => return (index, Value::error(err, span)),
}; };
let block = engine_state.get_block(block_id); let block = engine_state.get_block(block_id);
@ -259,7 +291,7 @@ impl Command for ParEach {
} }
} }
match eval_block_with_early_return( let val = match eval_block_with_early_return(
engine_state, engine_state,
&mut stack, &mut stack,
block, block,
@ -269,11 +301,13 @@ impl Command for ParEach {
) { ) {
Ok(v) => v.into_value(span), Ok(v) => v.into_value(span),
Err(error) => Value::error(error, span), Err(error) => Value::error(error, span),
} };
(index, val)
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>();
.into_iter()
.into_pipeline_data(ctrlc) apply_order(vec).into_pipeline_data(ctrlc)
})), })),
// This match allows non-iterables to be accepted, // This match allows non-iterables to be accepted,
// which is currently considered undesirable (Nov 2022). // which is currently considered undesirable (Nov 2022).