mirror of
https://github.com/nushell/nushell
synced 2025-01-14 14:14:13 +00:00
Add support for stderr and exit code (#4647)
This commit is contained in:
parent
2b377391c2
commit
0c3ea636fb
23 changed files with 290 additions and 73 deletions
|
@ -18,7 +18,7 @@ pub fn print_pipeline_data(
|
||||||
|
|
||||||
let stdout = std::io::stdout();
|
let stdout = std::io::stdout();
|
||||||
|
|
||||||
if let PipelineData::RawStream(stream, _, _) = input {
|
if let PipelineData::ExternalStream { stdout: stream, .. } = input {
|
||||||
for s in stream {
|
for s in stream {
|
||||||
let _ = stdout.lock().write_all(s?.as_binary()?);
|
let _ = stdout.lock().write_all(s?.as_binary()?);
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,7 +99,7 @@ fn into_binary(
|
||||||
let column_paths: Vec<CellPath> = call.rest(engine_state, stack, 0)?;
|
let column_paths: Vec<CellPath> = call.rest(engine_state, stack, 0)?;
|
||||||
|
|
||||||
match input {
|
match input {
|
||||||
PipelineData::RawStream(stream, ..) => {
|
PipelineData::ExternalStream { stdout: stream, .. } => {
|
||||||
// TODO: in the future, we may want this to stream out, converting each to bytes
|
// TODO: in the future, we may want this to stream out, converting each to bytes
|
||||||
let output = stream.into_bytes()?;
|
let output = stream.into_bytes()?;
|
||||||
Ok(Value::Binary {
|
Ok(Value::Binary {
|
||||||
|
|
|
@ -150,7 +150,7 @@ fn string_helper(
|
||||||
}
|
}
|
||||||
|
|
||||||
match input {
|
match input {
|
||||||
PipelineData::RawStream(stream, ..) => {
|
PipelineData::ExternalStream { stdout: stream, .. } => {
|
||||||
// TODO: in the future, we may want this to stream out, converting each to bytes
|
// TODO: in the future, we may want this to stream out, converting each to bytes
|
||||||
let output = stream.into_string()?;
|
let output = stream.into_string()?;
|
||||||
Ok(Value::String {
|
Ok(Value::String {
|
||||||
|
|
|
@ -28,7 +28,7 @@ impl Command for Describe {
|
||||||
input: PipelineData,
|
input: PipelineData,
|
||||||
) -> Result<PipelineData, ShellError> {
|
) -> Result<PipelineData, ShellError> {
|
||||||
let head = call.head;
|
let head = call.head;
|
||||||
if matches!(input, PipelineData::RawStream(..)) {
|
if matches!(input, PipelineData::ExternalStream { .. }) {
|
||||||
Ok(PipelineData::Value(
|
Ok(PipelineData::Value(
|
||||||
Value::string("raw input", call.head),
|
Value::string("raw input", call.head),
|
||||||
None,
|
None,
|
||||||
|
|
|
@ -132,6 +132,7 @@ pub fn create_default_context(cwd: impl AsRef<Path>) -> EngineState {
|
||||||
// System
|
// System
|
||||||
bind_command! {
|
bind_command! {
|
||||||
Benchmark,
|
Benchmark,
|
||||||
|
Complete,
|
||||||
Exec,
|
Exec,
|
||||||
External,
|
External,
|
||||||
Ps,
|
Ps,
|
||||||
|
|
|
@ -120,15 +120,17 @@ impl Command for Open {
|
||||||
|
|
||||||
let buf_reader = BufReader::new(file);
|
let buf_reader = BufReader::new(file);
|
||||||
|
|
||||||
let output = PipelineData::RawStream(
|
let output = PipelineData::ExternalStream {
|
||||||
RawStream::new(
|
stdout: RawStream::new(
|
||||||
Box::new(BufferedReader { input: buf_reader }),
|
Box::new(BufferedReader { input: buf_reader }),
|
||||||
ctrlc,
|
ctrlc,
|
||||||
call_span,
|
call_span,
|
||||||
),
|
),
|
||||||
call_span,
|
stderr: None,
|
||||||
None,
|
exit_code: None,
|
||||||
);
|
span: call_span,
|
||||||
|
metadata: None,
|
||||||
|
};
|
||||||
|
|
||||||
let ext = if raw {
|
let ext = if raw {
|
||||||
None
|
None
|
||||||
|
|
|
@ -86,7 +86,7 @@ fn getcol(
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(move |x| Value::String { val: x, span })
|
.map(move |x| Value::String { val: x, span })
|
||||||
.into_pipeline_data(engine_state.ctrlc.clone())),
|
.into_pipeline_data(engine_state.ctrlc.clone())),
|
||||||
PipelineData::Value(..) | PipelineData::RawStream(..) => {
|
PipelineData::Value(..) | PipelineData::ExternalStream { .. } => {
|
||||||
let cols = vec![];
|
let cols = vec![];
|
||||||
let vals = vec![];
|
let vals = vec![];
|
||||||
Ok(Value::Record { cols, vals, span }.into_pipeline_data())
|
Ok(Value::Record { cols, vals, span }.into_pipeline_data())
|
||||||
|
|
|
@ -158,7 +158,7 @@ impl Command for Each {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.into_pipeline_data(ctrlc)),
|
.into_pipeline_data(ctrlc)),
|
||||||
PipelineData::RawStream(stream, ..) => Ok(stream
|
PipelineData::ExternalStream { stdout: stream, .. } => Ok(stream
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(move |(idx, x)| {
|
.map(move |(idx, x)| {
|
||||||
|
|
|
@ -111,7 +111,7 @@ fn getcol(
|
||||||
.map(move |x| Value::String { val: x, span })
|
.map(move |x| Value::String { val: x, span })
|
||||||
.into_pipeline_data(engine_state.ctrlc.clone()))
|
.into_pipeline_data(engine_state.ctrlc.clone()))
|
||||||
}
|
}
|
||||||
PipelineData::Value(..) | PipelineData::RawStream(..) => {
|
PipelineData::Value(..) | PipelineData::ExternalStream { .. } => {
|
||||||
let cols = vec![];
|
let cols = vec![];
|
||||||
let vals = vec![];
|
let vals = vec![];
|
||||||
Ok(Value::Record { cols, vals, span }.into_pipeline_data())
|
Ok(Value::Record { cols, vals, span }.into_pipeline_data())
|
||||||
|
|
|
@ -110,7 +110,7 @@ impl Command for Lines {
|
||||||
format!("Not supported input: {}", val.as_string()?),
|
format!("Not supported input: {}", val.as_string()?),
|
||||||
call.head,
|
call.head,
|
||||||
)),
|
)),
|
||||||
PipelineData::RawStream(..) => {
|
PipelineData::ExternalStream { .. } => {
|
||||||
let config = stack.get_config()?;
|
let config = stack.get_config()?;
|
||||||
|
|
||||||
//FIXME: Make sure this can fail in the future to let the user
|
//FIXME: Make sure this can fail in the future to let the user
|
||||||
|
|
|
@ -200,7 +200,7 @@ impl Command for ParEach {
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.flatten()
|
.flatten()
|
||||||
.into_pipeline_data(ctrlc)),
|
.into_pipeline_data(ctrlc)),
|
||||||
PipelineData::RawStream(stream, ..) => Ok(stream
|
PipelineData::ExternalStream { stdout: stream, .. } => Ok(stream
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.par_bridge()
|
.par_bridge()
|
||||||
.map(move |(idx, x)| {
|
.map(move |(idx, x)| {
|
||||||
|
|
|
@ -76,7 +76,12 @@ impl Command for Skip {
|
||||||
let ctrlc = engine_state.ctrlc.clone();
|
let ctrlc = engine_state.ctrlc.clone();
|
||||||
|
|
||||||
match input {
|
match input {
|
||||||
PipelineData::RawStream(stream, bytes_span, metadata) => {
|
PipelineData::ExternalStream {
|
||||||
|
stdout: stream,
|
||||||
|
span: bytes_span,
|
||||||
|
metadata,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
let mut remaining = n;
|
let mut remaining = n;
|
||||||
let mut output = vec![];
|
let mut output = vec![];
|
||||||
|
|
||||||
|
|
|
@ -50,7 +50,7 @@ impl Command for Wrap {
|
||||||
span,
|
span,
|
||||||
})
|
})
|
||||||
.into_pipeline_data(engine_state.ctrlc.clone())),
|
.into_pipeline_data(engine_state.ctrlc.clone())),
|
||||||
PipelineData::RawStream(..) => Ok(Value::Record {
|
PipelineData::ExternalStream { .. } => Ok(Value::Record {
|
||||||
cols: vec![name],
|
cols: vec![name],
|
||||||
vals: vec![input.into_value(call.head)],
|
vals: vec![input.into_value(call.head)],
|
||||||
span,
|
span,
|
||||||
|
|
|
@ -356,17 +356,19 @@ fn response_to_buffer(
|
||||||
) -> nu_protocol::PipelineData {
|
) -> nu_protocol::PipelineData {
|
||||||
let buffered_input = BufReader::new(response);
|
let buffered_input = BufReader::new(response);
|
||||||
|
|
||||||
PipelineData::RawStream(
|
PipelineData::ExternalStream {
|
||||||
RawStream::new(
|
stdout: RawStream::new(
|
||||||
Box::new(BufferedReader {
|
Box::new(BufferedReader {
|
||||||
input: buffered_input,
|
input: buffered_input,
|
||||||
}),
|
}),
|
||||||
engine_state.ctrlc.clone(),
|
engine_state.ctrlc.clone(),
|
||||||
span,
|
span,
|
||||||
),
|
),
|
||||||
|
stderr: None,
|
||||||
|
exit_code: None,
|
||||||
span,
|
span,
|
||||||
None,
|
metadata: None,
|
||||||
)
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only panics if the user agent is invalid but we define it statically so either
|
// Only panics if the user agent is invalid but we define it statically so either
|
||||||
|
|
|
@ -378,17 +378,19 @@ fn response_to_buffer(
|
||||||
) -> nu_protocol::PipelineData {
|
) -> nu_protocol::PipelineData {
|
||||||
let buffered_input = BufReader::new(response);
|
let buffered_input = BufReader::new(response);
|
||||||
|
|
||||||
PipelineData::RawStream(
|
PipelineData::ExternalStream {
|
||||||
RawStream::new(
|
stdout: RawStream::new(
|
||||||
Box::new(BufferedReader {
|
Box::new(BufferedReader {
|
||||||
input: buffered_input,
|
input: buffered_input,
|
||||||
}),
|
}),
|
||||||
engine_state.ctrlc.clone(),
|
engine_state.ctrlc.clone(),
|
||||||
span,
|
span,
|
||||||
),
|
),
|
||||||
|
stderr: None,
|
||||||
|
exit_code: None,
|
||||||
span,
|
span,
|
||||||
None,
|
metadata: None,
|
||||||
)
|
}
|
||||||
}
|
}
|
||||||
// Only panics if the user agent is invalid but we define it statically so either
|
// Only panics if the user agent is invalid but we define it statically so either
|
||||||
// it always or never fails
|
// it always or never fails
|
||||||
|
|
|
@ -44,7 +44,7 @@ impl Command for Decode {
|
||||||
let encoding: Spanned<String> = call.req(engine_state, stack, 0)?;
|
let encoding: Spanned<String> = call.req(engine_state, stack, 0)?;
|
||||||
|
|
||||||
match input {
|
match input {
|
||||||
PipelineData::RawStream(stream, ..) => {
|
PipelineData::ExternalStream { stdout: stream, .. } => {
|
||||||
let bytes: Vec<u8> = stream.into_bytes()?.item;
|
let bytes: Vec<u8> = stream.into_bytes()?.item;
|
||||||
|
|
||||||
let encoding = match Encoding::for_label(encoding.item.as_bytes()) {
|
let encoding = match Encoding::for_label(encoding.item.as_bytes()) {
|
||||||
|
|
100
crates/nu-command/src/system/complete.rs
Normal file
100
crates/nu-command/src/system/complete.rs
Normal file
|
@ -0,0 +1,100 @@
|
||||||
|
use nu_protocol::{
|
||||||
|
ast::Call,
|
||||||
|
engine::{Command, EngineState, Stack},
|
||||||
|
Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, Value,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Complete;
|
||||||
|
|
||||||
|
impl Command for Complete {
|
||||||
|
fn name(&self) -> &str {
|
||||||
|
"complete"
|
||||||
|
}
|
||||||
|
|
||||||
|
fn signature(&self) -> Signature {
|
||||||
|
Signature::build("complete").category(Category::System)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn usage(&self) -> &str {
|
||||||
|
"Complete the external piped in, collecting outputs and exit code"
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run(
|
||||||
|
&self,
|
||||||
|
_engine_state: &EngineState,
|
||||||
|
_stack: &mut Stack,
|
||||||
|
call: &Call,
|
||||||
|
input: PipelineData,
|
||||||
|
) -> Result<nu_protocol::PipelineData, nu_protocol::ShellError> {
|
||||||
|
match input {
|
||||||
|
PipelineData::ExternalStream {
|
||||||
|
stdout,
|
||||||
|
stderr,
|
||||||
|
exit_code,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
let mut cols = vec!["stdout".to_string()];
|
||||||
|
let mut vals = vec![];
|
||||||
|
|
||||||
|
let stdout = stdout.into_bytes()?;
|
||||||
|
if let Ok(st) = String::from_utf8(stdout.item.clone()) {
|
||||||
|
vals.push(Value::String {
|
||||||
|
val: st,
|
||||||
|
span: stdout.span,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
vals.push(Value::Binary {
|
||||||
|
val: stdout.item,
|
||||||
|
span: stdout.span,
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(stderr) = stderr {
|
||||||
|
cols.push("stderr".to_string());
|
||||||
|
let stderr = stderr.into_bytes()?;
|
||||||
|
if let Ok(st) = String::from_utf8(stderr.item.clone()) {
|
||||||
|
vals.push(Value::String {
|
||||||
|
val: st,
|
||||||
|
span: stderr.span,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
vals.push(Value::Binary {
|
||||||
|
val: stderr.item,
|
||||||
|
span: stderr.span,
|
||||||
|
})
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(exit_code) = exit_code {
|
||||||
|
let mut v: Vec<_> = exit_code.collect();
|
||||||
|
|
||||||
|
if let Some(v) = v.pop() {
|
||||||
|
cols.push("exit_code".to_string());
|
||||||
|
vals.push(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Value::Record {
|
||||||
|
cols,
|
||||||
|
vals,
|
||||||
|
span: call.head,
|
||||||
|
}
|
||||||
|
.into_pipeline_data())
|
||||||
|
}
|
||||||
|
_ => Err(ShellError::SpannedLabeledError(
|
||||||
|
"Complete only works with external streams".to_string(),
|
||||||
|
"complete only works on external streams".to_string(),
|
||||||
|
call.head,
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn examples(&self) -> Vec<Example> {
|
||||||
|
vec![Example {
|
||||||
|
description: "Run the external completion",
|
||||||
|
example: "^external arg1 | complete",
|
||||||
|
result: None,
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,5 @@
|
||||||
mod benchmark;
|
mod benchmark;
|
||||||
|
mod complete;
|
||||||
mod exec;
|
mod exec;
|
||||||
mod ps;
|
mod ps;
|
||||||
mod run_external;
|
mod run_external;
|
||||||
|
@ -6,6 +7,7 @@ mod sys;
|
||||||
mod which_;
|
mod which_;
|
||||||
|
|
||||||
pub use benchmark::Benchmark;
|
pub use benchmark::Benchmark;
|
||||||
|
pub use complete::Complete;
|
||||||
pub use exec::Exec;
|
pub use exec::Exec;
|
||||||
pub use ps::Ps;
|
pub use ps::Ps;
|
||||||
pub use run_external::{External, ExternalCommand};
|
pub use run_external::{External, ExternalCommand};
|
||||||
|
|
|
@ -8,7 +8,7 @@ use std::sync::mpsc;
|
||||||
use nu_engine::env_to_strings;
|
use nu_engine::env_to_strings;
|
||||||
use nu_protocol::engine::{EngineState, Stack};
|
use nu_protocol::engine::{EngineState, Stack};
|
||||||
use nu_protocol::{ast::Call, engine::Command, ShellError, Signature, SyntaxShape, Value};
|
use nu_protocol::{ast::Call, engine::Command, ShellError, Signature, SyntaxShape, Value};
|
||||||
use nu_protocol::{Category, Example, PipelineData, RawStream, Span, Spanned};
|
use nu_protocol::{Category, Example, ListStream, PipelineData, RawStream, Span, Spanned};
|
||||||
|
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
|
|
||||||
|
@ -192,14 +192,51 @@ impl ExternalCommand {
|
||||||
let redirect_stderr = self.redirect_stderr;
|
let redirect_stderr = self.redirect_stderr;
|
||||||
let span = self.name.span;
|
let span = self.name.span;
|
||||||
let output_ctrlc = ctrlc.clone();
|
let output_ctrlc = ctrlc.clone();
|
||||||
let (tx, rx) = mpsc::channel();
|
let (stdout_tx, stdout_rx) = mpsc::channel();
|
||||||
|
let (stderr_tx, stderr_rx) = mpsc::channel();
|
||||||
|
let (exit_code_tx, exit_code_rx) = mpsc::channel();
|
||||||
|
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
// If this external is not the last expression, then its output is piped to a channel
|
// If this external is not the last expression, then its output is piped to a channel
|
||||||
// and we create a ListStream that can be consumed
|
// and we create a ListStream that can be consumed
|
||||||
|
|
||||||
if redirect_stderr {
|
if redirect_stderr {
|
||||||
let _ = child.stderr.take();
|
let stderr = child.stderr.take().ok_or_else(|| {
|
||||||
|
ShellError::ExternalCommand(
|
||||||
|
"Error taking stderr from external".to_string(),
|
||||||
|
"Redirects need access to stderr of an external command"
|
||||||
|
.to_string(),
|
||||||
|
span,
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// Stderr is read using the Buffer reader. It will do so until there is an
|
||||||
|
// error or there are no more bytes to read
|
||||||
|
let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stderr);
|
||||||
|
while let Ok(bytes) = buf_read.fill_buf() {
|
||||||
|
if bytes.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The Cow generated from the function represents the conversion
|
||||||
|
// from bytes to String. If no replacements are required, then the
|
||||||
|
// borrowed value is a proper UTF-8 string. The Owned option represents
|
||||||
|
// a string where the values had to be replaced, thus marking it as bytes
|
||||||
|
let bytes = bytes.to_vec();
|
||||||
|
let length = bytes.len();
|
||||||
|
buf_read.consume(length);
|
||||||
|
|
||||||
|
if let Some(ctrlc) = &ctrlc {
|
||||||
|
if ctrlc.load(Ordering::SeqCst) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match stderr_tx.send(bytes) {
|
||||||
|
Ok(_) => continue,
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if redirect_stdout {
|
if redirect_stdout {
|
||||||
|
@ -234,7 +271,7 @@ impl ExternalCommand {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match tx.send(bytes) {
|
match stdout_tx.send(bytes) {
|
||||||
Ok(_) => continue,
|
Ok(_) => continue,
|
||||||
Err(_) => break,
|
Err(_) => break,
|
||||||
}
|
}
|
||||||
|
@ -247,16 +284,42 @@ impl ExternalCommand {
|
||||||
err.to_string(),
|
err.to_string(),
|
||||||
span,
|
span,
|
||||||
)),
|
)),
|
||||||
Ok(_) => Ok(()),
|
Ok(x) => {
|
||||||
|
if let Some(code) = x.code() {
|
||||||
|
let _ = exit_code_tx.send(Value::Int {
|
||||||
|
val: code as i64,
|
||||||
|
span: head,
|
||||||
|
});
|
||||||
|
} else if x.success() {
|
||||||
|
let _ = exit_code_tx.send(Value::Int { val: 0, span: head });
|
||||||
|
} else {
|
||||||
|
let _ = exit_code_tx.send(Value::Int {
|
||||||
|
val: -1,
|
||||||
|
span: head,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
let receiver = ChannelReceiver::new(rx);
|
let stdout_receiver = ChannelReceiver::new(stdout_rx);
|
||||||
|
let stderr_receiver = ChannelReceiver::new(stderr_rx);
|
||||||
|
let exit_code_receiver = ValueReceiver::new(exit_code_rx);
|
||||||
|
|
||||||
Ok(PipelineData::RawStream(
|
Ok(PipelineData::ExternalStream {
|
||||||
RawStream::new(Box::new(receiver), output_ctrlc, head),
|
stdout: RawStream::new(Box::new(stdout_receiver), output_ctrlc.clone(), head),
|
||||||
head,
|
stderr: Some(RawStream::new(
|
||||||
None,
|
Box::new(stderr_receiver),
|
||||||
))
|
output_ctrlc.clone(),
|
||||||
|
head,
|
||||||
|
)),
|
||||||
|
exit_code: Some(ListStream::from_stream(
|
||||||
|
Box::new(exit_code_receiver),
|
||||||
|
output_ctrlc,
|
||||||
|
)),
|
||||||
|
span: head,
|
||||||
|
metadata: None,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -452,8 +515,8 @@ fn trim_enclosing_quotes(input: &str) -> String {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Receiver used for the ListStream
|
// Receiver used for the RawStream
|
||||||
// It implements iterator so it can be used as a ListStream
|
// It implements iterator so it can be used as a RawStream
|
||||||
struct ChannelReceiver {
|
struct ChannelReceiver {
|
||||||
rx: mpsc::Receiver<Vec<u8>>,
|
rx: mpsc::Receiver<Vec<u8>>,
|
||||||
}
|
}
|
||||||
|
@ -474,3 +537,26 @@ impl Iterator for ChannelReceiver {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Receiver used for the ListStream
|
||||||
|
// It implements iterator so it can be used as a ListStream
|
||||||
|
struct ValueReceiver {
|
||||||
|
rx: mpsc::Receiver<Value>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ValueReceiver {
|
||||||
|
pub fn new(rx: mpsc::Receiver<Value>) -> Self {
|
||||||
|
Self { rx }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Iterator for ValueReceiver {
|
||||||
|
type Item = Value;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
match self.rx.recv() {
|
||||||
|
Ok(v) => Some(v),
|
||||||
|
Err(_) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -62,21 +62,25 @@ impl Command for Table {
|
||||||
};
|
};
|
||||||
|
|
||||||
match input {
|
match input {
|
||||||
PipelineData::RawStream(..) => Ok(input),
|
PipelineData::ExternalStream { .. } => Ok(input),
|
||||||
PipelineData::Value(Value::Binary { val, .. }, ..) => Ok(PipelineData::RawStream(
|
PipelineData::Value(Value::Binary { val, .. }, ..) => {
|
||||||
RawStream::new(
|
Ok(PipelineData::ExternalStream {
|
||||||
Box::new(
|
stdout: RawStream::new(
|
||||||
vec![Ok(format!("{}\n", nu_pretty_hex::pretty_hex(&val))
|
Box::new(
|
||||||
.as_bytes()
|
vec![Ok(format!("{}\n", nu_pretty_hex::pretty_hex(&val))
|
||||||
.to_vec())]
|
.as_bytes()
|
||||||
.into_iter(),
|
.to_vec())]
|
||||||
|
.into_iter(),
|
||||||
|
),
|
||||||
|
ctrlc,
|
||||||
|
head,
|
||||||
),
|
),
|
||||||
ctrlc,
|
stderr: None,
|
||||||
head,
|
exit_code: None,
|
||||||
),
|
span: head,
|
||||||
head,
|
metadata: None,
|
||||||
None,
|
})
|
||||||
)),
|
}
|
||||||
PipelineData::Value(Value::List { vals, .. }, metadata) => handle_row_stream(
|
PipelineData::Value(Value::List { vals, .. }, metadata) => handle_row_stream(
|
||||||
engine_state,
|
engine_state,
|
||||||
stack,
|
stack,
|
||||||
|
@ -255,8 +259,8 @@ fn handle_row_stream(
|
||||||
|
|
||||||
let head = call.head;
|
let head = call.head;
|
||||||
|
|
||||||
Ok(PipelineData::RawStream(
|
Ok(PipelineData::ExternalStream {
|
||||||
RawStream::new(
|
stdout: RawStream::new(
|
||||||
Box::new(PagingTableCreator {
|
Box::new(PagingTableCreator {
|
||||||
row_offset,
|
row_offset,
|
||||||
config,
|
config,
|
||||||
|
@ -267,9 +271,11 @@ fn handle_row_stream(
|
||||||
ctrlc,
|
ctrlc,
|
||||||
head,
|
head,
|
||||||
),
|
),
|
||||||
head,
|
stderr: None,
|
||||||
None,
|
exit_code: None,
|
||||||
))
|
span: head,
|
||||||
|
metadata: None,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn convert_to_table(
|
fn convert_to_table(
|
||||||
|
|
|
@ -39,6 +39,9 @@ impl Command for KnownExternal {
|
||||||
let call_span = call.span();
|
let call_span = call.span();
|
||||||
let contents = engine_state.get_span_contents(&call_span);
|
let contents = engine_state.get_span_contents(&call_span);
|
||||||
|
|
||||||
|
let redirect_stdout = call.redirect_stdout;
|
||||||
|
let redirect_stderr = call.redirect_stderr;
|
||||||
|
|
||||||
let (lexed, _) = crate::lex(contents, call_span.start, &[], &[], true);
|
let (lexed, _) = crate::lex(contents, call_span.start, &[], &[], true);
|
||||||
|
|
||||||
let spans: Vec<_> = lexed.into_iter().map(|x| x.span).collect();
|
let spans: Vec<_> = lexed.into_iter().map(|x| x.span).collect();
|
||||||
|
@ -61,7 +64,7 @@ impl Command for KnownExternal {
|
||||||
call.positional.push(arg.clone())
|
call.positional.push(arg.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
if call.redirect_stdout {
|
if redirect_stdout {
|
||||||
call.named.push((
|
call.named.push((
|
||||||
Spanned {
|
Spanned {
|
||||||
item: "redirect-stdout".into(),
|
item: "redirect-stdout".into(),
|
||||||
|
@ -71,7 +74,7 @@ impl Command for KnownExternal {
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
if call.redirect_stderr {
|
if redirect_stderr {
|
||||||
call.named.push((
|
call.named.push((
|
||||||
Spanned {
|
Spanned {
|
||||||
item: "redirect-stderr".into(),
|
item: "redirect-stderr".into(),
|
||||||
|
|
|
@ -35,7 +35,13 @@ use crate::{ast::PathMember, Config, ListStream, RawStream, ShellError, Span, Va
|
||||||
pub enum PipelineData {
|
pub enum PipelineData {
|
||||||
Value(Value, Option<PipelineMetadata>),
|
Value(Value, Option<PipelineMetadata>),
|
||||||
ListStream(ListStream, Option<PipelineMetadata>),
|
ListStream(ListStream, Option<PipelineMetadata>),
|
||||||
RawStream(RawStream, Span, Option<PipelineMetadata>),
|
ExternalStream {
|
||||||
|
stdout: RawStream,
|
||||||
|
stderr: Option<RawStream>,
|
||||||
|
exit_code: Option<ListStream>,
|
||||||
|
span: Span,
|
||||||
|
metadata: Option<PipelineMetadata>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
@ -60,7 +66,7 @@ impl PipelineData {
|
||||||
pub fn metadata(&self) -> Option<PipelineMetadata> {
|
pub fn metadata(&self) -> Option<PipelineMetadata> {
|
||||||
match self {
|
match self {
|
||||||
PipelineData::ListStream(_, x) => x.clone(),
|
PipelineData::ListStream(_, x) => x.clone(),
|
||||||
PipelineData::RawStream(_, _, x) => x.clone(),
|
PipelineData::ExternalStream { metadata: x, .. } => x.clone(),
|
||||||
PipelineData::Value(_, x) => x.clone(),
|
PipelineData::Value(_, x) => x.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -68,7 +74,7 @@ impl PipelineData {
|
||||||
pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
|
pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
|
||||||
match &mut self {
|
match &mut self {
|
||||||
PipelineData::ListStream(_, x) => *x = metadata,
|
PipelineData::ListStream(_, x) => *x = metadata,
|
||||||
PipelineData::RawStream(_, _, x) => *x = metadata,
|
PipelineData::ExternalStream { metadata: x, .. } => *x = metadata,
|
||||||
PipelineData::Value(_, x) => *x = metadata,
|
PipelineData::Value(_, x) => *x = metadata,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,7 +93,7 @@ impl PipelineData {
|
||||||
vals: s.collect(),
|
vals: s.collect(),
|
||||||
span, // FIXME?
|
span, // FIXME?
|
||||||
},
|
},
|
||||||
PipelineData::RawStream(mut s, ..) => {
|
PipelineData::ExternalStream { stdout: mut s, .. } => {
|
||||||
let mut items = vec![];
|
let mut items = vec![];
|
||||||
|
|
||||||
for val in &mut s {
|
for val in &mut s {
|
||||||
|
@ -151,7 +157,7 @@ impl PipelineData {
|
||||||
match self {
|
match self {
|
||||||
PipelineData::Value(v, ..) => Ok(v.into_string(separator, config)),
|
PipelineData::Value(v, ..) => Ok(v.into_string(separator, config)),
|
||||||
PipelineData::ListStream(s, ..) => Ok(s.into_string(separator, config)),
|
PipelineData::ListStream(s, ..) => Ok(s.into_string(separator, config)),
|
||||||
PipelineData::RawStream(s, ..) => {
|
PipelineData::ExternalStream { stdout: s, .. } => {
|
||||||
let mut items = vec![];
|
let mut items = vec![];
|
||||||
|
|
||||||
for val in s {
|
for val in s {
|
||||||
|
@ -230,7 +236,7 @@ impl PipelineData {
|
||||||
Ok(vals.into_iter().map(f).into_pipeline_data(ctrlc))
|
Ok(vals.into_iter().map(f).into_pipeline_data(ctrlc))
|
||||||
}
|
}
|
||||||
PipelineData::ListStream(stream, ..) => Ok(stream.map(f).into_pipeline_data(ctrlc)),
|
PipelineData::ListStream(stream, ..) => Ok(stream.map(f).into_pipeline_data(ctrlc)),
|
||||||
PipelineData::RawStream(stream, ..) => {
|
PipelineData::ExternalStream { stdout: stream, .. } => {
|
||||||
let collected = stream.into_bytes()?;
|
let collected = stream.into_bytes()?;
|
||||||
|
|
||||||
if let Ok(st) = String::from_utf8(collected.clone().item) {
|
if let Ok(st) = String::from_utf8(collected.clone().item) {
|
||||||
|
@ -277,7 +283,7 @@ impl PipelineData {
|
||||||
PipelineData::ListStream(stream, ..) => {
|
PipelineData::ListStream(stream, ..) => {
|
||||||
Ok(stream.flat_map(f).into_pipeline_data(ctrlc))
|
Ok(stream.flat_map(f).into_pipeline_data(ctrlc))
|
||||||
}
|
}
|
||||||
PipelineData::RawStream(stream, ..) => {
|
PipelineData::ExternalStream { stdout: stream, .. } => {
|
||||||
let collected = stream.into_bytes()?;
|
let collected = stream.into_bytes()?;
|
||||||
|
|
||||||
if let Ok(st) = String::from_utf8(collected.clone().item) {
|
if let Ok(st) = String::from_utf8(collected.clone().item) {
|
||||||
|
@ -318,7 +324,7 @@ impl PipelineData {
|
||||||
Ok(vals.into_iter().filter(f).into_pipeline_data(ctrlc))
|
Ok(vals.into_iter().filter(f).into_pipeline_data(ctrlc))
|
||||||
}
|
}
|
||||||
PipelineData::ListStream(stream, ..) => Ok(stream.filter(f).into_pipeline_data(ctrlc)),
|
PipelineData::ListStream(stream, ..) => Ok(stream.filter(f).into_pipeline_data(ctrlc)),
|
||||||
PipelineData::RawStream(stream, ..) => {
|
PipelineData::ExternalStream { stdout: stream, .. } => {
|
||||||
let collected = stream.into_bytes()?;
|
let collected = stream.into_bytes()?;
|
||||||
|
|
||||||
if let Ok(st) = String::from_utf8(collected.clone().item) {
|
if let Ok(st) = String::from_utf8(collected.clone().item) {
|
||||||
|
@ -408,7 +414,7 @@ impl Iterator for PipelineIterator {
|
||||||
PipelineData::Value(Value::Nothing { .. }, ..) => None,
|
PipelineData::Value(Value::Nothing { .. }, ..) => None,
|
||||||
PipelineData::Value(v, ..) => Some(std::mem::take(v)),
|
PipelineData::Value(v, ..) => Some(std::mem::take(v)),
|
||||||
PipelineData::ListStream(stream, ..) => stream.next(),
|
PipelineData::ListStream(stream, ..) => stream.next(),
|
||||||
PipelineData::RawStream(stream, ..) => stream.next().map(|x| match x {
|
PipelineData::ExternalStream { stdout: stream, .. } => stream.next().map(|x| match x {
|
||||||
Ok(x) => x,
|
Ok(x) => x,
|
||||||
Err(err) => Value::Error { error: err },
|
Err(err) => Value::Error { error: err },
|
||||||
}),
|
}),
|
||||||
|
|
12
src/main.rs
12
src/main.rs
|
@ -171,15 +171,17 @@ fn main() -> Result<()> {
|
||||||
let stdin = std::io::stdin();
|
let stdin = std::io::stdin();
|
||||||
let buf_reader = BufReader::new(stdin);
|
let buf_reader = BufReader::new(stdin);
|
||||||
|
|
||||||
PipelineData::RawStream(
|
PipelineData::ExternalStream {
|
||||||
RawStream::new(
|
stdout: RawStream::new(
|
||||||
Box::new(BufferedReader::new(buf_reader)),
|
Box::new(BufferedReader::new(buf_reader)),
|
||||||
Some(ctrlc),
|
Some(ctrlc),
|
||||||
redirect_stdin.span,
|
redirect_stdin.span,
|
||||||
),
|
),
|
||||||
redirect_stdin.span,
|
stderr: None,
|
||||||
None,
|
exit_code: None,
|
||||||
)
|
span: redirect_stdin.span,
|
||||||
|
metadata: None,
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
PipelineData::new(Span::new(0, 0))
|
PipelineData::new(Span::new(0, 0))
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in a new issue