extract decode pipeline

This commit is contained in:
Hailey Somerville 2023-12-29 20:29:25 +11:00
parent 5ea6a9ef6c
commit c3aad89049
4 changed files with 149 additions and 107 deletions

View file

@ -1,2 +1,4 @@
pub mod pipeline;
pub mod queue;
pub mod resample;
pub mod timing;

View file

@ -0,0 +1,70 @@
use bark_protocol::FRAMES_PER_PACKET;
use bytemuck::Zeroable;
use bark_protocol::packet::Audio;
use bark_protocol::types::AudioPacketHeader;
use crate::audio::Frame;
use crate::decode::Decoder;
use crate::receive::resample::Resampler;
use crate::receive::timing::{RateAdjust, Timing};
pub struct Pipeline {
/// None indicates error creating decoder, we cannot decode this stream
decoder: Option<Decoder>,
resampler: Resampler,
rate_adjust: RateAdjust,
}
impl Pipeline {
pub fn new(header: &AudioPacketHeader) -> Self {
let decoder = match Decoder::new(header) {
Ok(dec) => {
log::info!("instantiated decoder for new stream: {}", dec.describe());
Some(dec)
}
Err(err) => {
log::error!("error creating decoder for new stream: {err}");
None
}
};
Pipeline {
decoder,
resampler: Resampler::new(),
rate_adjust: RateAdjust::new(),
}
}
pub fn slew(&self) -> bool {
self.rate_adjust.slew()
}
pub fn set_timing(&mut self, timing: Timing) {
let rate = self.rate_adjust.sample_rate(timing);
let _ = self.resampler.set_input_rate(rate.0);
}
pub fn process(&mut self, packet: Option<&Audio>, out: &mut [Frame]) -> usize {
// decode packet
let mut decode_buffer = [Frame::zeroed(); FRAMES_PER_PACKET];
if let Some(decoder) = self.decoder.as_mut() {
match decoder.decode(packet, &mut decode_buffer) {
Ok(()) => {}
Err(e) => {
log::warn!("error in decoder, skipping packet: {e}");
decode_buffer.fill(Frame::zeroed());
}
}
}
// resample decoded audio
let resample = self.resampler.process(&decode_buffer, out)
.expect("resample error!");
assert_eq!(resample.input_read.0, decode_buffer.len());
resample.output_written.0
}
}

View file

@ -0,0 +1,67 @@
use core::time::Duration;
use bark_protocol::time::{Timestamp, SampleDuration};
use bark_protocol::SampleRate;
pub struct RateAdjust {
slew: bool,
}
#[derive(Copy, Clone)]
pub struct Timing {
pub real: Timestamp,
pub play: Timestamp,
}
impl RateAdjust {
pub fn new() -> Self {
RateAdjust {
slew: false
}
}
pub fn slew(&self) -> bool {
self.slew
}
pub fn sample_rate(&mut self, timing: Timing) -> SampleRate {
self.adjusted_rate(timing).unwrap_or(bark_protocol::SAMPLE_RATE)
}
fn adjusted_rate(&mut self, timing: Timing) -> Option<SampleRate> {
// parameters, maybe these could be cli args?
let start_slew_threshold = Duration::from_micros(2000);
let stop_slew_threshold = Duration::from_micros(1000);
let slew_target_duration = Duration::from_millis(500);
// turn them into native units
let start_slew_threshold = SampleDuration::from_std_duration_lossy(start_slew_threshold);
let stop_slew_threshold = SampleDuration::from_std_duration_lossy(stop_slew_threshold);
let frame_offset = timing.real.delta(timing.play);
if frame_offset.abs() < stop_slew_threshold {
self.slew = false;
return None;
}
if frame_offset.abs() < start_slew_threshold && !self.slew {
return None;
}
let slew_duration_duration = i64::try_from(slew_target_duration.as_micros()).unwrap();
let base_sample_rate = i64::from(bark_protocol::SAMPLE_RATE);
let rate_offset = frame_offset.as_frames() * 1_000_000 / slew_duration_duration;
let rate = base_sample_rate + rate_offset;
// clamp any potential slow down to 1%, we shouldn't ever get too far
// ahead of the stream
let rate = std::cmp::max(base_sample_rate * 99 / 100, rate);
// let the speed up run much higher, but keep it reasonable still
let rate = std::cmp::min(base_sample_rate * 101 / 100, rate);
self.slew = true;
Some(SampleRate(u32::try_from(rate).unwrap()))
}
}

View file

@ -3,14 +3,14 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;
use bark_core::audio::Frame;
use bark_core::decode::{Decoder, FrameBuffer};
use bark_core::receive::pipeline::Pipeline;
use bark_core::receive::timing::Timing;
use bytemuck::Zeroable;
use structopt::StructOpt;
use bark_core::receive::queue::PacketQueue;
use bark_core::receive::resample::Resampler;
use bark_protocol::{SampleRate, FRAMES_PER_PACKET};
use bark_protocol::FRAMES_PER_PACKET;
use bark_protocol::time::{Timestamp, SampleDuration, TimestampDelta, ClockDelta};
use bark_protocol::types::{SessionId, ReceiverId, TimePhase, AudioPacketHeader};
use bark_protocol::types::stats::receiver::{ReceiverStats, StreamStatus};
@ -32,37 +32,20 @@ struct Stream {
latency: Aggregate<Duration>,
clock_delta: Aggregate<ClockDelta>,
queue: PacketQueue,
/// None indicates error creating decoder, we cannot decode this stream
decoder: Option<Decoder>,
resampler: Resampler,
rate_adjust: RateAdjust,
pipeline: Pipeline,
}
impl Stream {
pub fn new(header: &AudioPacketHeader) -> Self {
let queue = PacketQueue::new(header);
let decoder = match Decoder::new(header) {
Ok(dec) => {
log::info!("instantiated decoder for new stream: {}", dec.describe());
Some(dec)
}
Err(err) => {
log::error!("error creating decoder for new stream: {err}");
None
}
};
let resampler = Resampler::new();
Stream {
sid: header.sid,
latency: Aggregate::new(),
clock_delta: Aggregate::new(),
queue,
decoder,
resampler,
rate_adjust: RateAdjust::new(),
pipeline: Pipeline::new(header),
}
}
@ -190,11 +173,9 @@ impl Receiver {
// adjust resampler rate based on stream timing info
if let Some(timing) = timing {
let rate = stream.rate_adjust.sample_rate(timing);
stream.pipeline.set_timing(timing);
let _ = stream.resampler.set_input_rate(rate.0);
if stream.rate_adjust.slew() {
if stream.pipeline.slew() {
self.stats.set_stream(StreamStatus::Slew);
} else {
self.stats.set_stream(StreamStatus::Sync);
@ -203,93 +184,15 @@ impl Receiver {
self.stats.set_audio_latency(timing.real, timing.play);
}
// decode packet
let mut decode_buffer: FrameBuffer = array::from_fn(|_| Frame::zeroed());
if let Some(decoder) = stream.decoder.as_mut() {
match decoder.decode(packet.as_ref(), &mut decode_buffer) {
Ok(()) => {}
Err(e) => {
log::warn!("error in decoder, skipping packet: {e}");
decode_buffer.fill(Frame::zeroed());
}
}
}
// resample decoded audio
let resample = stream.resampler.process(&decode_buffer, buffer)
.expect("resample error!");
assert_eq!(resample.input_read.0, decode_buffer.len());
// pass packet through decode pipeline
let frames = stream.pipeline.process(packet.as_ref(), buffer);
// report stats and return
self.stats.set_buffer_length(
SampleDuration::from_frame_count(
(FRAMES_PER_PACKET * stream.queue.len()).try_into().unwrap()));
resample.output_written.0
}
}
struct RateAdjust {
slew: bool,
}
#[derive(Copy, Clone)]
pub struct Timing {
pub real: Timestamp,
pub play: Timestamp,
}
impl RateAdjust {
pub fn new() -> Self {
RateAdjust {
slew: false
}
}
pub fn slew(&self) -> bool {
self.slew
}
pub fn sample_rate(&mut self, timing: Timing) -> SampleRate {
self.adjusted_rate(timing).unwrap_or(bark_protocol::SAMPLE_RATE)
}
fn adjusted_rate(&mut self, timing: Timing) -> Option<SampleRate> {
// parameters, maybe these could be cli args?
let start_slew_threshold = Duration::from_micros(2000);
let stop_slew_threshold = Duration::from_micros(1000);
let slew_target_duration = Duration::from_millis(500);
// turn them into native units
let start_slew_threshold = SampleDuration::from_std_duration_lossy(start_slew_threshold);
let stop_slew_threshold = SampleDuration::from_std_duration_lossy(stop_slew_threshold);
let frame_offset = timing.real.delta(timing.play);
if frame_offset.abs() < stop_slew_threshold {
self.slew = false;
return None;
}
if frame_offset.abs() < start_slew_threshold && !self.slew {
return None;
}
let slew_duration_duration = i64::try_from(slew_target_duration.as_micros()).unwrap();
let base_sample_rate = i64::from(bark_protocol::SAMPLE_RATE);
let rate_offset = frame_offset.as_frames() * 1_000_000 / slew_duration_duration;
let rate = base_sample_rate + rate_offset;
// clamp any potential slow down to 1%, we shouldn't ever get too far
// ahead of the stream
let rate = std::cmp::max(base_sample_rate * 99 / 100, rate);
// let the speed up run much higher, but keep it reasonable still
let rate = std::cmp::min(base_sample_rate * 101 / 100, rate);
self.slew = true;
Some(SampleRate(u32::try_from(rate).unwrap()))
frames
}
}