Fix buffering in lines command (#2111)

This commit is contained in:
Jonathan Turner 2020-07-04 17:20:58 -07:00 committed by GitHub
parent 04120e00e4
commit bbc5a28fe9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 37 deletions

View file

@ -2,6 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::prelude::*; use crate::prelude::*;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue, Value}; use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue, Value};
use parking_lot::Mutex;
pub struct Lines; pub struct Lines;
@ -47,8 +48,7 @@ fn ends_with_line_ending(st: &str) -> bool {
} }
async fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let leftover = Arc::new(vec![]); let leftover_string = Arc::new(Mutex::new(String::new()));
let leftover_string = Arc::new(String::new());
let registry = registry.clone(); let registry = registry.clone();
let args = args.evaluate_once(&registry).await?; let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag(); let tag = args.name_tag();
@ -62,31 +62,27 @@ async fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputSt
.input .input
.chain(eos) .chain(eos)
.map(move |item| { .map(move |item| {
let mut leftover = leftover.clone(); let leftover_string = leftover_string.clone();
let mut leftover_string = leftover_string.clone();
match item { match item {
Value { Value {
value: UntaggedValue::Primitive(Primitive::String(st)), value: UntaggedValue::Primitive(Primitive::String(st)),
.. ..
} => { } => {
let mut leftover_string = leftover_string.lock();
let st = (&*leftover_string).clone() + &st; let st = (&*leftover_string).clone() + &st;
if let Some(leftover) = Arc::get_mut(&mut leftover) {
leftover.clear();
}
let mut lines: Vec<String> = st.lines().map(|x| x.to_string()).collect(); let mut lines: Vec<String> = st.lines().map(|x| x.to_string()).collect();
if !ends_with_line_ending(&st) { if !ends_with_line_ending(&st) {
if let Some(last) = lines.pop() { if let Some(last) = lines.pop() {
if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) { leftover_string.clear();
leftover_string.clear(); leftover_string.push_str(&last);
leftover_string.push_str(&last); } else {
}
} else if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) {
leftover_string.clear(); leftover_string.clear();
} }
} else if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) { } else {
leftover_string.clear(); leftover_string.clear();
} }
@ -101,21 +97,20 @@ async fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputSt
value: UntaggedValue::Primitive(Primitive::Line(st)), value: UntaggedValue::Primitive(Primitive::Line(st)),
.. ..
} => { } => {
let mut leftover_string = leftover_string.lock();
let st = (&*leftover_string).clone() + &st; let st = (&*leftover_string).clone() + &st;
if let Some(leftover) = Arc::get_mut(&mut leftover) {
leftover.clear();
}
let mut lines: Vec<String> = st.lines().map(|x| x.to_string()).collect(); let mut lines: Vec<String> = st.lines().map(|x| x.to_string()).collect();
if !ends_with_line_ending(&st) { if !ends_with_line_ending(&st) {
if let Some(last) = lines.pop() { if let Some(last) = lines.pop() {
if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) { leftover_string.clear();
leftover_string.clear(); leftover_string.push_str(&last);
leftover_string.push_str(&last); } else {
}
} else if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) {
leftover_string.clear(); leftover_string.clear();
} }
} else if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) { } else {
leftover_string.clear(); leftover_string.clear();
} }
@ -123,27 +118,18 @@ async fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputSt
.iter() .iter()
.map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value())) .map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value()))
.collect(); .collect();
futures::stream::iter(success_lines) futures::stream::iter(success_lines)
} }
Value { Value {
value: UntaggedValue::Primitive(Primitive::EndOfStream), value: UntaggedValue::Primitive(Primitive::EndOfStream),
.. ..
} => { } => {
if !leftover.is_empty() { let st = (&*leftover_string).lock().clone();
let mut st = (&*leftover_string).clone(); if !st.is_empty() {
if let Ok(extra) = String::from_utf8((&*leftover).clone()) { futures::stream::iter(vec![ReturnSuccess::value(
st.push_str(&extra); UntaggedValue::string(st).into_untagged_value(),
} )])
// futures::stream::iter(vec![ReturnSuccess::value(
// UntaggedValue::string(st).into_untagged_value(),
// )])
if !st.is_empty() {
futures::stream::iter(vec![ReturnSuccess::value(
UntaggedValue::string(&*leftover_string).into_untagged_value(),
)])
} else {
futures::stream::iter(vec![])
}
} else { } else {
futures::stream::iter(vec![]) futures::stream::iter(vec![])
} }

View file

@ -19,3 +19,18 @@ fn lines() {
assert_eq!(actual.out, "rustyline"); assert_eq!(actual.out, "rustyline");
} }
#[test]
fn lines_proper_buffering() {
let actual = nu!(
cwd: "tests/fixtures/formats", pipeline(
r#"
open lines_test.txt -r
| lines
| str length
| to json
"#
));
assert_eq!(actual.out, "[8194,4]");
}

2
tests/fixtures/formats/lines_test.txt vendored Normal file

File diff suppressed because one or more lines are too long