add streaming to get and reject (#14622)

Closes #14487.

# Description

`get` and `reject` now stream properly:

Before:

![image](https://github.com/user-attachments/assets/57ecb705-1f98-49a4-a47e-27bba1c6c732)

Now:

![image](https://github.com/user-attachments/assets/dc5c7fba-e1ef-46d2-bd78-fd777b9e9dad)

# User-Facing Changes

# Tests + Formatting

# After Submitting

---------

Co-authored-by: Wind <WindSoilder@outlook.com>
Co-authored-by: 132ikl <132@ikl.sh>
This commit is contained in:
Renan Ribeiro 2024-12-25 11:13:05 -03:00 committed by GitHub
parent 38ffcaad7b
commit 0a0475ebad
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 88 additions and 12 deletions

View file

@ -1,4 +1,5 @@
use nu_engine::command_prelude::*;
use nu_protocol::{ast::PathMember, Signals};
#[derive(Clone)]
pub struct Get;
@ -72,9 +73,13 @@ If multiple cell paths are given, this will produce a list of values."#
}
if rest.is_empty() {
input
.follow_cell_path(&cell_path.members, call.head, !sensitive)
.map(|x| x.into_pipeline_data())
follow_cell_path_into_stream(
input,
engine_state.signals().clone(),
cell_path.members,
call.head,
!sensitive,
)
} else {
let mut output = vec![];
@ -94,6 +99,7 @@ If multiple cell paths are given, this will produce a list of values."#
}
.map(|x| x.set_metadata(metadata))
}
fn examples(&self) -> Vec<Example> {
vec![
Example {
@ -139,6 +145,48 @@ If multiple cell paths are given, this will produce a list of values."#
}
}
// the PipelineData.follow_cell_path function, when given a
// stream, collects it into a vec before doing its job
//
// this is fine, since it returns a Result<Value ShellError>,
// but if we want to follow a PipelineData into a cell path and
// return another PipelineData, then we have to take care to
// make sure it streams
pub fn follow_cell_path_into_stream(
data: PipelineData,
signals: Signals,
cell_path: Vec<PathMember>,
head: Span,
insensitive: bool,
) -> Result<PipelineData, ShellError> {
// when given an integer/indexing, we fallback to
// the default nushell indexing behaviour
let has_int_member = cell_path
.iter()
.any(|it| matches!(it, PathMember::Int { .. }));
match data {
PipelineData::ListStream(stream, ..) if !has_int_member => {
let result = stream
.into_iter()
.map(move |value| {
let span = value.span();
match value.follow_cell_path(&cell_path, insensitive) {
Ok(v) => v,
Err(error) => Value::error(error, span),
}
})
.into_pipeline_data(head, signals);
Ok(result)
}
_ => data
.follow_cell_path(&cell_path, head, insensitive)
.map(|x| x.into_pipeline_data()),
}
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -166,15 +166,13 @@ impl Command for Reject {
}
fn reject(
_engine_state: &EngineState,
engine_state: &EngineState,
span: Span,
input: PipelineData,
cell_paths: Vec<CellPath>,
) -> Result<PipelineData, ShellError> {
let mut unique_rows: HashSet<usize> = HashSet::new();
let metadata = input.metadata();
let val = input.into_value(span)?;
let mut val = val;
let mut new_columns = vec![];
let mut new_rows = vec![];
for column in cell_paths {
@ -212,10 +210,37 @@ fn reject(
});
new_columns.append(&mut new_rows);
match input {
PipelineData::ListStream(stream, ..) => {
let result = stream
.into_iter()
.map(move |mut value| {
let span = value.span();
for cell_path in new_columns.iter() {
if let Err(error) = value.remove_data_at_cell_path(&cell_path.members) {
return Value::error(error, span);
}
}
value
})
.into_pipeline_data(span, engine_state.signals().clone());
Ok(result)
}
input => {
let mut val = input.into_value(span)?;
for cell_path in new_columns {
val.remove_data_at_cell_path(&cell_path.members)?;
}
Ok(val.into_pipeline_data_with_metadata(metadata))
}
}
}
#[cfg(test)]

View file

@ -56,8 +56,11 @@ fn errors_if_non_record_input() {
"
));
assert!(only_supports
assert!(
only_supports
.err
.contains("only Record input data is supported"));
.contains("only Record input data is supported")
|| only_supports.err.contains("expected: record")
);
})
}