diff --git a/Cargo.lock b/Cargo.lock index 4a62717959..9174baba43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,9 +8,9 @@ checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2" [[package]] name = "aho-corasick" -version = "0.7.7" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f56c476256dc249def911d6f7580b5fc7e875895b5d7ee88f5d602208035744" +checksum = "743ad5a418686aad3b87fd14c43badd828cf26e214a00f92a384291cf22e1811" dependencies = [ "memchr", ] @@ -116,19 +116,19 @@ dependencies = [ [[package]] name = "async-stream" -version = "0.1.2" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb6fa015ebe961e9908ca4c1854e7dc7aabd4417da77b6a0466e4dfb4c8f6f69" +checksum = "22068c0c19514942eefcfd4daf8976ef1aad84e61539f95cd200c35202f80af5" dependencies = [ "async-stream-impl", - "futures-core-preview", + "futures-core", ] [[package]] name = "async-stream-impl" -version = "0.1.1" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f0d8c5b411e36dcfb04388bacfec54795726b1f0148adcb0f377a96d6747e0e" +checksum = "25f9db3b38af870bf7e5cc649167533b493928e50744e2c30ae350230b414670" dependencies = [ "proc-macro2", "quote", @@ -137,9 +137,9 @@ dependencies = [ [[package]] name = "async-task" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f20c6fda19d0fc02406779587ca4f9a4171cd32e4a5bda0bd016f0a1334c8d4a" +checksum = "0ac2c016b079e771204030951c366db398864f5026f84a44dafb0ff20f02085d" dependencies = [ "libc", "winapi 0.3.8", @@ -288,9 +288,9 @@ dependencies = [ [[package]] name = "bstr" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe8a65814ca90dfc9705af76bb6ba3c6e2534489a72270e797e603783bb4990b" +checksum = "502ae1441a0a5adb8fbd38a5955a6416b9493e92b465de5e4a9bde6a539c2c48" dependencies = [ "lazy_static 1.4.0", "memchr", @@ -941,12 +941,13 @@ checksum = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef" [[package]] name = "futures" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6f16056ecbb57525ff698bb955162d0cd03bee84e6241c27ff75c08d8ca5987" +checksum = "ad6636318d07abeb4656157ef1936c64485f066c7f9ce5d7c5b879fcb6dd5ccb" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -955,9 +956,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcae98ca17d102fd8a3603727b9259fcf7fa4239b603d2142926189bc8999b86" +checksum = "7264eb65b194d2fa6ec31b898ead7c332854bfa42521659226e72a585fca5b85" dependencies = [ "futures-core", "futures-sink", @@ -975,9 +976,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79564c427afefab1dfb3298535b21eda083ef7935b4f0ecbfcb121f0aec10866" +checksum = "b597b16aa1a19ce2dfde5128a7c656d75346b35601a640be2d9efd4e9c83609d" [[package]] name = "futures-core-preview" @@ -985,6 +986,17 @@ version = "0.3.0-alpha.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35b6263fb1ef523c3056565fa67b1d16f0a8604ff12b11b08c25f28a734c60a" +[[package]] +name = "futures-executor" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46a5e593d77bee52393c7f3b16b8b413214096d3f7dc4f5f4c57dee01ad2bdaf" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-executor-preview" version = "0.3.0-alpha.19" @@ -998,9 +1010,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e676577d229e70952ab25f3945795ba5b16d63ca794ca9d2c860e5595d20b5ff" +checksum = "3d429f824b5e5dbd45fc8e54e1005a37e1f8c6d570cd64d0b59b24d3a80b8b8e" [[package]] name = "futures-io-preview" @@ -1010,9 +1022,9 @@ checksum = "f4914ae450db1921a56c91bde97a27846287d062087d4a652efc09bb3a01ebda" [[package]] name = "futures-macro" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52e7c56c15537adb4f76d0b7a76ad131cb4d2f4f32d3b0bcabcbe1c7c5e87764" +checksum = "a1d75b72904b78044e0091355fc49d29f48bff07a68a719a41cf059711e071b4" dependencies = [ "proc-macro-hack", "proc-macro2", @@ -1036,9 +1048,9 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "171be33efae63c2d59e6dbba34186fe0d6394fb378069a76dfd80fdcffd43c16" +checksum = "04299e123547ea7c56f3e1b376703142f5fc0b6700433eed549e9d0b8a75a66c" [[package]] name = "futures-sink-preview" @@ -1048,19 +1060,9 @@ checksum = "86f148ef6b69f75bb610d4f9a2336d4fc88c4b5b67129d1a340dd0fd362efeec" [[package]] name = "futures-task" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bae52d6b29cf440e298856fec3965ee6fa71b06aa7495178615953fd669e5f9" - -[[package]] -name = "futures-timer" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7946248e9429ff093345d3e8fdf4eb0f9b2d79091611c9c14f744971a6f8be45" -dependencies = [ - "futures-core-preview", - "pin-utils", -] +checksum = "86f9ceab4bce46555ee608b1ec7c414d6b2e76e196ef46fa5a8d4815a8571398" [[package]] name = "futures-timer" @@ -1069,11 +1071,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1de7508b218029b0f01662ed8f61b1c964b3ae99d6f25462d0f55a595109df6" [[package]] -name = "futures-util" -version = "0.3.1" +name = "futures-timer" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0d66274fb76985d3c62c886d1da7ac4c0903a8c9f754e8fe0f35a6a6cc39e76" +checksum = "3de1a2b2a2a33d9e60e17980b60ee061eeaae96a5abe9121db0fdb9af167a1c5" + +[[package]] +name = "futures-util" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d2f1296f7644d2cd908ebb2fa74645608e39f117c72bac251d40418c6d74c4f" dependencies = [ + "futures 0.1.29", "futures-channel", "futures-core", "futures-io", @@ -1085,6 +1094,7 @@ dependencies = [ "proc-macro-hack", "proc-macro-nested", "slab", + "tokio-io", ] [[package]] @@ -1106,12 +1116,14 @@ dependencies = [ [[package]] name = "futures_codec" -version = "0.2.5" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36552cd31353fd135114510d53b8d120758120c36aa636a9341970f9efb1e4a0" +checksum = "fe8859feb7140742ed1a2a85a07941100ad2b5f98a421b353931d718a34144d1" dependencies = [ - "bytes 0.4.12", - "futures-preview", + "bytes 0.5.4", + "futures 0.3.3", + "memchr", + "pin-project", ] [[package]] @@ -1514,7 +1526,7 @@ checksum = "b37aaf19f3ca34681b606f8b973f4f3422e40e418b30945d3cf9ae72021692ed" dependencies = [ "async-std", "cfg-if", - "futures 0.3.1", + "futures 0.3.3", "thiserror", ] @@ -1545,9 +1557,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.3.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b54058f0a6ff80b6803da8faf8997cde53872b38f4023728f6830b06cd3c0dc" +checksum = "076f042c5b7b98f31d205f1249267e12a6518c1481e9dae9764af19b707d2292" dependencies = [ "autocfg 1.0.0", "serde 1.0.104", @@ -1935,9 +1947,9 @@ dependencies = [ [[package]] name = "miniz_oxide" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f3f74f726ae935c3f514300cc6773a0c9492abc5e972d42ba0c0ebb88757625" +checksum = "aa679ff6578b1cddee93d7e82e263b94a575e0bfced07284eb0c037c1d2416a5" dependencies = [ "adler32", ] @@ -2151,8 +2163,7 @@ dependencies = [ "derive-new", "dirs 2.0.2", "dunce", - "futures-preview", - "futures-timer 1.0.3", + "futures 0.3.3", "futures-util", "futures_codec", "getset", @@ -2481,9 +2492,8 @@ dependencies = [ name = "nu_plugin_ps" version = "0.9.0" dependencies = [ - "futures-preview", - "futures-timer 1.0.3", - "futures-util", + "futures 0.3.3", + "futures-timer 3.0.1", "heim", "nu-build", "nu-errors", @@ -2523,7 +2533,7 @@ name = "nu_plugin_sys" version = "0.9.0" dependencies = [ "battery", - "futures-preview", + "futures 0.3.3", "futures-util", "heim", "nu-build", @@ -2703,9 +2713,9 @@ dependencies = [ [[package]] name = "open" -version = "1.3.2" +version = "1.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94b424e1086328b0df10235c6ff47be63708071881bead9e76997d9291c0134b" +checksum = "3dfa632621d66502e1e9298c038d903090fc810a33cc1e6a02958fa0be65e3fb" dependencies = [ "winapi 0.3.8", ] @@ -3229,9 +3239,9 @@ checksum = "194d8e591e405d1eecf28819740abed6d719d1a2db87fc0bcdedee9a26d55560" [[package]] name = "ring" -version = "0.16.9" +version = "0.16.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6747f8da1f2b1fabbee1aaa4eb8a11abf9adef0bf58a41cee45db5d59cecdfac" +checksum = "741ba1704ae21999c00942f9f5944f801e977f54302af346b596287599ad1862" dependencies = [ "cc", "lazy_static 1.4.0", @@ -3371,9 +3381,9 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.16" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f550b06b6cba9c8b8be3ee73f391990116bf527450d2556e9b9ce263b9a021" +checksum = "507a9e6e8ffe0a4e0ebb9a10293e62fdf7657c06f1b8bb07a8fcf697d2abf295" dependencies = [ "lazy_static 1.4.0", "winapi 0.3.8", @@ -3515,9 +3525,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.45" +version = "1.0.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eab8f15f15d6c41a154c1b128a22f2dfabe350ef53c40953d84e36155c91192b" +checksum = "21b01d7f0288608a01dca632cf1df859df6fd6ffa885300fc275ce2ba6221953" dependencies = [ "indexmap", "itoa", @@ -3914,9 +3924,9 @@ dependencies = [ [[package]] name = "tokio-io" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5090db468dad16e1a7a54c8c67280c5e4b544f3d3e018f0b913b400261f85926" +checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" dependencies = [ "bytes 0.4.12", "futures 0.1.29", @@ -4315,9 +4325,9 @@ dependencies = [ [[package]] name = "webpki" -version = "0.21.0" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7e664e770ac0110e2384769bcc59ed19e329d81f555916a6e072714957b81b4" +checksum = "f1f50e1972865d6b1adb54167d1c8ed48606004c2c9d0ea5f1eeb34d95e863ef" dependencies = [ "ring", "untrusted", diff --git a/Cargo.toml b/Cargo.toml index fe9bf07b7e..d961490630 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,9 +73,9 @@ dunce = "1.0.0" indexmap = { version = "1.3.1", features = ["serde-1"] } byte-unit = "3.0.3" base64 = "0.11" -futures-preview = { version = "=0.3.0-alpha.19", features = ["compat", "io-compat"] } -async-stream = "0.1.2" -futures_codec = "0.2.5" +futures = { version = "0.3", features = ["compat", "io-compat"] } +async-stream = "0.2" +futures_codec = "0.4" num-traits = "0.2.11" term = "0.5.2" bytes = "0.4.12" @@ -125,7 +125,6 @@ natural = "0.3.0" parking_lot = "0.10.0" meval = "0.2" -futures-timer = { version = "1.0.2", optional = true } clipboard = {version = "0.5", optional = true } ptree = {version = "0.2" } starship = { version = "0.33.1", optional = true} diff --git a/crates/nu_plugin_ps/Cargo.toml b/crates/nu_plugin_ps/Cargo.toml index 28b438834f..4335db60c3 100644 --- a/crates/nu_plugin_ps/Cargo.toml +++ b/crates/nu_plugin_ps/Cargo.toml @@ -15,15 +15,13 @@ nu-protocol = { path = "../nu-protocol", version = "0.9.0" } nu-source = { path = "../nu-source", version = "0.9.0" } nu-errors = { path = "../nu-errors", version = "0.9.0" } -futures-preview = { version = "=0.3.0-alpha.19", features = ["compat", "io-compat"] } -futures-timer = "1.0.3" +futures = { version = "0.3", features = ["compat", "io-compat"] } +futures-timer = "3.0.1" pin-utils = "0.1.0-alpha.4" -futures-util = "0.3.1" [dependencies.heim] version = "0.0.9" default-features = false - features = ["process", "runtime-polyfill"] [build-dependencies] diff --git a/crates/nu_plugin_ps/src/ps.rs b/crates/nu_plugin_ps/src/ps.rs index 21cce38fbd..96ace00e8d 100644 --- a/crates/nu_plugin_ps/src/ps.rs +++ b/crates/nu_plugin_ps/src/ps.rs @@ -1,4 +1,4 @@ -use futures_util::{StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use heim::process::{self as process, Process, ProcessResult}; use heim::units::{information, ratio, Ratio}; use std::usize; diff --git a/crates/nu_plugin_sys/Cargo.toml b/crates/nu_plugin_sys/Cargo.toml index 62b39095dd..724592795e 100644 --- a/crates/nu_plugin_sys/Cargo.toml +++ b/crates/nu_plugin_sys/Cargo.toml @@ -12,7 +12,7 @@ nu-protocol = { path = "../nu-protocol", version = "0.9.0" } nu-source = { path = "../nu-source", version = "0.9.0" } nu-errors = { path = "../nu-errors", version = "0.9.0" } -futures-preview = { version = "=0.3.0-alpha.19", features = ["compat", "io-compat"] } +futures = { version = "0.3", features = ["compat", "io-compat"] } battery = "0.7.5" futures-util = "0.3.1" diff --git a/src/commands/append.rs b/src/commands/append.rs index 9866e05f95..14a622698f 100644 --- a/src/commands/append.rs +++ b/src/commands/append.rs @@ -43,6 +43,7 @@ fn append( ) -> Result { let mut after: VecDeque = VecDeque::new(); after.push_back(row); + let after = futures::stream::iter(after); Ok(OutputStream::from_input(input.values.chain(after))) } diff --git a/src/commands/classified/external.rs b/src/commands/classified/external.rs index c1724099d9..37082cc9ba 100644 --- a/src/commands/classified/external.rs +++ b/src/commands/classified/external.rs @@ -1,53 +1,14 @@ use crate::prelude::*; -use bytes::{BufMut, BytesMut}; use futures::stream::StreamExt; -use futures_codec::{Decoder, Encoder, FramedRead}; +use futures_codec::{FramedRead, LinesCodec}; use log::trace; use nu_errors::ShellError; use nu_parser::ExternalCommand; use nu_protocol::{Primitive, ShellTypeName, UntaggedValue, Value}; -use std::io::{Error, ErrorKind, Write}; +use std::io::Write; use std::ops::Deref; use std::process::{Command, Stdio}; -/// A simple `Codec` implementation that splits up data into lines. -pub struct LinesCodec {} - -impl Encoder for LinesCodec { - type Item = String; - type Error = Error; - - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - dst.put(item); - Ok(()) - } -} - -impl Decoder for LinesCodec { - type Item = nu_protocol::UntaggedValue; - type Error = Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - match src.iter().position(|b| b == &b'\n') { - Some(pos) if !src.is_empty() => { - let buf = src.split_to(pos + 1); - String::from_utf8(buf.to_vec()) - .map(UntaggedValue::line) - .map(Some) - .map_err(|e| Error::new(ErrorKind::InvalidData, e)) - } - _ if !src.is_empty() => { - let drained = src.take(); - String::from_utf8(drained.to_vec()) - .map(UntaggedValue::string) - .map(Some) - .map_err(|e| Error::new(ErrorKind::InvalidData, e)) - } - _ => Ok(None), - } - } -} - pub fn nu_value_to_string(command: &ExternalCommand, from: &Value) -> Result { match &from.value { UntaggedValue::Primitive(Primitive::Int(i)) => Ok(i.to_string()), @@ -376,21 +337,19 @@ async fn spawn( }; let file = futures::io::AllowStdIo::new(stdout); - let stream = FramedRead::new(file, LinesCodec {}); - - let mut stream = stream.map(|line| { + let mut stream = FramedRead::new(file, LinesCodec).map(|line| { if let Ok(line) = line { - line.into_value(&name_tag) + Value { + value: UntaggedValue::Primitive(Primitive::String(line)), + tag: name_tag.clone(), + } } else { panic!("Internal error: could not read lines of text from stdin") } }); - loop { - match stream.next().await { - Some(item) => yield Ok(item), - None => break, - } + while let Some(item) = stream.next().await { + yield Ok(item) } } diff --git a/src/commands/command.rs b/src/commands/command.rs index 482d9f1b41..17deb04ce2 100644 --- a/src/commands/command.rs +++ b/src/commands/command.rs @@ -525,9 +525,11 @@ impl Command { match call_info { Ok(call_info) => match command.run(&call_info, ®istry, &raw_args, x) { Ok(o) => o, - Err(e) => VecDeque::from(vec![ReturnValue::Err(e)]).to_output_stream(), + Err(e) => { + futures::stream::iter(vec![ReturnValue::Err(e)]).to_output_stream() + } }, - Err(e) => VecDeque::from(vec![ReturnValue::Err(e)]).to_output_stream(), + Err(e) => futures::stream::iter(vec![ReturnValue::Err(e)]).to_output_stream(), } }) .flatten(); diff --git a/src/commands/date.rs b/src/commands/date.rs index e0bae5ff89..b559821fa2 100644 --- a/src/commands/date.rs +++ b/src/commands/date.rs @@ -91,5 +91,5 @@ pub fn date(args: CommandArgs, registry: &CommandRegistry) -> Result match obj.replace_data_at_column_path(&field, replacement.item.clone()) { - Some(v) => VecDeque::from(vec![Ok(ReturnSuccess::Value(v))]), + Some(v) => futures::stream::iter(vec![Ok(ReturnSuccess::Value(v))]), None => { return Err(ShellError::labeled_error( "edit could not find place to insert column", diff --git a/src/commands/env.rs b/src/commands/env.rs index 5ad28f355f..5fa72651a6 100644 --- a/src/commands/env.rs +++ b/src/commands/env.rs @@ -85,5 +85,7 @@ pub fn env(args: CommandArgs, registry: &CommandRegistry) -> Result>, + rows: Option>, } impl WholeStreamCommand for First { diff --git a/src/commands/format.rs b/src/commands/format.rs index 99b49d10cd..9d79de9a1c 100644 --- a/src/commands/format.rs +++ b/src/commands/format.rs @@ -76,7 +76,7 @@ impl PerItemCommand for Format { String::new() }; - Ok(VecDeque::from(vec![ReturnSuccess::value( + Ok(futures::stream::iter(vec![ReturnSuccess::value( UntaggedValue::string(output).into_untagged_value(), )]) .to_output_stream()) diff --git a/src/commands/get.rs b/src/commands/get.rs index 032e4a1114..e5434ef78f 100644 --- a/src/commands/get.rs +++ b/src/commands/get.rs @@ -239,7 +239,7 @@ pub fn get( } } - result + futures::stream::iter(result) }) .flatten(); diff --git a/src/commands/help.rs b/src/commands/help.rs index f282692dc3..3594339dd9 100644 --- a/src/commands/help.rs +++ b/src/commands/help.rs @@ -77,6 +77,7 @@ impl PerItemCommand for Help { get_help(&command.name(), &command.usage(), command.signature()).into(), ); } + let help = futures::stream::iter(help); Ok(help.to_output_stream()) } _ => { @@ -102,11 +103,9 @@ Get the processes on your system actively using CPU: You can also learn more at https://www.nushell.sh/book/"#; - let mut output_stream = VecDeque::new(); - - output_stream.push_back(ReturnSuccess::value( + let output_stream = futures::stream::iter(vec![ReturnSuccess::value( UntaggedValue::string(msg).into_value(tag), - )); + )]); Ok(output_stream.to_output_stream()) } diff --git a/src/commands/insert.rs b/src/commands/insert.rs index d0d7452141..b2f69115ad 100644 --- a/src/commands/insert.rs +++ b/src/commands/insert.rs @@ -48,7 +48,7 @@ impl PerItemCommand for Insert { value: UntaggedValue::Row(_), .. } => match obj.insert_data_at_column_path(&field, replacement.item.clone()) { - Ok(v) => VecDeque::from(vec![Ok(ReturnSuccess::Value(v))]), + Ok(v) => futures::stream::iter(vec![Ok(ReturnSuccess::Value(v))]), Err(err) => return Err(err), }, diff --git a/src/commands/lines.rs b/src/commands/lines.rs index a78920e050..2b1ca87a06 100644 --- a/src/commands/lines.rs +++ b/src/commands/lines.rs @@ -44,25 +44,27 @@ fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result>(); + + futures::stream::iter(result) } else { - let mut result = VecDeque::new(); let value_span = v.tag.span; - result.push_back(Err(ShellError::labeled_error_with_secondary( + futures::stream::iter(vec![Err(ShellError::labeled_error_with_secondary( "Expected a string from pipeline", "requires string input", name_span, "value originates from here", value_span, - ))); - result + ))]) } }) .flatten(); diff --git a/src/commands/nth.rs b/src/commands/nth.rs index ae20953a36..8dbbb6d03c 100644 --- a/src/commands/nth.rs +++ b/src/commands/nth.rs @@ -68,7 +68,7 @@ fn nth( result.push_back(ReturnSuccess::value(item)); } - result + futures::stream::iter(result) }) .flatten(); diff --git a/src/commands/parse.rs b/src/commands/parse.rs index fda1532dd6..726858e545 100644 --- a/src/commands/parse.rs +++ b/src/commands/parse.rs @@ -136,6 +136,6 @@ impl PerItemCommand for Parse { } else { VecDeque::new() }; - Ok(output.to_output_stream()) + Ok(output.into()) } } diff --git a/src/commands/plugin.rs b/src/commands/plugin.rs index 31ee177976..6f5fb19e62 100644 --- a/src/commands/plugin.rs +++ b/src/commands/plugin.rs @@ -84,9 +84,11 @@ pub fn filter_plugin( let mut bos: VecDeque = VecDeque::new(); bos.push_back(UntaggedValue::Primitive(Primitive::BeginningOfStream).into_untagged_value()); + let bos = futures::stream::iter(bos); let mut eos: VecDeque = VecDeque::new(); eos.push_back(UntaggedValue::Primitive(Primitive::EndOfStream).into_untagged_value()); + let eos = futures::stream::iter(eos); let call_info = args.call_info.clone(); @@ -294,6 +296,7 @@ pub fn filter_plugin( } } }) + .map(futures::stream::iter) // convert to a stream .flatten(); Ok(stream.to_output_stream()) diff --git a/src/commands/prepend.rs b/src/commands/prepend.rs index 6adb7c7ec5..0c9897d004 100644 --- a/src/commands/prepend.rs +++ b/src/commands/prepend.rs @@ -41,8 +41,7 @@ fn prepend( PrependArgs { row }: PrependArgs, RunnableContext { input, .. }: RunnableContext, ) -> Result { - let mut prepend: VecDeque = VecDeque::new(); - prepend.push_back(row); + let prepend = futures::stream::iter(vec![row]); Ok(OutputStream::from_input(prepend.chain(input.values))) } diff --git a/src/commands/range.rs b/src/commands/range.rs index c847c66196..d189606db9 100644 --- a/src/commands/range.rs +++ b/src/commands/range.rs @@ -47,7 +47,10 @@ fn range( let (from, _) = range.from; let (to, _) = range.to; + let from = *from as usize; + let to = *to as usize; + Ok(OutputStream::from_input( - input.values.skip(*from).take(*to - *from + 1), + input.values.skip(from).take(to - from + 1), )) } diff --git a/src/commands/reverse.rs b/src/commands/reverse.rs index fd70090d02..f7ea7244d1 100644 --- a/src/commands/reverse.rs +++ b/src/commands/reverse.rs @@ -32,11 +32,11 @@ fn reverse(args: CommandArgs, registry: &CommandRegistry) -> Result>(); + let input = input.values.collect::>(); - let output = output.map(move |mut vec| { + let output = input.map(move |mut vec| { vec.reverse(); - vec.into_iter().collect::>() + futures::stream::iter(vec) }); Ok(output.flatten_stream().from_input_stream()) diff --git a/src/commands/shells.rs b/src/commands/shells.rs index cee7cbe933..93d05b5193 100644 --- a/src/commands/shells.rs +++ b/src/commands/shells.rs @@ -46,5 +46,5 @@ fn shells(args: CommandArgs, _registry: &CommandRegistry) -> Result>, + rows: Option>, } impl WholeStreamCommand for Skip { diff --git a/src/commands/split_row.rs b/src/commands/split_row.rs index 5d8d712335..4ce299af67 100644 --- a/src/commands/split_row.rs +++ b/src/commands/split_row.rs @@ -58,7 +58,7 @@ fn split_row( UntaggedValue::Primitive(Primitive::String(s.into())).into_value(&v.tag), )); } - result + futures::stream::iter(result) } else { let mut result = VecDeque::new(); result.push_back(Err(ShellError::labeled_error_with_secondary( @@ -68,7 +68,7 @@ fn split_row( "value originates from here", v.tag.span, ))); - result + futures::stream::iter(result) } }) .flatten(); diff --git a/src/commands/where_.rs b/src/commands/where_.rs index f37e879543..d43101cd8b 100644 --- a/src/commands/where_.rs +++ b/src/commands/where_.rs @@ -57,6 +57,6 @@ impl PerItemCommand for Where { } }; - Ok(stream.to_output_stream()) + Ok(stream.into()) } } diff --git a/src/shell/help_shell.rs b/src/shell/help_shell.rs index 09566d1da1..77c104fdb8 100644 --- a/src/shell/help_shell.rs +++ b/src/shell/help_shell.rs @@ -142,7 +142,12 @@ impl Shell for HelpShell { _args: LsArgs, _context: &RunnablePerItemContext, ) -> Result { - Ok(self.commands().map(ReturnSuccess::value).to_output_stream()) + let output = self + .commands() + .into_iter() + .map(ReturnSuccess::value) + .collect::>(); + Ok(output.into()) } fn cd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result { diff --git a/src/shell/value_shell.rs b/src/shell/value_shell.rs index c058ef585a..56f53ced96 100644 --- a/src/shell/value_shell.rs +++ b/src/shell/value_shell.rs @@ -120,10 +120,12 @@ impl Shell for ValueShell { )); } - Ok(self + let output = self .members_under(full_path.as_path()) + .into_iter() .map(ReturnSuccess::value) - .to_output_stream()) + .collect::>(); + Ok(output.into()) } fn cd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result { diff --git a/src/stream.rs b/src/stream.rs index bf9563dc2c..0dfe6e0e88 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,4 +1,5 @@ use crate::prelude::*; +use futures::stream::iter; use nu_protocol::{ReturnSuccess, ReturnValue, UntaggedValue, Value}; pub struct InputStream { @@ -15,7 +16,7 @@ impl InputStream { } pub fn drain_vec(&mut self) -> impl Future> { - let mut values: BoxStream<'static, Value> = VecDeque::new().boxed(); + let mut values: BoxStream<'static, Value> = iter(VecDeque::new()).boxed(); std::mem::swap(&mut values, &mut self.values); values.collect() @@ -48,18 +49,15 @@ impl From> for InputStream { impl From> for InputStream { fn from(input: VecDeque) -> InputStream { InputStream { - values: input.boxed(), + values: futures::stream::iter(input).boxed(), } } } impl From> for InputStream { fn from(input: Vec) -> InputStream { - let mut list = VecDeque::default(); - list.extend(input); - InputStream { - values: list.boxed(), + values: futures::stream::iter(input).boxed(), } } } @@ -93,7 +91,7 @@ impl OutputStream { } pub fn drain_vec(&mut self) -> impl Future> { - let mut values: BoxStream<'static, ReturnValue> = VecDeque::new().boxed(); + let mut values: BoxStream<'static, ReturnValue> = iter(VecDeque::new()).boxed(); std::mem::swap(&mut values, &mut self.values); values.collect() @@ -136,41 +134,33 @@ impl From> for OutputStream { impl From> for OutputStream { fn from(input: VecDeque) -> OutputStream { OutputStream { - values: input.boxed(), + values: futures::stream::iter(input).boxed(), } } } impl From> for OutputStream { fn from(input: VecDeque) -> OutputStream { + let stream = input.into_iter().map(ReturnSuccess::value); OutputStream { - values: input - .into_iter() - .map(ReturnSuccess::value) - .collect::>() - .boxed(), + values: futures::stream::iter(stream).boxed(), } } } impl From> for OutputStream { fn from(input: Vec) -> OutputStream { - let mut list = VecDeque::default(); - list.extend(input); - OutputStream { - values: list.boxed(), + values: futures::stream::iter(input).boxed(), } } } impl From> for OutputStream { fn from(input: Vec) -> OutputStream { - let mut list = VecDeque::default(); - list.extend(input.into_iter().map(ReturnSuccess::value)); - + let stream = input.into_iter().map(ReturnSuccess::value); OutputStream { - values: list.boxed(), + values: futures::stream::iter(stream).boxed(), } } }