Streaming support for lines with raw streams (#4832)

This commit is contained in:
Andrew Barnes 2022-03-13 22:52:55 +11:00 committed by GitHub
parent c73d8d5f95
commit dfffd45bcd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 132 additions and 39 deletions

View file

@ -1,8 +1,8 @@
use nu_protocol::ast::Call;
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
Category, Example, IntoInterruptiblePipelineData, PipelineData, ShellError, Signature, Span,
Value,
Category, Example, IntoInterruptiblePipelineData, PipelineData, RawStream, ShellError,
Signature, Span, Value,
};
#[derive(Clone)]
@ -26,11 +26,12 @@ impl Command for Lines {
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
_stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<nu_protocol::PipelineData, nu_protocol::ShellError> {
let head = call.head;
let ctrlc = engine_state.ctrlc.clone();
let skip_empty = call.has_flag("skip-empty");
match input {
#[allow(clippy::needless_collect)]
@ -108,41 +109,20 @@ impl Command for Lines {
}
PipelineData::Value(val, ..) => Err(ShellError::UnsupportedInput(
format!("Not supported input: {}", val.as_string()?),
call.head,
head,
)),
PipelineData::ExternalStream { .. } => {
let config = stack.get_config()?;
//FIXME: Make sure this can fail in the future to let the user
//know to use a different encoding
let s = input.collect_string("", &config)?;
let split_char = if s.contains("\r\n") { "\r\n" } else { "\n" };
#[allow(clippy::needless_collect)]
let mut lines = s
.split(split_char)
.map(|s| s.to_string())
.collect::<Vec<String>>();
// if the last one is empty, remove it, as it was just
// a newline at the end of the input we got
if let Some(last) = lines.last() {
if last.is_empty() {
lines.pop();
}
}
let iter = lines.into_iter().filter_map(move |s| {
if skip_empty && s.trim().is_empty() {
None
} else {
Some(Value::string(s, head))
}
});
Ok(iter.into_pipeline_data(engine_state.ctrlc.clone()))
}
PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::new(head)),
PipelineData::ExternalStream {
stdout: Some(stream),
..
} => Ok(RawStreamLinesAdapter::new(stream, head, skip_empty)
.into_iter()
.enumerate()
.map(move |(_idx, x)| match x {
Ok(x) => x,
Err(err) => Value::Error { error: err },
})
.into_pipeline_data(ctrlc)),
}
}
@ -157,3 +137,118 @@ impl Command for Lines {
}]
}
}
#[derive(Debug)]
struct RawStreamLinesAdapter {
inner: RawStream,
inner_complete: bool,
skip_empty: bool,
span: Span,
incomplete_line: String,
queue: Vec<String>,
}
impl Iterator for RawStreamLinesAdapter {
type Item = Result<Value, ShellError>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if !self.queue.is_empty() {
let s = self.queue.remove(0usize);
if self.skip_empty && s.trim().is_empty() {
continue;
}
return Some(Ok(Value::String {
val: s,
span: self.span,
}));
} else {
// inner is complete, feed out remaining state
if self.inner_complete {
if !self.incomplete_line.is_empty() {
let r = Some(Ok(Value::String {
val: self.incomplete_line.to_string(),
span: self.span,
}));
self.incomplete_line = String::new();
return r;
}
return None;
}
// pull more data from inner
if let Some(result) = self.inner.next() {
match result {
Ok(v) => {
match v {
Value::String { val, span } => {
self.span = span;
let split_char =
if val.contains("\r\n") { "\r\n" } else { "\n" };
let mut lines = val
.split(split_char)
.map(|s| s.to_string())
.collect::<Vec<_>>();
// handle incomplete line from previous
if !self.incomplete_line.is_empty() {
if let Some(first) = lines.first() {
let new_incomplete_line =
self.incomplete_line.to_string() + first;
lines.splice(0..1, vec![new_incomplete_line]);
self.incomplete_line = String::new();
}
}
// store incomplete line from current
if let Some(last) = lines.last() {
if last.is_empty() {
// we ended on a line ending
lines.pop();
} else {
// incomplete line, save for next time
if let Some(s) = lines.pop() {
self.incomplete_line = s;
}
}
}
// save completed lines
self.queue.append(&mut lines);
}
// TODO: Value::Binary support required?
_ => {
return Some(Err(ShellError::UnsupportedInput(
"Unsupport type from raw stream".to_string(),
self.span,
)))
}
}
}
Err(_) => todo!(),
}
} else {
self.inner_complete = true;
}
}
}
}
}
impl RawStreamLinesAdapter {
pub fn new(inner: RawStream, span: Span, skip_empty: bool) -> Self {
Self {
inner,
span,
skip_empty,
incomplete_line: String::new(),
queue: Vec::<String>::new(),
inner_complete: false,
}
}
}

View file

@ -195,8 +195,6 @@ mod stdin_evaluation {
assert_eq!(actual.err, "");
}
// FIXME: JT: `lines` doesn't currently support this kind of streaming
#[ignore]
#[test]
fn does_not_block_indefinitely() {
let stdout = nu!(