Add binary input support to chunks (#14649)

# Description

Adds support for `Value::Binary` and `ByteStream` inputs to `chunks`.
In case of `ByteStream`, stream is not collected, and chunked as it
comes.

This works:
```nushell
open --raw /dev/urandom | chunks 4 | take 4
```

# User-Facing Changes

`chunks` can now be used on binary values and streams.

# Tests + Formatting

- 🟢 toolkit fmt
- 🟢 toolkit clippy
- 🟢 toolkit test
- 🟢 toolkit test stdlib

# After Submitting
N/A
This commit is contained in:
Bahex 2024-12-25 15:14:48 +03:00 committed by GitHub
parent f1ce0c98fd
commit f2dcae570c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,6 +1,9 @@
use nu_engine::command_prelude::*; use nu_engine::command_prelude::*;
use nu_protocol::ListStream; use nu_protocol::ListStream;
use std::num::NonZeroUsize; use std::{
io::{BufRead, Cursor, ErrorKind},
num::NonZeroUsize,
};
#[derive(Clone)] #[derive(Clone)]
pub struct Chunks; pub struct Chunks;
@ -15,6 +18,7 @@ impl Command for Chunks {
.input_output_types(vec![ .input_output_types(vec![
(Type::table(), Type::list(Type::table())), (Type::table(), Type::list(Type::table())),
(Type::list(Type::Any), Type::list(Type::list(Type::Any))), (Type::list(Type::Any), Type::list(Type::list(Type::Any))),
(Type::Binary, Type::list(Type::Binary)),
]) ])
.required("chunk_size", SyntaxShape::Int, "The size of each chunk.") .required("chunk_size", SyntaxShape::Int, "The size of each chunk.")
.category(Category::Filters) .category(Category::Filters)
@ -72,6 +76,15 @@ impl Command for Chunks {
]), ]),
])), ])),
}, },
Example {
example: "0x[11 22 33 44 55 66 77 88] | chunks 3",
description: "Chunk the bytes of a binary into triplets",
result: Some(Value::test_list(vec![
Value::test_binary(vec![0x11, 0x22, 0x33]),
Value::test_binary(vec![0x44, 0x55, 0x66]),
Value::test_binary(vec![0x77, 0x88]),
])),
},
] ]
} }
@ -116,6 +129,43 @@ pub fn chunks(
let stream = stream.modify(|iter| ChunksIter::new(iter, chunk_size, span)); let stream = stream.modify(|iter| ChunksIter::new(iter, chunk_size, span));
Ok(PipelineData::ListStream(stream, metadata)) Ok(PipelineData::ListStream(stream, metadata))
} }
PipelineData::Value(Value::Binary { val, .. }, metadata) => {
let chunk_read = ChunkRead {
reader: Cursor::new(val),
size: chunk_size,
};
let value_stream = chunk_read.map(move |chunk| match chunk {
Ok(chunk) => Value::binary(chunk, span),
Err(e) => Value::error(e.into(), span),
});
let pipeline_data_with_metadata = value_stream.into_pipeline_data_with_metadata(
span,
engine_state.signals().clone(),
metadata,
);
Ok(pipeline_data_with_metadata)
}
PipelineData::ByteStream(stream, metadata) => {
let pipeline_data = match stream.reader() {
None => PipelineData::Empty,
Some(reader) => {
let chunk_read = ChunkRead {
reader,
size: chunk_size,
};
let value_stream = chunk_read.map(move |chunk| match chunk {
Ok(chunk) => Value::binary(chunk, span),
Err(e) => Value::error(e.into(), span),
});
value_stream.into_pipeline_data_with_metadata(
span,
engine_state.signals().clone(),
metadata,
)
}
};
Ok(pipeline_data)
}
input => Err(input.unsupported_input_error("list", span)), input => Err(input.unsupported_input_error("list", span)),
} }
} }
@ -148,10 +198,71 @@ impl<I: Iterator<Item = Value>> Iterator for ChunksIter<I> {
} }
} }
struct ChunkRead<R: BufRead> {
reader: R,
size: NonZeroUsize,
}
impl<R: BufRead> Iterator for ChunkRead<R> {
type Item = Result<Vec<u8>, std::io::Error>;
fn next(&mut self) -> Option<Self::Item> {
let mut buf = Vec::with_capacity(self.size.get());
while buf.len() < self.size.get() {
let available = match self.reader.fill_buf() {
Ok([]) if buf.is_empty() => return None,
Ok([]) => return Some(Ok(buf)),
Ok(n) => n,
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Some(Err(e)),
};
let needed = self.size.get() - buf.len();
let have = available.len().min(needed);
buf.extend_from_slice(&available[..have]);
self.reader.consume(have);
}
Some(Ok(buf))
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::io::Read;
use super::*; use super::*;
#[test]
fn chunk_read() {
let s = "hello world";
let data = Cursor::new(s);
let chunk_read = ChunkRead {
reader: data,
size: NonZeroUsize::new(4).unwrap(),
};
let chunks = chunk_read.map(|e| e.unwrap()).collect::<Vec<_>>();
assert_eq!(
chunks,
[s[..4].as_bytes(), s[4..8].as_bytes(), s[8..].as_bytes()]
);
}
#[test]
fn chunk_read_stream() {
let s = "hello world";
let data = Cursor::new(&s[..3])
.chain(Cursor::new(&s[3..9]))
.chain(Cursor::new(&s[9..]));
let chunk_read = ChunkRead {
reader: data,
size: NonZeroUsize::new(4).unwrap(),
};
let chunks = chunk_read.map(|e| e.unwrap()).collect::<Vec<_>>();
assert_eq!(
chunks,
[s[..4].as_bytes(), s[4..8].as_bytes(), s[8..].as_bytes()]
);
}
#[test] #[test]
fn test_examples() { fn test_examples() {
use crate::test_examples; use crate::test_examples;