mirror of
https://github.com/nushell/nushell
synced 2025-01-09 11:49:00 +00:00
6fd854ed9f
# Description

This PR introduces a `ByteStream` type which is a `Read`-able stream of bytes. Internally, it has an enum over three different byte stream sources:

```rust
pub enum ByteStreamSource {
    Read(Box<dyn Read + Send + 'static>),
    File(File),
    Child(ChildProcess),
}
```

This is in comparison to the current `RawStream` type, which is an `Iterator<Item = Vec<u8>>` and has to allocate for each read chunk.

Currently, `PipelineData::ExternalStream` serves a weird dual role where it is either external command output or a wrapper around `RawStream`. `ByteStream` makes this distinction more clear (via `ByteStreamSource`) and replaces `PipelineData::ExternalStream` in this PR:

```rust
pub enum PipelineData {
    Empty,
    Value(Value, Option<PipelineMetadata>),
    ListStream(ListStream, Option<PipelineMetadata>),
    ByteStream(ByteStream, Option<PipelineMetadata>),
}
```

The PR is relatively large, but a decent amount of it is just repetitive changes.

This PR fixes #7017, fixes #10763, and fixes #12369.

This PR also improves performance when piping external commands. Nushell should, in most cases, have competitive pipeline throughput compared to, e.g., bash.

| Command                                         | Before (MB/s) | After (MB/s) | Bash (MB/s) |
| ----------------------------------------------- | -------------:| ------------:| -----------:|
| `throughput \| rg 'x'`                          |          3059 |         3744 |        3739 |
| `throughput \| nu --testbin relay o> /dev/null` |          3508 |         8087 |        8136 |

# User-Facing Changes

- This is a breaking change for the plugin communication protocol, because the `ExternalStreamInfo` was replaced with `ByteStreamInfo`. Plugins now only have to deal with a single input stream, as opposed to the previous three streams: stdout, stderr, and exit code.
- The output of `describe` has been changed for external/byte streams.
- Temporary breaking change: `bytes starts-with` no longer works with byte streams. This is to keep the PR smaller, and `bytes ends-with` already does not work on byte streams.
- If a process core dumped, then instead of having a `Value::Error` in the `exit_code` column of the output returned from `complete`, it now is a `Value::Int` with the negation of the signal number.

# After Submitting

- Update docs and book as necessary
- Release notes (e.g., plugin protocol changes)
- Adapt/convert commands to work with byte streams (high priority is `str length`, `bytes starts-with`, and maybe `bytes ends-with`).
- Refactor the `tee` code, Devyn has already done some work on this.

---------

Co-authored-by: Devyn Cairns <devyn.cairns@gmail.com>
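The description contrasts an `Iterator<Item = Vec<u8>>`, which allocates for each chunk, with a `Read`-able stream. As a rough illustration only, the sketch below consumes the same bytes both ways using nothing but the standard library. The function names (`count_bytes_chunked`, `count_bytes_read`, `demo_byte_counts`), the 4096-byte chunk size, and the 8192-byte buffer are hypothetical choices for this example; none of this is the PR's actual `ByteStream` code.

```rust
use std::io::{self, Cursor, Read};

/// Chunk-iterator style (hypothetical stand-in for the old approach):
/// each item is a freshly allocated `Vec<u8>`.
fn count_bytes_chunked(chunks: impl Iterator<Item = Vec<u8>>) -> usize {
    chunks.map(|chunk| chunk.len()).sum()
}

/// `Read` style: a single fixed buffer is reused for every read call,
/// so no allocation happens per chunk.
fn count_bytes_read(mut reader: impl Read) -> io::Result<usize> {
    let mut buf = [0u8; 8192]; // assumed buffer size, chosen for illustration
    let mut total = 0;
    loop {
        match reader.read(&mut buf) {
            Ok(0) => return Ok(total), // end of stream
            Ok(n) => total += n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}

/// Feeds the same 20,000 bytes through both styles and returns both totals.
fn demo_byte_counts() -> (usize, usize) {
    let data = vec![7u8; 20_000];
    // Every chunk is copied into its own `Vec<u8>` before being consumed.
    let chunked = count_bytes_chunked(data.chunks(4096).map(|c| c.to_vec()));
    // The reader side fills one reusable buffer instead.
    let read = count_bytes_read(Cursor::new(data.clone()))
        .expect("reading from an in-memory cursor should not fail");
    (chunked, read)
}
```

Both paths see the same number of bytes; the difference is only in how many buffers are allocated along the way.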
156 lines
4.3 KiB
Rust
use crate::{Config, PipelineData, ShellError, Span, Value};
use std::{
    fmt::Debug,
    sync::{atomic::AtomicBool, Arc},
};

pub type ValueIterator = Box<dyn Iterator<Item = Value> + Send + 'static>;

/// A potentially infinite, interruptible stream of [`Value`]s.
///
/// In practice, a "stream" here means anything which can be iterated and produces Values.
/// Like other iterators in Rust, observing values from this stream will drain the items
/// as you view them and the stream cannot be replayed.
pub struct ListStream {
    stream: ValueIterator,
    span: Span,
}

impl ListStream {
    /// Create a new [`ListStream`] from a [`Value`] `Iterator`.
    pub fn new(
        iter: impl Iterator<Item = Value> + Send + 'static,
        span: Span,
        interrupt: Option<Arc<AtomicBool>>,
    ) -> Self {
        Self {
            stream: Box::new(Interrupt::new(iter, interrupt)),
            span,
        }
    }

    /// Returns the [`Span`] associated with this [`ListStream`].
    pub fn span(&self) -> Span {
        self.span
    }

    /// Convert a [`ListStream`] into its inner [`Value`] `Iterator`.
    pub fn into_inner(self) -> ValueIterator {
        self.stream
    }

    /// Converts each value in a [`ListStream`] into a string and then joins the strings together
    /// using the given separator.
    pub fn into_string(self, separator: &str, config: &Config) -> String {
        self.into_iter()
            .map(|val| val.to_expanded_string(", ", config))
            .collect::<Vec<String>>()
            .join(separator)
    }

    /// Collect the values of a [`ListStream`] into a list [`Value`].
    pub fn into_value(self) -> Value {
        Value::list(self.stream.collect(), self.span)
    }

    /// Consume all values in the stream, returning an error if any of the values is a `Value::Error`.
    pub fn drain(self) -> Result<(), ShellError> {
        for next in self {
            if let Value::Error { error, .. } = next {
                return Err(*error);
            }
        }
        Ok(())
    }

    /// Modify the inner iterator of a [`ListStream`] using a function.
    ///
    /// This can be used to call any number of standard iterator functions on the [`ListStream`].
    /// E.g., `take`, `filter`, `step_by`, and more.
    ///
    /// ```
    /// use nu_protocol::{ListStream, Span, Value};
    ///
    /// let span = Span::unknown();
    /// let stream = ListStream::new(std::iter::repeat(Value::int(0, span)), span, None);
    /// let new_stream = stream.modify(|iter| iter.take(100));
    /// ```
    pub fn modify<I>(self, f: impl FnOnce(ValueIterator) -> I) -> Self
    where
        I: Iterator<Item = Value> + Send + 'static,
    {
        Self {
            stream: Box::new(f(self.stream)),
            span: self.span,
        }
    }

    /// Create a new [`ListStream`] whose values are the results of applying the given function
    /// to each of the values in the original [`ListStream`].
    pub fn map(self, mapping: impl FnMut(Value) -> Value + Send + 'static) -> Self {
        self.modify(|iter| iter.map(mapping))
    }
}

impl Debug for ListStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ListStream").finish()
    }
}

impl IntoIterator for ListStream {
    type Item = Value;

    type IntoIter = IntoIter;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter {
            stream: self.into_inner(),
        }
    }
}

impl From<ListStream> for PipelineData {
    fn from(stream: ListStream) -> Self {
        Self::ListStream(stream, None)
    }
}

pub struct IntoIter {
    stream: ValueIterator,
}

impl Iterator for IntoIter {
    type Item = Value;

    fn next(&mut self) -> Option<Self::Item> {
        self.stream.next()
    }
}

struct Interrupt<I: Iterator> {
    iter: I,
    interrupt: Option<Arc<AtomicBool>>,
}

impl<I: Iterator> Interrupt<I> {
    fn new(iter: I, interrupt: Option<Arc<AtomicBool>>) -> Self {
        Self { iter, interrupt }
    }
}

impl<I: Iterator> Iterator for Interrupt<I> {
    type Item = <I as Iterator>::Item;

    fn next(&mut self) -> Option<Self::Item> {
        if nu_utils::ctrl_c::was_pressed(&self.interrupt) {
            None
        } else {
            self.iter.next()
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }
}
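The private `Interrupt` adapter in this file is what makes a potentially infinite `ListStream` stop once an interrupt flag is set. The following is a minimal, standard-library-only sketch of that idea, not the crate's own code: `Interruptible` and `demo_interrupt` are hypothetical names, and a relaxed atomic load is assumed as a stand-in for `nu_utils::ctrl_c::was_pressed`, whose exact behavior is not shown here.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

/// Hypothetical stand-in for the file's `Interrupt` adapter: wraps any
/// iterator and ends it early once the shared flag is set.
struct Interruptible<I> {
    iter: I,
    flag: Option<Arc<AtomicBool>>,
}

impl<I: Iterator> Iterator for Interruptible<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Assumption: a relaxed load models the Ctrl+C check.
        // With no flag at all, the stream is never interrupted.
        let interrupted = self
            .flag
            .as_ref()
            .map_or(false, |flag| flag.load(Ordering::Relaxed));
        if interrupted {
            None
        } else {
            self.iter.next()
        }
    }
}

/// Iterates an infinite range and sets the flag after the value 4 has
/// been observed, so the stream ends on the next call to `next`.
fn demo_interrupt() -> Vec<u32> {
    let flag = Arc::new(AtomicBool::new(false));
    let trigger = Arc::clone(&flag);
    let stream = Interruptible {
        iter: 0u32..,
        flag: Some(flag),
    };
    stream
        .inspect(move |n| {
            if *n == 4 {
                trigger.store(true, Ordering::Relaxed);
            }
        })
        .collect()
}
```

Because the flag is checked before each item is pulled, the item that was already produced when the flag flips is still delivered; here `demo_interrupt()` collects `[0, 1, 2, 3, 4]` and then stops.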