refactor: Extract parallel queue abstraction (#7348)
# Objective
There's a repeating pattern of `ThreadLocal<Cell<Vec<T>>>` which is very
useful for low overhead, low contention multithreaded queues that have
cropped up in a few places in the engine. This pattern is surprisingly
useful when building deferred mutation across multiple threads, as noted
by its use in `ParallelCommands`.
However, `ThreadLocal<Cell<Vec<T>>>` is not only a mouthful, it's also
hard to ensure the thread-local queue is replaced after it's been
temporarily removed from the `Cell`.
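
The hazard can be reproduced with a plain `Cell<Vec<T>>`, without any thread-locals. The sketch below is illustrative only: `push_if_even` and `queue_len` are hypothetical helpers, not engine code. An early return skips the write-back, so the cell keeps the empty `Vec` that `Cell::take` left behind and everything queued so far is lost.

```rust
use std::cell::Cell;

/// Hypothetical helper: pushes `n` onto the queue only when it is even.
/// Bug: the early return skips `cell.set`, so the taken queue is dropped
/// and the cell is left holding an empty `Vec`.
fn push_if_even(cell: &Cell<Vec<u32>>, n: u32) {
    let mut queue = cell.take();
    if n % 2 != 0 {
        return; // `queue` and its contents are dropped here
    }
    queue.push(n);
    cell.set(queue);
}

/// Hypothetical helper: reads the queue length, putting the queue back afterwards.
fn queue_len(cell: &Cell<Vec<u32>>) -> usize {
    let queue = cell.take();
    let len = queue.len();
    cell.set(queue);
    len
}
```

Calling `push_if_even` with 2 and then 4 leaves two items queued; a later call with 3 takes the early-return path and leaves the queue empty.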
## Solution
Wrap the pattern into `bevy_utils::Parallel<T>` which codifies the
entire pattern and ensures the user follows the contract. Instead of
fetching individual cells, removing the value, mutating it, and replacing
it, `Parallel::get` returns a `ParRef<'a, T>` which contains the
temporarily removed value and a reference back to the cell, and will
write the mutated value back to the cell upon being dropped.
I would like to use this to simplify the remaining part of #4899 that
has not been adopted/merged.
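
A guard of the kind described above might look roughly like the following. This is a minimal sketch using only `std`, not the type from this PR: `TakenRef` is a hypothetical name, and it assumes `T: Default` so that `Cell::take` can leave a placeholder behind. The guard holds the removed value and writes it back to the cell when dropped.

```rust
use std::cell::Cell;
use std::ops::{Deref, DerefMut};

/// Hypothetical stand-in for the guard described above: holds the value
/// taken out of the cell and restores it on drop.
struct TakenRef<'a, T: Default> {
    value: T,
    cell: &'a Cell<T>,
}

impl<'a, T: Default> TakenRef<'a, T> {
    /// Takes the current value out of `cell`, leaving `T::default()` behind.
    fn new(cell: &'a Cell<T>) -> Self {
        Self {
            value: cell.take(),
            cell,
        }
    }
}

impl<T: Default> Deref for TakenRef<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.value
    }
}

impl<T: Default> DerefMut for TakenRef<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.value
    }
}

impl<T: Default> Drop for TakenRef<'_, T> {
    /// Writes the (possibly mutated) value back, even on early return.
    fn drop(&mut self) {
        self.cell.set(std::mem::take(&mut self.value));
    }
}
```

Because the write-back lives in `Drop`, every exit path from the scope holding the guard restores the queue, so the caller cannot forget that step.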
---
## Changelog
TODO
---------
Co-authored-by: Joseph <21144246+JoJoJet@users.noreply.github.com>
2024-02-19 16:31:15 +00:00
use core::cell::Cell;
use thread_local::ThreadLocal;

/// A cohesive set of thread-local values of a given type.
///
/// Mutable references can be fetched if `T: Default` via [`Parallel::scope`].
#[derive(Default)]
pub struct Parallel<T: Send> {
    locals: ThreadLocal<Cell<T>>,
}

impl<T: Send> Parallel<T> {
    /// Gets a mutable iterator over all of the per-thread queues.
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &'_ mut T> {
        self.locals.iter_mut().map(|cell| cell.get_mut())
    }

    /// Clears all of the stored thread local values.
    pub fn clear(&mut self) {
        self.locals.clear();
    }
}

impl<T: Default + Send> Parallel<T> {
    /// Retrieves the thread-local value for the current thread and runs `f` on it.
    ///
    /// If there is no thread-local value, it will be initialized to its default.
    pub fn scope<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
        let cell = self.locals.get_or_default();
        // Temporarily move the value out of the cell, leaving a default in its place.
        let mut value = cell.take();
        let ret = f(&mut value);
        // Write the (possibly mutated) value back before returning.
        cell.set(value);
        ret
    }
}

impl<T, I> Parallel<I>
where
    I: IntoIterator<Item = T> + Default + Send + 'static,
{
    /// Drains all enqueued items from all threads and returns an iterator over them.
    ///
    /// Unlike [`Vec::drain`], this removes the stored data piecemeal, one thread's chunk
    /// at a time. If iteration is terminated part way, the rest of the enqueued items in
    /// the same chunk will be dropped, and the rest of the undrained elements will remain.
    ///
    /// The ordering is not guaranteed.
    pub fn drain(&mut self) -> impl Iterator<Item = T> + '_ {
        // The unused type parameter `B: FromIterator<T>` has been removed: it appeared
        // nowhere in the signature, so callers could not have it inferred.
        self.locals.iter_mut().flat_map(|item| item.take())
    }
}

impl<T: Send> Parallel<Vec<T>> {
    /// Collects all enqueued items from all threads and appends them to the end of a
    /// single `Vec`.
    ///
2024-02-22 18:55:22 +00:00
    /// The ordering is not guaranteed.
    pub fn drain_into(&mut self, out: &mut Vec<T>) {
        // Sum the per-thread queue lengths so `out` can be grown once up front.
        let size = self
            .locals
            .iter_mut()
            .map(|queue| queue.get_mut().len())
            .sum();
        out.reserve(size);
        // `Vec::append` moves the items out, leaving each per-thread queue empty.
        for queue in self.locals.iter_mut() {
            out.append(queue.get_mut());
        }
    }
}