Run the multi-threaded executor at the end of each system task. (#11906)

# Objective

The multi-threaded executor currently runs in a dedicated task on a
single thread. When a system finishes running, it has to notify that
task and then wait for that thread to become available and wake up
before the executor can process the completion.

See #8304

## Solution

Run the multi-threaded executor at the end of each system task. This
lets it run immediately instead of having to wait for the main thread
to wake up. The mutable executor state is moved into a separate
`ExecutorState` struct and wrapped in a mutex so it can be shared among
the worker threads.

While this should be faster in theory, I don't actually know how to
measure the performance impact myself.
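
The core idea is roughly the sketch below. This is a standalone simplification, not the actual Bevy code: it uses `std::sync::Mutex` and a `VecDeque` as std-only stand-ins for the real `ExecutorState` and the `concurrent-queue`-backed completion queue, and the names (`Executor`, `system_completed`, `tick_executor`) only loosely mirror the PR. Each finished system pushes a completion event and then tries to tick the executor itself; `try_lock` keeps only one thread ticking at a time, and re-checking the queue after releasing the lock ensures no event is lost.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

struct ExecutorState {
    // In the real executor this holds the ready/running/completed bitsets, etc.
    completed: Vec<usize>,
}

struct Executor {
    // Mutable scheduling state, shared by all worker threads.
    state: Mutex<ExecutorState>,
    // Completion queue; the PR uses a lock-free `ConcurrentQueue` here.
    system_completion: Mutex<VecDeque<usize>>,
}

impl Executor {
    /// Called at the end of each system task.
    fn system_completed(&self, system_index: usize) {
        self.system_completion.lock().unwrap().push_back(system_index);
        self.tick_executor();
    }

    fn tick_executor(&self) {
        loop {
            // If another thread already owns the state, it will pick up our
            // event when it re-checks the queue, so we can bail out.
            let Ok(mut state) = self.state.try_lock() else {
                return;
            };
            // Process every completion event queued so far. The real `tick`
            // also signals dependents and spawns newly ready system tasks.
            while let Some(index) = self.system_completion.lock().unwrap().pop_front() {
                state.completed.push(index);
            }
            drop(state);
            // Re-check after releasing the lock: an event pushed by a thread
            // whose try_lock failed while we held the state must not be lost.
            if self.system_completion.lock().unwrap().is_empty() {
                return;
            }
        }
    }
}

fn main() {
    let executor = Arc::new(Executor {
        state: Mutex::new(ExecutorState { completed: Vec::new() }),
        system_completion: Mutex::new(VecDeque::new()),
    });
    let workers: Vec<_> = (0..4)
        .map(|i| {
            let executor = Arc::clone(&executor);
            // Each "system task" reports its own completion and ticks the executor.
            thread::spawn(move || executor.system_completed(i))
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }
    assert_eq!(executor.state.lock().unwrap().completed.len(), 4);
}
```

In the actual PR, `ExecutorState::tick` additionally rebuilds the active access set and spawns newly ready (or exclusive) system tasks, and the run conditions live in their own `Mutex` inside the shared `Environment`.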

---------

Co-authored-by: James Liu <contact@jamessliu.com>
Co-authored-by: Mike <mike.hsu@gmail.com>
Chris Russell 2024-02-25 22:18:34 -05:00 committed by GitHub
parent 3a1b9b98e4
commit c4caebb528
2 changed files with 309 additions and 244 deletions


@ -22,7 +22,7 @@ bevy_tasks = { path = "../bevy_tasks", version = "0.14.0-dev" }
bevy_utils = { path = "../bevy_utils", version = "0.14.0-dev" }
bevy_ecs_macros = { path = "macros", version = "0.14.0-dev" }
async-channel = "2.2.0"
concurrent-queue = "2.4.0"
fixedbitset = "0.4.2"
rustc-hash = "1.1"
downcast-rs = "1.2"


@ -3,14 +3,14 @@ use std::{
sync::{Arc, Mutex},
};
use bevy_tasks::{block_on, poll_once, ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
use bevy_utils::default;
use bevy_utils::syncunsafecell::SyncUnsafeCell;
#[cfg(feature = "trace")]
use bevy_utils::tracing::{info_span, Instrument, Span};
use bevy_utils::tracing::{info_span, Span};
use std::panic::AssertUnwindSafe;
use async_channel::{Receiver, Sender};
use concurrent_queue::ConcurrentQueue;
use fixedbitset::FixedBitSet;
use crate::{
@ -24,10 +24,12 @@ use crate::{
use crate as bevy_ecs;
/// A funky borrow split of [`SystemSchedule`] required by the [`MultiThreadedExecutor`].
struct SyncUnsafeSchedule<'a> {
systems: &'a [SyncUnsafeCell<BoxedSystem>],
conditions: Conditions<'a>,
/// Borrowed data used by the [`MultiThreadedExecutor`].
struct Environment<'env, 'sys> {
executor: &'env MultiThreadedExecutor,
systems: &'sys [SyncUnsafeCell<BoxedSystem>],
conditions: Mutex<Conditions<'sys>>,
world_cell: UnsafeWorldCell<'env>,
}
struct Conditions<'a> {
@ -37,16 +39,22 @@ struct Conditions<'a> {
systems_in_sets_with_conditions: &'a [FixedBitSet],
}
impl SyncUnsafeSchedule<'_> {
fn new(schedule: &mut SystemSchedule) -> SyncUnsafeSchedule<'_> {
SyncUnsafeSchedule {
impl<'env, 'sys> Environment<'env, 'sys> {
fn new(
executor: &'env MultiThreadedExecutor,
schedule: &'sys mut SystemSchedule,
world: &'env mut World,
) -> Self {
Environment {
executor,
systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
conditions: Conditions {
conditions: Mutex::new(Conditions {
system_conditions: &mut schedule.system_conditions,
set_conditions: &mut schedule.set_conditions,
sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,
},
}),
world_cell: world.as_unsafe_world_cell(),
}
}
}
@ -75,10 +83,21 @@ struct SystemResult {
/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
pub struct MultiThreadedExecutor {
/// Sends system completion events.
sender: Sender<SystemResult>,
/// Receives system completion events.
receiver: Receiver<SystemResult>,
/// The running state, protected by a mutex so that a reference to the executor can be shared across tasks.
state: Mutex<ExecutorState>,
/// Queue of system completion events.
system_completion: ConcurrentQueue<SystemResult>,
/// When `true`, deferred system buffers are applied after all systems have run
apply_final_deferred: bool,
/// When set, tells the executor that a thread has panicked.
panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
/// Cached tracing span
#[cfg(feature = "trace")]
executor_span: Span,
}
/// The state of the executor while running.
pub struct ExecutorState {
/// Metadata for scheduling and running system tasks.
system_task_metadata: Vec<SystemTaskMetadata>,
/// Union of the accesses of all currently running systems.
@ -109,14 +128,20 @@ pub struct MultiThreadedExecutor {
completed_systems: FixedBitSet,
/// Systems that have run but have not had their buffers applied.
unapplied_systems: FixedBitSet,
/// When `true`, deferred system buffers are applied after all systems have run
apply_final_deferred: bool,
/// When set, tells the executor that a thread has panicked.
panic_payload: Arc<Mutex<Option<Box<dyn Any + Send>>>>,
/// When set, stops the executor from running any more systems.
stop_spawning: bool,
}
/// References to data required by the executor.
/// This is copied to each system task so that they can invoke the executor when they complete.
// These all need to outlive 'scope in order to be sent to new tasks,
// and keeping them all in a struct means we can use lifetime elision.
#[derive(Copy, Clone)]
struct Context<'scope, 'env, 'sys> {
environment: &'env Environment<'env, 'sys>,
scope: &'scope Scope<'scope, 'env, ()>,
}
impl Default for MultiThreadedExecutor {
fn default() -> Self {
Self::new()
@ -129,25 +154,23 @@ impl SystemExecutor for MultiThreadedExecutor {
}
fn init(&mut self, schedule: &SystemSchedule) {
let state = self.state.get_mut().unwrap();
// pre-allocate space
let sys_count = schedule.system_ids.len();
let set_count = schedule.set_ids.len();
let (tx, rx) = async_channel::bounded(sys_count.max(1));
self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
state.evaluated_sets = FixedBitSet::with_capacity(set_count);
state.ready_systems = FixedBitSet::with_capacity(sys_count);
state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
state.running_systems = FixedBitSet::with_capacity(sys_count);
state.completed_systems = FixedBitSet::with_capacity(sys_count);
state.skipped_systems = FixedBitSet::with_capacity(sys_count);
state.unapplied_systems = FixedBitSet::with_capacity(sys_count);
self.sender = tx;
self.receiver = rx;
self.evaluated_sets = FixedBitSet::with_capacity(set_count);
self.ready_systems = FixedBitSet::with_capacity(sys_count);
self.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
self.running_systems = FixedBitSet::with_capacity(sys_count);
self.completed_systems = FixedBitSet::with_capacity(sys_count);
self.skipped_systems = FixedBitSet::with_capacity(sys_count);
self.unapplied_systems = FixedBitSet::with_capacity(sys_count);
self.system_task_metadata = Vec::with_capacity(sys_count);
state.system_task_metadata = Vec::with_capacity(sys_count);
for index in 0..sys_count {
self.system_task_metadata.push(SystemTaskMetadata {
state.system_task_metadata.push(SystemTaskMetadata {
archetype_component_access: default(),
dependents: schedule.system_dependents[index].clone(),
is_send: schedule.systems[index].is_send(),
@ -160,7 +183,7 @@ impl SystemExecutor for MultiThreadedExecutor {
});
}
self.num_dependencies_remaining = Vec::with_capacity(sys_count);
state.num_dependencies_remaining = Vec::with_capacity(sys_count);
}
fn run(
@ -169,20 +192,23 @@ impl SystemExecutor for MultiThreadedExecutor {
world: &mut World,
_skip_systems: Option<&FixedBitSet>,
) {
let state = self.state.get_mut().unwrap();
// reset counts
self.num_systems = schedule.systems.len();
if self.num_systems == 0 {
state.num_systems = schedule.systems.len();
if state.num_systems == 0 {
return;
}
self.num_running_systems = 0;
self.num_completed_systems = 0;
self.num_dependencies_remaining.clear();
self.num_dependencies_remaining
state.num_running_systems = 0;
state.num_completed_systems = 0;
state.num_dependencies_remaining.clear();
state
.num_dependencies_remaining
.extend_from_slice(&schedule.system_dependencies);
for (system_index, dependencies) in self.num_dependencies_remaining.iter_mut().enumerate() {
for (system_index, dependencies) in state.num_dependencies_remaining.iter_mut().enumerate()
{
if *dependencies == 0 {
self.ready_systems.insert(system_index);
state.ready_systems.insert(system_index);
}
}
@ -190,16 +216,16 @@ impl SystemExecutor for MultiThreadedExecutor {
// not be run.
#[cfg(feature = "bevy_debug_stepping")]
if let Some(skipped_systems) = _skip_systems {
debug_assert_eq!(skipped_systems.len(), self.completed_systems.len());
debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
// mark skipped systems as completed
self.completed_systems |= skipped_systems;
self.num_completed_systems = self.completed_systems.count_ones(..);
state.completed_systems |= skipped_systems;
state.num_completed_systems = state.completed_systems.count_ones(..);
// signal the dependencies for each of the skipped systems, as
// though they had run
for system_index in skipped_systems.ones() {
self.signal_dependents(system_index);
self.ready_systems.set(system_index, false);
state.signal_dependents(system_index);
state.ready_systems.set(system_index, false);
}
}
@ -208,69 +234,34 @@ impl SystemExecutor for MultiThreadedExecutor {
.map(|e| e.0.clone());
let thread_executor = thread_executor.as_deref();
let SyncUnsafeSchedule {
systems,
mut conditions,
} = SyncUnsafeSchedule::new(schedule);
let environment = &Environment::new(self, schedule, world);
ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(
false,
thread_executor,
|scope| {
// the executor itself is a `Send` future so that it can run
// alongside systems that claim the local thread
#[allow(unused_mut)]
let mut executor = Box::pin(async {
let world_cell = world.as_unsafe_world_cell();
while self.num_completed_systems < self.num_systems {
// SAFETY:
// - self.ready_systems does not contain running systems.
// - `world_cell` has mutable access to the entire world.
unsafe {
self.spawn_system_tasks(scope, systems, &mut conditions, world_cell);
}
let context = Context { environment, scope };
if self.num_running_systems > 0 {
// wait for systems to complete
if let Ok(result) = self.receiver.recv().await {
self.finish_system_and_handle_dependents(result);
} else {
panic!("Channel closed unexpectedly!");
}
while let Ok(result) = self.receiver.try_recv() {
self.finish_system_and_handle_dependents(result);
}
self.rebuild_active_access();
}
}
});
#[cfg(feature = "trace")]
let executor_span = info_span!("multithreaded executor");
#[cfg(feature = "trace")]
let mut executor = executor.instrument(executor_span);
// Immediately poll the task once to avoid the overhead of the executor
// and thread wake-up. Only spawn the task if the executor does not immediately
// terminate.
if block_on(poll_once(&mut executor)).is_none() {
scope.spawn(executor);
}
// The first tick won't need to process finished systems, but we still need to run the loop in
// tick_executor() in case a system completes while the first tick still holds the mutex.
context.tick_executor();
},
);
// End the borrows of self and world in environment by copying out the reference to systems.
let systems = environment.systems;
let state = self.state.get_mut().unwrap();
if self.apply_final_deferred {
// Do one final apply buffers after all systems have completed
// Commands should be applied while on the scope's thread, not the executor's thread
let res = apply_deferred(&self.unapplied_systems, systems, world);
let res = apply_deferred(&state.unapplied_systems, systems, world);
if let Err(payload) = res {
let mut panic_payload = self.panic_payload.lock().unwrap();
*panic_payload = Some(payload);
}
self.unapplied_systems.clear();
debug_assert!(self.unapplied_systems.is_clear());
state.unapplied_systems.clear();
debug_assert!(state.unapplied_systems.is_clear());
}
// check to see if there was a panic
@ -279,12 +270,12 @@ impl SystemExecutor for MultiThreadedExecutor {
std::panic::resume_unwind(payload);
}
debug_assert!(self.ready_systems.is_clear());
debug_assert!(self.running_systems.is_clear());
self.active_access.clear();
self.evaluated_sets.clear();
self.skipped_systems.clear();
self.completed_systems.clear();
debug_assert!(state.ready_systems.is_clear());
debug_assert!(state.running_systems.is_clear());
state.active_access.clear();
state.evaluated_sets.clear();
state.skipped_systems.clear();
state.completed_systems.clear();
}
fn set_apply_final_deferred(&mut self, value: bool) {
@ -292,15 +283,72 @@ impl SystemExecutor for MultiThreadedExecutor {
}
}
impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
fn system_completed(
&self,
system_index: usize,
res: Result<(), Box<dyn Any + Send>>,
system: &BoxedSystem,
) {
// tell the executor that the system finished
self.environment
.executor
.system_completion
.push(SystemResult {
system_index,
success: res.is_ok(),
})
.unwrap_or_else(|error| unreachable!("{}", error));
if let Err(payload) = res {
eprintln!("Encountered a panic in system `{}`!", &*system.name());
// set the payload to propagate the error
{
let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();
*panic_payload = Some(payload);
}
}
self.tick_executor();
}
fn tick_executor(&self) {
// Ensure that the executor handles any events pushed to the system_completion queue by this thread.
// If this thread acquires the lock, the executor runs after the push() and the events are processed.
// If this thread does not acquire the lock, then the is_empty() check on the other thread runs
// after the lock is released, which is after try_lock() failed, which is after the push()
// on this thread, so the is_empty() check will see the new events and loop.
loop {
let Ok(mut guard) = self.environment.executor.state.try_lock() else {
return;
};
guard.tick(self);
// Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.
drop(guard);
if self.environment.executor.system_completion.is_empty() {
return;
}
}
}
}
impl MultiThreadedExecutor {
/// Creates a new multi-threaded executor for use with a [`Schedule`].
///
/// [`Schedule`]: crate::schedule::Schedule
pub fn new() -> Self {
let (sender, receiver) = async_channel::unbounded();
Self {
sender,
receiver,
state: Mutex::new(ExecutorState::new()),
system_completion: ConcurrentQueue::unbounded(),
apply_final_deferred: true,
panic_payload: Mutex::new(None),
#[cfg(feature = "trace")]
executor_span: info_span!("multithreaded executor"),
}
}
}
impl ExecutorState {
fn new() -> Self {
Self {
system_task_metadata: Vec::new(),
num_systems: 0,
num_running_systems: 0,
@ -316,78 +364,114 @@ impl MultiThreadedExecutor {
skipped_systems: FixedBitSet::new(),
completed_systems: FixedBitSet::new(),
unapplied_systems: FixedBitSet::new(),
apply_final_deferred: true,
panic_payload: Arc::new(Mutex::new(None)),
stop_spawning: false,
}
}
fn tick(&mut self, context: &Context) {
#[cfg(feature = "trace")]
let _span = context.environment.executor.executor_span.enter();
for result in context.environment.executor.system_completion.try_iter() {
self.finish_system_and_handle_dependents(result);
}
self.rebuild_active_access();
// SAFETY:
// - `finish_system_and_handle_dependents` has updated the currently running systems.
// - `rebuild_active_access` locks access for all currently running systems.
unsafe {
self.spawn_system_tasks(context);
}
}
/// # Safety
/// - Caller must ensure that `self.ready_systems` does not contain any systems that
/// have been mutably borrowed (such as the systems currently running).
/// - `world_cell` must have permission to access all world data (not counting
/// any world data that is claimed by systems currently running on this executor).
unsafe fn spawn_system_tasks<'scope>(
&mut self,
scope: &Scope<'_, 'scope, ()>,
systems: &'scope [SyncUnsafeCell<BoxedSystem>],
conditions: &mut Conditions,
world_cell: UnsafeWorldCell<'scope>,
) {
unsafe fn spawn_system_tasks(&mut self, context: &Context) {
if self.exclusive_running {
return;
}
let mut conditions = context
.environment
.conditions
.try_lock()
.expect("Conditions should only be locked while owning the executor state");
// can't borrow since loop mutably borrows `self`
let mut ready_systems = std::mem::take(&mut self.ready_systems_copy);
ready_systems.clear();
ready_systems.union_with(&self.ready_systems);
for system_index in ready_systems.ones() {
assert!(!self.running_systems.contains(system_index));
// SAFETY: Caller assured that these systems are not running.
// Therefore, no other reference to this system exists and there is no aliasing.
let system = unsafe { &mut *systems[system_index].get() };
// Skipping systems may cause their dependents to become ready immediately.
// If that happens, we need to run again immediately or we may fail to spawn those dependents.
let mut check_for_new_ready_systems = true;
while check_for_new_ready_systems {
check_for_new_ready_systems = false;
if !self.can_run(system_index, system, conditions, world_cell) {
// NOTE: exclusive systems with ambiguities are susceptible to
// being significantly displaced here (compared to single-threaded order)
// if systems after them in topological order can run
// if that becomes an issue, `break;` if exclusive system
continue;
}
ready_systems.clear();
ready_systems.union_with(&self.ready_systems);
self.ready_systems.set(system_index, false);
for system_index in ready_systems.ones() {
assert!(!self.running_systems.contains(system_index));
// SAFETY: Caller assured that these systems are not running.
// Therefore, no other reference to this system exists and there is no aliasing.
let system = unsafe { &mut *context.environment.systems[system_index].get() };
// SAFETY: `can_run` returned true, which means that:
// - It must have called `update_archetype_component_access` for each run condition.
// - There can be no systems running whose accesses would conflict with any conditions.
if unsafe { !self.should_run(system_index, system, conditions, world_cell) } {
self.skip_system_and_signal_dependents(system_index);
continue;
}
self.running_systems.insert(system_index);
self.num_running_systems += 1;
if self.system_task_metadata[system_index].is_exclusive {
// SAFETY: `can_run` returned true for this system, which means
// that no other systems currently have access to the world.
let world = unsafe { world_cell.world_mut() };
// SAFETY: `can_run` returned true for this system,
// which means no systems are currently borrowed.
unsafe {
self.spawn_exclusive_system_task(scope, system_index, systems, world);
if !self.can_run(
system_index,
system,
&mut conditions,
context.environment.world_cell,
) {
// NOTE: exclusive systems with ambiguities are susceptible to
// being significantly displaced here (compared to single-threaded order)
// if systems after them in topological order can run
// if that becomes an issue, `break;` if exclusive system
continue;
}
break;
}
// SAFETY:
// - No other reference to this system exists.
// - `can_run` has been called, which calls `update_archetype_component_access` with this system.
// - `can_run` returned true, so no systems with conflicting world access are running.
unsafe {
self.spawn_system_task(scope, system_index, systems, world_cell);
self.ready_systems.set(system_index, false);
// SAFETY: `can_run` returned true, which means that:
// - It must have called `update_archetype_component_access` for each run condition.
// - There can be no systems running whose accesses would conflict with any conditions.
if unsafe {
!self.should_run(
system_index,
system,
&mut conditions,
context.environment.world_cell,
)
} {
self.skip_system_and_signal_dependents(system_index);
// signal_dependents may have set more systems to ready.
check_for_new_ready_systems = true;
continue;
}
self.running_systems.insert(system_index);
self.num_running_systems += 1;
if self.system_task_metadata[system_index].is_exclusive {
// SAFETY: `can_run` returned true for this system,
// which means no systems are currently borrowed.
unsafe {
self.spawn_exclusive_system_task(context, system_index);
}
check_for_new_ready_systems = false;
break;
}
// SAFETY:
// - Caller ensured no other reference to this system exists.
// - `can_run` has been called, which calls `update_archetype_component_access` with this system.
// - `can_run` returned true, so no systems with conflicting world access are running.
unsafe {
self.spawn_system_task(context, system_index);
}
}
}
@ -516,132 +600,80 @@ impl MultiThreadedExecutor {
/// used by the specified system.
/// - `update_archetype_component_access` must have been called with `world`
/// on the system associated with `system_index`.
unsafe fn spawn_system_task<'scope>(
&mut self,
scope: &Scope<'_, 'scope, ()>,
system_index: usize,
systems: &'scope [SyncUnsafeCell<BoxedSystem>],
world: UnsafeWorldCell<'scope>,
) {
unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {
// SAFETY: this system is not running, no other reference exists
let system = unsafe { &mut *systems[system_index].get() };
let sender = self.sender.clone();
let panic_payload = self.panic_payload.clone();
let system = unsafe { &mut *context.environment.systems[system_index].get() };
// Move the full context object into the new future.
let context = *context;
let system_meta = &self.system_task_metadata[system_index];
#[cfg(feature = "trace")]
let system_span = system_meta.system_task_span.clone();
let task = async move {
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
#[cfg(feature = "trace")]
let _span = system_span.enter();
// SAFETY:
// - The caller ensures that we have permission to
// access the world data used by the system.
// - `update_archetype_component_access` has been called.
unsafe { system.run_unsafe((), world) };
unsafe { system.run_unsafe((), context.environment.world_cell) };
}));
// tell the executor that the system finished
sender
.try_send(SystemResult {
system_index,
success: res.is_ok(),
})
.unwrap_or_else(|error| unreachable!("{}", error));
if let Err(payload) = res {
eprintln!("Encountered a panic in system `{}`!", &*system.name());
// set the payload to propagate the error
{
let mut panic_payload = panic_payload.lock().unwrap();
*panic_payload = Some(payload);
}
}
context.system_completed(system_index, res, system);
};
#[cfg(feature = "trace")]
let task = task.instrument(
self.system_task_metadata[system_index]
.system_task_span
.clone(),
);
let system_meta = &self.system_task_metadata[system_index];
self.active_access
.extend(&system_meta.archetype_component_access);
if system_meta.is_send {
scope.spawn(task);
context.scope.spawn(task);
} else {
self.local_thread_running = true;
scope.spawn_on_external(task);
context.scope.spawn_on_external(task);
}
}
/// # Safety
/// Caller must ensure no systems are currently borrowed.
unsafe fn spawn_exclusive_system_task<'scope>(
&mut self,
scope: &Scope<'_, 'scope, ()>,
system_index: usize,
systems: &'scope [SyncUnsafeCell<BoxedSystem>],
world: &'scope mut World,
) {
unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {
// SAFETY: `can_run` returned true for this system, which means
// that no other systems currently have access to the world.
let world = unsafe { context.environment.world_cell.world_mut() };
// SAFETY: this system is not running, no other reference exists
let system = unsafe { &mut *systems[system_index].get() };
let system = unsafe { &mut *context.environment.systems[system_index].get() };
// Move the full context object into the new future.
let context = *context;
#[cfg(feature = "trace")]
let system_span = self.system_task_metadata[system_index]
.system_task_span
.clone();
let sender = self.sender.clone();
let panic_payload = self.panic_payload.clone();
if is_apply_deferred(system) {
// TODO: avoid allocation
let unapplied_systems = self.unapplied_systems.clone();
self.unapplied_systems.clear();
let task = async move {
let res = apply_deferred(&unapplied_systems, systems, world);
// tell the executor that the system finished
sender
.try_send(SystemResult {
system_index,
success: res.is_ok(),
})
.unwrap_or_else(|error| unreachable!("{}", error));
if let Err(payload) = res {
// set the payload to propagate the error
let mut panic_payload = panic_payload.lock().unwrap();
*panic_payload = Some(payload);
}
let res = {
#[cfg(feature = "trace")]
let _span = system_span.enter();
apply_deferred(&unapplied_systems, context.environment.systems, world)
};
context.system_completed(system_index, res, system);
};
#[cfg(feature = "trace")]
let task = task.instrument(
self.system_task_metadata[system_index]
.system_task_span
.clone(),
);
scope.spawn_on_scope(task);
context.scope.spawn_on_scope(task);
} else {
let task = async move {
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
#[cfg(feature = "trace")]
let _span = system_span.enter();
system.run((), world);
}));
// tell the executor that the system finished
sender
.try_send(SystemResult {
system_index,
success: res.is_ok(),
})
.unwrap_or_else(|error| unreachable!("{}", error));
if let Err(payload) = res {
eprintln!(
"Encountered a panic in exclusive system `{}`!",
&*system.name()
);
// set the payload to propagate the error
let mut panic_payload = panic_payload.lock().unwrap();
*panic_payload = Some(payload);
}
context.system_completed(system_index, res, system);
};
#[cfg(feature = "trace")]
let task = task.instrument(
self.system_task_metadata[system_index]
.system_task_span
.clone(),
);
scope.spawn_on_scope(task);
context.scope.spawn_on_scope(task);
}
self.exclusive_running = true;
@ -769,3 +801,36 @@ impl MainThreadExecutor {
MainThreadExecutor(TaskPool::get_thread_executor())
}
}
#[cfg(test)]
mod tests {
use crate::{
self as bevy_ecs,
prelude::Resource,
schedule::{ExecutorKind, IntoSystemConfigs, Schedule},
system::Commands,
world::World,
};
#[derive(Resource)]
struct R;
#[test]
fn skipped_systems_notify_dependents() {
let mut world = World::new();
let mut schedule = Schedule::default();
schedule.set_executor_kind(ExecutorKind::MultiThreaded);
schedule.add_systems(
(
(|| {}).run_if(|| false),
// This system depends on a system that is always skipped.
|mut commands: Commands| {
commands.insert_resource(R);
},
)
.chain(),
);
schedule.run(&mut world);
assert!(world.get_resource::<R>().is_some());
}
}