diff --git a/crates/nu-command/src/filters/chunks.rs b/crates/nu-command/src/filters/chunks.rs index 4db436ca25..93757c36e8 100644 --- a/crates/nu-command/src/filters/chunks.rs +++ b/crates/nu-command/src/filters/chunks.rs @@ -1,6 +1,9 @@ use nu_engine::command_prelude::*; use nu_protocol::ListStream; -use std::num::NonZeroUsize; +use std::{ + io::{BufRead, Cursor, ErrorKind}, + num::NonZeroUsize, +}; #[derive(Clone)] pub struct Chunks; @@ -15,6 +18,7 @@ impl Command for Chunks { .input_output_types(vec![ (Type::table(), Type::list(Type::table())), (Type::list(Type::Any), Type::list(Type::list(Type::Any))), + (Type::Binary, Type::list(Type::Binary)), ]) .required("chunk_size", SyntaxShape::Int, "The size of each chunk.") .category(Category::Filters) @@ -72,6 +76,15 @@ impl Command for Chunks { ]), ])), }, + Example { + example: "0x[11 22 33 44 55 66 77 88] | chunks 3", + description: "Chunk the bytes of a binary into triplets", + result: Some(Value::test_list(vec![ + Value::test_binary(vec![0x11, 0x22, 0x33]), + Value::test_binary(vec![0x44, 0x55, 0x66]), + Value::test_binary(vec![0x77, 0x88]), + ])), + }, ] } @@ -116,6 +129,43 @@ pub fn chunks( let stream = stream.modify(|iter| ChunksIter::new(iter, chunk_size, span)); Ok(PipelineData::ListStream(stream, metadata)) } + PipelineData::Value(Value::Binary { val, .. }, metadata) => { + let chunk_read = ChunkRead { + reader: Cursor::new(val), + size: chunk_size, + }; + let value_stream = chunk_read.map(move |chunk| match chunk { + Ok(chunk) => Value::binary(chunk, span), + Err(e) => Value::error(e.into(), span), + }); + let pipeline_data_with_metadata = value_stream.into_pipeline_data_with_metadata( + span, + engine_state.signals().clone(), + metadata, + ); + Ok(pipeline_data_with_metadata) + } + PipelineData::ByteStream(stream, metadata) => { + let pipeline_data = match stream.reader() { + None => PipelineData::Empty, + Some(reader) => { + let chunk_read = ChunkRead { + reader, + size: chunk_size, + }; + let value_stream = chunk_read.map(move |chunk| match chunk { + Ok(chunk) => Value::binary(chunk, span), + Err(e) => Value::error(e.into(), span), + }); + value_stream.into_pipeline_data_with_metadata( + span, + engine_state.signals().clone(), + metadata, + ) + } + }; + Ok(pipeline_data) + } input => Err(input.unsupported_input_error("list", span)), } } @@ -148,10 +198,71 @@ impl> Iterator for ChunksIter { } } +struct ChunkRead { + reader: R, + size: NonZeroUsize, +} + +impl Iterator for ChunkRead { + type Item = Result, std::io::Error>; + + fn next(&mut self) -> Option { + let mut buf = Vec::with_capacity(self.size.get()); + while buf.len() < self.size.get() { + let available = match self.reader.fill_buf() { + Ok([]) if buf.is_empty() => return None, + Ok([]) => return Some(Ok(buf)), + Ok(n) => n, + Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, + Err(e) => return Some(Err(e)), + }; + let needed = self.size.get() - buf.len(); + let have = available.len().min(needed); + buf.extend_from_slice(&available[..have]); + self.reader.consume(have); + } + Some(Ok(buf)) + } +} + #[cfg(test)] mod test { + use std::io::Read; + use super::*; + #[test] + fn chunk_read() { + let s = "hello world"; + let data = Cursor::new(s); + let chunk_read = ChunkRead { + reader: data, + size: NonZeroUsize::new(4).unwrap(), + }; + let chunks = chunk_read.map(|e| e.unwrap()).collect::>(); + assert_eq!( + chunks, + [s[..4].as_bytes(), s[4..8].as_bytes(), s[8..].as_bytes()] + ); + } + + #[test] + fn chunk_read_stream() { + let s = "hello world"; + let data = Cursor::new(&s[..3]) + .chain(Cursor::new(&s[3..9])) + .chain(Cursor::new(&s[9..])); + let chunk_read = ChunkRead { + reader: data, + size: NonZeroUsize::new(4).unwrap(), + }; + let chunks = chunk_read.map(|e| e.unwrap()).collect::>(); + assert_eq!( + chunks, + [s[..4].as_bytes(), s[4..8].as_bytes(), s[8..].as_bytes()] + ); + } + #[test] fn test_examples() { use crate::test_examples;