2019-05-23 07:23:06 +00:00
|
|
|
use crate::prelude::*;
|
Move external closer to internal (#1611)
* Refactor InputStream and affected commands.
First, making `values` private and leaning on the `Stream` implementation makes
consumers of `InputStream` less likely to have to change in the future, if we
change what an `InputStream` is internally.
Second, we're dropping `Option<InputStream>` as the input to pipelines,
internals, and externals. Instead, `InputStream.is_empty` can be used to check
for "emptiness". Empty streams are typically only ever used as the first input
to a pipeline.
* Add run_external internal command.
We want to push external commands closer to internal commands, eventually
eliminating the concept of "external" completely. This means we can consolidate
a couple of things:
- Variable evaluation (for example, `$it`, `$nu`, alias vars)
- Behaviour of whole stream vs per-item external execution
It should also make it easier for us to start introducing argument signatures
for external commands.
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
Co-authored-by: Jonathan Turner <jonathandturner@users.noreply.github.com>
2020-04-20 03:30:44 +00:00
|
|
|
use futures::stream::{iter, once};
|
2020-03-06 16:06:39 +00:00
|
|
|
use nu_errors::ShellError;
|
2020-03-29 14:23:19 +00:00
|
|
|
use nu_protocol::{Primitive, UntaggedValue, Value};
|
2020-03-06 16:06:39 +00:00
|
|
|
use nu_source::{Tagged, TaggedItem};
|
2019-05-23 07:23:06 +00:00
|
|
|
|
2019-07-03 20:31:15 +00:00
|
|
|
pub struct InputStream {
|
Move external closer to internal (#1611)
* Refactor InputStream and affected commands.
First, making `values` private and leaning on the `Stream` implementation makes
consumes of `InputStream` less likely to have to change in the future, if we
change what an `InputStream` is internally.
Second, we're dropping `Option<InputStream>` as the input to pipelines,
internals, and externals. Instead, `InputStream.is_empty` can be used to check
for "emptiness". Empty streams are typically only ever used as the first input
to a pipeline.
* Add run_external internal command.
We want to push external commands closer to internal commands, eventually
eliminating the concept of "external" completely. This means we can consolidate
a couple of things:
- Variable evaluation (for example, `$it`, `$nu`, alias vars)
- Behaviour of whole stream vs per-item external execution
It should also make it easier for us to start introducing argument signatures
for external commands,
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
Co-authored-by: Jonathan Turner <jonathandturner@users.noreply.github.com>
2020-04-20 03:30:44 +00:00
|
|
|
values: BoxStream<'static, Value>,
|
|
|
|
|
|
|
|
// Whether or not an empty stream was explicitly requeted via InputStream::empty
|
|
|
|
empty: bool,
|
2019-07-03 20:31:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl InputStream {
|
2020-01-07 21:00:01 +00:00
|
|
|
pub fn empty() -> InputStream {
|
Move external closer to internal (#1611)
* Refactor InputStream and affected commands.
First, making `values` private and leaning on the `Stream` implementation makes
consumes of `InputStream` less likely to have to change in the future, if we
change what an `InputStream` is internally.
Second, we're dropping `Option<InputStream>` as the input to pipelines,
internals, and externals. Instead, `InputStream.is_empty` can be used to check
for "emptiness". Empty streams are typically only ever used as the first input
to a pipeline.
* Add run_external internal command.
We want to push external commands closer to internal commands, eventually
eliminating the concept of "external" completely. This means we can consolidate
a couple of things:
- Variable evaluation (for example, `$it`, `$nu`, alias vars)
- Behaviour of whole stream vs per-item external execution
It should also make it easier for us to start introducing argument signatures
for external commands,
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
Co-authored-by: Jonathan Turner <jonathandturner@users.noreply.github.com>
2020-04-20 03:30:44 +00:00
|
|
|
InputStream {
|
|
|
|
values: once(async { UntaggedValue::nothing().into_untagged_value() }).boxed(),
|
|
|
|
empty: true,
|
|
|
|
}
|
2020-01-07 21:00:01 +00:00
|
|
|
}
|
|
|
|
|
2020-05-16 03:18:24 +00:00
|
|
|
pub fn one(item: impl Into<Value>) -> InputStream {
|
|
|
|
let mut v: VecDeque<Value> = VecDeque::new();
|
|
|
|
v.push_back(item.into());
|
|
|
|
v.into()
|
|
|
|
}
|
|
|
|
|
2019-11-21 14:33:14 +00:00
|
|
|
pub fn into_vec(self) -> impl Future<Output = Vec<Value>> {
|
2019-07-03 20:31:15 +00:00
|
|
|
self.values.collect()
|
|
|
|
}
|
|
|
|
|
Move external closer to internal (#1611)
* Refactor InputStream and affected commands.
First, making `values` private and leaning on the `Stream` implementation makes
consumers of `InputStream` less likely to have to change in the future, if we
change what an `InputStream` is internally.
Second, we're dropping `Option<InputStream>` as the input to pipelines,
internals, and externals. Instead, `InputStream.is_empty` can be used to check
for "emptiness". Empty streams are typically only ever used as the first input
to a pipeline.
* Add run_external internal command.
We want to push external commands closer to internal commands, eventually
eliminating the concept of "external" completely. This means we can consolidate
a couple of things:
- Variable evaluation (for example, `$it`, `$nu`, alias vars)
- Behaviour of whole stream vs per-item external execution
It should also make it easier for us to start introducing argument signatures
for external commands.
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
Co-authored-by: Jonathan Turner <jonathandturner@users.noreply.github.com>
2020-04-20 03:30:44 +00:00
|
|
|
pub fn is_empty(&self) -> bool {
|
|
|
|
self.empty
|
|
|
|
}
|
|
|
|
|
2019-11-21 14:33:14 +00:00
|
|
|
pub fn drain_vec(&mut self) -> impl Future<Output = Vec<Value>> {
|
Futures v0.3 upgrade (#1344)
* Upgrade futures, async-stream, and futures_codec
These were the last three dependencies on futures-preview. `nu` itself
is now fully dependent on `futures@0.3`, as opposed to `futures-preview`
alpha.
Because the update to `futures` from `0.3.0-alpha.19` to `0.3.0` removed
the `Stream` implementation of `VecDeque` ([changelog][changelog]), most
commands that convert a `VecDeque` to an `OutputStream` broke and had to
be fixed.
The current solution is to now convert `VecDeque`s to a `Stream` via
`futures::stream::iter`. However, it may be useful for `futures` to
create an `IntoStream` trait, implemented on the `std::collections` (or
really any `IntoIterator`). If something like this happends, it may be
worthwhile to update the trait implementations on `OutputStream` and
refactor these commands again.
While upgrading `futures_codec`, we remove a custom implementation of
`LinesCodec`, as one has been added to the library. There's also a small
refactor to make the stream output more idiomatic.
[changelog]: https://github.com/rust-lang/futures-rs/blob/master/CHANGELOG.md#030---2019-11-5
* Upgrade sys & ps plugin dependencies
They were previously dependent on `futures-preview`, and `nu_plugin_ps`
was dependent on an old version of `futures-timer`.
* Remove dependency on futures-timer from nu
* Update Cargo.lock
* Fix formatting
* Revert fmt regressions
CI is still on 1.40.0, but the latest rustfmt v1.41.0 has changes to the
`val @ pattern` syntax, causing the linting job to fail.
* Fix clippy warnings
2020-02-06 03:46:48 +00:00
|
|
|
let mut values: BoxStream<'static, Value> = iter(VecDeque::new()).boxed();
|
2019-08-03 02:17:28 +00:00
|
|
|
std::mem::swap(&mut values, &mut self.values);
|
|
|
|
|
|
|
|
values.collect()
|
|
|
|
}
|
|
|
|
|
2019-11-21 14:33:14 +00:00
|
|
|
pub fn from_stream(input: impl Stream<Item = Value> + Send + 'static) -> InputStream {
|
2019-07-03 20:31:15 +00:00
|
|
|
InputStream {
|
|
|
|
values: input.boxed(),
|
Move external closer to internal (#1611)
* Refactor InputStream and affected commands.
First, making `values` private and leaning on the `Stream` implementation makes
consumes of `InputStream` less likely to have to change in the future, if we
change what an `InputStream` is internally.
Second, we're dropping `Option<InputStream>` as the input to pipelines,
internals, and externals. Instead, `InputStream.is_empty` can be used to check
for "emptiness". Empty streams are typically only ever used as the first input
to a pipeline.
* Add run_external internal command.
We want to push external commands closer to internal commands, eventually
eliminating the concept of "external" completely. This means we can consolidate
a couple of things:
- Variable evaluation (for example, `$it`, `$nu`, alias vars)
- Behaviour of whole stream vs per-item external execution
It should also make it easier for us to start introducing argument signatures
for external commands,
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
Co-authored-by: Jonathan Turner <jonathandturner@users.noreply.github.com>
2020-04-20 03:30:44 +00:00
|
|
|
empty: false,
|
2019-07-03 20:31:15 +00:00
|
|
|
}
|
|
|
|
}
|
2020-03-06 16:06:39 +00:00
|
|
|
|
|
|
|
pub async fn collect_string(mut self, tag: Tag) -> Result<Tagged<String>, ShellError> {
|
|
|
|
let mut bytes = vec![];
|
|
|
|
let mut value_tag = tag.clone();
|
|
|
|
|
|
|
|
loop {
|
|
|
|
match self.values.next().await {
|
|
|
|
Some(Value {
|
|
|
|
value: UntaggedValue::Primitive(Primitive::String(s)),
|
|
|
|
tag: value_t,
|
|
|
|
}) => {
|
|
|
|
value_tag = value_t;
|
|
|
|
bytes.extend_from_slice(&s.into_bytes());
|
|
|
|
}
|
|
|
|
Some(Value {
|
|
|
|
value: UntaggedValue::Primitive(Primitive::Line(s)),
|
|
|
|
tag: value_t,
|
|
|
|
}) => {
|
|
|
|
value_tag = value_t;
|
|
|
|
bytes.extend_from_slice(&s.into_bytes());
|
|
|
|
}
|
|
|
|
Some(Value {
|
|
|
|
value: UntaggedValue::Primitive(Primitive::Binary(b)),
|
|
|
|
tag: value_t,
|
|
|
|
}) => {
|
|
|
|
value_tag = value_t;
|
|
|
|
bytes.extend_from_slice(&b);
|
|
|
|
}
|
|
|
|
Some(Value { tag: value_tag, .. }) => {
|
|
|
|
return Err(ShellError::labeled_error_with_secondary(
|
|
|
|
"Expected a string from pipeline",
|
|
|
|
"requires string input",
|
|
|
|
tag,
|
|
|
|
"value originates from here",
|
|
|
|
value_tag,
|
|
|
|
))
|
|
|
|
}
|
|
|
|
None => break,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
match String::from_utf8(bytes) {
|
|
|
|
Ok(s) => Ok(s.tagged(value_tag.clone())),
|
|
|
|
Err(_) => Err(ShellError::labeled_error_with_secondary(
|
|
|
|
"Expected a string from pipeline",
|
|
|
|
"requires string input",
|
|
|
|
tag,
|
|
|
|
"value originates from here",
|
|
|
|
value_tag,
|
|
|
|
)),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn collect_binary(mut self, tag: Tag) -> Result<Tagged<Vec<u8>>, ShellError> {
|
|
|
|
let mut bytes = vec![];
|
|
|
|
let mut value_tag = tag.clone();
|
|
|
|
|
|
|
|
loop {
|
|
|
|
match self.values.next().await {
|
|
|
|
Some(Value {
|
|
|
|
value: UntaggedValue::Primitive(Primitive::Binary(b)),
|
|
|
|
tag: value_t,
|
|
|
|
}) => {
|
|
|
|
value_tag = value_t;
|
|
|
|
bytes.extend_from_slice(&b);
|
|
|
|
}
|
|
|
|
Some(Value {
|
|
|
|
tag: value_tag,
|
|
|
|
value: v,
|
|
|
|
}) => {
|
|
|
|
println!("{:?}", v);
|
|
|
|
return Err(ShellError::labeled_error_with_secondary(
|
|
|
|
"Expected binary from pipeline",
|
|
|
|
"requires binary input",
|
|
|
|
tag,
|
|
|
|
"value originates from here",
|
|
|
|
value_tag,
|
|
|
|
));
|
|
|
|
}
|
|
|
|
None => break,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(bytes.tagged(value_tag))
|
|
|
|
}
|
2019-07-03 20:31:15 +00:00
|
|
|
}
|
|
|
|
|
2019-10-13 04:12:43 +00:00
|
|
|
impl Stream for InputStream {
|
2019-11-21 14:33:14 +00:00
|
|
|
type Item = Value;
|
2019-10-13 04:12:43 +00:00
|
|
|
|
|
|
|
fn poll_next(
|
|
|
|
mut self: std::pin::Pin<&mut Self>,
|
|
|
|
cx: &mut std::task::Context<'_>,
|
|
|
|
) -> core::task::Poll<Option<Self::Item>> {
|
|
|
|
Stream::poll_next(std::pin::Pin::new(&mut self.values), cx)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-21 14:33:14 +00:00
|
|
|
impl From<BoxStream<'static, Value>> for InputStream {
|
|
|
|
fn from(input: BoxStream<'static, Value>) -> InputStream {
|
Move external closer to internal (#1611)
* Refactor InputStream and affected commands.
First, making `values` private and leaning on the `Stream` implementation makes
consumes of `InputStream` less likely to have to change in the future, if we
change what an `InputStream` is internally.
Second, we're dropping `Option<InputStream>` as the input to pipelines,
internals, and externals. Instead, `InputStream.is_empty` can be used to check
for "emptiness". Empty streams are typically only ever used as the first input
to a pipeline.
* Add run_external internal command.
We want to push external commands closer to internal commands, eventually
eliminating the concept of "external" completely. This means we can consolidate
a couple of things:
- Variable evaluation (for example, `$it`, `$nu`, alias vars)
- Behaviour of whole stream vs per-item external execution
It should also make it easier for us to start introducing argument signatures
for external commands,
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
Co-authored-by: Jonathan Turner <jonathandturner@users.noreply.github.com>
2020-04-20 03:30:44 +00:00
|
|
|
InputStream {
|
|
|
|
values: input,
|
|
|
|
empty: false,
|
|
|
|
}
|
2019-07-03 20:31:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-21 14:33:14 +00:00
|
|
|
impl From<VecDeque<Value>> for InputStream {
|
|
|
|
fn from(input: VecDeque<Value>) -> InputStream {
|
2019-07-03 20:31:15 +00:00
|
|
|
InputStream {
|
Futures v0.3 upgrade (#1344)
* Upgrade futures, async-stream, and futures_codec
These were the last three dependencies on futures-preview. `nu` itself
is now fully dependent on `futures@0.3`, as opposed to `futures-preview`
alpha.
Because the update to `futures` from `0.3.0-alpha.19` to `0.3.0` removed
the `Stream` implementation of `VecDeque` ([changelog][changelog]), most
commands that convert a `VecDeque` to an `OutputStream` broke and had to
be fixed.
The current solution is to now convert `VecDeque`s to a `Stream` via
`futures::stream::iter`. However, it may be useful for `futures` to
create an `IntoStream` trait, implemented on the `std::collections` (or
really any `IntoIterator`). If something like this happends, it may be
worthwhile to update the trait implementations on `OutputStream` and
refactor these commands again.
While upgrading `futures_codec`, we remove a custom implementation of
`LinesCodec`, as one has been added to the library. There's also a small
refactor to make the stream output more idiomatic.
[changelog]: https://github.com/rust-lang/futures-rs/blob/master/CHANGELOG.md#030---2019-11-5
* Upgrade sys & ps plugin dependencies
They were previously dependent on `futures-preview`, and `nu_plugin_ps`
was dependent on an old version of `futures-timer`.
* Remove dependency on futures-timer from nu
* Update Cargo.lock
* Fix formatting
* Revert fmt regressions
CI is still on 1.40.0, but the latest rustfmt v1.41.0 has changes to the
`val @ pattern` syntax, causing the linting job to fail.
* Fix clippy warnings
2020-02-06 03:46:48 +00:00
|
|
|
values: futures::stream::iter(input).boxed(),
|
Move external closer to internal (#1611)
* Refactor InputStream and affected commands.
First, making `values` private and leaning on the `Stream` implementation makes
consumes of `InputStream` less likely to have to change in the future, if we
change what an `InputStream` is internally.
Second, we're dropping `Option<InputStream>` as the input to pipelines,
internals, and externals. Instead, `InputStream.is_empty` can be used to check
for "emptiness". Empty streams are typically only ever used as the first input
to a pipeline.
* Add run_external internal command.
We want to push external commands closer to internal commands, eventually
eliminating the concept of "external" completely. This means we can consolidate
a couple of things:
- Variable evaluation (for example, `$it`, `$nu`, alias vars)
- Behaviour of whole stream vs per-item external execution
It should also make it easier for us to start introducing argument signatures
for external commands,
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
Co-authored-by: Jonathan Turner <jonathandturner@users.noreply.github.com>
2020-04-20 03:30:44 +00:00
|
|
|
empty: false,
|
2019-07-03 20:31:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-21 14:33:14 +00:00
|
|
|
impl From<Vec<Value>> for InputStream {
|
|
|
|
fn from(input: Vec<Value>) -> InputStream {
|
2019-07-03 20:31:15 +00:00
|
|
|
InputStream {
|
Futures v0.3 upgrade (#1344)
* Upgrade futures, async-stream, and futures_codec
These were the last three dependencies on futures-preview. `nu` itself
is now fully dependent on `futures@0.3`, as opposed to `futures-preview`
alpha.
Because the update to `futures` from `0.3.0-alpha.19` to `0.3.0` removed
the `Stream` implementation of `VecDeque` ([changelog][changelog]), most
commands that convert a `VecDeque` to an `OutputStream` broke and had to
be fixed.
The current solution is to now convert `VecDeque`s to a `Stream` via
`futures::stream::iter`. However, it may be useful for `futures` to
create an `IntoStream` trait, implemented on the `std::collections` (or
really any `IntoIterator`). If something like this happends, it may be
worthwhile to update the trait implementations on `OutputStream` and
refactor these commands again.
While upgrading `futures_codec`, we remove a custom implementation of
`LinesCodec`, as one has been added to the library. There's also a small
refactor to make the stream output more idiomatic.
[changelog]: https://github.com/rust-lang/futures-rs/blob/master/CHANGELOG.md#030---2019-11-5
* Upgrade sys & ps plugin dependencies
They were previously dependent on `futures-preview`, and `nu_plugin_ps`
was dependent on an old version of `futures-timer`.
* Remove dependency on futures-timer from nu
* Update Cargo.lock
* Fix formatting
* Revert fmt regressions
CI is still on 1.40.0, but the latest rustfmt v1.41.0 has changes to the
`val @ pattern` syntax, causing the linting job to fail.
* Fix clippy warnings
2020-02-06 03:46:48 +00:00
|
|
|
values: futures::stream::iter(input).boxed(),
|
Move external closer to internal (#1611)
* Refactor InputStream and affected commands.
First, making `values` private and leaning on the `Stream` implementation makes
consumes of `InputStream` less likely to have to change in the future, if we
change what an `InputStream` is internally.
Second, we're dropping `Option<InputStream>` as the input to pipelines,
internals, and externals. Instead, `InputStream.is_empty` can be used to check
for "emptiness". Empty streams are typically only ever used as the first input
to a pipeline.
* Add run_external internal command.
We want to push external commands closer to internal commands, eventually
eliminating the concept of "external" completely. This means we can consolidate
a couple of things:
- Variable evaluation (for example, `$it`, `$nu`, alias vars)
- Behaviour of whole stream vs per-item external execution
It should also make it easier for us to start introducing argument signatures
for external commands,
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
* Update run_external.rs
Co-authored-by: Jonathan Turner <jonathandturner@users.noreply.github.com>
2020-04-20 03:30:44 +00:00
|
|
|
empty: false,
|
2019-07-03 20:31:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|