Make PipelineData helpers collect rawstreams (#969)

This commit is contained in:
JT 2022-02-07 07:44:18 -05:00 committed by GitHub
parent 3ab55f7de9
commit a78c82d811
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 74 additions and 28 deletions

View file

@ -103,7 +103,7 @@ fn into_binary(
// TODO: in the future, we may want this to stream out, converting each to bytes // TODO: in the future, we may want this to stream out, converting each to bytes
let output = stream.into_bytes()?; let output = stream.into_bytes()?;
Ok(Value::Binary { Ok(Value::Binary {
val: output, val: output.item,
span: head, span: head,
} }
.into_pipeline_data()) .into_pipeline_data())

View file

@ -154,7 +154,7 @@ fn string_helper(
// TODO: in the future, we may want this to stream out, converting each to bytes // TODO: in the future, we may want this to stream out, converting each to bytes
let output = stream.into_string()?; let output = stream.into_string()?;
Ok(Value::String { Ok(Value::String {
val: output, val: output.item,
span: head, span: head,
} }
.into_pipeline_data()) .into_pipeline_data())

View file

@ -45,7 +45,7 @@ impl Command for Decode {
match input { match input {
PipelineData::RawStream(stream, ..) => { PipelineData::RawStream(stream, ..) => {
let bytes: Vec<u8> = stream.into_bytes()?; let bytes: Vec<u8> = stream.into_bytes()?.item;
let encoding = match Encoding::for_label(encoding.item.as_bytes()) { let encoding = match Encoding::for_label(encoding.item.as_bytes()) {
None => Err(ShellError::SpannedLabeledError( None => Err(ShellError::SpannedLabeledError(

View file

@ -230,12 +230,23 @@ impl PipelineData {
Ok(vals.into_iter().map(f).into_pipeline_data(ctrlc)) Ok(vals.into_iter().map(f).into_pipeline_data(ctrlc))
} }
PipelineData::ListStream(stream, ..) => Ok(stream.map(f).into_pipeline_data(ctrlc)), PipelineData::ListStream(stream, ..) => Ok(stream.map(f).into_pipeline_data(ctrlc)),
PipelineData::RawStream(stream, ..) => Ok(stream PipelineData::RawStream(stream, ..) => {
.map(move |x| match x { let collected = stream.into_bytes()?;
Ok(v) => f(v),
Err(err) => Value::Error { error: err }, if let Ok(st) = String::from_utf8(collected.clone().item) {
Ok(f(Value::String {
val: st,
span: collected.span,
}) })
.into_pipeline_data(ctrlc)), .into_pipeline_data())
} else {
Ok(f(Value::Binary {
val: collected.item,
span: collected.span,
})
.into_pipeline_data())
}
}
PipelineData::Value(Value::Range { val, .. }, ..) => { PipelineData::Value(Value::Range { val, .. }, ..) => {
Ok(val.into_range_iter()?.map(f).into_pipeline_data(ctrlc)) Ok(val.into_range_iter()?.map(f).into_pipeline_data(ctrlc))
@ -266,14 +277,25 @@ impl PipelineData {
PipelineData::ListStream(stream, ..) => { PipelineData::ListStream(stream, ..) => {
Ok(stream.map(f).flatten().into_pipeline_data(ctrlc)) Ok(stream.map(f).flatten().into_pipeline_data(ctrlc))
} }
PipelineData::RawStream(stream, ..) => Ok(stream PipelineData::RawStream(stream, ..) => {
.map(move |x| match x { let collected = stream.into_bytes()?;
Ok(v) => v,
Err(err) => Value::Error { error: err }, if let Ok(st) = String::from_utf8(collected.clone().item) {
Ok(f(Value::String {
val: st,
span: collected.span,
}) })
.map(f) .into_iter()
.flatten() .into_pipeline_data(ctrlc))
.into_pipeline_data(ctrlc)), } else {
Ok(f(Value::Binary {
val: collected.item,
span: collected.span,
})
.into_iter()
.into_pipeline_data(ctrlc))
}
}
PipelineData::Value(Value::Range { val, .. }, ..) => match val.into_range_iter() { PipelineData::Value(Value::Range { val, .. }, ..) => match val.into_range_iter() {
Ok(iter) => Ok(iter.map(f).flatten().into_pipeline_data(ctrlc)), Ok(iter) => Ok(iter.map(f).flatten().into_pipeline_data(ctrlc)),
Err(error) => Err(error), Err(error) => Err(error),
@ -296,13 +318,33 @@ impl PipelineData {
Ok(vals.into_iter().filter(f).into_pipeline_data(ctrlc)) Ok(vals.into_iter().filter(f).into_pipeline_data(ctrlc))
} }
PipelineData::ListStream(stream, ..) => Ok(stream.filter(f).into_pipeline_data(ctrlc)), PipelineData::ListStream(stream, ..) => Ok(stream.filter(f).into_pipeline_data(ctrlc)),
PipelineData::RawStream(stream, ..) => Ok(stream PipelineData::RawStream(stream, ..) => {
.map(move |x| match x { let collected = stream.into_bytes()?;
Ok(v) => v,
Err(err) => Value::Error { error: err }, if let Ok(st) = String::from_utf8(collected.clone().item) {
}) let v = Value::String {
.filter(f) val: st,
.into_pipeline_data(ctrlc)), span: collected.span,
};
if f(&v) {
Ok(v.into_pipeline_data())
} else {
Ok(PipelineData::new(collected.span))
}
} else {
let v = Value::Binary {
val: collected.item,
span: collected.span,
};
if f(&v) {
Ok(v.into_pipeline_data())
} else {
Ok(PipelineData::new(collected.span))
}
}
}
PipelineData::Value(Value::Range { val, .. }, ..) => { PipelineData::Value(Value::Range { val, .. }, ..) => {
Ok(val.into_range_iter()?.filter(f).into_pipeline_data(ctrlc)) Ok(val.into_range_iter()?.filter(f).into_pipeline_data(ctrlc))
} }

View file

@ -30,24 +30,28 @@ impl RawStream {
} }
} }
pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> { pub fn into_bytes(self) -> Result<Spanned<Vec<u8>>, ShellError> {
let mut output = vec![]; let mut output = vec![];
for item in self.stream { for item in self.stream {
output.extend(item?); output.extend(item?);
} }
Ok(output) Ok(Spanned {
item: output,
span: self.span,
})
} }
pub fn into_string(self) -> Result<String, ShellError> { pub fn into_string(self) -> Result<Spanned<String>, ShellError> {
let mut output = String::new(); let mut output = String::new();
let span = self.span;
for item in self { for item in self {
output.push_str(&item?.as_string()?); output.push_str(&item?.as_string()?);
} }
Ok(output) Ok(Spanned { item: output, span })
} }
} }
impl Debug for RawStream { impl Debug for RawStream {