mirror of
https://github.com/bevyengine/bevy
synced 2024-11-21 20:23:28 +00:00
Add optional single-threaded feature to bevy_ecs/bevy_tasks (#6690)
# Objective Fixes #6689. ## Solution Add `single-threaded` as an optional non-default feature to `bevy_ecs` and `bevy_tasks` that: - disable the `ParallelExecutor` as a default runner - disables the multi-threaded `TaskPool` - internally replace `QueryParIter::for_each` calls with `Query::for_each`. Removed the `Mutex` and `Arc` usage in the single-threaded task pool. ![image](https://user-images.githubusercontent.com/3137680/202833253-dd2d520f-75e6-4c7b-be2d-5ce1523cbd38.png) ## Future Work/TODO Create type aliases for `Mutex`, `Arc` that change to single-threaaded equivalents where possible. --- ## Changelog Added: Optional default feature `multi-theaded` to that enables multithreaded parallelism in the engine. Disabling it disables all multithreading in exchange for higher single threaded performance. Does nothing on WASM targets. --------- Co-authored-by: Carter Anderson <mcanders1@gmail.com>
This commit is contained in:
parent
a1feab939a
commit
d33f5c759c
11 changed files with 101 additions and 47 deletions
|
@ -44,6 +44,7 @@ default = [
|
|||
"bevy_sprite",
|
||||
"bevy_text",
|
||||
"bevy_ui",
|
||||
"multi-threaded",
|
||||
"png",
|
||||
"hdr",
|
||||
"ktx2",
|
||||
|
@ -199,6 +200,9 @@ filesystem_watcher = ["bevy_internal/filesystem_watcher"]
|
|||
# Enable serialization support through serde
|
||||
serialize = ["bevy_internal/serialize"]
|
||||
|
||||
# Enables multithreaded parallelism in the engine. Disabling it forces all engine tasks to run on a single thread.
|
||||
multi-threaded = ["bevy_internal/multi-threaded"]
|
||||
|
||||
# Wayland display server support
|
||||
wayland = ["bevy_internal/wayland"]
|
||||
|
||||
|
|
|
@ -11,7 +11,8 @@ categories = ["game-engines", "data-structures"]
|
|||
|
||||
[features]
|
||||
trace = []
|
||||
default = ["bevy_reflect"]
|
||||
multi-threaded = ["bevy_tasks/multi-threaded"]
|
||||
default = ["bevy_reflect", "multi-threaded"]
|
||||
|
||||
[dependencies]
|
||||
bevy_ptr = { path = "../bevy_ptr", version = "0.11.0-dev" }
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use crate::{component::Tick, world::unsafe_world_cell::UnsafeWorldCell};
|
||||
use bevy_tasks::ComputeTaskPool;
|
||||
use std::ops::Range;
|
||||
|
||||
use super::{QueryItem, QueryState, ROQueryItem, ReadOnlyWorldQuery, WorldQuery};
|
||||
|
@ -34,6 +33,8 @@ pub struct BatchingStrategy {
|
|||
/// increase the scheduling overhead for the iteration.
|
||||
///
|
||||
/// Defaults to 1.
|
||||
///
|
||||
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
|
||||
pub batches_per_thread: usize,
|
||||
}
|
||||
|
||||
|
@ -148,23 +149,36 @@ impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
|
|||
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
|
||||
#[inline]
|
||||
unsafe fn for_each_unchecked<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(&self, func: FN) {
|
||||
let thread_count = ComputeTaskPool::get().thread_num();
|
||||
if thread_count <= 1 {
|
||||
#[cfg(any(target = "wasm32", not(feature = "multi-threaded")))]
|
||||
{
|
||||
self.state
|
||||
.for_each_unchecked_manual(self.world, func, self.last_run, self.this_run);
|
||||
} else {
|
||||
// Need a batch size of at least 1.
|
||||
let batch_size = self.get_batch_size(thread_count).max(1);
|
||||
self.state.par_for_each_unchecked_manual(
|
||||
self.world,
|
||||
batch_size,
|
||||
func,
|
||||
self.last_run,
|
||||
self.this_run,
|
||||
);
|
||||
}
|
||||
#[cfg(all(not(target = "wasm32"), feature = "multi-threaded"))]
|
||||
{
|
||||
let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
|
||||
if thread_count <= 1 {
|
||||
self.state.for_each_unchecked_manual(
|
||||
self.world,
|
||||
func,
|
||||
self.last_run,
|
||||
self.this_run,
|
||||
);
|
||||
} else {
|
||||
// Need a batch size of at least 1.
|
||||
let batch_size = self.get_batch_size(thread_count).max(1);
|
||||
self.state.par_for_each_unchecked_manual(
|
||||
self.world,
|
||||
batch_size,
|
||||
func,
|
||||
self.last_run,
|
||||
self.this_run,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(not(target = "wasm32"), feature = "multi-threaded"))]
|
||||
fn get_batch_size(&self, thread_count: usize) -> usize {
|
||||
if self.batching_strategy.batch_size_limits.is_empty() {
|
||||
return self.batching_strategy.batch_size_limits.start;
|
||||
|
|
|
@ -10,7 +10,6 @@ use crate::{
|
|||
storage::{TableId, TableRow},
|
||||
world::{unsafe_world_cell::UnsafeWorldCell, World, WorldId},
|
||||
};
|
||||
use bevy_tasks::ComputeTaskPool;
|
||||
#[cfg(feature = "trace")]
|
||||
use bevy_utils::tracing::Instrument;
|
||||
use fixedbitset::FixedBitSet;
|
||||
|
@ -1031,6 +1030,9 @@ impl<Q: WorldQuery, F: ReadOnlyWorldQuery> QueryState<Q, F> {
|
|||
/// have unique access to the components they query.
|
||||
/// This does not validate that `world.id()` matches `self.world_id`. Calling this on a `world`
|
||||
/// with a mismatched [`WorldId`] is unsound.
|
||||
///
|
||||
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
|
||||
#[cfg(all(not(target = "wasm32"), feature = "multi-threaded"))]
|
||||
pub(crate) unsafe fn par_for_each_unchecked_manual<
|
||||
'w,
|
||||
FN: Fn(Q::Item<'w>) + Send + Sync + Clone,
|
||||
|
@ -1044,7 +1046,7 @@ impl<Q: WorldQuery, F: ReadOnlyWorldQuery> QueryState<Q, F> {
|
|||
) {
|
||||
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
|
||||
// QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual
|
||||
ComputeTaskPool::get().scope(|scope| {
|
||||
bevy_tasks::ComputeTaskPool::get().scope(|scope| {
|
||||
if Q::IS_DENSE && F::IS_DENSE {
|
||||
// SAFETY: We only access table data that has been registered in `self.archetype_component_access`.
|
||||
let tables = &world.storages().tables;
|
||||
|
|
|
@ -33,13 +33,13 @@ pub enum ExecutorKind {
|
|||
///
|
||||
/// Useful if you're dealing with a single-threaded environment, saving your threads for
|
||||
/// other things, or just trying minimize overhead.
|
||||
#[cfg_attr(target_arch = "wasm32", default)]
|
||||
#[cfg_attr(any(target_arch = "wasm32", not(feature = "multi-threaded")), default)]
|
||||
SingleThreaded,
|
||||
/// Like [`SingleThreaded`](ExecutorKind::SingleThreaded) but calls [`apply_deferred`](crate::system::System::apply_deferred)
|
||||
/// immediately after running each system.
|
||||
Simple,
|
||||
/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
|
||||
#[cfg_attr(not(target_arch = "wasm32"), default)]
|
||||
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "multi-threaded"), default)]
|
||||
MultiThreaded,
|
||||
}
|
||||
|
||||
|
|
|
@ -66,6 +66,7 @@ shader_format_spirv = ["bevy_render/shader_format_spirv"]
|
|||
filesystem_watcher = ["bevy_asset/filesystem_watcher"]
|
||||
|
||||
serialize = ["bevy_core/serialize", "bevy_input/serialize", "bevy_time/serialize", "bevy_window/serialize", "bevy_transform/serialize", "bevy_math/serialize", "bevy_scene/serialize"]
|
||||
multi-threaded = ["bevy_ecs/multi-threaded", "bevy_tasks/multi-threaded"]
|
||||
|
||||
# Display server protocol support (X11 is enabled by default)
|
||||
wayland = ["bevy_winit/wayland"]
|
||||
|
|
|
@ -81,7 +81,7 @@ impl PluginGroup for DefaultPlugins {
|
|||
// compressed texture formats
|
||||
.add(bevy_render::texture::ImagePlugin::default());
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
|
||||
{
|
||||
group = group
|
||||
.add(bevy_render::pipelined_rendering::PipelinedRenderingPlugin::default());
|
||||
|
|
|
@ -8,6 +8,10 @@ repository = "https://github.com/bevyengine/bevy"
|
|||
license = "MIT OR Apache-2.0"
|
||||
keywords = ["bevy"]
|
||||
|
||||
[features]
|
||||
multi-threaded = []
|
||||
default = ["multi-threaded"]
|
||||
|
||||
[dependencies]
|
||||
futures-lite = "1.4.0"
|
||||
async-executor = "1.3.0"
|
||||
|
|
|
@ -8,14 +8,14 @@ pub use slice::{ParallelSlice, ParallelSliceMut};
|
|||
mod task;
|
||||
pub use task::Task;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
|
||||
mod task_pool;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
|
||||
pub use task_pool::{Scope, TaskPool, TaskPoolBuilder};
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
#[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
|
||||
mod single_threaded_task_pool;
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
#[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
|
||||
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor};
|
||||
|
||||
mod usages;
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
mem,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use std::sync::Arc;
|
||||
use std::{cell::RefCell, future::Future, marker::PhantomData, mem, rc::Rc};
|
||||
|
||||
thread_local! {
|
||||
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new();
|
||||
}
|
||||
|
||||
/// Used to create a TaskPool
|
||||
#[derive(Debug, Default, Clone)]
|
||||
|
@ -76,7 +77,7 @@ impl TaskPool {
|
|||
1
|
||||
}
|
||||
|
||||
/// Allows spawning non-`static futures on the thread pool. The function takes a callback,
|
||||
/// Allows spawning non-'static futures on the thread pool. The function takes a callback,
|
||||
/// passing a scope object into it. The scope object provided to the callback can be used
|
||||
/// to spawn tasks. This function will await the completion of all tasks before returning.
|
||||
///
|
||||
|
@ -108,8 +109,9 @@ impl TaskPool {
|
|||
let executor: &'env async_executor::LocalExecutor<'env> =
|
||||
unsafe { mem::transmute(executor) };
|
||||
|
||||
let results: Mutex<Vec<Arc<Mutex<Option<T>>>>> = Mutex::new(Vec::new());
|
||||
let results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>> = unsafe { mem::transmute(&results) };
|
||||
let results: RefCell<Vec<Rc<RefCell<Option<T>>>>> = RefCell::new(Vec::new());
|
||||
let results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>> =
|
||||
unsafe { mem::transmute(&results) };
|
||||
|
||||
let mut scope = Scope {
|
||||
executor,
|
||||
|
@ -125,29 +127,36 @@ impl TaskPool {
|
|||
// Loop until all tasks are done
|
||||
while executor.try_tick() {}
|
||||
|
||||
let results = scope.results.lock().unwrap();
|
||||
let results = scope.results.borrow();
|
||||
results
|
||||
.iter()
|
||||
.map(|result| result.lock().unwrap().take().unwrap())
|
||||
.map(|result| result.borrow_mut().take().unwrap())
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Spawns a static future onto the JS event loop. For now it is returning FakeTask
|
||||
/// instance with no-op detach method. Returning real Task is possible here, but tricky:
|
||||
/// future is running on JS event loop, Task is running on async_executor::LocalExecutor
|
||||
/// so some proxy future is needed. Moreover currently we don't have long-living
|
||||
/// LocalExecutor here (above `spawn` implementation creates temporary one)
|
||||
/// But for typical use cases it seems that current implementation should be sufficient:
|
||||
/// caller can spawn long-running future writing results to some channel / event queue
|
||||
/// and simply call detach on returned Task (like AssetServer does) - spawned future
|
||||
/// can write results to some channel / event queue.
|
||||
/// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
|
||||
/// cancelled and "detached" allowing it to continue running without having to be polled by the
|
||||
/// end-user.
|
||||
///
|
||||
/// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
|
||||
pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
|
||||
where
|
||||
T: 'static,
|
||||
{
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
wasm_bindgen_futures::spawn_local(async move {
|
||||
future.await;
|
||||
});
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
LOCAL_EXECUTOR.with(|executor| {
|
||||
let _task = executor.spawn(future);
|
||||
// Loop until all tasks are done
|
||||
while executor.try_tick() {}
|
||||
});
|
||||
}
|
||||
|
||||
FakeTask
|
||||
}
|
||||
|
||||
|
@ -158,6 +167,24 @@ impl TaskPool {
|
|||
{
|
||||
self.spawn(future)
|
||||
}
|
||||
|
||||
/// Runs a function with the local executor. Typically used to tick
|
||||
/// the local executor on the main thread as it needs to share time with
|
||||
/// other things.
|
||||
///
|
||||
/// ```rust
|
||||
/// use bevy_tasks::TaskPool;
|
||||
///
|
||||
/// TaskPool::new().with_local_executor(|local_executor| {
|
||||
/// local_executor.try_tick();
|
||||
/// });
|
||||
/// ```
|
||||
pub fn with_local_executor<F, R>(&self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&async_executor::LocalExecutor) -> R,
|
||||
{
|
||||
LOCAL_EXECUTOR.with(f)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -175,7 +202,7 @@ impl FakeTask {
|
|||
pub struct Scope<'scope, 'env: 'scope, T> {
|
||||
executor: &'env async_executor::LocalExecutor<'env>,
|
||||
// Vector to gather results of all futures spawned during scope run
|
||||
results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>>,
|
||||
results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>>,
|
||||
|
||||
// make `Scope` invariant over 'scope and 'env
|
||||
scope: PhantomData<&'scope mut &'scope ()>,
|
||||
|
@ -211,10 +238,10 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
|
|||
///
|
||||
/// For more information, see [`TaskPool::scope`].
|
||||
pub fn spawn_on_scope<Fut: Future<Output = T> + 'env>(&self, f: Fut) {
|
||||
let result = Arc::new(Mutex::new(None));
|
||||
self.results.lock().unwrap().push(result.clone());
|
||||
let result = Rc::new(RefCell::new(None));
|
||||
self.results.borrow_mut().push(result.clone());
|
||||
let f = async move {
|
||||
result.lock().unwrap().replace(f.await);
|
||||
result.borrow_mut().replace(f.await);
|
||||
};
|
||||
self.executor.spawn(f).detach();
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ The default feature set enables most of the expected features of a game engine,
|
|||
|filesystem_watcher|Enable watching file system for asset hot reload|
|
||||
|hdr|HDR image format support|
|
||||
|ktx2|KTX2 compressed texture support|
|
||||
|multi-threaded|Enables multithreaded parallelism in the engine. Disabling it forces all engine tasks to run on a single thread.|
|
||||
|png|PNG image format support|
|
||||
|tonemapping_luts|Include tonemapping Look Up Tables KTX2 files|
|
||||
|vorbis|OGG/VORBIS audio format support|
|
||||
|
|
Loading…
Reference in a new issue