Merge pull request #713 from est31/stable_async

Use async-stream crate to replace most async_stream_block invocations
This commit is contained in:
Jonathan Turner 2019-09-28 06:12:38 +12:00 committed by GitHub
commit 2b89ddfb9e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 53 additions and 26 deletions

22
Cargo.lock generated
View file

@ -53,6 +53,25 @@ dependencies = [
"nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", "nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "async-stream"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"async-stream-impl 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.18 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "async-stream-impl"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.13" version = "0.2.13"
@ -1516,6 +1535,7 @@ version = "0.3.0"
dependencies = [ dependencies = [
"ansi_term 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)", "ansi_term 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)",
"app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"async-stream 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"battery 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", "battery 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)",
"bigdecimal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "bigdecimal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -3003,6 +3023,8 @@ dependencies = [
"checksum app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e73a24bad9bd6a94d6395382a6c69fe071708ae4409f763c5475e14ee896313d" "checksum app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e73a24bad9bd6a94d6395382a6c69fe071708ae4409f763c5475e14ee896313d"
"checksum arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "0d382e583f07208808f6b1249e60848879ba3543f57c32277bf52d69c2f0f0ee" "checksum arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "0d382e583f07208808f6b1249e60848879ba3543f57c32277bf52d69c2f0f0ee"
"checksum arrayvec 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "b8d73f9beda665eaa98ab9e4f7442bd4e7de6652587de55b2525e52e29c1b0ba" "checksum arrayvec 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "b8d73f9beda665eaa98ab9e4f7442bd4e7de6652587de55b2525e52e29c1b0ba"
"checksum async-stream 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "650be9b667e47506c42ee53034fb1935443cb2447a3a5c0a75e303d2e756fa73"
"checksum async-stream-impl 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4f0d8c5b411e36dcfb04388bacfec54795726b1f0148adcb0f377a96d6747e0e"
"checksum atty 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "1803c647a3ec87095e7ae7acfca019e98de5ec9a7d01343f611cf3152ed71a90" "checksum atty 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "1803c647a3ec87095e7ae7acfca019e98de5ec9a7d01343f611cf3152ed71a90"
"checksum autocfg 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "22130e92352b948e7e82a49cdb0aa94f2211761117f29e052dd397c1ac33542b" "checksum autocfg 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "22130e92352b948e7e82a49cdb0aa94f2211761117f29e052dd397c1ac33542b"
"checksum backtrace 0.3.34 (registry+https://github.com/rust-lang/crates.io-index)" = "b5164d292487f037ece34ec0de2fcede2faa162f085dd96d2385ab81b12765ba" "checksum backtrace 0.3.34 (registry+https://github.com/rust-lang/crates.io-index)" = "b5164d292487f037ece34ec0de2fcede2faa162f085dd96d2385ab81b12765ba"

View file

@ -28,6 +28,7 @@ byte-unit = "3.0.1"
base64 = "0.10.1" base64 = "0.10.1"
futures-preview = { version = "=0.3.0-alpha.18", features = ["compat", "io-compat"] } futures-preview = { version = "=0.3.0-alpha.18", features = ["compat", "io-compat"] }
futures-async-stream = "=0.1.0-alpha.5" futures-async-stream = "=0.1.0-alpha.5"
async-stream = "0.1.1"
futures_codec = "0.2.5" futures_codec = "0.2.5"
num-traits = "0.2.8" num-traits = "0.2.8"
term = "0.5.2" term = "0.5.2"

View file

@ -1,6 +1,7 @@
use crate::commands::{RawCommandArgs, WholeStreamCommand}; use crate::commands::{RawCommandArgs, WholeStreamCommand};
use crate::errors::ShellError; use crate::errors::ShellError;
use crate::prelude::*; use crate::prelude::*;
use futures_async_stream::async_stream_block;
pub struct Autoview; pub struct Autoview;

View file

@ -61,7 +61,7 @@ impl PerItemCommand for Enter {
)))] )))]
.into()) .into())
} else { } else {
let stream = async_stream_block! { let stream = async_stream! {
// If it's a file, attempt to open the file as a value and enter it // If it's a file, attempt to open the file as a value and enter it
let cwd = raw_args.shell_manager.path(); let cwd = raw_args.shell_manager.path();

View file

@ -58,7 +58,7 @@ fn run(
let registry = registry.clone(); let registry = registry.clone();
let raw_args = raw_args.clone(); let raw_args = raw_args.clone();
let stream = async_stream_block! { let stream = async_stream! {
let result = fetch(&path_str, path_span).await; let result = fetch(&path_str, path_span).await;

View file

@ -201,7 +201,7 @@ fn from_bson(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
let tag = args.name_tag(); let tag = args.name_tag();
let input = args.input; let input = args.input;
let stream = async_stream_block! { let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await; let values: Vec<Tagged<Value>> = input.values.collect().await;
for value in values { for value in values {

View file

@ -88,7 +88,7 @@ fn from_csv(
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let name_tag = name; let name_tag = name;
let stream = async_stream_block! { let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await; let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new(); let mut concat_string = String::new();

View file

@ -67,7 +67,7 @@ fn from_ini(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
let tag = args.name_tag(); let tag = args.name_tag();
let input = args.input; let input = args.input;
let stream = async_stream_block! { let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await; let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new(); let mut concat_string = String::new();

View file

@ -74,7 +74,7 @@ fn from_json(
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let name_tag = name; let name_tag = name;
let stream = async_stream_block! { let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await; let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new(); let mut concat_string = String::new();

View file

@ -131,7 +131,7 @@ fn from_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputSt
let tag = args.name_tag(); let tag = args.name_tag();
let input = args.input; let input = args.input;
let stream = async_stream_block! { let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await; let values: Vec<Tagged<Value>> = input.values.collect().await;
for value in values { for value in values {

View file

@ -71,7 +71,7 @@ pub fn from_toml(
let tag = args.name_tag(); let tag = args.name_tag();
let input = args.input; let input = args.input;
let stream = async_stream_block! { let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await; let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new(); let mut concat_string = String::new();

View file

@ -89,7 +89,7 @@ fn from_tsv(
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let name_tag = name; let name_tag = name;
let stream = async_stream_block! { let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await; let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new(); let mut concat_string = String::new();

View file

@ -31,7 +31,7 @@ fn from_url(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
let tag = args.name_tag(); let tag = args.name_tag();
let input = args.input; let input = args.input;
let stream = async_stream_block! { let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await; let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new(); let mut concat_string = String::new();

View file

@ -86,7 +86,7 @@ fn from_xml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
let tag = args.name_tag(); let tag = args.name_tag();
let input = args.input; let input = args.input;
let stream = async_stream_block! { let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await; let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new(); let mut concat_string = String::new();

View file

@ -100,7 +100,7 @@ fn from_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
let tag = args.name_tag(); let tag = args.name_tag();
let input = args.input; let input = args.input;
let stream = async_stream_block! { let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await; let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new(); let mut concat_string = String::new();

View file

@ -36,7 +36,7 @@ fn last(
LastArgs { amount }: LastArgs, LastArgs { amount }: LastArgs,
context: RunnableContext, context: RunnableContext,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let stream = async_stream_block! { let stream = async_stream! {
let v: Vec<_> = context.input.into_vec().await; let v: Vec<_> = context.input.into_vec().await;
let k = v.len() - (*amount as usize); let k = v.len() - (*amount as usize);
for x in v[k..].iter() { for x in v[k..].iter() {

View file

@ -59,7 +59,7 @@ fn run(
let registry = registry.clone(); let registry = registry.clone();
let raw_args = raw_args.clone(); let raw_args = raw_args.clone();
let stream = async_stream_block! { let stream = async_stream! {
let result = fetch(&full_path, &path_str, path_span).await; let result = fetch(&full_path, &path_str, path_span).await;

View file

@ -52,7 +52,7 @@ fn merge_descriptors(values: &[Tagged<Value>]) -> Vec<String> {
} }
pub fn pivot(args: PivotArgs, context: RunnableContext) -> Result<OutputStream, ShellError> { pub fn pivot(args: PivotArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
let stream = async_stream_block! { let stream = async_stream! {
let input = context.input.into_vec().await; let input = context.input.into_vec().await;
let descs = merge_descriptors(&input); let descs = merge_descriptors(&input);

View file

@ -3,6 +3,7 @@ use crate::errors::ShellError;
use crate::parser::registry; use crate::parser::registry;
use crate::prelude::*; use crate::prelude::*;
use derive_new::new; use derive_new::new;
use futures_async_stream::async_stream_block;
use log::trace; use log::trace;
use serde::{self, Deserialize, Serialize}; use serde::{self, Deserialize, Serialize};
use std::io::prelude::*; use std::io::prelude::*;

View file

@ -73,7 +73,7 @@ fn run(
let registry = registry.clone(); let registry = registry.clone();
let raw_args = raw_args.clone(); let raw_args = raw_args.clone();
let stream = async_stream_block! { let stream = async_stream! {
let (file_extension, contents, contents_tag, span_source) = let (file_extension, contents, contents_tag, span_source) =
post(&path_str, &body, user, password, path_span, &registry, &raw_args).await.unwrap(); post(&path_str, &body, user, password, path_span, &registry, &raw_args).await.unwrap();

View file

@ -2,6 +2,7 @@ use crate::commands::{UnevaluatedCallInfo, WholeStreamCommand};
use crate::data::Value; use crate::data::Value;
use crate::errors::ShellError; use crate::errors::ShellError;
use crate::prelude::*; use crate::prelude::*;
use futures_async_stream::async_stream_block;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
pub struct Save; pub struct Save;

View file

@ -35,7 +35,7 @@ fn sort_by(
SortByArgs { rest }: SortByArgs, SortByArgs { rest }: SortByArgs,
mut context: RunnableContext, mut context: RunnableContext,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
Ok(OutputStream::new(async_stream_block! { Ok(OutputStream::new(async_stream! {
let mut vec = context.input.drain_vec().await; let mut vec = context.input.drain_vec().await;
let calc_key = |item: &Tagged<Value>| { let calc_key = |item: &Tagged<Value>| {

View file

@ -233,7 +233,7 @@ fn bson_value_to_bytes(bson: Bson, tag: Tag) -> Result<Vec<u8>, ShellError> {
fn to_bson(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { fn to_bson(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?; let args = args.evaluate_once(registry)?;
let name_tag = args.name_tag(); let name_tag = args.name_tag();
let stream = async_stream_block! { let stream = async_stream! {
let input: Vec<Tagged<Value>> = args.input.values.collect().await; let input: Vec<Tagged<Value>> = args.input.values.collect().await;
let to_process_input = if input.len() > 1 { let to_process_input = if input.len() > 1 {

View file

@ -135,7 +135,7 @@ fn to_csv(
RunnableContext { input, name, .. }: RunnableContext, RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let name_tag = name; let name_tag = name;
let stream = async_stream_block! { let stream = async_stream! {
let input: Vec<Tagged<Value>> = input.values.collect().await; let input: Vec<Tagged<Value>> = input.values.collect().await;
let to_process_input = if input.len() > 1 { let to_process_input = if input.len() > 1 {

View file

@ -81,7 +81,7 @@ fn json_list(input: &Vec<Tagged<Value>>) -> Result<Vec<serde_json::Value>, Shell
fn to_json(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { fn to_json(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?; let args = args.evaluate_once(registry)?;
let name_tag = args.name_tag(); let name_tag = args.name_tag();
let stream = async_stream_block! { let stream = async_stream! {
let input: Vec<Tagged<Value>> = args.input.values.collect().await; let input: Vec<Tagged<Value>> = args.input.values.collect().await;
let to_process_input = if input.len() > 1 { let to_process_input = if input.len() > 1 {

View file

@ -201,7 +201,7 @@ fn sqlite_input_stream_to_bytes(
fn to_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { fn to_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?; let args = args.evaluate_once(registry)?;
let name_tag = args.name_tag(); let name_tag = args.name_tag();
let stream = async_stream_block! { let stream = async_stream! {
let input: Vec<Tagged<Value>> = args.input.values.collect().await; let input: Vec<Tagged<Value>> = args.input.values.collect().await;
match sqlite_input_stream_to_bytes(input) { match sqlite_input_stream_to_bytes(input) {

View file

@ -76,7 +76,7 @@ fn collect_values(input: &Vec<Tagged<Value>>) -> Result<Vec<toml::Value>, ShellE
fn to_toml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { fn to_toml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?; let args = args.evaluate_once(registry)?;
let name_tag = args.name_tag(); let name_tag = args.name_tag();
let stream = async_stream_block! { let stream = async_stream! {
let input: Vec<Tagged<Value>> = args.input.values.collect().await; let input: Vec<Tagged<Value>> = args.input.values.collect().await;
let to_process_input = if input.len() > 1 { let to_process_input = if input.len() > 1 {

View file

@ -134,7 +134,7 @@ fn to_tsv(
RunnableContext { input, name, .. }: RunnableContext, RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let name_tag = name; let name_tag = name;
let stream = async_stream_block! { let stream = async_stream! {
let input: Vec<Tagged<Value>> = input.values.collect().await; let input: Vec<Tagged<Value>> = input.values.collect().await;
let to_process_input = if input.len() > 1 { let to_process_input = if input.len() > 1 {

View file

@ -31,7 +31,7 @@ fn to_url(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream,
let tag = args.name_tag(); let tag = args.name_tag();
let input = args.input; let input = args.input;
let stream = async_stream_block! { let stream = async_stream! {
let input: Vec<Tagged<Value>> = input.values.collect().await; let input: Vec<Tagged<Value>> = input.values.collect().await;
for value in input { for value in input {

View file

@ -77,7 +77,7 @@ pub fn value_to_yaml_value(v: &Tagged<Value>) -> Result<serde_yaml::Value, Shell
fn to_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { fn to_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?; let args = args.evaluate_once(registry)?;
let name_tag = args.name_tag(); let name_tag = args.name_tag();
let stream = async_stream_block! { let stream = async_stream! {
let input: Vec<Tagged<Value>> = args.input.values.collect().await; let input: Vec<Tagged<Value>> = args.input.values.collect().await;
let to_process_input = if input.len() > 1 { let to_process_input = if input.len() > 1 {

View file

@ -1,5 +1,6 @@
#![feature(generators)] #![feature(generators)]
#![feature(proc_macro_hygiene)] #![feature(proc_macro_hygiene)]
#![recursion_limit = "512"]
#[macro_use] #[macro_use]
mod prelude; mod prelude;

View file

@ -72,10 +72,10 @@ pub(crate) use crate::shell::value_shell::ValueShell;
pub(crate) use crate::stream::{InputStream, OutputStream}; pub(crate) use crate::stream::{InputStream, OutputStream};
pub(crate) use crate::traits::{HasTag, ToDebug}; pub(crate) use crate::traits::{HasTag, ToDebug};
pub(crate) use crate::Text; pub(crate) use crate::Text;
pub(crate) use async_stream::stream as async_stream;
pub(crate) use bigdecimal::BigDecimal; pub(crate) use bigdecimal::BigDecimal;
pub(crate) use futures::stream::BoxStream; pub(crate) use futures::stream::BoxStream;
pub(crate) use futures::{FutureExt, Stream, StreamExt}; pub(crate) use futures::{FutureExt, Stream, StreamExt};
pub(crate) use futures_async_stream::async_stream_block;
pub(crate) use num_bigint::BigInt; pub(crate) use num_bigint::BigInt;
pub(crate) use num_traits::cast::{FromPrimitive, ToPrimitive}; pub(crate) use num_traits::cast::{FromPrimitive, ToPrimitive};
pub(crate) use num_traits::identities::Zero; pub(crate) use num_traits::identities::Zero;