Add second plugin

This commit is contained in:
Jonathan Turner 2019-06-28 04:47:24 +12:00
parent 78bb5647fc
commit 47f23cacc7
8 changed files with 214 additions and 98 deletions

View file

@ -73,6 +73,10 @@ pretty_assertions = "0.6.1"
name = "nu"
path = "src/lib.rs"
[[bin]]
name = "nu_plugin_sum"
path = "src/plugins/sum.rs"
[[bin]]
name = "nu_plugin_inc"
path = "src/plugins/inc.rs"

View file

@ -64,7 +64,6 @@ pub async fn cli() -> Result<(), Box<dyn Error>> {
command("get", get::get),
command("enter", enter::enter),
command("exit", exit::exit),
command("inc", plugin_inc::plugin_inc),
command("lines", lines::lines),
command("pick", pick::pick),
command("split-column", split_column::split_column),
@ -82,6 +81,8 @@ pub async fn cli() -> Result<(), Box<dyn Error>> {
Arc::new(Config),
Arc::new(SkipWhile),
command("sort-by", sort_by::sort_by),
command("inc", |x| plugin::plugin("inc".into(), x)),
command("sum", |x| plugin::plugin("sum".into(), x)),
]);
context.add_sinks(vec![

View file

@ -18,7 +18,7 @@ crate mod lines;
crate mod ls;
crate mod open;
crate mod pick;
crate mod plugin_inc;
crate mod plugin;
crate mod ps;
crate mod reject;
crate mod save;

117
src/commands/plugin.rs Normal file
View file

@ -0,0 +1,117 @@
use crate::errors::ShellError;
use crate::parser::registry::{CommandConfig, PositionalType};
use crate::parser::Spanned;
use crate::prelude::*;
use serde::{self, Deserialize, Serialize};
use std::io::prelude::*;
use std::io::BufReader;
use std::io::{Read, Write};
use subprocess::Exec;
#[derive(Debug, Serialize, Deserialize)]
pub struct JsonRpc<T> {
jsonrpc: String,
pub method: String,
pub params: T,
}
impl<T> JsonRpc<T> {
pub fn new<U: Into<String>>(method: U, params: T) -> Self {
JsonRpc {
jsonrpc: "2.0".into(),
method: method.into(),
params,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "method")]
#[allow(non_camel_case_types)]
pub enum NuResult {
response { params: VecDeque<ReturnValue> },
}
pub fn plugin(plugin_name: String, args: CommandArgs) -> Result<OutputStream, ShellError> {
let input = args.input;
let args = if let Some(ref positional) = args.args.positional {
positional.clone()
} else {
vec![]
};
let mut path = std::path::PathBuf::from(".");
path.push("target");
path.push("debug");
path.push(format!("nu_plugin_{}", plugin_name));
let mut child = std::process::Command::new(path)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.expect("Failed to spawn child process");
{
let mut stdin = child.stdin.as_mut().expect("Failed to open stdin");
let mut stdout = child.stdout.as_mut().expect("Failed to open stdout");
let mut reader = BufReader::new(stdout);
let request = JsonRpc::new("init", args.clone());
let request_raw = serde_json::to_string(&request).unwrap();
stdin.write(format!("{}\n", request_raw).as_bytes());
}
let mut eos = VecDeque::new();
eos.push_back(Value::Primitive(Primitive::EndOfStream));
let stream = input
.chain(eos)
.map(move |v| match v {
Value::Primitive(Primitive::EndOfStream) => {
let mut stdin = child.stdin.as_mut().expect("Failed to open stdin");
let mut stdout = child.stdout.as_mut().expect("Failed to open stdout");
let mut reader = BufReader::new(stdout);
let request: JsonRpc<std::vec::Vec<Value>> = JsonRpc::new("quit", vec![]);
let request_raw = serde_json::to_string(&request).unwrap();
stdin.write(format!("{}\n", request_raw).as_bytes());
VecDeque::new()
}
_ => {
let mut stdin = child.stdin.as_mut().expect("Failed to open stdin");
let mut stdout = child.stdout.as_mut().expect("Failed to open stdout");
let mut reader = BufReader::new(stdout);
let request = JsonRpc::new("filter", v);
let request_raw = serde_json::to_string(&request).unwrap();
stdin.write(format!("{}\n", request_raw).as_bytes());
let mut input = String::new();
match reader.read_line(&mut input) {
Ok(_) => {
let response = serde_json::from_str::<NuResult>(&input);
match response {
Ok(NuResult::response { params }) => params,
Err(_) => {
let mut result = VecDeque::new();
result.push_back(ReturnValue::Value(Value::Error(Box::new(
ShellError::string("Error while processing input"),
))));
result
}
}
}
Err(x) => {
let mut result = VecDeque::new();
result.push_back(ReturnValue::Value(Value::Error(Box::new(
ShellError::string("Error while processing input"),
))));
result
}
}
}
})
.flatten();
Ok(stream.boxed())
}

View file

@ -1,95 +0,0 @@
use crate::errors::ShellError;
use crate::parser::registry::{CommandConfig, PositionalType};
use crate::parser::Spanned;
use crate::prelude::*;
use serde::{self, Deserialize, Serialize};
use std::io::prelude::*;
use std::io::BufReader;
use std::io::{Read, Write};
use subprocess::Exec;
#[derive(Debug, Serialize, Deserialize)]
pub struct JsonRpc<T> {
jsonrpc: String,
pub method: String,
pub params: T,
}
impl<T> JsonRpc<T> {
pub fn new<U: Into<String>>(method: U, params: T) -> Self {
JsonRpc {
jsonrpc: "2.0".into(),
method: method.into(),
params,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "method")]
#[allow(non_camel_case_types)]
pub enum NuResult {
response { params: VecDeque<ReturnValue> },
}
pub fn plugin_inc(args: CommandArgs) -> Result<OutputStream, ShellError> {
let input = args.input;
let args = if let Some(ref positional) = args.args.positional {
positional.clone()
} else {
vec![]
};
let stream = input
.map(move |v| {
let mut process = Exec::cmd("./target/debug/nu_plugin_inc");
process = process.stdout(subprocess::Redirection::Pipe);
process = process.stdin(subprocess::Redirection::Pipe);
let mut popen = process.popen().unwrap();
let mut stdout = popen.stdout.take().unwrap();
let mut stdin = popen.stdin.take().unwrap();
let mut reader = BufReader::new(stdout);
let request = JsonRpc::new("init", args.clone());
let request_raw = serde_json::to_string(&request).unwrap();
stdin.write(format!("{}\n", request_raw).as_bytes());
let request = JsonRpc::new("filter", v);
let request_raw = serde_json::to_string(&request).unwrap();
stdin.write(format!("{}\n", request_raw).as_bytes());
let mut input = String::new();
match reader.read_line(&mut input) {
Ok(_) => {
let response = serde_json::from_str::<NuResult>(&input);
match response {
Ok(NuResult::response { params }) => {
let request: JsonRpc<std::vec::Vec<Value>> =
JsonRpc::new("quit", vec![]);
let request_raw = serde_json::to_string(&request).unwrap();
stdin.write(format!("{}\n", request_raw).as_bytes());
params
}
Err(_) => {
let mut result = VecDeque::new();
result.push_back(ReturnValue::Value(Value::Error(Box::new(
ShellError::string("Error while processing input"),
))));
result
}
}
}
Err(x) => {
let mut result = VecDeque::new();
result.push_back(ReturnValue::Value(Value::Error(Box::new(
ShellError::string("Error while processing input"),
))));
result
}
}
})
.flatten();
Ok(stream.boxed())
}

View file

@ -40,6 +40,8 @@ pub enum Primitive {
String(String),
Boolean(bool),
Date(DateTime<Utc>),
EndOfStream,
}
impl Serialize for Primitive {
@ -49,6 +51,7 @@ impl Serialize for Primitive {
{
match self {
Primitive::Nothing => serializer.serialize_i32(0),
Primitive::EndOfStream => serializer.serialize_i32(0),
Primitive::Int(i) => serializer.serialize_i64(*i),
Primitive::Float(OF64 { inner: f }) => serializer.serialize_f64(f.into_inner()),
Primitive::Bytes(b) => serializer.serialize_u128(*b),
@ -65,6 +68,7 @@ impl Primitive {
match self {
Nothing => "nothing",
EndOfStream => "end-of-stream",
Int(_) => "int",
Float(_) => "float",
Bytes(_) => "bytes",
@ -80,6 +84,7 @@ impl Primitive {
match self {
Nothing => write!(f, "Nothing"),
EndOfStream => write!(f, "EndOfStream"),
Int(int) => write!(f, "{}", int),
Float(float) => write!(f, "{:?}", float),
Bytes(bytes) => write!(f, "{}", bytes),
@ -92,6 +97,7 @@ impl Primitive {
crate fn format(&self, field_name: Option<&DataDescriptor>) -> String {
match self {
Primitive::Nothing => format!("{}", Color::Black.bold().paint("-")),
Primitive::EndOfStream => format!("{}", Color::Black.bold().paint("-")),
Primitive::Bytes(b) => {
let byte = byte_unit::Byte::from_bytes(*b);

83
src/plugins/sum.rs Normal file
View file

@ -0,0 +1,83 @@
use nu::{Primitive, ReturnValue, ShellError, Spanned, Value};
use serde::{Deserialize, Serialize};
use std::io;
/// A wrapper for proactive notifications to the IDE (eg. diagnostics). These must
/// follow the JSON 2.0 RPC spec
#[derive(Debug, Serialize, Deserialize)]
pub struct JsonRpc<T> {
jsonrpc: String,
pub method: String,
pub params: Vec<T>,
}
impl<T> JsonRpc<T> {
pub fn new<U: Into<String>>(method: U, params: Vec<T>) -> Self {
JsonRpc {
jsonrpc: "2.0".into(),
method: method.into(),
params,
}
}
}
fn send_response<T: Serialize>(result: Vec<T>) {
let response = JsonRpc::new("response", result);
let response_raw = serde_json::to_string(&response).unwrap();
println!("{}", response_raw);
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "method")]
#[allow(non_camel_case_types)]
pub enum NuCommand {
init { params: Vec<Spanned<Value>> },
filter { params: Value },
quit,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut total = 0i64;
loop {
let mut input = String::new();
match io::stdin().read_line(&mut input) {
Ok(_) => {
let command = serde_json::from_str::<NuCommand>(&input);
match command {
Ok(NuCommand::init { .. }) => {}
Ok(NuCommand::filter { params }) => match params {
Value::Primitive(Primitive::Int(i)) => {
total += i as i64;
send_response(vec![ReturnValue::Value(Value::int(total))]);
}
Value::Primitive(Primitive::Bytes(b)) => {
total += b as i64;
send_response(vec![ReturnValue::Value(Value::bytes(total as u128))]);
}
_ => {
send_response(vec![ReturnValue::Value(Value::Error(Box::new(
ShellError::string("Unrecognized type in stream"),
)))]);
}
},
Ok(NuCommand::quit) => {
break;
}
Err(_) => {
send_response(vec![ReturnValue::Value(Value::Error(Box::new(
ShellError::string("Unrecognized type in stream"),
)))]);
}
}
}
Err(_) => {
send_response(vec![ReturnValue::Value(Value::Error(Box::new(
ShellError::string("Unrecognized type in stream"),
)))]);
}
}
}
Ok(())
}

View file

@ -4,7 +4,7 @@ crate use crate::context::Context;
crate use crate::env::host::handle_unexpected;
crate use crate::env::{Environment, Host};
crate use crate::errors::ShellError;
crate use crate::object::Value;
crate use crate::object::{Primitive, Value};
crate use crate::stream::{single_output, InputStream, OutputStream};
crate use crate::Text;
crate use futures::{FutureExt, StreamExt};