mirror of
https://github.com/nushell/nushell
synced 2025-01-06 18:29:02 +00:00
d9d956e54f
* Fix issue in external subexpression paths * new clippy dropped * clippy
188 lines
5.5 KiB
Rust
188 lines
5.5 KiB
Rust
use crate::prelude::*;
|
|
use nu_errors::ShellError;
|
|
use nu_protocol::{Primitive, Type, UntaggedValue, Value};
|
|
use nu_source::{HasFallibleSpan, PrettyDebug, Tag, Tagged, TaggedItem};
|
|
|
|
pub struct InputStream {
|
|
values: Box<dyn Iterator<Item = Value> + Send + Sync>,
|
|
|
|
// Whether or not an empty stream was explicitly requested via InputStream::empty
|
|
empty: bool,
|
|
}
|
|
|
|
impl Iterator for InputStream {
|
|
type Item = Value;
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
self.values.next()
|
|
}
|
|
}
|
|
|
|
impl InputStream {
|
|
pub fn empty() -> InputStream {
|
|
InputStream {
|
|
values: Box::new(std::iter::empty()),
|
|
empty: true,
|
|
}
|
|
}
|
|
|
|
pub fn one(item: impl Into<Value>) -> InputStream {
|
|
InputStream {
|
|
values: Box::new(std::iter::once(item.into())),
|
|
empty: false,
|
|
}
|
|
}
|
|
|
|
pub fn into_vec(self) -> Vec<Value> {
|
|
self.values.collect()
|
|
}
|
|
|
|
pub fn is_empty(&self) -> bool {
|
|
self.empty
|
|
}
|
|
|
|
pub fn drain_vec(&mut self) -> Vec<Value> {
|
|
let mut output = vec![];
|
|
for x in &mut self.values {
|
|
output.push(x);
|
|
}
|
|
output
|
|
}
|
|
|
|
pub fn from_stream(input: impl Iterator<Item = Value> + Send + Sync + 'static) -> InputStream {
|
|
InputStream {
|
|
values: Box::new(input),
|
|
empty: false,
|
|
}
|
|
}
|
|
|
|
pub fn collect_string(mut self, tag: Tag) -> Result<Tagged<String>, ShellError> {
|
|
let mut bytes = vec![];
|
|
let mut value_tag = tag.clone();
|
|
|
|
loop {
|
|
match self.values.next() {
|
|
Some(Value {
|
|
value: UntaggedValue::Primitive(Primitive::String(s)),
|
|
tag: value_t,
|
|
}) => {
|
|
value_tag = value_t;
|
|
bytes.extend_from_slice(&s.into_bytes());
|
|
}
|
|
Some(Value {
|
|
value: UntaggedValue::Primitive(Primitive::Binary(b)),
|
|
tag: value_t,
|
|
}) => {
|
|
value_tag = value_t;
|
|
bytes.extend_from_slice(&b);
|
|
}
|
|
Some(Value {
|
|
value: UntaggedValue::Primitive(Primitive::Nothing),
|
|
tag: value_t,
|
|
}) => {
|
|
value_tag = value_t;
|
|
}
|
|
Some(Value {
|
|
tag: value_tag,
|
|
value,
|
|
}) => {
|
|
return Err(ShellError::labeled_error_with_secondary(
|
|
"Expected a string from pipeline",
|
|
"requires string input",
|
|
tag,
|
|
format!(
|
|
"{} originates from here",
|
|
Type::from_value(&value).plain_string(100000)
|
|
),
|
|
value_tag,
|
|
))
|
|
}
|
|
None => break,
|
|
}
|
|
}
|
|
|
|
match String::from_utf8(bytes) {
|
|
Ok(s) => Ok(s.tagged(value_tag)),
|
|
Err(_) => Err(ShellError::labeled_error_with_secondary(
|
|
"Expected a string from pipeline",
|
|
"requires string input",
|
|
tag,
|
|
"value originates from here",
|
|
value_tag,
|
|
)),
|
|
}
|
|
}
|
|
|
|
pub fn collect_binary(mut self, tag: Tag) -> Result<Tagged<Vec<u8>>, ShellError> {
|
|
let mut bytes = vec![];
|
|
let mut value_tag = tag.clone();
|
|
|
|
loop {
|
|
match self.values.next() {
|
|
Some(Value {
|
|
value: UntaggedValue::Primitive(Primitive::Binary(b)),
|
|
tag: value_t,
|
|
}) => {
|
|
value_tag = value_t;
|
|
bytes.extend_from_slice(&b);
|
|
}
|
|
Some(Value {
|
|
tag: value_tag,
|
|
value: _,
|
|
}) => {
|
|
return Err(ShellError::labeled_error_with_secondary(
|
|
"Expected binary from pipeline",
|
|
"requires binary input",
|
|
tag,
|
|
"value originates from here",
|
|
value_tag,
|
|
));
|
|
}
|
|
None => break,
|
|
}
|
|
}
|
|
|
|
Ok(bytes.tagged(value_tag))
|
|
}
|
|
}
|
|
|
|
impl From<VecDeque<Value>> for InputStream {
|
|
fn from(input: VecDeque<Value>) -> InputStream {
|
|
InputStream {
|
|
values: Box::new(input.into_iter()),
|
|
empty: false,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<Vec<Value>> for InputStream {
|
|
fn from(input: Vec<Value>) -> InputStream {
|
|
InputStream {
|
|
values: Box::new(input.into_iter()),
|
|
empty: false,
|
|
}
|
|
}
|
|
}
|
|
|
|
pub trait IntoInputStream {
|
|
fn into_input_stream(self) -> InputStream;
|
|
}
|
|
|
|
impl<T, U> IntoInputStream for T
|
|
where
|
|
T: Iterator<Item = U> + Send + Sync + 'static,
|
|
U: Into<Result<nu_protocol::Value, nu_errors::ShellError>>,
|
|
{
|
|
fn into_input_stream(self) -> InputStream {
|
|
InputStream {
|
|
empty: false,
|
|
values: Box::new(self.map(|item| match item.into() {
|
|
Ok(result) => result,
|
|
Err(err) => match HasFallibleSpan::maybe_span(&err) {
|
|
Some(span) => nu_protocol::UntaggedValue::Error(err).into_value(span),
|
|
None => nu_protocol::UntaggedValue::Error(err).into_untagged_value(),
|
|
},
|
|
})),
|
|
}
|
|
}
|
|
}
|