pull decoding off onto its own thread

Hailey Somerville 2024-06-22 11:43:41 +10:00
parent 0358d4d9ec
commit d465c30bae
9 changed files with 163 additions and 58 deletions
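
For orientation, the shape of this change is a producer/consumer split: the receive path stamps each packet with a translated presentation timestamp and pushes it onto a queue, and a dedicated thread pops, decodes, and writes to the output, so decoding and output writes happen off the network path. A minimal sketch of that shape, using simplified stand-in types rather than bark's real Audio, PacketQueue, or Output APIs:

```rust
// Minimal sketch of the decode-thread handoff (stand-in types, not bark's API).
use std::sync::mpsc;
use std::thread;

#[derive(Debug)]
struct Audio(Vec<u8>); // stand-in for bark_protocol::packet::Audio

#[derive(Debug)]
struct AudioPts { pts: u64, audio: Audio } // packet plus translated presentation time

fn decode(packet: &Audio) -> Vec<i16> {
    // placeholder for the real decode pipeline
    packet.0.iter().map(|b| *b as i16).collect()
}

fn main() {
    let (tx, rx) = mpsc::channel::<AudioPts>();

    // decode thread: drains the queue, decodes, and would write to the output
    let decoder = thread::spawn(move || {
        for item in rx {
            let frames = decode(&item.audio);
            // the real code locks the output here and writes the frames to ALSA
            println!("pts={} decoded {} frames", item.pts, frames.len());
        }
    });

    // network side: only translates the pts and enqueues
    for seq in 0..3u64 {
        tx.send(AudioPts { pts: seq * 120, audio: Audio(vec![0; 4]) }).unwrap();
    }
    drop(tx); // disconnect; the decode thread exits once the channel drains
    decoder.join().unwrap();
}
```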

View file

@@ -9,7 +9,7 @@ use bark_protocol::time::{SampleDuration, Timestamp};
use crate::consts::MAX_QUEUED_DECODE_SEGMENTS;
pub struct PacketQueue {
queue: Deque<Option<Audio>, MAX_QUEUED_DECODE_SEGMENTS>,
queue: Deque<Option<AudioPts>, MAX_QUEUED_DECODE_SEGMENTS>,
/// The seq of the first packet in the queue, the rest are implied
head_seq: u64,
/// We delay yielding packets when a queue is first started (or reset), to
@@ -18,6 +18,19 @@ pub struct PacketQueue {
start: DelayStart,
}
#[derive(Debug)]
pub struct AudioPts {
/// translated into local time:
pub pts: Timestamp,
pub audio: Audio,
}
impl AudioPts {
pub fn header(&self) -> &AudioPacketHeader {
self.audio.header()
}
}
enum NoSlot {
InPast,
TooFarInFuture,
@@ -32,7 +45,7 @@ impl PacketQueue {
}
}
pub fn pop_front(&mut self) -> Option<Audio> {
pub fn pop_front(&mut self) -> Option<AudioPts> {
if self.start.yield_packet() {
self.head_seq += 1;
self.queue.pop_front().flatten()
@@ -41,7 +54,7 @@ impl PacketQueue {
}
}
pub fn insert_packet(&mut self, packet: Audio) {
pub fn insert_packet(&mut self, packet: AudioPts) {
let packet_seq = packet.header().seq;
let head_seq = self.head_seq;
let tail_seq = self.head_seq + self.queue.capacity() as u64;
@@ -69,7 +82,7 @@ impl PacketQueue {
}
}
fn queue_slot_mut(&mut self, seq: u64) -> Result<&mut Option<Audio>, NoSlot> {
fn queue_slot_mut(&mut self, seq: u64) -> Result<&mut Option<AudioPts>, NoSlot> {
let idx = seq.checked_sub(self.head_seq).ok_or(NoSlot::InPast)? as usize;
if idx >= self.queue.capacity() {

View file

@@ -13,8 +13,8 @@ pub struct Input {
}
impl Input {
pub fn new(opt: DeviceOpt) -> Result<Self, OpenError> {
let pcm = config::open_pcm(&opt, Direction::Capture)?;
pub fn new(opt: &DeviceOpt) -> Result<Self, OpenError> {
let pcm = config::open_pcm(opt, Direction::Capture)?;
Ok(Input { pcm })
}

View file

@@ -12,8 +12,8 @@ pub struct Output {
}
impl Output {
pub fn new(opt: DeviceOpt) -> Result<Self, OpenError> {
let pcm = config::open_pcm(&opt, Direction::Playback)?;
pub fn new(opt: &DeviceOpt) -> Result<Self, OpenError> {
let pcm = config::open_pcm(opt, Direction::Playback)?;
Ok(Output { pcm })
}

View file

@@ -24,7 +24,7 @@ pub struct Input {
}
impl Input {
pub fn new(opt: DeviceOpt) -> Result<Self, OpenError> {
pub fn new(opt: &DeviceOpt) -> Result<Self, OpenError> {
Ok(Input {
alsa: alsa::input::Input::new(opt)?,
})
@@ -40,7 +40,7 @@ pub struct Output {
}
impl Output {
pub fn new(opt: DeviceOpt) -> Result<Self, OpenError> {
pub fn new(opt: &DeviceOpt) -> Result<Self, OpenError> {
Ok(Output {
alsa: alsa::output::Output::new(opt)?,
})

View file

@@ -8,46 +8,51 @@ use bark_core::receive::timing::Timing;
use bytemuck::Zeroable;
use structopt::StructOpt;
use bark_core::receive::queue::PacketQueue;
use bark_core::receive::queue::{AudioPts, PacketQueue};
use bark_protocol::FRAMES_PER_PACKET;
use bark_protocol::time::{Timestamp, SampleDuration, TimestampDelta, ClockDelta};
use bark_protocol::types::{SessionId, ReceiverId, TimePhase, AudioPacketHeader};
use bark_protocol::types::{AudioPacketHeader, ReceiverId, SessionId, TimePhase, TimestampMicros};
use bark_protocol::types::stats::receiver::{ReceiverStats, StreamStatus};
use bark_protocol::packet::{Audio, Time, PacketKind, StatsReply};
use crate::audio::config::{DEFAULT_PERIOD, DEFAULT_BUFFER, DeviceOpt};
use crate::audio::Output;
use crate::config::Device;
use crate::receive::output::OutputRef;
use crate::receive::stream::Stream as ReceiveStream;
use crate::socket::{ProtocolSocket, Socket, SocketOpt};
use crate::{time, stats, thread};
use crate::RunError;
use self::output::OwnedOutput;
mod output;
mod queue;
mod stream;
pub struct Receiver {
stats: ReceiverStats,
stream: Option<Stream>,
output: OwnedOutput,
}
struct Stream {
sid: SessionId,
latency: Aggregate<Duration>,
clock_delta: Aggregate<ClockDelta>,
queue: PacketQueue,
pipeline: Pipeline,
stream: ReceiveStream,
}
impl Stream {
pub fn new(header: &AudioPacketHeader) -> Self {
let queue = PacketQueue::new(header);
pub fn new(header: &AudioPacketHeader, output: OutputRef) -> Self {
let stream = ReceiveStream::new(header, output);
Stream {
sid: header.sid,
latency: Aggregate::new(),
clock_delta: Aggregate::new(),
queue,
pipeline: Pipeline::new(header),
stream,
}
}
@@ -63,10 +68,11 @@ impl Stream {
}
impl Receiver {
pub fn new() -> Self {
pub fn new(output: Output) -> Self {
Receiver {
stream: None,
stats: ReceiverStats::new(),
output: OwnedOutput::new(output),
}
}
@@ -115,9 +121,12 @@ impl Receiver {
};
if new_stream {
// start new stream
let stream = Stream::new(header, self.output.steal());
// new stream is taking over! switch over to it
log::info!("new stream beginning: sid={}", header.sid.0);
self.stream = Some(Stream::new(header));
self.stream = Some(stream);
self.stats.clear();
}
@@ -127,10 +136,25 @@ impl Receiver {
pub fn receive_audio(&mut self, packet: Audio) {
let now = time::now();
let packet_dts = packet.header().dts;
let header = packet.header();
let stream = self.prepare_stream(header);
let stream = self.prepare_stream(packet.header());
stream.queue.insert_packet(packet);
let packet_dts = header.dts;
// translate presentation timestamp of this packet:
let pts = Timestamp::from_micros_lossy(header.pts);
let pts = stream.adjust_pts(pts).unwrap_or_else(|| {
// if we don't yet have the clock information to adjust timestamps,
// default to packet pts-dts, added to our current local time
let stream_delay = header.pts.0.saturating_sub(header.dts.0);
Timestamp::from_micros_lossy(TimestampMicros(now.0 + stream_delay))
});
// TODO - this is where we would take buffer length stats
stream.stream.send(AudioPts {
pts,
audio: packet,
});
if let Some(latency) = stream.network_latency() {
if let Some(clock_delta) = stream.clock_delta.median() {
@@ -143,6 +167,7 @@ impl Receiver {
}
}
/*
pub fn write_audio(&mut self, buffer: &mut [Frame], pts: Timestamp) -> usize {
// get stream start timing information:
let Some(stream) = self.stream.as_mut() else {
@@ -152,15 +177,11 @@ impl Receiver {
};
// get next packet from queue, or None if missing (packet loss)
let packet = stream.queue.pop_front();
let queue_item = stream.queue.pop_front();
// calculate stream timing from packet timing info if present
let header_pts = packet.as_ref()
.map(|packet| packet.header().pts)
.map(Timestamp::from_micros_lossy);
let stream_pts = header_pts
.and_then(|header_pts| stream.adjust_pts(header_pts));
let (packet, stream_pts) = queue_item.as_ref()
.map(|item| (Some(&item.audio), Some(item.pts)))
.unwrap_or_default();
let timing = stream_pts.map(|stream_pts| Timing {
real: pts,
@@ -181,7 +202,7 @@ impl Receiver {
}
// pass packet through decode pipeline
let frames = stream.pipeline.process(packet.as_ref(), buffer);
let frames = stream.pipeline.process(packet, buffer);
// report stats and return
self.stats.set_buffer_length(
@ -190,6 +211,7 @@ impl Receiver {
frames
}
*/
}
struct Aggregate<T> {
@@ -249,7 +271,7 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
pub recv: Receiver,
}
let output = Output::new(DeviceOpt {
let output = Output::new(&DeviceOpt {
device: opt.output_device,
period: opt.output_period
.map(SampleDuration::from_frame_count)
@@ -260,9 +282,10 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
}).map_err(RunError::OpenAudioDevice)?;
let state = Arc::new(Mutex::new(SharedState {
recv: Receiver::new(),
recv: Receiver::new(output),
}));
/*
std::thread::spawn({
let state = state.clone();
move || {
@@ -297,6 +320,7 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
}
}
});
*/
let socket = Socket::open(opt.socket)
.map_err(RunError::Listen)?;
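
The fallback in receive_audio above (used before any clock delta has been measured) keeps the sender's intended pts-minus-dts delay but anchors it to the receiver's local clock. A standalone sketch of that arithmetic with made-up microsecond values, not bark's actual timestamp types:

```rust
// Sketch of the fallback pts computation: preserve the sender's pts - dts
// delay, anchored to the receiver's current local time (all values in micros).
fn fallback_pts(now: u64, packet_pts: u64, packet_dts: u64) -> u64 {
    let stream_delay = packet_pts.saturating_sub(packet_dts);
    now + stream_delay
}

fn main() {
    // hypothetical numbers: the sender stamped dts = 50_000 and pts = 70_000,
    // i.e. a 20 ms stream delay; the receiver's clock currently reads 1_000_000
    assert_eq!(fallback_pts(1_000_000, 70_000, 50_000), 1_020_000);
    println!("fallback pts = now + (pts - dts)");
}
```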

View file

@@ -0,0 +1,58 @@
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, MutexGuard};
use crate::audio::Output;
pub struct OwnedOutput {
output: Arc<Mutex<Option<Output>>>,
}
impl OwnedOutput {
pub fn new(output: Output) -> Self {
Self { output: Arc::new(Mutex::new(Some(output))) }
}
/// TODO - this may block for the duration of an alsa_pcm_write
/// fix this
pub fn steal(&mut self) -> OutputRef {
let output = self.output.lock().unwrap().take();
self.output = Arc::new(Mutex::new(output));
OutputRef { output: self.output.clone() }
}
}
#[derive(Clone)]
pub struct OutputRef {
output: Arc<Mutex<Option<Output>>>,
}
impl OutputRef {
pub fn lock(&self) -> Option<OutputLock> {
let guard = self.output.lock().unwrap();
if guard.is_some() {
Some(OutputLock { guard })
} else {
None
}
}
}
pub struct OutputLock<'a> {
guard: MutexGuard<'a, Option<Output>>,
}
impl<'a> Deref for OutputLock<'a> {
type Target = Output;
fn deref(&self) -> &Self::Target {
self.guard.as_ref().unwrap()
}
}
impl<'a> DerefMut for OutputLock<'a> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.guard.as_mut().unwrap()
}
}
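
To illustrate how this handoff behaves, here is a sketch with a plain String standing in for the real Output: after a second steal, the first stream's OutputRef observes None, which is its cue to shut its thread down.

```rust
// Stand-in sketch of OwnedOutput::steal(): moving the device into a fresh
// Arc makes every previously handed-out OutputRef observe None.
use std::sync::{Arc, Mutex};

struct OwnedOutput { output: Arc<Mutex<Option<String>>> }

#[derive(Clone)]
struct OutputRef { output: Arc<Mutex<Option<String>>> }

impl OwnedOutput {
    fn new(output: String) -> Self {
        Self { output: Arc::new(Mutex::new(Some(output))) }
    }

    fn steal(&mut self) -> OutputRef {
        // take the device out of the old Arc, rehome it in a new one
        let output = self.output.lock().unwrap().take();
        self.output = Arc::new(Mutex::new(output));
        OutputRef { output: self.output.clone() }
    }
}

impl OutputRef {
    fn is_live(&self) -> bool {
        self.output.lock().unwrap().is_some()
    }
}

fn main() {
    let mut owned = OwnedOutput::new("alsa device".to_string());
    let first = owned.steal();   // first stream takes the output
    let second = owned.steal();  // a new stream steals it away
    assert!(!first.is_live());   // old stream's cue to exit its thread
    assert!(second.is_live());
    println!("output stolen: old stream exits, new stream plays");
}
```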

View file

@@ -1,7 +1,6 @@
use std::sync::{Arc, Condvar, Mutex};
use bark_core::receive::queue::PacketQueue;
use bark_protocol::packet::Audio;
use bark_core::receive::queue::{PacketQueue, AudioPts};
pub struct QueueSender {
shared: Arc<Shared>,
@@ -32,7 +31,7 @@ pub fn channel(queue: PacketQueue) -> (QueueSender, QueueReceiver) {
pub struct Disconnected;
impl QueueSender {
pub fn send(&self, packet: Audio) -> Result<usize, Disconnected> {
pub fn send(&self, packet: AudioPts) -> Result<usize, Disconnected> {
let mut queue = self.shared.queue.lock().unwrap();
let Some(queue) = queue.as_mut() else {
@@ -48,7 +47,7 @@ impl QueueSender {
}
impl QueueReceiver {
pub fn recv(&self) -> Result<Option<Audio>, Disconnected> {
pub fn recv(&self) -> Result<Option<AudioPts>, Disconnected> {
let mut queue_lock = self.shared.queue.lock().unwrap();
loop {
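
The QueueSender/QueueReceiver pair appears to wrap the PacketQueue behind a Mutex and Condvar so the decode thread can block until a packet arrives or the sender disconnects. A generic sketch of that blocking-recv pattern (simplified; not bark's exact implementation, which stores a PacketQueue rather than a VecDeque):

```rust
// Generic Mutex + Condvar handoff: send() pushes under the lock and notifies;
// recv() blocks until an item arrives; a queue of None stands for "disconnected".
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

struct Shared<T> {
    queue: Mutex<Option<VecDeque<T>>>, // None once the channel is disconnected
    cond: Condvar,
}

fn send<T>(shared: &Shared<T>, item: T) {
    let mut guard = shared.queue.lock().unwrap();
    if let Some(queue) = guard.as_mut() {
        queue.push_back(item);
        shared.cond.notify_one();
    }
}

fn recv<T>(shared: &Shared<T>) -> Option<T> {
    let mut guard = shared.queue.lock().unwrap();
    loop {
        match guard.as_mut() {
            None => return None, // disconnected
            Some(queue) => {
                if let Some(item) = queue.pop_front() {
                    return Some(item);
                }
            }
        }
        // nothing queued yet: release the lock and sleep until notified
        guard = shared.cond.wait(guard).unwrap();
    }
}

fn main() {
    let shared = Arc::new(Shared {
        queue: Mutex::new(Some(VecDeque::new())),
        cond: Condvar::new(),
    });
    let consumer = {
        let shared = shared.clone();
        std::thread::spawn(move || recv(&shared))
    };
    send(&shared, 7u32);
    assert_eq!(consumer.join().unwrap(), Some(7));
    println!("decode thread received the packet");
}
```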

View file

@@ -1,12 +1,13 @@
use std::{thread, time::Duration};
use std::{sync::Mutex, thread};
use bark_core::{audio::Frame, receive::{pipeline::Pipeline, queue::PacketQueue, timing::Timing}};
use bark_core::{audio::Frame, receive::{pipeline::Pipeline, queue::{AudioPts, PacketQueue}, timing::Timing}};
use bark_protocol::{time::{ClockDelta, SampleDuration, Timestamp, TimestampDelta}, types::{stats::receiver::StreamStatus, AudioPacketHeader, SessionId}, FRAMES_PER_PACKET};
use bytemuck::Zeroable;
use crate::{audio::Output, time};
use super::{queue::{self, Disconnected, QueueReceiver, QueueSender}, Aggregate};
use crate::time;
use crate::receive::output::OutputRef;
use crate::receive::queue::{self, Disconnected, QueueReceiver, QueueSender};
use crate::receive::Aggregate;
pub struct Stream {
tx: QueueSender,
@@ -14,7 +15,7 @@ pub struct Stream {
}
impl Stream {
pub fn new(header: &AudioPacketHeader, output: Output) -> Self {
pub fn new(header: &AudioPacketHeader, output: OutputRef) -> Self {
let queue = PacketQueue::new(header);
let (tx, rx) = queue::channel(queue);
@@ -34,13 +35,21 @@ impl Stream {
sid: header.sid,
}
}
pub fn session_id(&self) -> SessionId {
self.sid
}
pub fn send(&self, audio: AudioPts) -> Result<usize, Disconnected> {
self.tx.send(audio)
}
}
struct StreamState {
clock_delta: Aggregate<ClockDelta>,
queue: QueueReceiver,
pipeline: Pipeline,
output: Output,
output: OutputRef,
}
pub struct StreamStats {
@@ -64,18 +73,28 @@ fn run_stream(mut stream: StreamState) {
loop {
// get next packet from queue, or None if missing (packet loss)
let packet = match stream.queue.recv() {
let queue_item = match stream.queue.recv() {
Ok(rx) => rx,
Err(_) => { return; } // disconnected
};
let (packet, stream_pts) = queue_item.as_ref()
.map(|item| (Some(&item.audio), Some(item.pts)))
.unwrap_or_default();
// pass packet through decode pipeline
let mut buffer = [Frame::zeroed(); FRAMES_PER_PACKET * 2];
let frames = stream.pipeline.process(packet.as_ref(), &mut buffer);
let frames = stream.pipeline.process(packet, &mut buffer);
let buffer = &buffer[0..frames];
// lock output
let Some(output) = stream.output.lock() else {
// output has been stolen from us, exit thread
break;
};
// get current output delay
let delay = stream.output.delay().unwrap();
let delay = output.delay().unwrap();
stats.output_latency = delay;
// calculate presentation timestamp based on output delay
@ -83,14 +102,6 @@ fn run_stream(mut stream: StreamState) {
let pts = Timestamp::from_micros_lossy(pts);
let pts = pts.add(delay);
// calculate stream timing from packet timing info if present
let header_pts = packet.as_ref()
.map(|packet| packet.header().pts)
.map(Timestamp::from_micros_lossy);
let stream_pts = header_pts
.and_then(|header_pts| adjust_pts(&stream, header_pts));
let timing = stream_pts.map(|stream_pts| Timing {
real: pts,
play: stream_pts,
@ -110,7 +121,7 @@ fn run_stream(mut stream: StreamState) {
}
// send audio to ALSA
match stream.output.write(buffer) {
match output.write(buffer) {
Ok(()) => {}
Err(e) => {
log::error!("error playing audio: {e}");

View file

@@ -54,7 +54,7 @@ pub struct StreamOpt {
}
pub fn run(opt: StreamOpt) -> Result<(), RunError> {
let input = Input::new(DeviceOpt {
let input = Input::new(&DeviceOpt {
device: opt.input_device,
period: opt.input_period
.map(SampleDuration::from_frame_count)