diff --git a/ogg_pager/src/lib.rs b/ogg_pager/src/lib.rs index 11e59a40..cb7e436e 100644 --- a/ogg_pager/src/lib.rs +++ b/ogg_pager/src/lib.rs @@ -3,12 +3,14 @@ mod crc; mod error; mod header; +mod paginate; -use std::io::{Read, Seek, SeekFrom}; +use std::io::{Read, Seek, SeekFrom, Write}; pub use crc::crc32; pub use error::{PageError, Result}; pub use header::PageHeader; +pub use paginate::paginate; const CONTINUED_PACKET: u8 = 0x01; @@ -183,78 +185,6 @@ impl Page { } } -/// Create pages from a packet -/// -/// # Example -/// -/// ```rust,ignore -/// use ogg_pager::paginate; -/// -/// // Creating the comment header -/// let comment_header_packet = vec![...]; -/// let stream_serial_number = 2784419176; -/// -/// let pages = paginate(&comment_header_packet, stream_serial_number, 0, 0); -/// ``` -#[allow(clippy::mixed_read_write_in_expression)] -pub fn paginate(packet: &[u8], stream_serial: u32, abgp: u64, flags: u8) -> Vec { - let mut pages = Vec::new(); - - let mut first_page = true; - let mut pos = 0; - - for (idx, page) in packet.chunks(MAX_CONTENT_SIZE).enumerate() { - let p = Page { - content: page.to_vec(), - header: PageHeader { - start: pos, - header_type_flag: { - if first_page { - if flags & CONTAINS_FIRST_PAGE_OF_BITSTREAM == 0x02 { - CONTAINS_LAST_PAGE_OF_BITSTREAM - } else { - 0 - } - } else { - CONTINUED_PACKET - } - }, - abgp, - stream_serial, - sequence_number: (idx + 1) as u32, - checksum: 0, - }, - end: { - pos += page.len() as u64; - pos - }, - }; - - first_page = false; - pages.push(p); - } - - if flags & CONTAINS_LAST_PAGE_OF_BITSTREAM == 0x04 { - if let Some(last) = pages.last_mut() { - last.header.header_type_flag |= CONTAINS_LAST_PAGE_OF_BITSTREAM; - } - } - - if pages.len() > 1 { - let last_idx = pages.len() - 1; - - for (idx, p) in pages.iter_mut().enumerate() { - if idx == last_idx { - break; - } - - p.header.abgp = 1_u64.wrapping_neg(); - } - } - - pages -} - /// Creates a segment table based on the length /// /// # Errors @@ -507,9 +437,43 @@ impl Packets { true } + + pub fn paginate(&self, stream_serial: u32, abgp: u64, flags: u8) -> Result> { + let mut packets = Vec::new(); + + let mut pos = 0; + for packet_size in self.packet_sizes.iter().copied() { + packets.push(&self.content[pos..pos + packet_size as usize]); + pos += packet_size as usize; + } + + paginate(packets, stream_serial, abgp, flags) + } + + /// Write packets to a writer + pub fn write_to( + &self, + writer: &mut W, + flags: u8, + stream_serial: u32, + abgp: u64, + ) -> Result<()> + where + W: Write, + { + let paginated = self.paginate(stream_serial, abgp, flags)?; + + for mut page in paginated.into_iter() { + page.gen_crc()?; + writer.write_all(&page.as_bytes()?)?; + } + + Ok(()) + } } /// An iterator over packets +#[derive(Clone, PartialEq, Eq, Debug)] pub struct PacketsIter<'a> { content: &'a [u8], packet_sizes: &'a [u64], @@ -584,7 +548,7 @@ mod tests { fn paginate_large() { let packet = std::fs::read("test_assets/large_comment_packet.page").unwrap(); - let pages = paginate(&packet, 1234, 0, 0); + let pages = paginate([packet.as_slice()], 1234, 0, 0).unwrap(); let len = pages.len(); @@ -611,7 +575,7 @@ mod tests { assert_eq!(header.abgp, u64::MAX); } - assert_eq!(header.sequence_number, (i + 1) as u32); + assert_eq!(header.sequence_number, i as u32); if i == 0 { assert_eq!(header.header_type_flag, 0); diff --git a/ogg_pager/src/paginate.rs b/ogg_pager/src/paginate.rs new file mode 100644 index 00000000..357852b5 --- /dev/null +++ b/ogg_pager/src/paginate.rs @@ -0,0 +1,197 @@ +use crate::error::Result; +use crate::{ + segment_table, Page, PageHeader, CONTAINS_FIRST_PAGE_OF_BITSTREAM, + CONTAINS_LAST_PAGE_OF_BITSTREAM, CONTINUED_PACKET, MAX_CONTENT_SIZE, +}; + +use std::io::Read; + +struct PaginateContext { + pages: Vec, + abgp: u64, + stream_serial: u32, + header_flags: u8, + flags: PaginateContextFlags, + pos: u64, + idx: usize, + remaining_page_size: usize, + current_packet_len: usize, +} + +impl PaginateContext { + fn new(abgp: u64, stream_serial: u32, header_flags: u8) -> Self { + Self { + pages: Vec::new(), + abgp, + stream_serial, + header_flags, + flags: PaginateContextFlags { + first_page: true, + fresh_packet: true, + packet_spans_multiple_pages: false, + packet_finished_on_page: false, + }, + pos: 0, + idx: 0, + remaining_page_size: MAX_CONTENT_SIZE, + current_packet_len: 0, + } + } + + fn flush(&mut self, content: &mut Vec, segments: &mut Vec) { + let header = PageHeader { + start: self.pos, + header_type_flag: { + match self.flags.first_page { + true if self.header_flags & CONTAINS_FIRST_PAGE_OF_BITSTREAM != 0 => { + CONTAINS_FIRST_PAGE_OF_BITSTREAM + }, + // A packet from the previous page continues onto this page + false if !self.flags.fresh_packet => CONTINUED_PACKET, + _ => 0, + } + }, + abgp: if self.flags.packet_finished_on_page { + self.abgp + } else { + // A special value of '-1' (in two's complement) indicates that no packets + // finish on this page. + 1_u64.wrapping_neg() + }, + stream_serial: self.stream_serial, + sequence_number: self.idx as u32, + // No need to calculate this yet + checksum: 0, + }; + + let content = core::mem::take(content); + let segments = core::mem::take(segments); + + let content_len = content.len(); + self.pos += content_len as u64; + + self.pages.push(Page { + content, + segments, + header, + end: self.pos, + }); + + self.idx += 1; + self.flags.packet_finished_on_page = false; + self.remaining_page_size = MAX_CONTENT_SIZE; + + // Moving on to a new packet + if self.pos > self.current_packet_len as u64 { + self.flags.packet_spans_multiple_pages = false; + } + } +} + +struct PaginateContextFlags { + first_page: bool, + fresh_packet: bool, + packet_spans_multiple_pages: bool, + packet_finished_on_page: bool, +} + +/// Create pages from a list of packets +/// +/// # Example +/// +/// ```rust,ignore +/// use ogg_pager::paginate; +/// +/// // Creating the comment header +/// let comment_header_packet = vec![...]; +/// let stream_serial_number = 2784419176; +/// +/// let pages = paginate(&comment_header_packet, stream_serial_number, 0, 0); +/// ``` +pub fn paginate<'a, I: 'a>( + packets: I, + stream_serial: u32, + abgp: u64, + flags: u8, +) -> Result> +where + I: IntoIterator, +{ + const MAX_SEGMENT_COUNT: usize = 255; + + let mut ctx = PaginateContext::new(abgp, stream_serial, flags); + + let mut packets_iter = packets.into_iter(); + let mut packet = match packets_iter.next() { + Some(packet) => packet, + // We weren't given any content to paginate + None => return Ok(ctx.pages), + }; + ctx.current_packet_len = packet.len(); + + let mut segments = Vec::with_capacity(255); + let mut page_content = Vec::new(); + + loop { + if !ctx.flags.packet_spans_multiple_pages && !ctx.flags.first_page { + match packets_iter.next() { + Some(packet_) => { + packet = packet_; + ctx.current_packet_len = packet.len(); + ctx.flags.fresh_packet = true; + }, + None => break, + }; + } + + // We read as much of the packet as we can, given the amount of space left in the page. + // The packet may need to span multiple pages. + let bytes_read = packet + .take(ctx.remaining_page_size as u64) + .read_to_end(&mut page_content)?; + ctx.remaining_page_size -= bytes_read; + packet = &packet[bytes_read..]; + + segments.append(&mut segment_table(bytes_read)?); + let remaining_segments = MAX_SEGMENT_COUNT - segments.len(); + + // We have a maximum of 255 segments available per page, if we require more than + // is left in the segment table, we'll have to split the packet into multiple pages. + let segments_required = (packet.len() / 255) + 1; + ctx.flags.packet_spans_multiple_pages = segments_required > remaining_segments; + + // We need to indicate whether or not any packet was finished on this page. + // This is used for the absolute granule position. + if packet.is_empty() { + ctx.flags.packet_finished_on_page = true; + } + + // The first packet of the bitstream must have its own page, unlike any other packet. + let first_packet_finished_on_page = ctx.flags.first_page + && ctx.header_flags & CONTAINS_FIRST_PAGE_OF_BITSTREAM != 0 + && ctx.flags.packet_finished_on_page; + + if first_packet_finished_on_page + // We've completely filled this page, we need to flush before moving on + || (ctx.remaining_page_size == 0 || remaining_segments == 0) + { + ctx.flush(&mut page_content, &mut segments); + } + + ctx.flags.first_page = false; + ctx.flags.fresh_packet = false; + } + + // Flush any content leftover + if !page_content.is_empty() { + ctx.flush(&mut page_content, &mut segments); + } + + if flags & CONTAINS_LAST_PAGE_OF_BITSTREAM == 0x04 { + if let Some(last) = ctx.pages.last_mut() { + last.header.header_type_flag |= CONTAINS_LAST_PAGE_OF_BITSTREAM; + } + } + + Ok(ctx.pages) +}