diff --git a/Cargo.toml b/Cargo.toml index 6f7d31f0ec..56f6eee90f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,10 @@ pretty_assertions = "0.6.1" name = "nu" path = "src/lib.rs" +[[bin]] +name = "nu_plugin_sum" +path = "src/plugins/sum.rs" + [[bin]] name = "nu_plugin_inc" path = "src/plugins/inc.rs" diff --git a/src/cli.rs b/src/cli.rs index b1550165d9..27bbf83443 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -64,7 +64,6 @@ pub async fn cli() -> Result<(), Box> { command("get", get::get), command("enter", enter::enter), command("exit", exit::exit), - command("inc", plugin_inc::plugin_inc), command("lines", lines::lines), command("pick", pick::pick), command("split-column", split_column::split_column), @@ -82,6 +81,8 @@ pub async fn cli() -> Result<(), Box> { Arc::new(Config), Arc::new(SkipWhile), command("sort-by", sort_by::sort_by), + command("inc", |x| plugin::plugin("inc".into(), x)), + command("sum", |x| plugin::plugin("sum".into(), x)), ]); context.add_sinks(vec![ diff --git a/src/commands.rs b/src/commands.rs index 9d4f94d683..c5a9a4778c 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -18,7 +18,7 @@ crate mod lines; crate mod ls; crate mod open; crate mod pick; -crate mod plugin_inc; +crate mod plugin; crate mod ps; crate mod reject; crate mod save; diff --git a/src/commands/plugin.rs b/src/commands/plugin.rs new file mode 100644 index 0000000000..b3b85a2bfd --- /dev/null +++ b/src/commands/plugin.rs @@ -0,0 +1,117 @@ +use crate::errors::ShellError; +use crate::parser::registry::{CommandConfig, PositionalType}; +use crate::parser::Spanned; +use crate::prelude::*; +use serde::{self, Deserialize, Serialize}; +use std::io::prelude::*; +use std::io::BufReader; +use std::io::{Read, Write}; +use subprocess::Exec; + +#[derive(Debug, Serialize, Deserialize)] +pub struct JsonRpc { + jsonrpc: String, + pub method: String, + pub params: T, +} +impl JsonRpc { + pub fn new>(method: U, params: T) -> Self { + JsonRpc { + jsonrpc: "2.0".into(), + method: method.into(), + params, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "method")] +#[allow(non_camel_case_types)] +pub enum NuResult { + response { params: VecDeque }, +} + +pub fn plugin(plugin_name: String, args: CommandArgs) -> Result { + let input = args.input; + let args = if let Some(ref positional) = args.args.positional { + positional.clone() + } else { + vec![] + }; + + let mut path = std::path::PathBuf::from("."); + path.push("target"); + path.push("debug"); + path.push(format!("nu_plugin_{}", plugin_name)); + let mut child = std::process::Command::new(path) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .spawn() + .expect("Failed to spawn child process"); + + { + let mut stdin = child.stdin.as_mut().expect("Failed to open stdin"); + let mut stdout = child.stdout.as_mut().expect("Failed to open stdout"); + + let mut reader = BufReader::new(stdout); + + let request = JsonRpc::new("init", args.clone()); + let request_raw = serde_json::to_string(&request).unwrap(); + stdin.write(format!("{}\n", request_raw).as_bytes()); + } + let mut eos = VecDeque::new(); + eos.push_back(Value::Primitive(Primitive::EndOfStream)); + + let stream = input + .chain(eos) + .map(move |v| match v { + Value::Primitive(Primitive::EndOfStream) => { + let mut stdin = child.stdin.as_mut().expect("Failed to open stdin"); + let mut stdout = child.stdout.as_mut().expect("Failed to open stdout"); + + let mut reader = BufReader::new(stdout); + let request: JsonRpc> = JsonRpc::new("quit", vec![]); + let request_raw = serde_json::to_string(&request).unwrap(); + stdin.write(format!("{}\n", request_raw).as_bytes()); + + VecDeque::new() + } + _ => { + let mut stdin = child.stdin.as_mut().expect("Failed to open stdin"); + let mut stdout = child.stdout.as_mut().expect("Failed to open stdout"); + + let mut reader = BufReader::new(stdout); + + let request = JsonRpc::new("filter", v); + let request_raw = serde_json::to_string(&request).unwrap(); + stdin.write(format!("{}\n", request_raw).as_bytes()); + + let mut input = String::new(); + match reader.read_line(&mut input) { + Ok(_) => { + let response = serde_json::from_str::(&input); + match response { + Ok(NuResult::response { params }) => params, + Err(_) => { + let mut result = VecDeque::new(); + result.push_back(ReturnValue::Value(Value::Error(Box::new( + ShellError::string("Error while processing input"), + )))); + result + } + } + } + Err(x) => { + let mut result = VecDeque::new(); + result.push_back(ReturnValue::Value(Value::Error(Box::new( + ShellError::string("Error while processing input"), + )))); + result + } + } + } + }) + .flatten(); + + Ok(stream.boxed()) +} diff --git a/src/commands/plugin_inc.rs b/src/commands/plugin_inc.rs deleted file mode 100644 index 55ae2c3e92..0000000000 --- a/src/commands/plugin_inc.rs +++ /dev/null @@ -1,95 +0,0 @@ -use crate::errors::ShellError; -use crate::parser::registry::{CommandConfig, PositionalType}; -use crate::parser::Spanned; -use crate::prelude::*; -use serde::{self, Deserialize, Serialize}; -use std::io::prelude::*; -use std::io::BufReader; -use std::io::{Read, Write}; -use subprocess::Exec; - -#[derive(Debug, Serialize, Deserialize)] -pub struct JsonRpc { - jsonrpc: String, - pub method: String, - pub params: T, -} -impl JsonRpc { - pub fn new>(method: U, params: T) -> Self { - JsonRpc { - jsonrpc: "2.0".into(), - method: method.into(), - params, - } - } -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(tag = "method")] -#[allow(non_camel_case_types)] -pub enum NuResult { - response { params: VecDeque }, -} - -pub fn plugin_inc(args: CommandArgs) -> Result { - let input = args.input; - let args = if let Some(ref positional) = args.args.positional { - positional.clone() - } else { - vec![] - }; - - let stream = input - .map(move |v| { - let mut process = Exec::cmd("./target/debug/nu_plugin_inc"); - process = process.stdout(subprocess::Redirection::Pipe); - process = process.stdin(subprocess::Redirection::Pipe); - - let mut popen = process.popen().unwrap(); - - let mut stdout = popen.stdout.take().unwrap(); - let mut stdin = popen.stdin.take().unwrap(); - let mut reader = BufReader::new(stdout); - - let request = JsonRpc::new("init", args.clone()); - let request_raw = serde_json::to_string(&request).unwrap(); - stdin.write(format!("{}\n", request_raw).as_bytes()); - - let request = JsonRpc::new("filter", v); - let request_raw = serde_json::to_string(&request).unwrap(); - stdin.write(format!("{}\n", request_raw).as_bytes()); - - let mut input = String::new(); - match reader.read_line(&mut input) { - Ok(_) => { - let response = serde_json::from_str::(&input); - match response { - Ok(NuResult::response { params }) => { - let request: JsonRpc> = - JsonRpc::new("quit", vec![]); - let request_raw = serde_json::to_string(&request).unwrap(); - stdin.write(format!("{}\n", request_raw).as_bytes()); - params - } - Err(_) => { - let mut result = VecDeque::new(); - result.push_back(ReturnValue::Value(Value::Error(Box::new( - ShellError::string("Error while processing input"), - )))); - result - } - } - } - Err(x) => { - let mut result = VecDeque::new(); - result.push_back(ReturnValue::Value(Value::Error(Box::new( - ShellError::string("Error while processing input"), - )))); - result - } - } - }) - .flatten(); - - Ok(stream.boxed()) -} diff --git a/src/object/base.rs b/src/object/base.rs index ac94427d79..b07c431611 100644 --- a/src/object/base.rs +++ b/src/object/base.rs @@ -40,6 +40,8 @@ pub enum Primitive { String(String), Boolean(bool), Date(DateTime), + + EndOfStream, } impl Serialize for Primitive { @@ -49,6 +51,7 @@ impl Serialize for Primitive { { match self { Primitive::Nothing => serializer.serialize_i32(0), + Primitive::EndOfStream => serializer.serialize_i32(0), Primitive::Int(i) => serializer.serialize_i64(*i), Primitive::Float(OF64 { inner: f }) => serializer.serialize_f64(f.into_inner()), Primitive::Bytes(b) => serializer.serialize_u128(*b), @@ -65,6 +68,7 @@ impl Primitive { match self { Nothing => "nothing", + EndOfStream => "end-of-stream", Int(_) => "int", Float(_) => "float", Bytes(_) => "bytes", @@ -80,6 +84,7 @@ impl Primitive { match self { Nothing => write!(f, "Nothing"), + EndOfStream => write!(f, "EndOfStream"), Int(int) => write!(f, "{}", int), Float(float) => write!(f, "{:?}", float), Bytes(bytes) => write!(f, "{}", bytes), @@ -92,6 +97,7 @@ impl Primitive { crate fn format(&self, field_name: Option<&DataDescriptor>) -> String { match self { Primitive::Nothing => format!("{}", Color::Black.bold().paint("-")), + Primitive::EndOfStream => format!("{}", Color::Black.bold().paint("-")), Primitive::Bytes(b) => { let byte = byte_unit::Byte::from_bytes(*b); diff --git a/src/plugins/sum.rs b/src/plugins/sum.rs new file mode 100644 index 0000000000..431452dc21 --- /dev/null +++ b/src/plugins/sum.rs @@ -0,0 +1,83 @@ +use nu::{Primitive, ReturnValue, ShellError, Spanned, Value}; +use serde::{Deserialize, Serialize}; +use std::io; + +/// A wrapper for proactive notifications to the IDE (eg. diagnostics). These must +/// follow the JSON 2.0 RPC spec + +#[derive(Debug, Serialize, Deserialize)] +pub struct JsonRpc { + jsonrpc: String, + pub method: String, + pub params: Vec, +} +impl JsonRpc { + pub fn new>(method: U, params: Vec) -> Self { + JsonRpc { + jsonrpc: "2.0".into(), + method: method.into(), + params, + } + } +} + +fn send_response(result: Vec) { + let response = JsonRpc::new("response", result); + let response_raw = serde_json::to_string(&response).unwrap(); + println!("{}", response_raw); +} +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "method")] +#[allow(non_camel_case_types)] +pub enum NuCommand { + init { params: Vec> }, + filter { params: Value }, + quit, +} + +fn main() -> Result<(), Box> { + let mut total = 0i64; + + loop { + let mut input = String::new(); + match io::stdin().read_line(&mut input) { + Ok(_) => { + let command = serde_json::from_str::(&input); + + match command { + Ok(NuCommand::init { .. }) => {} + Ok(NuCommand::filter { params }) => match params { + Value::Primitive(Primitive::Int(i)) => { + total += i as i64; + send_response(vec![ReturnValue::Value(Value::int(total))]); + } + Value::Primitive(Primitive::Bytes(b)) => { + total += b as i64; + send_response(vec![ReturnValue::Value(Value::bytes(total as u128))]); + } + _ => { + send_response(vec![ReturnValue::Value(Value::Error(Box::new( + ShellError::string("Unrecognized type in stream"), + )))]); + } + }, + Ok(NuCommand::quit) => { + break; + } + Err(_) => { + send_response(vec![ReturnValue::Value(Value::Error(Box::new( + ShellError::string("Unrecognized type in stream"), + )))]); + } + } + } + Err(_) => { + send_response(vec![ReturnValue::Value(Value::Error(Box::new( + ShellError::string("Unrecognized type in stream"), + )))]); + } + } + } + + Ok(()) +} diff --git a/src/prelude.rs b/src/prelude.rs index 241b8aad4f..90b7945cb1 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -4,7 +4,7 @@ crate use crate::context::Context; crate use crate::env::host::handle_unexpected; crate use crate::env::{Environment, Host}; crate use crate::errors::ShellError; -crate use crate::object::Value; +crate use crate::object::{Primitive, Value}; crate use crate::stream::{single_output, InputStream, OutputStream}; crate use crate::Text; crate use futures::{FutureExt, StreamExt};