Last batch of removing async_stream (#1983)

This commit is contained in:
Jonathan Turner 2020-06-14 14:00:42 -07:00 committed by GitHub
parent bd7ac0d48e
commit ee835f75db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 494 additions and 420 deletions

22
Cargo.lock generated
View file

@ -128,27 +128,6 @@ dependencies = [
"slab", "slab",
] ]
[[package]]
name = "async-stream"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22068c0c19514942eefcfd4daf8976ef1aad84e61539f95cd200c35202f80af5"
dependencies = [
"async-stream-impl",
"futures-core",
]
[[package]]
name = "async-stream-impl"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25f9db3b38af870bf7e5cc649167533b493928e50744e2c30ae350230b414670"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "async-task" name = "async-task"
version = "1.3.1" version = "1.3.1"
@ -2227,7 +2206,6 @@ dependencies = [
"ansi_term 0.12.1", "ansi_term 0.12.1",
"app_dirs", "app_dirs",
"async-recursion", "async-recursion",
"async-stream",
"async-trait", "async-trait",
"base64 0.12.1", "base64 0.12.1",
"bigdecimal", "bigdecimal",

View file

@ -23,7 +23,6 @@ app_dirs = "1.2.1"
async-recursion = "0.3.1" async-recursion = "0.3.1"
async-trait = "0.1.31" async-trait = "0.1.31"
directories = "2.0.2" directories = "2.0.2"
async-stream = "0.2"
base64 = "0.12.1" base64 = "0.12.1"
bigdecimal = { version = "0.1.2", features = ["serde"] } bigdecimal = { version = "0.1.2", features = ["serde"] }
bson = { version = "0.14.1", features = ["decimal128"] } bson = { version = "0.14.1", features = ["decimal128"] }

View file

@ -28,7 +28,7 @@ pub(crate) async fn run_internal_command(
let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", "input" = input); let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", "input" = input);
let internal_command = context.expect_command(&command.name); let internal_command = context.expect_command(&command.name);
let mut result = { let result = {
context context
.run_command( .run_command(
internal_command?, internal_command?,
@ -40,29 +40,38 @@ pub(crate) async fn run_internal_command(
.await .await
}; };
let mut context = context.clone(); let head = Arc::new(command.args.head.clone());
//let context = Arc::new(context.clone());
let context = context.clone();
let command = Arc::new(command);
let scope = Arc::new(scope);
// let scope = scope.clone(); // let scope = scope.clone();
let stream = async_stream! { Ok(InputStream::from_stream(
let mut soft_errs: Vec<ShellError> = vec![]; result
let mut yielded = false; .then(move |item| {
let head = head.clone();
while let Some(item) = result.next().await { let command = command.clone();
let mut context = context.clone();
let scope = scope.clone();
async move {
match item { match item {
Ok(ReturnSuccess::Action(action)) => match action { Ok(ReturnSuccess::Action(action)) => match action {
CommandAction::ChangePath(path) => { CommandAction::ChangePath(path) => {
context.shell_manager.set_path(path); context.shell_manager.set_path(path);
InputStream::from_stream(futures::stream::iter(vec![]))
} }
CommandAction::Exit => std::process::exit(0), // TODO: save history.txt CommandAction::Exit => std::process::exit(0), // TODO: save history.txt
CommandAction::Error(err) => { CommandAction::Error(err) => {
context.error(err); context.error(err.clone());
break; InputStream::one(UntaggedValue::Error(err).into_untagged_value())
} }
CommandAction::AutoConvert(tagged_contents, extension) => { CommandAction::AutoConvert(tagged_contents, extension) => {
let contents_tag = tagged_contents.tag.clone(); let contents_tag = tagged_contents.tag.clone();
let command_name = format!("from {}", extension); let command_name = format!("from {}", extension);
let command = command.clone(); let command = command.clone();
if let Some(converter) = context.registry.get_command(&command_name) { if let Some(converter) = context.registry.get_command(&command_name)
{
let new_args = RawCommandArgs { let new_args = RawCommandArgs {
host: context.host.clone(), host: context.host.clone(),
ctrl_c: context.ctrl_c.clone(), ctrl_c: context.ctrl_c.clone(),
@ -70,106 +79,142 @@ pub(crate) async fn run_internal_command(
shell_manager: context.shell_manager.clone(), shell_manager: context.shell_manager.clone(),
call_info: UnevaluatedCallInfo { call_info: UnevaluatedCallInfo {
args: nu_protocol::hir::Call { args: nu_protocol::hir::Call {
head: command.args.head, head: (&*head).clone(),
positional: None, positional: None,
named: None, named: None,
span: Span::unknown(), span: Span::unknown(),
is_last: false, is_last: false,
}, },
name_tag: Tag::unknown_anchor(command.name_span), name_tag: Tag::unknown_anchor(command.name_span),
scope: scope.clone(), scope: (&*scope).clone(),
} },
}; };
let mut result = converter.run(new_args.with_input(vec![tagged_contents]), &context.registry).await; let mut result = converter
let result_vec: Vec<Result<ReturnSuccess, ShellError>> = result.drain_vec().await; .run(
new_args.with_input(vec![tagged_contents]),
&context.registry,
)
.await;
let result_vec: Vec<Result<ReturnSuccess, ShellError>> =
result.drain_vec().await;
let mut output = vec![];
for res in result_vec { for res in result_vec {
match res { match res {
Ok(ReturnSuccess::Value(Value { value: UntaggedValue::Table(list), ..})) => { Ok(ReturnSuccess::Value(Value {
value: UntaggedValue::Table(list),
..
})) => {
for l in list { for l in list {
yield Ok(l); output.push(Ok(l));
} }
} }
Ok(ReturnSuccess::Value(Value { value, .. })) => { Ok(ReturnSuccess::Value(Value { value, .. })) => {
yield Ok(value.into_value(contents_tag.clone())); output.push(Ok(
value.into_value(contents_tag.clone())
));
} }
Err(e) => yield Err(e), Err(e) => output.push(Err(e)),
_ => {} _ => {}
} }
} }
futures::stream::iter(output).to_input_stream()
} else { } else {
yield Ok(tagged_contents) InputStream::one(tagged_contents)
} }
} }
CommandAction::EnterHelpShell(value) => { CommandAction::EnterHelpShell(value) => match value {
match value {
Value { Value {
value: UntaggedValue::Primitive(Primitive::String(cmd)), value: UntaggedValue::Primitive(Primitive::String(cmd)),
tag, tag,
} => { } => {
context.shell_manager.insert_at_current(Box::new( context.shell_manager.insert_at_current(Box::new(
HelpShell::for_command( match HelpShell::for_command(
UntaggedValue::string(cmd).into_value(tag), UntaggedValue::string(cmd).into_value(tag),
&context.registry(), &context.registry(),
)?, ) {
Ok(v) => v,
Err(err) => {
return InputStream::one(
UntaggedValue::Error(err).into_untagged_value(),
)
}
},
)); ));
InputStream::from_stream(futures::stream::iter(vec![]))
} }
_ => { _ => {
context.shell_manager.insert_at_current(Box::new( context.shell_manager.insert_at_current(Box::new(
HelpShell::index(&context.registry())?, match HelpShell::index(&context.registry()) {
Ok(v) => v,
Err(err) => {
return InputStream::one(
UntaggedValue::Error(err).into_untagged_value(),
)
}
},
)); ));
InputStream::from_stream(futures::stream::iter(vec![]))
} }
} },
}
CommandAction::EnterValueShell(value) => { CommandAction::EnterValueShell(value) => {
context context
.shell_manager .shell_manager
.insert_at_current(Box::new(ValueShell::new(value))); .insert_at_current(Box::new(ValueShell::new(value)));
InputStream::from_stream(futures::stream::iter(vec![]))
} }
CommandAction::EnterShell(location) => { CommandAction::EnterShell(location) => {
context.shell_manager.insert_at_current(Box::new( context.shell_manager.insert_at_current(Box::new(
FilesystemShell::with_location(location, context.registry().clone())?, match FilesystemShell::with_location(
location,
context.registry().clone(),
) {
Ok(v) => v,
Err(err) => {
return InputStream::one(
UntaggedValue::Error(err.into())
.into_untagged_value(),
)
}
},
)); ));
InputStream::from_stream(futures::stream::iter(vec![]))
} }
CommandAction::AddAlias(name, args, block) => { CommandAction::AddAlias(name, args, block) => {
context.add_commands(vec![ context.add_commands(vec![whole_stream_command(
whole_stream_command(AliasCommand::new( AliasCommand::new(name, args, block),
name, )]);
args, InputStream::from_stream(futures::stream::iter(vec![]))
block,
))
]);
} }
CommandAction::PreviousShell => { CommandAction::PreviousShell => {
context.shell_manager.prev(); context.shell_manager.prev();
InputStream::from_stream(futures::stream::iter(vec![]))
} }
CommandAction::NextShell => { CommandAction::NextShell => {
context.shell_manager.next(); context.shell_manager.next();
InputStream::from_stream(futures::stream::iter(vec![]))
} }
CommandAction::LeaveShell => { CommandAction::LeaveShell => {
context.shell_manager.remove_at_current(); context.shell_manager.remove_at_current();
if context.shell_manager.is_empty() { if context.shell_manager.is_empty() {
std::process::exit(0); // TODO: save history.txt std::process::exit(0); // TODO: save history.txt
} }
InputStream::from_stream(futures::stream::iter(vec![]))
} }
}, },
Ok(ReturnSuccess::Value(Value { Ok(ReturnSuccess::Value(Value {
value: UntaggedValue::Error(err), value: UntaggedValue::Error(err),
.. tag,
})) => { })) => {
context.error(err.clone()); context.error(err.clone());
yield Err(err); InputStream::one(UntaggedValue::Error(err).into_value(tag))
break;
} }
Ok(ReturnSuccess::Value(v)) => { Ok(ReturnSuccess::Value(v)) => InputStream::one(v),
yielded = true;
yield Ok(v);
}
Ok(ReturnSuccess::DebugValue(v)) => { Ok(ReturnSuccess::DebugValue(v)) => {
yielded = true;
let doc = PrettyDebug::pretty_doc(&v); let doc = PrettyDebug::pretty_doc(&v);
let mut buffer = termcolor::Buffer::ansi(); let mut buffer = termcolor::Buffer::ansi();
@ -180,16 +225,17 @@ pub(crate) async fn run_internal_command(
let value = String::from_utf8_lossy(buffer.as_slice()); let value = String::from_utf8_lossy(buffer.as_slice());
yield Ok(UntaggedValue::string(value).into_untagged_value()) InputStream::one(UntaggedValue::string(value).into_untagged_value())
} }
Err(err) => { Err(err) => {
context.error(err); context.error(err.clone());
break; InputStream::one(UntaggedValue::Error(err).into_untagged_value())
} }
} }
} }
}; })
.flatten()
Ok(stream.to_input_stream()) .take_while(|x| futures::future::ready(!x.is_error())),
))
} }

View file

@ -24,7 +24,7 @@ impl WholeStreamCommand for Lines {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
lines(args, registry) lines(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -46,82 +46,121 @@ fn ends_with_line_ending(st: &str) -> bool {
} }
} }
fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let mut leftover = vec![]; let leftover = Arc::new(vec![]);
let mut leftover_string = String::new(); let leftover_string = Arc::new(String::new());
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let args = args.evaluate_once(&registry).await?;
let args = args.evaluate_once(&registry).await.unwrap();
let tag = args.name_tag(); let tag = args.name_tag();
let name_span = tag.span; let name_span = tag.span;
let mut input = args.input;
loop { let eos = futures::stream::iter(vec![
match input.next().await { UntaggedValue::Primitive(Primitive::EndOfStream).into_untagged_value()
Some(Value { value: UntaggedValue::Primitive(Primitive::String(st)), ..}) => { ]);
let mut st = leftover_string.clone() + &st;
Ok(args
.input
.chain(eos)
.map(move |item| {
let mut leftover = leftover.clone();
let mut leftover_string = leftover_string.clone();
match item {
Value {
value: UntaggedValue::Primitive(Primitive::String(st)),
..
} => {
let st = (&*leftover_string).clone() + &st;
if let Some(leftover) = Arc::get_mut(&mut leftover) {
leftover.clear(); leftover.clear();
}
let mut lines: Vec<String> = st.lines().map(|x| x.to_string()).collect(); let mut lines: Vec<String> = st.lines().map(|x| x.to_string()).collect();
if !ends_with_line_ending(&st) { if !ends_with_line_ending(&st) {
if let Some(last) = lines.pop() { if let Some(last) = lines.pop() {
leftover_string = last; if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) {
} else { leftover_string.clear();
leftover_string.push_str(&last);
}
} else if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) {
leftover_string.clear(); leftover_string.clear();
} }
} else { } else if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) {
leftover_string.clear(); leftover_string.clear();
} }
let success_lines: Vec<_> = lines.iter().map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value())).collect(); let success_lines: Vec<_> = lines
.iter()
.map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value()))
.collect();
yield futures::stream::iter(success_lines) futures::stream::iter(success_lines)
} }
Some(Value { value: UntaggedValue::Primitive(Primitive::Line(st)), ..}) => { Value {
let mut st = leftover_string.clone() + &st; value: UntaggedValue::Primitive(Primitive::Line(st)),
..
} => {
let st = (&*leftover_string).clone() + &st;
if let Some(leftover) = Arc::get_mut(&mut leftover) {
leftover.clear(); leftover.clear();
}
let mut lines: Vec<String> = st.lines().map(|x| x.to_string()).collect(); let mut lines: Vec<String> = st.lines().map(|x| x.to_string()).collect();
if !ends_with_line_ending(&st) { if !ends_with_line_ending(&st) {
if let Some(last) = lines.pop() { if let Some(last) = lines.pop() {
leftover_string = last; if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) {
} else { leftover_string.clear();
leftover_string.push_str(&last);
}
} else if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) {
leftover_string.clear(); leftover_string.clear();
} }
} else { } else if let Some(leftover_string) = Arc::get_mut(&mut leftover_string) {
leftover_string.clear(); leftover_string.clear();
} }
let success_lines: Vec<_> = lines.iter().map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value())).collect(); let success_lines: Vec<_> = lines
yield futures::stream::iter(success_lines) .iter()
.map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value()))
.collect();
futures::stream::iter(success_lines)
} }
Some( Value { tag: value_span, ..}) => { Value {
yield futures::stream::iter(vec![Err(ShellError::labeled_error_with_secondary( value: UntaggedValue::Primitive(Primitive::EndOfStream),
..
} => {
if !leftover.is_empty() {
let mut st = (&*leftover_string).clone();
if let Ok(extra) = String::from_utf8((&*leftover).clone()) {
st.push_str(&extra);
}
// futures::stream::iter(vec![ReturnSuccess::value(
// UntaggedValue::string(st).into_untagged_value(),
// )])
if !st.is_empty() {
futures::stream::iter(vec![ReturnSuccess::value(
UntaggedValue::string(&*leftover_string).into_untagged_value(),
)])
} else {
futures::stream::iter(vec![])
}
} else {
futures::stream::iter(vec![])
}
}
Value {
tag: value_span, ..
} => futures::stream::iter(vec![Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline", "Expected a string from pipeline",
"requires string input", "requires string input",
name_span, name_span,
"value originates from here", "value originates from here",
value_span, value_span,
))]); ))]),
} }
None => { })
if !leftover.is_empty() { .flatten()
let mut st = leftover_string.clone(); .to_output_stream())
if let Ok(extra) = String::from_utf8(leftover) {
st.push_str(&extra);
}
yield futures::stream::iter(vec![ReturnSuccess::value(UntaggedValue::string(st).into_untagged_value())])
}
break;
}
}
}
if !leftover_string.is_empty() {
yield futures::stream::iter(vec![ReturnSuccess::value(UntaggedValue::string(leftover_string).into_untagged_value())]);
}
}
.flatten();
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View file

@ -3,7 +3,7 @@ use crate::prelude::*;
use derive_new::new; use derive_new::new;
use log::trace; use log::trace;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ReturnValue, Signature, Value}; use nu_protocol::{Primitive, ReturnValue, Signature, UntaggedValue, Value};
use serde::{self, Deserialize, Serialize}; use serde::{self, Deserialize, Serialize};
use std::io::prelude::*; use std::io::prelude::*;
use std::io::BufReader; use std::io::BufReader;
@ -61,11 +61,11 @@ impl WholeStreamCommand for PluginCommand {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
filter_plugin(self.path.clone(), args, registry) filter_plugin(self.path.clone(), args, registry).await
} }
} }
pub fn filter_plugin( pub async fn filter_plugin(
path: String, path: String,
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
@ -75,8 +75,14 @@ pub fn filter_plugin(
let scope = args.call_info.scope.clone(); let scope = args.call_info.scope.clone();
let stream = async_stream! { let bos = futures::stream::iter(vec![
let mut args = args.evaluate_once_with_scope(&registry, &scope).await?; UntaggedValue::Primitive(Primitive::BeginningOfStream).into_untagged_value()
]);
let eos = futures::stream::iter(vec![
UntaggedValue::Primitive(Primitive::EndOfStream).into_untagged_value()
]);
let args = args.evaluate_once_with_scope(&registry, &scope).await?;
let mut child = std::process::Command::new(path) let mut child = std::process::Command::new(path)
.stdin(std::process::Stdio::piped()) .stdin(std::process::Stdio::piped())
@ -88,8 +94,16 @@ pub fn filter_plugin(
trace!("filtering :: {:?}", call_info); trace!("filtering :: {:?}", call_info);
Ok(bos
.chain(args.input)
.chain(eos)
.map(move |item| {
match item {
Value {
value: UntaggedValue::Primitive(Primitive::BeginningOfStream),
..
} => {
// Beginning of the stream // Beginning of the stream
{
let stdin = child.stdin.as_mut().expect("Failed to open stdin"); let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout"); let stdout = child.stdout.as_mut().expect("Failed to open stdout");
@ -100,18 +114,22 @@ pub fn filter_plugin(
match request_raw { match request_raw {
Err(_) => { Err(_) => {
yield Err(ShellError::labeled_error( return OutputStream::one(Err(ShellError::labeled_error(
"Could not load json from plugin", "Could not load json from plugin",
"could not load json from plugin", "could not load json from plugin",
&call_info.name_tag, &call_info.name_tag,
)); )));
} }
Ok(request_raw) => match stdin.write(format!("{}\n", request_raw).as_bytes()) { Ok(request_raw) => {
match stdin.write(format!("{}\n", request_raw).as_bytes()) {
Ok(_) => {} Ok(_) => {}
Err(err) => { Err(err) => {
yield Err(ShellError::unexpected(format!("{}", err))); return OutputStream::one(Err(ShellError::unexpected(
format!("{}", err),
)));
}
}
} }
},
} }
let mut input = String::new(); let mut input = String::new();
@ -120,82 +138,29 @@ pub fn filter_plugin(
let response = serde_json::from_str::<NuResult>(&input); let response = serde_json::from_str::<NuResult>(&input);
match response { match response {
Ok(NuResult::response { params }) => match params { Ok(NuResult::response { params }) => match params {
Ok(params) => for param in params { yield param }, Ok(params) => futures::stream::iter(params).to_output_stream(),
Err(e) => { Err(e) => futures::stream::iter(vec![ReturnValue::Err(e)])
yield ReturnValue::Err(e); .to_output_stream(),
}
}, },
Err(e) => {
yield Err(ShellError::untagged_runtime_error(format!( Err(e) => OutputStream::one(Err(
ShellError::untagged_runtime_error(format!(
"Error while processing begin_filter response: {:?} {}", "Error while processing begin_filter response: {:?} {}",
e, input e, input
))); )),
)),
} }
} }
} Err(e) => OutputStream::one(Err(ShellError::untagged_runtime_error(
Err(e) => { format!("Error while reading begin_filter response: {:?}", e),
yield Err(ShellError::untagged_runtime_error(format!( ))),
"Error while reading begin_filter response: {:?}",
e
)));
} }
} }
} Value {
value: UntaggedValue::Primitive(Primitive::EndOfStream),
// Stream contents ..
{ } => {
while let Some(v) = args.input.next().await {
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout");
let mut reader = BufReader::new(stdout);
let request = JsonRpc::new("filter", v);
let request_raw = serde_json::to_string(&request);
match request_raw {
Ok(request_raw) => {
let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); // TODO: Handle error
}
Err(e) => {
yield Err(ShellError::untagged_runtime_error(format!(
"Error while processing filter response: {:?}",
e
)));
}
}
let mut input = String::new();
match reader.read_line(&mut input) {
Ok(_) => {
let response = serde_json::from_str::<NuResult>(&input);
match response {
Ok(NuResult::response { params }) => match params {
Ok(params) => for param in params { yield param },
Err(e) => {
yield ReturnValue::Err(e);
}
},
Err(e) => {
yield Err(ShellError::untagged_runtime_error(format!(
"Error while processing filter response: {:?}\n== input ==\n{}",
e, input
)));
}
}
}
Err(e) => {
yield Err(ShellError::untagged_runtime_error(format!(
"Error while reading filter response: {:?}",
e
)));
}
}
}
}
// post stream contents // post stream contents
{
let stdin = child.stdin.as_mut().expect("Failed to open stdin"); let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout"); let stdout = child.stdout.as_mut().expect("Failed to open stdout");
@ -206,18 +171,91 @@ pub fn filter_plugin(
match request_raw { match request_raw {
Err(_) => { Err(_) => {
yield Err(ShellError::labeled_error( return OutputStream::one(Err(ShellError::labeled_error(
"Could not load json from plugin", "Could not load json from plugin",
"could not load json from plugin", "could not load json from plugin",
&call_info.name_tag, &call_info.name_tag,
)); )));
} }
Ok(request_raw) => match stdin.write(format!("{}\n", request_raw).as_bytes()) { Ok(request_raw) => {
match stdin.write(format!("{}\n", request_raw).as_bytes()) {
Ok(_) => {} Ok(_) => {}
Err(err) => { Err(err) => {
yield Err(ShellError::unexpected(format!("{}", err))); return OutputStream::one(Err(ShellError::unexpected(
format!("{}", err),
)));
} }
}
}
}
let mut input = String::new();
let stream = match reader.read_line(&mut input) {
Ok(_) => {
let response = serde_json::from_str::<NuResult>(&input);
match response {
Ok(NuResult::response { params }) => match params {
Ok(params) => futures::stream::iter(params).to_output_stream(),
Err(e) => futures::stream::iter(vec![ReturnValue::Err(e)])
.to_output_stream(),
}, },
Err(e) => futures::stream::iter(vec![Err(
ShellError::untagged_runtime_error(format!(
"Error while processing end_filter response: {:?} {}",
e, input
)),
)])
.to_output_stream(),
}
}
Err(e) => {
futures::stream::iter(vec![Err(ShellError::untagged_runtime_error(
format!("Error while reading end_filter response: {:?}", e),
))])
.to_output_stream()
}
};
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let request: JsonRpc<std::vec::Vec<Value>> = JsonRpc::new("quit", vec![]);
let request_raw = serde_json::to_string(&request);
match request_raw {
Ok(request_raw) => {
let _ = stdin.write(format!("{}\n", request_raw).as_bytes());
// TODO: Handle error
}
Err(e) => {
return OutputStream::one(Err(ShellError::untagged_runtime_error(
format!("Error while processing quit response: {:?}", e),
)));
}
}
let _ = child.wait();
stream
}
v => {
// Stream contents
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout");
let mut reader = BufReader::new(stdout);
let request = JsonRpc::new("filter", v);
let request_raw = serde_json::to_string(&request);
match request_raw {
Ok(request_raw) => {
let _ = stdin.write(format!("{}\n", request_raw).as_bytes());
// TODO: Handle error
}
Err(e) => {
return OutputStream::one(Err(ShellError::untagged_runtime_error(
format!("Error while processing filter response: {:?}", e),
)));
}
} }
let mut input = String::new(); let mut input = String::new();
@ -226,56 +264,27 @@ pub fn filter_plugin(
let response = serde_json::from_str::<NuResult>(&input); let response = serde_json::from_str::<NuResult>(&input);
match response { match response {
Ok(NuResult::response { params }) => match params { Ok(NuResult::response { params }) => match params {
Ok(params) => for param in params { yield param }, Ok(params) => futures::stream::iter(params).to_output_stream(),
Err(e) => { Err(e) => futures::stream::iter(vec![ReturnValue::Err(e)])
yield ReturnValue::Err(e); .to_output_stream(),
}
}, },
Err(e) => { Err(e) => OutputStream::one(Err(
yield Err(ShellError::untagged_runtime_error(format!( ShellError::untagged_runtime_error(format!(
"Error while processing end_filter response: {:?} {}", "Error while processing filter response: {:?}\n== input ==\n{}",
e, input e, input
))); )),
)),
}
}
Err(e) => OutputStream::one(Err(ShellError::untagged_runtime_error(
format!("Error while reading filter response: {:?}", e),
))),
} }
} }
} }
Err(e) => { })
yield Err(ShellError::untagged_runtime_error(format!( .flatten()
"Error while reading end_filter response: {:?}", .to_output_stream())
e
)));
}
}
}
// End of the stream
{
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout");
let mut reader = BufReader::new(stdout);
let request: JsonRpc<std::vec::Vec<Value>> = JsonRpc::new("quit", vec![]);
let request_raw = serde_json::to_string(&request);
match request_raw {
Ok(request_raw) => {
let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); // TODO: Handle error
}
Err(e) => {
yield Err(ShellError::untagged_runtime_error(format!(
"Error while processing quit response: {:?}",
e
)));
return;
}
}
}
let _ = child.wait();
};
Ok(stream.to_output_stream())
} }
#[derive(new)] #[derive(new)]

View file

@ -84,7 +84,6 @@ pub(crate) use crate::shell::help_shell::HelpShell;
pub(crate) use crate::shell::shell_manager::ShellManager; pub(crate) use crate::shell::shell_manager::ShellManager;
pub(crate) use crate::shell::value_shell::ValueShell; pub(crate) use crate::shell::value_shell::ValueShell;
pub(crate) use crate::stream::{InputStream, InterruptibleStream, OutputStream}; pub(crate) use crate::stream::{InputStream, InterruptibleStream, OutputStream};
pub(crate) use async_stream::stream as async_stream;
pub(crate) use bigdecimal::BigDecimal; pub(crate) use bigdecimal::BigDecimal;
pub(crate) use futures::stream::BoxStream; pub(crate) use futures::stream::BoxStream;
pub(crate) use futures::{Stream, StreamExt}; pub(crate) use futures::{Stream, StreamExt};

View file

@ -147,22 +147,27 @@ impl Shell for FilesystemShell {
} }
// Generated stream: impl Stream<Item = Result<ReturnSuccess, ShellError> // Generated stream: impl Stream<Item = Result<ReturnSuccess, ShellError>
let stream = async_stream::try_stream! {
for path in paths { Ok(futures::stream::iter(paths.filter_map(move |path| {
let path = path.map_err(|e| ShellError::from(e.into_error()))?; let path = match path.map_err(|e| ShellError::from(e.into_error())) {
Ok(path) => path,
Err(err) => return Some(Err(err)),
};
if !all && is_hidden_dir(&path) { if !all && is_hidden_dir(&path) {
continue; return None;
} }
let metadata = match std::fs::symlink_metadata(&path) { let metadata = match std::fs::symlink_metadata(&path) {
Ok(metadata) => Ok(Some(metadata)), Ok(metadata) => Some(metadata),
Err(e) => if let PermissionDenied = e.kind() { Err(e) => {
Ok(None) if e.kind() == std::io::ErrorKind::PermissionDenied {
None
} else { } else {
Err(e) return Some(Err(e.into()));
}, }
}?; }
};
let entry = dir_entry_dict( let entry = dir_entry_dict(
&path, &path,
@ -172,15 +177,14 @@ impl Shell for FilesystemShell {
short_names, short_names,
with_symlink_targets, with_symlink_targets,
du, du,
ctrl_c.clone() ctrl_c.clone(),
) )
.map(|entry| ReturnSuccess::Value(entry.into()))?; .map(ReturnSuccess::Value);
yield entry; Some(entry)
} }))
}; .interruptible(ctrl_c_copy)
.to_output_stream())
Ok(stream.interruptible(ctrl_c_copy).to_output_stream())
} }
fn cd(&self, args: CdArgs, name: Tag) -> Result<OutputStream, ShellError> { fn cd(&self, args: CdArgs, name: Tag) -> Result<OutputStream, ShellError> {