diff --git a/Cargo.lock b/Cargo.lock index 50867bf..492b197 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,7 +20,7 @@ dependencies = [ "alsa-sys", "bitflags", "libc", - "nix", + "nix 0.24.3", ] [[package]] @@ -66,6 +66,7 @@ dependencies = [ "bytemuck", "cpal", "derive_more", + "nix 0.26.2", "static_assertions", "structopt", "termcolor", @@ -481,6 +482,18 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" +dependencies = [ + "bitflags", + "cfg-if", + "libc", + "static_assertions", +] + [[package]] name = "nom" version = "7.1.3" diff --git a/Cargo.toml b/Cargo.toml index 4d56261..a3a4ac2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" bytemuck = { version = "1.13.1", features = ["derive"] } cpal = "0.15.2" derive_more = "0.99.17" +nix = { version = "0.26.2", features = ["time"], default-features = false } static_assertions = "1.1.0" structopt = "0.3.26" termcolor = "1.2.0" diff --git a/src/main.rs b/src/main.rs index 8a349eb..8907fc7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -173,15 +173,15 @@ fn run_stream(opt: StreamOpt) -> Result<(), RunError> { let socket = Arc::clone(&socket); move || { loop { - let t1 = TimestampMicros::now(); + let now = TimestampMicros::now(); let packet = TimePacket { magic: protocol::MAGIC_TIME, flags: 0, sid, - t1, - t2: TimestampMicros(0), - t3: TimestampMicros(0), + stream_1: now, + receive_2: TimestampMicros(0), + stream_3: TimestampMicros(0), _pad: TimePacketPadding::zeroed(), }; @@ -213,7 +213,7 @@ fn run_stream(opt: StreamOpt) -> Result<(), RunError> { Some(Packet::Time(packet)) => { // only handle packet if it belongs to our stream: if packet.sid.0 == sid.0 { - packet.t3 = TimestampMicros::now(); + packet.stream_3 = TimestampMicros::now(); socket.send_to(bytemuck::bytes_of(packet), addr) .expect("socket.send responding to time packet"); } @@ -286,17 +286,17 @@ fn run_receive(opt: ReceiveOpt) -> Result<(), RunError> { .map_err(RunError::Socket)?; match Packet::try_from_bytes_mut(&mut packet_raw[0..nbytes]) { - Some(Packet::Time(packet)) => { - if packet.t3.0 == 0 { + Some(Packet::Time(time)) => { + if time.stream_3.0 == 0 { // we need to respond to this packet - packet.t2 = TimestampMicros::now(); - socket.send_to(bytemuck::bytes_of(packet), addr) + time.receive_2 = TimestampMicros::now(); + socket.send_to(bytemuck::bytes_of(time), addr) .expect("reply to time packet"); continue; } let mut state = state.lock().unwrap(); - state.recv.receive_time(packet); + state.recv.receive_time(time); } Some(Packet::Audio(packet)) => { let mut state = state.lock().unwrap(); diff --git a/src/protocol.rs b/src/protocol.rs index a98ca3a..c1e34b0 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,7 +1,7 @@ -use std::time::SystemTime; - use bytemuck::{Pod, Zeroable}; use cpal::{SampleFormat, SampleRate, ChannelCount}; +use nix::time::ClockId; +use nix::sys::time::TimeValLike; pub const SAMPLE_FORMAT: SampleFormat = SampleFormat::F32; pub const SAMPLE_RATE: SampleRate = SampleRate(48000); @@ -48,9 +48,10 @@ pub struct TimePacket { pub magic: u32, pub flags: u32, pub sid: TimestampMicros, - pub t1: TimestampMicros, - pub t2: TimestampMicros, - pub t3: TimestampMicros, + + pub stream_1: TimestampMicros, + pub receive_2: TimestampMicros, + pub stream_3: TimestampMicros, // packet delay has a linear relationship to packet size - it's important // that time packets experience as similar delay as possible to audio @@ -126,15 +127,11 @@ pub struct TimestampMicros(pub u64); impl TimestampMicros { pub fn now() -> TimestampMicros { - // SystemTime::now uses CLOCK_REALTIME on Linux, which is exactly what we want - // https://doc.rust-lang.org/std/time/struct.SystemTime.html#platform-specific-behavior - let micros = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("SystemTime::now before UNIX_EPOCH!") - .as_micros(); + let timespec = nix::time::clock_gettime(ClockId::CLOCK_BOOTTIME) + .expect("clock_gettime(CLOCK_BOOTTIME) failed, are we on Linux?"); - let micros = u64::try_from(micros) - .expect("can't narrow timestamp to u64"); + let micros = u64::try_from(timespec.num_microseconds()) + .expect("cannot convert i64 time value to u64"); TimestampMicros(micros) } diff --git a/src/receive.rs b/src/receive.rs index dffb2e3..59c76b2 100644 --- a/src/receive.rs +++ b/src/receive.rs @@ -3,7 +3,7 @@ use std::time::Duration; use crate::protocol::{AudioPacket, self, TimePacket, TimestampMicros}; use crate::time::{Timestamp, SampleDuration, TimestampDelta, ClockDelta}; -use crate::status::Status; +use crate::status::{Status, StreamStatus}; use crate::resample::Resampler; pub struct Receiver { @@ -39,6 +39,7 @@ struct Stream { adjust: TimestampDelta, sync: bool, resampler: Resampler, + latency_usec: Aggregate, } impl Stream { @@ -52,6 +53,7 @@ impl Stream { adjust: TimestampDelta::zero(), sync: false, resampler, + latency_usec: Aggregate::default(), } } @@ -60,6 +62,10 @@ impl Stream { let duration = SampleDuration::ONE_PACKET.mul(seq_delta); self.start_pts.add(duration).adjust(self.adjust) } + + pub fn network_latency(&self) -> Duration { + Duration::from_micros(self.latency_usec.average()) + } } #[derive(Clone, Copy)] @@ -91,9 +97,18 @@ impl Receiver { return; } - let network_latency_usec = (packet.t3.0 - packet.t1.0) / 2; - let network_latency = Duration::from_micros(network_latency_usec); - self.status.record_network_latency(network_latency); + let stream_1_usec = packet.stream_1.0; + let stream_3_usec = packet.stream_3.0; + + let Some(rtt_usec) = stream_1_usec.checked_sub(stream_3_usec) else { + // invalid packet, ignore + return; + }; + + let network_latency_usec = rtt_usec / 2; + stream.latency_usec.observe(network_latency_usec); + + self.status.record_network_latency(stream.network_latency()); let clock_delta = ClockDelta::from_time_packet(packet); self.status.record_clock_delta(clock_delta); @@ -111,7 +126,7 @@ impl Receiver { // new stream is taking over! switch over to it println!("\nnew stream beginning"); self.stream = Some(Stream::start_from_packet(packet)); - self.status.clear_sync(); + self.status.clear_stream(); self.queue.clear(); return true; } @@ -132,7 +147,7 @@ impl Receiver { if back.seq + self.opt.max_seq_gap as u64 <= packet.seq { println!("\nreceived packet with seq too far in future, resetting stream"); self.stream = Some(Stream::start_from_packet(packet)); - self.status.clear_sync(); + self.status.clear_stream(); self.queue.clear(); } } @@ -140,7 +155,7 @@ impl Receiver { true } else { self.stream = Some(Stream::start_from_packet(packet)); - self.status.clear_sync(); + self.status.clear_stream(); true } } @@ -205,6 +220,7 @@ impl Receiver { let Some(stream) = self.stream.as_mut() else { // stream hasn't started, just fill buffer with silence and return data.fill(0f32); + self.status.render(); return; }; @@ -216,6 +232,7 @@ impl Receiver { let Some(front) = self.queue.front_mut() else { // nothing at front of queue? data.fill(0f32); + self.status.render(); return; }; @@ -225,7 +242,6 @@ impl Receiver { if late >= SampleDuration::ONE_PACKET { // we are late by more than a packet, skip to the next - println!("\nlate by more than a packet, pts: {:?}, front pts: {:?}, late: {:?}", pts, front.pts, late); self.queue.pop_front(); continue; } @@ -235,7 +251,7 @@ impl Receiver { // we are synced stream.sync = true; - self.status.set_sync(); + self.status.set_stream(StreamStatus::Sync); break; } @@ -246,6 +262,7 @@ impl Receiver { // we are early by more than what was asked of us in this // call, fill with zeroes and return data.fill(0f32); + self.status.render(); return; } @@ -257,7 +274,7 @@ impl Receiver { // then mark ourselves as synced and fall through to regular processing stream.sync = true; - self.status.set_sync(); + self.status.set_stream(StreamStatus::Sync); break; } } @@ -267,8 +284,9 @@ impl Receiver { // copy data to out while data.len() > 0 { let Some(front) = self.queue.front_mut() else { - println!("\nqueue underrun, stream-side delay too low"); data.fill(0f32); + self.status.set_stream(StreamStatus::Miss); + self.status.render(); return; }; @@ -296,8 +314,13 @@ impl Receiver { } if let Some(copy_end_ts) = copy_end_ts { - let rate = adjusted_playback_rate(request_end_ts, copy_end_ts); - let _ = stream.resampler.set_input_rate(rate); + if let Some(rate) = adjusted_playback_rate(request_end_ts, copy_end_ts) { + let _ = stream.resampler.set_input_rate(rate); + self.status.set_stream(StreamStatus::Slew); + } else { + let _ = stream.resampler.set_input_rate(protocol::SAMPLE_RATE.0); + self.status.set_stream(StreamStatus::Sync); + } self.status.record_audio_latency(request_end_ts, copy_end_ts); } @@ -310,21 +333,56 @@ impl Receiver { } } -fn adjusted_playback_rate(real_ts: Timestamp, play_ts: Timestamp) -> u32 { - let base_rate = i64::from(protocol::SAMPLE_RATE.0); +fn adjusted_playback_rate(real_ts: Timestamp, play_ts: Timestamp) -> Option { + let delta = real_ts.delta(play_ts).as_frames(); + let one_sec = i64::from(protocol::SAMPLE_RATE.0); + let one_ms = one_sec / 1000; - let max_adjust_percent = 2; - let max_rate = (base_rate * (100 + max_adjust_percent)) / 100; - let min_rate = (base_rate * (100 - max_adjust_percent)) / 100; + if delta.abs() > one_sec { + // we should desync here + } - let packet_delta = real_ts.delta(play_ts).as_frames() as f64 / protocol::FRAMES_PER_PACKET as f64; - let packet_delta_sq = packet_delta * packet_delta * f64::signum(packet_delta); + if delta.abs() < one_ms { + // no need to adjust + return None; + } - let adjust = (packet_delta_sq * protocol::FRAMES_PER_PACKET as f64) as i64; - - let adjusted_rate = base_rate + adjust; - let adjusted_rate = std::cmp::min(adjusted_rate, max_rate); - let adjusted_rate = std::cmp::max(adjusted_rate, min_rate); - - u32::try_from(adjusted_rate).unwrap() + if delta > 0 { + // real_ts > play_ts, ie. we are running slow + // speed up playback rate by 1% + let rate = protocol::SAMPLE_RATE.0 * 101 / 100; + return Some(rate); + } else { + // real_ts < play_ts, ie. we are running fast + // speed up playback rate by 1% + let rate = protocol::SAMPLE_RATE.0 * 99 / 100; + return Some(rate); + } +} + +#[derive(Default)] +struct Aggregate { + samples: [u64; 32], + count: usize, + index: usize, +} + +impl Aggregate { + pub fn observe(&mut self, value: u64) { + self.samples[self.index] += value; + + if self.count < self.samples.len() { + self.count += 1; + } + + self.index += 1; + self.index %= self.samples.len(); + } + + pub fn average(&self) -> u64 { + self.samples[0..self.count] + .iter() + .copied() + .sum::() / self.count as u64 + } } diff --git a/src/status.rs b/src/status.rs index 74821d6..8356103 100644 --- a/src/status.rs +++ b/src/status.rs @@ -7,7 +7,7 @@ use crate::time::{Timestamp, ClockDelta, SampleDuration}; const RENDER_INTERVAL: Duration = Duration::from_millis(32); pub struct Status { - sync: bool, + stream: StreamStatus, audio_latency_sec: Option, buffer_length_sec: Option, network_latency_sec: Option, @@ -15,10 +15,58 @@ pub struct Status { last_render: Option, } +pub enum StreamStatus { + Seek, + Sync, + Slew, + Miss, +} + +impl StreamStatus { + pub fn color(&self) -> ColorSpec { + let mut spec = ColorSpec::new(); + + match self { + StreamStatus::Seek => { + spec.set_dimmed(true); + } + StreamStatus::Sync => { + spec.set_bg(Some(Color::Green)) + .set_fg(Some(Color::Rgb(0, 0, 0))) // dark black + .set_bold(true) + .set_intense(true); + } + StreamStatus::Slew => { + spec.set_bg(Some(Color::Yellow)) + .set_fg(Some(Color::Rgb(0, 0, 0))) // dark black + .set_bold(true) + .set_intense(true); + } + StreamStatus::Miss => { + spec.set_bg(Some(Color::Red)) + .set_fg(Some(Color::Rgb(0, 0, 0))) // dark black + .set_bold(true) + .set_intense(true); + } + } + + spec + } + + pub fn text(&self) -> &'static str { + match self { + StreamStatus::Seek => "SEEK", + StreamStatus::Sync => "SYNC", + StreamStatus::Slew => "SLEW", + StreamStatus::Miss => "MISS", + } + } +} + impl Status { pub fn new() -> Self { Status { - sync: false, + stream: StreamStatus::Seek, audio_latency_sec: None, buffer_length_sec: None, network_latency_sec: None, @@ -27,12 +75,12 @@ impl Status { } } - pub fn set_sync(&mut self) { - self.sync = true; + pub fn set_stream(&mut self, status: StreamStatus) { + self.stream = status; } - pub fn clear_sync(&mut self) { - self.sync = false; + pub fn clear_stream(&mut self) { + self.stream = StreamStatus::Seek; self.audio_latency_sec = None; self.buffer_length_sec = None; self.network_latency_sec = None; @@ -74,22 +122,9 @@ impl Status { let _ = write!(&mut out, "\r"); - if self.sync { - let _ = out.set_color(&ColorSpec::new() - .set_bg(Some(Color::Green)) - .set_fg(Some(Color::Rgb(0, 0, 0))) // dark black - .set_bold(true) - .set_intense(true)); - - let _ = out.write_all(b" SYNC "); - - let _ = out.set_color(&ColorSpec::new()); - } else { - let _ = out.set_color(&ColorSpec::new() - .set_dimmed(true)); - - let _ = out.write_all(b" UNSYNC "); - } + let _ = out.set_color(&self.stream.color()); + let _ = write!(&mut out, " {} ", self.stream.text()); + let _ = out.set_color(&ColorSpec::new()); if let Some(latency_sec) = self.audio_latency_sec { let _ = write!(&mut out, " Audio:[{:>8.3} ms]", latency_sec * 1000.0); diff --git a/src/time.rs b/src/time.rs index f6fbfe1..a375b1b 100644 --- a/src/time.rs +++ b/src/time.rs @@ -111,13 +111,13 @@ impl ClockDelta { pub fn from_time_packet(packet: &TimePacket) -> ClockDelta { // all fields should be non-zero here, it's a programming error if // they're not. - assert!(packet.t1.0 != 0); - assert!(packet.t2.0 != 0); - assert!(packet.t3.0 != 0); + assert!(packet.stream_1.0 != 0); + assert!(packet.receive_2.0 != 0); + assert!(packet.stream_3.0 != 0); - let t1_usec = packet.t1.0 as i64; - let t2_usec = packet.t2.0 as i64; - let t3_usec = packet.t3.0 as i64; + let t1_usec = packet.stream_1.0 as i64; + let t2_usec = packet.receive_2.0 as i64; + let t3_usec = packet.stream_3.0 as i64; // algorithm from the Precision Time Protocol page on Wikipedia ClockDelta((t2_usec - t1_usec + t2_usec - t3_usec) / 2)