diff --git a/bark-protocol/src/time.rs b/bark-protocol/src/time.rs index 9ca50a1..4ebcaa5 100644 --- a/bark-protocol/src/time.rs +++ b/bark-protocol/src/time.rs @@ -138,4 +138,8 @@ impl TimestampDelta { pub fn as_frames(&self) -> i64 { self.0 } + + pub fn to_seconds(&self) -> f64 { + self.0 as f64 / f64::from(SAMPLE_RATE) + } } diff --git a/bark-protocol/src/types/stats/receiver.rs b/bark-protocol/src/types/stats/receiver.rs index 53d4cc2..094e812 100644 --- a/bark-protocol/src/types/stats/receiver.rs +++ b/bark-protocol/src/types/stats/receiver.rs @@ -1,7 +1,7 @@ use bitflags::bitflags; use bytemuck::{Zeroable, Pod}; -use crate::time::{SampleDuration, Timestamp}; +use crate::time::{SampleDuration, TimestampDelta}; #[derive(Debug, Clone, Copy, Zeroable, Pod)] #[repr(C)] @@ -17,6 +17,7 @@ pub struct ReceiverStats { predict_offset: f64, } +#[derive(Clone, Copy)] pub enum StreamStatus { Seek, Sync, @@ -108,11 +109,8 @@ impl ReceiverStats { self.field(ReceiverStatsFlags::HAS_PREDICT_OFFSET, self.predict_offset) } - pub fn set_audio_latency(&mut self, request_pts: Timestamp, packet_pts: Timestamp) { - let request_micros = request_pts.to_micros_lossy().0 as f64; - let packet_micros = packet_pts.to_micros_lossy().0 as f64; - - self.audio_latency = (request_micros - packet_micros) / 1_000_000.0; + pub fn set_audio_latency(&mut self, delta: TimestampDelta) { + self.audio_latency = delta.to_seconds(); self.flags.insert(ReceiverStatsFlags::HAS_AUDIO_LATENCY); } diff --git a/bark/src/receive.rs b/bark/src/receive.rs index 001e6f6..02bfbb4 100644 --- a/bark/src/receive.rs +++ b/bark/src/receive.rs @@ -14,13 +14,13 @@ use bark_protocol::packet::{Audio, Time, PacketKind, StatsReply}; use crate::audio::config::{DEFAULT_PERIOD, DEFAULT_BUFFER, DeviceOpt}; use crate::audio::Output; use crate::receive::output::OutputRef; -use crate::receive::stream::Stream as ReceiveStream; use crate::socket::{ProtocolSocket, Socket, SocketOpt}; use crate::{time, stats, thread}; use crate::RunError; use self::output::OwnedOutput; use self::queue::Disconnected; +use self::stream::DecodeStream; pub mod output; pub mod queue; @@ -34,20 +34,22 @@ pub struct Receiver { struct Stream { sid: SessionId, + decode: DecodeStream, latency: Aggregate, clock_delta: Aggregate, - stream: ReceiveStream, + predict_offset: Aggregate, } impl Stream { pub fn new(header: &AudioPacketHeader, output: OutputRef) -> Self { - let stream = ReceiveStream::new(header, output); + let decode = DecodeStream::new(header, output); Stream { sid: header.sid, + decode, latency: Aggregate::new(), clock_delta: Aggregate::new(), - stream, + predict_offset: Aggregate::new(), } } @@ -60,6 +62,10 @@ impl Stream { pub fn network_latency(&self) -> Option { self.latency.median() } + + pub fn predict_offset(&self) -> Option { + self.predict_offset.median() + } } impl Receiver { @@ -71,8 +77,26 @@ impl Receiver { } } - pub fn stats(&self) -> &ReceiverStats { - &self.stats + pub fn stats(&self) -> ReceiverStats { + let mut stats = ReceiverStats::new(); + + if let Some(stream) = &self.stream { + let decode = stream.decode.stats(); + stats.set_stream(decode.status); + stats.set_buffer_length(decode.buffered); + stats.set_audio_latency(decode.audio_latency); + stats.set_output_latency(decode.output_latency); + + if let Some(latency) = stream.network_latency() { + stats.set_network_latency(latency); + } + + if let Some(predict) = stream.predict_offset() { + stats.set_predict_offset(predict); + } + } + + stats } pub fn current_session(&self) -> Option { @@ -146,7 +170,7 @@ impl Receiver { }); // TODO - this is where we would take buffer length stats - stream.stream.send(AudioPts { + stream.decode.send(AudioPts { pts, audio: packet, })?; @@ -157,7 +181,7 @@ impl Receiver { let delta_usec = clock_delta.as_micros(); let predict_dts = (now.0 - latency_usec).checked_add_signed(-delta_usec).unwrap(); let predict_diff = predict_dts as i64 - packet_dts.0 as i64; - self.stats.set_predict_offset(predict_diff) + stream.predict_offset.observe(predict_diff); } } @@ -274,7 +298,8 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> { Some(PacketKind::StatsRequest(_)) => { // let state = state.lock().unwrap(); let sid = receiver.current_session().unwrap_or(SessionId::zeroed()); - let receiver = *receiver.stats(); + let receiver = receiver.stats(); + println!("{:?}", receiver); let reply = StatsReply::receiver(sid, receiver, node) .expect("allocate StatsReply packet"); diff --git a/bark/src/receive/stream.rs b/bark/src/receive/stream.rs index ad34fa3..aca79cb 100644 --- a/bark/src/receive/stream.rs +++ b/bark/src/receive/stream.rs @@ -1,3 +1,4 @@ +use std::sync::{Arc, Mutex}; use std::thread; use bark_core::{audio::Frame, receive::{pipeline::Pipeline, queue::{AudioPts, PacketQueue}, timing::Timing}}; @@ -10,29 +11,34 @@ use crate::time; use crate::receive::output::OutputRef; use crate::receive::queue::{self, Disconnected, QueueReceiver, QueueSender}; -pub struct Stream { +pub struct DecodeStream { tx: QueueSender, sid: SessionId, + stats: Arc>, } -impl Stream { +impl DecodeStream { pub fn new(header: &AudioPacketHeader, output: OutputRef) -> Self { let queue = PacketQueue::new(header); let (tx, rx) = queue::channel(queue); - let state = StreamState { + let state = State { queue: rx, pipeline: Pipeline::new(header), output, }; - thread::spawn(move || { - run_stream(state); + let stats = Arc::new(Mutex::new(DecodeStats::default())); + + thread::spawn({ + let stats = stats.clone(); + move || run_stream(state, stats) }); - Stream { + DecodeStream { tx, sid: header.sid, + stats, } } @@ -43,32 +49,39 @@ impl Stream { pub fn send(&self, audio: AudioPts) -> Result { self.tx.send(audio) } + + pub fn stats(&self) -> DecodeStats { + self.stats.lock().unwrap().clone() + } } -struct StreamState { +struct State { queue: QueueReceiver, pipeline: Pipeline, output: OutputRef, } -pub struct StreamStats { - status: StreamStatus, - audio_latency: TimestampDelta, - output_latency: SampleDuration, +#[derive(Clone)] +pub struct DecodeStats { + pub status: StreamStatus, + pub buffered: SampleDuration, + pub audio_latency: TimestampDelta, + pub output_latency: SampleDuration, } -impl Default for StreamStats { +impl Default for DecodeStats { fn default() -> Self { - StreamStats { + DecodeStats { status: StreamStatus::Seek, + buffered: SampleDuration::zero(), audio_latency: TimestampDelta::zero(), output_latency: SampleDuration::zero(), } } } -fn run_stream(mut stream: StreamState) { - let mut stats = StreamStats::default(); +fn run_stream(mut stream: State, stats_tx: Arc>) { + let mut stats = DecodeStats::default(); loop { // get next packet from queue, or None if missing (packet loss) @@ -119,6 +132,9 @@ fn run_stream(mut stream: StreamState) { stats.audio_latency = timing.real.delta(timing.play); } + // update stats + *stats_tx.lock().unwrap() = stats.clone(); + // send audio to ALSA match output.write(buffer) { Ok(()) => {}