Rework the engine

This commit is contained in:
Pierre Krieger 2017-02-07 10:27:07 +01:00
parent 78281513a1
commit 8aca2cb6eb
5 changed files with 212 additions and 385 deletions

View file

@ -1,9 +1,9 @@
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::thread::Builder;
use std::sync::mpsc::{self, Sender, Receiver};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
use futures::stream::Stream;
use futures::task;
@ -11,17 +11,13 @@ use futures::task::Executor;
use futures::task::Run;
use cpal;
use cpal::Format;
use cpal::UnknownTypeBuffer;
use cpal::EventLoop;
use cpal::Voice;
use cpal::Endpoint;
use conversions::Sample;
use source::Pauseable;
use dynamic_mixer;
use source::Source;
use source::VolumeFilter;
use source::UniformSourceIterator;
/// The internal engine of this library.
///
@ -31,13 +27,7 @@ pub struct Engine {
events_loop: Arc<EventLoop>,
// TODO: don't use the endpoint name, as it's slow
end_points: Mutex<HashMap<String, Arc<EndPointVoices>>>,
}
struct EndPointVoices {
format: Format,
next_id: AtomicUsize,
pending_sounds: Mutex<Vec<(usize, QueueIterator)>>,
end_points: Mutex<HashMap<String, Weak<dynamic_mixer::DynamicMixerController<f32>>>>,
}
impl Engine {
@ -63,376 +53,108 @@ impl Engine {
}
/// Builds a new sink that targets a given endpoint.
pub fn start(&self, endpoint: &Endpoint) -> Handle {
let mut future_to_exec = None;
pub fn start<S>(&self, endpoint: &Endpoint, source: S)
where S: Source<Item = f32> + Send + 'static
{
let mixer = {
let mut end_points = self.end_points.lock().unwrap();
// Getting the `EndPointVoices` struct of the requested endpoint.
let end_point = self.end_points
.lock()
.unwrap()
.entry(endpoint.get_name())
.or_insert_with(|| {
// TODO: handle possible errors here
// determining the format to use for the new voice
let format = endpoint.get_supported_formats_list()
.unwrap()
.fold(None, |f1, f2| {
if f1.is_none() {
return Some(f2);
}
let f1 = f1.unwrap();
// we privilege f32 formats to avoid a conversion
if f2.data_type == cpal::SampleFormat::F32 &&
f1.data_type != cpal::SampleFormat::F32 {
return Some(f2);
}
// do not go below 44100 if possible
if f1.samples_rate.0 < 44100 {
return Some(f2);
}
// priviledge outputs with 2 channels for now
if f2.channels.len() == 2 && f1.channels.len() != 2 {
return Some(f2);
}
Some(f1)
})
.expect("The endpoint doesn't support any format!?");
let (mut voice, stream) = Voice::new(&endpoint, &format, &self.events_loop)
.unwrap();
let end_point_voices = Arc::new(EndPointVoices {
format: format,
next_id: AtomicUsize::new(1),
pending_sounds: Mutex::new(Vec::with_capacity(8)),
});
let epv = end_point_voices.clone();
let sounds = Arc::new(Mutex::new(Vec::new()));
future_to_exec = Some(stream.for_each(move |mut buffer| -> Result<_, ()> {
let mut sounds = sounds.lock().unwrap();
{
let mut pending = epv.pending_sounds.lock().unwrap();
sounds.append(&mut pending);
match end_points.entry(endpoint.get_name()) {
Entry::Vacant(e) => {
let arc = new_voice(endpoint, &self.events_loop);
e.insert(Arc::downgrade(&arc));
arc
},
Entry::Occupied(mut e) => {
if let Some(m) = e.get().upgrade() {
m.clone()
} else {
let voice = new_voice(endpoint, &self.events_loop);
e.insert(Arc::downgrade(&voice));
voice
}
if sounds.len() == 0 {
return Ok(());
}
// Drop if it's not playing a real source, and it's sink is detached
// or the sink was dropped before being detached.
sounds.retain(|s| {
if s.1.local_dead {
return false;
}
if !s.1.is_playing_real_source {
return !s.1.local_handle_dead;
} else {
true
}
});
let samples_iter = (0..).map(|_| {
let v = sounds.iter_mut()
.map(|s| s.1.next().unwrap_or(0.0))
.fold(0.0, |a, b| a + b);
if v < -1.0 {
-1.0
} else if v > 1.0 {
1.0
} else {
v
}
});
match buffer {
UnknownTypeBuffer::U16(ref mut buffer) => {
for (o, i) in buffer.iter_mut().zip(samples_iter) {
*o = i.to_u16();
}
}
UnknownTypeBuffer::I16(ref mut buffer) => {
for (o, i) in buffer.iter_mut().zip(samples_iter) {
*o = i.to_i16();
}
}
UnknownTypeBuffer::F32(ref mut buffer) => {
for (o, i) in buffer.iter_mut().zip(samples_iter) {
*o = i;
}
}
};
Ok(())
}));
voice.play(); // TODO: don't do this now
end_point_voices
})
.clone();
// Assigning an id for the handle.
let handle_id = end_point.next_id.fetch_add(1, Ordering::Relaxed);
// Initialize the volume
let volume = Arc::new(Mutex::new(1.0));
// If paused is set to true then don't play from this handle.
let paused = Arc::new(AtomicBool::new(false));
// If dead is set to true then this Handle should be removed.
let dead = Arc::new(AtomicBool::new(false));
// Used to detect detached handles.
let handle_dead = Arc::new(AtomicBool::new(false));
// `next_sounds` contains a Vec that can later be used to append new iterators to the sink
let next_sounds = Arc::new(Mutex::new(Vec::new()));
// Frequency with which dead value should be updated.
let update_frequency = (5 * end_point.format.samples_rate.0) / 1000;
let queue_iterator = QueueIterator {
current: Box::new(None.into_iter()),
signal_after_end: None,
next: next_sounds.clone(),
local_dead: false,
remote_dead: dead.clone(),
local_handle_dead: false,
remote_handle_dead: handle_dead.clone(),
samples_until_update: update_frequency,
update_frequency: update_frequency,
is_playing_real_source: true,
},
}
};
// Adding the new sound to the list of parallel sounds.
end_point.pending_sounds.lock().unwrap().push((handle_id, queue_iterator));
if let Some(future_to_exec) = future_to_exec {
struct MyExecutor;
impl Executor for MyExecutor {
fn execute(&self, r: Run) {
r.run();
}
}
task::spawn(future_to_exec).execute(Arc::new(MyExecutor));
}
// Returning the handle.
Handle {
handle_id: handle_id,
samples_rate: end_point.format.samples_rate.0,
channels: end_point.format.channels.len() as u16,
next_sounds: next_sounds,
dead: dead,
handle_dead: handle_dead,
paused: paused,
volume: volume,
end: Mutex::new(None),
}
mixer.add(source);
}
}
/// A sink.
///
/// Note that dropping the handle doesn't delete the sink. You must call `stop` explicitly.
pub struct Handle {
handle_id: usize,
// TODO: handle possible errors here
fn new_voice(endpoint: &Endpoint, events_loop: &Arc<EventLoop>) -> Arc<dynamic_mixer::DynamicMixerController<f32>> {
// Determine the format to use for the new voice.
let format = endpoint.get_supported_formats_list()
.unwrap()
.fold(None, |f1, f2| {
if f1.is_none() {
return Some(f2);
}
samples_rate: u32,
channels: u16,
let f1 = f1.unwrap();
// Pointer to paused value in Pausable
paused: Arc<AtomicBool>,
// We privilege f32 formats to avoid a conversion.
if f2.data_type == cpal::SampleFormat::F32 &&
f1.data_type != cpal::SampleFormat::F32 {
return Some(f2);
}
// Pointer to volume value in VolumeFilter
volume: Arc<Mutex<f32>>,
// Do not go below 44100 if possible.
if f1.samples_rate.0 < 44100 {
return Some(f2);
}
// Pointer to dead value in QueueIterator
dead: Arc<AtomicBool>,
// Priviledge outputs with 2 channels for now.
if f2.channels.len() == 2 && f1.channels.len() != 2 {
return Some(f2);
}
// Set this to true when we are dropped, regardless of if we were detached or not.
// This is read by the engine thread.
handle_dead: Arc<AtomicBool>,
Some(f1)
})
.expect("The endpoint doesn't support any format!?");
// Holds a pointer to the list of iterators to be played after the current one has
// finished playing.
next_sounds: Arc<Mutex<Vec<(Box<Iterator<Item = f32> + Send>, Option<Sender<()>>)>>>,
let (mut voice, stream) = Voice::new(&endpoint, &format, events_loop)
.unwrap();
// Receiver that is triggered when the last sound ends.
end: Mutex<Option<Receiver<()>>>,
}
let (mixer_tx, mut mixer_rx) = {
dynamic_mixer::mixer::<f32>(format.channels.len() as u16, format.samples_rate.0)
};
let future_to_exec = stream.for_each(move |mut buffer| -> Result<_, ()> {
match buffer {
UnknownTypeBuffer::U16(ref mut buffer) => {
for (o, i) in buffer.iter_mut().zip(mixer_rx.by_ref()) {
*o = i.to_u16();
}
}
UnknownTypeBuffer::I16(ref mut buffer) => {
for (o, i) in buffer.iter_mut().zip(mixer_rx.by_ref()) {
*o = i.to_i16();
}
}
UnknownTypeBuffer::F32(ref mut buffer) => {
for (o, i) in buffer.iter_mut().zip(mixer_rx.by_ref()) {
*o = i;
}
}
};
Ok(())
});
impl Handle {
/// Appends a new source of data after the current one.
///
/// Returns a receiver that is triggered when the sound is finished playing.
#[inline]
pub fn append<S>(&self, source: S)
where S: Source + Send + 'static,
S::Item: Sample + Clone + Send
{
// Updating `end`.
let (tx, rx) = mpsc::channel();
*self.end.lock().unwrap() = Some(rx);
let source = Pauseable::new(source, self.paused.clone(), 5);
let source = VolumeFilter::new(source, self.volume.clone(), 5);
// Pushing the source and the `tx` to `next_sounds`.
let source = UniformSourceIterator::new(source, self.channels, self.samples_rate);
let source = Box::new(source);
self.next_sounds.lock().unwrap().push((source, Some(tx)));
}
/// Gets the volume of the sound played by this sink.
#[inline]
pub fn volume(&self) -> f32 {
*self.volume.lock().unwrap()
}
/// Changes the volume of the sound played by this sink.
#[inline]
pub fn set_volume(&self, _value: f32) {
*self.volume.lock().unwrap() = _value.min(1.0).max(0.0);
}
/// If the sound is paused then resume playing it.
#[inline]
pub fn play(&self) {
self.paused.store(false, Ordering::Relaxed);
}
/// Pause the sound
#[inline]
pub fn pause(&self) {
self.paused.store(true, Ordering::Relaxed);
}
/// Returns true if the sound is currently paused
#[inline]
pub fn is_paused(&self) -> bool {
self.paused.load(Ordering::Relaxed)
}
/// Stops the sound.
// note that this method could take `self` instead of `&self`, but it makes the `Sink` object's
// life easier not to take `self`
#[inline]
pub fn stop(&self) {
self.dead.store(true, Ordering::Relaxed);
}
/// Sleeps the current thread until the sound ends.
#[inline]
pub fn sleep_until_end(&self) {
// Will either block when reading `end`, or will block in the mutex lock if another
// thread is already reading `end`.
let mut end = self.end.lock().unwrap();
if let Some(end) = end.take() {
let _ = end.recv();
}
}
}
impl Drop for Handle {
#[inline]
fn drop(&mut self) {
self.handle_dead.store(true, Ordering::Relaxed);
}
}
// Main source of samples for a voice.
struct QueueIterator {
// The current iterator that produces samples.
current: Box<Iterator<Item = f32> + Send>,
// Signal this sender before picking from `next`.
signal_after_end: Option<Sender<()>>,
// A `Vec` containing the next iterators to play. Shared with other threads so they can add
// sounds to the list.
next: Arc<Mutex<Vec<(Box<Iterator<Item = f32> + Send>, Option<Sender<()>>)>>>,
//Local storage of the dead value. Allows us to only check the remote occasionally.
local_dead: bool,
//The dead value which may be manipulated by another thread.
remote_dead: Arc<AtomicBool>,
// Local storage of the handle_dead value. Allows us to only check the remote occasionally.
local_handle_dead: bool,
// Is our handle dead? Used to identify situations with a dropped handle that's been detached.
remote_handle_dead: Arc<AtomicBool>,
//The frequency with which local_dead should be updated by remote_dead
update_frequency: u32,
//How many samples remain until it is time to update local_dead with remote_dead.
samples_until_update: u32,
// Whether we're playing a source from a sink, or a dummy iter
is_playing_real_source: bool,
}
impl Iterator for QueueIterator {
type Item = f32;
#[inline]
fn next(&mut self) -> Option<f32> {
self.samples_until_update -= 1;
if self.samples_until_update == 0 {
self.local_dead = self.remote_dead.load(Ordering::Relaxed);
self.local_handle_dead = self.remote_handle_dead.load(Ordering::Relaxed);
self.samples_until_update = self.update_frequency;
}
if self.local_dead {
return Some(0.0);
}
loop {
// basic situation that will happen most of the time
if let Some(sample) = self.current.next() {
return Some(sample);
struct MyExecutor;
impl Executor for MyExecutor {
fn execute(&self, r: Run) {
r.run();
}
if let Some(signal_after_end) = self.signal_after_end.take() {
let _ = signal_after_end.send(());
}
let (next, signal_after_end) = {
let mut next = self.next.lock().unwrap();
if next.len() == 0 {
self.is_playing_real_source = false;
// if there's no iter waiting, we create a dummy iter with 1000 null samples
// this avoids a spinlock
(Box::new((0..1000).map(|_| 0.0f32)) as Box<Iterator<Item = f32> + Send>, None)
} else {
self.is_playing_real_source = true;
next.remove(0)
}
};
self.current = next;
self.signal_after_end = signal_after_end;
}
task::spawn(future_to_exec).execute(Arc::new(MyExecutor));
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
// TODO: slow? benchmark this
let next_hints = self.next
.lock()
.unwrap()
.iter()
.map(|i| i.0.size_hint().0)
.fold(0, |a, b| a + b);
(self.current.size_hint().0 + next_hints, None)
}
voice.play(); // TODO: don't do this now
// TODO: is there a chance that a context switch here starts playing the sound and
// immediately drops it?
mixer_tx
}

View file

@ -64,6 +64,11 @@ pub use decoder::Decoder;
pub use source::Source;
use std::io::{Read, Seek};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::sync::Mutex;
mod conversions;
mod engine;
@ -82,18 +87,25 @@ lazy_static! {
/// Dropping the `Sink` stops all sounds. You can use `detach` if you want the sounds to continue
/// playing.
pub struct Sink {
handle: engine::Handle,
// if true, then the sound will stop playing at the end
stop: bool,
queue_tx: Arc<queue::SourcesQueueInput<f32>>,
sleep_until_end: Mutex<Option<Receiver<()>>>,
pause: Arc<AtomicBool>,
volume: Arc<Mutex<f32>>,
}
impl Sink {
/// Builds a new `Sink`.
#[inline]
pub fn new(endpoint: &Endpoint) -> Sink {
let (queue_tx, queue_rx) = queue::queue(false);
ENGINE.start(endpoint, queue_rx);
Sink {
handle: ENGINE.start(&endpoint),
stop: true,
queue_tx: queue_tx,
sleep_until_end: Mutex::new(None),
pause: Arc::new(AtomicBool::new(false)),
volume: Arc::new(Mutex::new(1.0)),
}
}
@ -104,7 +116,10 @@ impl Sink {
S::Item: Sample,
S::Item: Send
{
self.handle.append(source);
let source = source::Pauseable::new(source, self.pause.clone(), 5);
let source = source::VolumeFilter::new(source, self.volume.clone(), 5);
let source = source::SamplesConverter::new(source);
*self.sleep_until_end.lock().unwrap() = Some(self.queue_tx.append_with_signal(source));
}
// Gets the volume of the sound.
@ -113,7 +128,7 @@ impl Sink {
/// multiply each sample by this value.
#[inline]
pub fn volume(&self) -> f32 {
self.handle.volume()
*self.volume.lock().unwrap()
}
/// Changes the volume of the sound.
@ -122,48 +137,51 @@ impl Sink {
/// multiply each sample by this value.
#[inline]
pub fn set_volume(&mut self, value: f32) {
self.handle.set_volume(value);
*self.volume.lock().unwrap() = value;
}
/// Resumes playback of a paused sound.
///
/// No effect if not paused.
#[inline]
pub fn play(&self) {
self.handle.play();
self.pause.store(false, Ordering::SeqCst);
}
/// Pauses playback of this sink.
///
/// A paused sound can be resumed with play
/// No effect if already paused.
///
/// A paused sound can be resumed with `play()`.
pub fn pause(&self) {
self.handle.pause();
self.pause.store(true, Ordering::SeqCst);
}
/// Gets if a sound is paused
///
/// Sounds can be paused and resumed using pause() and play(). This gets if a sound is paused.
/// Sounds can be paused and resumed using pause() and play(). This gets if a sound is paused.
pub fn is_paused(&self) -> bool {
self.handle.is_paused()
self.pause.load(Ordering::SeqCst)
}
/// Destroys the sink without stopping the sounds that are still playing.
#[inline]
pub fn detach(mut self) {
self.stop = false;
unimplemented!()
}
/// Sleeps the current thread until the sound ends.
#[inline]
pub fn sleep_until_end(&self) {
self.handle.sleep_until_end();
if let Some(sleep_until_end) = self.sleep_until_end.lock().unwrap().take() {
let _ = sleep_until_end.recv();
}
}
}
impl Drop for Sink {
#[inline]
fn drop(&mut self) {
if self.stop {
self.handle.stop();
}
}
}

View file

@ -148,7 +148,7 @@ impl<S> SourcesQueueOutput<S> where S: Sample + Send + 'static {
if self.keep_alive_if_empty {
// Play a short silence in order to avoid spinlocking.
let silence = Zero::<S>::new(1, 44000); // TODO: meh
(Box::new(silence) as Box<_>, None)
(Box::new(silence.take_duration(Duration::from_millis(10))) as Box<_>, None)
} else {
return Err(());
}

View file

@ -10,6 +10,7 @@ pub use self::fadein::FadeIn;
pub use self::mix::Mix;
pub use self::pauseable::Pauseable;
pub use self::repeat::Repeat;
pub use self::samples_converter::SamplesConverter;
pub use self::sine::SineWave;
pub use self::speed::Speed;
pub use self::take::TakeDuration;
@ -25,6 +26,7 @@ mod fadein;
mod mix;
mod pauseable;
mod repeat;
mod samples_converter;
mod sine;
mod speed;
mod take;

View file

@ -0,0 +1,85 @@
use std::marker::PhantomData;
use std::time::Duration;
use Sample;
use Source;
/// An iterator that reads from a `Source` and converts the samples to a specific rate and
/// channels count.
///
/// It implements `Source` as well, but all the data is guaranteed to be in a single frame whose
/// channels and samples rate have been passed to `new`.
#[derive(Clone)]
pub struct SamplesConverter<I, D>
where I: Source,
I::Item: Sample,
D: Sample
{
inner: I,
dest: PhantomData<D>,
}
impl<I, D> SamplesConverter<I, D>
where I: Source,
I::Item: Sample,
D: Sample
{
#[inline]
pub fn new(input: I) -> SamplesConverter<I, D> {
SamplesConverter {
inner: input,
dest: PhantomData,
}
}
}
impl<I, D> Iterator for SamplesConverter<I, D>
where I: Source,
I::Item: Sample,
D: Sample
{
type Item = D;
#[inline]
fn next(&mut self) -> Option<D> {
self.inner.next().map(|s| Sample::from(&s))
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
impl<I, D> ExactSizeIterator for SamplesConverter<I, D>
where I: Source + ExactSizeIterator,
I::Item: Sample,
D: Sample
{
}
impl<I, D> Source for SamplesConverter<I, D>
where I: Source,
I::Item: Sample,
D: Sample
{
#[inline]
fn get_current_frame_len(&self) -> Option<usize> {
self.inner.get_current_frame_len()
}
#[inline]
fn get_channels(&self) -> u16 {
self.inner.get_channels()
}
#[inline]
fn get_samples_rate(&self) -> u32 {
self.inner.get_samples_rate()
}
#[inline]
fn get_total_duration(&self) -> Option<Duration> {
self.inner.get_total_duration()
}
}