diff --git a/bark/src/receive.rs b/bark/src/receive.rs index a5c6aa8..34d2dfe 100644 --- a/bark/src/receive.rs +++ b/bark/src/receive.rs @@ -2,23 +2,18 @@ use std::array; use std::sync::{Arc, Mutex}; use std::time::Duration; -use bark_core::audio::Frame; -use bark_core::receive::pipeline::Pipeline; -use bark_core::receive::timing::Timing; use bytemuck::Zeroable; use structopt::StructOpt; -use bark_core::receive::queue::{AudioPts, PacketQueue}; +use bark_core::receive::queue::AudioPts; -use bark_protocol::FRAMES_PER_PACKET; use bark_protocol::time::{Timestamp, SampleDuration, TimestampDelta, ClockDelta}; use bark_protocol::types::{AudioPacketHeader, ReceiverId, SessionId, TimePhase, TimestampMicros}; -use bark_protocol::types::stats::receiver::{ReceiverStats, StreamStatus}; +use bark_protocol::types::stats::receiver::ReceiverStats; use bark_protocol::packet::{Audio, Time, PacketKind, StatsReply}; use crate::audio::config::{DEFAULT_PERIOD, DEFAULT_BUFFER, DeviceOpt}; use crate::audio::Output; -use crate::config::Device; use crate::receive::output::OutputRef; use crate::receive::stream::Stream as ReceiveStream; use crate::socket::{ProtocolSocket, Socket, SocketOpt}; @@ -166,52 +161,6 @@ impl Receiver { } } } - - /* - pub fn write_audio(&mut self, buffer: &mut [Frame], pts: Timestamp) -> usize { - // get stream start timing information: - let Some(stream) = self.stream.as_mut() else { - // stream hasn't started, just fill buffer with silence and return - buffer[0..FRAMES_PER_PACKET].fill(Frame::zeroed()); - return FRAMES_PER_PACKET; - }; - - // get next packet from queue, or None if missing (packet loss) - let queue_item = stream.queue.pop_front(); - - let (packet, stream_pts) = queue_item.as_ref() - .map(|item| (Some(&item.audio), Some(item.pts))) - .unwrap_or_default(); - - let timing = stream_pts.map(|stream_pts| Timing { - real: pts, - play: stream_pts, - }); - - // adjust resampler rate based on stream timing info - if let Some(timing) = timing { - stream.pipeline.set_timing(timing); - - if stream.pipeline.slew() { - self.stats.set_stream(StreamStatus::Slew); - } else { - self.stats.set_stream(StreamStatus::Sync); - } - - self.stats.set_audio_latency(timing.real, timing.play); - } - - // pass packet through decode pipeline - let frames = stream.pipeline.process(packet, buffer); - - // report stats and return - self.stats.set_buffer_length( - SampleDuration::from_frame_count( - (FRAMES_PER_PACKET * stream.queue.len()).try_into().unwrap())); - - frames - } - */ } struct Aggregate { @@ -285,43 +234,6 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> { recv: Receiver::new(output), })); - /* - std::thread::spawn({ - let state = state.clone(); - move || { - thread::set_name("bark/audio"); - - loop { - let mut state = state.lock().unwrap(); - - let delay = output.delay().unwrap(); - state.recv.stats.set_output_latency(delay); - - let pts = time::now(); - let pts = Timestamp::from_micros_lossy(pts); - let pts = pts.add(delay); - - // this should be large enough for `write_audio` to process an - // entire packet with: - let mut buffer = [Frame::zeroed(); FRAMES_PER_PACKET * 2]; - let count = state.recv.write_audio(&mut buffer, pts); - - // drop lock before calling `Output::write` (blocking!) - drop(state); - - // send audio to ALSA - match output.write(&buffer[0..count]) { - Ok(()) => {} - Err(e) => { - log::error!("error playing audio: {e}"); - break; - } - }; - } - } - }); - */ - let socket = Socket::open(opt.socket) .map_err(RunError::Listen)?; diff --git a/bark/src/receive/stream.rs b/bark/src/receive/stream.rs index 2244074..ad34fa3 100644 --- a/bark/src/receive/stream.rs +++ b/bark/src/receive/stream.rs @@ -1,13 +1,14 @@ -use std::{sync::Mutex, thread}; +use std::thread; use bark_core::{audio::Frame, receive::{pipeline::Pipeline, queue::{AudioPts, PacketQueue}, timing::Timing}}; -use bark_protocol::{time::{ClockDelta, SampleDuration, Timestamp, TimestampDelta}, types::{stats::receiver::StreamStatus, AudioPacketHeader, SessionId}, FRAMES_PER_PACKET}; +use bark_protocol::time::{SampleDuration, Timestamp, TimestampDelta}; +use bark_protocol::types::{stats::receiver::StreamStatus, AudioPacketHeader, SessionId}; +use bark_protocol::FRAMES_PER_PACKET; use bytemuck::Zeroable; use crate::time; use crate::receive::output::OutputRef; use crate::receive::queue::{self, Disconnected, QueueReceiver, QueueSender}; -use crate::receive::Aggregate; pub struct Stream { tx: QueueSender, @@ -20,7 +21,6 @@ impl Stream { let (tx, rx) = queue::channel(queue); let state = StreamState { - clock_delta: Aggregate::new(), queue: rx, pipeline: Pipeline::new(header), output, @@ -46,7 +46,6 @@ impl Stream { } struct StreamState { - clock_delta: Aggregate, queue: QueueReceiver, pipeline: Pipeline, output: OutputRef, @@ -130,10 +129,3 @@ fn run_stream(mut stream: StreamState) { } } } - -/// Adjust pts from remote time to local time -fn adjust_pts(stream: &StreamState, pts: Timestamp) -> Option { - stream.clock_delta.median().map(|delta| { - pts.adjust(TimestampDelta::from_clock_delta_lossy(delta)) - }) -}