rework protocol handling

This commit is contained in:
Hailey Somerville 2023-08-22 19:10:00 +10:00
parent 91936e99f7
commit 76a4b7b18d
11 changed files with 599 additions and 305 deletions

View file

@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
bitflags = { version = "2.4.0", features = ["bytemuck"] }
bytemuck = { version = "1.13.1", features = ["derive"] }
bytemuck = { version = "1.13.1", features = ["derive", "extern_crate_alloc"] }
cpal = "0.15.2"
derive_more = "0.99.17"
libc = "0.2.147"

View file

@ -1,4 +1,8 @@
// pub mod source;
pub mod types;
pub mod packet;
use std::io;
pub use cpal::{SampleFormat, SampleRate, ChannelCount};
@ -8,4 +12,38 @@ pub const CHANNELS: ChannelCount = 2;
pub const FRAMES_PER_PACKET: usize = 160;
pub const SAMPLES_PER_PACKET: usize = CHANNELS as usize * FRAMES_PER_PACKET;
pub const MAX_PACKET_SIZE: usize = ::std::mem::size_of::<types::PacketUnion>();
use crate::socket::{Socket, PeerId};
use crate::protocol::packet::PacketBuffer;
use self::packet::Packet;
pub struct Protocol {
socket: Socket,
}
impl Protocol {
pub fn new(socket: Socket) -> Self {
Protocol { socket }
}
pub fn broadcast(&self, packet: &Packet) -> Result<(), io::Error> {
self.socket.broadcast(packet.as_buffer().as_bytes())
}
pub fn send_to(&self, packet: &Packet, peer: PeerId) -> Result<(), io::Error> {
self.socket.send_to(packet.as_buffer().as_bytes(), peer)
}
pub fn recv_from(&self) -> Result<(Packet, PeerId), io::Error> {
loop {
let mut buffer = PacketBuffer::allocate();
let (nbytes, peer) = self.socket.recv_from(buffer.as_bytes_mut())?;
buffer.set_len(nbytes);
if let Some(packet) = Packet::from_buffer(buffer) {
return Ok((packet, peer));
}
}
}
}

344
src/protocol/packet.rs Normal file
View file

@ -0,0 +1,344 @@
use std::mem::size_of;
use bytemuck::Zeroable;
pub use cpal::{SampleFormat, SampleRate, ChannelCount};
use crate::stats::node::NodeStats;
use crate::stats::receiver::ReceiverStats;
use crate::time::SampleDuration;
use crate::protocol::types::{self, Magic};
use super::types::{AudioPacketHeader, StatsReplyFlags, SessionId};
pub const MAX_PACKET_SIZE: usize =
size_of::<types::PacketHeader>() +
size_of::<types::AudioPacketHeader>() +
size_of::<types::AudioPacketBuffer>();
pub struct PacketBuffer {
raw: Box<[u8]>,
len: usize,
}
impl PacketBuffer {
pub fn allocate() -> Self {
PacketBuffer {
raw: bytemuck::allocation::zeroed_slice_box(MAX_PACKET_SIZE),
len: 0,
}
}
pub fn len(&self) -> usize {
self.len
}
pub fn set_len(&mut self, len: usize) {
self.len = len;
}
pub fn as_bytes(&self) -> &[u8] {
&self.raw[0..self.len]
}
pub fn as_bytes_mut(&mut self) -> &mut [u8] {
&mut self.raw[0..self.len]
}
}
pub struct Packet(PacketBuffer);
impl Packet {
fn allocate(magic: Magic, len: usize) -> Self {
let mut packet = Packet(PacketBuffer::allocate());
packet.set_len(len);
packet.header_mut().magic = magic;
return packet;
}
pub fn from_buffer(buffer: PacketBuffer) -> Option<Packet> {
let header_size = size_of::<types::PacketHeader>();
if buffer.len() < header_size {
None
} else {
Some(Packet(buffer))
}
}
pub fn as_buffer(&self) -> &PacketBuffer {
&self.0
}
pub fn parse(self) -> Option<PacketKind> {
match self.header().magic {
Magic::AUDIO => Audio::parse(self).map(PacketKind::Audio),
Magic::TIME => Time::parse(self).map(PacketKind::Time),
Magic::STATS_REQ => StatsRequest::parse(self).map(PacketKind::StatsRequest),
Magic::STATS_REPLY => StatsReply::parse(self).map(PacketKind::StatsReply),
_ => None,
}
}
pub fn header(&self) -> &types::PacketHeader {
let header_size = size_of::<types::PacketHeader>();
let header_bytes = &self.0.as_bytes()[0..header_size];
bytemuck::from_bytes(header_bytes)
}
pub fn header_mut(&mut self) -> &mut types::PacketHeader {
let header_size = size_of::<types::PacketHeader>();
let header_bytes = &mut self.0.as_bytes_mut()[0..header_size];
bytemuck::from_bytes_mut(header_bytes)
}
pub fn len(&self) -> usize {
let header_size = size_of::<types::PacketHeader>();
self.0.len() - header_size
}
pub fn set_len(&mut self, len: usize) {
let header_size = size_of::<types::PacketHeader>();
self.0.set_len(header_size + len);
}
pub fn as_bytes(&self) -> &[u8] {
&self.as_raw_buffer()[0..self.len()]
}
pub fn as_bytes_mut(&mut self) -> &mut [u8] {
let len = self.len();
&mut self.as_raw_buffer_mut()[0..len]
}
fn as_raw_buffer(&self) -> &[u8] {
let header_size = size_of::<types::PacketHeader>();
&self.0.as_bytes()[header_size..]
}
fn as_raw_buffer_mut(&mut self) -> &mut [u8] {
let header_size = size_of::<types::PacketHeader>();
&mut self.0.as_bytes_mut()[header_size..]
}
}
pub enum PacketKind {
Audio(Audio),
Time(Time),
StatsRequest(StatsRequest),
StatsReply(StatsReply),
}
pub struct Audio(Packet);
impl Audio {
const LENGTH: usize =
size_of::<types::AudioPacketHeader>() +
size_of::<types::AudioPacketBuffer>();
pub fn write() -> AudioWriter {
let packet = Packet::allocate(Magic::AUDIO, Self::LENGTH);
AudioWriter {
packet: Audio(packet),
written: SampleDuration::zero(),
}
}
pub fn parse(packet: Packet) -> Option<Self> {
if packet.len() != Self::LENGTH {
return None;
}
if packet.header().flags != 0 {
return None;
}
Some(Audio(packet))
}
pub fn as_packet(&self) -> &Packet {
&self.0
}
pub fn buffer(&self) -> &[f32] {
let header_size = size_of::<types::AudioPacketHeader>();
let buffer_bytes = &self.0.as_bytes()[header_size..];
bytemuck::cast_slice(buffer_bytes)
}
pub fn buffer_mut(&mut self) -> &mut [f32] {
let header_size = size_of::<types::AudioPacketHeader>();
let buffer_bytes = &mut self.0.as_bytes_mut()[header_size..];
bytemuck::cast_slice_mut(buffer_bytes)
}
pub fn header(&self) -> &types::AudioPacketHeader {
let header_size = size_of::<types::AudioPacketHeader>();
let header_bytes = &self.0.as_bytes()[0..header_size];
bytemuck::from_bytes(header_bytes)
}
pub fn header_mut(&mut self) -> &mut types::AudioPacketHeader {
let header_size = size_of::<types::AudioPacketHeader>();
let header_bytes = &mut self.0.as_bytes_mut()[0..header_size];
bytemuck::from_bytes_mut(header_bytes)
}
}
pub struct AudioWriter {
packet: Audio,
written: SampleDuration,
}
impl AudioWriter {
pub fn length(&self) -> SampleDuration {
self.written
}
pub fn remaining(&self) -> SampleDuration {
SampleDuration::ONE_PACKET.sub(self.length())
}
fn remaining_buffer_mut(&mut self) -> &mut [f32] {
let offset = self.length().as_buffer_offset();
&mut self.packet.buffer_mut()[offset..]
}
pub fn valid_length(&self) -> bool {
self.remaining() == SampleDuration::zero()
}
pub fn write(&mut self, audio: &[f32]) -> SampleDuration {
let input_duration = SampleDuration::from_buffer_offset(audio.len());
let copy_duration = std::cmp::min(input_duration, self.remaining());
let copy_len = copy_duration.as_buffer_offset();
let source_buffer = &audio[0..copy_len];
let dest_buffer = &mut self.remaining_buffer_mut()[0..copy_len];
dest_buffer.copy_from_slice(source_buffer);
copy_duration
}
pub fn finalize(mut self, header: AudioPacketHeader) -> Audio {
if !self.valid_length() {
panic!("into_audio_packet called on writer with invalid length");
}
*self.packet.header_mut() = header;
self.packet
}
}
pub struct Time(Packet);
impl Time {
const LENGTH: usize = size_of::<types::TimePacket>();
pub fn allocate() -> Self {
Time(Packet::allocate(Magic::TIME, Self::LENGTH))
}
pub fn parse(packet: Packet) -> Option<Self> {
if packet.len() != Self::LENGTH {
return None;
}
if packet.header().flags != 0 {
return None;
}
Some(Time(packet))
}
pub fn as_packet(&self) -> &Packet {
&self.0
}
pub fn data(&self) -> &types::TimePacket {
bytemuck::from_bytes(self.0.as_bytes())
}
pub fn data_mut(&mut self) -> &mut types::TimePacket {
bytemuck::from_bytes_mut(self.0.as_bytes_mut())
}
}
pub struct StatsRequest(Packet);
impl StatsRequest {
pub fn new() -> Self {
StatsRequest(Packet::allocate(Magic::STATS_REQ, 0))
}
pub fn parse(packet: Packet) -> Option<Self> {
if packet.len() != 0 {
return None;
}
if packet.header().flags != 0 {
return None;
}
Some(StatsRequest(packet))
}
pub fn as_packet(&self) -> &Packet {
&self.0
}
}
pub struct StatsReply(Packet);
impl StatsReply {
const LENGTH: usize = size_of::<types::StatsReplyPacket>();
fn new(flags: StatsReplyFlags, data: types::StatsReplyPacket) -> Self {
let mut packet = Packet::allocate(Magic::STATS_REPLY, Self::LENGTH);
packet.header_mut().flags = bytemuck::cast(flags);
let mut reply = StatsReply(packet);
*reply.data_mut() = data;
reply
}
pub fn source(sid: SessionId, node: NodeStats) -> Self {
let receiver = ReceiverStats::zeroed();
Self::new(
StatsReplyFlags::IS_STREAM,
types::StatsReplyPacket { sid, receiver, node },
)
}
pub fn receiver(sid: SessionId, receiver: ReceiverStats, node: NodeStats) -> Self {
Self::new(
StatsReplyFlags::IS_RECEIVER,
types::StatsReplyPacket { sid, receiver, node },
)
}
pub fn parse(packet: Packet) -> Option<Self> {
if packet.len() != Self::LENGTH {
return None;
}
Some(StatsReply(packet))
}
pub fn as_packet(&self) -> &Packet {
&self.0
}
pub fn flags(&self) -> types::StatsReplyFlags {
bytemuck::cast(self.0.header().flags)
}
pub fn data(&self) -> &types::StatsReplyPacket {
bytemuck::from_bytes(self.0.as_bytes())
}
pub fn data_mut(&mut self) -> &mut types::StatsReplyPacket {
bytemuck::from_bytes_mut(self.0.as_bytes_mut())
}
}

24
src/protocol/source.rs Normal file
View file

@ -0,0 +1,24 @@
// use crate::socket::Socket;
// use crate::protocol;
// pub struct SourceProtocol {
// socket: Socket,
// }
// impl SourceProtocol {
// pub fn new(socket: Socket) -> Self {
// SourceProtocol { socket }
// }
// }
// pub struct AudioPacket {
// raw: Box<protocol::types::AudioPacket>,
// }
// impl AudioPacket {
// pub fn new() -> AudioPacket {
// AudioPacket {
// raw: bytemuck::allocation::zeroed_box(),
// }
// }
// }

View file

@ -5,10 +5,25 @@ use nix::sys::time::TimeValLike;
use crate::stats;
use crate::protocol;
pub const MAGIC_AUDIO: u32 = 0x00a79ae2;
pub const MAGIC_TIME: u32 = 0x01a79ae2;
pub const MAGIC_STATS_REQ: u32 = 0x02a79ae2;
pub const MAGIC_STATS_REPLY: u32 = 0x03a79ae2;
#[derive(Debug, Clone, Copy, Zeroable, Pod, PartialEq, Eq)]
#[repr(transparent)]
pub struct Magic(u32);
impl Magic {
pub const AUDIO: Magic = Magic(0x00a79ae2);
pub const TIME: Magic = Magic(0x01a79ae2);
pub const STATS_REQ: Magic = Magic(0x02a79ae2);
pub const STATS_REPLY: Magic = Magic(0x03a79ae2);
}
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(C)]
pub struct PacketHeader {
// magic and flags. there is a distinct magic value for each packet type,
// and flags has a packet-dependent meaning.
pub magic: Magic,
pub flags: u32,
}
/// our network Packet struct
/// we don't need to worry about endianness, because according to the rust docs:
@ -19,12 +34,7 @@ pub const MAGIC_STATS_REPLY: u32 = 0x03a79ae2;
/// - https://doc.rust-lang.org/std/primitive.f32.html
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(C)]
pub struct AudioPacket {
// magic and flags. magic is always MAGIC_AUDIO and indicates that this
// is an audio packet. flags is always 0 for now.
pub magic: u32,
pub flags: u32,
pub struct AudioPacketHeader {
// stream id - set to the start time of a stream, used by receivers to
// detect new stream starts, used by senders to detect stream takeovers
pub sid: SessionId,
@ -38,27 +48,19 @@ pub struct AudioPacket {
// data timestamp - the stream's clock when packet is sent
pub dts: TimestampMicros,
// audio data:
pub buffer: PacketBuffer,
}
pub type AudioPacketBuffer = [f32; protocol::SAMPLES_PER_PACKET];
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(C)]
pub struct TimePacket {
pub magic: u32,
pub flags: u32,
pub sid: SessionId,
pub rid: ReceiverId,
pub stream_1: TimestampMicros,
pub receive_2: TimestampMicros,
pub stream_3: TimestampMicros,
// packet delay has a linear relationship to packet size - it's important
// that time packets experience as similar delay as possible to audio
// packets for most accurate synchronisation, so we add some padding here
pub _pad: TimePacketPadding,
}
#[derive(Debug, PartialEq)]
@ -97,19 +99,9 @@ impl TimePacket {
}
}
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(C)]
pub struct StatsRequestPacket {
pub magic: u32,
pub flags: u32,
}
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(C)]
pub struct StatsReplyPacket {
pub magic: u32,
pub flags: StatsReplyFlags,
pub sid: SessionId,
pub receiver: stats::receiver::ReceiverStats,
pub node: stats::node::NodeStats,
@ -124,78 +116,6 @@ bitflags::bitflags! {
}
}
#[derive(Debug, Clone, Copy)]
#[repr(transparent)]
pub struct PacketBuffer(pub [f32; protocol::SAMPLES_PER_PACKET]);
/// SAFETY: Pod is impl'd for f32, and [T: Pod; N: usize]
/// but for some reason doesn't like N == SAMPLES_PER_PACKET?
unsafe impl Pod for PacketBuffer {}
/// SAFETY: Zeroable is impl'd for f32, and [T: Zeroable; N: usize]
/// but for some reason doesn't like N == SAMPLES_PER_PACKET?
unsafe impl Zeroable for PacketBuffer {
fn zeroed() -> Self {
PacketBuffer([0f32; protocol::SAMPLES_PER_PACKET])
}
}
#[derive(Debug, Clone, Copy)]
pub struct TimePacketPadding([u8; 1272]);
// SAFETY: same as above in PacketBuffer
unsafe impl Pod for TimePacketPadding {}
// SAFETY: same as above in PacketBuffer
unsafe impl Zeroable for TimePacketPadding {
fn zeroed() -> Self {
TimePacketPadding([0u8; 1272])
}
}
// assert that AudioPacket and TimePacket are the same size, see comment for
// TimePacket::_pad field
static_assertions::assert_eq_size!(AudioPacket, TimePacket);
#[repr(C)]
pub union PacketUnion {
_1: AudioPacket,
_2: TimePacket,
_3: StatsRequestPacket,
_4: StatsReplyPacket,
}
pub enum Packet<'a> {
Audio(&'a mut AudioPacket),
Time(&'a mut TimePacket),
StatsRequest(&'a mut StatsRequestPacket),
StatsReply(&'a mut StatsReplyPacket),
}
impl<'a> Packet<'a> {
pub fn try_from_bytes_mut(raw: &'a mut [u8]) -> Option<Packet<'a>> {
let magic: u32 = *bytemuck::try_from_bytes(&raw[0..4]).ok()?;
if magic == MAGIC_TIME {
return Some(Packet::Time(bytemuck::try_from_bytes_mut(raw).ok()?));
}
if magic == MAGIC_AUDIO {
return Some(Packet::Audio(bytemuck::try_from_bytes_mut(raw).ok()?));
}
if magic == MAGIC_STATS_REQ {
return Some(Packet::StatsRequest(bytemuck::try_from_bytes_mut(raw).ok()?));
}
if magic == MAGIC_STATS_REPLY {
return Some(Packet::StatsReply(bytemuck::try_from_bytes_mut(raw).ok()?));
}
None
}
}
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(transparent)]
pub struct TimestampMicros(pub u64);

View file

@ -8,8 +8,9 @@ use cpal::{SampleRate, OutputCallbackInfo};
use cpal::traits::{HostTrait, DeviceTrait};
use structopt::StructOpt;
use crate::protocol;
use crate::protocol::types::{AudioPacket, TimePacket, TimestampMicros, Packet, SessionId, ReceiverId, TimePhase, StatsReplyPacket, StatsReplyFlags};
use crate::protocol::{self, Protocol};
use crate::protocol::packet::{Audio, Time, PacketKind, StatsReply};
use crate::protocol::types::{TimestampMicros, SessionId, ReceiverId, TimePhase};
use crate::resample::Resampler;
use crate::socket::{Socket, SocketOpt};
use crate::stats::node::NodeStats;
@ -29,13 +30,13 @@ struct QueueEntry {
seq: u64,
pts: Option<Timestamp>,
consumed: SampleDuration,
packet: Option<AudioPacket>,
packet: Option<Audio>,
}
impl QueueEntry {
pub fn as_full_buffer(&self) -> &[f32; protocol::SAMPLES_PER_PACKET] {
pub fn as_full_buffer(&self) -> &[f32] {
self.packet.as_ref()
.map(|packet| &packet.buffer.0)
.map(|packet| packet.buffer())
.unwrap_or(&[0f32; protocol::SAMPLES_PER_PACKET])
}
}
@ -51,12 +52,12 @@ struct Stream {
}
impl Stream {
pub fn start_from_packet(packet: &AudioPacket) -> Self {
pub fn start_from_packet(audio: &Audio) -> Self {
let resampler = Resampler::new();
Stream {
sid: packet.sid,
start_seq: packet.seq,
sid: audio.header().sid,
start_seq: audio.header().seq,
sync: false,
resampler,
rate_adjust: RateAdjust::new(),
@ -102,19 +103,19 @@ impl Receiver {
self.stream.as_ref().map(|s| s.sid)
}
pub fn receive_time(&mut self, packet: &TimePacket) {
pub fn receive_time(&mut self, packet: Time) {
let Some(stream) = self.stream.as_mut() else {
// no stream, nothing we can do with a time packet
return;
};
if stream.sid != packet.sid {
if stream.sid != packet.data().sid {
// not relevant to our stream, ignore
return;
}
let stream_1_usec = packet.stream_1.0;
let stream_3_usec = packet.stream_3.0;
let stream_1_usec = packet.data().stream_1.0;
let stream_3_usec = packet.data().stream_3.0;
let Some(rtt_usec) = stream_3_usec.checked_sub(stream_1_usec) else {
// invalid packet, ignore
@ -128,18 +129,20 @@ impl Receiver {
self.stats.set_network_latency(latency);
}
let clock_delta = ClockDelta::from_time_packet(packet);
let clock_delta = ClockDelta::from_time_packet(&packet);
stream.clock_delta.observe(clock_delta);
}
fn prepare_stream(&mut self, packet: &AudioPacket) -> bool {
fn prepare_stream(&mut self, packet: &Audio) -> bool {
if let Some(stream) = self.stream.as_mut() {
if packet.sid < stream.sid {
let header = packet.header();
if header.sid < stream.sid {
// packet belongs to a previous stream, ignore
return false;
}
if packet.sid > stream.sid {
if header.sid > stream.sid {
// new stream is taking over! switch over to it
println!("\nnew stream beginning");
self.stream = Some(Stream::start_from_packet(packet));
@ -148,20 +151,20 @@ impl Receiver {
return true;
}
if packet.seq < stream.start_seq {
if header.seq < stream.start_seq {
println!("\nreceived packet with seq before start, dropping");
return false;
}
if let Some(front) = self.queue.front() {
if packet.seq <= front.seq {
if header.seq <= front.seq {
println!("\nreceived packet with seq <= queue front seq, dropping");
return false;
}
}
if let Some(back) = self.queue.back() {
if back.seq + self.opt.max_seq_gap as u64 <= packet.seq {
if back.seq + self.opt.max_seq_gap as u64 <= header.seq {
println!("\nreceived packet with seq too far in future, resetting stream");
self.stream = Some(Stream::start_from_packet(packet));
self.stats.clear();
@ -177,15 +180,10 @@ impl Receiver {
}
}
pub fn receive_audio(&mut self, packet: &AudioPacket) {
pub fn receive_audio(&mut self, packet: Audio) {
let now = TimestampMicros::now();
if packet.flags != 0 {
println!("\nunknown flags in packet, ignoring entire packet");
return;
}
if !self.prepare_stream(packet) {
if !self.prepare_stream(&packet) {
return;
}
@ -198,7 +196,7 @@ impl Receiver {
let latency_usec = u64::try_from(latency.as_micros()).unwrap();
let delta_usec = clock_delta.as_micros();
let predict_dts = (now.0 - latency_usec).checked_add_signed(-delta_usec).unwrap();
let predict_diff = predict_dts as i64 - packet.dts.0 as i64;
let predict_diff = predict_dts as i64 - packet.header().dts.0 as i64;
self.stats.set_predict_offset(predict_diff)
}
}
@ -209,10 +207,10 @@ impl Receiver {
// expand queue to make space for new packet
if let Some(back) = self.queue.back() {
if packet.seq > back.seq {
if packet.header().seq > back.seq {
// extend queue from back to make space for new packet
// this also allows for out of order packets
for seq in (back.seq + 1)..=packet.seq {
for seq in (back.seq + 1)..=packet.header().seq {
self.queue.push_back(QueueEntry {
seq,
pts: None,
@ -224,7 +222,7 @@ impl Receiver {
} else {
// queue is empty, insert missing packet slot for the packet we are about to receive
self.queue.push_back(QueueEntry {
seq: packet.seq,
seq: packet.header().seq,
pts: None,
consumed: SampleDuration::zero(),
packet: None,
@ -234,12 +232,12 @@ impl Receiver {
// INVARIANT: at this point queue is non-empty and contains an
// allocated slot for the packet we just received
let front_seq = self.queue.front().unwrap().seq;
let idx_for_packet = (packet.seq - front_seq) as usize;
let idx_for_packet = (packet.header().seq - front_seq) as usize;
let slot = self.queue.get_mut(idx_for_packet).unwrap();
assert!(slot.seq == packet.seq);
slot.packet = Some(*packet);
slot.pts = stream.adjust_pts(Timestamp::from_micros_lossy(packet.pts))
assert!(slot.seq == packet.header().seq);
slot.pts = stream.adjust_pts(Timestamp::from_micros_lossy(packet.header().pts));
slot.packet = Some(packet);
}
pub fn fill_stream_buffer(&mut self, mut data: &mut [f32], pts: Timestamp) {
@ -533,28 +531,29 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
let socket = Socket::open(opt.socket)
.map_err(RunError::Listen)?;
let protocol = Protocol::new(socket);
crate::thread::set_name("bark/network");
crate::thread::set_realtime_priority();
loop {
let mut packet_raw = [0u8; protocol::MAX_PACKET_SIZE];
let (packet, peer) = protocol.recv_from().map_err(RunError::Socket)?;
let (nbytes, addr) = socket.recv_from(&mut packet_raw)
.map_err(RunError::Socket)?;
match Packet::try_from_bytes_mut(&mut packet_raw[0..nbytes]) {
Some(Packet::Time(time)) => {
if !time.rid.matches(&receiver_id) {
match packet.parse() {
Some(PacketKind::Time(mut time)) => {
if !time.data().rid.matches(&receiver_id) {
// not for us - time packets are usually unicast,
// but there can be multiple receivers on a machine
continue;
}
match time.phase() {
match time.data().phase() {
Some(TimePhase::Broadcast) => {
time.receive_2 = TimestampMicros::now();
time.rid = receiver_id;
socket.send_to(bytemuck::bytes_of(time), addr)
let data = time.data_mut();
data.receive_2 = TimestampMicros::now();
data.rid = receiver_id;
protocol.send_to(time.as_packet(), peer)
.expect("reply to time packet");
}
Some(TimePhase::StreamReply) => {
@ -567,27 +566,20 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
}
}
}
Some(Packet::Audio(packet)) => {
Some(PacketKind::Audio(packet)) => {
let mut state = state.lock().unwrap();
state.recv.receive_audio(packet);
}
Some(Packet::StatsRequest(_)) => {
Some(PacketKind::StatsRequest(_)) => {
let state = state.lock().unwrap();
let sid = state.recv.current_session();
let stats = *state.recv.stats();
let sid = state.recv.current_session().unwrap_or(SessionId::zeroed());
let receiver = *state.recv.stats();
drop(state);
let reply = StatsReplyPacket {
magic: protocol::types::MAGIC_STATS_REPLY,
flags: StatsReplyFlags::IS_RECEIVER,
sid: sid.unwrap_or(SessionId::zeroed()),
receiver: stats,
node,
};
let _ = socket.send_to(bytemuck::bytes_of(&reply), addr);
let reply = StatsReply::receiver(sid, receiver, node);
let _ = protocol.send_to(reply.as_packet(), peer);
}
Some(Packet::StatsReply(_)) => {
Some(PacketKind::StatsReply(_)) => {
// ignore
}
None => {

View file

@ -2,6 +2,7 @@ use std::io;
use std::net::{Ipv4Addr, UdpSocket, SocketAddr, SocketAddrV4};
use std::os::fd::AsRawFd;
use derive_more::Display;
use nix::poll::{PollFd, PollFlags};
use socket2::{Domain, Type};
use structopt::StructOpt;
@ -37,7 +38,8 @@ pub struct Socket {
rx: UdpSocket,
}
pub struct PeerId(SocketAddrV4);
#[derive(Clone, Copy, Debug, Display, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct PeerId(SocketAddr);
impl Socket {
pub fn open(opt: SocketOpt) -> Result<Socket, ListenError> {
@ -55,15 +57,16 @@ impl Socket {
}
pub fn broadcast(&self, msg: &[u8]) -> Result<(), io::Error> {
self.send_to(msg, self.multicast.into())
}
pub fn send_to(&self, msg: &[u8], dest: SocketAddr) -> Result<(), io::Error> {
self.tx.send_to(msg, dest)?;
self.tx.send_to(msg, self.multicast)?;
Ok(())
}
pub fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr), io::Error> {
pub fn send_to(&self, msg: &[u8], dest: PeerId) -> Result<(), io::Error> {
self.tx.send_to(msg, dest.0)?;
Ok(())
}
pub fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, PeerId), io::Error> {
let mut poll = [
PollFd::new(self.tx.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.rx.as_raw_fd(), PollFlags::POLLIN),
@ -71,13 +74,16 @@ impl Socket {
nix::poll::poll(&mut poll, -1)?;
if poll[0].any() == Some(true) {
self.tx.recv_from(buf)
} else if poll[1].any() == Some(true) {
self.rx.recv_from(buf)
} else {
unreachable!()
}
let (nbytes, addr) =
if poll[0].any() == Some(true) {
self.tx.recv_from(buf)?
} else if poll[1].any() == Some(true) {
self.rx.recv_from(buf)?
} else {
unreachable!("poll returned with no readable sockets");
};
Ok((nbytes, PeerId(addr)))
}
}

View file

@ -3,19 +3,17 @@ pub mod receiver;
pub mod render;
use std::collections::HashMap;
use std::mem::size_of;
use std::net::{SocketAddrV4, SocketAddr};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::io::Write;
use bytemuck::Zeroable;
use structopt::StructOpt;
use termcolor::BufferedStandardStream;
use crate::protocol;
use crate::protocol::types::{StatsRequestPacket, StatsReplyPacket, StatsReplyFlags};
use crate::socket::{Socket, SocketOpt};
use crate::protocol::Protocol;
use crate::protocol::packet::{StatsRequest, StatsReply, PacketKind};
use crate::protocol::types::StatsReplyFlags;
use crate::socket::{Socket, SocketOpt, PeerId};
use crate::RunError;
use self::render::Padding;
@ -30,48 +28,33 @@ pub fn run(opt: StatsOpt) -> Result<(), RunError> {
let socket = Socket::open(opt.socket)
.map_err(RunError::Listen)?;
let socket = Arc::new(socket);
let protocol = Arc::new(Protocol::new(socket));
// spawn poller thread
std::thread::spawn({
let socket = socket.clone();
let protocol = Arc::clone(&protocol);
move || {
let request = StatsRequest::new();
loop {
let packet = StatsRequestPacket {
magic: protocol::types::MAGIC_STATS_REQ,
flags: 0,
};
let _ = socket.broadcast(bytemuck::bytes_of(&packet));
let _ = protocol.broadcast(request.as_packet());
std::thread::sleep(Duration::from_millis(100));
}
}
});
let mut stats = HashMap::<SocketAddrV4, Entry>::new();
let mut stats = HashMap::<PeerId, Entry>::new();
loop {
let mut reply = StatsReplyPacket::zeroed();
let (nbytes, addr) = socket.recv_from(bytemuck::bytes_of_mut(&mut reply))
.map_err(RunError::Socket)?;
let (reply, peer) = protocol.recv_from().map_err(RunError::Socket)?;
if nbytes != size_of::<StatsReplyPacket>() {
continue;
}
if reply.magic != protocol::types::MAGIC_STATS_REPLY {
continue;
}
let SocketAddr::V4(addr) = addr else {
let Some(PacketKind::StatsReply(reply)) = reply.parse() else {
continue;
};
let prev_entries = stats.len();
let now = Instant::now();
stats.insert(addr, Entry { time: now, packet: Box::new(reply) });
stats.insert(peer, Entry { time: now, reply });
stats.retain(|_, ent| ent.valid_at(now));
let current_entries = stats.len();
@ -83,18 +66,18 @@ pub fn run(opt: StatsOpt) -> Result<(), RunError> {
// write stats for stream sources first
let mut stats = stats.iter().collect::<Vec<_>>();
stats.sort_by_key(|(addr, entry)| (entry.is_receiver(), *addr));
stats.sort_by_key(|(peer, entry)| (entry.is_receiver(), *peer));
let mut padding = Padding::default();
for (addr, entry) in &stats {
render::calculate(&mut padding, &entry.packet, **addr);
for (peer, entry) in &stats {
render::calculate(&mut padding, entry.reply.data(), **peer);
}
for (addr, entry) in &stats {
for (peer, entry) in &stats {
// kill line
kill_line(&mut out);
render::line(&mut out, &padding, &entry.packet, **addr);
render::line(&mut out, &padding, &entry.reply, **peer);
new_line(&mut out);
}
@ -127,12 +110,12 @@ fn new_line(out: &mut BufferedStandardStream) {
struct Entry {
time: Instant,
packet: Box<StatsReplyPacket>,
reply: StatsReply,
}
impl Entry {
pub fn is_receiver(&self) -> bool {
self.packet.flags.contains(StatsReplyFlags::IS_RECEIVER)
self.reply.flags().contains(StatsReplyFlags::IS_RECEIVER)
}
pub fn valid_at(&self, now: Instant) -> bool {

View file

@ -1,31 +1,31 @@
use std::net::SocketAddrV4;
use termcolor::{WriteColor, ColorSpec, Color};
use crate::protocol::packet::StatsReply;
use crate::protocol::types::{StatsReplyPacket, StatsReplyFlags};
use crate::socket::PeerId;
use crate::stats::receiver::{ReceiverStats, StreamStatus};
use crate::stats::node::NodeStats;
#[derive(Default)]
pub struct Padding {
node_width: usize,
addr_width: usize,
peer_width: usize,
}
pub fn calculate(padding: &mut Padding, stats: &StatsReplyPacket, addr: SocketAddrV4) {
pub fn calculate(padding: &mut Padding, stats: &StatsReplyPacket, peer: PeerId) {
let node_width = stats.node.display().len();
let addr_width = addr.to_string().len();
let peer_width = peer.to_string().len();
padding.node_width = std::cmp::max(padding.node_width, node_width);
padding.addr_width = std::cmp::max(padding.node_width, addr_width);
padding.peer_width = std::cmp::max(padding.peer_width, peer_width);
}
pub fn line(out: &mut dyn WriteColor, padding: &Padding, stats: &StatsReplyPacket, addr: SocketAddrV4) {
node(out, padding, &stats.node, addr);
pub fn line(out: &mut dyn WriteColor, padding: &Padding, stats: &StatsReply, peer: PeerId) {
node(out, padding, &stats.data().node, peer);
if stats.flags.contains(StatsReplyFlags::IS_RECEIVER) {
receiver(out, &stats.receiver);
} else if stats.flags.contains(StatsReplyFlags::IS_STREAM) {
if stats.flags().contains(StatsReplyFlags::IS_RECEIVER) {
receiver(out, &stats.data().receiver);
} else if stats.flags().contains(StatsReplyFlags::IS_STREAM) {
let _ = out.set_color(&ColorSpec::new()
.set_fg(Some(Color::White))
.set_bold(true));
@ -34,7 +34,7 @@ pub fn line(out: &mut dyn WriteColor, padding: &Padding, stats: &StatsReplyPacke
}
}
fn node(out: &mut dyn WriteColor, padding: &Padding, node: &NodeStats, addr: SocketAddrV4) {
fn node(out: &mut dyn WriteColor, padding: &Padding, node: &NodeStats, peer: PeerId) {
let _ = out.set_color(&ColorSpec::new()
.set_fg(Some(Color::Blue))
.set_bold(true));
@ -44,7 +44,7 @@ fn node(out: &mut dyn WriteColor, padding: &Padding, node: &NodeStats, addr: Soc
let _ = out.set_color(&ColorSpec::new()
.set_dimmed(true));
let _ = write!(out, "{:<width$} ", addr, width = padding.addr_width);
let _ = write!(out, "{:<width$} ", peer, width = padding.peer_width);
let _ = out.set_color(&ColorSpec::new());
}

View file

@ -1,16 +1,15 @@
use std::sync::Arc;
use std::time::Duration;
use bytemuck::Zeroable;
use cpal::traits::{HostTrait, DeviceTrait, StreamTrait};
use cpal::InputCallbackInfo;
use structopt::StructOpt;
use crate::protocol;
use crate::protocol::types::{Packet, TimestampMicros, AudioPacket, PacketBuffer, TimePacket, TimePacketPadding, SessionId, ReceiverId, TimePhase, StatsReplyPacket, StatsReplyFlags};
use crate::protocol::{self, Protocol};
use crate::protocol::packet::{self, Audio, StatsReply, PacketKind};
use crate::protocol::types::{TimestampMicros, AudioPacketHeader, SessionId, ReceiverId, TimePhase};
use crate::socket::{Socket, SocketOpt};
use crate::stats::node::NodeStats;
use crate::stats::receiver::ReceiverStats;
use crate::time::{SampleDuration, Timestamp};
use crate::util;
use crate::RunError;
@ -49,7 +48,7 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
let socket = Socket::open(opt.socket)
.map_err(RunError::Listen)?;
let socket = Arc::new(socket);
let protocol = Arc::new(Protocol::new(socket));
let delay = Duration::from_millis(opt.delay_ms);
let delay = SampleDuration::from_std_duration_lossy(delay);
@ -57,21 +56,18 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
let sid = SessionId::generate();
let node = NodeStats::get();
let mut packet = AudioPacket {
magic: protocol::types::MAGIC_AUDIO,
flags: 0,
let mut audio_header = AudioPacketHeader {
sid,
seq: 1,
pts: TimestampMicros(0),
dts: TimestampMicros(0),
buffer: PacketBuffer::zeroed(),
};
let mut packet_written = SampleDuration::zero();
let mut audio_buffer = Audio::write();
let stream = device.build_input_stream(&config,
{
let socket = Arc::clone(&socket);
let protocol = Arc::clone(&protocol);
let mut initialized_thread = false;
move |mut data: &[f32], _: &InputCallbackInfo| {
if !initialized_thread {
@ -85,33 +81,35 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
let mut timestamp = Timestamp::now().add(delay);
if packet.pts.0 == 0 {
packet.pts = timestamp.to_micros_lossy();
if audio_header.pts.0 == 0 {
audio_header.pts = timestamp.to_micros_lossy();
}
while data.len() > 0 {
let buffer_offset = packet_written.as_buffer_offset();
let buffer_remaining = packet.buffer.0.len() - buffer_offset;
// write some data to the waiting packet buffer
let written = audio_buffer.write(data);
let copy_count = std::cmp::min(data.len(), buffer_remaining);
let buffer_copy_end = buffer_offset + copy_count;
// advance
timestamp = timestamp.add(written);
data = &data[written.as_buffer_offset()..];
packet.buffer.0[buffer_offset..buffer_copy_end]
.copy_from_slice(&data[0..copy_count]);
// if packet buffer is full, finalize it and send off the packet:
if audio_buffer.valid_length() {
// take packet writer and replace with new
let audio = std::mem::replace(&mut audio_buffer, Audio::write());
data = &data[copy_count..];
packet_written = SampleDuration::from_buffer_offset(buffer_copy_end);
timestamp = timestamp.add(SampleDuration::from_buffer_offset(copy_count));
// finalize packet
let audio_packet = audio.finalize(AudioPacketHeader {
dts: TimestampMicros::now(),
..audio_header
});
if packet_written == SampleDuration::ONE_PACKET {
// packet is full! set dts and send
packet.dts = TimestampMicros::now();
socket.broadcast(bytemuck::bytes_of(&packet)).expect("broadcast");
// send it
protocol.broadcast(audio_packet.as_packet()).expect("broadcast");
// reset rest of packet for next:
packet.seq += 1;
packet.pts = timestamp.to_micros_lossy();
packet_written = SampleDuration::zero();
// reset header for next packet:
audio_header.seq += 1;
audio_header.pts = timestamp.to_micros_lossy();
}
}
@ -119,8 +117,8 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
// callback, the pts we just calculated is valid. if the packet is
// empty, reset the pts to 0. this signals the next callback to set
// pts to the current time when it fires.
if packet_written == SampleDuration::zero() {
packet.pts.0 = 0;
if audio_buffer.length() == SampleDuration::zero() {
audio_header.pts.0 = 0;
}
}
},
@ -135,23 +133,19 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
crate::thread::set_name("bark/clock");
crate::thread::set_realtime_priority();
let socket = Arc::clone(&socket);
let protocol = Arc::clone(&protocol);
move || {
let mut time = packet::Time::allocate();
// set up packet
let data = time.data_mut();
data.sid = sid;
data.rid = ReceiverId::broadcast();
loop {
let now = TimestampMicros::now();
time.data_mut().stream_1 = TimestampMicros::now();
let packet = TimePacket {
magic: protocol::types::MAGIC_TIME,
flags: 0,
sid,
rid: ReceiverId::broadcast(),
stream_1: now,
receive_2: TimestampMicros(0),
stream_3: TimestampMicros(0),
_pad: TimePacketPadding::zeroed(),
};
socket.broadcast(bytemuck::bytes_of(&packet))
protocol.broadcast(time.as_packet())
.expect("broadcast time");
std::thread::sleep(Duration::from_millis(200));
@ -165,32 +159,29 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
crate::thread::set_realtime_priority();
loop {
let mut packet_raw = [0u8; protocol::MAX_PACKET_SIZE];
let (packet, peer) = protocol.recv_from().expect("protocol.recv_from");
let (nbytes, addr) = socket.recv_from(&mut packet_raw)
.expect("socket.recv_from");
match Packet::try_from_bytes_mut(&mut packet_raw[0..nbytes]) {
Some(Packet::Audio(packet)) => {
match packet.parse() {
Some(PacketKind::Audio(audio)) => {
// we should only ever receive an audio packet if another
// stream is present. check if it should take over
if packet.sid > sid {
eprintln!("Another stream has taken over from {addr}, exiting");
if audio.header().sid > sid {
eprintln!("Peer {peer} has taken over stream, exiting");
break;
}
}
Some(Packet::Time(packet)) => {
Some(PacketKind::Time(mut time)) => {
// only handle packet if it belongs to our stream:
if packet.sid != sid {
if time.data().sid != sid {
continue;
}
match packet.phase() {
match time.data().phase() {
Some(TimePhase::ReceiverReply) => {
packet.stream_3 = TimestampMicros::now();
time.data_mut().stream_3 = TimestampMicros::now();
socket.send_to(bytemuck::bytes_of(packet), addr)
.expect("socket.send responding to time packet");
protocol.send_to(time.as_packet(), peer)
.expect("protocol.send_to responding to time packet");
}
_ => {
// any other packet here must be destined for
@ -199,18 +190,11 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
}
}
Some(Packet::StatsRequest(_)) => {
let reply = StatsReplyPacket {
magic: protocol::types::MAGIC_STATS_REPLY,
flags: StatsReplyFlags::IS_STREAM,
sid: sid,
receiver: ReceiverStats::zeroed(),
node,
};
let _ = socket.send_to(bytemuck::bytes_of(&reply), addr);
Some(PacketKind::StatsRequest(_)) => {
let reply = StatsReply::source(sid, node);
let _ = protocol.send_to(reply.as_packet(), peer);
}
Some(Packet::StatsReply(_)) => {
Some(PacketKind::StatsReply(_)) => {
// ignore
}
None => {

View file

@ -1,5 +1,6 @@
use crate::protocol;
use crate::protocol::types::{TimestampMicros, TimePacket};
use crate::protocol::packet;
use crate::protocol::types::TimestampMicros;
/// A timestamp with implicit denominator SAMPLE_RATE
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
@ -105,16 +106,18 @@ impl ClockDelta {
}
/// Calculates clock difference between machines based on a complete TimePacket
pub fn from_time_packet(packet: &TimePacket) -> ClockDelta {
pub fn from_time_packet(packet: &packet::Time) -> ClockDelta {
let time = packet.data();
// all fields should be non-zero here, it's a programming error if
// they're not.
assert!(packet.stream_1.0 != 0);
assert!(packet.receive_2.0 != 0);
assert!(packet.stream_3.0 != 0);
assert!(time.stream_1.0 != 0);
assert!(time.receive_2.0 != 0);
assert!(time.stream_3.0 != 0);
let t1_usec = packet.stream_1.0 as i64;
let t2_usec = packet.receive_2.0 as i64;
let t3_usec = packet.stream_3.0 as i64;
let t1_usec = time.stream_1.0 as i64;
let t2_usec = time.receive_2.0 as i64;
let t3_usec = time.stream_3.0 as i64;
// algorithm from the Precision Time Protocol page on Wikipedia
ClockDelta((t2_usec - t1_usec + t2_usec - t3_usec) / 2)