reject now streams properly

This commit is contained in:
cosineblast 2024-12-18 16:17:55 -03:00
parent 65b8b7e36e
commit 7766b3893f

View file

@ -166,15 +166,13 @@ impl Command for Reject {
} }
fn reject( fn reject(
_engine_state: &EngineState, engine_state: &EngineState,
span: Span, span: Span,
input: PipelineData, input: PipelineData,
cell_paths: Vec<CellPath>, cell_paths: Vec<CellPath>,
) -> Result<PipelineData, ShellError> { ) -> Result<PipelineData, ShellError> {
let mut unique_rows: HashSet<usize> = HashSet::new(); let mut unique_rows: HashSet<usize> = HashSet::new();
let metadata = input.metadata(); let metadata = input.metadata();
let val = input.into_value(span)?;
let mut val = val;
let mut new_columns = vec![]; let mut new_columns = vec![];
let mut new_rows = vec![]; let mut new_rows = vec![];
for column in cell_paths { for column in cell_paths {
@ -212,10 +210,37 @@ fn reject(
}); });
new_columns.append(&mut new_rows); new_columns.append(&mut new_rows);
for cell_path in new_columns {
val.remove_data_at_cell_path(&cell_path.members)?; match input {
PipelineData::ListStream(stream, ..) => {
let result = stream
.into_iter()
.map(move |mut value| {
let span = value.span();
for cell_path in new_columns.iter() {
if let Err(error) = value.remove_data_at_cell_path(&cell_path.members) {
return Value::error(error, span);
}
}
value
})
.into_pipeline_data(span, engine_state.signals().clone());
Ok(result)
}
input => {
let mut val = input.into_value(span)?;
for cell_path in new_columns {
val.remove_data_at_cell_path(&cell_path.members)?;
}
Ok(val.into_pipeline_data_with_metadata(metadata))
}
} }
Ok(val.into_pipeline_data_with_metadata(metadata))
} }
#[cfg(test)] #[cfg(test)]