diff --git a/Cargo.lock b/Cargo.lock index 8db4a18084..c0730ef123 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -53,6 +53,25 @@ dependencies = [ "nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "async-stream" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "async-stream-impl 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core-preview 0.3.0-alpha.18 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "async-stream-impl" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "atty" version = "0.2.13" @@ -1516,6 +1535,7 @@ version = "0.3.0" dependencies = [ "ansi_term 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)", "app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "async-stream 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "battery 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", "bigdecimal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3003,6 +3023,8 @@ dependencies = [ "checksum app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e73a24bad9bd6a94d6395382a6c69fe071708ae4409f763c5475e14ee896313d" "checksum arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "0d382e583f07208808f6b1249e60848879ba3543f57c32277bf52d69c2f0f0ee" "checksum arrayvec 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "b8d73f9beda665eaa98ab9e4f7442bd4e7de6652587de55b2525e52e29c1b0ba" +"checksum async-stream 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "650be9b667e47506c42ee53034fb1935443cb2447a3a5c0a75e303d2e756fa73" +"checksum async-stream-impl 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4f0d8c5b411e36dcfb04388bacfec54795726b1f0148adcb0f377a96d6747e0e" "checksum atty 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "1803c647a3ec87095e7ae7acfca019e98de5ec9a7d01343f611cf3152ed71a90" "checksum autocfg 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "22130e92352b948e7e82a49cdb0aa94f2211761117f29e052dd397c1ac33542b" "checksum backtrace 0.3.34 (registry+https://github.com/rust-lang/crates.io-index)" = "b5164d292487f037ece34ec0de2fcede2faa162f085dd96d2385ab81b12765ba" diff --git a/Cargo.toml b/Cargo.toml index 132aad2393..84b84ca416 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ byte-unit = "3.0.1" base64 = "0.10.1" futures-preview = { version = "=0.3.0-alpha.18", features = ["compat", "io-compat"] } futures-async-stream = "=0.1.0-alpha.5" +async-stream = "0.1.1" futures_codec = "0.2.5" num-traits = "0.2.8" term = "0.5.2" diff --git a/src/commands/autoview.rs b/src/commands/autoview.rs index b9b9d8941c..e715491e91 100644 --- a/src/commands/autoview.rs +++ b/src/commands/autoview.rs @@ -1,6 +1,7 @@ use crate::commands::{RawCommandArgs, WholeStreamCommand}; use crate::errors::ShellError; use crate::prelude::*; +use futures_async_stream::async_stream_block; pub struct Autoview; diff --git a/src/commands/enter.rs b/src/commands/enter.rs index 9388abb941..923715d139 100644 --- a/src/commands/enter.rs +++ b/src/commands/enter.rs @@ -61,7 +61,7 @@ impl PerItemCommand for Enter { )))] .into()) } else { - let stream = async_stream_block! { + let stream = async_stream! { // If it's a file, attempt to open the file as a value and enter it let cwd = raw_args.shell_manager.path(); diff --git a/src/commands/fetch.rs b/src/commands/fetch.rs index 79806e76b2..886d6bd959 100644 --- a/src/commands/fetch.rs +++ b/src/commands/fetch.rs @@ -58,7 +58,7 @@ fn run( let registry = registry.clone(); let raw_args = raw_args.clone(); - let stream = async_stream_block! { + let stream = async_stream! { let result = fetch(&path_str, path_span).await; diff --git a/src/commands/from_bson.rs b/src/commands/from_bson.rs index a4171e3fec..7dd00983fc 100644 --- a/src/commands/from_bson.rs +++ b/src/commands/from_bson.rs @@ -201,7 +201,7 @@ fn from_bson(args: CommandArgs, registry: &CommandRegistry) -> Result> = input.values.collect().await; for value in values { diff --git a/src/commands/from_csv.rs b/src/commands/from_csv.rs index 68296eb11b..ea90ab3de1 100644 --- a/src/commands/from_csv.rs +++ b/src/commands/from_csv.rs @@ -88,7 +88,7 @@ fn from_csv( ) -> Result { let name_tag = name; - let stream = async_stream_block! { + let stream = async_stream! { let values: Vec> = input.values.collect().await; let mut concat_string = String::new(); diff --git a/src/commands/from_ini.rs b/src/commands/from_ini.rs index 8409cf8489..d53ad67773 100644 --- a/src/commands/from_ini.rs +++ b/src/commands/from_ini.rs @@ -67,7 +67,7 @@ fn from_ini(args: CommandArgs, registry: &CommandRegistry) -> Result> = input.values.collect().await; let mut concat_string = String::new(); diff --git a/src/commands/from_json.rs b/src/commands/from_json.rs index ab70685f2b..dae288a892 100644 --- a/src/commands/from_json.rs +++ b/src/commands/from_json.rs @@ -74,7 +74,7 @@ fn from_json( ) -> Result { let name_tag = name; - let stream = async_stream_block! { + let stream = async_stream! { let values: Vec> = input.values.collect().await; let mut concat_string = String::new(); diff --git a/src/commands/from_sqlite.rs b/src/commands/from_sqlite.rs index e880571911..20d087bd5c 100644 --- a/src/commands/from_sqlite.rs +++ b/src/commands/from_sqlite.rs @@ -131,7 +131,7 @@ fn from_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result> = input.values.collect().await; for value in values { diff --git a/src/commands/from_toml.rs b/src/commands/from_toml.rs index 29db38a77e..c0098d9267 100644 --- a/src/commands/from_toml.rs +++ b/src/commands/from_toml.rs @@ -71,7 +71,7 @@ pub fn from_toml( let tag = args.name_tag(); let input = args.input; - let stream = async_stream_block! { + let stream = async_stream! { let values: Vec> = input.values.collect().await; let mut concat_string = String::new(); diff --git a/src/commands/from_tsv.rs b/src/commands/from_tsv.rs index 66f070a5df..bba532d17b 100644 --- a/src/commands/from_tsv.rs +++ b/src/commands/from_tsv.rs @@ -89,7 +89,7 @@ fn from_tsv( ) -> Result { let name_tag = name; - let stream = async_stream_block! { + let stream = async_stream! { let values: Vec> = input.values.collect().await; let mut concat_string = String::new(); diff --git a/src/commands/from_url.rs b/src/commands/from_url.rs index 81113a83d4..662508deb6 100644 --- a/src/commands/from_url.rs +++ b/src/commands/from_url.rs @@ -31,7 +31,7 @@ fn from_url(args: CommandArgs, registry: &CommandRegistry) -> Result> = input.values.collect().await; let mut concat_string = String::new(); diff --git a/src/commands/from_xml.rs b/src/commands/from_xml.rs index f80d428f43..5bba67b42a 100644 --- a/src/commands/from_xml.rs +++ b/src/commands/from_xml.rs @@ -86,7 +86,7 @@ fn from_xml(args: CommandArgs, registry: &CommandRegistry) -> Result> = input.values.collect().await; let mut concat_string = String::new(); diff --git a/src/commands/from_yaml.rs b/src/commands/from_yaml.rs index 9bd2bf5629..9e156bbc75 100644 --- a/src/commands/from_yaml.rs +++ b/src/commands/from_yaml.rs @@ -100,7 +100,7 @@ fn from_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result> = input.values.collect().await; let mut concat_string = String::new(); diff --git a/src/commands/last.rs b/src/commands/last.rs index fd7a3ecea2..4813c555a1 100644 --- a/src/commands/last.rs +++ b/src/commands/last.rs @@ -36,7 +36,7 @@ fn last( LastArgs { amount }: LastArgs, context: RunnableContext, ) -> Result { - let stream = async_stream_block! { + let stream = async_stream! { let v: Vec<_> = context.input.into_vec().await; let k = v.len() - (*amount as usize); for x in v[k..].iter() { diff --git a/src/commands/open.rs b/src/commands/open.rs index 603bb4da0b..78aa35db01 100644 --- a/src/commands/open.rs +++ b/src/commands/open.rs @@ -59,7 +59,7 @@ fn run( let registry = registry.clone(); let raw_args = raw_args.clone(); - let stream = async_stream_block! { + let stream = async_stream! { let result = fetch(&full_path, &path_str, path_span).await; diff --git a/src/commands/pivot.rs b/src/commands/pivot.rs index 0232f2d59e..1a6bb901fb 100644 --- a/src/commands/pivot.rs +++ b/src/commands/pivot.rs @@ -52,7 +52,7 @@ fn merge_descriptors(values: &[Tagged]) -> Vec { } pub fn pivot(args: PivotArgs, context: RunnableContext) -> Result { - let stream = async_stream_block! { + let stream = async_stream! { let input = context.input.into_vec().await; let descs = merge_descriptors(&input); diff --git a/src/commands/plugin.rs b/src/commands/plugin.rs index ae9b2ec64a..4e30b68f45 100644 --- a/src/commands/plugin.rs +++ b/src/commands/plugin.rs @@ -3,6 +3,7 @@ use crate::errors::ShellError; use crate::parser::registry; use crate::prelude::*; use derive_new::new; +use futures_async_stream::async_stream_block; use log::trace; use serde::{self, Deserialize, Serialize}; use std::io::prelude::*; diff --git a/src/commands/post.rs b/src/commands/post.rs index 6d5627a65f..1bff755d9e 100644 --- a/src/commands/post.rs +++ b/src/commands/post.rs @@ -73,7 +73,7 @@ fn run( let registry = registry.clone(); let raw_args = raw_args.clone(); - let stream = async_stream_block! { + let stream = async_stream! { let (file_extension, contents, contents_tag, span_source) = post(&path_str, &body, user, password, path_span, ®istry, &raw_args).await.unwrap(); diff --git a/src/commands/save.rs b/src/commands/save.rs index 9c12fd2414..982e578a8a 100644 --- a/src/commands/save.rs +++ b/src/commands/save.rs @@ -2,6 +2,7 @@ use crate::commands::{UnevaluatedCallInfo, WholeStreamCommand}; use crate::data::Value; use crate::errors::ShellError; use crate::prelude::*; +use futures_async_stream::async_stream_block; use std::path::{Path, PathBuf}; pub struct Save; diff --git a/src/commands/sort_by.rs b/src/commands/sort_by.rs index 8058b7889e..1e6b87491a 100644 --- a/src/commands/sort_by.rs +++ b/src/commands/sort_by.rs @@ -35,7 +35,7 @@ fn sort_by( SortByArgs { rest }: SortByArgs, mut context: RunnableContext, ) -> Result { - Ok(OutputStream::new(async_stream_block! { + Ok(OutputStream::new(async_stream! { let mut vec = context.input.drain_vec().await; let calc_key = |item: &Tagged| { diff --git a/src/commands/to_bson.rs b/src/commands/to_bson.rs index 0cec599265..a36d99c077 100644 --- a/src/commands/to_bson.rs +++ b/src/commands/to_bson.rs @@ -233,7 +233,7 @@ fn bson_value_to_bytes(bson: Bson, tag: Tag) -> Result, ShellError> { fn to_bson(args: CommandArgs, registry: &CommandRegistry) -> Result { let args = args.evaluate_once(registry)?; let name_tag = args.name_tag(); - let stream = async_stream_block! { + let stream = async_stream! { let input: Vec> = args.input.values.collect().await; let to_process_input = if input.len() > 1 { diff --git a/src/commands/to_csv.rs b/src/commands/to_csv.rs index fd77fdcb6f..1897fb86b7 100644 --- a/src/commands/to_csv.rs +++ b/src/commands/to_csv.rs @@ -135,7 +135,7 @@ fn to_csv( RunnableContext { input, name, .. }: RunnableContext, ) -> Result { let name_tag = name; - let stream = async_stream_block! { + let stream = async_stream! { let input: Vec> = input.values.collect().await; let to_process_input = if input.len() > 1 { diff --git a/src/commands/to_json.rs b/src/commands/to_json.rs index d8aaa96794..9c06299aad 100644 --- a/src/commands/to_json.rs +++ b/src/commands/to_json.rs @@ -81,7 +81,7 @@ fn json_list(input: &Vec>) -> Result, Shell fn to_json(args: CommandArgs, registry: &CommandRegistry) -> Result { let args = args.evaluate_once(registry)?; let name_tag = args.name_tag(); - let stream = async_stream_block! { + let stream = async_stream! { let input: Vec> = args.input.values.collect().await; let to_process_input = if input.len() > 1 { diff --git a/src/commands/to_sqlite.rs b/src/commands/to_sqlite.rs index c695667ca0..4f9181ec7c 100644 --- a/src/commands/to_sqlite.rs +++ b/src/commands/to_sqlite.rs @@ -201,7 +201,7 @@ fn sqlite_input_stream_to_bytes( fn to_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result { let args = args.evaluate_once(registry)?; let name_tag = args.name_tag(); - let stream = async_stream_block! { + let stream = async_stream! { let input: Vec> = args.input.values.collect().await; match sqlite_input_stream_to_bytes(input) { diff --git a/src/commands/to_toml.rs b/src/commands/to_toml.rs index a30c9d3cf7..6c8904e0c2 100644 --- a/src/commands/to_toml.rs +++ b/src/commands/to_toml.rs @@ -76,7 +76,7 @@ fn collect_values(input: &Vec>) -> Result, ShellE fn to_toml(args: CommandArgs, registry: &CommandRegistry) -> Result { let args = args.evaluate_once(registry)?; let name_tag = args.name_tag(); - let stream = async_stream_block! { + let stream = async_stream! { let input: Vec> = args.input.values.collect().await; let to_process_input = if input.len() > 1 { diff --git a/src/commands/to_tsv.rs b/src/commands/to_tsv.rs index 7bce174d01..4edc26face 100644 --- a/src/commands/to_tsv.rs +++ b/src/commands/to_tsv.rs @@ -134,7 +134,7 @@ fn to_tsv( RunnableContext { input, name, .. }: RunnableContext, ) -> Result { let name_tag = name; - let stream = async_stream_block! { + let stream = async_stream! { let input: Vec> = input.values.collect().await; let to_process_input = if input.len() > 1 { diff --git a/src/commands/to_url.rs b/src/commands/to_url.rs index d98a765a29..dfba5faf4d 100644 --- a/src/commands/to_url.rs +++ b/src/commands/to_url.rs @@ -31,7 +31,7 @@ fn to_url(args: CommandArgs, registry: &CommandRegistry) -> Result> = input.values.collect().await; for value in input { diff --git a/src/commands/to_yaml.rs b/src/commands/to_yaml.rs index db54af6e89..a72ab9ffcc 100644 --- a/src/commands/to_yaml.rs +++ b/src/commands/to_yaml.rs @@ -77,7 +77,7 @@ pub fn value_to_yaml_value(v: &Tagged) -> Result Result { let args = args.evaluate_once(registry)?; let name_tag = args.name_tag(); - let stream = async_stream_block! { + let stream = async_stream! { let input: Vec> = args.input.values.collect().await; let to_process_input = if input.len() > 1 { diff --git a/src/lib.rs b/src/lib.rs index f4ccb4e4e3..ef7f3c3812 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #![feature(generators)] #![feature(proc_macro_hygiene)] +#![recursion_limit = "512"] #[macro_use] mod prelude; diff --git a/src/prelude.rs b/src/prelude.rs index d58e7989a6..56cd21b33a 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -72,10 +72,10 @@ pub(crate) use crate::shell::value_shell::ValueShell; pub(crate) use crate::stream::{InputStream, OutputStream}; pub(crate) use crate::traits::{HasTag, ToDebug}; pub(crate) use crate::Text; +pub(crate) use async_stream::stream as async_stream; pub(crate) use bigdecimal::BigDecimal; pub(crate) use futures::stream::BoxStream; pub(crate) use futures::{FutureExt, Stream, StreamExt}; -pub(crate) use futures_async_stream::async_stream_block; pub(crate) use num_bigint::BigInt; pub(crate) use num_traits::cast::{FromPrimitive, ToPrimitive}; pub(crate) use num_traits::identities::Zero;