start moving away from using wall clock time for synchronisation

This commit is contained in:
Hailey Somerville 2023-08-18 18:41:03 +10:00
parent 7cb45764b7
commit ade2ee4d57
7 changed files with 183 additions and 79 deletions

15
Cargo.lock generated
View file

@ -20,7 +20,7 @@ dependencies = [
"alsa-sys",
"bitflags",
"libc",
"nix",
"nix 0.24.3",
]
[[package]]
@ -66,6 +66,7 @@ dependencies = [
"bytemuck",
"cpal",
"derive_more",
"nix 0.26.2",
"static_assertions",
"structopt",
"termcolor",
@ -481,6 +482,18 @@ dependencies = [
"libc",
]
[[package]]
name = "nix"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a"
dependencies = [
"bitflags",
"cfg-if",
"libc",
"static_assertions",
]
[[package]]
name = "nom"
version = "7.1.3"

View file

@ -9,6 +9,7 @@ edition = "2021"
bytemuck = { version = "1.13.1", features = ["derive"] }
cpal = "0.15.2"
derive_more = "0.99.17"
nix = { version = "0.26.2", features = ["time"], default-features = false }
static_assertions = "1.1.0"
structopt = "0.3.26"
termcolor = "1.2.0"

View file

@ -173,15 +173,15 @@ fn run_stream(opt: StreamOpt) -> Result<(), RunError> {
let socket = Arc::clone(&socket);
move || {
loop {
let t1 = TimestampMicros::now();
let now = TimestampMicros::now();
let packet = TimePacket {
magic: protocol::MAGIC_TIME,
flags: 0,
sid,
t1,
t2: TimestampMicros(0),
t3: TimestampMicros(0),
stream_1: now,
receive_2: TimestampMicros(0),
stream_3: TimestampMicros(0),
_pad: TimePacketPadding::zeroed(),
};
@ -213,7 +213,7 @@ fn run_stream(opt: StreamOpt) -> Result<(), RunError> {
Some(Packet::Time(packet)) => {
// only handle packet if it belongs to our stream:
if packet.sid.0 == sid.0 {
packet.t3 = TimestampMicros::now();
packet.stream_3 = TimestampMicros::now();
socket.send_to(bytemuck::bytes_of(packet), addr)
.expect("socket.send responding to time packet");
}
@ -286,17 +286,17 @@ fn run_receive(opt: ReceiveOpt) -> Result<(), RunError> {
.map_err(RunError::Socket)?;
match Packet::try_from_bytes_mut(&mut packet_raw[0..nbytes]) {
Some(Packet::Time(packet)) => {
if packet.t3.0 == 0 {
Some(Packet::Time(time)) => {
if time.stream_3.0 == 0 {
// we need to respond to this packet
packet.t2 = TimestampMicros::now();
socket.send_to(bytemuck::bytes_of(packet), addr)
time.receive_2 = TimestampMicros::now();
socket.send_to(bytemuck::bytes_of(time), addr)
.expect("reply to time packet");
continue;
}
let mut state = state.lock().unwrap();
state.recv.receive_time(packet);
state.recv.receive_time(time);
}
Some(Packet::Audio(packet)) => {
let mut state = state.lock().unwrap();

View file

@ -1,7 +1,7 @@
use std::time::SystemTime;
use bytemuck::{Pod, Zeroable};
use cpal::{SampleFormat, SampleRate, ChannelCount};
use nix::time::ClockId;
use nix::sys::time::TimeValLike;
pub const SAMPLE_FORMAT: SampleFormat = SampleFormat::F32;
pub const SAMPLE_RATE: SampleRate = SampleRate(48000);
@ -48,9 +48,10 @@ pub struct TimePacket {
pub magic: u32,
pub flags: u32,
pub sid: TimestampMicros,
pub t1: TimestampMicros,
pub t2: TimestampMicros,
pub t3: TimestampMicros,
pub stream_1: TimestampMicros,
pub receive_2: TimestampMicros,
pub stream_3: TimestampMicros,
// packet delay has a linear relationship to packet size - it's important
// that time packets experience as similar delay as possible to audio
@ -126,15 +127,11 @@ pub struct TimestampMicros(pub u64);
impl TimestampMicros {
pub fn now() -> TimestampMicros {
// SystemTime::now uses CLOCK_REALTIME on Linux, which is exactly what we want
// https://doc.rust-lang.org/std/time/struct.SystemTime.html#platform-specific-behavior
let micros = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("SystemTime::now before UNIX_EPOCH!")
.as_micros();
let timespec = nix::time::clock_gettime(ClockId::CLOCK_BOOTTIME)
.expect("clock_gettime(CLOCK_BOOTTIME) failed, are we on Linux?");
let micros = u64::try_from(micros)
.expect("can't narrow timestamp to u64");
let micros = u64::try_from(timespec.num_microseconds())
.expect("cannot convert i64 time value to u64");
TimestampMicros(micros)
}

View file

@ -3,7 +3,7 @@ use std::time::Duration;
use crate::protocol::{AudioPacket, self, TimePacket, TimestampMicros};
use crate::time::{Timestamp, SampleDuration, TimestampDelta, ClockDelta};
use crate::status::Status;
use crate::status::{Status, StreamStatus};
use crate::resample::Resampler;
pub struct Receiver {
@ -39,6 +39,7 @@ struct Stream {
adjust: TimestampDelta,
sync: bool,
resampler: Resampler,
latency_usec: Aggregate,
}
impl Stream {
@ -52,6 +53,7 @@ impl Stream {
adjust: TimestampDelta::zero(),
sync: false,
resampler,
latency_usec: Aggregate::default(),
}
}
@ -60,6 +62,10 @@ impl Stream {
let duration = SampleDuration::ONE_PACKET.mul(seq_delta);
self.start_pts.add(duration).adjust(self.adjust)
}
pub fn network_latency(&self) -> Duration {
Duration::from_micros(self.latency_usec.average())
}
}
#[derive(Clone, Copy)]
@ -91,9 +97,18 @@ impl Receiver {
return;
}
let network_latency_usec = (packet.t3.0 - packet.t1.0) / 2;
let network_latency = Duration::from_micros(network_latency_usec);
self.status.record_network_latency(network_latency);
let stream_1_usec = packet.stream_1.0;
let stream_3_usec = packet.stream_3.0;
let Some(rtt_usec) = stream_1_usec.checked_sub(stream_3_usec) else {
// invalid packet, ignore
return;
};
let network_latency_usec = rtt_usec / 2;
stream.latency_usec.observe(network_latency_usec);
self.status.record_network_latency(stream.network_latency());
let clock_delta = ClockDelta::from_time_packet(packet);
self.status.record_clock_delta(clock_delta);
@ -111,7 +126,7 @@ impl Receiver {
// new stream is taking over! switch over to it
println!("\nnew stream beginning");
self.stream = Some(Stream::start_from_packet(packet));
self.status.clear_sync();
self.status.clear_stream();
self.queue.clear();
return true;
}
@ -132,7 +147,7 @@ impl Receiver {
if back.seq + self.opt.max_seq_gap as u64 <= packet.seq {
println!("\nreceived packet with seq too far in future, resetting stream");
self.stream = Some(Stream::start_from_packet(packet));
self.status.clear_sync();
self.status.clear_stream();
self.queue.clear();
}
}
@ -140,7 +155,7 @@ impl Receiver {
true
} else {
self.stream = Some(Stream::start_from_packet(packet));
self.status.clear_sync();
self.status.clear_stream();
true
}
}
@ -205,6 +220,7 @@ impl Receiver {
let Some(stream) = self.stream.as_mut() else {
// stream hasn't started, just fill buffer with silence and return
data.fill(0f32);
self.status.render();
return;
};
@ -216,6 +232,7 @@ impl Receiver {
let Some(front) = self.queue.front_mut() else {
// nothing at front of queue?
data.fill(0f32);
self.status.render();
return;
};
@ -225,7 +242,6 @@ impl Receiver {
if late >= SampleDuration::ONE_PACKET {
// we are late by more than a packet, skip to the next
println!("\nlate by more than a packet, pts: {:?}, front pts: {:?}, late: {:?}", pts, front.pts, late);
self.queue.pop_front();
continue;
}
@ -235,7 +251,7 @@ impl Receiver {
// we are synced
stream.sync = true;
self.status.set_sync();
self.status.set_stream(StreamStatus::Sync);
break;
}
@ -246,6 +262,7 @@ impl Receiver {
// we are early by more than what was asked of us in this
// call, fill with zeroes and return
data.fill(0f32);
self.status.render();
return;
}
@ -257,7 +274,7 @@ impl Receiver {
// then mark ourselves as synced and fall through to regular processing
stream.sync = true;
self.status.set_sync();
self.status.set_stream(StreamStatus::Sync);
break;
}
}
@ -267,8 +284,9 @@ impl Receiver {
// copy data to out
while data.len() > 0 {
let Some(front) = self.queue.front_mut() else {
println!("\nqueue underrun, stream-side delay too low");
data.fill(0f32);
self.status.set_stream(StreamStatus::Miss);
self.status.render();
return;
};
@ -296,8 +314,13 @@ impl Receiver {
}
if let Some(copy_end_ts) = copy_end_ts {
let rate = adjusted_playback_rate(request_end_ts, copy_end_ts);
let _ = stream.resampler.set_input_rate(rate);
if let Some(rate) = adjusted_playback_rate(request_end_ts, copy_end_ts) {
let _ = stream.resampler.set_input_rate(rate);
self.status.set_stream(StreamStatus::Slew);
} else {
let _ = stream.resampler.set_input_rate(protocol::SAMPLE_RATE.0);
self.status.set_stream(StreamStatus::Sync);
}
self.status.record_audio_latency(request_end_ts, copy_end_ts);
}
@ -310,21 +333,56 @@ impl Receiver {
}
}
fn adjusted_playback_rate(real_ts: Timestamp, play_ts: Timestamp) -> u32 {
let base_rate = i64::from(protocol::SAMPLE_RATE.0);
fn adjusted_playback_rate(real_ts: Timestamp, play_ts: Timestamp) -> Option<u32> {
let delta = real_ts.delta(play_ts).as_frames();
let one_sec = i64::from(protocol::SAMPLE_RATE.0);
let one_ms = one_sec / 1000;
let max_adjust_percent = 2;
let max_rate = (base_rate * (100 + max_adjust_percent)) / 100;
let min_rate = (base_rate * (100 - max_adjust_percent)) / 100;
if delta.abs() > one_sec {
// we should desync here
}
let packet_delta = real_ts.delta(play_ts).as_frames() as f64 / protocol::FRAMES_PER_PACKET as f64;
let packet_delta_sq = packet_delta * packet_delta * f64::signum(packet_delta);
if delta.abs() < one_ms {
// no need to adjust
return None;
}
let adjust = (packet_delta_sq * protocol::FRAMES_PER_PACKET as f64) as i64;
let adjusted_rate = base_rate + adjust;
let adjusted_rate = std::cmp::min(adjusted_rate, max_rate);
let adjusted_rate = std::cmp::max(adjusted_rate, min_rate);
u32::try_from(adjusted_rate).unwrap()
if delta > 0 {
// real_ts > play_ts, ie. we are running slow
// speed up playback rate by 1%
let rate = protocol::SAMPLE_RATE.0 * 101 / 100;
return Some(rate);
} else {
// real_ts < play_ts, ie. we are running fast
// speed up playback rate by 1%
let rate = protocol::SAMPLE_RATE.0 * 99 / 100;
return Some(rate);
}
}
#[derive(Default)]
struct Aggregate {
samples: [u64; 32],
count: usize,
index: usize,
}
impl Aggregate {
pub fn observe(&mut self, value: u64) {
self.samples[self.index] += value;
if self.count < self.samples.len() {
self.count += 1;
}
self.index += 1;
self.index %= self.samples.len();
}
pub fn average(&self) -> u64 {
self.samples[0..self.count]
.iter()
.copied()
.sum::<u64>() / self.count as u64
}
}

View file

@ -7,7 +7,7 @@ use crate::time::{Timestamp, ClockDelta, SampleDuration};
const RENDER_INTERVAL: Duration = Duration::from_millis(32);
pub struct Status {
sync: bool,
stream: StreamStatus,
audio_latency_sec: Option<f64>,
buffer_length_sec: Option<f64>,
network_latency_sec: Option<f64>,
@ -15,10 +15,58 @@ pub struct Status {
last_render: Option<Instant>,
}
pub enum StreamStatus {
Seek,
Sync,
Slew,
Miss,
}
impl StreamStatus {
pub fn color(&self) -> ColorSpec {
let mut spec = ColorSpec::new();
match self {
StreamStatus::Seek => {
spec.set_dimmed(true);
}
StreamStatus::Sync => {
spec.set_bg(Some(Color::Green))
.set_fg(Some(Color::Rgb(0, 0, 0))) // dark black
.set_bold(true)
.set_intense(true);
}
StreamStatus::Slew => {
spec.set_bg(Some(Color::Yellow))
.set_fg(Some(Color::Rgb(0, 0, 0))) // dark black
.set_bold(true)
.set_intense(true);
}
StreamStatus::Miss => {
spec.set_bg(Some(Color::Red))
.set_fg(Some(Color::Rgb(0, 0, 0))) // dark black
.set_bold(true)
.set_intense(true);
}
}
spec
}
pub fn text(&self) -> &'static str {
match self {
StreamStatus::Seek => "SEEK",
StreamStatus::Sync => "SYNC",
StreamStatus::Slew => "SLEW",
StreamStatus::Miss => "MISS",
}
}
}
impl Status {
pub fn new() -> Self {
Status {
sync: false,
stream: StreamStatus::Seek,
audio_latency_sec: None,
buffer_length_sec: None,
network_latency_sec: None,
@ -27,12 +75,12 @@ impl Status {
}
}
pub fn set_sync(&mut self) {
self.sync = true;
pub fn set_stream(&mut self, status: StreamStatus) {
self.stream = status;
}
pub fn clear_sync(&mut self) {
self.sync = false;
pub fn clear_stream(&mut self) {
self.stream = StreamStatus::Seek;
self.audio_latency_sec = None;
self.buffer_length_sec = None;
self.network_latency_sec = None;
@ -74,22 +122,9 @@ impl Status {
let _ = write!(&mut out, "\r");
if self.sync {
let _ = out.set_color(&ColorSpec::new()
.set_bg(Some(Color::Green))
.set_fg(Some(Color::Rgb(0, 0, 0))) // dark black
.set_bold(true)
.set_intense(true));
let _ = out.write_all(b" SYNC ");
let _ = out.set_color(&ColorSpec::new());
} else {
let _ = out.set_color(&ColorSpec::new()
.set_dimmed(true));
let _ = out.write_all(b" UNSYNC ");
}
let _ = out.set_color(&self.stream.color());
let _ = write!(&mut out, " {} ", self.stream.text());
let _ = out.set_color(&ColorSpec::new());
if let Some(latency_sec) = self.audio_latency_sec {
let _ = write!(&mut out, " Audio:[{:>8.3} ms]", latency_sec * 1000.0);

View file

@ -111,13 +111,13 @@ impl ClockDelta {
pub fn from_time_packet(packet: &TimePacket) -> ClockDelta {
// all fields should be non-zero here, it's a programming error if
// they're not.
assert!(packet.t1.0 != 0);
assert!(packet.t2.0 != 0);
assert!(packet.t3.0 != 0);
assert!(packet.stream_1.0 != 0);
assert!(packet.receive_2.0 != 0);
assert!(packet.stream_3.0 != 0);
let t1_usec = packet.t1.0 as i64;
let t2_usec = packet.t2.0 as i64;
let t3_usec = packet.t3.0 as i64;
let t1_usec = packet.stream_1.0 as i64;
let t2_usec = packet.receive_2.0 as i64;
let t3_usec = packet.stream_3.0 as i64;
// algorithm from the Precision Time Protocol page on Wikipedia
ClockDelta((t2_usec - t1_usec + t2_usec - t3_usec) / 2)