From 4f3ed196faa2cda8035cbe7f5313fe36e19b6b1a Mon Sep 17 00:00:00 2001 From: Mike Date: Fri, 3 Feb 2023 03:19:41 +0000 Subject: [PATCH] Stageless: move MainThreadExecutor to schedule_v3 (#7444) # Objective - Trying to move some of the fixes from https://github.com/bevyengine/bevy/pull/7267 to make that one easier to review - The MainThreadExecutor is how the render world runs nonsend systems on the main thread for pipelined rendering. - The multithread executor for stageless wasn't using the MainThreadExecutor. - MainThreadExecutor was declared in the old executor_parallel module that is getting deleted. - The way the MainThreadExecutor was getting passed to the scope was actually unsound as the resource could be dropped from the World while the schedule was running ## Solution - Move MainThreadExecutor to the new multithreaded_executor's file. - Make the multithreaded executor use the MainThreadExecutor - Clone the MainThreadExecutor onto the stack and pass that ref in ## Changelog - Move MainThreadExecutor for stageless migration. --- .../src/schedule/executor_parallel.rs | 22 ++--- .../bevy_ecs/src/schedule_v3/executor/mod.rs | 2 +- .../schedule_v3/executor/multi_threaded.rs | 89 ++++++++++++------- crates/bevy_render/src/pipelined_rendering.rs | 3 +- 4 files changed, 65 insertions(+), 51 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 0db9627633..dd966ac3cf 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -1,15 +1,12 @@ -use std::sync::Arc; - -use crate as bevy_ecs; use crate::{ archetype::ArchetypeComponentId, query::Access, schedule::{ParallelSystemExecutor, SystemContainer}, - system::Resource, + schedule_v3::MainThreadExecutor, world::World, }; use async_channel::{Receiver, Sender}; -use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; +use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; #[cfg(feature = "trace")] use bevy_utils::tracing::Instrument; use event_listener::Event; @@ -18,16 +15,6 @@ use fixedbitset::FixedBitSet; #[cfg(test)] use scheduling_event::*; -/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread -#[derive(Resource, Default, Clone)] -pub struct MainThreadExecutor(pub Arc>); - -impl MainThreadExecutor { - pub fn new() -> Self { - MainThreadExecutor(Arc::new(ThreadExecutor::new())) - } -} - struct SystemSchedulingMetadata { /// Used to signal the system's task to start the system. start: Event, @@ -138,7 +125,10 @@ impl ParallelSystemExecutor for ParallelExecutor { } } - let thread_executor = world.get_resource::().map(|e| &*e.0); + let thread_executor = world + .get_resource::() + .map(|e| e.0.clone()); + let thread_executor = thread_executor.as_deref(); ComputeTaskPool::init(TaskPool::default).scope_with_executor( false, diff --git a/crates/bevy_ecs/src/schedule_v3/executor/mod.rs b/crates/bevy_ecs/src/schedule_v3/executor/mod.rs index bfc1eef14d..8fc1788cb7 100644 --- a/crates/bevy_ecs/src/schedule_v3/executor/mod.rs +++ b/crates/bevy_ecs/src/schedule_v3/executor/mod.rs @@ -2,7 +2,7 @@ mod multi_threaded; mod simple; mod single_threaded; -pub use self::multi_threaded::MultiThreadedExecutor; +pub use self::multi_threaded::{MainThreadExecutor, MultiThreadedExecutor}; pub use self::simple::SimpleExecutor; pub use self::single_threaded::SingleThreadedExecutor; diff --git a/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs index 21d6c5d4cd..0796ce1963 100644 --- a/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs @@ -1,14 +1,16 @@ -use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; +use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; use bevy_utils::default; use bevy_utils::syncunsafecell::SyncUnsafeCell; #[cfg(feature = "trace")] use bevy_utils::tracing::{info_span, Instrument}; +use std::sync::Arc; use async_channel::{Receiver, Sender}; use fixedbitset::FixedBitSet; use crate::{ archetype::ArchetypeComponentId, + prelude::Resource, query::Access, schedule_v3::{ is_apply_system_buffers, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule, @@ -17,6 +19,8 @@ use crate::{ world::World, }; +use crate as bevy_ecs; + /// A funky borrow split of [`SystemSchedule`] required by the [`MultiThreadedExecutor`]. struct SyncUnsafeSchedule<'a> { systems: &'a [SyncUnsafeCell], @@ -145,47 +149,56 @@ impl SystemExecutor for MultiThreadedExecutor { } } + let thread_executor = world + .get_resource::() + .map(|e| e.0.clone()); + let thread_executor = thread_executor.as_deref(); + let world = SyncUnsafeCell::from_mut(world); let SyncUnsafeSchedule { systems, mut conditions, } = SyncUnsafeSchedule::new(schedule); - ComputeTaskPool::init(TaskPool::default).scope(|scope| { - // the executor itself is a `Send` future so that it can run - // alongside systems that claim the local thread - let executor = async { - while self.num_completed_systems < num_systems { - // SAFETY: self.ready_systems does not contain running systems - unsafe { - self.spawn_system_tasks(scope, systems, &mut conditions, world); - } - - if self.num_running_systems > 0 { - // wait for systems to complete - let index = self - .receiver - .recv() - .await - .unwrap_or_else(|error| unreachable!("{}", error)); - - self.finish_system_and_signal_dependents(index); - - while let Ok(index) = self.receiver.try_recv() { - self.finish_system_and_signal_dependents(index); + ComputeTaskPool::init(TaskPool::default).scope_with_executor( + false, + thread_executor, + |scope| { + // the executor itself is a `Send` future so that it can run + // alongside systems that claim the local thread + let executor = async { + while self.num_completed_systems < num_systems { + // SAFETY: self.ready_systems does not contain running systems + unsafe { + self.spawn_system_tasks(scope, systems, &mut conditions, world); } - self.rebuild_active_access(); - } - } - }; + if self.num_running_systems > 0 { + // wait for systems to complete + let index = self + .receiver + .recv() + .await + .unwrap_or_else(|error| unreachable!("{}", error)); - #[cfg(feature = "trace")] - let executor_span = info_span!("schedule_task"); - #[cfg(feature = "trace")] - let executor = executor.instrument(executor_span); - scope.spawn(executor); - }); + self.finish_system_and_signal_dependents(index); + + while let Ok(index) = self.receiver.try_recv() { + self.finish_system_and_signal_dependents(index); + } + + self.rebuild_active_access(); + } + } + }; + + #[cfg(feature = "trace")] + let executor_span = info_span!("schedule_task"); + #[cfg(feature = "trace")] + let executor = executor.instrument(executor_span); + scope.spawn(executor); + }, + ); // Do one final apply buffers after all systems have completed // SAFETY: all systems have completed, and so no outstanding accesses remain @@ -574,3 +587,13 @@ fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World }) .fold(true, |acc, res| acc && res) } + +/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread +#[derive(Resource, Default, Clone)] +pub struct MainThreadExecutor(pub Arc>); + +impl MainThreadExecutor { + pub fn new() -> Self { + MainThreadExecutor(Arc::new(ThreadExecutor::new())) + } +} diff --git a/crates/bevy_render/src/pipelined_rendering.rs b/crates/bevy_render/src/pipelined_rendering.rs index 43bbbf8409..516911aa03 100644 --- a/crates/bevy_render/src/pipelined_rendering.rs +++ b/crates/bevy_render/src/pipelined_rendering.rs @@ -2,7 +2,8 @@ use async_channel::{Receiver, Sender}; use bevy_app::{App, AppLabel, Plugin, SubApp}; use bevy_ecs::{ - schedule::{MainThreadExecutor, StageLabel, SystemStage}, + schedule::{StageLabel, SystemStage}, + schedule_v3::MainThreadExecutor, system::Resource, world::{Mut, World}, };