diff --git a/Cargo.toml b/Cargo.toml
index 8e8f2f4997..3f3f3fdc24 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -44,6 +44,7 @@ default = [
   "bevy_sprite",
   "bevy_text",
   "bevy_ui",
+  "multi-threaded",
   "png",
   "hdr",
   "ktx2",
@@ -199,6 +200,9 @@ filesystem_watcher = ["bevy_internal/filesystem_watcher"]
 
 # Enable serialization support through serde
 serialize = ["bevy_internal/serialize"]
 
+# Enables multithreaded parallelism in the engine. Disabling it forces all engine tasks to run on a single thread.
+multi-threaded = ["bevy_internal/multi-threaded"]
+
 # Wayland display server support
 wayland = ["bevy_internal/wayland"]
diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml
index c19073b2c9..7551a987ae 100644
--- a/crates/bevy_ecs/Cargo.toml
+++ b/crates/bevy_ecs/Cargo.toml
@@ -11,7 +11,8 @@ categories = ["game-engines", "data-structures"]
 
 [features]
 trace = []
-default = ["bevy_reflect"]
+multi-threaded = ["bevy_tasks/multi-threaded"]
+default = ["bevy_reflect", "multi-threaded"]
 
 [dependencies]
 bevy_ptr = { path = "../bevy_ptr", version = "0.11.0-dev" }
diff --git a/crates/bevy_ecs/src/query/par_iter.rs b/crates/bevy_ecs/src/query/par_iter.rs
index 426858b89a..6df124848a 100644
--- a/crates/bevy_ecs/src/query/par_iter.rs
+++ b/crates/bevy_ecs/src/query/par_iter.rs
@@ -1,5 +1,4 @@
 use crate::{component::Tick, world::unsafe_world_cell::UnsafeWorldCell};
-use bevy_tasks::ComputeTaskPool;
 use std::ops::Range;
 
 use super::{QueryItem, QueryState, ROQueryItem, ReadOnlyWorldQuery, WorldQuery};
@@ -34,6 +33,8 @@ pub struct BatchingStrategy {
     /// increase the scheduling overhead for the iteration.
     ///
     /// Defaults to 1.
+    ///
+    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
     pub batches_per_thread: usize,
 }
 
@@ -148,23 +149,36 @@ impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
     /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
     #[inline]
     unsafe fn for_each_unchecked<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(&self, func: FN) {
-        let thread_count = ComputeTaskPool::get().thread_num();
-        if thread_count <= 1 {
+        #[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
+        {
             self.state
                 .for_each_unchecked_manual(self.world, func, self.last_run, self.this_run);
-        } else {
-            // Need a batch size of at least 1.
-            let batch_size = self.get_batch_size(thread_count).max(1);
-            self.state.par_for_each_unchecked_manual(
-                self.world,
-                batch_size,
-                func,
-                self.last_run,
-                self.this_run,
-            );
+        }
+        #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
+        {
+            let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
+            if thread_count <= 1 {
+                self.state.for_each_unchecked_manual(
+                    self.world,
+                    func,
+                    self.last_run,
+                    self.this_run,
+                );
+            } else {
+                // Need a batch size of at least 1.
+                let batch_size = self.get_batch_size(thread_count).max(1);
+                self.state.par_for_each_unchecked_manual(
+                    self.world,
+                    batch_size,
+                    func,
+                    self.last_run,
+                    self.this_run,
+                );
+            }
         }
     }
 
+    #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
     fn get_batch_size(&self, thread_count: usize) -> usize {
         if self.batching_strategy.batch_size_limits.is_empty() {
             return self.batching_strategy.batch_size_limits.start;
diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs
index aee797ccf6..ef01805937 100644
--- a/crates/bevy_ecs/src/query/state.rs
+++ b/crates/bevy_ecs/src/query/state.rs
@@ -10,7 +10,6 @@ use crate::{
     storage::{TableId, TableRow},
     world::{unsafe_world_cell::UnsafeWorldCell, World, WorldId},
 };
-use bevy_tasks::ComputeTaskPool;
 #[cfg(feature = "trace")]
 use bevy_utils::tracing::Instrument;
 use fixedbitset::FixedBitSet;
@@ -1031,6 +1030,9 @@ impl<Q: WorldQuery, F: ReadOnlyWorldQuery> QueryState<Q, F> {
     /// have unique access to the components they query.
     /// This does not validate that `world.id()` matches `self.world_id`. Calling this on a `world`
     /// with a mismatched [`WorldId`] is unsound.
+    ///
+    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
+    #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
     pub(crate) unsafe fn par_for_each_unchecked_manual<
         'w,
         FN: Fn(Q::Item<'w>) + Send + Sync + Clone,
@@ -1044,7 +1046,7 @@ impl<Q: WorldQuery, F: ReadOnlyWorldQuery> QueryState<Q, F> {
     ) {
         // NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
         // QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual
-        ComputeTaskPool::get().scope(|scope| {
+        bevy_tasks::ComputeTaskPool::get().scope(|scope| {
             if Q::IS_DENSE && F::IS_DENSE {
                 // SAFETY: We only access table data that has been registered in `self.archetype_component_access`.
                 let tables = &world.storages().tables;
diff --git a/crates/bevy_ecs/src/schedule/executor/mod.rs b/crates/bevy_ecs/src/schedule/executor/mod.rs
index ffc4ca1bd7..2b2a6c139e 100644
--- a/crates/bevy_ecs/src/schedule/executor/mod.rs
+++ b/crates/bevy_ecs/src/schedule/executor/mod.rs
@@ -33,13 +33,13 @@ pub enum ExecutorKind {
     ///
     /// Useful if you're dealing with a single-threaded environment, saving your threads for
     /// other things, or just trying minimize overhead.
-    #[cfg_attr(target_arch = "wasm32", default)]
+    #[cfg_attr(any(target_arch = "wasm32", not(feature = "multi-threaded")), default)]
     SingleThreaded,
     /// Like [`SingleThreaded`](ExecutorKind::SingleThreaded) but calls [`apply_deferred`](crate::system::System::apply_deferred)
     /// immediately after running each system.
     Simple,
     /// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
-    #[cfg_attr(not(target_arch = "wasm32"), default)]
+    #[cfg_attr(all(not(target_arch = "wasm32"), feature = "multi-threaded"), default)]
     MultiThreaded,
 }
 
diff --git a/crates/bevy_internal/Cargo.toml b/crates/bevy_internal/Cargo.toml
index 8c8fa2c4df..658ab05977 100644
--- a/crates/bevy_internal/Cargo.toml
+++ b/crates/bevy_internal/Cargo.toml
@@ -66,6 +66,7 @@ shader_format_spirv = ["bevy_render/shader_format_spirv"]
 
 filesystem_watcher = ["bevy_asset/filesystem_watcher"]
 serialize = ["bevy_core/serialize", "bevy_input/serialize", "bevy_time/serialize", "bevy_window/serialize", "bevy_transform/serialize", "bevy_math/serialize", "bevy_scene/serialize"]
+multi-threaded = ["bevy_ecs/multi-threaded", "bevy_tasks/multi-threaded"]
 
 # Display server protocol support (X11 is enabled by default)
 wayland = ["bevy_winit/wayland"]
diff --git a/crates/bevy_internal/src/default_plugins.rs b/crates/bevy_internal/src/default_plugins.rs
index fe3be50bf9..95bee297d8 100644
--- a/crates/bevy_internal/src/default_plugins.rs
+++ b/crates/bevy_internal/src/default_plugins.rs
@@ -81,7 +81,7 @@ impl PluginGroup for DefaultPlugins {
                 // compressed texture formats
                 .add(bevy_render::texture::ImagePlugin::default());
 
-            #[cfg(not(target_arch = "wasm32"))]
+            #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
             {
                 group = group
                     .add(bevy_render::pipelined_rendering::PipelinedRenderingPlugin::default());
diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml
index 6016348595..9005800f15 100644
--- a/crates/bevy_tasks/Cargo.toml
+++ b/crates/bevy_tasks/Cargo.toml
@@ -8,6 +8,10 @@ repository = "https://github.com/bevyengine/bevy"
 license = "MIT OR Apache-2.0"
 keywords = ["bevy"]
 
+[features]
+multi-threaded = []
+default = ["multi-threaded"]
+
 [dependencies]
 futures-lite = "1.4.0"
 async-executor = "1.3.0"
diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs
index 5382228830..ffdce2d63c 100644
--- a/crates/bevy_tasks/src/lib.rs
+++ b/crates/bevy_tasks/src/lib.rs
@@ -8,14 +8,14 @@ pub use slice::{ParallelSlice, ParallelSliceMut};
 mod task;
 pub use task::Task;
 
-#[cfg(not(target_arch = "wasm32"))]
+#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
 mod task_pool;
-#[cfg(not(target_arch = "wasm32"))]
+#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
 pub use task_pool::{Scope, TaskPool, TaskPoolBuilder};
 
-#[cfg(target_arch = "wasm32")]
+#[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
 mod single_threaded_task_pool;
-#[cfg(target_arch = "wasm32")]
+#[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
 pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor};
 
 mod usages;
diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs
index b1546884bc..92e2928124 100644
--- a/crates/bevy_tasks/src/single_threaded_task_pool.rs
+++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs
@@ -1,9 +1,10 @@
-use std::{
-    future::Future,
-    marker::PhantomData,
-    mem,
-    sync::{Arc, Mutex},
-};
+#[cfg(target_arch = "wasm32")]
+use std::sync::Arc;
+use std::{cell::RefCell, future::Future, marker::PhantomData, mem, rc::Rc};
+
+thread_local! {
+    static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new();
+}
 
 /// Used to create a TaskPool
 #[derive(Debug, Default, Clone)]
@@ -76,7 +77,7 @@ impl TaskPool {
         1
     }
 
-    /// Allows spawning non-`static futures on the thread pool. The function takes a callback,
+    /// Allows spawning non-'static futures on the thread pool. The function takes a callback,
     /// passing a scope object into it. The scope object provided to the callback can be used
     /// to spawn tasks. This function will await the completion of all tasks before returning.
     ///
@@ -108,8 +109,9 @@ impl TaskPool {
         let executor: &'env async_executor::LocalExecutor<'env> =
             unsafe { mem::transmute(executor) };
 
-        let results: Mutex<Vec<Arc<Mutex<Option<T>>>>> = Mutex::new(Vec::new());
-        let results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>> = unsafe { mem::transmute(&results) };
+        let results: RefCell<Vec<Rc<RefCell<Option<T>>>>> = RefCell::new(Vec::new());
+        let results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>> =
+            unsafe { mem::transmute(&results) };
 
         let mut scope = Scope {
             executor,
@@ -125,29 +127,36 @@ impl TaskPool {
         // Loop until all tasks are done
         while executor.try_tick() {}
 
-        let results = scope.results.lock().unwrap();
+        let results = scope.results.borrow();
         results
             .iter()
-            .map(|result| result.lock().unwrap().take().unwrap())
+            .map(|result| result.borrow_mut().take().unwrap())
             .collect()
     }
 
-    /// Spawns a static future onto the JS event loop. For now it is returning FakeTask
-    /// instance with no-op detach method. Returning real Task is possible here, but tricky:
-    /// future is running on JS event loop, Task is running on async_executor::LocalExecutor
-    /// so some proxy future is needed. Moreover currently we don't have long-living
-    /// LocalExecutor here (above `spawn` implementation creates temporary one)
-    /// But for typical use cases it seems that current implementation should be sufficient:
-    /// caller can spawn long-running future writing results to some channel / event queue
-    /// and simply call detach on returned Task (like AssetServer does) - spawned future
-    /// can write results to some channel / event queue.
+    /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
+    /// cancelled and "detached" allowing it to continue running without having to be polled by the
+    /// end-user.
+    ///
+    /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
     pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
     where
        T: 'static,
     {
+        #[cfg(target_arch = "wasm32")]
         wasm_bindgen_futures::spawn_local(async move {
             future.await;
         });
+
+        #[cfg(not(target_arch = "wasm32"))]
+        {
+            LOCAL_EXECUTOR.with(|executor| {
+                let _task = executor.spawn(future);
+                // Loop until all tasks are done
+                while executor.try_tick() {}
+            });
+        }
+
         FakeTask
     }
 
@@ -158,6 +167,24 @@ impl TaskPool {
     {
         self.spawn(future)
     }
+
+    /// Runs a function with the local executor. Typically used to tick
+    /// the local executor on the main thread as it needs to share time with
+    /// other things.
+    ///
+    /// ```rust
+    /// use bevy_tasks::TaskPool;
+    ///
+    /// TaskPool::new().with_local_executor(|local_executor| {
+    ///     local_executor.try_tick();
+    /// });
+    /// ```
+    pub fn with_local_executor<F, R>(&self, f: F) -> R
+    where
+        F: FnOnce(&async_executor::LocalExecutor) -> R,
+    {
+        LOCAL_EXECUTOR.with(f)
+    }
 }
 
 #[derive(Debug)]
@@ -175,7 +202,7 @@ impl FakeTask {
 pub struct Scope<'scope, 'env: 'scope, T> {
     executor: &'env async_executor::LocalExecutor<'env>,
     // Vector to gather results of all futures spawned during scope run
-    results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>>,
+    results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>>,
 
     // make `Scope` invariant over 'scope and 'env
     scope: PhantomData<&'scope mut &'scope ()>,
@@ -211,10 +238,10 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
     ///
     /// For more information, see [`TaskPool::scope`].
     pub fn spawn_on_scope<Fut: Future<Output = T> + 'env>(&self, f: Fut) {
-        let result = Arc::new(Mutex::new(None));
-        self.results.lock().unwrap().push(result.clone());
+        let result = Rc::new(RefCell::new(None));
+        self.results.borrow_mut().push(result.clone());
         let f = async move {
-            result.lock().unwrap().replace(f.await);
+            result.borrow_mut().replace(f.await);
         };
         self.executor.spawn(f).detach();
     }
diff --git a/docs/cargo_features.md b/docs/cargo_features.md
index bf6a5e55fc..497fa403a3 100644
--- a/docs/cargo_features.md
+++ b/docs/cargo_features.md
@@ -31,6 +31,7 @@ The default feature set enables most of the expected features of a game engine,
 |filesystem_watcher|Enable watching file system for asset hot reload|
 |hdr|HDR image format support|
 |ktx2|KTX2 compressed texture support|
+|multi-threaded|Enables multithreaded parallelism in the engine. Disabling it forces all engine tasks to run on a single thread.|
 |png|PNG image format support|
 |tonemapping_luts|Include tonemapping Look Up Tables KTX2 files|
 |vorbis|OGG/VORBIS audio format support|
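The docs/cargo_features.md entry above is the user-facing contract: parallelism is now a default cargo feature that consumers can switch off. As a minimal, illustrative sketch (not taken from the PR), a downstream project that wants the single-threaded task pool could disable Bevy's default features and re-enable only what it needs; the version number and the re-enabled feature names below are placeholders to be checked against docs/cargo_features.md for the Bevy release actually in use.

```toml
# Hypothetical downstream Cargo.toml. Dropping default features removes
# "multi-threaded" along with everything else, so any defaults you still
# want must be listed explicitly.
[dependencies.bevy]
version = "0.11"   # placeholder version
default-features = false
features = ["bevy_winit", "bevy_core_pipeline", "bevy_sprite", "png"]
```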
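The recurring pattern in the Rust changes is compile-time dispatch: the single-threaded body is compiled on `wasm32` or when `multi-threaded` is off, and the parallel body only when the feature is enabled on a native target, so the unused path never exists in the binary. The sketch below reproduces that shape outside of Bevy, using `std::thread::scope` in place of the `ComputeTaskPool`; the function names and the `multi-threaded` feature (which this hypothetical example crate would declare in its own Cargo.toml) are assumptions for illustration, not Bevy APIs.

```rust
// Serial fallback, compiled on wasm32 or when the feature is disabled.
#[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
fn process_serial(items: &mut [u64]) {
    for item in items.iter_mut() {
        *item *= 2;
    }
}

// Parallel path, compiled only for native builds with the feature enabled:
// split the slice into chunks and double each chunk on a scoped thread.
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
fn process_parallel(items: &mut [u64]) {
    let threads = std::thread::available_parallelism().map_or(1, |n| n.get());
    let chunk_size = (items.len() / threads).max(1);
    std::thread::scope(|scope| {
        for chunk in items.chunks_mut(chunk_size) {
            scope.spawn(move || {
                for item in chunk.iter_mut() {
                    *item *= 2;
                }
            });
        }
    });
}

// Single entry point; exactly one of the two branches is compiled in any
// given build, which mirrors how `QueryParIter::for_each_unchecked` keeps a
// single public surface regardless of the feature set.
fn process(items: &mut [u64]) {
    #[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
    process_serial(items);

    #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
    process_parallel(items);
}

fn main() {
    let mut items = vec![1, 2, 3, 4];
    process(&mut items);
    assert_eq!(items, vec![2, 4, 6, 8]);
}
```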