Merge branch 'alsa'

This commit is contained in:
Hailey Somerville 2023-12-25 10:50:18 +11:00
commit 305036b53f
21 changed files with 585 additions and 304 deletions

126
Cargo.lock generated
View file

@ -32,6 +32,18 @@ dependencies = [
"nix 0.24.3",
]
[[package]]
name = "alsa"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce34de545ad29bcc00cb1b87a94c132256dcf83aa7eeb9674482568405a6ff0a"
dependencies = [
"alsa-sys",
"bitflags 2.4.0",
"libc",
"nix 0.26.2",
]
[[package]]
name = "alsa-sys"
version = "0.3.1"
@ -81,13 +93,22 @@ dependencies = [
"stable_deref_trait",
]
[[package]]
name = "atomic-polyfill"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cf2bce30dfe09ef0bfaef228b9d414faaf7e563035494d7fe092dba54b300f4"
dependencies = [
"critical-section",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"hermit-abi 0.1.19",
"libc",
"winapi",
]
@ -102,11 +123,14 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
name = "bark"
version = "0.1.0"
dependencies = [
"alsa 0.8.1",
"bark-core",
"bark-protocol",
"bitflags 2.4.0",
"bytemuck",
"cpal",
"derive_more",
"env_logger",
"libc",
"nix 0.26.2",
"rand",
@ -116,10 +140,22 @@ dependencies = [
"static_assertions",
"structopt",
"termcolor",
"thiserror",
"toml",
"xdg",
]
[[package]]
name = "bark-core"
version = "0.1.0"
dependencies = [
"bark-protocol",
"bytemuck",
"derive_more",
"heapless",
"log",
]
[[package]]
name = "bark-protocol"
version = "0.1.0"
@ -236,6 +272,12 @@ dependencies = [
"syn 2.0.28",
]
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.4.0"
@ -426,7 +468,7 @@ version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d959d90e938c5493000514b446987c07aed46c668faaa7d34d6c7a67b1a578c"
dependencies = [
"alsa",
"alsa 0.7.1",
"core-foundation-sys 0.8.4",
"coreaudio-rs",
"dasp_sample",
@ -445,6 +487,12 @@ dependencies = [
"windows 0.46.0",
]
[[package]]
name = "critical-section"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216"
[[package]]
name = "cvt"
version = "0.1.2"
@ -503,6 +551,18 @@ dependencies = [
"which",
]
[[package]]
name = "env_logger"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95b3f3e67048839cb0d0781f445682a35113da7121f7c949db0e2be96a4fbece"
dependencies = [
"humantime",
"is-terminal",
"log",
"termcolor",
]
[[package]]
name = "envy"
version = "0.4.2"
@ -648,12 +708,34 @@ dependencies = [
"walkdir",
]
[[package]]
name = "hash32"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67"
dependencies = [
"byteorder",
]
[[package]]
name = "hashbrown"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
[[package]]
name = "heapless"
version = "0.7.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f"
dependencies = [
"atomic-polyfill",
"hash32",
"rustc_version",
"spin",
"stable_deref_trait",
]
[[package]]
name = "heck"
version = "0.3.3"
@ -678,6 +760,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
[[package]]
name = "home"
version = "0.5.5"
@ -687,6 +775,12 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "iana-time-zone"
version = "0.1.57"
@ -737,6 +831,17 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "is-terminal"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
dependencies = [
"hermit-abi 0.3.3",
"rustix",
"windows-sys 0.48.0",
]
[[package]]
name = "itoa"
version = "1.0.9"
@ -1326,6 +1431,15 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
@ -1445,18 +1559,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.44"
version = "1.0.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90"
checksum = "f11c217e1416d6f036b870f14e0413d480dbf28edbee1f877abaf0206af43bb7"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.44"
version = "1.0.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96"
checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df"
dependencies = [
"proc-macro2",
"quote",

View file

@ -2,12 +2,16 @@
resolver = "2"
members = [
"bark",
"bark-core",
"bark-protocol",
]
[workspace.dependencies]
bark-core = { path = "bark-core" }
bark-protocol = { path = "bark-protocol" }
bitflags = { version = "2.4.0", features = ["bytemuck"] }
bytemuck = { version = "1.13", features = ["derive"] }
derive_more = { version = "0.99" }
heapless = "0.7"
log = "0.4"

View file

@ -44,10 +44,10 @@ Note: if using Pipewire, you must have `pipewire-alsa` installed for this to wor
3676 alsa_output.usb-Focusrite_Scarlett_Solo_USB-00.analog-stereo PipeWire s32le 2ch 44100Hz RUNNING
```
* Run the Bark receiver:
* Run the Bark receiver, passing the appropriate ALSA device name for your sound server:
```sh-session
$ bark receive --multicast 224.100.100.100:1530 --device alsa_output.usb-Focusrite_Scarlett_Solo_USB-00.analog-stereo
$ bark receive --multicast 224.100.100.100:1530 --device pipewire:NODE=alsa_output.usb-Focusrite_Scarlett_Solo_USB-00.analog-stereo
```
### Configuration
@ -67,8 +67,10 @@ multicast = "224.100.100.100:1530"
device = "Bark"
delay_ms = 15
[receive]
device = "alsa_output.usb-Focusrite_Scarlett_Solo_USB-00.analog-stereo"
[receive.device]
name = "pipewire:NODE=MyNodeName"
period = 120 # default: send audio to hardware in discrete chunks of 120 frames
buffer = 240 # default: buffer 240 frames of decoded audio in memory
```
### Monitoring the stream

14
bark-core/Cargo.toml Normal file
View file

@ -0,0 +1,14 @@
[package]
name = "bark-core"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bark-protocol = { workspace = true }
bytemuck = { workspace = true }
derive_more = { workspace = true }
heapless = { workspace = true }
log = { workspace = true }

4
bark-core/src/consts.rs Normal file
View file

@ -0,0 +1,4 @@
use bark_protocol::FRAMES_PER_PACKET;
pub const MAX_QUEUED_DECODE_SEGMENTS: usize = 48;
pub const DECODE_BUFFER_FRAMES: usize = FRAMES_PER_PACKET * 2;

2
bark-core/src/lib.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod receive;
pub mod consts;

View file

@ -0,0 +1 @@
pub mod queue;

View file

@ -0,0 +1,134 @@
use core::num::NonZeroU16;
use bark_protocol::{packet::Audio, types::AudioPacketHeader, time::{SampleDuration, Timestamp}};
use heapless::Deque;
use crate::consts::MAX_QUEUED_DECODE_SEGMENTS;
pub struct PacketQueue {
queue: Deque<Option<Audio>, MAX_QUEUED_DECODE_SEGMENTS>,
/// The seq of the first packet in the queue, the rest are implied
head_seq: u64,
/// We delay yielding packets when a queue is first started (or reset), to
/// allow for some buffering. The amount of packets buffered depends on
/// the difference between dts and pts in the initial packet.
start: DelayStart,
}
enum NoSlot {
InPast,
TooFarInFuture,
}
impl PacketQueue {
pub fn new(initial: &AudioPacketHeader) -> Self {
PacketQueue {
queue: Deque::new(),
head_seq: initial.seq,
start: DelayStart::init(initial),
}
}
pub fn pop_front(&mut self) -> Option<Audio> {
if self.start.yield_packet() {
self.head_seq += 1;
self.queue.pop_front().flatten()
} else {
None
}
}
pub fn insert_packet(&mut self, packet: Audio) {
let packet_seq = packet.header().seq;
let head_seq = self.head_seq;
let tail_seq = self.head_seq + self.queue.capacity() as u64;
match self.queue_slot_mut(packet_seq) {
Ok(slot@&mut None) => {
*slot = Some(packet);
}
Ok(Some(_)) => {
log::warn!("received duplicate packet, retaining first received: packet_seq={packet_seq}");
}
Err(NoSlot::InPast) => {
log::warn!("received packet in past, dropping: head_seq={head_seq}, packet_seq={packet_seq}");
}
Err(NoSlot::TooFarInFuture) => {
log::warn!("received packet too far in future, resetting queue: tail_seq={tail_seq}, packet_seq={packet_seq}");
// reset queue:
self.head_seq = packet_seq;
self.start = DelayStart::init(packet.header());
self.queue.clear();
self.queue.push_back(Some(packet)).expect("always room in queue after clear");
}
}
}
fn queue_slot_mut(&mut self, seq: u64) -> Result<&mut Option<Audio>, NoSlot> {
let idx = seq.checked_sub(self.head_seq).ok_or(NoSlot::InPast)? as usize;
if idx >= self.queue.capacity() {
return Err(NoSlot::TooFarInFuture);
}
// expand deq if needed so we can take mut ref
while self.queue.len() <= idx {
let Ok(()) = self.queue.push_back(None) else {
unreachable!("bounds check above implies this push always succeeds")
};
}
let slices = self.queue.as_mut_slices();
if idx < slices.0.len() {
Ok(&mut slices.0[idx])
} else {
Ok(&mut slices.1[idx - slices.0.len()])
}
}
pub fn len(&self) -> usize {
self.queue.len()
}
}
enum DelayStart {
Delay(NonZeroU16),
Live,
}
impl DelayStart {
pub fn init(header: &AudioPacketHeader) -> Self {
// calculate the stream delay by taking the difference between
// pts and dts in the initial packet:
let initial_pts = Timestamp::from_micros_lossy(header.pts);
let initial_dts = Timestamp::from_micros_lossy(header.dts);
let delay = initial_pts.saturating_duration_since(initial_dts);
// calculate number of packets this delay represents:
let packet_delay = delay.to_frame_count() / SampleDuration::ONE_PACKET.to_frame_count();
// quick n dirty round up:
let packet_delay = packet_delay + 1;
// calculate how many packets we should wait for before starting to
// yield audio segments to the decoder. this allows some time to build
// a buffer before beginning:
u16::try_from(packet_delay)
.and_then(NonZeroU16::try_from)
.map(DelayStart::Delay)
.unwrap_or(DelayStart::Live)
}
pub fn yield_packet(&mut self) -> bool {
if let DelayStart::Delay(count) = self {
*self = NonZeroU16::new(count.get() - 1)
.map(DelayStart::Delay)
.unwrap_or(DelayStart::Live);
}
matches!(self, DelayStart::Live)
}
}

View file

@ -27,6 +27,10 @@ impl Timestamp {
Timestamp(self.0.checked_add(duration.0).unwrap())
}
pub fn saturating_duration_since(&self, other: Timestamp) -> SampleDuration {
SampleDuration(self.0.saturating_sub(other.0))
}
pub fn duration_since(&self, other: Timestamp) -> SampleDuration {
SampleDuration(self.0.checked_sub(other.0).unwrap())
}
@ -57,6 +61,10 @@ impl SampleDuration {
SampleDuration(samples)
}
pub fn to_frame_count(self) -> u64 {
self.0
}
pub fn from_std_duration_lossy(duration: core::time::Duration) -> SampleDuration {
let duration = (duration.as_micros() * u128::from(SAMPLE_RATE)) / 1_000_000;
let duration = u64::try_from(duration).expect("can't narrow duration to u64");

View file

@ -12,6 +12,7 @@ pub struct ReceiverStats {
audio_latency: f64,
buffer_length: f64,
output_latency: f64,
network_latency: f64,
predict_offset: f64,
}
@ -52,6 +53,7 @@ bitflags! {
const HAS_BUFFER_LENGTH = 0x08;
const HAS_NETWORK_LATENCY = 0x10;
const HAS_PREDICT_OFFSET = 0x20;
const HAS_OUTPUT_LATENCY = 0x40;
}
}
@ -86,11 +88,16 @@ impl ReceiverStats {
self.field(ReceiverStatsFlags::HAS_AUDIO_LATENCY, self.audio_latency)
}
/// Duration of buffered audio in seconds
/// Length of Bark-internal audio buffer in seconds
pub fn buffer_length(&self) -> Option<f64> {
self.field(ReceiverStatsFlags::HAS_BUFFER_LENGTH, self.buffer_length)
}
/// Length of output audio buffer (including hardware latency) in seconds
pub fn output_latency(&self) -> Option<f64> {
self.field(ReceiverStatsFlags::HAS_OUTPUT_LATENCY, self.output_latency)
}
/// Duration of buffered audio in seconds
pub fn network_latency(&self) -> Option<f64> {
self.field(ReceiverStatsFlags::HAS_NETWORK_LATENCY, self.network_latency)
@ -114,6 +121,11 @@ impl ReceiverStats {
self.flags.insert(ReceiverStatsFlags::HAS_BUFFER_LENGTH);
}
pub fn set_output_latency(&mut self, latency: SampleDuration) {
self.output_latency = latency.to_std_duration_lossy().as_micros() as f64 / 1_000_000.0;
self.flags.insert(ReceiverStatsFlags::HAS_OUTPUT_LATENCY);
}
pub fn set_network_latency(&mut self, latency: core::time::Duration) {
self.network_latency = latency.as_micros() as f64 / 1_000_000.0;
self.flags.insert(ReceiverStatsFlags::HAS_NETWORK_LATENCY);

View file

@ -1 +1,5 @@
multicast = "224.100.100.100:1530"
[audio]
period = 120
buffer = 240

View file

@ -6,8 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bark-core = { workspace = true }
bark-protocol = { workspace = true }
alsa = "0.8.1"
bitflags = { workspace = true }
bytemuck = { workspace = true, features = ["extern_crate_alloc"] }
cpal = "0.15.2"
@ -23,3 +25,5 @@ structopt = "0.3.26"
termcolor = "1.2.0"
toml = "0.7.6"
xdg = "2.5.2"
thiserror = "1.0.51"
env_logger = { version = "0.10", default-features = false, features = ["color", "auto-color", "humantime"] }

4
bark/src/audio/config.rs Normal file
View file

@ -0,0 +1,4 @@
use bark_protocol::time::SampleDuration;
pub const DEFAULT_PERIOD: SampleDuration = SampleDuration::from_frame_count(120);
pub const DEFAULT_BUFFER: SampleDuration = SampleDuration::from_frame_count(240);

View file

@ -2,19 +2,7 @@ use std::process::{Command, Stdio};
use serde::Deserialize;
pub fn set_sink_env(device: &str) {
let Some(index) = find_pulse_node(Kind::Sink, device) else {
eprintln!("falling back to default audio sink");
return;
};
println!("using audio sink at index {}: {}", index.0, device);
std::env::set_var("PULSE_SINK", device);
std::env::set_var("PIPEWIRE_NODE", index.0.to_string());
}
pub fn set_source_env(device: &str) {
pub fn set_source(device: &str) {
let Some(index) = find_pulse_node(Kind::Source, device) else {
eprintln!("falling back to default audio source");
return;
@ -28,7 +16,6 @@ pub fn set_source_env(device: &str) {
enum Kind {
Source,
Sink,
}
#[derive(Deserialize)]
@ -43,7 +30,6 @@ struct NodeIndex(u64);
fn find_pulse_node(kind: Kind, name: &str) -> Option<NodeIndex> {
let kind = match kind {
Kind::Source => "sources",
Kind::Sink => "sinks",
};
let result = Command::new("pactl")

3
bark/src/audio/mod.rs Normal file
View file

@ -0,0 +1,3 @@
pub mod config;
pub mod env;
pub mod output;

138
bark/src/audio/output.rs Normal file
View file

@ -0,0 +1,138 @@
use alsa::{ValueOr, Direction};
use alsa::pcm::{PCM, HwParams, Format, Access};
use bark_protocol::CHANNELS;
use bark_protocol::time::SampleDuration;
use nix::errno::Errno;
use thiserror::Error;
pub struct Output {
pcm: PCM,
}
pub struct OutputOpt {
pub device: Option<String>,
pub period: SampleDuration,
pub buffer: SampleDuration,
}
#[derive(Debug, Error)]
pub enum OpenError {
#[error("alsa error: {0}")]
Alsa(#[from] alsa::Error),
#[error("invalid period size (min = {min}, max = {max})")]
InvalidPeriodSize { min: i64, max: i64 },
#[error("invalid buffer size (min = {min}, max = {max})")]
InvalidBufferSize { min: i64, max: i64 },
}
#[derive(Debug, Error)]
pub enum WriteAudioError {
#[error("alsa: {0}")]
Alsa(#[from] alsa::Error),
}
impl Output {
pub fn new(opt: OutputOpt) -> Result<Self, OpenError> {
let device_name = opt.device.as_deref().unwrap_or("default");
let pcm = PCM::new(device_name, Direction::Playback, false)?;
{
let hwp = HwParams::any(&pcm)?;
hwp.set_channels(bark_protocol::CHANNELS.0.into())?;
hwp.set_rate(bark_protocol::SAMPLE_RATE.0, ValueOr::Nearest)?;
hwp.set_format(Format::float())?;
hwp.set_access(Access::RWInterleaved)?;
set_period_size(&hwp, opt.period)?;
set_buffer_size(&hwp, opt.buffer)?;
pcm.hw_params(&hwp)?;
}
{
let hwp = pcm.hw_params_current()?;
let swp = pcm.sw_params_current()?;
swp.set_start_threshold(hwp.get_buffer_size()?)?;
}
let (buffer, period) = pcm.get_params()?;
eprintln!("opened ALSA with buffer_size={buffer}, period_size={period}");
Ok(Output { pcm })
}
pub fn write(&self, mut audio: &[f32]) -> Result<(), WriteAudioError> {
while audio.len() > 0 {
let n = self.write_partial(audio)?;
audio = &audio[n..];
}
Ok(())
}
fn write_partial(&self, audio: &[f32]) -> Result<usize, WriteAudioError> {
let io = unsafe {
// the checked versions of this function call
// snd_pcm_hw_params_current which mallocs under the hood
self.pcm.io_unchecked::<f32>()
};
loop {
// try to write audio
let err = match io.writei(audio) {
Ok(n) => {
return Ok(n * CHANNELS.0 as usize);
}
Err(e) => e,
};
// handle recoverable errors
match err.errno() {
| Errno::EPIPE // underrun
| Errno::ESTRPIPE // stream suspended
| Errno::EINTR // interrupted syscall
=> {
eprintln!("recovering from error: {}", err.errno());
// try to recover
self.pcm.recover(err.errno() as i32, false)?;
}
_ => { return Err(err.into()); }
}
}
}
pub fn delay(&self) -> Result<SampleDuration, alsa::Error> {
let frames = self.pcm.delay()?;
Ok(SampleDuration::from_frame_count(frames.try_into().unwrap()))
}
}
// period is the size of the discrete chunks of data that are sent to hardware
fn set_period_size(hwp: &HwParams, period: SampleDuration)
-> Result<(), OpenError>
{
let min = hwp.get_period_size_min()?;
let max = hwp.get_period_size_max()?;
let period = period.to_frame_count().try_into().ok()
.filter(|size| { *size >= min && *size <= max })
.ok_or(OpenError::InvalidPeriodSize { min, max })?;
hwp.set_period_size(period, ValueOr::Nearest)?;
Ok(())
}
// period is the size of the discrete chunks of data that are sent to hardware
fn set_buffer_size(hwp: &HwParams, buffer: SampleDuration)
-> Result<(), OpenError>
{
let min = hwp.get_buffer_size_min()?;
let max = hwp.get_buffer_size_max()?;
let buffer = buffer.to_frame_count().try_into().ok()
.filter(|size| *size >= min && *size <= max)
.ok_or(OpenError::InvalidBufferSize { min, max })?;
hwp.set_buffer_size(buffer)?;
Ok(())
}

View file

@ -21,7 +21,14 @@ pub struct Source {
#[derive(Deserialize, Default)]
pub struct Receive {
output: Output,
}
#[derive(Deserialize, Default)]
pub struct Output {
device: Option<String>,
period: Option<u64>,
buffer: Option<u64>,
}
fn set_env_option<T: ToString>(name: &str, value: Option<T>) {
@ -34,7 +41,9 @@ pub fn load_into_env(config: &Config) {
set_env_option("BARK_MULTICAST", config.multicast);
set_env_option("BARK_SOURCE_DEVICE", config.source.device.as_ref());
set_env_option("BARK_SOURCE_DELAY_MS", config.source.delay_ms);
set_env_option("BARK_RECEIVE_DEVICE", config.receive.device.as_ref());
set_env_option("BARK_RECEIVE_OUTPUT_DEVICE", config.receive.output.device.as_ref());
set_env_option("BARK_RECEIVE_OUTPUT_PERIOD", config.receive.output.period);
set_env_option("BARK_RECEIVE_OUTPUT_BUFFER", config.receive.output.buffer);
}
fn load_file(path: &Path) -> Option<Config> {

View file

@ -25,6 +25,7 @@ pub enum RunError {
Listen(socket::ListenError),
NoDeviceAvailable,
NoSupportedStreamConfig,
OpenAudioOutput(audio::output::OpenError),
StreamConfigs(cpal::SupportedStreamConfigsError),
BuildStream(cpal::BuildStreamError),
Stream(cpal::PlayStreamError),

View file

@ -1,68 +1,50 @@
use std::array;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use bark_core::receive::queue::PacketQueue;
use bytemuck::Zeroable;
use cpal::OutputCallbackInfo;
use cpal::traits::{HostTrait, DeviceTrait};
use structopt::StructOpt;
use bark_protocol::SampleRate;
use bark_protocol::{SampleRate, SAMPLES_PER_PACKET, FRAMES_PER_PACKET};
use bark_protocol::time::{Timestamp, SampleDuration, TimestampDelta, ClockDelta};
use bark_protocol::types::{SessionId, ReceiverId, TimePhase};
use bark_protocol::types::{SessionId, ReceiverId, TimePhase, AudioPacketHeader};
use bark_protocol::types::stats::receiver::{ReceiverStats, StreamStatus};
use bark_protocol::packet::{Audio, Time, PacketKind, StatsReply};
use crate::audio::config::{DEFAULT_PERIOD, DEFAULT_BUFFER};
use crate::audio::output::{Output, OutputOpt};
use crate::resample::Resampler;
use crate::socket::{ProtocolSocket, Socket, SocketOpt};
use crate::{util, time, stats};
use crate::{time, stats};
use crate::RunError;
pub struct Receiver {
opt: ReceiveOpt,
stats: ReceiverStats,
stream: Option<Stream>,
queue: VecDeque<QueueEntry>,
}
struct QueueEntry {
seq: u64,
pts: Option<Timestamp>,
consumed: SampleDuration,
packet: Option<Audio>,
}
impl QueueEntry {
pub fn as_full_buffer(&self) -> &[f32] {
self.packet.as_ref()
.map(|packet| packet.buffer())
.unwrap_or(&[0f32; bark_protocol::SAMPLES_PER_PACKET])
}
}
struct Stream {
sid: SessionId,
start_seq: u64,
sync: bool,
resampler: Resampler,
rate_adjust: RateAdjust,
latency: Aggregate<Duration>,
clock_delta: Aggregate<ClockDelta>,
queue: PacketQueue,
}
impl Stream {
pub fn start_from_packet(audio: &Audio) -> Self {
pub fn new(header: &AudioPacketHeader) -> Self {
let resampler = Resampler::new();
let queue = PacketQueue::new(header);
Stream {
sid: audio.header().sid,
start_seq: audio.header().seq,
sync: false,
sid: header.sid,
resampler,
rate_adjust: RateAdjust::new(),
latency: Aggregate::new(),
clock_delta: Aggregate::new(),
queue,
}
}
@ -84,13 +66,9 @@ pub struct ClockInfo {
}
impl Receiver {
pub fn new(opt: ReceiveOpt) -> Self {
let queue = VecDeque::with_capacity(opt.max_seq_gap);
pub fn new() -> Self {
Receiver {
opt,
stream: None,
queue,
stats: ReceiverStats::new(),
}
}
@ -133,225 +111,66 @@ impl Receiver {
stream.clock_delta.observe(clock_delta);
}
fn prepare_stream(&mut self, packet: &Audio) -> bool {
if let Some(stream) = self.stream.as_mut() {
let header = packet.header();
fn prepare_stream(&mut self, header: &AudioPacketHeader) -> &mut Stream {
let new_stream = match &self.stream {
Some(stream) => stream.sid < header.sid,
None => true,
};
if header.sid < stream.sid {
// packet belongs to a previous stream, ignore
return false;
}
if header.sid > stream.sid {
// new stream is taking over! switch over to it
println!("\nnew stream beginning");
self.stream = Some(Stream::start_from_packet(packet));
self.stats.clear();
self.queue.clear();
return true;
}
if header.seq < stream.start_seq {
println!("\nreceived packet with seq before start, dropping");
return false;
}
if let Some(front) = self.queue.front() {
if header.seq <= front.seq {
println!("\nreceived packet with seq <= queue front seq, dropping");
return false;
}
}
if let Some(back) = self.queue.back() {
if back.seq + self.opt.max_seq_gap as u64 <= header.seq {
println!("\nreceived packet with seq too far in future, resetting stream");
self.stream = Some(Stream::start_from_packet(packet));
self.stats.clear();
self.queue.clear();
}
}
true
} else {
self.stream = Some(Stream::start_from_packet(packet));
if new_stream {
// new stream is taking over! switch over to it
println!("\nnew stream beginning");
self.stream = Some(Stream::new(header));
self.stats.clear();
true
}
self.stream.as_mut().unwrap()
}
pub fn receive_audio(&mut self, packet: Audio) {
let now = time::now();
if !self.prepare_stream(&packet) {
return;
}
let packet_dts = packet.header().dts;
// we are guaranteed that if prepare_stream returns true,
// self.stream is Some:
let stream = self.stream.as_ref().unwrap();
let stream = self.prepare_stream(packet.header());
stream.queue.insert_packet(packet);
if let Some(latency) = stream.network_latency() {
if let Some(clock_delta) = stream.clock_delta.median() {
let latency_usec = u64::try_from(latency.as_micros()).unwrap();
let delta_usec = clock_delta.as_micros();
let predict_dts = (now.0 - latency_usec).checked_add_signed(-delta_usec).unwrap();
let predict_diff = predict_dts as i64 - packet.header().dts.0 as i64;
let predict_diff = predict_dts as i64 - packet_dts.0 as i64;
self.stats.set_predict_offset(predict_diff)
}
}
// INVARIANT: at this point we are guaranteed that, if there are
// packets in the queue, the seq of the incoming packet is less than
// back.seq + max_seq_gap
// expand queue to make space for new packet
if let Some(back) = self.queue.back() {
if packet.header().seq > back.seq {
// extend queue from back to make space for new packet
// this also allows for out of order packets
for seq in (back.seq + 1)..=packet.header().seq {
self.queue.push_back(QueueEntry {
seq,
pts: None,
consumed: SampleDuration::zero(),
packet: None,
})
}
}
} else {
// queue is empty, insert missing packet slot for the packet we are about to receive
self.queue.push_back(QueueEntry {
seq: packet.header().seq,
pts: None,
consumed: SampleDuration::zero(),
packet: None,
});
}
// INVARIANT: at this point queue is non-empty and contains an
// allocated slot for the packet we just received
let front_seq = self.queue.front().unwrap().seq;
let idx_for_packet = (packet.header().seq - front_seq) as usize;
let slot = self.queue.get_mut(idx_for_packet).unwrap();
assert!(slot.seq == packet.header().seq);
slot.pts = stream.adjust_pts(Timestamp::from_micros_lossy(packet.header().pts));
slot.packet = Some(packet);
}
pub fn fill_stream_buffer(&mut self, mut data: &mut [f32], pts: Timestamp) {
// complete frames only:
assert!(data.len() % 2 == 0);
pub fn write_audio(&mut self, buffer: &mut [f32], pts: Timestamp) -> SampleDuration {
// get stream start timing information:
let Some(stream) = self.stream.as_mut() else {
// stream hasn't started, just fill buffer with silence and return
data.fill(0f32);
return;
buffer[0..SAMPLES_PER_PACKET].fill(0f32);
return SampleDuration::ONE_PACKET;
};
let real_ts_after_fill = pts.add(SampleDuration::from_buffer_offset(data.len()));
let Some(packet) = stream.queue.pop_front() else {
// no packets yet
buffer[0..SAMPLES_PER_PACKET].fill(0f32);
return SampleDuration::ONE_PACKET;
};
// sync up to stream if necessary:
if !stream.sync {
loop {
let Some(front) = self.queue.front_mut() else {
// nothing at front of queue?
data.fill(0f32);
return;
};
let header_pts = Timestamp::from_micros_lossy(packet.header().pts);
let Some(front_pts) = front.pts else {
// haven't received enough info to adjust pts of queue
// front yet, just pop and ignore it
self.queue.pop_front();
// and output silence for this part:
data.fill(0f32);
return;
};
if pts > front_pts {
// frame has already begun, we are late
let late = pts.duration_since(front_pts);
if late >= SampleDuration::ONE_PACKET {
// we are late by more than a packet, skip to the next
self.queue.pop_front();
continue;
}
// partially consume this packet to sync up
front.consumed = late;
// we are synced
stream.sync = true;
self.stats.set_stream(StreamStatus::Sync);
break;
}
// otherwise we are early
let early = front_pts.duration_since(pts);
if early >= SampleDuration::from_buffer_offset(data.len()) {
// we are early by more than what was asked of us in this
// call, fill with zeroes and return
data.fill(0f32);
return;
}
// we are early, but not an entire packet timing's early
// partially output some zeroes
let zero_count = early.as_buffer_offset();
data[0..zero_count].fill(0f32);
data = &mut data[zero_count..];
// then mark ourselves as synced and fall through to regular processing
stream.sync = true;
self.stats.set_stream(StreamStatus::Sync);
break;
}
}
let mut stream_ts = None;
// copy data to out
while data.len() > 0 {
let Some(front) = self.queue.front_mut() else {
data.fill(0f32);
self.stats.set_stream(StreamStatus::Miss);
return;
};
let buffer = front.as_full_buffer();
let buffer_offset = front.consumed.as_buffer_offset();
let buffer_remaining = buffer.len() - buffer_offset;
let copy_count = std::cmp::min(data.len(), buffer_remaining);
let buffer_copy_end = buffer_offset + copy_count;
let input = &buffer[buffer_offset..buffer_copy_end];
let output = &mut data[0..copy_count];
let result = stream.resampler.process_interleaved(input, output)
.expect("resample error!");
data = &mut data[result.output_written.as_buffer_offset()..];
front.consumed = front.consumed.add(result.input_read);
stream_ts = front.pts.map(|front_pts| front_pts.add(front.consumed));
// pop packet if fully consumed
if front.consumed == SampleDuration::ONE_PACKET {
self.queue.pop_front();
}
}
if let Some(stream_ts) = stream_ts {
let rate = stream.rate_adjust.sample_rate(Timing {
real: real_ts_after_fill,
play: stream_ts,
let timing = stream.adjust_pts(header_pts)
.map(|stream_pts| Timing {
real: pts,
play: stream_pts,
});
if let Some(timing) = timing {
let rate = stream.rate_adjust.sample_rate(timing);
let _ = stream.resampler.set_input_rate(rate.0);
if stream.rate_adjust.slew() {
@ -360,12 +179,19 @@ impl Receiver {
self.stats.set_stream(StreamStatus::Sync);
}
self.stats.set_audio_latency(real_ts_after_fill, stream_ts);
self.stats.set_audio_latency(timing.real, timing.play);
}
self.stats.set_buffer_length(self.queue.iter()
.map(|entry| SampleDuration::ONE_PACKET.sub(entry.consumed))
.fold(SampleDuration::zero(), |cum, dur| cum.add(dur)));
let resample = stream.resampler.process_interleaved(packet.buffer(), buffer)
.expect("resample error!");
assert_eq!(resample.input_read.as_buffer_offset(), packet.buffer().len());
self.stats.set_buffer_length(
SampleDuration::from_frame_count(
(FRAMES_PER_PACKET * stream.queue.len()).try_into().unwrap()));
resample.output_written
}
}
@ -397,7 +223,7 @@ impl RateAdjust {
fn adjusted_rate(&mut self, timing: Timing) -> Option<SampleRate> {
// parameters, maybe these could be cli args?
let start_slew_threshold = Duration::from_micros(2000);
let stop_slew_threshold = Duration::from_micros(100);
let stop_slew_threshold = Duration::from_micros(1000);
let slew_target_duration = Duration::from_millis(500);
// turn them into native units
@ -420,12 +246,12 @@ impl RateAdjust {
let rate_offset = frame_offset.as_frames() * 1_000_000 / slew_duration_duration;
let rate = base_sample_rate + rate_offset;
// clamp any potential slow down to 2%, we shouldn't ever get too far
// clamp any potential slow down to 1%, we shouldn't ever get too far
// ahead of the stream
let rate = std::cmp::max(base_sample_rate * 98 / 100, rate);
let rate = std::cmp::max(base_sample_rate * 99 / 100, rate);
// let the speed up run much higher, but keep it reasonable still
let rate = std::cmp::min(base_sample_rate * 2, rate);
let rate = std::cmp::min(base_sample_rate * 101 / 100, rate);
self.slew = true;
Some(SampleRate(u32::try_from(rate).unwrap()))
@ -467,66 +293,76 @@ impl<T: Copy + Default + Ord> Aggregate<T> {
pub struct ReceiveOpt {
#[structopt(flatten)]
pub socket: SocketOpt,
#[structopt(long, env = "BARK_RECEIVE_DEVICE")]
pub device: Option<String>,
#[structopt(long, default_value="12")]
pub max_seq_gap: usize,
/// Audio device name
#[structopt(long, env = "BARK_RECEIVE_OUTPUT_DEVICE")]
pub output_device: Option<String>,
/// Size of discrete audio transfer buffer in frames
#[structopt(long, env = "BARK_RECEIVE_OUTPUT_PERIOD")]
pub output_period: Option<u64>,
/// Size of decoded audio buffer in frames
#[structopt(long, env = "BARK_RECEIVE_OUTPUT_BUFFER")]
pub output_buffer: Option<u64>,
}
pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
let receiver_id = generate_receiver_id();
let node = stats::node::get();
if let Some(device) = &opt.device {
crate::audio::set_sink_env(device);
}
let host = cpal::default_host();
let device = host.default_output_device()
.ok_or(RunError::NoDeviceAvailable)?;
let config = util::config_for_device(&device)?;
struct SharedState {
pub recv: Receiver,
}
let output = Output::new(OutputOpt {
device: opt.output_device,
period: opt.output_period
.map(SampleDuration::from_frame_count)
.unwrap_or(DEFAULT_PERIOD),
buffer: opt.output_buffer
.map(SampleDuration::from_frame_count)
.unwrap_or(DEFAULT_BUFFER),
}).map_err(RunError::OpenAudioOutput)?;
let state = Arc::new(Mutex::new(SharedState {
recv: Receiver::new(opt.clone()),
recv: Receiver::new(),
}));
let _stream = device.build_output_stream(&config,
{
let state = state.clone();
let mut initialized_thread = false;
move |data: &mut [f32], info: &OutputCallbackInfo| {
if !initialized_thread {
crate::thread::set_name("bark/audio");
crate::thread::set_realtime_priority();
initialized_thread = true;
}
let stream_timestamp = info.timestamp();
let output_latency = stream_timestamp.playback
.duration_since(&stream_timestamp.callback)
.unwrap_or_default();
let output_latency = SampleDuration::from_std_duration_lossy(output_latency);
let now = Timestamp::from_micros_lossy(time::now());
let pts = now.add(output_latency);
std::thread::spawn({
let state = state.clone();
move || {
crate::thread::set_name("bark/audio");
loop {
let mut state = state.lock().unwrap();
state.recv.fill_stream_buffer(data, pts);
let delay = output.delay().unwrap();
state.recv.stats.set_output_latency(delay);
let pts = time::now();
let pts = Timestamp::from_micros_lossy(pts);
let pts = pts.add(delay);
// this should be large enough for `write_audio` to process an
// entire packet with:
let mut buffer = [0f32; SAMPLES_PER_PACKET * 2];
let duration = state.recv.write_audio(&mut buffer, pts);
// drop lock before calling `Output::write` (blocking!)
drop(state);
// send audio to ALSA
match output.write(&buffer[0..duration.as_buffer_offset()]) {
Ok(()) => {}
Err(e) => {
eprintln!("error playing audio: {e}");
break;
}
};
}
},
move |err| {
eprintln!("stream error! {err:?}");
},
None
).map_err(RunError::BuildStream)?;
}
});
let socket = Socket::open(opt.socket)
.map_err(RunError::Listen)?;

View file

@ -56,6 +56,7 @@ fn receiver(out: &mut dyn WriteColor, stats: &ReceiverStats) {
time_field(out, "Audio", stats.audio_latency());
time_field(out, "Buffer", stats.buffer_length());
time_field(out, "Output", stats.output_latency());
time_field(out, "Network", stats.network_latency());
time_field(out, "Predict", stats.predict_offset());
}

View file

@ -36,7 +36,7 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> {
let host = cpal::default_host();
if let Some(device) = &opt.device {
crate::audio::set_source_env(device);
crate::audio::env::set_source(device);
}
let device = host.default_input_device()