diff --git a/bark-core/src/encode/mod.rs b/bark-core/src/encode/mod.rs new file mode 100644 index 0000000..b3059b8 --- /dev/null +++ b/bark-core/src/encode/mod.rs @@ -0,0 +1,17 @@ +pub mod pcm; + +use core::fmt::Display; + +use bark_protocol::types::AudioPacketFormat; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum EncodeError { + #[error("output buffer too small, need at least {need} bytes")] + OutputBufferTooSmall { need: usize }, +} + +pub trait Encode: Display + Send { + fn header_format(&self) -> AudioPacketFormat; + fn encode_packet(&mut self, samples: &[f32], out: &mut [u8]) -> Result; +} diff --git a/bark-core/src/encode/pcm.rs b/bark-core/src/encode/pcm.rs new file mode 100644 index 0000000..00a3d28 --- /dev/null +++ b/bark-core/src/encode/pcm.rs @@ -0,0 +1,68 @@ +use core::fmt::{self, Display}; + +use bark_protocol::types::AudioPacketFormat; + +use super::{Encode, EncodeError}; + +pub struct S16LEEncoder; + +impl Display for S16LEEncoder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "signed16 (little endian)") + } +} + +impl Encode for S16LEEncoder { + fn header_format(&self) -> AudioPacketFormat { + AudioPacketFormat::S16LE + } + + fn encode_packet(&mut self, samples: &[f32], out: &mut [u8]) -> Result { + encode_packed(samples, out, |sample| { + let scale = (u16::MIN as f32).abs(); + let sample = sample.clamp(-1.0, 1.0) * scale; + i16::to_le_bytes(sample as i16) + }) + } +} + +pub struct F32LEEncoder; + +impl Display for F32LEEncoder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "float32 (little endian)") + } +} + +impl Encode for F32LEEncoder { + fn header_format(&self) -> AudioPacketFormat { + AudioPacketFormat::F32LE + } + + fn encode_packet(&mut self, samples: &[f32], out: &mut [u8]) -> Result { + encode_packed(samples, out, f32::to_le_bytes) + } +} + +fn encode_packed( + samples: &[f32], + out: &mut [u8], + func: impl Fn(f32) -> [u8; N], +) -> Result { + let out = check_length(out, samples.len() * N)?; + + for (output, input) in out.chunks_exact_mut(N).zip(samples) { + let bytes = func(*input); + output.copy_from_slice(&bytes); + } + + Ok(out.len()) +} + +fn check_length(out: &mut [u8], need: usize) -> Result<&mut [u8], EncodeError> { + if out.len() >= need { + Ok(&mut out[0..need]) + } else { + Err(EncodeError::OutputBufferTooSmall { need }) + } +} diff --git a/bark-core/src/lib.rs b/bark-core/src/lib.rs index 1f485db..451d36b 100644 --- a/bark-core/src/lib.rs +++ b/bark-core/src/lib.rs @@ -1,3 +1,4 @@ -pub mod receive; pub mod consts; pub mod decode; +pub mod encode; +pub mod receive; diff --git a/bark-protocol/src/packet.rs b/bark-protocol/src/packet.rs index dd65508..884dfcc 100644 --- a/bark-protocol/src/packet.rs +++ b/bark-protocol/src/packet.rs @@ -3,10 +3,11 @@ use core::ops::Range; use bytemuck::Zeroable; +use crate::SAMPLES_PER_PACKET; use crate::buffer::{AllocError, PacketBuffer}; use crate::types::stats::node::NodeStats; use crate::types::stats::receiver::ReceiverStats; -use crate::types::{self, Magic, SessionId, StatsReplyFlags}; +use crate::types::{self, Magic, SessionId, StatsReplyFlags, AudioPacketHeader}; pub const MAX_PACKET_SIZE: usize = size_of::() + @@ -89,14 +90,18 @@ pub enum PacketKind { pub struct Audio(Packet); impl Audio { - const HEADER_LENGTH: usize = + pub const HEADER_LENGTH: usize = size_of::(); - pub fn allocate(buffer_length: usize) -> Result { - let length = Self::HEADER_LENGTH + buffer_length; - let packet = Packet::allocate(Magic::AUDIO, length)?; + pub const MAX_BUFFER_LENGTH: usize = + size_of::<[f32; SAMPLES_PER_PACKET]>(); - Ok(Audio(packet)) + pub fn new(header: &AudioPacketHeader, data: &[u8]) -> Result { + let length = Self::HEADER_LENGTH + data.len(); + let mut packet = Audio(Packet::allocate(Magic::AUDIO, length)?); + *packet.header_mut() = *header; + packet.buffer_bytes_mut().copy_from_slice(data); + Ok(packet) } pub fn parse(packet: Packet) -> Option { diff --git a/bark.toml b/bark.toml index f82bd04..bb76462 100644 --- a/bark.toml +++ b/bark.toml @@ -1,4 +1,4 @@ -multicast = "224.100.100.100:1530" +multicast = "224.101.101.101:1530" [source.input] device = "pipewire:NODE=Bark" diff --git a/bark/src/stream.rs b/bark/src/stream.rs index eed6975..988e3ad 100644 --- a/bark/src/stream.rs +++ b/bark/src/stream.rs @@ -1,12 +1,14 @@ use std::sync::Arc; use std::time::Duration; +use bark_core::encode::Encode; +use bark_core::encode::pcm::{S16LEEncoder, F32LEEncoder}; use bark_protocol::SAMPLES_PER_PACKET; use structopt::StructOpt; use bark_protocol::time::SampleDuration; use bark_protocol::packet::{self, Audio, StatsReply, PacketKind}; -use bark_protocol::types::{TimestampMicros, AudioPacketHeader, SessionId, ReceiverId, TimePhase, AudioPacketFormat}; +use bark_protocol::types::{TimestampMicros, AudioPacketHeader, SessionId, ReceiverId, TimePhase}; use crate::audio::config::{DeviceOpt, DEFAULT_PERIOD, DEFAULT_BUFFER}; use crate::audio::input::Input; @@ -38,7 +40,11 @@ pub struct StreamOpt { )] pub delay_ms: u64, - #[structopt(long, env = "BARK_SOURCE_FORMAT")] + #[structopt( + long, + env = "BARK_SOURCE_FORMAT", + default_value = "f32le", + )] pub format: config::Format, } @@ -64,12 +70,19 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> { let sid = generate_session_id(); let node = stats::node::get(); + let mut encoder = match opt.format { + config::Format::S16LE => Box::new(S16LEEncoder) as Box, + config::Format::F32LE => Box::new(F32LEEncoder) as Box, + }; + + log::info!("instantiated encoder: {}", encoder); + let mut audio_header = AudioPacketHeader { sid, seq: 1, pts: TimestampMicros(0), dts: TimestampMicros(0), - format: AudioPacketFormat::F32LE, + format: encoder.header_format(), }; std::thread::spawn({ @@ -78,15 +91,10 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> { crate::thread::set_name("bark/audio"); loop { - // create new audio buffer - let buffer_bytes_length = core::mem::size_of::() * SAMPLES_PER_PACKET; - let mut audio = Audio::allocate(buffer_bytes_length) - .expect("allocate Audio packet"); - - let sample_buffer = bytemuck::cast_slice_mut(audio.buffer_bytes_mut()); + let mut sample_buffer = [0f32; SAMPLES_PER_PACKET]; // read audio input - let timestamp = match input.read(sample_buffer) { + let timestamp = match input.read(&mut sample_buffer) { Ok(ts) => ts, Err(e) => { log::error!("error reading audio input: {e}"); @@ -94,15 +102,29 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> { } }; + // encode audio + let mut encode_buffer = [0; Audio::MAX_BUFFER_LENGTH]; + let encoded_data = match encoder.encode_packet(&sample_buffer, &mut encode_buffer) { + Ok(size) => &encode_buffer[0..size], + Err(e) => { + log::error!("error encoding audio: {e}"); + break; + } + }; + + // assemble new packet header let pts = timestamp.add(delay); - // write packet header - *audio.header_mut() = AudioPacketHeader { + let header = AudioPacketHeader { pts: pts.to_micros_lossy(), dts: time::now(), ..audio_header }; + // allocate new audio packet and copy encoded data in + let audio = Audio::new(&header, encoded_data) + .expect("allocate Audio packet"); + // send it protocol.broadcast(audio.as_packet()).expect("broadcast"); @@ -191,7 +213,7 @@ pub fn run(opt: StreamOpt) -> Result<(), RunError> { Ok(()) } -pub fn generate_session_id() -> SessionId { +fn generate_session_id() -> SessionId { use nix::sys::time::TimeValLike; let timespec = nix::time::clock_gettime(nix::time::ClockId::CLOCK_REALTIME)