ogg_pager: Redo pagination for proper packet handling

This commit is contained in:
Serial 2022-12-03 13:39:37 -05:00 committed by Alex
parent bf0ac4b0d1
commit 75adb69ed1
2 changed files with 236 additions and 75 deletions

View file

@ -3,12 +3,14 @@
mod crc; mod crc;
mod error; mod error;
mod header; mod header;
mod paginate;
use std::io::{Read, Seek, SeekFrom}; use std::io::{Read, Seek, SeekFrom, Write};
pub use crc::crc32; pub use crc::crc32;
pub use error::{PageError, Result}; pub use error::{PageError, Result};
pub use header::PageHeader; pub use header::PageHeader;
pub use paginate::paginate;
const CONTINUED_PACKET: u8 = 0x01; const CONTINUED_PACKET: u8 = 0x01;
@ -183,78 +185,6 @@ impl Page {
} }
} }
/// Create pages from a packet
///
/// The packet is split into chunks of at most `MAX_CONTENT_SIZE` bytes, and one
/// page is created per chunk.
///
/// * `stream_serial` is copied into every page header.
/// * `abgp` is only kept on the final page. Every earlier page gets the special
///   value `-1` (in two's complement), since no packet finishes on it.
/// * `flags` may contain `CONTAINS_FIRST_PAGE_OF_BITSTREAM` and/or
///   `CONTAINS_LAST_PAGE_OF_BITSTREAM`, which are applied to the first and last
///   page respectively.
///
/// An empty packet produces no pages.
///
/// # Example
///
/// ```rust,ignore
/// use ogg_pager::paginate;
///
/// // Creating the comment header
/// let comment_header_packet = vec![...];
/// let stream_serial_number = 2784419176;
///
/// let pages = paginate(&comment_header_packet, stream_serial_number, 0, 0);
/// ```
pub fn paginate(packet: &[u8], stream_serial: u32, abgp: u64, flags: u8) -> Vec<Page> {
    let mut pages = Vec::new();
    let mut pos = 0_u64;

    for (idx, chunk) in packet.chunks(MAX_CONTENT_SIZE).enumerate() {
        // Compute the byte range up front so the struct literal below does not
        // both read and write `pos` within a single expression.
        let start = pos;
        pos += chunk.len() as u64;

        let header_type_flag = if idx > 0 {
            // Every page after the first continues the same packet
            CONTINUED_PACKET
        } else if flags & CONTAINS_FIRST_PAGE_OF_BITSTREAM != 0 {
            // The caller asked for the beginning-of-stream marker, so the first
            // page must carry the *first* page flag (not the last page flag).
            CONTAINS_FIRST_PAGE_OF_BITSTREAM
        } else {
            0
        };

        pages.push(Page {
            content: chunk.to_vec(),
            header: PageHeader {
                start,
                header_type_flag,
                abgp,
                stream_serial,
                sequence_number: (idx + 1) as u32,
                // Calculated later, once the page is final
                checksum: 0,
            },
            end: pos,
        });
    }

    if flags & CONTAINS_LAST_PAGE_OF_BITSTREAM != 0 {
        if let Some(last) = pages.last_mut() {
            last.header.header_type_flag |= CONTAINS_LAST_PAGE_OF_BITSTREAM;
        }
    }

    // The packet only finishes on the final page, so every page before it gets
    // the special granule position of '-1' (in two's complement).
    if let Some((_, leading)) = pages.split_last_mut() {
        for page in leading {
            page.header.abgp = u64::MAX;
        }
    }

    pages
}
/// Creates a segment table based on the length /// Creates a segment table based on the length
/// ///
/// # Errors /// # Errors
@ -507,9 +437,43 @@ impl Packets {
true true
} }
/// Create pages from the stored packets
///
/// The packet contents are stored back to back in `self.content`, so each
/// entry of `self.packet_sizes` is used to slice out one packet, in order.
/// The resulting slices are handed to the free function `paginate`.
///
/// # Panics
///
/// Slicing panics if the recorded packet sizes run past the end of the content.
pub fn paginate(&self, stream_serial: u32, abgp: u64, flags: u8) -> Result<Vec<Page>> {
    let mut offset = 0_usize;

    let slices: Vec<&[u8]> = self
        .packet_sizes
        .iter()
        .map(|&size| {
            let start = offset;
            offset += size as usize;
            &self.content[start..offset]
        })
        .collect();

    paginate(slices, stream_serial, abgp, flags)
}
/// Write packets to a writer
///
/// The packets are paginated with the given `stream_serial`, `abgp`, and
/// `flags`. Each page then has its CRC generated and is written to `writer`
/// in order.
///
/// # Errors
///
/// Returns an error if pagination, CRC generation, page serialization, or
/// writing to `writer` fails.
pub fn write_to<W>(
    &self,
    writer: &mut W,
    flags: u8,
    stream_serial: u32,
    abgp: u64,
) -> Result<()>
where
    W: Write,
{
    for mut page in self.paginate(stream_serial, abgp, flags)? {
        // The checksum has to be in place before the page is serialized
        page.gen_crc()?;

        let bytes = page.as_bytes()?;
        writer.write_all(&bytes)?;
    }

    Ok(())
}
} }
/// An iterator over packets /// An iterator over packets
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct PacketsIter<'a> { pub struct PacketsIter<'a> {
content: &'a [u8], content: &'a [u8],
packet_sizes: &'a [u64], packet_sizes: &'a [u64],
@ -584,7 +548,7 @@ mod tests {
fn paginate_large() { fn paginate_large() {
let packet = std::fs::read("test_assets/large_comment_packet.page").unwrap(); let packet = std::fs::read("test_assets/large_comment_packet.page").unwrap();
let pages = paginate(&packet, 1234, 0, 0); let pages = paginate([packet.as_slice()], 1234, 0, 0).unwrap();
let len = pages.len(); let len = pages.len();
@ -611,7 +575,7 @@ mod tests {
assert_eq!(header.abgp, u64::MAX); assert_eq!(header.abgp, u64::MAX);
} }
assert_eq!(header.sequence_number, (i + 1) as u32); assert_eq!(header.sequence_number, i as u32);
if i == 0 { if i == 0 {
assert_eq!(header.header_type_flag, 0); assert_eq!(header.header_type_flag, 0);

197
ogg_pager/src/paginate.rs Normal file
View file

@ -0,0 +1,197 @@
use crate::error::Result;
use crate::{
segment_table, Page, PageHeader, CONTAINS_FIRST_PAGE_OF_BITSTREAM,
CONTAINS_LAST_PAGE_OF_BITSTREAM, CONTINUED_PACKET, MAX_CONTENT_SIZE,
};
use std::io::Read;
/// Mutable state carried through a single `paginate` call
struct PaginateContext {
/// The pages completed so far, in order
pages: Vec<Page>,
/// The absolute granule position written to pages on which a packet finishes
abgp: u64,
/// The stream serial number copied into every page header
stream_serial: u32,
/// The header flags requested by the caller
header_flags: u8,
/// Internal bookkeeping flags
flags: PaginateContextFlags,
/// Running byte offset, used as the start of the next page and advanced by
/// the page's content length on every flush
pos: u64,
/// The sequence number of the next page, incremented on every flush
idx: usize,
/// The number of content bytes that still fit in the current page
remaining_page_size: usize,
/// The full length of the packet currently being paginated
current_packet_len: usize,
}
impl PaginateContext {
    /// Creates a context with no pages, positioned at the start of the first page
    fn new(abgp: u64, stream_serial: u32, header_flags: u8) -> Self {
        let flags = PaginateContextFlags {
            first_page: true,
            fresh_packet: true,
            packet_spans_multiple_pages: false,
            packet_finished_on_page: false,
        };

        Self {
            pages: Vec::new(),
            abgp,
            stream_serial,
            header_flags,
            flags,
            pos: 0,
            idx: 0,
            remaining_page_size: MAX_CONTENT_SIZE,
            current_packet_len: 0,
        }
    }

    /// Turns the accumulated `content` and `segments` into a finished page
    ///
    /// Both buffers are left empty, and the per-page state is reset so the next
    /// page starts out fresh.
    fn flush(&mut self, content: &mut Vec<u8>, segments: &mut Vec<u8>) {
        let wants_first_page_flag = self.header_flags & CONTAINS_FIRST_PAGE_OF_BITSTREAM != 0;

        let header_type_flag = if self.flags.first_page {
            if wants_first_page_flag {
                CONTAINS_FIRST_PAGE_OF_BITSTREAM
            } else {
                0
            }
        } else if self.flags.fresh_packet {
            0
        } else {
            // A packet from the previous page continues onto this page
            CONTINUED_PACKET
        };

        let abgp = if self.flags.packet_finished_on_page {
            self.abgp
        } else {
            // A special value of '-1' (in two's complement) indicates that no packets
            // finish on this page.
            u64::MAX
        };

        let start = self.pos;
        let page_content = core::mem::take(content);
        let page_segments = core::mem::take(segments);
        let end = start + page_content.len() as u64;

        self.pages.push(Page {
            content: page_content,
            segments: page_segments,
            header: PageHeader {
                start,
                header_type_flag,
                abgp,
                stream_serial: self.stream_serial,
                sequence_number: self.idx as u32,
                // No need to calculate this yet
                checksum: 0,
            },
            end,
        });

        // Reset the per-page state
        self.pos = end;
        self.idx += 1;
        self.flags.packet_finished_on_page = false;
        self.remaining_page_size = MAX_CONTENT_SIZE;

        // Moving on to a new packet
        if self.pos > self.current_packet_len as u64 {
            self.flags.packet_spans_multiple_pages = false;
        }
    }
}
/// Bookkeeping flags used by `PaginateContext`
struct PaginateContextFlags {
/// Starts out `true`, and is cleared at the end of the first pagination loop iteration
first_page: bool,
/// Set when a new packet is pulled from the input, and cleared at the end of
/// every pagination loop iteration
fresh_packet: bool,
/// Set when the rest of the current packet needs more segments than the
/// current page has left
packet_spans_multiple_pages: bool,
/// Set once a packet has been fully consumed, and cleared on every flush
packet_finished_on_page: bool,
}
/// Create pages from a list of packets
///
/// Every item yielded by `packets` is treated as a single packet. Packets are
/// packed into pages in order, and a packet that does not fit into the current
/// page is continued on the following page(s).
///
/// * `stream_serial` is copied into every page header.
/// * `abgp` is written to pages on which a packet finishes. All other pages get
///   the special value `-1` (in two's complement).
/// * `flags` may contain `CONTAINS_FIRST_PAGE_OF_BITSTREAM` and/or
///   `CONTAINS_LAST_PAGE_OF_BITSTREAM`.
///
/// If `packets` yields nothing, an empty list of pages is returned.
///
/// # Errors
///
/// Returns an error if copying the packet content or creating the segment
/// table fails.
///
/// # Example
///
/// ```rust,ignore
/// use ogg_pager::paginate;
///
/// // Creating the comment header
/// let comment_header_packet = vec![...];
/// let stream_serial_number = 2784419176;
///
/// let pages = paginate([comment_header_packet.as_slice()], stream_serial_number, 0, 0)?;
/// ```
pub fn paginate<'a, I: 'a>(
packets: I,
stream_serial: u32,
abgp: u64,
flags: u8,
) -> Result<Vec<Page>>
where
I: IntoIterator<Item = &'a [u8]>,
{
const MAX_SEGMENT_COUNT: usize = 255;
let mut ctx = PaginateContext::new(abgp, stream_serial, flags);
let mut packets_iter = packets.into_iter();
let mut packet = match packets_iter.next() {
Some(packet) => packet,
// We weren't given any content to paginate
None => return Ok(ctx.pages),
};
ctx.current_packet_len = packet.len();
let mut segments = Vec::with_capacity(255);
let mut page_content = Vec::new();
loop {
// The first packet was already fetched above, so we only pull the next one
// after the first iteration, and only once the current packet no longer
// needs to continue onto another page.
if !ctx.flags.packet_spans_multiple_pages && !ctx.flags.first_page {
match packets_iter.next() {
Some(packet_) => {
packet = packet_;
ctx.current_packet_len = packet.len();
ctx.flags.fresh_packet = true;
},
None => break,
};
}
// We read as much of the packet as we can, given the amount of space left in the page.
// The packet may need to span multiple pages.
let bytes_read = packet
.take(ctx.remaining_page_size as u64)
.read_to_end(&mut page_content)?;
ctx.remaining_page_size -= bytes_read;
// Only keep the part of the packet that has not been written yet
packet = &packet[bytes_read..];
segments.append(&mut segment_table(bytes_read)?);
// NOTE(review): this subtraction assumes `segments.len()` never exceeds
// `MAX_SEGMENT_COUNT`. Confirm that `segment_table` together with the page
// size limit guarantees this when several packets share a page.
let remaining_segments = MAX_SEGMENT_COUNT - segments.len();
// We have a maximum of 255 segments available per page, if we require more than
// is left in the segment table, we'll have to split the packet into multiple pages.
let segments_required = (packet.len() / 255) + 1;
ctx.flags.packet_spans_multiple_pages = segments_required > remaining_segments;
// We need to indicate whether or not any packet was finished on this page.
// This is used for the absolute granule position.
if packet.is_empty() {
ctx.flags.packet_finished_on_page = true;
}
// The first packet of the bitstream must have its own page, unlike any other packet.
let first_packet_finished_on_page = ctx.flags.first_page
&& ctx.header_flags & CONTAINS_FIRST_PAGE_OF_BITSTREAM != 0
&& ctx.flags.packet_finished_on_page;
if first_packet_finished_on_page
// We've completely filled this page, we need to flush before moving on
|| (ctx.remaining_page_size == 0 || remaining_segments == 0)
{
ctx.flush(&mut page_content, &mut segments);
}
// These must be cleared *after* the flush above, since `flush` reads both
// of them to pick the header type flag.
ctx.flags.first_page = false;
ctx.flags.fresh_packet = false;
}
// Flush any content leftover
//
// NOTE(review): `first_page` and `fresh_packet` have both been cleared by the time
// the loop exits, so `flush` marks this page as `CONTINUED_PACKET`. Confirm that
// this is intended when the leftover content begins with a fresh packet.
if !page_content.is_empty() {
ctx.flush(&mut page_content, &mut segments);
}
// The end of stream marker always goes on the final page
if flags & CONTAINS_LAST_PAGE_OF_BITSTREAM == 0x04 {
if let Some(last) = ctx.pages.last_mut() {
last.header.header_type_flag |= CONTAINS_LAST_PAGE_OF_BITSTREAM;
}
}
Ok(ctx.pages)
}