mirror of
https://github.com/haileys/bark
synced 2025-03-16 22:57:00 +00:00
Merge branch 'stream-thread'
This commit is contained in:
commit
ae63ff6dd5
12 changed files with 391 additions and 139 deletions
|
@ -9,7 +9,7 @@ use bark_protocol::time::{SampleDuration, Timestamp};
|
|||
use crate::consts::MAX_QUEUED_DECODE_SEGMENTS;
|
||||
|
||||
pub struct PacketQueue {
|
||||
queue: Deque<Option<Audio>, MAX_QUEUED_DECODE_SEGMENTS>,
|
||||
queue: Deque<Option<AudioPts>, MAX_QUEUED_DECODE_SEGMENTS>,
|
||||
/// The seq of the first packet in the queue, the rest are implied
|
||||
head_seq: u64,
|
||||
/// We delay yielding packets when a queue is first started (or reset), to
|
||||
|
@ -18,6 +18,19 @@ pub struct PacketQueue {
|
|||
start: DelayStart,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AudioPts {
|
||||
/// translated into local time:
|
||||
pub pts: Timestamp,
|
||||
pub audio: Audio,
|
||||
}
|
||||
|
||||
impl AudioPts {
|
||||
pub fn header(&self) -> &AudioPacketHeader {
|
||||
self.audio.header()
|
||||
}
|
||||
}
|
||||
|
||||
enum NoSlot {
|
||||
InPast,
|
||||
TooFarInFuture,
|
||||
|
@ -32,7 +45,7 @@ impl PacketQueue {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn pop_front(&mut self) -> Option<Audio> {
|
||||
pub fn pop_front(&mut self) -> Option<AudioPts> {
|
||||
if self.start.yield_packet() {
|
||||
self.head_seq += 1;
|
||||
self.queue.pop_front().flatten()
|
||||
|
@ -41,7 +54,7 @@ impl PacketQueue {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn insert_packet(&mut self, packet: Audio) {
|
||||
pub fn insert_packet(&mut self, packet: AudioPts) {
|
||||
let packet_seq = packet.header().seq;
|
||||
let head_seq = self.head_seq;
|
||||
let tail_seq = self.head_seq + self.queue.capacity() as u64;
|
||||
|
@ -69,7 +82,7 @@ impl PacketQueue {
|
|||
}
|
||||
}
|
||||
|
||||
fn queue_slot_mut(&mut self, seq: u64) -> Result<&mut Option<Audio>, NoSlot> {
|
||||
fn queue_slot_mut(&mut self, seq: u64) -> Result<&mut Option<AudioPts>, NoSlot> {
|
||||
let idx = seq.checked_sub(self.head_seq).ok_or(NoSlot::InPast)? as usize;
|
||||
|
||||
if idx >= self.queue.capacity() {
|
||||
|
|
|
@ -123,6 +123,10 @@ impl ClockDelta {
|
|||
pub struct TimestampDelta(i64);
|
||||
|
||||
impl TimestampDelta {
|
||||
pub fn zero() -> TimestampDelta {
|
||||
TimestampDelta(0)
|
||||
}
|
||||
|
||||
pub fn from_clock_delta_lossy(delta: ClockDelta) -> TimestampDelta {
|
||||
TimestampDelta((delta.0 * i64::from(SAMPLE_RATE.0)) / 1_000_000)
|
||||
}
|
||||
|
@ -134,4 +138,8 @@ impl TimestampDelta {
|
|||
pub fn as_frames(&self) -> i64 {
|
||||
self.0
|
||||
}
|
||||
|
||||
pub fn to_seconds(&self) -> f64 {
|
||||
self.0 as f64 / f64::from(SAMPLE_RATE)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use bitflags::bitflags;
|
||||
use bytemuck::{Zeroable, Pod};
|
||||
|
||||
use crate::time::{SampleDuration, Timestamp};
|
||||
use crate::time::{SampleDuration, TimestampDelta};
|
||||
|
||||
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
|
||||
#[repr(C)]
|
||||
|
@ -17,6 +17,7 @@ pub struct ReceiverStats {
|
|||
predict_offset: f64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum StreamStatus {
|
||||
Seek,
|
||||
Sync,
|
||||
|
@ -108,11 +109,8 @@ impl ReceiverStats {
|
|||
self.field(ReceiverStatsFlags::HAS_PREDICT_OFFSET, self.predict_offset)
|
||||
}
|
||||
|
||||
pub fn set_audio_latency(&mut self, request_pts: Timestamp, packet_pts: Timestamp) {
|
||||
let request_micros = request_pts.to_micros_lossy().0 as f64;
|
||||
let packet_micros = packet_pts.to_micros_lossy().0 as f64;
|
||||
|
||||
self.audio_latency = (request_micros - packet_micros) / 1_000_000.0;
|
||||
pub fn set_audio_latency(&mut self, delta: TimestampDelta) {
|
||||
self.audio_latency = delta.to_seconds();
|
||||
self.flags.insert(ReceiverStatsFlags::HAS_AUDIO_LATENCY);
|
||||
}
|
||||
|
||||
|
|
|
@ -13,8 +13,8 @@ pub struct Input {
|
|||
}
|
||||
|
||||
impl Input {
|
||||
pub fn new(opt: DeviceOpt) -> Result<Self, OpenError> {
|
||||
let pcm = config::open_pcm(&opt, Direction::Capture)?;
|
||||
pub fn new(opt: &DeviceOpt) -> Result<Self, OpenError> {
|
||||
let pcm = config::open_pcm(opt, Direction::Capture)?;
|
||||
Ok(Input { pcm })
|
||||
}
|
||||
|
||||
|
|
|
@ -12,8 +12,8 @@ pub struct Output {
|
|||
}
|
||||
|
||||
impl Output {
|
||||
pub fn new(opt: DeviceOpt) -> Result<Self, OpenError> {
|
||||
let pcm = config::open_pcm(&opt, Direction::Playback)?;
|
||||
pub fn new(opt: &DeviceOpt) -> Result<Self, OpenError> {
|
||||
let pcm = config::open_pcm(opt, Direction::Playback)?;
|
||||
Ok(Output { pcm })
|
||||
}
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ pub struct Input {
|
|||
}
|
||||
|
||||
impl Input {
|
||||
pub fn new(opt: DeviceOpt) -> Result<Self, OpenError> {
|
||||
pub fn new(opt: &DeviceOpt) -> Result<Self, OpenError> {
|
||||
Ok(Input {
|
||||
alsa: alsa::input::Input::new(opt)?,
|
||||
})
|
||||
|
@ -40,7 +40,7 @@ pub struct Output {
|
|||
}
|
||||
|
||||
impl Output {
|
||||
pub fn new(opt: DeviceOpt) -> Result<Self, OpenError> {
|
||||
pub fn new(opt: &DeviceOpt) -> Result<Self, OpenError> {
|
||||
Ok(Output {
|
||||
alsa: alsa::output::Output::new(opt)?,
|
||||
})
|
||||
|
|
|
@ -31,6 +31,8 @@ pub enum RunError {
|
|||
Receive(std::io::Error),
|
||||
#[error("opening encoder: {0}")]
|
||||
OpenEncoder(#[from] bark_core::encode::NewEncoderError),
|
||||
#[error("{0}")]
|
||||
Disconnected(#[from] receive::queue::Disconnected),
|
||||
}
|
||||
|
||||
fn main() -> Result<(), ExitCode> {
|
||||
|
|
|
@ -1,51 +1,55 @@
|
|||
use std::array;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use bark_core::audio::Frame;
|
||||
use bark_core::receive::pipeline::Pipeline;
|
||||
use bark_core::receive::timing::Timing;
|
||||
use bytemuck::Zeroable;
|
||||
use structopt::StructOpt;
|
||||
|
||||
use bark_core::receive::queue::PacketQueue;
|
||||
use bark_core::receive::queue::AudioPts;
|
||||
|
||||
use bark_protocol::FRAMES_PER_PACKET;
|
||||
use bark_protocol::time::{Timestamp, SampleDuration, TimestampDelta, ClockDelta};
|
||||
use bark_protocol::types::{SessionId, ReceiverId, TimePhase, AudioPacketHeader};
|
||||
use bark_protocol::types::stats::receiver::{ReceiverStats, StreamStatus};
|
||||
use bark_protocol::types::{AudioPacketHeader, ReceiverId, SessionId, TimePhase, TimestampMicros};
|
||||
use bark_protocol::types::stats::receiver::ReceiverStats;
|
||||
use bark_protocol::packet::{Audio, Time, PacketKind, StatsReply};
|
||||
|
||||
use crate::audio::config::{DEFAULT_PERIOD, DEFAULT_BUFFER, DeviceOpt};
|
||||
use crate::audio::Output;
|
||||
use crate::receive::output::OutputRef;
|
||||
use crate::socket::{ProtocolSocket, Socket, SocketOpt};
|
||||
use crate::{time, stats, thread};
|
||||
use crate::RunError;
|
||||
|
||||
use self::output::OwnedOutput;
|
||||
use self::queue::Disconnected;
|
||||
use self::stream::DecodeStream;
|
||||
|
||||
pub mod output;
|
||||
pub mod queue;
|
||||
pub mod stream;
|
||||
|
||||
pub struct Receiver {
|
||||
stats: ReceiverStats,
|
||||
stream: Option<Stream>,
|
||||
output: OwnedOutput,
|
||||
}
|
||||
|
||||
struct Stream {
|
||||
sid: SessionId,
|
||||
decode: DecodeStream,
|
||||
latency: Aggregate<Duration>,
|
||||
clock_delta: Aggregate<ClockDelta>,
|
||||
queue: PacketQueue,
|
||||
pipeline: Pipeline,
|
||||
predict_offset: Aggregate<i64>,
|
||||
}
|
||||
|
||||
impl Stream {
|
||||
pub fn new(header: &AudioPacketHeader) -> Self {
|
||||
let queue = PacketQueue::new(header);
|
||||
|
||||
pub fn new(header: &AudioPacketHeader, output: OutputRef) -> Self {
|
||||
let decode = DecodeStream::new(header, output);
|
||||
|
||||
Stream {
|
||||
sid: header.sid,
|
||||
decode,
|
||||
latency: Aggregate::new(),
|
||||
clock_delta: Aggregate::new(),
|
||||
queue,
|
||||
pipeline: Pipeline::new(header),
|
||||
predict_offset: Aggregate::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,18 +62,41 @@ impl Stream {
|
|||
pub fn network_latency(&self) -> Option<Duration> {
|
||||
self.latency.median()
|
||||
}
|
||||
|
||||
pub fn predict_offset(&self) -> Option<i64> {
|
||||
self.predict_offset.median()
|
||||
}
|
||||
}
|
||||
|
||||
impl Receiver {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(output: Output) -> Self {
|
||||
Receiver {
|
||||
stream: None,
|
||||
stats: ReceiverStats::new(),
|
||||
output: OwnedOutput::new(output),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stats(&self) -> &ReceiverStats {
|
||||
&self.stats
|
||||
pub fn stats(&self) -> ReceiverStats {
|
||||
let mut stats = ReceiverStats::new();
|
||||
|
||||
if let Some(stream) = &self.stream {
|
||||
let decode = stream.decode.stats();
|
||||
stats.set_stream(decode.status);
|
||||
stats.set_buffer_length(decode.buffered);
|
||||
stats.set_audio_latency(decode.audio_latency);
|
||||
stats.set_output_latency(decode.output_latency);
|
||||
|
||||
if let Some(latency) = stream.network_latency() {
|
||||
stats.set_network_latency(latency);
|
||||
}
|
||||
|
||||
if let Some(predict) = stream.predict_offset() {
|
||||
stats.set_predict_offset(predict);
|
||||
}
|
||||
}
|
||||
|
||||
stats
|
||||
}
|
||||
|
||||
pub fn current_session(&self) -> Option<SessionId> {
|
||||
|
@ -113,22 +140,40 @@ impl Receiver {
|
|||
};
|
||||
|
||||
if new_stream {
|
||||
// start new stream
|
||||
let stream = Stream::new(header, self.output.steal());
|
||||
|
||||
// new stream is taking over! switch over to it
|
||||
log::info!("new stream beginning: sid={}", header.sid.0);
|
||||
self.stream = Some(Stream::new(header));
|
||||
self.stream = Some(stream);
|
||||
self.stats.clear();
|
||||
}
|
||||
|
||||
self.stream.as_mut().unwrap()
|
||||
}
|
||||
|
||||
pub fn receive_audio(&mut self, packet: Audio) {
|
||||
pub fn receive_audio(&mut self, packet: Audio) -> Result<(), Disconnected> {
|
||||
let now = time::now();
|
||||
|
||||
let packet_dts = packet.header().dts;
|
||||
let header = packet.header();
|
||||
let stream = self.prepare_stream(header);
|
||||
|
||||
let stream = self.prepare_stream(packet.header());
|
||||
stream.queue.insert_packet(packet);
|
||||
let packet_dts = header.dts;
|
||||
|
||||
// translate presentation timestamp of this packet:
|
||||
let pts = Timestamp::from_micros_lossy(header.pts);
|
||||
let pts = stream.adjust_pts(pts).unwrap_or_else(|| {
|
||||
// if we don't yet have the clock information to adjust timestamps,
|
||||
// default to packet pts-dts, added to our current local time
|
||||
let stream_delay = header.pts.0.saturating_sub(header.dts.0);
|
||||
Timestamp::from_micros_lossy(TimestampMicros(now.0 + stream_delay))
|
||||
});
|
||||
|
||||
// TODO - this is where we would take buffer length stats
|
||||
stream.decode.send(AudioPts {
|
||||
pts,
|
||||
audio: packet,
|
||||
})?;
|
||||
|
||||
if let Some(latency) = stream.network_latency() {
|
||||
if let Some(clock_delta) = stream.clock_delta.median() {
|
||||
|
@ -136,57 +181,11 @@ impl Receiver {
|
|||
let delta_usec = clock_delta.as_micros();
|
||||
let predict_dts = (now.0 - latency_usec).checked_add_signed(-delta_usec).unwrap();
|
||||
let predict_diff = predict_dts as i64 - packet_dts.0 as i64;
|
||||
self.stats.set_predict_offset(predict_diff)
|
||||
stream.predict_offset.observe(predict_diff);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write_audio(&mut self, buffer: &mut [Frame], pts: Timestamp) -> usize {
|
||||
// get stream start timing information:
|
||||
let Some(stream) = self.stream.as_mut() else {
|
||||
// stream hasn't started, just fill buffer with silence and return
|
||||
buffer[0..FRAMES_PER_PACKET].fill(Frame::zeroed());
|
||||
return FRAMES_PER_PACKET;
|
||||
};
|
||||
|
||||
// get next packet from queue, or None if missing (packet loss)
|
||||
let packet = stream.queue.pop_front();
|
||||
|
||||
// calculate stream timing from packet timing info if present
|
||||
let header_pts = packet.as_ref()
|
||||
.map(|packet| packet.header().pts)
|
||||
.map(Timestamp::from_micros_lossy);
|
||||
|
||||
let stream_pts = header_pts
|
||||
.and_then(|header_pts| stream.adjust_pts(header_pts));
|
||||
|
||||
let timing = stream_pts.map(|stream_pts| Timing {
|
||||
real: pts,
|
||||
play: stream_pts,
|
||||
});
|
||||
|
||||
// adjust resampler rate based on stream timing info
|
||||
if let Some(timing) = timing {
|
||||
stream.pipeline.set_timing(timing);
|
||||
|
||||
if stream.pipeline.slew() {
|
||||
self.stats.set_stream(StreamStatus::Slew);
|
||||
} else {
|
||||
self.stats.set_stream(StreamStatus::Sync);
|
||||
}
|
||||
|
||||
self.stats.set_audio_latency(timing.real, timing.play);
|
||||
}
|
||||
|
||||
// pass packet through decode pipeline
|
||||
let frames = stream.pipeline.process(packet.as_ref(), buffer);
|
||||
|
||||
// report stats and return
|
||||
self.stats.set_buffer_length(
|
||||
SampleDuration::from_frame_count(
|
||||
(FRAMES_PER_PACKET * stream.queue.len()).try_into().unwrap()));
|
||||
|
||||
frames
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -243,11 +242,7 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
|
|||
let receiver_id = generate_receiver_id();
|
||||
let node = stats::node::get();
|
||||
|
||||
struct SharedState {
|
||||
pub recv: Receiver,
|
||||
}
|
||||
|
||||
let output = Output::new(DeviceOpt {
|
||||
let output = Output::new(&DeviceOpt {
|
||||
device: opt.output_device,
|
||||
period: opt.output_period
|
||||
.map(SampleDuration::from_frame_count)
|
||||
|
@ -257,44 +252,7 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
|
|||
.unwrap_or(DEFAULT_BUFFER),
|
||||
}).map_err(RunError::OpenAudioDevice)?;
|
||||
|
||||
let state = Arc::new(Mutex::new(SharedState {
|
||||
recv: Receiver::new(),
|
||||
}));
|
||||
|
||||
std::thread::spawn({
|
||||
let state = state.clone();
|
||||
move || {
|
||||
thread::set_name("bark/audio");
|
||||
|
||||
loop {
|
||||
let mut state = state.lock().unwrap();
|
||||
|
||||
let delay = output.delay().unwrap();
|
||||
state.recv.stats.set_output_latency(delay);
|
||||
|
||||
let pts = time::now();
|
||||
let pts = Timestamp::from_micros_lossy(pts);
|
||||
let pts = pts.add(delay);
|
||||
|
||||
// this should be large enough for `write_audio` to process an
|
||||
// entire packet with:
|
||||
let mut buffer = [Frame::zeroed(); FRAMES_PER_PACKET * 2];
|
||||
let count = state.recv.write_audio(&mut buffer, pts);
|
||||
|
||||
// drop lock before calling `Output::write` (blocking!)
|
||||
drop(state);
|
||||
|
||||
// send audio to ALSA
|
||||
match output.write(&buffer[0..count]) {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
log::error!("error playing audio: {e}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
});
|
||||
let mut receiver = Receiver::new(output);
|
||||
|
||||
let socket = Socket::open(opt.socket)
|
||||
.map_err(RunError::Listen)?;
|
||||
|
@ -325,8 +283,8 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
|
|||
.expect("reply to time packet");
|
||||
}
|
||||
Some(TimePhase::StreamReply) => {
|
||||
let mut state = state.lock().unwrap();
|
||||
state.recv.receive_time(time);
|
||||
// let mut state = state.lock().unwrap();
|
||||
receiver.receive_time(time);
|
||||
}
|
||||
_ => {
|
||||
// not for us - must be destined for another process
|
||||
|
@ -335,14 +293,13 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
|
|||
}
|
||||
}
|
||||
Some(PacketKind::Audio(packet)) => {
|
||||
let mut state = state.lock().unwrap();
|
||||
state.recv.receive_audio(packet);
|
||||
receiver.receive_audio(packet)?;
|
||||
}
|
||||
Some(PacketKind::StatsRequest(_)) => {
|
||||
let state = state.lock().unwrap();
|
||||
let sid = state.recv.current_session().unwrap_or(SessionId::zeroed());
|
||||
let receiver = *state.recv.stats();
|
||||
drop(state);
|
||||
// let state = state.lock().unwrap();
|
||||
let sid = receiver.current_session().unwrap_or(SessionId::zeroed());
|
||||
let receiver = receiver.stats();
|
||||
println!("{:?}", receiver);
|
||||
|
||||
let reply = StatsReply::receiver(sid, receiver, node)
|
||||
.expect("allocate StatsReply packet");
|
||||
|
|
58
bark/src/receive/output.rs
Normal file
58
bark/src/receive/output.rs
Normal file
|
@ -0,0 +1,58 @@
|
|||
use std::ops::{Deref, DerefMut};
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
|
||||
use crate::audio::Output;
|
||||
|
||||
pub struct OwnedOutput {
|
||||
output: Arc<Mutex<Option<Output>>>,
|
||||
}
|
||||
|
||||
impl OwnedOutput {
|
||||
pub fn new(output: Output) -> Self {
|
||||
Self { output: Arc::new(Mutex::new(Some(output))) }
|
||||
}
|
||||
|
||||
/// TODO - this may block for the duration of an alsa_pcm_write
|
||||
/// fix this
|
||||
pub fn steal(&mut self) -> OutputRef {
|
||||
let output = self.output.lock().unwrap().take();
|
||||
self.output = Arc::new(Mutex::new(output));
|
||||
|
||||
OutputRef { output: self.output.clone() }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct OutputRef {
|
||||
output: Arc<Mutex<Option<Output>>>,
|
||||
}
|
||||
|
||||
impl OutputRef {
|
||||
pub fn lock(&self) -> Option<OutputLock> {
|
||||
let guard = self.output.lock().unwrap();
|
||||
|
||||
if guard.is_some() {
|
||||
Some(OutputLock { guard })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OutputLock<'a> {
|
||||
guard: MutexGuard<'a, Option<Output>>,
|
||||
}
|
||||
|
||||
impl<'a> Deref for OutputLock<'a> {
|
||||
type Target = Output;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.guard.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> DerefMut for OutputLock<'a> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.guard.as_mut().unwrap()
|
||||
}
|
||||
}
|
69
bark/src/receive/queue.rs
Normal file
69
bark/src/receive/queue.rs
Normal file
|
@ -0,0 +1,69 @@
|
|||
use std::sync::{Arc, Condvar, Mutex};
|
||||
|
||||
use bark_core::receive::queue::{PacketQueue, AudioPts};
|
||||
use thiserror::Error;
|
||||
|
||||
pub struct QueueSender {
|
||||
shared: Arc<Shared>,
|
||||
}
|
||||
|
||||
pub struct QueueReceiver {
|
||||
shared: Arc<Shared>,
|
||||
}
|
||||
|
||||
struct Shared {
|
||||
queue: Mutex<Option<PacketQueue>>,
|
||||
notify: Condvar,
|
||||
}
|
||||
|
||||
pub fn channel(queue: PacketQueue) -> (QueueSender, QueueReceiver) {
|
||||
let shared = Arc::new(Shared {
|
||||
queue: Mutex::new(Some(queue)),
|
||||
notify: Condvar::new(),
|
||||
});
|
||||
|
||||
let tx = QueueSender { shared: shared.clone() };
|
||||
let rx = QueueReceiver { shared: shared.clone() };
|
||||
|
||||
(tx, rx)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Error)]
|
||||
#[error("audio receiver thread unexpectedly disconnected")]
|
||||
pub struct Disconnected;
|
||||
|
||||
impl QueueSender {
|
||||
pub fn send(&self, packet: AudioPts) -> Result<usize, Disconnected> {
|
||||
let mut queue = self.shared.queue.lock().unwrap();
|
||||
|
||||
let Some(queue) = queue.as_mut() else {
|
||||
return Err(Disconnected);
|
||||
};
|
||||
|
||||
queue.insert_packet(packet);
|
||||
|
||||
self.shared.notify.notify_all();
|
||||
|
||||
Ok(queue.len())
|
||||
}
|
||||
}
|
||||
|
||||
impl QueueReceiver {
|
||||
pub fn recv(&self) -> Result<Option<AudioPts>, Disconnected> {
|
||||
let mut queue_lock = self.shared.queue.lock().unwrap();
|
||||
|
||||
loop {
|
||||
let Some(queue) = queue_lock.as_mut() else {
|
||||
return Err(Disconnected);
|
||||
};
|
||||
|
||||
if queue.len() > 0 {
|
||||
return Ok(queue.pop_front());
|
||||
}
|
||||
|
||||
// if queue is empty we'll block until notified
|
||||
queue_lock = self.shared.notify.wait(queue_lock).unwrap();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
147
bark/src/receive/stream.rs
Normal file
147
bark/src/receive/stream.rs
Normal file
|
@ -0,0 +1,147 @@
|
|||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
|
||||
use bark_core::{audio::Frame, receive::{pipeline::Pipeline, queue::{AudioPts, PacketQueue}, timing::Timing}};
|
||||
use bark_protocol::time::{SampleDuration, Timestamp, TimestampDelta};
|
||||
use bark_protocol::types::{stats::receiver::StreamStatus, AudioPacketHeader, SessionId};
|
||||
use bark_protocol::FRAMES_PER_PACKET;
|
||||
use bytemuck::Zeroable;
|
||||
|
||||
use crate::time;
|
||||
use crate::receive::output::OutputRef;
|
||||
use crate::receive::queue::{self, Disconnected, QueueReceiver, QueueSender};
|
||||
|
||||
pub struct DecodeStream {
|
||||
tx: QueueSender,
|
||||
sid: SessionId,
|
||||
stats: Arc<Mutex<DecodeStats>>,
|
||||
}
|
||||
|
||||
impl DecodeStream {
|
||||
pub fn new(header: &AudioPacketHeader, output: OutputRef) -> Self {
|
||||
let queue = PacketQueue::new(header);
|
||||
let (tx, rx) = queue::channel(queue);
|
||||
|
||||
let state = State {
|
||||
queue: rx,
|
||||
pipeline: Pipeline::new(header),
|
||||
output,
|
||||
};
|
||||
|
||||
let stats = Arc::new(Mutex::new(DecodeStats::default()));
|
||||
|
||||
thread::spawn({
|
||||
let stats = stats.clone();
|
||||
move || run_stream(state, stats)
|
||||
});
|
||||
|
||||
DecodeStream {
|
||||
tx,
|
||||
sid: header.sid,
|
||||
stats,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn session_id(&self) -> SessionId {
|
||||
self.sid
|
||||
}
|
||||
|
||||
pub fn send(&self, audio: AudioPts) -> Result<usize, Disconnected> {
|
||||
self.tx.send(audio)
|
||||
}
|
||||
|
||||
pub fn stats(&self) -> DecodeStats {
|
||||
self.stats.lock().unwrap().clone()
|
||||
}
|
||||
}
|
||||
|
||||
struct State {
|
||||
queue: QueueReceiver,
|
||||
pipeline: Pipeline,
|
||||
output: OutputRef,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DecodeStats {
|
||||
pub status: StreamStatus,
|
||||
pub buffered: SampleDuration,
|
||||
pub audio_latency: TimestampDelta,
|
||||
pub output_latency: SampleDuration,
|
||||
}
|
||||
|
||||
impl Default for DecodeStats {
|
||||
fn default() -> Self {
|
||||
DecodeStats {
|
||||
status: StreamStatus::Seek,
|
||||
buffered: SampleDuration::zero(),
|
||||
audio_latency: TimestampDelta::zero(),
|
||||
output_latency: SampleDuration::zero(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn run_stream(mut stream: State, stats_tx: Arc<Mutex<DecodeStats>>) {
|
||||
let mut stats = DecodeStats::default();
|
||||
|
||||
loop {
|
||||
// get next packet from queue, or None if missing (packet loss)
|
||||
let queue_item = match stream.queue.recv() {
|
||||
Ok(rx) => rx,
|
||||
Err(_) => { return; } // disconnected
|
||||
};
|
||||
|
||||
let (packet, stream_pts) = queue_item.as_ref()
|
||||
.map(|item| (Some(&item.audio), Some(item.pts)))
|
||||
.unwrap_or_default();
|
||||
|
||||
// pass packet through decode pipeline
|
||||
let mut buffer = [Frame::zeroed(); FRAMES_PER_PACKET * 2];
|
||||
let frames = stream.pipeline.process(packet, &mut buffer);
|
||||
let buffer = &buffer[0..frames];
|
||||
|
||||
// lock output
|
||||
let Some(output) = stream.output.lock() else {
|
||||
// output has been stolen from us, exit thread
|
||||
break;
|
||||
};
|
||||
|
||||
// get current output delay
|
||||
let delay = output.delay().unwrap();
|
||||
stats.output_latency = delay;
|
||||
|
||||
// calculate presentation timestamp based on output delay
|
||||
let pts = time::now();
|
||||
let pts = Timestamp::from_micros_lossy(pts);
|
||||
let pts = pts.add(delay);
|
||||
|
||||
let timing = stream_pts.map(|stream_pts| Timing {
|
||||
real: pts,
|
||||
play: stream_pts,
|
||||
});
|
||||
|
||||
// adjust resampler rate based on stream timing info
|
||||
if let Some(timing) = timing {
|
||||
stream.pipeline.set_timing(timing);
|
||||
|
||||
if stream.pipeline.slew() {
|
||||
stats.status = StreamStatus::Slew;
|
||||
} else {
|
||||
stats.status = StreamStatus::Sync;
|
||||
}
|
||||
|
||||
stats.audio_latency = timing.real.delta(timing.play);
|
||||
}
|
||||
|
||||
// update stats
|
||||
*stats_tx.lock().unwrap() = stats.clone();
|
||||
|
||||
// send audio to ALSA
|
||||
match output.write(buffer) {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
log::error!("error playing audio: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -54,7 +54,7 @@ pub struct StreamOpt {
|
|||
}
|
||||
|
||||
pub fn run(opt: StreamOpt) -> Result<(), RunError> {
|
||||
let input = Input::new(DeviceOpt {
|
||||
let input = Input::new(&DeviceOpt {
|
||||
device: opt.input_device,
|
||||
period: opt.input_period
|
||||
.map(SampleDuration::from_frame_count)
|
||||
|
|
Loading…
Add table
Reference in a new issue