diff --git a/crates/nu-cli/src/util.rs b/crates/nu-cli/src/util.rs index 7ebea0deb2..e4912e012f 100644 --- a/crates/nu-cli/src/util.rs +++ b/crates/nu-cli/src/util.rs @@ -276,8 +276,8 @@ fn evaluate_source( eval_block::(engine_state, stack, &block, input) }?; - let status = if let PipelineData::ByteStream(stream, ..) = pipeline { - stream.print(false)? + let status = if let PipelineData::ByteStream(..) = pipeline { + pipeline.print(engine_state, stack, false, false)? } else { if let Some(hook) = engine_state.get_config().hooks.display_output.clone() { let pipeline = eval_hook( diff --git a/crates/nu-cmd-lang/src/core_commands/describe.rs b/crates/nu-cmd-lang/src/core_commands/describe.rs index 7d6d7f6f83..3d992f3f33 100644 --- a/crates/nu-cmd-lang/src/core_commands/describe.rs +++ b/crates/nu-cmd-lang/src/core_commands/describe.rs @@ -163,6 +163,8 @@ fn run( let description = match input { PipelineData::ByteStream(stream, ..) => { + let type_ = stream.type_().describe(); + let description = if options.detailed { let origin = match stream.source() { ByteStreamSource::Read(_) => "unknown", @@ -172,14 +174,14 @@ fn run( Value::record( record! { - "type" => Value::string("byte stream", head), + "type" => Value::string(type_, head), "origin" => Value::string(origin, head), "metadata" => metadata_to_value(metadata, head), }, head, ) } else { - Value::string("byte stream", head) + Value::string(type_, head) }; if !options.no_collect { diff --git a/crates/nu-command/src/bytes/collect.rs b/crates/nu-command/src/bytes/collect.rs index 9cd34496e4..74ea3e5d14 100644 --- a/crates/nu-command/src/bytes/collect.rs +++ b/crates/nu-command/src/bytes/collect.rs @@ -1,3 +1,4 @@ +use itertools::Itertools; use nu_engine::command_prelude::*; #[derive(Clone, Copy)] @@ -35,46 +36,33 @@ impl Command for BytesCollect { input: PipelineData, ) -> Result { let separator: Option> = call.opt(engine_state, stack, 0)?; + + let span = call.head; + // input should be a list of binary data. - let mut output_binary = vec![]; - for value in input { - match value { - Value::Binary { mut val, .. } => { - output_binary.append(&mut val); - // manually concat - // TODO: make use of std::slice::Join when it's available in stable. - if let Some(sep) = &separator { - let mut work_sep = sep.clone(); - output_binary.append(&mut work_sep) - } - } - // Explicitly propagate errors instead of dropping them. - Value::Error { error, .. } => return Err(*error), - other => { - return Err(ShellError::OnlySupportsThisInputType { + let metadata = input.metadata(); + let iter = Itertools::intersperse( + input.into_iter_strict(span)?.map(move |value| { + // Everything is wrapped in Some in case there's a separator, so we can flatten + Some(match value { + // Explicitly propagate errors instead of dropping them. + Value::Error { error, .. } => Err(*error), + Value::Binary { val, .. } => Ok(val), + other => Err(ShellError::OnlySupportsThisInputType { exp_input_type: "binary".into(), wrong_type: other.get_type().to_string(), - dst_span: call.head, + dst_span: span, src_span: other.span(), - }); - } - } - } + }), + }) + }), + Ok(separator).transpose(), + ) + .flatten(); - match separator { - None => Ok(Value::binary(output_binary, call.head).into_pipeline_data()), - Some(sep) => { - if output_binary.is_empty() { - Ok(Value::binary(output_binary, call.head).into_pipeline_data()) - } else { - // have push one extra separator in previous step, pop them out. - for _ in sep { - let _ = output_binary.pop(); - } - Ok(Value::binary(output_binary, call.head).into_pipeline_data()) - } - } - } + let output = ByteStream::from_result_iter(iter, span, None, ByteStreamType::Binary); + + Ok(PipelineData::ByteStream(output, metadata)) } fn examples(&self) -> Vec { diff --git a/crates/nu-command/src/conversions/into/binary.rs b/crates/nu-command/src/conversions/into/binary.rs index 479b0fc7d7..8eb7715754 100644 --- a/crates/nu-command/src/conversions/into/binary.rs +++ b/crates/nu-command/src/conversions/into/binary.rs @@ -127,15 +127,18 @@ fn into_binary( let cell_paths = call.rest(engine_state, stack, 0)?; let cell_paths = (!cell_paths.is_empty()).then_some(cell_paths); - if let PipelineData::ByteStream(stream, ..) = input { - // TODO: in the future, we may want this to stream out, converting each to bytes - Ok(Value::binary(stream.into_bytes()?, head).into_pipeline_data()) + if let PipelineData::ByteStream(stream, metadata) = input { + // Just set the type - that should be good enough + Ok(PipelineData::ByteStream( + stream.with_type(ByteStreamType::Binary), + metadata, + )) } else { let args = Arguments { cell_paths, compact: call.has_flag(engine_state, stack, "compact")?, }; - operate(action, args, input, call.head, engine_state.ctrlc.clone()) + operate(action, args, input, head, engine_state.ctrlc.clone()) } } diff --git a/crates/nu-command/src/conversions/into/cell_path.rs b/crates/nu-command/src/conversions/into/cell_path.rs index 6da317abd3..c05dad57ba 100644 --- a/crates/nu-command/src/conversions/into/cell_path.rs +++ b/crates/nu-command/src/conversions/into/cell_path.rs @@ -103,7 +103,7 @@ fn into_cell_path(call: &Call, input: PipelineData) -> Result Err(ShellError::OnlySupportsThisInputType { exp_input_type: "list, int".into(), - wrong_type: "byte stream".into(), + wrong_type: stream.type_().describe().into(), dst_span: head, src_span: stream.span(), }), diff --git a/crates/nu-command/src/conversions/into/string.rs b/crates/nu-command/src/conversions/into/string.rs index eda4f7e5a5..c0731b2e20 100644 --- a/crates/nu-command/src/conversions/into/string.rs +++ b/crates/nu-command/src/conversions/into/string.rs @@ -156,9 +156,23 @@ fn string_helper( let cell_paths = call.rest(engine_state, stack, 0)?; let cell_paths = (!cell_paths.is_empty()).then_some(cell_paths); - if let PipelineData::ByteStream(stream, ..) = input { - // TODO: in the future, we may want this to stream out, converting each to bytes - Ok(Value::string(stream.into_string()?, head).into_pipeline_data()) + if let PipelineData::ByteStream(stream, metadata) = input { + // Just set the type - that should be good enough. There is no guarantee that the data + // within a string stream is actually valid UTF-8. But refuse to do it if it was already set + // to binary + if stream.type_() != ByteStreamType::Binary { + Ok(PipelineData::ByteStream( + stream.with_type(ByteStreamType::String), + metadata, + )) + } else { + Err(ShellError::CantConvert { + to_type: "string".into(), + from_type: "binary".into(), + span: stream.span(), + help: Some("try using the `decode` command".into()), + }) + } } else { let config = engine_state.get_config().clone(); let args = Arguments { diff --git a/crates/nu-command/src/filters/drop/column.rs b/crates/nu-command/src/filters/drop/column.rs index 01c13deee4..94c0308ea8 100644 --- a/crates/nu-command/src/filters/drop/column.rs +++ b/crates/nu-command/src/filters/drop/column.rs @@ -135,7 +135,7 @@ fn drop_cols( PipelineData::Empty => Ok(PipelineData::Empty), PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { exp_input_type: "table or record".into(), - wrong_type: "byte stream".into(), + wrong_type: stream.type_().describe().into(), dst_span: head, src_span: stream.span(), }), diff --git a/crates/nu-command/src/filters/first.rs b/crates/nu-command/src/filters/first.rs index e581c3e84d..f625847a3f 100644 --- a/crates/nu-command/src/filters/first.rs +++ b/crates/nu-command/src/filters/first.rs @@ -170,12 +170,43 @@ fn first_helper( )) } } - PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { - exp_input_type: "list, binary or range".into(), - wrong_type: "byte stream".into(), - dst_span: head, - src_span: stream.span(), - }), + PipelineData::ByteStream(stream, metadata) => { + if stream.type_() == ByteStreamType::Binary { + let span = stream.span(); + if let Some(mut reader) = stream.reader() { + use std::io::Read; + if return_single_element { + // Take a single byte + let mut byte = [0u8]; + if reader.read(&mut byte).err_span(span)? > 0 { + Ok(Value::int(byte[0] as i64, head).into_pipeline_data()) + } else { + Err(ShellError::AccessEmptyContent { span: head }) + } + } else { + // Just take 'rows' bytes off the stream, mimicking the binary behavior + Ok(PipelineData::ByteStream( + ByteStream::read( + reader.take(rows as u64), + head, + None, + ByteStreamType::Binary, + ), + metadata, + )) + } + } else { + Ok(PipelineData::Empty) + } + } else { + Err(ShellError::OnlySupportsThisInputType { + exp_input_type: "list, binary or range".into(), + wrong_type: stream.type_().describe().into(), + dst_span: head, + src_span: stream.span(), + }) + } + } PipelineData::Empty => Err(ShellError::OnlySupportsThisInputType { exp_input_type: "list, binary or range".into(), wrong_type: "null".into(), diff --git a/crates/nu-command/src/filters/insert.rs b/crates/nu-command/src/filters/insert.rs index e8794304c8..5f1380b2ac 100644 --- a/crates/nu-command/src/filters/insert.rs +++ b/crates/nu-command/src/filters/insert.rs @@ -261,8 +261,8 @@ fn insert( type_name: "empty pipeline".to_string(), span: head, }), - PipelineData::ByteStream(..) => Err(ShellError::IncompatiblePathAccess { - type_name: "byte stream".to_string(), + PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess { + type_name: stream.type_().describe().into(), span: head, }), } diff --git a/crates/nu-command/src/filters/items.rs b/crates/nu-command/src/filters/items.rs index 6afc0bc536..ed30486bee 100644 --- a/crates/nu-command/src/filters/items.rs +++ b/crates/nu-command/src/filters/items.rs @@ -86,7 +86,7 @@ impl Command for Items { }), PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { exp_input_type: "record".into(), - wrong_type: "byte stream".into(), + wrong_type: stream.type_().describe().into(), dst_span: call.head, src_span: stream.span(), }), diff --git a/crates/nu-command/src/filters/last.rs b/crates/nu-command/src/filters/last.rs index 7530126c26..510e6457a8 100644 --- a/crates/nu-command/src/filters/last.rs +++ b/crates/nu-command/src/filters/last.rs @@ -160,12 +160,48 @@ impl Command for Last { }), } } - PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { - exp_input_type: "list, binary or range".into(), - wrong_type: "byte stream".into(), - dst_span: head, - src_span: stream.span(), - }), + PipelineData::ByteStream(stream, ..) => { + if stream.type_() == ByteStreamType::Binary { + let span = stream.span(); + if let Some(mut reader) = stream.reader() { + use std::io::Read; + // Have to be a bit tricky here, but just consume into a VecDeque that we + // shrink to fit each time + const TAKE: u64 = 8192; + let mut buf = VecDeque::with_capacity(rows + TAKE as usize); + loop { + let taken = std::io::copy(&mut (&mut reader).take(TAKE), &mut buf) + .err_span(span)?; + if buf.len() > rows { + buf.drain(..(buf.len() - rows)); + } + if taken < TAKE { + // This must be EOF. + if return_single_element { + if !buf.is_empty() { + return Ok( + Value::int(buf[0] as i64, head).into_pipeline_data() + ); + } else { + return Err(ShellError::AccessEmptyContent { span: head }); + } + } else { + return Ok(Value::binary(buf, head).into_pipeline_data()); + } + } + } + } else { + Ok(PipelineData::Empty) + } + } else { + Err(ShellError::OnlySupportsThisInputType { + exp_input_type: "list, binary or range".into(), + wrong_type: stream.type_().describe().into(), + dst_span: head, + src_span: stream.span(), + }) + } + } PipelineData::Empty => Err(ShellError::OnlySupportsThisInputType { exp_input_type: "list, binary or range".into(), wrong_type: "null".into(), diff --git a/crates/nu-command/src/filters/skip/skip_.rs b/crates/nu-command/src/filters/skip/skip_.rs index 9048b34a58..df53cfacba 100644 --- a/crates/nu-command/src/filters/skip/skip_.rs +++ b/crates/nu-command/src/filters/skip/skip_.rs @@ -12,6 +12,7 @@ impl Command for Skip { Signature::build(self.name()) .input_output_types(vec![ (Type::table(), Type::table()), + (Type::Binary, Type::Binary), ( Type::List(Box::new(Type::Any)), Type::List(Box::new(Type::Any)), @@ -51,6 +52,11 @@ impl Command for Skip { "editions" => Value::test_int(2021), })])), }, + Example { + description: "Skip 2 bytes of a binary value", + example: "0x[01 23 45 67] | skip 2", + result: Some(Value::test_binary(vec![0x45, 0x67])), + }, ] } fn run( @@ -87,12 +93,30 @@ impl Command for Skip { let ctrlc = engine_state.ctrlc.clone(); let input_span = input.span().unwrap_or(call.head); match input { - PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { - exp_input_type: "list, binary or range".into(), - wrong_type: "byte stream".into(), - dst_span: call.head, - src_span: stream.span(), - }), + PipelineData::ByteStream(stream, metadata) => { + if stream.type_() == ByteStreamType::Binary { + let span = stream.span(); + if let Some(mut reader) = stream.reader() { + use std::io::Read; + // Copy the number of skipped bytes into the sink before proceeding + std::io::copy(&mut (&mut reader).take(n as u64), &mut std::io::sink()) + .err_span(span)?; + Ok(PipelineData::ByteStream( + ByteStream::read(reader, call.head, None, ByteStreamType::Binary), + metadata, + )) + } else { + Ok(PipelineData::Empty) + } + } else { + Err(ShellError::OnlySupportsThisInputType { + exp_input_type: "list, binary or range".into(), + wrong_type: stream.type_().describe().into(), + dst_span: call.head, + src_span: stream.span(), + }) + } + } PipelineData::Value(Value::Binary { val, .. }, metadata) => { let bytes = val.into_iter().skip(n).collect::>(); Ok(Value::binary(bytes, input_span).into_pipeline_data_with_metadata(metadata)) diff --git a/crates/nu-command/src/filters/take/take_.rs b/crates/nu-command/src/filters/take/take_.rs index 12840aa8d6..d4bf455c4a 100644 --- a/crates/nu-command/src/filters/take/take_.rs +++ b/crates/nu-command/src/filters/take/take_.rs @@ -78,12 +78,32 @@ impl Command for Take { stream.modify(|iter| iter.take(rows_desired)), metadata, )), - PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { - exp_input_type: "list, binary or range".into(), - wrong_type: "byte stream".into(), - dst_span: head, - src_span: stream.span(), - }), + PipelineData::ByteStream(stream, metadata) => { + if stream.type_() == ByteStreamType::Binary { + if let Some(reader) = stream.reader() { + use std::io::Read; + // Just take 'rows' bytes off the stream, mimicking the binary behavior + Ok(PipelineData::ByteStream( + ByteStream::read( + reader.take(rows_desired as u64), + head, + None, + ByteStreamType::Binary, + ), + metadata, + )) + } else { + Ok(PipelineData::Empty) + } + } else { + Err(ShellError::OnlySupportsThisInputType { + exp_input_type: "list, binary or range".into(), + wrong_type: stream.type_().describe().into(), + dst_span: head, + src_span: stream.span(), + }) + } + } PipelineData::Empty => Err(ShellError::OnlySupportsThisInputType { exp_input_type: "list, binary or range".into(), wrong_type: "null".into(), diff --git a/crates/nu-command/src/filters/tee.rs b/crates/nu-command/src/filters/tee.rs index 936dee5c79..d6decd3bc6 100644 --- a/crates/nu-command/src/filters/tee.rs +++ b/crates/nu-command/src/filters/tee.rs @@ -1,7 +1,7 @@ use nu_engine::{command_prelude::*, get_eval_block_with_early_return}; use nu_protocol::{ byte_stream::copy_with_interrupt, engine::Closure, process::ChildPipe, ByteStream, - ByteStreamSource, OutDest, + ByteStreamSource, OutDest, PipelineMetadata, }; use std::{ io::{self, Read, Write}, @@ -104,9 +104,13 @@ use it in your pipeline."# if let PipelineData::ByteStream(stream, metadata) = input { let span = stream.span(); let ctrlc = engine_state.ctrlc.clone(); - let eval_block = { - let metadata = metadata.clone(); - move |stream| eval_block(PipelineData::ByteStream(stream, metadata)) + let type_ = stream.type_(); + + let info = StreamInfo { + span, + ctrlc: ctrlc.clone(), + type_, + metadata: metadata.clone(), }; match stream.into_source() { @@ -115,10 +119,11 @@ use it in your pipeline."# return stderr_misuse(span, head); } - let tee = IoTee::new(read, span, eval_block)?; + let tee_thread = spawn_tee(info, eval_block)?; + let tee = IoTee::new(read, tee_thread); Ok(PipelineData::ByteStream( - ByteStream::read(tee, span, ctrlc), + ByteStream::read(tee, span, ctrlc, type_), metadata, )) } @@ -127,44 +132,32 @@ use it in your pipeline."# return stderr_misuse(span, head); } - let tee = IoTee::new(file, span, eval_block)?; + let tee_thread = spawn_tee(info, eval_block)?; + let tee = IoTee::new(file, tee_thread); Ok(PipelineData::ByteStream( - ByteStream::read(tee, span, ctrlc), + ByteStream::read(tee, span, ctrlc, type_), metadata, )) } ByteStreamSource::Child(mut child) => { let stderr_thread = if use_stderr { let stderr_thread = if let Some(stderr) = child.stderr.take() { + let tee_thread = spawn_tee(info.clone(), eval_block)?; + let tee = IoTee::new(stderr, tee_thread); match stack.stderr() { OutDest::Pipe | OutDest::Capture => { - let tee = IoTee::new(stderr, span, eval_block)?; child.stderr = Some(ChildPipe::Tee(Box::new(tee))); - None + Ok(None) } - OutDest::Null => Some(tee_pipe_on_thread( - stderr, - io::sink(), - span, - ctrlc.as_ref(), - eval_block, - )?), - OutDest::Inherit => Some(tee_pipe_on_thread( - stderr, - io::stderr(), - span, - ctrlc.as_ref(), - eval_block, - )?), - OutDest::File(file) => Some(tee_pipe_on_thread( - stderr, - file.clone(), - span, - ctrlc.as_ref(), - eval_block, - )?), - } + OutDest::Null => copy_on_thread(tee, io::sink(), &info).map(Some), + OutDest::Inherit => { + copy_on_thread(tee, io::stderr(), &info).map(Some) + } + OutDest::File(file) => { + copy_on_thread(tee, file.clone(), &info).map(Some) + } + }? } else { None }; @@ -175,37 +168,29 @@ use it in your pipeline."# child.stdout = Some(stdout); Ok(()) } - OutDest::Null => { - copy_pipe(stdout, io::sink(), span, ctrlc.as_deref()) - } - OutDest::Inherit => { - copy_pipe(stdout, io::stdout(), span, ctrlc.as_deref()) - } - OutDest::File(file) => { - copy_pipe(stdout, file.as_ref(), span, ctrlc.as_deref()) - } + OutDest::Null => copy_pipe(stdout, io::sink(), &info), + OutDest::Inherit => copy_pipe(stdout, io::stdout(), &info), + OutDest::File(file) => copy_pipe(stdout, file.as_ref(), &info), }?; } stderr_thread } else { let stderr_thread = if let Some(stderr) = child.stderr.take() { + let info = info.clone(); match stack.stderr() { OutDest::Pipe | OutDest::Capture => { child.stderr = Some(stderr); Ok(None) } OutDest::Null => { - copy_pipe_on_thread(stderr, io::sink(), span, ctrlc.as_ref()) - .map(Some) + copy_pipe_on_thread(stderr, io::sink(), &info).map(Some) } OutDest::Inherit => { - copy_pipe_on_thread(stderr, io::stderr(), span, ctrlc.as_ref()) - .map(Some) + copy_pipe_on_thread(stderr, io::stderr(), &info).map(Some) } OutDest::File(file) => { - copy_pipe_on_thread(stderr, file.clone(), span, ctrlc.as_ref()) - .map(Some) + copy_pipe_on_thread(stderr, file.clone(), &info).map(Some) } }? } else { @@ -213,29 +198,16 @@ use it in your pipeline."# }; if let Some(stdout) = child.stdout.take() { + let tee_thread = spawn_tee(info.clone(), eval_block)?; + let tee = IoTee::new(stdout, tee_thread); match stack.stdout() { OutDest::Pipe | OutDest::Capture => { - let tee = IoTee::new(stdout, span, eval_block)?; child.stdout = Some(ChildPipe::Tee(Box::new(tee))); Ok(()) } - OutDest::Null => { - tee_pipe(stdout, io::sink(), span, ctrlc.as_deref(), eval_block) - } - OutDest::Inherit => tee_pipe( - stdout, - io::stdout(), - span, - ctrlc.as_deref(), - eval_block, - ), - OutDest::File(file) => tee_pipe( - stdout, - file.as_ref(), - span, - ctrlc.as_deref(), - eval_block, - ), + OutDest::Null => copy(tee, io::sink(), &info), + OutDest::Inherit => copy(tee, io::stdout(), &info), + OutDest::File(file) => copy(tee, file.as_ref(), &info), }?; } @@ -350,7 +322,7 @@ where fn stderr_misuse(span: Span, head: Span) -> Result { Err(ShellError::UnsupportedInput { msg: "--stderr can only be used on external commands".into(), - input: "the input to `tee` is not an external commands".into(), + input: "the input to `tee` is not an external command".into(), msg_span: head, input_span: span, }) @@ -363,23 +335,12 @@ struct IoTee { } impl IoTee { - fn new( - reader: R, - span: Span, - eval_block: impl FnOnce(ByteStream) -> Result<(), ShellError> + Send + 'static, - ) -> Result { - let (sender, receiver) = mpsc::channel(); - - let thread = thread::Builder::new() - .name("tee".into()) - .spawn(move || eval_block(ByteStream::from_iter(receiver, span, None))) - .err_span(span)?; - - Ok(Self { + fn new(reader: R, tee: TeeThread) -> Self { + Self { reader, - sender: Some(sender), - thread: Some(thread), - }) + sender: Some(tee.sender), + thread: Some(tee.thread), + } } } @@ -411,68 +372,74 @@ impl Read for IoTee { } } -fn tee_pipe( - pipe: ChildPipe, - mut dest: impl Write, +struct TeeThread { + sender: Sender>, + thread: JoinHandle>, +} + +fn spawn_tee( + info: StreamInfo, + mut eval_block: impl FnMut(PipelineData) -> Result<(), ShellError> + Send + 'static, +) -> Result { + let (sender, receiver) = mpsc::channel(); + + let thread = thread::Builder::new() + .name("tee".into()) + .spawn(move || { + // We don't use ctrlc here because we assume it already has it on the other side + let stream = ByteStream::from_iter(receiver.into_iter(), info.span, None, info.type_); + eval_block(PipelineData::ByteStream(stream, info.metadata)) + }) + .err_span(info.span)?; + + Ok(TeeThread { sender, thread }) +} + +#[derive(Clone)] +struct StreamInfo { span: Span, - ctrlc: Option<&AtomicBool>, - eval_block: impl FnOnce(ByteStream) -> Result<(), ShellError> + Send + 'static, -) -> Result<(), ShellError> { - match pipe { - ChildPipe::Pipe(pipe) => { - let mut tee = IoTee::new(pipe, span, eval_block)?; - copy_with_interrupt(&mut tee, &mut dest, span, ctrlc)?; - } - ChildPipe::Tee(tee) => { - let mut tee = IoTee::new(tee, span, eval_block)?; - copy_with_interrupt(&mut tee, &mut dest, span, ctrlc)?; - } - } + ctrlc: Option>, + type_: ByteStreamType, + metadata: Option, +} + +fn copy(mut src: impl Read, mut dest: impl Write, info: &StreamInfo) -> Result<(), ShellError> { + copy_with_interrupt(&mut src, &mut dest, info.span, info.ctrlc.as_deref())?; Ok(()) } -fn tee_pipe_on_thread( - pipe: ChildPipe, - dest: impl Write + Send + 'static, - span: Span, - ctrlc: Option<&Arc>, - eval_block: impl FnOnce(ByteStream) -> Result<(), ShellError> + Send + 'static, +fn copy_pipe(pipe: ChildPipe, dest: impl Write, info: &StreamInfo) -> Result<(), ShellError> { + match pipe { + ChildPipe::Pipe(pipe) => copy(pipe, dest, info), + ChildPipe::Tee(tee) => copy(tee, dest, info), + } +} + +fn copy_on_thread( + mut src: impl Read + Send + 'static, + mut dest: impl Write + Send + 'static, + info: &StreamInfo, ) -> Result>, ShellError> { - let ctrlc = ctrlc.cloned(); + let span = info.span; + let ctrlc = info.ctrlc.clone(); thread::Builder::new() - .name("stderr tee".into()) - .spawn(move || tee_pipe(pipe, dest, span, ctrlc.as_deref(), eval_block)) + .name("stderr copier".into()) + .spawn(move || { + copy_with_interrupt(&mut src, &mut dest, span, ctrlc.as_deref())?; + Ok(()) + }) .map_err(|e| e.into_spanned(span).into()) } -fn copy_pipe( - pipe: ChildPipe, - mut dest: impl Write, - span: Span, - ctrlc: Option<&AtomicBool>, -) -> Result<(), ShellError> { - match pipe { - ChildPipe::Pipe(mut pipe) => { - copy_with_interrupt(&mut pipe, &mut dest, span, ctrlc)?; - } - ChildPipe::Tee(mut tee) => { - copy_with_interrupt(&mut tee, &mut dest, span, ctrlc)?; - } - } - Ok(()) -} - fn copy_pipe_on_thread( pipe: ChildPipe, dest: impl Write + Send + 'static, - span: Span, - ctrlc: Option<&Arc>, + info: &StreamInfo, ) -> Result>, ShellError> { - let ctrlc = ctrlc.cloned(); - thread::Builder::new() - .name("stderr copier".into()) - .spawn(move || copy_pipe(pipe, dest, span, ctrlc.as_deref())) - .map_err(|e| e.into_spanned(span).into()) + match pipe { + ChildPipe::Pipe(pipe) => copy_on_thread(pipe, dest, info), + ChildPipe::Tee(tee) => copy_on_thread(tee, dest, info), + } } #[test] diff --git a/crates/nu-command/src/filters/update.rs b/crates/nu-command/src/filters/update.rs index 0d914d2d8e..e724ae77ad 100644 --- a/crates/nu-command/src/filters/update.rs +++ b/crates/nu-command/src/filters/update.rs @@ -225,8 +225,8 @@ fn update( type_name: "empty pipeline".to_string(), span: head, }), - PipelineData::ByteStream(..) => Err(ShellError::IncompatiblePathAccess { - type_name: "byte stream".to_string(), + PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess { + type_name: stream.type_().describe().into(), span: head, }), } diff --git a/crates/nu-command/src/filters/upsert.rs b/crates/nu-command/src/filters/upsert.rs index 4313addd89..e3678972fb 100644 --- a/crates/nu-command/src/filters/upsert.rs +++ b/crates/nu-command/src/filters/upsert.rs @@ -285,8 +285,8 @@ fn upsert( type_name: "empty pipeline".to_string(), span: head, }), - PipelineData::ByteStream(..) => Err(ShellError::IncompatiblePathAccess { - type_name: "byte stream".to_string(), + PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess { + type_name: stream.type_().describe().into(), span: head, }), } diff --git a/crates/nu-command/src/filters/values.rs b/crates/nu-command/src/filters/values.rs index ed33ebf643..f6ff8cda2e 100644 --- a/crates/nu-command/src/filters/values.rs +++ b/crates/nu-command/src/filters/values.rs @@ -182,7 +182,7 @@ fn values( } PipelineData::ByteStream(stream, ..) => Err(ShellError::OnlySupportsThisInputType { exp_input_type: "record or table".into(), - wrong_type: "byte stream".into(), + wrong_type: stream.type_().describe().into(), dst_span: head, src_span: stream.span(), }), diff --git a/crates/nu-command/src/formats/to/text.rs b/crates/nu-command/src/formats/to/text.rs index 7f1d632c13..fb240654f6 100644 --- a/crates/nu-command/src/formats/to/text.rs +++ b/crates/nu-command/src/formats/to/text.rs @@ -51,7 +51,12 @@ impl Command for ToText { str }); Ok(PipelineData::ByteStream( - ByteStream::from_iter(iter, span, engine_state.ctrlc.clone()), + ByteStream::from_iter( + iter, + span, + engine_state.ctrlc.clone(), + ByteStreamType::String, + ), meta, )) } diff --git a/crates/nu-command/src/network/http/client.rs b/crates/nu-command/src/network/http/client.rs index 54f7749627..8317fb50bc 100644 --- a/crates/nu-command/src/network/http/client.rs +++ b/crates/nu-command/src/network/http/client.rs @@ -117,10 +117,20 @@ pub fn response_to_buffer( _ => None, }; + // Try to guess whether the response is definitely intended to binary or definitely intended to + // be UTF-8 text. Otherwise specify `None` and just guess. This doesn't have to be thorough. + let content_type_lowercase = response.header("content-type").map(|s| s.to_lowercase()); + let response_type = match content_type_lowercase.as_deref() { + Some("application/octet-stream") => ByteStreamType::Binary, + Some(h) if h.contains("charset=utf-8") => ByteStreamType::String, + _ => ByteStreamType::Unknown, + }; + let reader = response.into_reader(); PipelineData::ByteStream( - ByteStream::read(reader, span, engine_state.ctrlc.clone()).with_known_size(buffer_size), + ByteStream::read(reader, span, engine_state.ctrlc.clone(), response_type) + .with_known_size(buffer_size), None, ) } diff --git a/crates/nu-command/src/strings/str_/join.rs b/crates/nu-command/src/strings/str_/join.rs index 732434b20f..dd3a87dd61 100644 --- a/crates/nu-command/src/strings/str_/join.rs +++ b/crates/nu-command/src/strings/str_/join.rs @@ -1,4 +1,5 @@ use nu_engine::command_prelude::*; +use std::io::Write; #[derive(Clone)] pub struct StrJoin; @@ -40,31 +41,40 @@ impl Command for StrJoin { ) -> Result { let separator: Option = call.opt(engine_state, stack, 0)?; - let config = engine_state.get_config(); + let config = engine_state.config.clone(); - // let output = input.collect_string(&separator.unwrap_or_default(), &config)?; - // Hmm, not sure what we actually want. - // `to_formatted_string` formats dates as human readable which feels funny. - let mut strings: Vec = vec![]; + let span = call.head; - for value in input { - let str = match value { - Value::Error { error, .. } => { - return Err(*error); + let metadata = input.metadata(); + let mut iter = input.into_iter(); + let mut first = true; + + let output = ByteStream::from_fn(span, None, ByteStreamType::String, move |buffer| { + // Write each input to the buffer + if let Some(value) = iter.next() { + // Write the separator if this is not the first + if first { + first = false; + } else if let Some(separator) = &separator { + write!(buffer, "{}", separator)?; } - Value::Date { val, .. } => format!("{val:?}"), - value => value.to_expanded_string("\n", config), - }; - strings.push(str); - } - let output = if let Some(separator) = separator { - strings.join(&separator) - } else { - strings.join("") - }; + match value { + Value::Error { error, .. } => { + return Err(*error); + } + // Hmm, not sure what we actually want. + // `to_expanded_string` formats dates as human readable which feels funny. + Value::Date { val, .. } => write!(buffer, "{val:?}")?, + value => write!(buffer, "{}", value.to_expanded_string("\n", &config))?, + } + Ok(true) + } else { + Ok(false) + } + }); - Ok(Value::string(output, call.head).into_pipeline_data()) + Ok(PipelineData::ByteStream(output, metadata)) } fn examples(&self) -> Vec { diff --git a/crates/nu-command/src/system/run_external.rs b/crates/nu-command/src/system/run_external.rs index b12b89263c..b37d3a2fcb 100644 --- a/crates/nu-command/src/system/run_external.rs +++ b/crates/nu-command/src/system/run_external.rs @@ -416,6 +416,7 @@ impl ExternalCommand { .name("external stdin worker".to_string()) .spawn(move || { let input = match input { + // Don't touch binary input or byte streams input @ PipelineData::ByteStream(..) => input, input @ PipelineData::Value(Value::Binary { .. }, ..) => input, input => { diff --git a/crates/nu-command/src/viewers/table.rs b/crates/nu-command/src/viewers/table.rs index 26b8c921c5..2fe9319821 100644 --- a/crates/nu-command/src/viewers/table.rs +++ b/crates/nu-command/src/viewers/table.rs @@ -5,6 +5,7 @@ use lscolors::{LsColors, Style}; use nu_color_config::{color_from_hex, StyleComputer, TextStyle}; use nu_engine::{command_prelude::*, env::get_config, env_to_string}; +use nu_pretty_hex::HexConfig; use nu_protocol::{ ByteStream, Config, DataSource, ListStream, PipelineMetadata, TableMode, ValueIterator, }; @@ -15,7 +16,7 @@ use nu_table::{ use nu_utils::get_ls_colors; use std::{ collections::VecDeque, - io::{Cursor, IsTerminal}, + io::{IsTerminal, Read}, path::PathBuf, str::FromStr, sync::{atomic::AtomicBool, Arc}, @@ -364,16 +365,18 @@ fn handle_table_command( ) -> Result { let span = input.data.span().unwrap_or(input.call.head); match input.data { + // Binary streams should behave as if they really are `binary` data, and printed as hex + PipelineData::ByteStream(stream, _) if stream.type_() == ByteStreamType::Binary => Ok( + PipelineData::ByteStream(pretty_hex_stream(stream, input.call.head), None), + ), PipelineData::ByteStream(..) => Ok(input.data), PipelineData::Value(Value::Binary { val, .. }, ..) => { - let bytes = { - let mut str = nu_pretty_hex::pretty_hex(&val); - str.push('\n'); - str.into_bytes() - }; let ctrlc = input.engine_state.ctrlc.clone(); - let stream = ByteStream::read(Cursor::new(bytes), input.call.head, ctrlc); - Ok(PipelineData::ByteStream(stream, None)) + let stream = ByteStream::read_binary(val, input.call.head, ctrlc); + Ok(PipelineData::ByteStream( + pretty_hex_stream(stream, input.call.head), + None, + )) } // None of these two receive a StyleComputer because handle_row_stream() can produce it by itself using engine_state and stack. PipelineData::Value(Value::List { vals, .. }, metadata) => { @@ -410,6 +413,70 @@ fn handle_table_command( } } +fn pretty_hex_stream(stream: ByteStream, span: Span) -> ByteStream { + let mut cfg = HexConfig { + // We are going to render the title manually first + title: true, + // If building on 32-bit, the stream size might be bigger than a usize + length: stream.known_size().and_then(|sz| sz.try_into().ok()), + ..HexConfig::default() + }; + + // This won't really work for us + debug_assert!(cfg.width > 0, "the default hex config width was zero"); + + let mut read_buf = Vec::with_capacity(cfg.width); + + let mut reader = if let Some(reader) = stream.reader() { + reader + } else { + // No stream to read from + return ByteStream::read_string("".into(), span, None); + }; + + ByteStream::from_fn(span, None, ByteStreamType::String, move |buffer| { + // Turn the buffer into a String we can write to + let mut write_buf = std::mem::take(buffer); + write_buf.clear(); + // SAFETY: we just truncated it empty + let mut write_buf = unsafe { String::from_utf8_unchecked(write_buf) }; + + // Write the title at the beginning + if cfg.title { + nu_pretty_hex::write_title(&mut write_buf, cfg, true).expect("format error"); + cfg.title = false; + + // Put the write_buf back into buffer + *buffer = write_buf.into_bytes(); + + Ok(true) + } else { + // Read up to `cfg.width` bytes + read_buf.clear(); + (&mut reader) + .take(cfg.width as u64) + .read_to_end(&mut read_buf) + .err_span(span)?; + + if !read_buf.is_empty() { + nu_pretty_hex::hex_write(&mut write_buf, &read_buf, cfg, Some(true)) + .expect("format error"); + write_buf.push('\n'); + + // Advance the address offset for next time + cfg.address_offset += read_buf.len(); + + // Put the write_buf back into buffer + *buffer = write_buf.into_bytes(); + + Ok(true) + } else { + Ok(false) + } + } + }) +} + fn handle_record( input: CmdInput, cfg: TableConfig, @@ -608,7 +675,8 @@ fn handle_row_stream( ctrlc.clone(), cfg, ); - let stream = ByteStream::from_result_iter(paginator, input.call.head, None); + let stream = + ByteStream::from_result_iter(paginator, input.call.head, None, ByteStreamType::String); Ok(PipelineData::ByteStream(stream, None)) } diff --git a/crates/nu-command/tests/commands/bytes/collect.rs b/crates/nu-command/tests/commands/bytes/collect.rs new file mode 100644 index 0000000000..768ab16df4 --- /dev/null +++ b/crates/nu-command/tests/commands/bytes/collect.rs @@ -0,0 +1,27 @@ +use nu_test_support::{nu, pipeline}; + +#[test] +fn test_stream() { + let actual = nu!(pipeline( + " + [0x[01] 0x[02] 0x[03] 0x[04]] + | filter {true} + | bytes collect 0x[aa aa] + | encode hex + " + )); + assert_eq!(actual.out, "01AAAA02AAAA03AAAA04"); +} + +#[test] +fn test_stream_type() { + let actual = nu!(pipeline( + " + [0x[01] 0x[02] 0x[03] 0x[04]] + | filter {true} + | bytes collect 0x[00] + | describe -n + " + )); + assert_eq!(actual.out, "binary (stream)"); +} diff --git a/crates/nu-command/tests/commands/bytes/mod.rs b/crates/nu-command/tests/commands/bytes/mod.rs new file mode 100644 index 0000000000..10b2a494f8 --- /dev/null +++ b/crates/nu-command/tests/commands/bytes/mod.rs @@ -0,0 +1 @@ +mod collect; diff --git a/crates/nu-command/tests/commands/first.rs b/crates/nu-command/tests/commands/first.rs index e01478f820..23ccda6669 100644 --- a/crates/nu-command/tests/commands/first.rs +++ b/crates/nu-command/tests/commands/first.rs @@ -68,6 +68,20 @@ fn gets_first_byte() { assert_eq!(actual.out, "170"); } +#[test] +fn gets_first_bytes_from_stream() { + let actual = nu!("(1.. | each { 0x[aa bb cc] } | bytes collect | first 2) == 0x[aa bb]"); + + assert_eq!(actual.out, "true"); +} + +#[test] +fn gets_first_byte_from_stream() { + let actual = nu!("1.. | each { 0x[aa bb cc] } | bytes collect | first"); + + assert_eq!(actual.out, "170"); +} + #[test] // covers a situation where `first` used to behave strangely on list input fn works_with_binary_list() { diff --git a/crates/nu-command/tests/commands/last.rs b/crates/nu-command/tests/commands/last.rs index b0c67e49be..986b433ea7 100644 --- a/crates/nu-command/tests/commands/last.rs +++ b/crates/nu-command/tests/commands/last.rs @@ -68,6 +68,20 @@ fn gets_last_byte() { assert_eq!(actual.out, "204"); } +#[test] +fn gets_last_bytes_from_stream() { + let actual = nu!("(1..10 | each { 0x[aa bb cc] } | bytes collect | last 2) == 0x[bb cc]"); + + assert_eq!(actual.out, "true"); +} + +#[test] +fn gets_last_byte_from_stream() { + let actual = nu!("1..10 | each { 0x[aa bb cc] } | bytes collect | last"); + + assert_eq!(actual.out, "204"); +} + #[test] fn last_errors_on_negative_index() { let actual = nu!("[1, 2, 3] | last -2"); diff --git a/crates/nu-command/tests/commands/mod.rs b/crates/nu-command/tests/commands/mod.rs index d7215e002b..922e804405 100644 --- a/crates/nu-command/tests/commands/mod.rs +++ b/crates/nu-command/tests/commands/mod.rs @@ -4,6 +4,7 @@ mod any; mod append; mod assignment; mod break_; +mod bytes; mod cal; mod cd; mod compact; diff --git a/crates/nu-command/tests/commands/skip/skip_.rs b/crates/nu-command/tests/commands/skip/skip_.rs index 790c58db4e..c98de496c8 100644 --- a/crates/nu-command/tests/commands/skip/skip_.rs +++ b/crates/nu-command/tests/commands/skip/skip_.rs @@ -1,13 +1,17 @@ use nu_test_support::nu; #[test] -fn binary_skip_will_raise_error() { - let actual = nu!( - cwd: "tests/fixtures/formats", - "open sample_data.ods --raw | skip 2" - ); +fn skips_bytes() { + let actual = nu!("(0x[aa bb cc] | skip 2) == 0x[cc]"); - assert!(actual.err.contains("only_supports_this_input_type")); + assert_eq!(actual.out, "true"); +} + +#[test] +fn skips_bytes_from_stream() { + let actual = nu!("([0 1] | each { 0x[aa bb cc] } | bytes collect | skip 2) == 0x[cc aa bb cc]"); + + assert_eq!(actual.out, "true"); } #[test] diff --git a/crates/nu-command/tests/commands/str_/collect.rs b/crates/nu-command/tests/commands/str_/join.rs similarity index 65% rename from crates/nu-command/tests/commands/str_/collect.rs rename to crates/nu-command/tests/commands/str_/join.rs index 154ce30537..e04652e810 100644 --- a/crates/nu-command/tests/commands/str_/collect.rs +++ b/crates/nu-command/tests/commands/str_/join.rs @@ -22,6 +22,18 @@ fn test_2() { assert_eq!(actual.out, "abcd"); } +#[test] +fn test_stream() { + let actual = nu!("[a b c d] | filter {true} | str join ."); + assert_eq!(actual.out, "a.b.c.d"); +} + +#[test] +fn test_stream_type() { + let actual = nu!("[a b c d] | filter {true} | str join . | describe -n"); + assert_eq!(actual.out, "string (stream)"); +} + #[test] fn construct_a_path() { let actual = nu!(pipeline( diff --git a/crates/nu-command/tests/commands/str_/mod.rs b/crates/nu-command/tests/commands/str_/mod.rs index 9f1e90e853..9efa28b1ef 100644 --- a/crates/nu-command/tests/commands/str_/mod.rs +++ b/crates/nu-command/tests/commands/str_/mod.rs @@ -1,5 +1,5 @@ -mod collect; mod into_string; +mod join; use nu_test_support::fs::Stub::FileWithContent; use nu_test_support::playground::Playground; diff --git a/crates/nu-command/tests/commands/take/rows.rs b/crates/nu-command/tests/commands/take/rows.rs index d5f3d1c601..6c34b61310 100644 --- a/crates/nu-command/tests/commands/take/rows.rs +++ b/crates/nu-command/tests/commands/take/rows.rs @@ -35,6 +35,20 @@ fn fails_on_string() { assert!(actual.err.contains("command doesn't support")); } +#[test] +fn takes_bytes() { + let actual = nu!("(0x[aa bb cc] | take 2) == 0x[aa bb]"); + + assert_eq!(actual.out, "true"); +} + +#[test] +fn takes_bytes_from_stream() { + let actual = nu!("(1.. | each { 0x[aa bb cc] } | bytes collect | take 2) == 0x[aa bb]"); + + assert_eq!(actual.out, "true"); +} + #[test] // covers a situation where `take` used to behave strangely on list input fn works_with_binary_list() { diff --git a/crates/nu-engine/src/command_prelude.rs b/crates/nu-engine/src/command_prelude.rs index 089a2fb8fa..112f280db5 100644 --- a/crates/nu-engine/src/command_prelude.rs +++ b/crates/nu-engine/src/command_prelude.rs @@ -2,7 +2,7 @@ pub use crate::CallExt; pub use nu_protocol::{ ast::{Call, CellPath}, engine::{Command, EngineState, Stack}, - record, Category, ErrSpan, Example, IntoInterruptiblePipelineData, IntoPipelineData, - IntoSpanned, PipelineData, Record, ShellError, Signature, Span, Spanned, SyntaxShape, Type, - Value, + record, ByteStream, ByteStreamType, Category, ErrSpan, Example, IntoInterruptiblePipelineData, + IntoPipelineData, IntoSpanned, PipelineData, Record, ShellError, Signature, Span, Spanned, + SyntaxShape, Type, Value, }; diff --git a/crates/nu-plugin-core/src/interface/mod.rs b/crates/nu-plugin-core/src/interface/mod.rs index b4a2bc9a25..4f287f39c0 100644 --- a/crates/nu-plugin-core/src/interface/mod.rs +++ b/crates/nu-plugin-core/src/interface/mod.rs @@ -183,7 +183,7 @@ pub trait InterfaceManager { PipelineDataHeader::ByteStream(info) => { let handle = self.stream_manager().get_handle(); let reader = handle.read_stream(info.id, self.get_interface())?; - ByteStream::from_result_iter(reader, info.span, ctrlc.cloned()).into() + ByteStream::from_result_iter(reader, info.span, ctrlc.cloned(), info.type_).into() } }) } @@ -261,9 +261,10 @@ pub trait Interface: Clone + Send { } PipelineData::ByteStream(stream, ..) => { let span = stream.span(); + let type_ = stream.type_(); if let Some(reader) = stream.reader() { let (id, writer) = new_stream(RAW_STREAM_HIGH_PRESSURE)?; - let header = PipelineDataHeader::ByteStream(ByteStreamInfo { id, span }); + let header = PipelineDataHeader::ByteStream(ByteStreamInfo { id, span, type_ }); Ok((header, PipelineDataWriter::ByteStream(writer, reader))) } else { Ok((PipelineDataHeader::Empty, PipelineDataWriter::None)) diff --git a/crates/nu-plugin-core/src/interface/tests.rs b/crates/nu-plugin-core/src/interface/tests.rs index fb3d737190..e318a2648e 100644 --- a/crates/nu-plugin-core/src/interface/tests.rs +++ b/crates/nu-plugin-core/src/interface/tests.rs @@ -10,8 +10,8 @@ use nu_plugin_protocol::{ StreamMessage, }; use nu_protocol::{ - ByteStream, ByteStreamSource, DataSource, ListStream, PipelineData, PipelineMetadata, - ShellError, Span, Value, + ByteStream, ByteStreamSource, ByteStreamType, DataSource, ListStream, PipelineData, + PipelineMetadata, ShellError, Span, Value, }; use std::{path::Path, sync::Arc}; @@ -208,6 +208,7 @@ fn read_pipeline_data_byte_stream() -> Result<(), ShellError> { let header = PipelineDataHeader::ByteStream(ByteStreamInfo { id: 12, span: test_span, + type_: ByteStreamType::Unknown, }); let pipe = manager.read_pipeline_data(header, None)?; @@ -401,7 +402,12 @@ fn write_pipeline_data_byte_stream() -> Result<(), ShellError> { // Set up pipeline data for a byte stream let data = PipelineData::ByteStream( - ByteStream::read(std::io::Cursor::new(expected), span, None), + ByteStream::read( + std::io::Cursor::new(expected), + span, + None, + ByteStreamType::Unknown, + ), None, ); diff --git a/crates/nu-plugin-engine/src/interface/tests.rs b/crates/nu-plugin-engine/src/interface/tests.rs index aca59a664e..e718886b3b 100644 --- a/crates/nu-plugin-engine/src/interface/tests.rs +++ b/crates/nu-plugin-engine/src/interface/tests.rs @@ -17,8 +17,8 @@ use nu_plugin_protocol::{ use nu_protocol::{ ast::{Math, Operator}, engine::Closure, - CustomValue, IntoInterruptiblePipelineData, IntoSpanned, PipelineData, PluginSignature, - ShellError, Span, Spanned, Value, + ByteStreamType, CustomValue, IntoInterruptiblePipelineData, IntoSpanned, PipelineData, + PluginSignature, ShellError, Span, Spanned, Value, }; use serde::{Deserialize, Serialize}; use std::{ @@ -157,6 +157,7 @@ fn manager_consume_all_propagates_message_error_to_readers() -> Result<(), Shell PipelineDataHeader::ByteStream(ByteStreamInfo { id: 0, span: Span::test_data(), + type_: ByteStreamType::Unknown, }), None, )?; @@ -384,6 +385,7 @@ fn manager_consume_call_response_registers_streams() -> Result<(), ShellError> { PluginCallResponse::PipelineData(PipelineDataHeader::ByteStream(ByteStreamInfo { id: 1, span: Span::test_data(), + type_: ByteStreamType::Unknown, })), ))?; diff --git a/crates/nu-plugin-protocol/src/lib.rs b/crates/nu-plugin-protocol/src/lib.rs index ea27f82654..db19ee02f6 100644 --- a/crates/nu-plugin-protocol/src/lib.rs +++ b/crates/nu-plugin-protocol/src/lib.rs @@ -22,8 +22,8 @@ mod tests; pub mod test_util; use nu_protocol::{ - ast::Operator, engine::Closure, Config, LabeledError, PipelineData, PluginSignature, - ShellError, Span, Spanned, Value, + ast::Operator, engine::Closure, ByteStreamType, Config, LabeledError, PipelineData, + PluginSignature, ShellError, Span, Spanned, Value, }; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -112,6 +112,8 @@ pub struct ListStreamInfo { pub struct ByteStreamInfo { pub id: StreamId, pub span: Span, + #[serde(rename = "type")] + pub type_: ByteStreamType, } /// Calls that a plugin can execute. The type parameter determines the input type. diff --git a/crates/nu-plugin/src/plugin/interface/tests.rs b/crates/nu-plugin/src/plugin/interface/tests.rs index ed04190712..6c3dfdf6c9 100644 --- a/crates/nu-plugin/src/plugin/interface/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/tests.rs @@ -9,8 +9,8 @@ use nu_plugin_protocol::{ PluginCustomValue, PluginInput, PluginOutput, Protocol, ProtocolInfo, StreamData, }; use nu_protocol::{ - engine::Closure, Config, CustomValue, IntoInterruptiblePipelineData, LabeledError, - PipelineData, PluginSignature, ShellError, Span, Spanned, Value, + engine::Closure, ByteStreamType, Config, CustomValue, IntoInterruptiblePipelineData, + LabeledError, PipelineData, PluginSignature, ShellError, Span, Spanned, Value, }; use std::{ collections::HashMap, @@ -160,6 +160,7 @@ fn manager_consume_all_propagates_message_error_to_readers() -> Result<(), Shell PipelineDataHeader::ByteStream(ByteStreamInfo { id: 0, span: Span::test_data(), + type_: ByteStreamType::Unknown, }), None, )?; diff --git a/crates/nu-pretty-hex/src/pretty_hex.rs b/crates/nu-pretty-hex/src/pretty_hex.rs index 81bd5451c4..2fab2a9b43 100644 --- a/crates/nu-pretty-hex/src/pretty_hex.rs +++ b/crates/nu-pretty-hex/src/pretty_hex.rs @@ -174,20 +174,14 @@ where .collect(); if cfg.title { - if use_color { - writeln!( - writer, - "Length: {0} (0x{0:x}) bytes | {1}printable {2}whitespace {3}ascii_other {4}non_ascii{5}", - source_part_vec.len(), - Style::default().fg(Color::Cyan).bold().prefix(), - Style::default().fg(Color::Green).bold().prefix(), - Style::default().fg(Color::Purple).bold().prefix(), - Style::default().fg(Color::Yellow).bold().prefix(), - Style::default().fg(Color::Yellow).suffix() - )?; - } else { - writeln!(writer, "Length: {0} (0x{0:x}) bytes", source_part_vec.len(),)?; - } + write_title( + writer, + HexConfig { + length: Some(source_part_vec.len()), + ..cfg + }, + use_color, + )?; } let lines = source_part_vec.chunks(if cfg.width > 0 { @@ -256,6 +250,34 @@ where Ok(()) } +/// Write the title for the given config. The length will be taken from `cfg.length`. +pub fn write_title(writer: &mut W, cfg: HexConfig, use_color: bool) -> Result<(), fmt::Error> +where + W: fmt::Write, +{ + let write = |writer: &mut W, length: fmt::Arguments<'_>| { + if use_color { + writeln!( + writer, + "Length: {length} | {0}printable {1}whitespace {2}ascii_other {3}non_ascii{4}", + Style::default().fg(Color::Cyan).bold().prefix(), + Style::default().fg(Color::Green).bold().prefix(), + Style::default().fg(Color::Purple).bold().prefix(), + Style::default().fg(Color::Yellow).bold().prefix(), + Style::default().fg(Color::Yellow).suffix() + ) + } else { + writeln!(writer, "Length: {length}") + } + }; + + if let Some(len) = cfg.length { + write(writer, format_args!("{len} (0x{len:x}) bytes")) + } else { + write(writer, format_args!("unknown (stream)")) + } +} + /// Reference wrapper for use in arguments formatting. pub struct Hex<'a, T: 'a>(&'a T, HexConfig); diff --git a/crates/nu-protocol/src/errors/shell_error.rs b/crates/nu-protocol/src/errors/shell_error.rs index 525f32e925..81139d1a52 100644 --- a/crates/nu-protocol/src/errors/shell_error.rs +++ b/crates/nu-protocol/src/errors/shell_error.rs @@ -1017,7 +1017,10 @@ pub enum ShellError { /// /// Check your input's encoding. Are there any funny characters/bytes? #[error("Non-UTF8 string")] - #[diagnostic(code(nu::parser::non_utf8))] + #[diagnostic( + code(nu::parser::non_utf8), + help("see `decode` for handling character sets other than UTF-8") + )] NonUtf8 { #[label("non-UTF8 string")] span: Span, @@ -1029,7 +1032,10 @@ pub enum ShellError { /// /// Check your input's encoding. Are there any funny characters/bytes? #[error("Non-UTF8 string")] - #[diagnostic(code(nu::parser::non_utf8_custom))] + #[diagnostic( + code(nu::parser::non_utf8_custom), + help("see `decode` for handling character sets other than UTF-8") + )] NonUtf8Custom { msg: String, #[label("{msg}")] diff --git a/crates/nu-protocol/src/pipeline/byte_stream.rs b/crates/nu-protocol/src/pipeline/byte_stream.rs index 64b566a625..e77c2cc855 100644 --- a/crates/nu-protocol/src/pipeline/byte_stream.rs +++ b/crates/nu-protocol/src/pipeline/byte_stream.rs @@ -1,6 +1,8 @@ +use serde::{Deserialize, Serialize}; + use crate::{ process::{ChildPipe, ChildProcess, ExitStatus}, - ErrSpan, IntoSpanned, OutDest, PipelineData, ShellError, Span, Value, + ErrSpan, IntoSpanned, OutDest, PipelineData, ShellError, Span, Type, Value, }; #[cfg(unix)] use std::os::fd::OwnedFd; @@ -41,6 +43,24 @@ impl ByteStreamSource { }), } } + + /// Source is a `Child` or `File`, rather than `Read`. Currently affects trimming + fn is_external(&self) -> bool { + matches!( + self, + ByteStreamSource::File(..) | ByteStreamSource::Child(..) + ) + } +} + +impl Debug for ByteStreamSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ByteStreamSource::Read(_) => f.debug_tuple("Read").field(&"..").finish(), + ByteStreamSource::File(file) => f.debug_tuple("File").field(file).finish(), + ByteStreamSource::Child(child) => f.debug_tuple("Child").field(child).finish(), + } + } } enum SourceReader { @@ -57,6 +77,55 @@ impl Read for SourceReader { } } +impl Debug for SourceReader { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SourceReader::Read(_) => f.debug_tuple("Read").field(&"..").finish(), + SourceReader::File(file) => f.debug_tuple("File").field(file).finish(), + } + } +} + +/// Optional type color for [`ByteStream`], which determines type compatibility. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +pub enum ByteStreamType { + /// Compatible with [`Type::Binary`], and should only be converted to binary, even when the + /// desired type is unknown. + Binary, + /// Compatible with [`Type::String`], and should only be converted to string, even when the + /// desired type is unknown. + /// + /// This does not guarantee valid UTF-8 data, but it is conventionally so. Converting to + /// `String` still requires validation of the data. + String, + /// Unknown whether the stream should contain binary or string data. This usually is the result + /// of an external stream, e.g. an external command or file. + #[default] + Unknown, +} + +impl ByteStreamType { + /// Returns the string that describes the byte stream type - i.e., the same as what `describe` + /// produces. This can be used in type mismatch error messages. + pub fn describe(self) -> &'static str { + match self { + ByteStreamType::Binary => "binary (stream)", + ByteStreamType::String => "string (stream)", + ByteStreamType::Unknown => "byte stream", + } + } +} + +impl From for Type { + fn from(value: ByteStreamType) -> Self { + match value { + ByteStreamType::Binary => Type::Binary, + ByteStreamType::String => Type::String, + ByteStreamType::Unknown => Type::Any, + } + } +} + /// A potentially infinite, interruptible stream of bytes. /// /// To create a [`ByteStream`], you can use any of the following methods: @@ -65,20 +134,31 @@ impl Read for SourceReader { /// - [`from_iter`](ByteStream::from_iter): takes an [`Iterator`] whose items implement `AsRef<[u8]>`. /// - [`from_result_iter`](ByteStream::from_result_iter): same as [`from_iter`](ByteStream::from_iter), /// but each item is a `Result`. +/// - [`from_fn`](ByteStream::from_fn): uses a generator function to fill a buffer whenever it is +/// empty. This has high performance because it doesn't need to allocate for each chunk of data, +/// and can just reuse the same buffer. +/// +/// Byte streams have a [type](.type_()) which is used to preserve type compatibility when they +/// are the result of an internal command. It is important that this be set to the correct value. +/// [`Unknown`](ByteStreamType::Unknown) is used only for external sources where the type can not +/// be inherently determined, and having it automatically act as a string or binary depending on +/// whether it parses as UTF-8 or not is desirable. /// /// The data of a [`ByteStream`] can be accessed using one of the following methods: /// - [`reader`](ByteStream::reader): returns a [`Read`]-able type to get the raw bytes in the stream. /// - [`lines`](ByteStream::lines): splits the bytes on lines and returns an [`Iterator`] /// where each item is a `Result`. -/// - [`chunks`](ByteStream::chunks): returns an [`Iterator`] of [`Value`]s where each value is either a string or binary. +/// - [`chunks`](ByteStream::chunks): returns an [`Iterator`] of [`Value`]s where each value is +/// either a string or binary. /// Try not to use this method if possible. Rather, please use [`reader`](ByteStream::reader) /// (or [`lines`](ByteStream::lines) if it matches the situation). /// /// Additionally, there are few methods to collect a [`Bytestream`] into memory: /// - [`into_bytes`](ByteStream::into_bytes): collects all bytes into a [`Vec`]. /// - [`into_string`](ByteStream::into_string): collects all bytes into a [`String`], erroring if utf-8 decoding failed. -/// - [`into_value`](ByteStream::into_value): collects all bytes into a string [`Value`]. -/// If utf-8 decoding failed, then a binary [`Value`] is returned instead. +/// - [`into_value`](ByteStream::into_value): collects all bytes into a value typed appropriately +/// for the [type](.type_()) of this stream. If the type is [`Unknown`](ByteStreamType::Unknown), +/// it will produce a string value if the data is valid UTF-8, or a binary value otherwise. /// /// There are also a few other methods to consume all the data of a [`Bytestream`]: /// - [`drain`](ByteStream::drain): consumes all bytes and outputs nothing. @@ -88,54 +168,135 @@ impl Read for SourceReader { /// /// Internally, [`ByteStream`]s currently come in three flavors according to [`ByteStreamSource`]. /// See its documentation for more information. +#[derive(Debug)] pub struct ByteStream { stream: ByteStreamSource, span: Span, ctrlc: Option>, + type_: ByteStreamType, known_size: Option, } impl ByteStream { /// Create a new [`ByteStream`] from a [`ByteStreamSource`]. - pub fn new(stream: ByteStreamSource, span: Span, interrupt: Option>) -> Self { + pub fn new( + stream: ByteStreamSource, + span: Span, + interrupt: Option>, + type_: ByteStreamType, + ) -> Self { Self { stream, span, ctrlc: interrupt, + type_, known_size: None, } } - /// Create a new [`ByteStream`] from a [`ByteStreamSource::Read`]. + /// Create a [`ByteStream`] from an arbitrary reader. The type must be provided. pub fn read( reader: impl Read + Send + 'static, span: Span, interrupt: Option>, + type_: ByteStreamType, ) -> Self { - Self::new(ByteStreamSource::Read(Box::new(reader)), span, interrupt) + Self::new( + ByteStreamSource::Read(Box::new(reader)), + span, + interrupt, + type_, + ) } - /// Create a new [`ByteStream`] from a [`ByteStreamSource::File`]. + /// Create a [`ByteStream`] from a string. The type of the stream is always `String`. + pub fn read_string(string: String, span: Span, interrupt: Option>) -> Self { + let len = string.len(); + ByteStream::read( + Cursor::new(string.into_bytes()), + span, + interrupt, + ByteStreamType::String, + ) + .with_known_size(Some(len as u64)) + } + + /// Create a [`ByteStream`] from a byte vector. The type of the stream is always `Binary`. + pub fn read_binary(bytes: Vec, span: Span, interrupt: Option>) -> Self { + let len = bytes.len(); + ByteStream::read(Cursor::new(bytes), span, interrupt, ByteStreamType::Binary) + .with_known_size(Some(len as u64)) + } + + /// Create a [`ByteStream`] from a file. + /// + /// The type is implicitly `Unknown`, as it's not typically known whether files will + /// return text or binary. pub fn file(file: File, span: Span, interrupt: Option>) -> Self { - Self::new(ByteStreamSource::File(file), span, interrupt) + Self::new( + ByteStreamSource::File(file), + span, + interrupt, + ByteStreamType::Unknown, + ) } - /// Create a new [`ByteStream`] from a [`ByteStreamSource::Child`]. + /// Create a [`ByteStream`] from a child process's stdout and stderr. + /// + /// The type is implicitly `Unknown`, as it's not typically known whether child processes will + /// return text or binary. pub fn child(child: ChildProcess, span: Span) -> Self { - Self::new(ByteStreamSource::Child(Box::new(child)), span, None) + Self::new( + ByteStreamSource::Child(Box::new(child)), + span, + None, + ByteStreamType::Unknown, + ) } - /// Create a new [`ByteStream`] that reads from stdin. + /// Create a [`ByteStream`] that reads from stdin. + /// + /// The type is implicitly `Unknown`, as it's not typically known whether stdin is text or + /// binary. pub fn stdin(span: Span) -> Result { let stdin = os_pipe::dup_stdin().err_span(span)?; let source = ByteStreamSource::File(convert_file(stdin)); - Ok(Self::new(source, span, None)) + Ok(Self::new(source, span, None, ByteStreamType::Unknown)) + } + + /// Create a [`ByteStream`] from a generator function that writes data to the given buffer + /// when called, and returns `Ok(false)` on end of stream. + pub fn from_fn( + span: Span, + interrupt: Option>, + type_: ByteStreamType, + generator: impl FnMut(&mut Vec) -> Result + Send + 'static, + ) -> Self { + Self::read( + ReadGenerator { + buffer: Cursor::new(Vec::new()), + generator, + }, + span, + interrupt, + type_, + ) + } + + pub fn with_type(mut self, type_: ByteStreamType) -> Self { + self.type_ = type_; + self } /// Create a new [`ByteStream`] from an [`Iterator`] of bytes slices. /// /// The returned [`ByteStream`] will have a [`ByteStreamSource`] of `Read`. - pub fn from_iter(iter: I, span: Span, interrupt: Option>) -> Self + pub fn from_iter( + iter: I, + span: Span, + interrupt: Option>, + type_: ByteStreamType, + ) -> Self where I: IntoIterator, I::IntoIter: Send + 'static, @@ -143,13 +304,18 @@ impl ByteStream { { let iter = iter.into_iter(); let cursor = Some(Cursor::new(I::Item::default())); - Self::read(ReadIterator { iter, cursor }, span, interrupt) + Self::read(ReadIterator { iter, cursor }, span, interrupt, type_) } /// Create a new [`ByteStream`] from an [`Iterator`] of [`Result`] bytes slices. /// /// The returned [`ByteStream`] will have a [`ByteStreamSource`] of `Read`. - pub fn from_result_iter(iter: I, span: Span, interrupt: Option>) -> Self + pub fn from_result_iter( + iter: I, + span: Span, + interrupt: Option>, + type_: ByteStreamType, + ) -> Self where I: IntoIterator>, I::IntoIter: Send + 'static, @@ -157,7 +323,7 @@ impl ByteStream { { let iter = iter.into_iter(); let cursor = Some(Cursor::new(T::default())); - Self::read(ReadResultIterator { iter, cursor }, span, interrupt) + Self::read(ReadResultIterator { iter, cursor }, span, interrupt, type_) } /// Set the known size, in number of bytes, of the [`ByteStream`]. @@ -181,6 +347,11 @@ impl ByteStream { self.span } + /// Returns the [`ByteStreamType`] associated with the [`ByteStream`]. + pub fn type_(&self) -> ByteStreamType { + self.type_ + } + /// Returns the known size, in number of bytes, of the [`ByteStream`]. pub fn known_size(&self) -> Option { self.known_size @@ -220,8 +391,10 @@ impl ByteStream { /// Convert the [`ByteStream`] into a [`Chunks`] iterator where each element is a `Result`. /// /// Each call to [`next`](Iterator::next) reads the currently available data from the byte stream source, - /// up to a maximum size. If the chunk of bytes, or an expected portion of it, succeeds utf-8 decoding, - /// then it is returned as a [`Value::String`]. Otherwise, it is turned into a [`Value::Binary`]. + /// up to a maximum size. The values are typed according to the [type](.type_()) of the + /// stream, and if that type is [`Unknown`](ByteStreamType::Unknown), string values will be + /// produced as long as the stream continues to parse as valid UTF-8, but binary values will + /// be produced instead of the stream fails to parse as UTF-8 instead at any point. /// Any and all newlines are kept intact in each chunk. /// /// Where possible, prefer [`reader`](ByteStream::reader) or [`lines`](ByteStream::lines) over this method. @@ -232,12 +405,7 @@ impl ByteStream { /// then the stream is considered empty and `None` will be returned. pub fn chunks(self) -> Option { let reader = self.stream.reader()?; - Some(Chunks { - reader: BufReader::new(reader), - span: self.span, - ctrlc: self.ctrlc, - leftover: Vec::new(), - }) + Some(Chunks::new(reader, self.span, self.ctrlc, self.type_)) } /// Convert the [`ByteStream`] into its inner [`ByteStreamSource`]. @@ -305,33 +473,64 @@ impl ByteStream { } } - /// Collect all the bytes of the [`ByteStream`] into a [`String`]. + /// Collect the stream into a `String` in-memory. This can only succeed if the data contained is + /// valid UTF-8. /// - /// The trailing new line (`\n` or `\r\n`), if any, is removed from the [`String`] prior to being returned. + /// The trailing new line (`\n` or `\r\n`), if any, is removed from the [`String`] prior to + /// being returned, if this is a stream coming from an external process or file. /// - /// If utf-8 decoding fails, an error is returned. + /// If the [type](.type_()) is specified as `Binary`, this operation always fails, even if the + /// data would have been valid UTF-8. pub fn into_string(self) -> Result { let span = self.span; - let bytes = self.into_bytes()?; - let mut string = String::from_utf8(bytes).map_err(|_| ShellError::NonUtf8 { span })?; - trim_end_newline(&mut string); - Ok(string) + if self.type_ != ByteStreamType::Binary { + let trim = self.stream.is_external(); + let bytes = self.into_bytes()?; + let mut string = String::from_utf8(bytes).map_err(|err| ShellError::NonUtf8Custom { + span, + msg: err.to_string(), + })?; + if trim { + trim_end_newline(&mut string); + } + Ok(string) + } else { + Err(ShellError::TypeMismatch { + err_message: "expected string, but got binary".into(), + span, + }) + } } /// Collect all the bytes of the [`ByteStream`] into a [`Value`]. /// - /// If the collected bytes are successfully decoded as utf-8, then a [`Value::String`] is returned. - /// The trailing new line (`\n` or `\r\n`), if any, is removed from the [`String`] prior to being returned. - /// Otherwise, a [`Value::Binary`] is returned with any trailing new lines preserved. + /// If this is a `String` stream, the stream is decoded to UTF-8. If the stream came from an + /// external process or file, the trailing new line (`\n` or `\r\n`), if any, is removed from + /// the [`String`] prior to being returned. + /// + /// If this is a `Binary` stream, a [`Value::Binary`] is returned with any trailing new lines + /// preserved. + /// + /// If this is an `Unknown` stream, the behavior depends on whether the stream parses as valid + /// UTF-8 or not. If it does, this is uses the `String` behavior; if not, it uses the `Binary` + /// behavior. pub fn into_value(self) -> Result { let span = self.span; - let bytes = self.into_bytes()?; - let value = match String::from_utf8(bytes) { - Ok(mut str) => { - trim_end_newline(&mut str); - Value::string(str, span) - } - Err(err) => Value::binary(err.into_bytes(), span), + let trim = self.stream.is_external(); + let value = match self.type_ { + // If the type is specified, then the stream should always become that type: + ByteStreamType::Binary => Value::binary(self.into_bytes()?, span), + ByteStreamType::String => Value::string(self.into_string()?, span), + // If the type is not specified, then it just depends on whether it parses or not: + ByteStreamType::Unknown => match String::from_utf8(self.into_bytes()?) { + Ok(mut str) => { + if trim { + trim_end_newline(&mut str); + } + Value::string(str, span) + } + Err(err) => Value::binary(err.into_bytes(), span), + }, }; Ok(value) } @@ -477,12 +676,6 @@ impl ByteStream { } } -impl Debug for ByteStream { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ByteStream").finish() - } -} - impl From for PipelineData { fn from(stream: ByteStream) -> Self { Self::ByteStream(stream, None) @@ -613,54 +806,157 @@ impl Iterator for Lines { } } +/// Turn a readable stream into [`Value`]s. +/// +/// The `Value` type depends on the type of the stream ([`ByteStreamType`]). If `Unknown`, the +/// stream will return strings as long as UTF-8 parsing succeeds, but will start returning binary +/// if it fails. pub struct Chunks { reader: BufReader, + pos: u64, + error: bool, span: Span, ctrlc: Option>, - leftover: Vec, + type_: ByteStreamType, } impl Chunks { + fn new( + reader: SourceReader, + span: Span, + ctrlc: Option>, + type_: ByteStreamType, + ) -> Self { + Self { + reader: BufReader::new(reader), + pos: 0, + error: false, + span, + ctrlc, + type_, + } + } + pub fn span(&self) -> Span { self.span } + + fn next_string(&mut self) -> Result, (Vec, ShellError)> { + // Get some data from the reader + let buf = self + .reader + .fill_buf() + .err_span(self.span) + .map_err(|err| (vec![], ShellError::from(err)))?; + + // If empty, this is EOF + if buf.is_empty() { + return Ok(None); + } + + let mut buf = buf.to_vec(); + let mut consumed = 0; + + // If the buf length is under 4 bytes, it could be invalid, so try to get more + if buf.len() < 4 { + consumed += buf.len(); + self.reader.consume(buf.len()); + match self.reader.fill_buf().err_span(self.span) { + Ok(more_bytes) => buf.extend_from_slice(more_bytes), + Err(err) => return Err((buf, err.into())), + } + } + + // Try to parse utf-8 and decide what to do + match String::from_utf8(buf) { + Ok(string) => { + self.reader.consume(string.len() - consumed); + self.pos += string.len() as u64; + Ok(Some(string)) + } + Err(err) if err.utf8_error().error_len().is_none() => { + // There is some valid data at the beginning, and this is just incomplete, so just + // consume that and return it + let valid_up_to = err.utf8_error().valid_up_to(); + if valid_up_to > consumed { + self.reader.consume(valid_up_to - consumed); + } + let mut buf = err.into_bytes(); + buf.truncate(valid_up_to); + buf.shrink_to_fit(); + let string = String::from_utf8(buf) + .expect("failed to parse utf-8 even after correcting error"); + self.pos += string.len() as u64; + Ok(Some(string)) + } + Err(err) => { + // There is an error at the beginning and we have no hope of parsing further. + let shell_error = ShellError::NonUtf8Custom { + msg: format!("invalid utf-8 sequence starting at index {}", self.pos), + span: self.span, + }; + let buf = err.into_bytes(); + // We are consuming the entire buf though, because we're returning it in case it + // will be cast to binary + if buf.len() > consumed { + self.reader.consume(buf.len() - consumed); + } + self.pos += buf.len() as u64; + Err((buf, shell_error)) + } + } + } } impl Iterator for Chunks { type Item = Result; fn next(&mut self) -> Option { - if nu_utils::ctrl_c::was_pressed(&self.ctrlc) { + if self.error || nu_utils::ctrl_c::was_pressed(&self.ctrlc) { None } else { - loop { - match self.reader.fill_buf() { - Ok(buf) => { - self.leftover.extend_from_slice(buf); + match self.type_ { + // Binary should always be binary + ByteStreamType::Binary => { + let buf = match self.reader.fill_buf().err_span(self.span) { + Ok(buf) => buf, + Err(err) => { + self.error = true; + return Some(Err(err.into())); + } + }; + if !buf.is_empty() { let len = buf.len(); + let value = Value::binary(buf, self.span); self.reader.consume(len); - break; - } - Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, - Err(err) => return Some(Err(err.into_spanned(self.span).into())), - }; - } - - if self.leftover.is_empty() { - return None; - } - - match String::from_utf8(std::mem::take(&mut self.leftover)) { - Ok(str) => Some(Ok(Value::string(str, self.span))), - Err(err) => { - if err.utf8_error().error_len().is_some() { - Some(Ok(Value::binary(err.into_bytes(), self.span))) + self.pos += len as u64; + Some(Ok(value)) } else { - let i = err.utf8_error().valid_up_to(); - let mut bytes = err.into_bytes(); - self.leftover = bytes.split_off(i); - let str = String::from_utf8(bytes).expect("valid utf8"); - Some(Ok(Value::string(str, self.span))) + None + } + } + // String produces an error if UTF-8 can't be parsed + ByteStreamType::String => match self.next_string().transpose()? { + Ok(string) => Some(Ok(Value::string(string, self.span))), + Err((_, err)) => { + self.error = true; + Some(Err(err)) + } + }, + // For Unknown, we try to create strings, but we switch to binary mode if we + // fail + ByteStreamType::Unknown => { + match self.next_string().transpose()? { + Ok(string) => Some(Ok(Value::string(string, self.span))), + Err((buf, _)) if !buf.is_empty() => { + // Switch to binary mode + self.type_ = ByteStreamType::Binary; + Some(Ok(Value::binary(buf, self.span))) + } + Err((_, err)) => { + self.error = true; + Some(Err(err)) + } } } } @@ -776,11 +1072,58 @@ where Ok(len as u64) } +struct ReadGenerator +where + F: FnMut(&mut Vec) -> Result + Send + 'static, +{ + buffer: Cursor>, + generator: F, +} + +impl BufRead for ReadGenerator +where + F: FnMut(&mut Vec) -> Result + Send + 'static, +{ + fn fill_buf(&mut self) -> std::io::Result<&[u8]> { + // We have to loop, because it's important that we don't leave the buffer empty unless we're + // truly at the end of the stream. + while self.buffer.fill_buf()?.is_empty() { + // Reset the cursor to the beginning and truncate + self.buffer.set_position(0); + self.buffer.get_mut().clear(); + // Ask the generator to generate data + if !(self.generator)(self.buffer.get_mut())? { + // End of stream + break; + } + } + self.buffer.fill_buf() + } + + fn consume(&mut self, amt: usize) { + self.buffer.consume(amt); + } +} + +impl Read for ReadGenerator +where + F: FnMut(&mut Vec) -> Result + Send + 'static, +{ + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + // Straightforward implementation on top of BufRead + let slice = self.fill_buf()?; + let len = buf.len().min(slice.len()); + buf[..len].copy_from_slice(&slice[..len]); + self.consume(len); + Ok(len) + } +} + #[cfg(test)] mod tests { use super::*; - fn test_chunks(data: Vec) -> Chunks + fn test_chunks(data: Vec, type_: ByteStreamType) -> Chunks where T: AsRef<[u8]> + Default + Send + 'static, { @@ -788,46 +1131,89 @@ mod tests { iter: data.into_iter(), cursor: Some(Cursor::new(T::default())), }; - Chunks { - reader: BufReader::new(SourceReader::Read(Box::new(reader))), - span: Span::test_data(), - ctrlc: None, - leftover: Vec::new(), - } + Chunks::new( + SourceReader::Read(Box::new(reader)), + Span::test_data(), + None, + type_, + ) } #[test] - fn chunks_read_string() { - let data = vec!["Nushell", "が好きです"]; - let chunks = test_chunks(data.clone()); - let actual = chunks.collect::, _>>().unwrap(); - let expected = data.into_iter().map(Value::test_string).collect::>(); - assert_eq!(expected, actual); - } + fn chunks_read_binary_passthrough() { + let bins = vec![&[0, 1][..], &[2, 3][..]]; + let iter = test_chunks(bins.clone(), ByteStreamType::Binary); - #[test] - fn chunks_read_string_split_utf8() { - let expected = "Nushell最高!"; - let chunks = test_chunks(vec![&b"Nushell\xe6"[..], b"\x9c\x80\xe9", b"\xab\x98!"]); - - let actual = chunks + let bins_values: Vec = bins .into_iter() - .map(|value| value.and_then(Value::into_string)) - .collect::>() - .unwrap(); - - assert_eq!(expected, actual); + .map(|bin| Value::binary(bin, Span::test_data())) + .collect(); + assert_eq!( + bins_values, + iter.collect::, _>>().expect("error") + ); } #[test] - fn chunks_returns_string_or_binary() { - let chunks = test_chunks(vec![b"Nushell".as_slice(), b"\x9c\x80\xe9abcd", b"efgh"]); - let actual = chunks.collect::, _>>().unwrap(); - let expected = vec![ - Value::test_string("Nushell"), - Value::test_binary(b"\x9c\x80\xe9abcd"), - Value::test_string("efgh"), - ]; - assert_eq!(actual, expected) + fn chunks_read_string_clean() { + let strs = vec!["Nushell", "が好きです"]; + let iter = test_chunks(strs.clone(), ByteStreamType::String); + + let strs_values: Vec = strs + .into_iter() + .map(|string| Value::string(string, Span::test_data())) + .collect(); + assert_eq!( + strs_values, + iter.collect::, _>>().expect("error") + ); + } + + #[test] + fn chunks_read_string_split_boundary() { + let real = "Nushell最高!"; + let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab\x98!"[..]]; + let iter = test_chunks(chunks.clone(), ByteStreamType::String); + + let mut string = String::new(); + for value in iter { + let chunk_string = value.expect("error").into_string().expect("not a string"); + string.push_str(&chunk_string); + } + assert_eq!(real, string); + } + + #[test] + fn chunks_read_string_utf8_error() { + let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab"[..]]; + let iter = test_chunks(chunks, ByteStreamType::String); + + let mut string = String::new(); + for value in iter { + match value { + Ok(value) => string.push_str(&value.into_string().expect("not a string")), + Err(err) => { + println!("string so far: {:?}", string); + println!("got error: {err:?}"); + assert!(!string.is_empty()); + assert!(matches!(err, ShellError::NonUtf8Custom { .. })); + return; + } + } + } + panic!("no error"); + } + + #[test] + fn chunks_read_unknown_fallback() { + let chunks = vec![&b"Nushell"[..], &b"\x9c\x80\xe9abcd"[..], &b"efgh"[..]]; + let mut iter = test_chunks(chunks, ByteStreamType::Unknown); + + let mut get = || iter.next().expect("end of iter").expect("error"); + + assert_eq!(Value::test_string("Nushell"), get()); + assert_eq!(Value::test_binary(b"\x9c\x80\xe9abcd"), get()); + // Once it's in binary mode it won't go back + assert_eq!(Value::test_binary(b"efgh"), get()); } } diff --git a/crates/nu-protocol/src/pipeline/pipeline_data.rs b/crates/nu-protocol/src/pipeline/pipeline_data.rs index 7faa4ed221..0a13ffa4b3 100644 --- a/crates/nu-protocol/src/pipeline/pipeline_data.rs +++ b/crates/nu-protocol/src/pipeline/pipeline_data.rs @@ -2,8 +2,8 @@ use crate::{ ast::{Call, PathMember}, engine::{EngineState, Stack}, process::{ChildPipe, ChildProcess, ExitStatus}, - ByteStream, Config, ErrSpan, ListStream, OutDest, PipelineMetadata, Range, ShellError, Span, - Value, + ByteStream, ByteStreamType, Config, ErrSpan, ListStream, OutDest, PipelineMetadata, Range, + ShellError, Span, Value, }; use nu_utils::{stderr_write_all_and_flush, stdout_write_all_and_flush}; use std::{ @@ -170,6 +170,8 @@ impl PipelineData { /// Try convert from self into iterator /// /// It returns Err if the `self` cannot be converted to an iterator. + /// + /// The `span` should be the span of the command or operation that would raise an error. pub fn into_iter_strict(self, span: Span) -> Result { Ok(PipelineIterator(match self { PipelineData::Value(value, ..) => { @@ -274,7 +276,7 @@ impl PipelineData { span: head, }), PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess { - type_name: "byte stream".to_string(), + type_name: stream.type_().describe().to_owned(), span: stream.span(), }), } @@ -313,16 +315,7 @@ impl PipelineData { Ok(PipelineData::ListStream(stream.map(f), metadata)) } PipelineData::ByteStream(stream, metadata) => { - // TODO: is this behavior desired / correct ? - let span = stream.span(); - let value = match String::from_utf8(stream.into_bytes()?) { - Ok(mut str) => { - str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len()); - f(Value::string(str, span)) - } - Err(err) => f(Value::binary(err.into_bytes(), span)), - }; - Ok(value.into_pipeline_data_with_metadata(metadata)) + Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata)) } } } @@ -543,22 +536,26 @@ impl PipelineData { no_newline: bool, to_stderr: bool, ) -> Result, ShellError> { - if let PipelineData::ByteStream(stream, ..) = self { - stream.print(to_stderr) - } else { - // If the table function is in the declarations, then we can use it - // to create the table value that will be printed in the terminal - if let Some(decl_id) = engine_state.table_decl_id { - let command = engine_state.get_decl(decl_id); - if command.block_id().is_some() { - self.write_all_and_flush(engine_state, no_newline, to_stderr) + match self { + // Print byte streams directly as long as they aren't binary. + PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => { + stream.print(to_stderr) + } + _ => { + // If the table function is in the declarations, then we can use it + // to create the table value that will be printed in the terminal + if let Some(decl_id) = engine_state.table_decl_id { + let command = engine_state.get_decl(decl_id); + if command.block_id().is_some() { + self.write_all_and_flush(engine_state, no_newline, to_stderr) + } else { + let call = Call::new(Span::new(0, 0)); + let table = command.run(engine_state, stack, &call, self)?; + table.write_all_and_flush(engine_state, no_newline, to_stderr) + } } else { - let call = Call::new(Span::new(0, 0)); - let table = command.run(engine_state, stack, &call, self)?; - table.write_all_and_flush(engine_state, no_newline, to_stderr) + self.write_all_and_flush(engine_state, no_newline, to_stderr) } - } else { - self.write_all_and_flush(engine_state, no_newline, to_stderr) } } } diff --git a/crates/nu_plugin_example/src/commands/collect_bytes.rs b/crates/nu_plugin_example/src/commands/collect_bytes.rs index 51ca1d4222..398a1de4b1 100644 --- a/crates/nu_plugin_example/src/commands/collect_bytes.rs +++ b/crates/nu_plugin_example/src/commands/collect_bytes.rs @@ -1,6 +1,7 @@ use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand}; use nu_protocol::{ - ByteStream, Category, Example, LabeledError, PipelineData, Signature, Type, Value, + ByteStream, ByteStreamType, Category, Example, LabeledError, PipelineData, Signature, Type, + Value, }; use crate::ExamplePlugin; @@ -52,6 +53,7 @@ impl PluginCommand for CollectBytes { input.into_iter().map(Value::coerce_into_binary), call.head, None, + ByteStreamType::Unknown, ), None, ))