bubble up disconnected error

This commit is contained in:
Hailey Somerville 2024-06-23 18:15:56 +10:00
parent 1553d883cd
commit 7f12d53fd1
3 changed files with 20 additions and 22 deletions

View file

@ -31,6 +31,8 @@ pub enum RunError {
Receive(std::io::Error),
#[error("opening encoder: {0}")]
OpenEncoder(#[from] bark_core::encode::NewEncoderError),
#[error("{0}")]
Disconnected(#[from] receive::queue::Disconnected),
}
fn main() -> Result<(), ExitCode> {

View file

@ -1,5 +1,4 @@
use std::array;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use bytemuck::Zeroable;
@ -21,10 +20,11 @@ use crate::{time, stats, thread};
use crate::RunError;
use self::output::OwnedOutput;
use self::queue::Disconnected;
mod output;
mod queue;
mod stream;
pub mod output;
pub mod queue;
pub mod stream;
pub struct Receiver {
stats: ReceiverStats,
@ -128,7 +128,7 @@ impl Receiver {
self.stream.as_mut().unwrap()
}
pub fn receive_audio(&mut self, packet: Audio) {
pub fn receive_audio(&mut self, packet: Audio) -> Result<(), Disconnected> {
let now = time::now();
let header = packet.header();
@ -149,7 +149,7 @@ impl Receiver {
stream.stream.send(AudioPts {
pts,
audio: packet,
});
})?;
if let Some(latency) = stream.network_latency() {
if let Some(clock_delta) = stream.clock_delta.median() {
@ -160,6 +160,8 @@ impl Receiver {
self.stats.set_predict_offset(predict_diff)
}
}
Ok(())
}
}
@ -216,10 +218,6 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
let receiver_id = generate_receiver_id();
let node = stats::node::get();
struct SharedState {
pub recv: Receiver,
}
let output = Output::new(&DeviceOpt {
device: opt.output_device,
period: opt.output_period
@ -230,9 +228,7 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
.unwrap_or(DEFAULT_BUFFER),
}).map_err(RunError::OpenAudioDevice)?;
let state = Arc::new(Mutex::new(SharedState {
recv: Receiver::new(output),
}));
let mut receiver = Receiver::new(output);
let socket = Socket::open(opt.socket)
.map_err(RunError::Listen)?;
@ -263,8 +259,8 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
.expect("reply to time packet");
}
Some(TimePhase::StreamReply) => {
let mut state = state.lock().unwrap();
state.recv.receive_time(time);
// let mut state = state.lock().unwrap();
receiver.receive_time(time);
}
_ => {
// not for us - must be destined for another process
@ -273,14 +269,12 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {
}
}
Some(PacketKind::Audio(packet)) => {
let mut state = state.lock().unwrap();
state.recv.receive_audio(packet);
receiver.receive_audio(packet)?;
}
Some(PacketKind::StatsRequest(_)) => {
let state = state.lock().unwrap();
let sid = state.recv.current_session().unwrap_or(SessionId::zeroed());
let receiver = *state.recv.stats();
drop(state);
// let state = state.lock().unwrap();
let sid = receiver.current_session().unwrap_or(SessionId::zeroed());
let receiver = *receiver.stats();
let reply = StatsReply::receiver(sid, receiver, node)
.expect("allocate StatsReply packet");

View file

@ -1,6 +1,7 @@
use std::sync::{Arc, Condvar, Mutex};
use bark_core::receive::queue::{PacketQueue, AudioPts};
use thiserror::Error;
pub struct QueueSender {
shared: Arc<Shared>,
@ -27,7 +28,8 @@ pub fn channel(queue: PacketQueue) -> (QueueSender, QueueReceiver) {
(tx, rx)
}
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Error)]
#[error("audio receiver thread unexpectedly disconnected")]
pub struct Disconnected;
impl QueueSender {