Move to bytes/string hybrid codec (#1457)

* WIP: move to bytes codec

* Progress on adding collect helpers

* Progress on adding collect helpers

* Add in line splitting back to lines

* Lines outputting line primitives

* Close to ready?

* Finish fixing lines

* clippy fixes

* fmt fixes

* removed unused code

* Cleanup a few bits

* Cleanup a few bits

* Cleanup a few more bits

* Fix failing test with corrected test case
This commit is contained in:
Jonathan Turner 2020-03-07 05:06:39 +13:00 committed by GitHub
parent 287652573b
commit 8925ca5da3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 475 additions and 524 deletions

2
Cargo.lock generated
View file

@ -2251,7 +2251,7 @@ dependencies = [
"bigdecimal", "bigdecimal",
"bson", "bson",
"byte-unit", "byte-unit",
"bytes 0.4.12", "bytes 0.5.4",
"calamine", "calamine",
"cfg-if", "cfg-if",
"chrono", "chrono",

View file

@ -25,7 +25,7 @@ base64 = "0.11"
bigdecimal = { version = "0.1.0", features = ["serde"] } bigdecimal = { version = "0.1.0", features = ["serde"] }
bson = { version = "0.14.0", features = ["decimal128"] } bson = { version = "0.14.0", features = ["decimal128"] }
byte-unit = "3.0.3" byte-unit = "3.0.3"
bytes = "0.4.12" bytes = "0.5.4"
calamine = "0.16" calamine = "0.16"
cfg-if = "0.1" cfg-if = "0.1"
chrono = { version = "0.4.10", features = ["serde"] } chrono = { version = "0.4.10", features = ["serde"] }

View file

@ -1,3 +1,4 @@
use crate::commands::classified::external::{MaybeTextCodec, StringOrBinary};
use crate::commands::classified::pipeline::run_pipeline; use crate::commands::classified::pipeline::run_pipeline;
use crate::commands::plugin::JsonRpc; use crate::commands::plugin::JsonRpc;
use crate::commands::plugin::{PluginCommand, PluginSink}; use crate::commands::plugin::{PluginCommand, PluginSink};
@ -6,7 +7,8 @@ use crate::context::Context;
#[cfg(not(feature = "starship-prompt"))] #[cfg(not(feature = "starship-prompt"))]
use crate::git::current_branch; use crate::git::current_branch;
use crate::prelude::*; use crate::prelude::*;
use futures_codec::{FramedRead, LinesCodec}; use futures_codec::FramedRead;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_parser::{ClassifiedPipeline, PipelineShape, SpannedToken, TokensIterator}; use nu_parser::{ClassifiedPipeline, PipelineShape, SpannedToken, TokensIterator};
use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue, Value}; use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue, Value};
@ -620,15 +622,21 @@ async fn process_line(
} }
let input_stream = if redirect_stdin { let input_stream = if redirect_stdin {
let file = futures::io::AllowStdIo::new( let file = futures::io::AllowStdIo::new(std::io::stdin());
crate::commands::classified::external::StdoutWithNewline::new(std::io::stdin()), let stream = FramedRead::new(file, MaybeTextCodec).map(|line| {
);
let stream = FramedRead::new(file, LinesCodec).map(|line| {
if let Ok(line) = line { if let Ok(line) = line {
Ok(Value { match line {
value: UntaggedValue::Primitive(Primitive::String(line)), StringOrBinary::String(s) => Ok(Value {
value: UntaggedValue::Primitive(Primitive::String(s)),
tag: Tag::unknown(), tag: Tag::unknown(),
}) }),
StringOrBinary::Binary(b) => Ok(Value {
value: UntaggedValue::Primitive(Primitive::Binary(
b.into_iter().collect(),
)),
tag: Tag::unknown(),
}),
}
} else { } else {
panic!("Internal error: could not read lines of text from stdin") panic!("Internal error: could not read lines of text from stdin")
} }

View file

@ -92,6 +92,7 @@ pub fn autoview(context: RunnableContext) -> Result<OutputStream, ShellError> {
} }
}; };
let stream = stream.to_input_stream(); let stream = stream.to_input_stream();
if let Some(table) = table { if let Some(table) = table {
let command_args = create_default_command_args(&context).with_input(stream); let command_args = create_default_command_args(&context).with_input(stream);
let result = table.run(command_args, &context.commands); let result = table.run(command_args, &context.commands);

View file

@ -1,8 +1,9 @@
use crate::futures::ThreadedReceiver; use crate::futures::ThreadedReceiver;
use crate::prelude::*; use crate::prelude::*;
use bytes::{BufMut, Bytes, BytesMut};
use futures::executor::block_on_stream; use futures::executor::block_on_stream;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use futures_codec::{FramedRead, LinesCodec}; use futures_codec::FramedRead;
use log::trace; use log::trace;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_parser::commands::classified::external::ExternalArg; use nu_parser::commands::classified::external::ExternalArg;
@ -15,6 +16,70 @@ use std::ops::Deref;
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
use std::sync::mpsc; use std::sync::mpsc;
pub enum StringOrBinary {
String(String),
Binary(Vec<u8>),
}
pub struct MaybeTextCodec;
impl futures_codec::Encoder for MaybeTextCodec {
type Item = StringOrBinary;
type Error = std::io::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
match item {
StringOrBinary::String(s) => {
dst.reserve(s.len());
dst.put(s.as_bytes());
Ok(())
}
StringOrBinary::Binary(b) => {
dst.reserve(b.len());
dst.put(Bytes::from(b));
Ok(())
}
}
}
}
impl futures_codec::Decoder for MaybeTextCodec {
type Item = StringOrBinary;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let v: Vec<u8> = src.to_vec();
match String::from_utf8(v) {
Ok(s) => {
src.clear();
if s.is_empty() {
Ok(None)
} else {
Ok(Some(StringOrBinary::String(s)))
}
}
Err(err) => {
// Note: the longest UTF-8 character per Unicode spec is currently 6 bytes. If we fail somewhere earlier than the last 6 bytes,
// we know that we're failing to understand the string encoding and not just seeing a partial character. When this happens, let's
// fall back to assuming it's a binary buffer.
if src.is_empty() {
Ok(None)
} else if src.len() > 6 && (src.len() - err.utf8_error().valid_up_to() > 6) {
// Fall back to assuming binary
let buf = src.to_vec();
src.clear();
Ok(Some(StringOrBinary::Binary(buf)))
} else {
// Looks like a utf-8 string, so let's assume that
let buf = src.split_to(err.utf8_error().valid_up_to() + 1);
String::from_utf8(buf.to_vec())
.map(|x| Some(StringOrBinary::String(x)))
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
}
}
}
}
}
pub fn nu_value_to_string(command: &ExternalCommand, from: &Value) -> Result<String, ShellError> { pub fn nu_value_to_string(command: &ExternalCommand, from: &Value) -> Result<String, ShellError> {
match &from.value { match &from.value {
UntaggedValue::Primitive(Primitive::Int(i)) => Ok(i.to_string()), UntaggedValue::Primitive(Primitive::Int(i)) => Ok(i.to_string()),
@ -29,25 +94,6 @@ pub fn nu_value_to_string(command: &ExternalCommand, from: &Value) -> Result<Str
} }
} }
pub fn nu_value_to_string_for_stdin(
name_tag: &Tag,
from: &Value,
) -> Result<Option<String>, ShellError> {
match &from.value {
UntaggedValue::Primitive(Primitive::Nothing) => Ok(None),
UntaggedValue::Primitive(Primitive::String(s))
| UntaggedValue::Primitive(Primitive::Line(s)) => Ok(Some(s.clone())),
unsupported => Err(ShellError::labeled_error(
format!(
"Received unexpected type from pipeline ({})",
unsupported.type_name()
),
"expected a string",
name_tag,
)),
}
}
pub(crate) fn run_external_command( pub(crate) fn run_external_command(
command: ExternalCommand, command: ExternalCommand,
context: &mut Context, context: &mut Context,
@ -382,45 +428,6 @@ fn run_with_stdin(
spawn(&command, &path, &process_args[..], input, is_last) spawn(&command, &path, &process_args[..], input, is_last)
} }
/// This is a wrapper for stdout-like readers that ensure a carriage return ends the stream
pub struct StdoutWithNewline<T: std::io::Read> {
stdout: T,
ended_in_newline: bool,
}
impl<T: std::io::Read> StdoutWithNewline<T> {
pub fn new(stdout: T) -> StdoutWithNewline<T> {
StdoutWithNewline {
stdout,
ended_in_newline: false,
}
}
}
impl<T: std::io::Read> std::io::Read for StdoutWithNewline<T> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self.stdout.read(buf) {
Err(e) => Err(e),
Ok(0) => {
if !self.ended_in_newline && !buf.is_empty() {
self.ended_in_newline = true;
buf[0] = b'\n';
Ok(1)
} else {
Ok(0)
}
}
Ok(len) => {
if buf[len - 1] == b'\n' {
self.ended_in_newline = true;
} else {
self.ended_in_newline = false;
}
Ok(len)
}
}
}
}
fn spawn( fn spawn(
command: &ExternalCommand, command: &ExternalCommand,
path: &str, path: &str,
@ -487,19 +494,11 @@ fn spawn(
.expect("Internal error: could not get stdin pipe for external command"); .expect("Internal error: could not get stdin pipe for external command");
for value in block_on_stream(input) { for value in block_on_stream(input) {
let input_string = match nu_value_to_string_for_stdin(&stdin_name_tag, &value) { match &value.value {
Ok(None) => continue, UntaggedValue::Primitive(Primitive::Nothing) => continue,
Ok(Some(v)) => v, UntaggedValue::Primitive(Primitive::String(s))
Err(e) => { | UntaggedValue::Primitive(Primitive::Line(s)) => {
let _ = stdin_write_tx.send(Ok(Value { if let Err(e) = stdin_write.write(s.as_bytes()) {
value: UntaggedValue::Error(e),
tag: stdin_name_tag,
}));
return Err(());
}
};
if let Err(e) = stdin_write.write(input_string.as_bytes()) {
let message = format!("Unable to write to stdin (error = {})", e); let message = format!("Unable to write to stdin (error = {})", e);
let _ = stdin_write_tx.send(Ok(Value { let _ = stdin_write_tx.send(Ok(Value {
@ -513,6 +512,37 @@ fn spawn(
return Err(()); return Err(());
} }
} }
UntaggedValue::Primitive(Primitive::Binary(b)) => {
if let Err(e) = stdin_write.write(b) {
let message = format!("Unable to write to stdin (error = {})", e);
let _ = stdin_write_tx.send(Ok(Value {
value: UntaggedValue::Error(ShellError::labeled_error(
message,
"application may have closed before completing pipeline",
&stdin_name_tag,
)),
tag: stdin_name_tag,
}));
return Err(());
}
}
unsupported => {
let _ = stdin_write_tx.send(Ok(Value {
value: UntaggedValue::Error(ShellError::labeled_error(
format!(
"Received unexpected type from pipeline ({})",
unsupported.type_name()
),
"expected a string",
stdin_name_tag.clone(),
)),
tag: stdin_name_tag,
}));
return Err(());
}
};
}
} }
Ok(()) Ok(())
@ -534,23 +564,39 @@ fn spawn(
return Err(()); return Err(());
}; };
let file = futures::io::AllowStdIo::new(StdoutWithNewline::new(stdout)); let file = futures::io::AllowStdIo::new(stdout);
let stream = FramedRead::new(file, LinesCodec); let stream = FramedRead::new(file, MaybeTextCodec);
for line in block_on_stream(stream) { for line in block_on_stream(stream) {
if let Ok(line) = line { match line {
Ok(line) => match line {
StringOrBinary::String(s) => {
let result = stdout_read_tx.send(Ok(Value { let result = stdout_read_tx.send(Ok(Value {
value: UntaggedValue::Primitive(Primitive::Line(line)), value: UntaggedValue::Primitive(Primitive::String(s.clone())),
tag: stdout_name_tag.clone(), tag: stdout_name_tag.clone(),
})); }));
if result.is_err() { if result.is_err() {
break; break;
} }
} else { }
StringOrBinary::Binary(b) => {
let result = stdout_read_tx.send(Ok(Value {
value: UntaggedValue::Primitive(Primitive::Binary(
b.into_iter().collect(),
)),
tag: stdout_name_tag.clone(),
}));
if result.is_err() {
break;
}
}
},
Err(_) => {
let _ = stdout_read_tx.send(Ok(Value { let _ = stdout_read_tx.send(Ok(Value {
value: UntaggedValue::Error(ShellError::labeled_error( value: UntaggedValue::Error(ShellError::labeled_error(
"Unable to read lines from stdout. This usually happens when the output does not end with a newline.", "Unable to read from stdout.",
"unable to read from stdout", "unable to read from stdout",
&stdout_name_tag, &stdout_name_tag,
)), )),
@ -560,6 +606,7 @@ fn spawn(
} }
} }
} }
}
// We can give an error when we see a non-zero exit code, but this is different // We can give an error when we see a non-zero exit code, but this is different
// than what other shells will do. // than what other shells will do.

View file

@ -205,13 +205,9 @@ fn from_bson(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
let input = args.input; let input = args.input;
let stream = async_stream! { let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await; let bytes = input.collect_binary(tag.clone()).await?;
for value in values { match from_bson_bytes_to_value(bytes.item, tag.clone()) {
let value_tag = &value.tag;
match value.value {
UntaggedValue::Primitive(Primitive::Binary(vb)) =>
match from_bson_bytes_to_value(vb, tag.clone()) {
Ok(x) => yield ReturnSuccess::value(x), Ok(x) => yield ReturnSuccess::value(x),
Err(_) => { Err(_) => {
yield Err(ShellError::labeled_error_with_secondary( yield Err(ShellError::labeled_error_with_secondary(
@ -219,20 +215,10 @@ fn from_bson(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
"input cannot be parsed as BSON", "input cannot be parsed as BSON",
tag.clone(), tag.clone(),
"value originates from here", "value originates from here",
value_tag, bytes.tag,
)) ))
} }
} }
_ => yield Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
tag.clone(),
"value originates from here",
value_tag,
)),
}
}
}; };
Ok(stream.to_output_stream()) Ok(stream.to_output_stream())

View file

@ -47,28 +47,9 @@ pub fn from_delimited_data(
let name_tag = name; let name_tag = name;
let stream = async_stream! { let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await; let concat_string = input.collect_string(name_tag.clone()).await?;
let mut concat_string = String::new(); match from_delimited_string_to_value(concat_string.item, headerless, sep, name_tag.clone()) {
let mut latest_tag: Option<Tag> = None;
for value in values {
let value_tag = &value.tag;
latest_tag = Some(value_tag.clone());
if let Ok(s) = value.as_string() {
concat_string.push_str(&s);
} else {
yield Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
name_tag.clone(),
"value originates from here",
value_tag.clone(),
))
}
}
match from_delimited_string_to_value(concat_string, headerless, sep, name_tag.clone()) {
Ok(x) => match x { Ok(x) => match x {
Value { value: UntaggedValue::Table(list), .. } => { Value { value: UntaggedValue::Table(list), .. } => {
for l in list { for l in list {
@ -77,7 +58,7 @@ pub fn from_delimited_data(
} }
x => yield ReturnSuccess::value(x), x => yield ReturnSuccess::value(x),
}, },
Err(_) => if let Some(last_tag) = latest_tag { Err(_) => {
let line_one = format!("Could not parse as {}", format_name); let line_one = format!("Could not parse as {}", format_name);
let line_two = format!("input cannot be parsed as {}", format_name); let line_two = format!("input cannot be parsed as {}", format_name);
yield Err(ShellError::labeled_error_with_secondary( yield Err(ShellError::labeled_error_with_secondary(
@ -85,7 +66,7 @@ pub fn from_delimited_data(
line_two, line_two,
name_tag.clone(), name_tag.clone(),
"value originates from here", "value originates from here",
last_tag.clone(), concat_string.tag,
)) ))
} , } ,
} }

View file

@ -66,32 +66,12 @@ pub fn from_ini_string_to_value(
fn from_ini(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { fn from_ini(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?; let args = args.evaluate_once(registry)?;
let tag = args.name_tag(); let tag = args.name_tag();
let span = tag.span;
let input = args.input; let input = args.input;
let stream = async_stream! { let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await; let concat_string = input.collect_string(tag.clone()).await?;
let mut concat_string = String::new(); match from_ini_string_to_value(concat_string.item, tag.clone()) {
let mut latest_tag: Option<Tag> = None;
for value in values {
latest_tag = Some(value.tag.clone());
let value_span = value.tag.span;
if let Ok(s) = value.as_string() {
concat_string.push_str(&s);
} else {
yield Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
span,
"value originates from here",
value_span,
))
}
}
match from_ini_string_to_value(concat_string, tag.clone()) {
Ok(x) => match x { Ok(x) => match x {
Value { value: UntaggedValue::Table(list), .. } => { Value { value: UntaggedValue::Table(list), .. } => {
for l in list { for l in list {
@ -100,15 +80,15 @@ fn from_ini(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
} }
x => yield ReturnSuccess::value(x), x => yield ReturnSuccess::value(x),
}, },
Err(_) => if let Some(last_tag) = latest_tag { Err(_) => {
yield Err(ShellError::labeled_error_with_secondary( yield Err(ShellError::labeled_error_with_secondary(
"Could not parse as INI", "Could not parse as INI",
"input cannot be parsed as INI", "input cannot be parsed as INI",
&tag, &tag,
"value originates from here", "value originates from here",
last_tag, concat_string.tag,
)) ))
} , }
} }
}; };

View file

@ -74,35 +74,13 @@ fn from_json(
FromJSONArgs { objects }: FromJSONArgs, FromJSONArgs { objects }: FromJSONArgs,
RunnableContext { input, name, .. }: RunnableContext, RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let name_span = name.span;
let name_tag = name; let name_tag = name;
let stream = async_stream! { let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await; let concat_string = input.collect_string(name_tag.clone()).await?;
let mut concat_string = String::new();
let mut latest_tag: Option<Tag> = None;
for value in values {
latest_tag = Some(value.tag.clone());
let value_span = value.tag.span;
if let Ok(s) = value.as_string() {
concat_string.push_str(&s);
} else {
yield Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
name_span,
"value originates from here",
value_span,
))
}
}
if objects { if objects {
for json_str in concat_string.lines() { for json_str in concat_string.item.lines() {
if json_str.is_empty() { if json_str.is_empty() {
continue; continue;
} }
@ -111,7 +89,6 @@ fn from_json(
Ok(x) => Ok(x) =>
yield ReturnSuccess::value(x), yield ReturnSuccess::value(x),
Err(e) => { Err(e) => {
if let Some(ref last_tag) = latest_tag {
let mut message = "Could not parse as JSON (".to_string(); let mut message = "Could not parse as JSON (".to_string();
message.push_str(&e.to_string()); message.push_str(&e.to_string());
message.push_str(")"); message.push_str(")");
@ -121,13 +98,12 @@ fn from_json(
"input cannot be parsed as JSON", "input cannot be parsed as JSON",
&name_tag, &name_tag,
"value originates from here", "value originates from here",
last_tag)) concat_string.tag.clone()))
}
} }
} }
} }
} else { } else {
match from_json_string_to_value(concat_string, name_tag.clone()) { match from_json_string_to_value(concat_string.item, name_tag.clone()) {
Ok(x) => Ok(x) =>
match x { match x {
Value { value: UntaggedValue::Table(list), .. } => { Value { value: UntaggedValue::Table(list), .. } => {
@ -138,7 +114,6 @@ fn from_json(
x => yield ReturnSuccess::value(x), x => yield ReturnSuccess::value(x),
} }
Err(e) => { Err(e) => {
if let Some(last_tag) = latest_tag {
let mut message = "Could not parse as JSON (".to_string(); let mut message = "Could not parse as JSON (".to_string();
message.push_str(&e.to_string()); message.push_str(&e.to_string());
message.push_str(")"); message.push_str(")");
@ -148,8 +123,7 @@ fn from_json(
"input cannot be parsed as JSON", "input cannot be parsed as JSON",
name_tag, name_tag,
"value originates from here", "value originates from here",
last_tag)) concat_string.tag))
}
} }
} }
} }

View file

@ -3,7 +3,7 @@ use crate::prelude::*;
use crate::TaggedListBuilder; use crate::TaggedListBuilder;
use calamine::*; use calamine::*;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{Primitive, ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue, Value}; use nu_protocol::{ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue};
use std::io::Cursor; use std::io::Cursor;
pub struct FromODS; pub struct FromODS;
@ -49,15 +49,8 @@ fn from_ods(
let tag = runnable_context.name; let tag = runnable_context.name;
let stream = async_stream! { let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await; let bytes = input.collect_binary(tag.clone()).await?;
let mut buf: Cursor<Vec<u8>> = Cursor::new(bytes.item);
for value in values {
let value_span = value.tag.span;
let value_tag = value.tag.clone();
match value.value {
UntaggedValue::Primitive(Primitive::Binary(vb)) => {
let mut buf: Cursor<Vec<u8>> = Cursor::new(vb);
let mut ods = Ods::<_>::new(buf).map_err(|_| ShellError::labeled_error( let mut ods = Ods::<_>::new(buf).map_err(|_| ShellError::labeled_error(
"Could not load ods file", "Could not load ods file",
"could not load ods file", "could not load ods file",
@ -99,17 +92,6 @@ fn from_ods(
} }
yield ReturnSuccess::value(dict.into_value()); yield ReturnSuccess::value(dict.into_value());
}
_ => yield Err(ShellError::labeled_error_with_secondary(
"Expected binary data from pipeline",
"requires binary data input",
&tag,
"value originates from here",
value_tag,
)),
}
}
}; };
Ok(stream.to_output_stream()) Ok(stream.to_output_stream())

View file

@ -138,13 +138,8 @@ fn from_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputSt
let input = args.input; let input = args.input;
let stream = async_stream! { let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await; let bytes = input.collect_binary(tag.clone()).await?;
match from_sqlite_bytes_to_value(bytes.item, tag.clone()) {
for value in values {
let value_tag = &value.tag;
match value.value {
UntaggedValue::Primitive(Primitive::Binary(vb)) =>
match from_sqlite_bytes_to_value(vb, tag.clone()) {
Ok(x) => match x { Ok(x) => match x {
Value { value: UntaggedValue::Table(list), .. } => { Value { value: UntaggedValue::Table(list), .. } => {
for l in list { for l in list {
@ -160,20 +155,10 @@ fn from_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputSt
"input cannot be parsed as SQLite", "input cannot be parsed as SQLite",
&tag, &tag,
"value originates from here", "value originates from here",
value_tag, bytes.tag,
)) ))
} }
} }
_ => yield Err(ShellError::labeled_error_with_secondary(
"Expected binary data from pipeline",
"requires binary data input",
&tag,
"value originates from here",
value_tag,
)),
}
}
}; };
Ok(stream.to_output_stream()) Ok(stream.to_output_stream())

View file

@ -259,45 +259,26 @@ fn from_ssv(
RunnableContext { input, name, .. }: RunnableContext, RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let stream = async_stream! { let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await; let concat_string = input.collect_string(name.clone()).await?;
let mut concat_string = String::new();
let mut latest_tag: Option<Tag> = None;
let split_at = match minimum_spaces { let split_at = match minimum_spaces {
Some(number) => number.item, Some(number) => number.item,
None => DEFAULT_MINIMUM_SPACES None => DEFAULT_MINIMUM_SPACES
}; };
for value in values { match from_ssv_string_to_value(&concat_string.item, headerless, aligned_columns, split_at, name.clone()) {
let value_tag = value.tag.clone();
latest_tag = Some(value_tag.clone());
if let Ok(s) = value.as_string() {
concat_string.push_str(&s);
}
else {
yield Err(ShellError::labeled_error_with_secondary (
"Expected a string from pipeline",
"requires string input",
&name,
"value originates from here",
&value_tag
))
}
}
match from_ssv_string_to_value(&concat_string, headerless, aligned_columns, split_at, name.clone()) {
Some(x) => match x { Some(x) => match x {
Value { value: UntaggedValue::Table(list), ..} => { Value { value: UntaggedValue::Table(list), ..} => {
for l in list { yield ReturnSuccess::value(l) } for l in list { yield ReturnSuccess::value(l) }
} }
x => yield ReturnSuccess::value(x) x => yield ReturnSuccess::value(x)
}, },
None => if let Some(tag) = latest_tag { None => {
yield Err(ShellError::labeled_error_with_secondary( yield Err(ShellError::labeled_error_with_secondary(
"Could not parse as SSV", "Could not parse as SSV",
"input cannot be parsed ssv", "input cannot be parsed ssv",
&name, &name,
"value originates from here", "value originates from here",
&tag, &concat_string.tag,
)) ))
}, },
} }

View file

@ -69,33 +69,11 @@ pub fn from_toml(
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?; let args = args.evaluate_once(registry)?;
let tag = args.name_tag(); let tag = args.name_tag();
let name_span = tag.span;
let input = args.input; let input = args.input;
let stream = async_stream! { let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await; let concat_string = input.collect_string(tag.clone()).await?;
match from_toml_string_to_value(concat_string.item, tag.clone()) {
let mut concat_string = String::new();
let mut latest_tag: Option<Tag> = None;
for value in values {
latest_tag = Some(value.tag.clone());
let value_span = value.tag.span;
if let Ok(s) = value.as_string() {
concat_string.push_str(&s);
}
else {
yield Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
name_span,
"value originates from here",
value_span,
))
}
}
match from_toml_string_to_value(concat_string, tag.clone()) {
Ok(x) => match x { Ok(x) => match x {
Value { value: UntaggedValue::Table(list), .. } => { Value { value: UntaggedValue::Table(list), .. } => {
for l in list { for l in list {
@ -104,15 +82,15 @@ pub fn from_toml(
} }
x => yield ReturnSuccess::value(x), x => yield ReturnSuccess::value(x),
}, },
Err(_) => if let Some(last_tag) = latest_tag { Err(_) => {
yield Err(ShellError::labeled_error_with_secondary( yield Err(ShellError::labeled_error_with_secondary(
"Could not parse as TOML", "Could not parse as TOML",
"input cannot be parsed as TOML", "input cannot be parsed as TOML",
&tag, &tag,
"value originates from here", "value originates from here",
last_tag, concat_string.tag,
)) ))
} , }
} }
}; };

View file

@ -1,7 +1,7 @@
use crate::commands::WholeStreamCommand; use crate::commands::WholeStreamCommand;
use crate::prelude::*; use crate::prelude::*;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue, Value}; use nu_protocol::{ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue};
pub struct FromURL; pub struct FromURL;
@ -30,32 +30,12 @@ impl WholeStreamCommand for FromURL {
fn from_url(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { fn from_url(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?; let args = args.evaluate_once(registry)?;
let tag = args.name_tag(); let tag = args.name_tag();
let name_span = tag.span;
let input = args.input; let input = args.input;
let stream = async_stream! { let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await; let concat_string = input.collect_string(tag.clone()).await?;
let mut concat_string = String::new(); let result = serde_urlencoded::from_str::<Vec<(String, String)>>(&concat_string.item);
let mut latest_tag: Option<Tag> = None;
for value in values {
latest_tag = Some(value.tag.clone());
let value_span = value.tag.span;
if let Ok(s) = value.as_string() {
concat_string.push_str(&s);
} else {
yield Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
name_span,
"value originates from here",
value_span,
))
}
}
let result = serde_urlencoded::from_str::<Vec<(String, String)>>(&concat_string);
match result { match result {
Ok(result) => { Ok(result) => {
@ -68,17 +48,15 @@ fn from_url(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
yield ReturnSuccess::value(row.into_value()); yield ReturnSuccess::value(row.into_value());
} }
_ => { _ => {
if let Some(last_tag) = latest_tag {
yield Err(ShellError::labeled_error_with_secondary( yield Err(ShellError::labeled_error_with_secondary(
"String not compatible with url-encoding", "String not compatible with url-encoding",
"input not url-encoded", "input not url-encoded",
tag, tag,
"value originates from here", "value originates from here",
last_tag, concat_string.tag,
)); ));
} }
} }
}
}; };
Ok(stream.to_output_stream()) Ok(stream.to_output_stream())

View file

@ -3,7 +3,7 @@ use crate::prelude::*;
use crate::TaggedListBuilder; use crate::TaggedListBuilder;
use calamine::*; use calamine::*;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{Primitive, ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue, Value}; use nu_protocol::{ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue};
use std::io::Cursor; use std::io::Cursor;
pub struct FromXLSX; pub struct FromXLSX;
@ -49,19 +49,12 @@ fn from_xlsx(
let tag = runnable_context.name; let tag = runnable_context.name;
let stream = async_stream! { let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await; let value = input.collect_binary(tag.clone()).await?;
for value in values { let mut buf: Cursor<Vec<u8>> = Cursor::new(value.item);
let value_span = value.tag.span; let mut xls = Xlsx::<_>::new(buf).map_err(|_| {
let value_tag = value.tag.clone(); ShellError::labeled_error("Could not load xlsx file", "could not load xlsx file", &tag)
})?;
match value.value {
UntaggedValue::Primitive(Primitive::Binary(vb)) => {
let mut buf: Cursor<Vec<u8>> = Cursor::new(vb);
let mut xls = Xlsx::<_>::new(buf).map_err(|_| ShellError::labeled_error(
"Could not load xlsx file",
"could not load xlsx file",
&tag))?;
let mut dict = TaggedDictBuilder::new(&tag); let mut dict = TaggedDictBuilder::new(&tag);
@ -94,22 +87,12 @@ fn from_xlsx(
yield Err(ShellError::labeled_error( yield Err(ShellError::labeled_error(
"Could not load sheet", "Could not load sheet",
"could not load sheet", "could not load sheet",
&tag)); &tag,
));
} }
} }
yield ReturnSuccess::value(dict.into_value()); yield ReturnSuccess::value(dict.into_value());
}
_ => yield Err(ShellError::labeled_error_with_secondary(
"Expected binary data from pipeline",
"requires binary data input",
&tag,
"value originates from here",
value_tag,
)),
}
}
}; };
Ok(stream.to_output_stream()) Ok(stream.to_output_stream())

View file

@ -101,34 +101,12 @@ pub fn from_xml_string_to_value(s: String, tag: impl Into<Tag>) -> Result<Value,
fn from_xml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { fn from_xml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?; let args = args.evaluate_once(registry)?;
let tag = args.name_tag(); let tag = args.name_tag();
let name_span = tag.span;
let input = args.input; let input = args.input;
let stream = async_stream! { let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await; let concat_string = input.collect_string(tag.clone()).await?;
let mut concat_string = String::new(); match from_xml_string_to_value(concat_string.item, tag.clone()) {
let mut latest_tag: Option<Tag> = None;
for value in values {
latest_tag = Some(value.tag.clone());
let value_span = value.tag.span;
if let Ok(s) = value.as_string() {
concat_string.push_str(&s);
}
else {
yield Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
name_span,
"value originates from here",
value_span,
))
}
}
match from_xml_string_to_value(concat_string, tag.clone()) {
Ok(x) => match x { Ok(x) => match x {
Value { value: UntaggedValue::Table(list), .. } => { Value { value: UntaggedValue::Table(list), .. } => {
for l in list { for l in list {
@ -137,13 +115,13 @@ fn from_xml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
} }
x => yield ReturnSuccess::value(x), x => yield ReturnSuccess::value(x),
}, },
Err(_) => if let Some(last_tag) = latest_tag { Err(_) => {
yield Err(ShellError::labeled_error_with_secondary( yield Err(ShellError::labeled_error_with_secondary(
"Could not parse as XML", "Could not parse as XML",
"input cannot be parsed as XML", "input cannot be parsed as XML",
&tag, &tag,
"value originates from here", "value originates from here",
&last_tag, &concat_string.tag,
)) ))
} , } ,
} }

View file

@ -121,34 +121,12 @@ pub fn from_yaml_string_to_value(s: String, tag: impl Into<Tag>) -> Result<Value
fn from_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { fn from_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?; let args = args.evaluate_once(registry)?;
let tag = args.name_tag(); let tag = args.name_tag();
let name_span = tag.span;
let input = args.input; let input = args.input;
let stream = async_stream! { let stream = async_stream! {
let values: Vec<Value> = input.values.collect().await; let concat_string = input.collect_string(tag.clone()).await?;
let mut concat_string = String::new(); match from_yaml_string_to_value(concat_string.item, tag.clone()) {
let mut latest_tag: Option<Tag> = None;
for value in values {
latest_tag = Some(value.tag.clone());
let value_span = value.tag.span;
if let Ok(s) = value.as_string() {
concat_string.push_str(&s);
}
else {
yield Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
name_span,
"value originates from here",
value_span,
))
}
}
match from_yaml_string_to_value(concat_string, tag.clone()) {
Ok(x) => match x { Ok(x) => match x {
Value { value: UntaggedValue::Table(list), .. } => { Value { value: UntaggedValue::Table(list), .. } => {
for l in list { for l in list {
@ -157,15 +135,15 @@ fn from_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
} }
x => yield ReturnSuccess::value(x), x => yield ReturnSuccess::value(x),
}, },
Err(_) => if let Some(last_tag) = latest_tag { Err(_) => {
yield Err(ShellError::labeled_error_with_secondary( yield Err(ShellError::labeled_error_with_secondary(
"Could not parse as YAML", "Could not parse as YAML",
"input cannot be parsed as YAML", "input cannot be parsed as YAML",
&tag, &tag,
"value originates from here", "value originates from here",
&last_tag, &concat_string.tag,
)) ))
} , }
} }
}; };

View file

@ -1,8 +1,7 @@
use crate::commands::WholeStreamCommand; use crate::commands::WholeStreamCommand;
use crate::prelude::*; use crate::prelude::*;
use log::trace;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue}; use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue, Value};
pub struct Lines; pub struct Lines;
@ -28,45 +27,89 @@ impl WholeStreamCommand for Lines {
} }
} }
// TODO: "Amount remaining" wrapper fn ends_with_line_ending(st: &str) -> bool {
let mut temp = st.to_string();
let last = temp.pop();
if let Some(c) = last {
c == '\n'
} else {
false
}
}
fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?; let args = args.evaluate_once(registry)?;
let tag = args.name_tag(); let tag = args.name_tag();
let name_span = tag.span; let name_span = tag.span;
let input = args.input; let mut input = args.input;
let stream = input let mut leftover = vec![];
.values let mut leftover_string = String::new();
.map(move |v| { let stream = async_stream! {
if let Ok(s) = v.as_string() { loop {
let split_result: Vec<_> = s.lines().filter(|s| s.trim() != "").collect(); match input.values.next().await {
Some(Value { value: UntaggedValue::Primitive(Primitive::String(st)), ..}) => {
let mut st = leftover_string.clone() + &st;
leftover.clear();
trace!("split result = {:?}", split_result); let mut lines: Vec<String> = st.lines().map(|x| x.to_string()).collect();
let result = split_result if !ends_with_line_ending(&st) {
.into_iter() if let Some(last) = lines.pop() {
.map(|s| { leftover_string = last;
ReturnSuccess::value(
UntaggedValue::Primitive(Primitive::Line(s.into()))
.into_untagged_value(),
)
})
.collect::<Vec<_>>();
futures::stream::iter(result)
} else { } else {
let value_span = v.tag.span; leftover_string.clear();
}
} else {
leftover_string.clear();
}
futures::stream::iter(vec![Err(ShellError::labeled_error_with_secondary( let success_lines: Vec<_> = lines.iter().map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value())).collect();
yield futures::stream::iter(success_lines)
}
Some(Value { value: UntaggedValue::Primitive(Primitive::Line(st)), ..}) => {
let mut st = leftover_string.clone() + &st;
leftover.clear();
let mut lines: Vec<String> = st.lines().map(|x| x.to_string()).collect();
if !ends_with_line_ending(&st) {
if let Some(last) = lines.pop() {
leftover_string = last;
} else {
leftover_string.clear();
}
} else {
leftover_string.clear();
}
let success_lines: Vec<_> = lines.iter().map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value())).collect();
yield futures::stream::iter(success_lines)
}
Some( Value { tag: value_span, ..}) => {
yield futures::stream::iter(vec![Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline", "Expected a string from pipeline",
"requires string input", "requires string input",
name_span, name_span,
"value originates from here", "value originates from here",
value_span, value_span,
))]) ))]);
}
None => {
if !leftover.is_empty() {
let mut st = leftover_string.clone();
if let Ok(extra) = String::from_utf8(leftover) {
st.push_str(&extra);
}
yield futures::stream::iter(vec![ReturnSuccess::value(UntaggedValue::string(st).into_untagged_value())])
}
break;
}
}
}
if !leftover_string.is_empty() {
yield futures::stream::iter(vec![ReturnSuccess::value(UntaggedValue::string(leftover_string).into_untagged_value())]);
}
} }
})
.flatten(); .flatten();
Ok(stream.to_output_stream()) Ok(stream.to_output_stream())

View file

@ -1,6 +1,8 @@
use crate::prelude::*; use crate::prelude::*;
use futures::stream::iter; use futures::stream::iter;
use nu_protocol::{ReturnSuccess, ReturnValue, UntaggedValue, Value}; use nu_errors::ShellError;
use nu_protocol::{Primitive, ReturnSuccess, ReturnValue, UntaggedValue, Value};
use nu_source::{Tagged, TaggedItem};
pub struct InputStream { pub struct InputStream {
pub(crate) values: BoxStream<'static, Value>, pub(crate) values: BoxStream<'static, Value>,
@ -27,6 +29,91 @@ impl InputStream {
values: input.boxed(), values: input.boxed(),
} }
} }
pub async fn collect_string(mut self, tag: Tag) -> Result<Tagged<String>, ShellError> {
let mut bytes = vec![];
let mut value_tag = tag.clone();
loop {
match self.values.next().await {
Some(Value {
value: UntaggedValue::Primitive(Primitive::String(s)),
tag: value_t,
}) => {
value_tag = value_t;
bytes.extend_from_slice(&s.into_bytes());
}
Some(Value {
value: UntaggedValue::Primitive(Primitive::Line(s)),
tag: value_t,
}) => {
value_tag = value_t;
bytes.extend_from_slice(&s.into_bytes());
}
Some(Value {
value: UntaggedValue::Primitive(Primitive::Binary(b)),
tag: value_t,
}) => {
value_tag = value_t;
bytes.extend_from_slice(&b);
}
Some(Value { tag: value_tag, .. }) => {
return Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
tag,
"value originates from here",
value_tag,
))
}
None => break,
}
}
match String::from_utf8(bytes) {
Ok(s) => Ok(s.tagged(value_tag.clone())),
Err(_) => Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
tag,
"value originates from here",
value_tag,
)),
}
}
pub async fn collect_binary(mut self, tag: Tag) -> Result<Tagged<Vec<u8>>, ShellError> {
let mut bytes = vec![];
let mut value_tag = tag.clone();
loop {
match self.values.next().await {
Some(Value {
value: UntaggedValue::Primitive(Primitive::Binary(b)),
tag: value_t,
}) => {
value_tag = value_t;
bytes.extend_from_slice(&b);
}
Some(Value {
tag: value_tag,
value: v,
}) => {
println!("{:?}", v);
return Err(ShellError::labeled_error_with_secondary(
"Expected binary from pipeline",
"requires binary input",
tag,
"value originates from here",
value_tag,
));
}
None => break,
}
}
Ok(bytes.tagged(value_tag))
}
} }
impl Stream for InputStream { impl Stream for InputStream {

View file

@ -126,6 +126,7 @@ mod stdin_evaluation {
iecho yes iecho yes
| chop | chop
| chop | chop
| lines
| first 1 | first 1
"# "#
)); ));