Split input and output streams into separate modules

This commit is contained in:
Jason Gedge 2020-03-29 10:23:19 -04:00 committed by Jason Gedge
parent ae5f3c8210
commit cad2741e9e
4 changed files with 112 additions and 115 deletions

View file

@ -163,23 +163,16 @@ impl Shell for FilesystemShell {
// Generated stream: impl Stream<Item = Result<ReturnSuccess, ShellError>
let stream = async_stream::try_stream! {
for path in paths {
// Handle CTRL+C presence
if ctrl_c.load(Ordering::SeqCst) {
break;
}
// Map GlobError to ShellError and gracefully try to unwrap the path
let path = path.map_err(|e| ShellError::from(e.into_error()))?;
// Skip if '--all/-a' flag is present and this path is hidden
if !all && is_hidden_dir(&path) {
continue;
}
// Get metadata from current path, if we don't have enough
// permissions to stat on file don't use any metadata, otherwise
// return the error and gracefully unwrap metadata (which yields
// Option<Metadata>)
let metadata = match std::fs::symlink_metadata(&path) {
Ok(metadata) => Ok(Some(metadata)),
Err(e) => if let PermissionDenied = e.kind() {
@ -189,9 +182,6 @@ impl Shell for FilesystemShell {
},
}?;
// Build dict entry for this path and possibly using some metadata.
// Map the possible dict entry into a Value, gracefully unwrap it
// with '?'
let entry = dir_entry_dict(
&path,
metadata.as_ref(),
@ -202,7 +192,6 @@ impl Shell for FilesystemShell {
)
.map(|entry| ReturnSuccess::Value(entry.into()))?;
// Finally yield the generated entry that was mapped to Value
yield entry;
}
};

View file

@ -1,7 +1,7 @@
use crate::prelude::*;
use futures::stream::iter;
use nu_errors::ShellError;
use nu_protocol::{Primitive, ReturnSuccess, ReturnValue, UntaggedValue, Value};
use nu_protocol::{Primitive, UntaggedValue, Value};
use nu_source::{Tagged, TaggedItem};
pub struct InputStream {
@ -148,106 +148,3 @@ impl From<Vec<Value>> for InputStream {
}
}
}
pub struct OutputStream {
pub(crate) values: BoxStream<'static, ReturnValue>,
}
impl OutputStream {
pub fn new(values: impl Stream<Item = ReturnValue> + Send + 'static) -> OutputStream {
OutputStream {
values: values.boxed(),
}
}
pub fn empty() -> OutputStream {
let v: VecDeque<ReturnValue> = VecDeque::new();
v.into()
}
pub fn one(item: impl Into<ReturnValue>) -> OutputStream {
let mut v: VecDeque<ReturnValue> = VecDeque::new();
v.push_back(item.into());
v.into()
}
pub fn from_input(input: impl Stream<Item = Value> + Send + 'static) -> OutputStream {
OutputStream {
values: input.map(ReturnSuccess::value).boxed(),
}
}
pub fn drain_vec(&mut self) -> impl Future<Output = Vec<ReturnValue>> {
let mut values: BoxStream<'static, ReturnValue> = iter(VecDeque::new()).boxed();
std::mem::swap(&mut values, &mut self.values);
values.collect()
}
}
impl Stream for OutputStream {
type Item = ReturnValue;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> core::task::Poll<Option<Self::Item>> {
Stream::poll_next(std::pin::Pin::new(&mut self.values), cx)
}
}
impl From<InputStream> for OutputStream {
fn from(input: InputStream) -> OutputStream {
OutputStream {
values: input.values.map(ReturnSuccess::value).boxed(),
}
}
}
impl From<BoxStream<'static, Value>> for OutputStream {
fn from(input: BoxStream<'static, Value>) -> OutputStream {
OutputStream {
values: input.map(ReturnSuccess::value).boxed(),
}
}
}
impl From<BoxStream<'static, ReturnValue>> for OutputStream {
fn from(input: BoxStream<'static, ReturnValue>) -> OutputStream {
OutputStream { values: input }
}
}
impl From<VecDeque<ReturnValue>> for OutputStream {
fn from(input: VecDeque<ReturnValue>) -> OutputStream {
OutputStream {
values: futures::stream::iter(input).boxed(),
}
}
}
impl From<VecDeque<Value>> for OutputStream {
fn from(input: VecDeque<Value>) -> OutputStream {
let stream = input.into_iter().map(ReturnSuccess::value);
OutputStream {
values: futures::stream::iter(stream).boxed(),
}
}
}
impl From<Vec<ReturnValue>> for OutputStream {
fn from(input: Vec<ReturnValue>) -> OutputStream {
OutputStream {
values: futures::stream::iter(input).boxed(),
}
}
}
impl From<Vec<Value>> for OutputStream {
fn from(input: Vec<Value>) -> OutputStream {
let stream = input.into_iter().map(ReturnSuccess::value);
OutputStream {
values: futures::stream::iter(stream).boxed(),
}
}
}

View file

@ -0,0 +1,5 @@
mod input;
mod output;
pub use input::*;
pub use output::*;

View file

@ -0,0 +1,106 @@
use crate::prelude::*;
use futures::stream::iter;
use nu_protocol::{ReturnSuccess, ReturnValue, Value};
pub struct OutputStream {
pub(crate) values: BoxStream<'static, ReturnValue>,
}
impl OutputStream {
pub fn new(values: impl Stream<Item = ReturnValue> + Send + 'static) -> OutputStream {
OutputStream {
values: values.boxed(),
}
}
pub fn empty() -> OutputStream {
let v: VecDeque<ReturnValue> = VecDeque::new();
v.into()
}
pub fn one(item: impl Into<ReturnValue>) -> OutputStream {
let mut v: VecDeque<ReturnValue> = VecDeque::new();
v.push_back(item.into());
v.into()
}
pub fn from_input(input: impl Stream<Item = Value> + Send + 'static) -> OutputStream {
OutputStream {
values: input.map(ReturnSuccess::value).boxed(),
}
}
pub fn drain_vec(&mut self) -> impl Future<Output = Vec<ReturnValue>> {
let mut values: BoxStream<'static, ReturnValue> = iter(VecDeque::new()).boxed();
std::mem::swap(&mut values, &mut self.values);
values.collect()
}
}
impl Stream for OutputStream {
type Item = ReturnValue;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> core::task::Poll<Option<Self::Item>> {
Stream::poll_next(std::pin::Pin::new(&mut self.values), cx)
}
}
impl From<InputStream> for OutputStream {
fn from(input: InputStream) -> OutputStream {
OutputStream {
values: input.values.map(ReturnSuccess::value).boxed(),
}
}
}
impl From<BoxStream<'static, Value>> for OutputStream {
fn from(input: BoxStream<'static, Value>) -> OutputStream {
OutputStream {
values: input.map(ReturnSuccess::value).boxed(),
}
}
}
impl From<BoxStream<'static, ReturnValue>> for OutputStream {
fn from(input: BoxStream<'static, ReturnValue>) -> OutputStream {
OutputStream { values: input }
}
}
impl From<VecDeque<ReturnValue>> for OutputStream {
fn from(input: VecDeque<ReturnValue>) -> OutputStream {
OutputStream {
values: futures::stream::iter(input).boxed(),
}
}
}
impl From<VecDeque<Value>> for OutputStream {
fn from(input: VecDeque<Value>) -> OutputStream {
let stream = input.into_iter().map(ReturnSuccess::value);
OutputStream {
values: futures::stream::iter(stream).boxed(),
}
}
}
impl From<Vec<ReturnValue>> for OutputStream {
fn from(input: Vec<ReturnValue>) -> OutputStream {
OutputStream {
values: futures::stream::iter(input).boxed(),
}
}
}
impl From<Vec<Value>> for OutputStream {
fn from(input: Vec<Value>) -> OutputStream {
let stream = input.into_iter().map(ReturnSuccess::value);
OutputStream {
values: futures::stream::iter(stream).boxed(),
}
}
}