Add ParallelCommands system parameter (#4749)

(follow-up to #4423)
# Objective
Currently, it isn't possible to easily fire commands from within par_for_each blocks. This PR allows for issuing commands from within parallel scopes.
This commit is contained in:
TheRawMeatball 2022-06-06 14:46:41 +00:00
parent 2f5a1c6e16
commit 85cd0eb445
4 changed files with 102 additions and 1 deletions

View file

@ -21,6 +21,7 @@ bevy_utils = { path = "../bevy_utils", version = "0.8.0-dev" }
bevy_ecs_macros = { path = "macros", version = "0.8.0-dev" }
async-channel = "1.4"
thread_local = "1.1.4"
fixedbitset = "0.4"
fxhash = "0.2"
downcast-rs = "1.2"

View file

@ -39,7 +39,7 @@ pub mod prelude {
},
system::{
Commands, In, IntoChainSystem, IntoExclusiveSystem, IntoSystem, Local, NonSend,
NonSendMut, ParamSet, Query, RemovedComponents, Res, ResMut, System,
NonSendMut, ParallelCommands, ParamSet, Query, RemovedComponents, Res, ResMut, System,
SystemParamFunction,
},
world::{FromWorld, Mut, World},

View file

@ -1,4 +1,5 @@
mod command_queue;
mod parallel_scope;
use crate::{
bundle::Bundle,
@ -8,6 +9,7 @@ use crate::{
};
use bevy_utils::tracing::{error, warn};
pub use command_queue::CommandQueue;
pub use parallel_scope::*;
use std::marker::PhantomData;
use super::Resource;

View file

@ -0,0 +1,98 @@
use std::cell::Cell;
use thread_local::ThreadLocal;
use crate::{
entity::Entities,
prelude::World,
system::{SystemParam, SystemParamFetch, SystemParamState},
};
use super::{CommandQueue, Commands};
#[doc(hidden)]
#[derive(Default)]
/// The internal [`SystemParamState`] of the [`ParallelCommands`] type
pub struct ParallelCommandsState {
thread_local_storage: ThreadLocal<Cell<CommandQueue>>,
}
/// An alternative to [`Commands`] that can be used in parallel contexts, such as those in [`Query::par_for_each`](crate::system::Query::par_for_each)
///
/// Note: Because command application order will depend on how many threads are ran, non-commutative commands may result in non-deterministic results.
///
/// Example:
/// ```
/// # use bevy_ecs::prelude::*;
/// # use bevy_tasks::ComputeTaskPool;
/// #
/// # #[derive(Component)]
/// # struct Velocity;
/// # impl Velocity { fn magnitude(&self) -> f32 { 42.0 } }
/// fn parallel_command_system(
/// mut query: Query<(Entity, &Velocity)>,
/// par_commands: ParallelCommands
/// ) {
/// query.par_for_each(32, |(entity, velocity)| {
/// if velocity.magnitude() > 10.0 {
/// par_commands.command_scope(|mut commands| {
/// commands.entity(entity).despawn();
/// });
/// }
/// });
/// }
/// # bevy_ecs::system::assert_is_system(parallel_command_system);
///```
pub struct ParallelCommands<'w, 's> {
state: &'s mut ParallelCommandsState,
entities: &'w Entities,
}
impl SystemParam for ParallelCommands<'_, '_> {
type Fetch = ParallelCommandsState;
}
impl<'w, 's> SystemParamFetch<'w, 's> for ParallelCommandsState {
type Item = ParallelCommands<'w, 's>;
unsafe fn get_param(
state: &'s mut Self,
_: &crate::system::SystemMeta,
world: &'w World,
_: u32,
) -> Self::Item {
ParallelCommands {
state,
entities: world.entities(),
}
}
}
// SAFE: no component or resource access to report
unsafe impl SystemParamState for ParallelCommandsState {
fn init(_: &mut World, _: &mut crate::system::SystemMeta) -> Self {
Self::default()
}
fn apply(&mut self, world: &mut World) {
for cq in self.thread_local_storage.iter_mut() {
cq.get_mut().apply(world);
}
}
}
impl<'w, 's> ParallelCommands<'w, 's> {
pub fn command_scope<R>(&self, f: impl FnOnce(Commands) -> R) -> R {
let store = &self.state.thread_local_storage;
let command_queue_cell = store.get_or_default();
let mut command_queue = command_queue_cell.take();
let r = f(Commands::new_from_entities(
&mut command_queue,
self.entities,
));
command_queue_cell.set(command_queue);
r
}
}