From 7766b3893f39ae6aefa8aada57f3937babf8d8de Mon Sep 17 00:00:00 2001 From: cosineblast <55855728+cosineblast@users.noreply.github.com> Date: Wed, 18 Dec 2024 16:17:55 -0300 Subject: [PATCH] `reject` now streams properly --- crates/nu-command/src/filters/reject.rs | 37 +++++++++++++++++++++---- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/crates/nu-command/src/filters/reject.rs b/crates/nu-command/src/filters/reject.rs index 3f966e2da5..058ad1d70b 100644 --- a/crates/nu-command/src/filters/reject.rs +++ b/crates/nu-command/src/filters/reject.rs @@ -166,15 +166,13 @@ impl Command for Reject { } fn reject( - _engine_state: &EngineState, + engine_state: &EngineState, span: Span, input: PipelineData, cell_paths: Vec, ) -> Result { let mut unique_rows: HashSet = HashSet::new(); let metadata = input.metadata(); - let val = input.into_value(span)?; - let mut val = val; let mut new_columns = vec![]; let mut new_rows = vec![]; for column in cell_paths { @@ -212,10 +210,37 @@ fn reject( }); new_columns.append(&mut new_rows); - for cell_path in new_columns { - val.remove_data_at_cell_path(&cell_path.members)?; + + match input { + PipelineData::ListStream(stream, ..) => { + let result = stream + .into_iter() + .map(move |mut value| { + let span = value.span(); + + for cell_path in new_columns.iter() { + if let Err(error) = value.remove_data_at_cell_path(&cell_path.members) { + return Value::error(error, span); + } + } + + value + }) + .into_pipeline_data(span, engine_state.signals().clone()); + + Ok(result) + } + + input => { + let mut val = input.into_value(span)?; + + for cell_path in new_columns { + val.remove_data_at_cell_path(&cell_path.members)?; + } + + Ok(val.into_pipeline_data_with_metadata(metadata)) + } } - Ok(val.into_pipeline_data_with_metadata(metadata)) } #[cfg(test)]