split into protocol and main crate

This commit is contained in:
Hailey Somerville 2023-09-14 17:11:00 +10:00
parent 122927fa47
commit 13e072d67c
25 changed files with 299 additions and 219 deletions

11
Cargo.lock generated
View file

@ -63,6 +63,7 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
name = "bark"
version = "0.1.0"
dependencies = [
"bark-protocol",
"bitflags 2.4.0",
"bytemuck",
"cpal",
@ -80,6 +81,16 @@ dependencies = [
"xdg",
]
[[package]]
name = "bark-protocol"
version = "0.1.0"
dependencies = [
"bitflags 2.4.0",
"bytemuck",
"derive_more",
"rand",
]
[[package]]
name = "bindgen"
version = "0.64.0"

View file

@ -1,23 +1,14 @@
[package]
name = "bark"
version = "0.1.0"
edition = "2021"
[workspace]
resolver = "2"
members = [
"bark",
"bark-protocol",
]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[workspace.dependencies]
bark-protocol = { path = "bark-protocol" }
[dependencies]
bitflags = { version = "2.4.0", features = ["bytemuck"] }
bytemuck = { version = "1.13.1", features = ["derive", "extern_crate_alloc"] }
cpal = "0.15.2"
derive_more = "0.99.17"
libc = "0.2.147"
nix = { version = "0.26.2", features = ["time", "socket", "net", "poll", "user", "hostname"], default-features = false }
rand = "0.8.5"
serde = { version = "1.0.183", features = ["derive"] }
serde_json = "1.0.105"
socket2 = "0.5.3"
static_assertions = "1.1.0"
structopt = "0.3.26"
termcolor = "1.2.0"
toml = "0.7.6"
xdg = "2.5.2"
bytemuck = { version = "1.13", features = ["derive"] }
derive_more = { version = "0.99" }
rand = { version = "0.8.5", no-default-features = true }

10
bark-protocol/Cargo.toml Normal file
View file

@ -0,0 +1,10 @@
[package]
name = "bark-protocol"
version = "0.1.0"
edition = "2021"
[dependencies]
bitflags = { workspace = true }
bytemuck = { workspace = true }
derive_more = { workspace = true }
rand = { workspace = true }

62
bark-protocol/src/lib.rs Normal file
View file

@ -0,0 +1,62 @@
pub mod time;
pub mod types;
pub mod packet;
pub const SAMPLE_RATE: SampleRate = SampleRate(48000);
pub const CHANNELS: ChannelCount = ChannelCount(2);
pub const FRAMES_PER_PACKET: usize = 160;
pub const SAMPLES_PER_PACKET: usize = CHANNELS.0 as usize * FRAMES_PER_PACKET;
#[derive(Copy, Clone, Debug)]
pub struct SampleRate(pub u32);
#[derive(Copy, Clone, Debug)]
pub struct ChannelCount(pub u16);
impl From<SampleRate> for usize {
fn from(value: SampleRate) -> Self {
value.0.try_into().expect("SampleRate -> usize")
}
}
impl From<SampleRate> for u32 {
fn from(value: SampleRate) -> Self {
value.0.into()
}
}
impl From<SampleRate> for u64 {
fn from(value: SampleRate) -> Self {
value.0.into()
}
}
impl From<SampleRate> for u128 {
fn from(value: SampleRate) -> Self {
value.0.into()
}
}
impl From<SampleRate> for i64 {
fn from(value: SampleRate) -> Self {
value.0.into()
}
}
impl From<ChannelCount> for usize {
fn from(value: ChannelCount) -> Self {
value.0.into()
}
}
impl From<ChannelCount> for u64 {
fn from(value: ChannelCount) -> Self {
value.0.into()
}
}
impl From<ChannelCount> for u32 {
fn from(value: ChannelCount) -> Self {
value.0.into()
}
}

View file

@ -1,12 +1,11 @@
use std::mem::size_of;
use bytemuck::Zeroable;
pub use cpal::{SampleFormat, SampleRate, ChannelCount};
use crate::stats::node::NodeStats;
use crate::stats::receiver::ReceiverStats;
use crate::types::stats::node::NodeStats;
use crate::types::stats::receiver::ReceiverStats;
use crate::types::{self, Magic};
use crate::time::SampleDuration;
use crate::protocol::types::{self, Magic};
use super::types::{AudioPacketHeader, StatsReplyFlags, SessionId};

View file

@ -1,21 +1,15 @@
use crate::protocol;
use crate::protocol::packet;
use crate::protocol::types::TimestampMicros;
use crate::packet;
use crate::types::{TimestampMicros};
use crate::{SAMPLE_RATE, FRAMES_PER_PACKET, CHANNELS};
/// A timestamp with implicit denominator SAMPLE_RATE
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Timestamp(u64);
impl Timestamp {
pub fn now() -> Timestamp {
Timestamp::from_micros_lossy(TimestampMicros::now())
}
}
impl Timestamp {
pub fn to_micros_lossy(&self) -> TimestampMicros {
let ts = u128::from(self.0);
let micros = (ts * 1_000_000) / u128::from(protocol::SAMPLE_RATE.0);
let micros = (ts * 1_000_000) / u128::from(SAMPLE_RATE.0);
let micros = u64::try_from(micros)
.expect("can't narrow timestamp to u64");
TimestampMicros(micros)
@ -23,7 +17,7 @@ impl Timestamp {
pub fn from_micros_lossy(micros: TimestampMicros) -> Timestamp {
let micros = u128::from(micros.0);
let ts = (micros * u128::from(protocol::SAMPLE_RATE.0)) / 1_000_000;
let ts = (micros * u128::from(SAMPLE_RATE.0)) / 1_000_000;
let ts = u64::try_from(ts)
.expect("can't narrow timestamp to u64");
Timestamp(ts)
@ -53,7 +47,7 @@ impl Timestamp {
pub struct SampleDuration(u64);
impl SampleDuration {
pub const ONE_PACKET: SampleDuration = SampleDuration::from_frame_count(protocol::FRAMES_PER_PACKET as u64);
pub const ONE_PACKET: SampleDuration = SampleDuration::from_frame_count(FRAMES_PER_PACKET as u64);
pub const fn zero() -> Self {
SampleDuration(0)
@ -64,24 +58,24 @@ impl SampleDuration {
}
pub fn from_std_duration_lossy(duration: std::time::Duration) -> SampleDuration {
let duration = (duration.as_micros() * u128::from(protocol::SAMPLE_RATE.0)) / 1_000_000;
let duration = (duration.as_micros() * u128::from(SAMPLE_RATE)) / 1_000_000;
let duration = u64::try_from(duration).expect("can't narrow duration to u64");
SampleDuration(duration)
}
pub fn to_std_duration_lossy(&self) -> std::time::Duration {
let usecs = (u128::from(self.0) * 1_000_000) / u128::from(protocol::SAMPLE_RATE.0);
let usecs = (u128::from(self.0) * 1_000_000) / u128::from(SAMPLE_RATE);
let usecs = u64::try_from(usecs).expect("can't narrow usecs to u64");
std::time::Duration::from_micros(usecs)
}
pub fn as_buffer_offset(&self) -> usize {
let offset = self.0 * u64::from(protocol::CHANNELS);
let offset = self.0 * u64::from(CHANNELS);
usize::try_from(offset).unwrap()
}
pub fn from_buffer_offset(offset: usize) -> Self {
let channels = usize::from(protocol::CHANNELS);
let channels = usize::from(CHANNELS);
assert!(offset % channels == 0);
SampleDuration(u64::try_from(offset / channels).unwrap())
@ -130,7 +124,7 @@ pub struct TimestampDelta(i64);
impl TimestampDelta {
pub fn from_clock_delta_lossy(delta: ClockDelta) -> TimestampDelta {
TimestampDelta((delta.0 * i64::from(protocol::SAMPLE_RATE.0)) / 1_000_000)
TimestampDelta((delta.0 * i64::from(SAMPLE_RATE.0)) / 1_000_000)
}
pub fn abs(&self) -> SampleDuration {

View file

@ -1,9 +1,8 @@
use bytemuck::{Pod, Zeroable};
use nix::time::ClockId;
use nix::sys::time::TimeValLike;
use crate::stats;
use crate::protocol;
pub mod stats;
use crate::SAMPLES_PER_PACKET;
#[derive(Debug, Clone, Copy, Zeroable, Pod, PartialEq, Eq)]
#[repr(transparent)]
@ -50,7 +49,7 @@ pub struct AudioPacketHeader {
pub dts: TimestampMicros,
}
pub type AudioPacketBuffer = [f32; protocol::SAMPLES_PER_PACKET];
pub type AudioPacketBuffer = [f32; SAMPLES_PER_PACKET];
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(C)]
@ -120,21 +119,9 @@ bitflags::bitflags! {
#[repr(transparent)]
pub struct TimestampMicros(pub u64);
impl TimestampMicros {
pub fn now() -> TimestampMicros {
let timespec = nix::time::clock_gettime(ClockId::CLOCK_BOOTTIME)
.expect("clock_gettime(CLOCK_BOOTTIME) failed, are we on Linux?");
let micros = u64::try_from(timespec.num_microseconds())
.expect("cannot convert i64 time value to u64");
TimestampMicros(micros)
}
}
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(transparent)]
pub struct ReceiverId(u64);
pub struct ReceiverId(pub u64);
impl ReceiverId {
pub fn broadcast() -> Self {
@ -148,21 +135,8 @@ impl ReceiverId {
pub fn matches(&self, this: &ReceiverId) -> bool {
self.is_broadcast() || self.0 == this.0
}
pub fn generate() -> Self {
ReceiverId(rand::random())
}
}
#[derive(Debug, Clone, Copy, Zeroable, Pod, PartialEq, PartialOrd)]
#[repr(transparent)]
pub struct SessionId(i64);
impl SessionId {
pub fn generate() -> Self {
let timespec = nix::time::clock_gettime(ClockId::CLOCK_REALTIME)
.expect("clock_gettime(CLOCK_REALTIME)");
SessionId(timespec.num_microseconds())
}
}
pub struct SessionId(pub i64);

View file

@ -0,0 +1,2 @@
pub mod node;
pub mod receiver;

View file

@ -0,0 +1,8 @@
use bytemuck::{Zeroable, Pod};
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(C)]
pub struct NodeStats {
pub username: [u8; 32],
pub hostname: [u8; 32],
}

View file

@ -1,13 +1,14 @@
use std::time::Duration;
use bitflags::bitflags;
use bytemuck::{Zeroable, Pod};
use crate::time::{Timestamp, SampleDuration};
use crate::time::{SampleDuration, Timestamp};
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(C)]
pub struct ReceiverStats {
flags: Flags,
flags: ReceiverStatsFlags,
stream_status: u8,
_pad: [u8; 6],
@ -45,10 +46,10 @@ impl StreamStatus {
}
}
bitflags::bitflags! {
bitflags! {
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(transparent)]
pub struct Flags: u8 {
pub struct ReceiverStatsFlags: u8 {
const HAS_AUDIO_LATENCY = 0x04;
const HAS_BUFFER_LENGTH = 0x08;
const HAS_NETWORK_LATENCY = 0x10;
@ -71,10 +72,10 @@ impl ReceiverStats {
pub fn clear(&mut self) {
self.set_stream(StreamStatus::Seek);
self.flags = Flags::empty();
self.flags = ReceiverStatsFlags::empty();
}
fn field(&self, flag: Flags, value: f64) -> Option<f64> {
fn field(&self, flag: ReceiverStatsFlags, value: f64) -> Option<f64> {
if self.flags.contains(flag) {
Some(value)
} else {
@ -84,22 +85,22 @@ impl ReceiverStats {
/// Audio latency in seconds
pub fn audio_latency(&self) -> Option<f64> {
self.field(Flags::HAS_AUDIO_LATENCY, self.audio_latency)
self.field(ReceiverStatsFlags::HAS_AUDIO_LATENCY, self.audio_latency)
}
/// Duration of buffered audio in seconds
pub fn buffer_length(&self) -> Option<f64> {
self.field(Flags::HAS_BUFFER_LENGTH, self.buffer_length)
self.field(ReceiverStatsFlags::HAS_BUFFER_LENGTH, self.buffer_length)
}
/// Duration of buffered audio in seconds
pub fn network_latency(&self) -> Option<f64> {
self.field(Flags::HAS_NETWORK_LATENCY, self.network_latency)
self.field(ReceiverStatsFlags::HAS_NETWORK_LATENCY, self.network_latency)
}
/// Running prediction offset in seconds
pub fn predict_offset(&self) -> Option<f64> {
self.field(Flags::HAS_PREDICT_OFFSET, self.predict_offset)
self.field(ReceiverStatsFlags::HAS_PREDICT_OFFSET, self.predict_offset)
}
pub fn set_audio_latency(&mut self, request_pts: Timestamp, packet_pts: Timestamp) {
@ -107,21 +108,21 @@ impl ReceiverStats {
let packet_micros = packet_pts.to_micros_lossy().0 as f64;
self.audio_latency = (request_micros - packet_micros) / 1_000_000.0;
self.flags.insert(Flags::HAS_AUDIO_LATENCY);
self.flags.insert(ReceiverStatsFlags::HAS_AUDIO_LATENCY);
}
pub fn set_buffer_length(&mut self, length: SampleDuration) {
self.buffer_length = length.to_std_duration_lossy().as_micros() as f64 / 1_000_000.0;
self.flags.insert(Flags::HAS_BUFFER_LENGTH);
self.flags.insert(ReceiverStatsFlags::HAS_BUFFER_LENGTH);
}
pub fn set_network_latency(&mut self, latency: Duration) {
self.network_latency = latency.as_micros() as f64 / 1_000_000.0;
self.flags.insert(Flags::HAS_NETWORK_LATENCY);
self.flags.insert(ReceiverStatsFlags::HAS_NETWORK_LATENCY);
}
pub fn set_predict_offset(&mut self, diff_usec: i64) {
self.predict_offset = diff_usec as f64 / 1_000_000.0;
self.flags.insert(Flags::HAS_PREDICT_OFFSET);
self.flags.insert(ReceiverStatsFlags::HAS_PREDICT_OFFSET);
}
}

25
bark/Cargo.toml Normal file
View file

@ -0,0 +1,25 @@
[package]
name = "bark"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bark-protocol = { workspace = true }
bitflags = { workspace = true }
bytemuck = { workspace = true, features = ["extern_crate_alloc"] }
cpal = "0.15.2"
derive_more = { workspace = true }
libc = "0.2.147"
nix = { version = "0.26.2", features = ["time", "socket", "net", "poll", "user", "hostname"], default-features = false }
rand = { workspace = true, features = ["default"] }
serde = { version = "1.0.183", features = ["derive"] }
serde_json = "1.0.105"
socket2 = "0.5.3"
static_assertions = "1.1.0"
structopt = "0.3.26"
termcolor = "1.2.0"
toml = "0.7.6"
xdg = "2.5.2"

View file

@ -1,6 +1,5 @@
mod audio;
mod config;
mod protocol;
mod receive;
mod resample;
mod socket;

View file

@ -4,19 +4,19 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;
use bytemuck::Zeroable;
use cpal::{SampleRate, OutputCallbackInfo};
use cpal::OutputCallbackInfo;
use cpal::traits::{HostTrait, DeviceTrait};
use structopt::StructOpt;
use crate::protocol::{self, Protocol};
use crate::protocol::packet::{Audio, Time, PacketKind, StatsReply};
use crate::protocol::types::{TimestampMicros, SessionId, ReceiverId, TimePhase};
use bark_protocol::SampleRate;
use bark_protocol::time::{Timestamp, SampleDuration, TimestampDelta, ClockDelta};
use bark_protocol::types::{SessionId, ReceiverId, TimePhase};
use bark_protocol::types::stats::receiver::{ReceiverStats, StreamStatus};
use bark_protocol::packet::{Audio, Time, PacketKind, StatsReply};
use crate::resample::Resampler;
use crate::socket::{Socket, SocketOpt};
use crate::stats::node::NodeStats;
use crate::stats::receiver::{ReceiverStats, StreamStatus};
use crate::time::{Timestamp, SampleDuration, TimestampDelta, ClockDelta};
use crate::util;
use crate::socket::{ProtocolSocket, Socket, SocketOpt};
use crate::{util, time, stats};
use crate::RunError;
pub struct Receiver {
@ -37,7 +37,7 @@ impl QueueEntry {
pub fn as_full_buffer(&self) -> &[f32] {
self.packet.as_ref()
.map(|packet| packet.buffer())
.unwrap_or(&[0f32; protocol::SAMPLES_PER_PACKET])
.unwrap_or(&[0f32; bark_protocol::SAMPLES_PER_PACKET])
}
}
@ -181,7 +181,7 @@ impl Receiver {
}
pub fn receive_audio(&mut self, packet: Audio) {
let now = TimestampMicros::now();
let now = time::now();
if !self.prepare_stream(&packet) {
return;
@ -391,7 +391,7 @@ impl RateAdjust {
}
pub fn sample_rate(&mut self, timing: Timing) -> SampleRate {
self.adjusted_rate(timing).unwrap_or(protocol::SAMPLE_RATE)
self.adjusted_rate(timing).unwrap_or(bark_protocol::SAMPLE_RATE)
}
fn adjusted_rate(&mut self, timing: Timing) -> Option<SampleRate> {
@ -416,7 +416,7 @@ impl RateAdjust {
}
let slew_duration_duration = i64::try_from(slew_target_duration.as_micros()).unwrap();
let base_sample_rate = i64::from(protocol::SAMPLE_RATE.0);
let base_sample_rate = i64::from(bark_protocol::SAMPLE_RATE);
let rate_offset = frame_offset.as_frames() * 1_000_000 / slew_duration_duration;
let rate = base_sample_rate + rate_offset;
@ -474,8 +474,8 @@ pub struct ReceiveOpt {
}
pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
let receiver_id = ReceiverId::generate();
let node = NodeStats::get();
let receiver_id = generate_receiver_id();
let node = stats::node::get();
if let Some(device) = &opt.device {
crate::audio::set_sink_env(device);
@ -515,7 +515,7 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
let output_latency = SampleDuration::from_std_duration_lossy(output_latency);
let now = Timestamp::now();
let now = Timestamp::from_micros_lossy(time::now());
let pts = now.add(output_latency);
let mut state = state.lock().unwrap();
@ -531,7 +531,7 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
let socket = Socket::open(opt.socket)
.map_err(RunError::Listen)?;
let protocol = Protocol::new(socket);
let protocol = ProtocolSocket::new(socket);
crate::thread::set_name("bark/network");
crate::thread::set_realtime_priority();
@ -550,7 +550,7 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
match time.data().phase() {
Some(TimePhase::Broadcast) => {
let data = time.data_mut();
data.receive_2 = TimestampMicros::now();
data.receive_2 = time::now();
data.rid = receiver_id;
protocol.send_to(time.as_packet(), peer)
@ -588,3 +588,7 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
}
}
}
pub fn generate_receiver_id() -> ReceiverId {
ReceiverId(rand::random())
}

View file

@ -2,8 +2,7 @@ use std::ffi::{c_void, c_int, CStr};
use std::fmt::Debug;
use std::ptr;
use crate::protocol;
use crate::time::SampleDuration;
use bark_protocol::time::SampleDuration;
use self::ffi::speex_resampler_strerror;
@ -59,9 +58,9 @@ impl Resampler {
let ptr = unsafe {
ffi::speex_resampler_init(
u32::from(protocol::CHANNELS),
protocol::SAMPLE_RATE.0,
protocol::SAMPLE_RATE.0,
bark_protocol::CHANNELS.into(),
bark_protocol::SAMPLE_RATE.into(),
bark_protocol::SAMPLE_RATE.into(),
10,
&mut err
)
@ -83,7 +82,7 @@ impl Resampler {
ffi::speex_resampler_set_rate(
self.ptr.0,
rate,
protocol::SAMPLE_RATE.0,
bark_protocol::SAMPLE_RATE.into(),
)
};
@ -98,8 +97,8 @@ impl Resampler {
-> Result<ProcessResult, SpeexError>
{
// speex API takes frame count:
let input_len = input.len() / usize::from(protocol::CHANNELS);
let output_len = output.len() / usize::from(protocol::CHANNELS);
let input_len = input.len() / usize::from(bark_protocol::CHANNELS);
let output_len = output.len() / usize::from(bark_protocol::CHANNELS);
// usize could technically be 64 bit, speex only takes u32 sizes,
// we don't want to panic or truncate, so let's just pick a reasonable

View file

@ -7,6 +7,9 @@ use nix::poll::{PollFd, PollFlags};
use socket2::{Domain, Type};
use structopt::StructOpt;
use bark_protocol::packet::Packet;
use bark_protocol::packet::PacketBuffer;
// expedited forwarding - IP header field indicating that switches should
// prioritise our packets for minimal delay
const IPTOS_DSCP_EF: u32 = 0xb8;
@ -115,3 +118,34 @@ fn bind_socket(bind: SocketAddrV4) -> Result<socket2::Socket, ListenError> {
Ok(socket)
}
pub struct ProtocolSocket {
socket: Socket,
}
impl ProtocolSocket {
pub fn new(socket: Socket) -> Self {
ProtocolSocket { socket }
}
pub fn broadcast(&self, packet: &Packet) -> Result<(), io::Error> {
self.socket.broadcast(packet.as_buffer().as_bytes())
}
pub fn send_to(&self, packet: &Packet, peer: PeerId) -> Result<(), io::Error> {
self.socket.send_to(packet.as_buffer().as_bytes(), peer)
}
pub fn recv_from(&self) -> Result<(Packet, PeerId), io::Error> {
loop {
let mut buffer = PacketBuffer::allocate();
let (nbytes, peer) = self.socket.recv_from(buffer.as_full_buffer_mut())?;
buffer.set_len(nbytes);
if let Some(packet) = Packet::from_buffer(buffer) {
return Ok((packet, peer));
}
}
}
}

View file

@ -1,5 +1,4 @@
pub mod node;
pub mod receiver;
pub mod render;
use std::collections::HashMap;
@ -10,10 +9,10 @@ use std::io::Write;
use structopt::StructOpt;
use termcolor::BufferedStandardStream;
use crate::protocol::Protocol;
use crate::protocol::packet::{StatsRequest, StatsReply, PacketKind};
use crate::protocol::types::StatsReplyFlags;
use crate::socket::{Socket, SocketOpt, PeerId};
use bark_protocol::packet::{StatsRequest, StatsReply, PacketKind};
use bark_protocol::types::StatsReplyFlags;
use crate::socket::{Socket, SocketOpt, PeerId, ProtocolSocket};
use crate::RunError;
use self::render::Padding;
@ -28,7 +27,7 @@ pub fn run(opt: StatsOpt) -> Result<(), RunError> {
let socket = Socket::open(opt.socket)
.map_err(RunError::Listen)?;
let protocol = Arc::new(Protocol::new(socket));
let protocol = Arc::new(ProtocolSocket::new(socket));
// spawn poller thread
std::thread::spawn({

View file

@ -1,28 +1,19 @@
use bytemuck::{Zeroable, Pod};
use bark_protocol::types::stats::node::NodeStats;
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(C)]
pub struct NodeStats {
pub username: [u8; 32],
pub hostname: [u8; 32],
pub fn get() -> NodeStats {
let username = get_username();
let hostname = get_hostname();
NodeStats {
username: as_fixed(&username),
hostname: as_fixed(&hostname),
}
}
impl NodeStats {
pub fn get() -> Self {
let username = get_username();
let hostname = get_hostname();
NodeStats {
username: as_fixed(&username),
hostname: as_fixed(&hostname),
}
}
pub fn display(&self) -> String {
let username = from_fixed(&self.username);
let hostname = from_fixed(&self.hostname);
format!("{username}@{hostname}")
}
pub fn display(stats: &NodeStats) -> String {
let username = from_fixed(&stats.username);
let hostname = from_fixed(&stats.hostname);
format!("{username}@{hostname}")
}
fn from_fixed(bytes: &[u8]) -> &str {

View file

@ -1,10 +1,12 @@
use termcolor::{WriteColor, ColorSpec, Color};
use crate::protocol::packet::StatsReply;
use crate::protocol::types::{StatsReplyPacket, StatsReplyFlags};
use bark_protocol::packet::StatsReply;
use bark_protocol::types::{StatsReplyPacket, StatsReplyFlags};
use bark_protocol::types::stats::receiver::{ReceiverStats, StreamStatus};
use bark_protocol::types::stats::node::NodeStats;
use crate::socket::PeerId;
use crate::stats::receiver::{ReceiverStats, StreamStatus};
use crate::stats::node::NodeStats;
use super::node;
#[derive(Default)]
pub struct Padding {
@ -13,7 +15,7 @@ pub struct Padding {
}
pub fn calculate(padding: &mut Padding, stats: &StatsReplyPacket, peer: PeerId) {
let node_width = stats.node.display().len();
let node_width = node::display(&stats.node).len();
let peer_width = peer.to_string().len();
padding.node_width = std::cmp::max(padding.node_width, node_width);
@ -39,7 +41,7 @@ fn node(out: &mut dyn WriteColor, padding: &Padding, node: &NodeStats, peer: Pee
.set_fg(Some(Color::Blue))
.set_bold(true));
let _ = write!(out, "{:<width$} ", node.display(), width = padding.node_width);
let _ = write!(out, "{:<width$} ", node::display(node), width = padding.node_width);
let _ = out.set_color(&ColorSpec::new()
.set_dimmed(true));

View file

@ -5,13 +5,12 @@ use cpal::traits::{HostTrait, DeviceTrait, StreamTrait};
use cpal::InputCallbackInfo;
use structopt::StructOpt;
use crate::protocol::{self, Protocol};
use crate::protocol::packet::{self, Audio, StatsReply, PacketKind};
use crate::protocol::types::{TimestampMicros, AudioPacketHeader, SessionId, ReceiverId, TimePhase};
use crate::socket::{Socket, SocketOpt};
use crate::stats::node::NodeStats;
use crate::time::{SampleDuration, Timestamp};
use crate::util;
use bark_protocol::time::{SampleDuration, Timestamp};
use bark_protocol::packet::{self, Audio, StatsReply, PacketKind};
use bark_protocol::types::{TimestampMicros, AudioPacketHeader, SessionId, ReceiverId, TimePhase};
use crate::socket::{Socket, SocketOpt, ProtocolSocket};
use crate::{util, stats, time};
use crate::RunError;
#[derive(StructOpt)]
@ -48,13 +47,13 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
let socket = Socket::open(opt.socket)
.map_err(RunError::Listen)?;
let protocol = Arc::new(Protocol::new(socket));
let protocol = Arc::new(ProtocolSocket::new(socket));
let delay = Duration::from_millis(opt.delay_ms);
let delay = SampleDuration::from_std_duration_lossy(delay);
let sid = SessionId::generate();
let node = NodeStats::get();
let sid = generate_session_id();
let node = stats::node::get();
let mut audio_header = AudioPacketHeader {
sid,
@ -77,9 +76,9 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
}
// assert data only contains complete frames:
assert!(data.len() % usize::from(protocol::CHANNELS) == 0);
assert!(data.len() % usize::from(bark_protocol::CHANNELS) == 0);
let mut timestamp = Timestamp::now().add(delay);
let mut timestamp = Timestamp::from_micros_lossy(time::now()).add(delay);
if audio_header.pts.0 == 0 {
audio_header.pts = timestamp.to_micros_lossy();
@ -100,7 +99,7 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
// finalize packet
let audio_packet = audio.finalize(AudioPacketHeader {
dts: TimestampMicros::now(),
dts: time::now(),
..audio_header
});
@ -143,7 +142,7 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
data.rid = ReceiverId::broadcast();
loop {
time.data_mut().stream_1 = TimestampMicros::now();
time.data_mut().stream_1 = time::now();
protocol.broadcast(time.as_packet())
.expect("broadcast time");
@ -178,7 +177,7 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
match time.data().phase() {
Some(TimePhase::ReceiverReply) => {
time.data_mut().stream_3 = TimestampMicros::now();
time.data_mut().stream_3 = time::now();
protocol.send_to(time.as_packet(), peer)
.expect("protocol.send_to responding to time packet");
@ -205,3 +204,12 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
Ok(())
}
pub fn generate_session_id() -> SessionId {
use nix::sys::time::TimeValLike;
let timespec = nix::time::clock_gettime(nix::time::ClockId::CLOCK_REALTIME)
.expect("clock_gettime(CLOCK_REALTIME)");
SessionId(timespec.num_microseconds())
}

14
bark/src/time.rs Normal file
View file

@ -0,0 +1,14 @@
use nix::sys::time::TimeValLike;
use nix::time::ClockId;
use bark_protocol::types::TimestampMicros;
pub fn now() -> TimestampMicros {
let timespec = nix::time::clock_gettime(ClockId::CLOCK_BOOTTIME)
.expect("clock_gettime(CLOCK_BOOTTIME) failed, are we on Linux?");
let micros = u64::try_from(timespec.num_microseconds())
.expect("cannot convert i64 time value to u64");
TimestampMicros(micros)
}

View file

@ -1,31 +1,32 @@
use cpal::{StreamConfig, BufferSize, SupportedBufferSize};
use cpal::{StreamConfig, BufferSize, SupportedBufferSize, SampleFormat};
use cpal::traits::DeviceTrait;
use crate::RunError;
use crate::protocol;
pub const SAMPLE_FORMAT: SampleFormat = SampleFormat::F32;
pub fn config_for_device(device: &cpal::Device) -> Result<StreamConfig, RunError> {
let configs = device.supported_input_configs()
.map_err(RunError::StreamConfigs)?;
let config = configs
.filter(|config| config.sample_format() == protocol::SAMPLE_FORMAT)
.filter(|config| config.channels() == protocol::CHANNELS)
.filter(|config| config.sample_format() == SAMPLE_FORMAT)
.filter(|config| config.channels() == bark_protocol::CHANNELS.0)
.nth(0)
.ok_or(RunError::NoSupportedStreamConfig)?;
let buffer_size = match config.buffer_size() {
SupportedBufferSize::Range { min, .. } => {
std::cmp::max(*min, protocol::FRAMES_PER_PACKET as u32)
std::cmp::max(*min, bark_protocol::FRAMES_PER_PACKET as u32)
}
SupportedBufferSize::Unknown => {
protocol::FRAMES_PER_PACKET as u32
bark_protocol::FRAMES_PER_PACKET as u32
}
};
Ok(StreamConfig {
channels: protocol::CHANNELS,
sample_rate: protocol::SAMPLE_RATE,
channels: bark_protocol::CHANNELS.0,
sample_rate: cpal::SampleRate(bark_protocol::SAMPLE_RATE.0),
buffer_size: BufferSize::Fixed(buffer_size),
})
}

View file

@ -1,48 +0,0 @@
pub mod types;
pub mod packet;
use std::io;
pub use cpal::{SampleFormat, SampleRate, ChannelCount};
pub const SAMPLE_FORMAT: SampleFormat = SampleFormat::F32;
pub const SAMPLE_RATE: SampleRate = SampleRate(48000);
pub const CHANNELS: ChannelCount = 2;
pub const FRAMES_PER_PACKET: usize = 160;
pub const SAMPLES_PER_PACKET: usize = CHANNELS as usize * FRAMES_PER_PACKET;
use crate::socket::{Socket, PeerId};
use crate::protocol::packet::PacketBuffer;
use self::packet::Packet;
pub struct Protocol {
socket: Socket,
}
impl Protocol {
pub fn new(socket: Socket) -> Self {
Protocol { socket }
}
pub fn broadcast(&self, packet: &Packet) -> Result<(), io::Error> {
self.socket.broadcast(packet.as_buffer().as_bytes())
}
pub fn send_to(&self, packet: &Packet, peer: PeerId) -> Result<(), io::Error> {
self.socket.send_to(packet.as_buffer().as_bytes(), peer)
}
pub fn recv_from(&self) -> Result<(Packet, PeerId), io::Error> {
loop {
let mut buffer = PacketBuffer::allocate();
let (nbytes, peer) = self.socket.recv_from(buffer.as_full_buffer_mut())?;
buffer.set_len(nbytes);
if let Some(packet) = Packet::from_buffer(buffer) {
return Ok((packet, peer));
}
}
}
}