implement stats

This commit is contained in:
Hailey Somerville 2024-06-28 14:55:43 +10:00
parent 7f12d53fd1
commit e811ce25a6
4 changed files with 73 additions and 30 deletions

View file

@ -138,4 +138,8 @@ impl TimestampDelta {
pub fn as_frames(&self) -> i64 { pub fn as_frames(&self) -> i64 {
self.0 self.0
} }
pub fn to_seconds(&self) -> f64 {
self.0 as f64 / f64::from(SAMPLE_RATE)
}
} }

View file

@ -1,7 +1,7 @@
use bitflags::bitflags; use bitflags::bitflags;
use bytemuck::{Zeroable, Pod}; use bytemuck::{Zeroable, Pod};
use crate::time::{SampleDuration, Timestamp}; use crate::time::{SampleDuration, TimestampDelta};
#[derive(Debug, Clone, Copy, Zeroable, Pod)] #[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(C)] #[repr(C)]
@ -17,6 +17,7 @@ pub struct ReceiverStats {
predict_offset: f64, predict_offset: f64,
} }
#[derive(Clone, Copy)]
pub enum StreamStatus { pub enum StreamStatus {
Seek, Seek,
Sync, Sync,
@ -108,11 +109,8 @@ impl ReceiverStats {
self.field(ReceiverStatsFlags::HAS_PREDICT_OFFSET, self.predict_offset) self.field(ReceiverStatsFlags::HAS_PREDICT_OFFSET, self.predict_offset)
} }
pub fn set_audio_latency(&mut self, request_pts: Timestamp, packet_pts: Timestamp) { pub fn set_audio_latency(&mut self, delta: TimestampDelta) {
let request_micros = request_pts.to_micros_lossy().0 as f64; self.audio_latency = delta.to_seconds();
let packet_micros = packet_pts.to_micros_lossy().0 as f64;
self.audio_latency = (request_micros - packet_micros) / 1_000_000.0;
self.flags.insert(ReceiverStatsFlags::HAS_AUDIO_LATENCY); self.flags.insert(ReceiverStatsFlags::HAS_AUDIO_LATENCY);
} }

View file

@ -14,13 +14,13 @@ use bark_protocol::packet::{Audio, Time, PacketKind, StatsReply};
use crate::audio::config::{DEFAULT_PERIOD, DEFAULT_BUFFER, DeviceOpt}; use crate::audio::config::{DEFAULT_PERIOD, DEFAULT_BUFFER, DeviceOpt};
use crate::audio::Output; use crate::audio::Output;
use crate::receive::output::OutputRef; use crate::receive::output::OutputRef;
use crate::receive::stream::Stream as ReceiveStream;
use crate::socket::{ProtocolSocket, Socket, SocketOpt}; use crate::socket::{ProtocolSocket, Socket, SocketOpt};
use crate::{time, stats, thread}; use crate::{time, stats, thread};
use crate::RunError; use crate::RunError;
use self::output::OwnedOutput; use self::output::OwnedOutput;
use self::queue::Disconnected; use self::queue::Disconnected;
use self::stream::DecodeStream;
pub mod output; pub mod output;
pub mod queue; pub mod queue;
@ -34,20 +34,22 @@ pub struct Receiver {
struct Stream { struct Stream {
sid: SessionId, sid: SessionId,
decode: DecodeStream,
latency: Aggregate<Duration>, latency: Aggregate<Duration>,
clock_delta: Aggregate<ClockDelta>, clock_delta: Aggregate<ClockDelta>,
stream: ReceiveStream, predict_offset: Aggregate<i64>,
} }
impl Stream { impl Stream {
pub fn new(header: &AudioPacketHeader, output: OutputRef) -> Self { pub fn new(header: &AudioPacketHeader, output: OutputRef) -> Self {
let stream = ReceiveStream::new(header, output); let decode = DecodeStream::new(header, output);
Stream { Stream {
sid: header.sid, sid: header.sid,
decode,
latency: Aggregate::new(), latency: Aggregate::new(),
clock_delta: Aggregate::new(), clock_delta: Aggregate::new(),
stream, predict_offset: Aggregate::new(),
} }
} }
@ -60,6 +62,10 @@ impl Stream {
pub fn network_latency(&self) -> Option<Duration> { pub fn network_latency(&self) -> Option<Duration> {
self.latency.median() self.latency.median()
} }
pub fn predict_offset(&self) -> Option<i64> {
self.predict_offset.median()
}
} }
impl Receiver { impl Receiver {
@ -71,8 +77,26 @@ impl Receiver {
} }
} }
pub fn stats(&self) -> &ReceiverStats { pub fn stats(&self) -> ReceiverStats {
&self.stats let mut stats = ReceiverStats::new();
if let Some(stream) = &self.stream {
let decode = stream.decode.stats();
stats.set_stream(decode.status);
stats.set_buffer_length(decode.buffered);
stats.set_audio_latency(decode.audio_latency);
stats.set_output_latency(decode.output_latency);
if let Some(latency) = stream.network_latency() {
stats.set_network_latency(latency);
}
if let Some(predict) = stream.predict_offset() {
stats.set_predict_offset(predict);
}
}
stats
} }
pub fn current_session(&self) -> Option<SessionId> { pub fn current_session(&self) -> Option<SessionId> {
@ -146,7 +170,7 @@ impl Receiver {
}); });
// TODO - this is where we would take buffer length stats // TODO - this is where we would take buffer length stats
stream.stream.send(AudioPts { stream.decode.send(AudioPts {
pts, pts,
audio: packet, audio: packet,
})?; })?;
@ -157,7 +181,7 @@ impl Receiver {
let delta_usec = clock_delta.as_micros(); let delta_usec = clock_delta.as_micros();
let predict_dts = (now.0 - latency_usec).checked_add_signed(-delta_usec).unwrap(); let predict_dts = (now.0 - latency_usec).checked_add_signed(-delta_usec).unwrap();
let predict_diff = predict_dts as i64 - packet_dts.0 as i64; let predict_diff = predict_dts as i64 - packet_dts.0 as i64;
self.stats.set_predict_offset(predict_diff) stream.predict_offset.observe(predict_diff);
} }
} }
@ -274,7 +298,8 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
Some(PacketKind::StatsRequest(_)) => { Some(PacketKind::StatsRequest(_)) => {
// let state = state.lock().unwrap(); // let state = state.lock().unwrap();
let sid = receiver.current_session().unwrap_or(SessionId::zeroed()); let sid = receiver.current_session().unwrap_or(SessionId::zeroed());
let receiver = *receiver.stats(); let receiver = receiver.stats();
println!("{:?}", receiver);
let reply = StatsReply::receiver(sid, receiver, node) let reply = StatsReply::receiver(sid, receiver, node)
.expect("allocate StatsReply packet"); .expect("allocate StatsReply packet");

View file

@ -1,3 +1,4 @@
use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use bark_core::{audio::Frame, receive::{pipeline::Pipeline, queue::{AudioPts, PacketQueue}, timing::Timing}}; use bark_core::{audio::Frame, receive::{pipeline::Pipeline, queue::{AudioPts, PacketQueue}, timing::Timing}};
@ -10,29 +11,34 @@ use crate::time;
use crate::receive::output::OutputRef; use crate::receive::output::OutputRef;
use crate::receive::queue::{self, Disconnected, QueueReceiver, QueueSender}; use crate::receive::queue::{self, Disconnected, QueueReceiver, QueueSender};
pub struct Stream { pub struct DecodeStream {
tx: QueueSender, tx: QueueSender,
sid: SessionId, sid: SessionId,
stats: Arc<Mutex<DecodeStats>>,
} }
impl Stream { impl DecodeStream {
pub fn new(header: &AudioPacketHeader, output: OutputRef) -> Self { pub fn new(header: &AudioPacketHeader, output: OutputRef) -> Self {
let queue = PacketQueue::new(header); let queue = PacketQueue::new(header);
let (tx, rx) = queue::channel(queue); let (tx, rx) = queue::channel(queue);
let state = StreamState { let state = State {
queue: rx, queue: rx,
pipeline: Pipeline::new(header), pipeline: Pipeline::new(header),
output, output,
}; };
thread::spawn(move || { let stats = Arc::new(Mutex::new(DecodeStats::default()));
run_stream(state);
thread::spawn({
let stats = stats.clone();
move || run_stream(state, stats)
}); });
Stream { DecodeStream {
tx, tx,
sid: header.sid, sid: header.sid,
stats,
} }
} }
@ -43,32 +49,39 @@ impl Stream {
pub fn send(&self, audio: AudioPts) -> Result<usize, Disconnected> { pub fn send(&self, audio: AudioPts) -> Result<usize, Disconnected> {
self.tx.send(audio) self.tx.send(audio)
} }
pub fn stats(&self) -> DecodeStats {
self.stats.lock().unwrap().clone()
}
} }
struct StreamState { struct State {
queue: QueueReceiver, queue: QueueReceiver,
pipeline: Pipeline, pipeline: Pipeline,
output: OutputRef, output: OutputRef,
} }
pub struct StreamStats { #[derive(Clone)]
status: StreamStatus, pub struct DecodeStats {
audio_latency: TimestampDelta, pub status: StreamStatus,
output_latency: SampleDuration, pub buffered: SampleDuration,
pub audio_latency: TimestampDelta,
pub output_latency: SampleDuration,
} }
impl Default for StreamStats { impl Default for DecodeStats {
fn default() -> Self { fn default() -> Self {
StreamStats { DecodeStats {
status: StreamStatus::Seek, status: StreamStatus::Seek,
buffered: SampleDuration::zero(),
audio_latency: TimestampDelta::zero(), audio_latency: TimestampDelta::zero(),
output_latency: SampleDuration::zero(), output_latency: SampleDuration::zero(),
} }
} }
} }
fn run_stream(mut stream: StreamState) { fn run_stream(mut stream: State, stats_tx: Arc<Mutex<DecodeStats>>) {
let mut stats = StreamStats::default(); let mut stats = DecodeStats::default();
loop { loop {
// get next packet from queue, or None if missing (packet loss) // get next packet from queue, or None if missing (packet loss)
@ -119,6 +132,9 @@ fn run_stream(mut stream: StreamState) {
stats.audio_latency = timing.real.delta(timing.play); stats.audio_latency = timing.real.delta(timing.play);
} }
// update stats
*stats_tx.lock().unwrap() = stats.clone();
// send audio to ALSA // send audio to ALSA
match output.write(buffer) { match output.write(buffer) {
Ok(()) => {} Ok(()) => {}