Refactor parse command (#12791)

# Description
- Switches the `excess` in the `ParserStream` and
`ParseStreamerExternal` types from a `Vec` to a `VecDeque`
- Removes unnecessary clones to `stream_helper`
- Other simplifications and loop restructuring
- Merges the `ParseStreamer` and `ParseStreamerExternal` types into a
common `ParseIter`
- `parse` now streams for list values
This commit is contained in:
Ian Manske 2024-05-08 11:50:58 +00:00 committed by GitHub
parent e462b6cd99
commit 3b26c08dab
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,9 +1,9 @@
use fancy_regex::Regex;
use fancy_regex::{Captures, Regex};
use nu_engine::command_prelude::*;
use nu_protocol::{ListStream, ValueIterator};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
use nu_protocol::ListStream;
use std::{
collections::VecDeque,
sync::{atomic::AtomicBool, Arc},
};
#[derive(Clone)]
@ -119,7 +119,6 @@ fn operate(
let head = call.head;
let pattern: Spanned<String> = call.req(engine_state, stack, 0)?;
let regex: bool = call.has_flag(engine_state, stack, "regex")?;
let ctrlc = engine_state.ctrlc.clone();
let pattern_item = pattern.item;
let pattern_span = pattern.span;
@ -130,7 +129,7 @@ fn operate(
build_regex(&pattern_item, pattern_span)?
};
let regex_pattern = Regex::new(&item_to_parse).map_err(|e| ShellError::GenericError {
let regex = Regex::new(&item_to_parse).map_err(|e| ShellError::GenericError {
error: "Error with regular expression".into(),
msg: e.to_string(),
span: Some(pattern_span),
@ -138,92 +137,108 @@ fn operate(
inner: vec![],
})?;
let columns = column_names(&regex_pattern);
let columns = regex
.capture_names()
.skip(1)
.enumerate()
.map(|(i, name)| {
name.map(String::from)
.unwrap_or_else(|| format!("capture{i}"))
})
.collect::<Vec<_>>();
let ctrlc = engine_state.ctrlc.clone();
match input {
PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::Value(..) => {
let mut parsed: Vec<Value> = Vec::new();
PipelineData::Value(value, ..) => match value {
Value::String { val, .. } => {
let captures = regex
.captures_iter(&val)
.map(|captures| captures_to_value(captures, &columns, head))
.collect::<Result<_, _>>()?;
for v in input {
let v_span = v.span();
match v.coerce_into_string() {
Ok(s) => {
let results = regex_pattern.captures_iter(&s);
for c in results {
let captures = match c {
Ok(c) => c,
Err(e) => {
return Err(ShellError::GenericError {
error: "Error with regular expression captures".into(),
msg: e.to_string(),
span: None,
help: None,
inner: vec![],
})
}
};
let record = columns
.iter()
.zip(captures.iter().skip(1))
.map(|(column_name, cap)| {
let cap_string = cap.map(|v| v.as_str()).unwrap_or("");
(column_name.clone(), Value::string(cap_string, v_span))
})
.collect();
parsed.push(Value::record(record, head));
}
}
Err(_) => {
return Err(ShellError::PipelineMismatch {
exp_input_type: "string".into(),
dst_span: head,
src_span: v_span,
})
}
}
Ok(Value::list(captures, head).into_pipeline_data())
}
Value::List { vals, .. } => {
let iter = vals.into_iter().map(move |val| {
let span = val.span();
val.into_string().map_err(|_| ShellError::PipelineMismatch {
exp_input_type: "string".into(),
dst_span: head,
src_span: span,
})
});
Ok(ListStream::new(parsed.into_iter(), head, ctrlc).into())
}
let iter = ParseIter {
captures: VecDeque::new(),
regex,
columns,
iter,
span: head,
ctrlc,
};
Ok(ListStream::new(iter, head, None).into())
}
value => Err(ShellError::PipelineMismatch {
exp_input_type: "string".into(),
dst_span: head,
src_span: value.span(),
}),
},
PipelineData::ListStream(stream, ..) => Ok(stream
.modify(|stream| ParseStreamer {
span: head,
excess: Vec::new(),
regex: regex_pattern,
columns,
stream,
ctrlc,
.modify(|stream| {
let iter = stream.map(move |val| {
let span = val.span();
val.into_string().map_err(|_| ShellError::PipelineMismatch {
exp_input_type: "string".into(),
dst_span: head,
src_span: span,
})
});
ParseIter {
captures: VecDeque::new(),
regex,
columns,
iter,
span: head,
ctrlc,
}
})
.into()),
PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::Empty),
PipelineData::ExternalStream {
stdout: Some(stream),
..
} => Ok(ListStream::new(
ParseStreamerExternal {
span: head,
excess: Vec::new(),
regex: regex_pattern,
} => {
// Collect all `stream` chunks into a single `chunk` to be able to deal with matches that
// extend across chunk boundaries.
// This is a stop-gap solution until the `regex` crate supports streaming or an alternative
// solution is found.
// See https://github.com/nushell/nushell/issues/9795
let str = stream.into_string()?.item;
// let iter = stream.lines();
let iter = ParseIter {
captures: VecDeque::new(),
regex,
columns,
stream: stream.stream,
},
head,
ctrlc,
)
.into()),
iter: std::iter::once(Ok(str)),
span: head,
ctrlc,
};
Ok(ListStream::new(iter, head, None).into())
}
}
}
fn build_regex(input: &str, span: Span) -> Result<String, ShellError> {
let mut output = "(?s)\\A".to_string();
//let mut loop_input = input;
let mut loop_input = input.chars().peekable();
loop {
let mut before = String::new();
@ -274,172 +289,73 @@ fn build_regex(input: &str, span: Span) -> Result<String, ShellError> {
Ok(output)
}
fn column_names(regex: &Regex) -> Vec<String> {
regex
.capture_names()
.enumerate()
.skip(1)
.map(|(i, name)| {
name.map(String::from)
.unwrap_or_else(|| format!("capture{}", i - 1))
})
.collect()
}
pub struct ParseStreamer {
span: Span,
excess: Vec<Value>,
struct ParseIter<I: Iterator<Item = Result<String, ShellError>>> {
captures: VecDeque<Value>,
regex: Regex,
columns: Vec<String>,
stream: ValueIterator,
iter: I,
span: Span,
ctrlc: Option<Arc<AtomicBool>>,
}
impl Iterator for ParseStreamer {
type Item = Value;
fn next(&mut self) -> Option<Value> {
if !self.excess.is_empty() {
return Some(self.excess.remove(0));
impl<I: Iterator<Item = Result<String, ShellError>>> ParseIter<I> {
fn populate_captures(&mut self, str: &str) -> Result<(), ShellError> {
for captures in self.regex.captures_iter(str) {
self.captures
.push_back(captures_to_value(captures, &self.columns, self.span)?);
}
Ok(())
}
}
impl<I: Iterator<Item = Result<String, ShellError>>> Iterator for ParseIter<I> {
type Item = Value;
fn next(&mut self) -> Option<Value> {
loop {
if let Some(ctrlc) = &self.ctrlc {
if ctrlc.load(Ordering::SeqCst) {
break None;
}
if nu_utils::ctrl_c::was_pressed(&self.ctrlc) {
return None;
}
let v = self.stream.next()?;
let span = v.span();
if let Some(val) = self.captures.pop_front() {
return Some(val);
}
let Ok(s) = v.coerce_into_string() else {
return Some(Value::error(
ShellError::PipelineMismatch {
exp_input_type: "string".into(),
dst_span: self.span,
src_span: span,
},
span,
));
};
let result = self
.iter
.next()?
.and_then(|str| self.populate_captures(&str));
let parsed = stream_helper(
self.regex.clone(),
span,
s,
self.columns.clone(),
&mut self.excess,
);
if parsed.is_none() {
continue;
};
return parsed;
if let Err(err) = result {
return Some(Value::error(err, self.span));
}
}
}
}
pub struct ParseStreamerExternal {
fn captures_to_value(
captures: Result<Captures, fancy_regex::Error>,
columns: &[String],
span: Span,
excess: Vec<Value>,
regex: Regex,
columns: Vec<String>,
stream: Box<dyn Iterator<Item = Result<Vec<u8>, ShellError>> + Send + 'static>,
}
) -> Result<Value, ShellError> {
let captures = captures.map_err(|err| ShellError::GenericError {
error: "Error with regular expression captures".into(),
msg: err.to_string(),
span: Some(span),
help: None,
inner: vec![],
})?;
impl Iterator for ParseStreamerExternal {
type Item = Value;
fn next(&mut self) -> Option<Value> {
if !self.excess.is_empty() {
return Some(self.excess.remove(0));
}
let record = columns
.iter()
.zip(captures.iter().skip(1))
.map(|(column, match_)| {
let match_str = match_.map(|m| m.as_str()).unwrap_or("");
(column.clone(), Value::string(match_str, span))
})
.collect();
let mut chunk = self.stream.next();
// Collect all `stream` chunks into a single `chunk` to be able to deal with matches that
// extend across chunk boundaries.
// This is a stop-gap solution until the `regex` crate supports streaming or an alternative
// solution is found.
// See https://github.com/nushell/nushell/issues/9795
while let Some(Ok(chunks)) = &mut chunk {
match self.stream.next() {
Some(Ok(mut next_chunk)) => chunks.append(&mut next_chunk),
error @ Some(Err(_)) => chunk = error,
None => break,
}
}
let chunk = match chunk {
Some(Ok(chunk)) => chunk,
Some(Err(err)) => return Some(Value::error(err, self.span)),
_ => return None,
};
let Ok(chunk) = String::from_utf8(chunk) else {
return Some(Value::error(
ShellError::PipelineMismatch {
exp_input_type: "string".into(),
dst_span: self.span,
src_span: self.span,
},
self.span,
));
};
stream_helper(
self.regex.clone(),
self.span,
chunk,
self.columns.clone(),
&mut self.excess,
)
}
}
fn stream_helper(
regex: Regex,
span: Span,
s: String,
columns: Vec<String>,
excess: &mut Vec<Value>,
) -> Option<Value> {
let results = regex.captures_iter(&s);
for c in results {
let captures = match c {
Ok(c) => c,
Err(e) => {
return Some(Value::error(
ShellError::GenericError {
error: "Error with regular expression captures".into(),
msg: e.to_string(),
span: Some(span),
help: Some(e.to_string()),
inner: vec![],
},
span,
))
}
};
let record = columns
.iter()
.zip(captures.iter().skip(1))
.map(|(column_name, cap)| {
let cap_string = cap.map(|v| v.as_str()).unwrap_or("");
(column_name.clone(), Value::string(cap_string, span))
})
.collect();
excess.push(Value::record(record, span));
}
if !excess.is_empty() {
Some(excess.remove(0))
} else {
None
}
Ok(Value::record(record, span))
}
#[cfg(test)]