Removing async_stream! from some more commands (#1973)

* Removing async_stream! from some more commands

* Fix await error

* Fix Clippy warnings
This commit is contained in:
Joseph T. Lyons 2020-06-13 04:03:13 -04:00 committed by GitHub
parent c959dc1ee3
commit e24e0242d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 114 additions and 130 deletions

View file

@ -43,7 +43,7 @@ impl WholeStreamCommand for Move {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
mv(args, registry) mv(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -67,20 +67,13 @@ impl WholeStreamCommand for Move {
} }
} }
fn mv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn mv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone(); let name = args.call_info.name_tag.clone();
let shell_manager = args.shell_manager.clone(); let shell_manager = args.shell_manager.clone();
let (args, _) = args.process(&registry).await?; let (args, _) = args.process(&registry).await?;
let mut result = shell_manager.mv(args, name)?;
while let Some(item) = result.next().await { shell_manager.mv(args, name)
yield item;
}
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View file

@ -31,7 +31,7 @@ impl WholeStreamCommand for Reject {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
reject(args, registry) reject(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -43,28 +43,24 @@ impl WholeStreamCommand for Reject {
} }
} }
fn reject(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn reject(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone(); let name = args.call_info.name_tag.clone();
let (RejectArgs { rest: fields }, mut input) = args.process(&registry).await?; let (RejectArgs { rest: fields }, input) = args.process(&registry).await?;
if fields.is_empty() { if fields.is_empty() {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Reject requires fields", "Reject requires fields",
"needs parameter", "needs parameter",
name, name,
)); ));
return;
} }
let fields: Vec<_> = fields.iter().map(|f| f.item.clone()).collect(); let fields: Vec<_> = fields.iter().map(|f| f.item.clone()).collect();
while let Some(item) = input.next().await { Ok(input
yield ReturnSuccess::value(reject_fields(&item, &fields, &item.tag)); .map(move |item| ReturnSuccess::value(reject_fields(&item, &fields, &item.tag)))
} .to_output_stream())
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View file

@ -154,11 +154,14 @@ impl WholeStreamCommand for Save {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
save(args, registry) save(args, registry).await
} }
} }
fn save(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn save(
raw_args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let mut full_path = PathBuf::from(raw_args.shell_manager.path()); let mut full_path = PathBuf::from(raw_args.shell_manager.path());
let name_tag = raw_args.call_info.name_tag.clone(); let name_tag = raw_args.call_info.name_tag.clone();
let name = raw_args.call_info.name_tag.clone(); let name = raw_args.call_info.name_tag.clone();
@ -169,51 +172,45 @@ fn save(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
let current_errors = raw_args.current_errors.clone(); let current_errors = raw_args.current_errors.clone();
let shell_manager = raw_args.shell_manager.clone(); let shell_manager = raw_args.shell_manager.clone();
let stream = async_stream! {
let head = raw_args.call_info.args.head.clone(); let head = raw_args.call_info.args.head.clone();
let (SaveArgs { path, raw: save_raw }, mut input) = raw_args.process(&registry).await?; let (
SaveArgs {
path,
raw: save_raw,
},
input,
) = raw_args.process(&registry).await?;
let input: Vec<Value> = input.collect().await; let input: Vec<Value> = input.collect().await;
if path.is_none() { if path.is_none() {
let mut should_return_file_path_error = true;
// If there is no filename, check the metadata for the anchor filename // If there is no filename, check the metadata for the anchor filename
if input.len() > 0 { if !input.is_empty() {
let anchor = input[0].tag.anchor(); let anchor = input[0].tag.anchor();
match anchor {
Some(path) => match path { if let Some(path) = anchor {
AnchorLocation::File(file) => { if let AnchorLocation::File(file) = path {
should_return_file_path_error = false;
full_path.push(Path::new(&file)); full_path.push(Path::new(&file));
} }
_ => { }
yield Err(ShellError::labeled_error( }
if should_return_file_path_error {
return Err(ShellError::labeled_error(
"Save requires a filepath", "Save requires a filepath",
"needs path", "needs path",
name_tag.clone(), name_tag.clone(),
)); ));
} }
}, } else if let Some(file) = path {
None => {
yield Err(ShellError::labeled_error(
"Save requires a filepath",
"needs path",
name_tag.clone(),
));
}
}
} else {
yield Err(ShellError::labeled_error(
"Save requires a filepath",
"needs path",
name_tag.clone(),
));
}
} else {
if let Some(file) = path {
full_path.push(file.item()); full_path.push(file.item());
} }
}
// TODO use label_break_value once it is stable: // TODO use label_break_value once it is stable:
// https://github.com/rust-lang/rust/issues/48594 // https://github.com/rust-lang/rust/issues/48594
let content : Result<Vec<u8>, ShellError> = 'scope: loop { #[allow(clippy::never_loop)]
let content: Result<Vec<u8>, ShellError> = 'scope: loop {
break if !save_raw { break if !save_raw {
if let Some(extension) = full_path.extension() { if let Some(extension) = full_path.extension() {
let command_name = format!("to {}", extension.to_string_lossy()); let command_name = format!("to {}", extension.to_string_lossy());
@ -233,10 +230,11 @@ fn save(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
}, },
name_tag: name_tag.clone(), name_tag: name_tag.clone(),
scope, scope,
} },
}; };
let mut result = converter.run(new_args.with_input(input), &registry).await; let mut result = converter.run(new_args.with_input(input), &registry).await;
let result_vec: Vec<Result<ReturnSuccess, ShellError>> = result.drain_vec().await; let result_vec: Vec<Result<ReturnSuccess, ShellError>> =
result.drain_vec().await;
if converter.is_binary() { if converter.is_binary() {
process_binary_return_success!('scope, result_vec, name_tag) process_binary_return_success!('scope, result_vec, name_tag)
} else { } else {
@ -255,15 +253,15 @@ fn save(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
match content { match content {
Ok(save_data) => match std::fs::write(full_path, save_data) { Ok(save_data) => match std::fs::write(full_path, save_data) {
Ok(o) => o, Ok(_) => Ok(OutputStream::empty()),
Err(e) => yield Err(ShellError::labeled_error(e.to_string(), "IO error while saving", name)), Err(e) => Err(ShellError::labeled_error(
e.to_string(),
"IO error while saving",
name,
)),
}, },
Err(e) => yield Err(e), Err(e) => Err(e),
} }
};
Ok(OutputStream::new(stream))
} }
fn string_from(input: &[Value]) -> String { fn string_from(input: &[Value]) -> String {

View file

@ -33,7 +33,7 @@ impl WholeStreamCommand for Touch {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
touch(args, registry) touch(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -45,21 +45,18 @@ impl WholeStreamCommand for Touch {
} }
} }
fn touch(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn touch(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! {
let (TouchArgs { target }, _) = args.process(&registry).await?; let (TouchArgs { target }, _) = args.process(&registry).await?;
match OpenOptions::new().write(true).create(true).open(&target) { match OpenOptions::new().write(true).create(true).open(&target) {
Ok(_) => {}, Ok(_) => Ok(OutputStream::empty()),
Err(err) => yield Err(ShellError::labeled_error( Err(err) => Err(ShellError::labeled_error(
"File Error", "File Error",
err.to_string(), err.to_string(),
&target.tag, &target.tag,
)), )),
} }
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]