Start running systems while prepare_systems is running (#4919)

# Objective
While using the ParallelExecutor, systems do not actually start until `prepare_systems` completes. In stages where there are large numbers of "empty" systems with very little work to do, this delay adds significant overhead, which can add up over many stages.

## Solution
Immediately and synchronously signal the start of systems that can run without dependencies inside `prepare_systems` instead of waiting for the first executor iteration after `prepare_systems` completes. Any system that is dependent on them still cannot run until after `prepare_systems` completes, but there are a large number of unconstrained systems in the base engine where this is a general benefit in almost every case.

## Performance

This change was tested against `many_foxes` in the default configuration. As this change is sensitive to the overhead around scheduling systems, the spans for measuring system timing, system overhead, and system commands were all commented out for these measurements.

The median stage timings between `main` and this PR are as follows:

|stage|main|this PR|
|:--|:--|:--|
|First|75.54 us|61.61 us|
|LoadAssets|51.05 us|42.32 us|
|PreUpdate|54.6 us|55.56 us|
|Update|61.89 us|51.5 us|
|PostUpdate|7.27 ms|6.71 ms|
|AssetEvents|47.82 us|35.95 us|
|Last|39.19 us|37.71 us|
|reserve_and_flush|57.83 us|48.2 us|
|Extract|1.41 ms|1.28 ms|
|Prepare|554.49 us|502.53 us|
|Queue|216.29 us|207.51 us|
|Sort|67.03 us|60.99 us|
|Render|1.73 ms|1.58 ms|
|Cleanup|33.55 us|30.76 us|
|Clear Entities|18.56 us|17.05 us|
|**full frame**|**11.9 ms**|**10.91 ms**|

For the first few stages, the benefit is small but cumulative over each. For PostUpdate in particular, this allows `parent_update` to run while prepare_systems is running, which is required for the animation and transform propagation systems, which dominate the time spent in the stage, but also frontloads the contention as the other "empty" systems are also running while `parent_update` is running. For Render, where there is just a single large exclusive system, the benefit comes from not waiting on a spuriously scheduled task on the task pool to kick off the system: it's immediately scheduled to run.
This commit is contained in:
James Liu 2022-09-13 19:28:13 +00:00
parent d8d191fdd5
commit 5d821fe1a7
2 changed files with 114 additions and 43 deletions

View file

@ -21,6 +21,7 @@ bevy_utils = { path = "../bevy_utils", version = "0.9.0-dev" }
bevy_ecs_macros = { path = "macros", version = "0.9.0-dev" }
async-channel = "1.4"
event-listener = "2.5"
thread_local = "1.1.4"
fixedbitset = "0.4"
fxhash = "0.2"

View file

@ -8,6 +8,7 @@ use async_channel::{Receiver, Sender};
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool};
#[cfg(feature = "trace")]
use bevy_utils::tracing::Instrument;
use event_listener::Event;
use fixedbitset::FixedBitSet;
#[cfg(test)]
@ -15,9 +16,7 @@ use scheduling_event::*;
struct SystemSchedulingMetadata {
/// Used to signal the system's task to start the system.
start_sender: Sender<()>,
/// Receives the signal to start the system.
start_receiver: Receiver<()>,
start: Event,
/// Indices of systems that depend on this one, used to decrement their
/// dependency counters when this system finishes.
dependants: Vec<usize>,
@ -56,7 +55,11 @@ pub struct ParallelExecutor {
impl Default for ParallelExecutor {
fn default() -> Self {
let (finish_sender, finish_receiver) = async_channel::unbounded();
// Using a bounded channel here as it avoids allocations when signaling
// and generally remains hotter in memory. It'll take 128 systems completing
// before the parallel executor runs before this overflows. If it overflows
// all systems will just suspend until the parallel executor runs.
let (finish_sender, finish_receiver) = async_channel::bounded(128);
Self {
system_metadata: Default::default(),
finish_sender,
@ -84,10 +87,8 @@ impl ParallelSystemExecutor for ParallelExecutor {
for container in systems {
let dependencies_total = container.dependencies().len();
let system = container.system();
let (start_sender, start_receiver) = async_channel::bounded(1);
self.system_metadata.push(SystemSchedulingMetadata {
start_sender,
start_receiver,
start: Event::new(),
dependants: vec![],
dependencies_total,
dependencies_now: 0,
@ -125,10 +126,13 @@ impl ParallelSystemExecutor for ParallelExecutor {
ComputeTaskPool::init(TaskPool::default).scope(|scope| {
self.prepare_systems(scope, systems, world);
if self.should_run.count_ones(..) == 0 {
return;
}
let parallel_executor = async {
// All systems have been ran if there are no queued or running systems.
while 0 != self.queued.count_ones(..) + self.running.count_ones(..) {
self.process_queued_systems().await;
self.process_queued_systems();
// Avoid deadlocking if no systems were actually started.
if self.running.count_ones(..) != 0 {
// Wait until at least one system has finished.
@ -166,34 +170,96 @@ impl ParallelExecutor {
systems: &'scope mut [ParallelSystemContainer],
world: &'scope World,
) {
// These are used as a part of a unit test.
#[cfg(test)]
let mut started_systems = 0;
#[cfg(feature = "trace")]
let _span = bevy_utils::tracing::info_span!("prepare_systems").entered();
self.should_run.clear();
for (index, (system_data, system)) in
self.system_metadata.iter_mut().zip(systems).enumerate()
{
let should_run = system.should_run();
let can_start = should_run
&& system_data.dependencies_total == 0
&& Self::can_start_now(
self.non_send_running,
system_data,
&self.active_archetype_component_access,
);
// Queue the system if it has no dependencies, otherwise reset its dependency counter.
if system_data.dependencies_total == 0 {
if !can_start {
self.queued.insert(index);
}
} else {
system_data.dependencies_now = system_data.dependencies_total;
}
if !should_run {
continue;
}
// Spawn the system task.
if system.should_run() {
self.should_run.set(index, true);
let start_receiver = system_data.start_receiver.clone();
let finish_sender = self.finish_sender.clone();
let system = system.system_mut();
#[cfg(feature = "trace")] // NB: outside the task to get the TLS current span
let system_span = bevy_utils::tracing::info_span!("system", name = &*system.name());
self.should_run.insert(index);
let finish_sender = self.finish_sender.clone();
let system = system.system_mut();
#[cfg(feature = "trace")] // NB: outside the task to get the TLS current span
let system_span = bevy_utils::tracing::info_span!("system", name = &*system.name());
#[cfg(feature = "trace")]
let overhead_span =
bevy_utils::tracing::info_span!("system overhead", name = &*system.name());
let mut run = move || {
#[cfg(feature = "trace")]
let overhead_span =
bevy_utils::tracing::info_span!("system overhead", name = &*system.name());
let _system_guard = system_span.enter();
// SAFETY: the executor prevents two systems with conflicting access from running simultaneously.
unsafe { system.run_unsafe((), world) };
};
if can_start {
let task = async move {
start_receiver
.recv()
run();
// This will never panic:
// - The channel is never closed or dropped.
// - Overflowing the bounded size will just suspend until
// there is capacity.
finish_sender
.send(index)
.await
.unwrap_or_else(|error| unreachable!("{}", error));
#[cfg(feature = "trace")]
let system_guard = system_span.enter();
// SAFETY: the executor prevents two systems with conflicting access from running simultaneously.
unsafe { system.run_unsafe((), world) };
#[cfg(feature = "trace")]
drop(system_guard);
};
#[cfg(feature = "trace")]
let task = task.instrument(overhead_span);
if system_data.is_send {
scope.spawn(task);
} else {
scope.spawn_local(task);
}
#[cfg(test)]
{
started_systems += 1;
}
self.running.insert(index);
if !system_data.is_send {
self.non_send_running = true;
}
// Add this system's access information to the active access information.
self.active_archetype_component_access
.extend(&system_data.archetype_component_access);
} else {
let start_listener = system_data.start.listen();
let task = async move {
start_listener.await;
run();
// This will never panic:
// - The channel is never closed or dropped.
// - Overflowing the bounded size will just suspend until
// there is capacity.
finish_sender
.send(index)
.await
@ -208,29 +274,33 @@ impl ParallelExecutor {
scope.spawn_local(task);
}
}
// Queue the system if it has no dependencies, otherwise reset its dependency counter.
if system_data.dependencies_total == 0 {
self.queued.insert(index);
} else {
system_data.dependencies_now = system_data.dependencies_total;
}
}
#[cfg(test)]
if started_systems != 0 {
self.emit_event(SchedulingEvent::StartedSystems(started_systems));
}
}
/// Determines if the system with given index has no conflicts with already running systems.
fn can_start_now(&self, index: usize) -> bool {
let system_data = &self.system_metadata[index];
#[inline]
fn can_start_now(
non_send_running: bool,
system_data: &SystemSchedulingMetadata,
active_archetype_component_access: &Access<ArchetypeComponentId>,
) -> bool {
// Non-send systems are considered conflicting with each other.
(!self.non_send_running || system_data.is_send)
(!non_send_running || system_data.is_send)
&& system_data
.archetype_component_access
.is_compatible(&self.active_archetype_component_access)
.is_compatible(active_archetype_component_access)
}
/// Starts all non-conflicting queued systems, moves them from `queued` to `running`,
/// adds their access information to active access information;
/// processes queued systems that shouldn't run this iteration as completed immediately.
async fn process_queued_systems(&mut self) {
fn process_queued_systems(&mut self) {
// These are used as a part of a unit test as seen in `process_queued_systems`.
// Removing them will cause the test to fail.
#[cfg(test)]
let mut started_systems = 0;
for index in self.queued.ones() {
@ -239,17 +309,17 @@ impl ParallelExecutor {
let system_metadata = &self.system_metadata[index];
if !self.should_run[index] {
self.dependants_scratch.extend(&system_metadata.dependants);
} else if self.can_start_now(index) {
} else if Self::can_start_now(
self.non_send_running,
system_metadata,
&self.active_archetype_component_access,
) {
#[cfg(test)]
{
started_systems += 1;
}
system_metadata
.start_sender
.send(())
.await
.unwrap_or_else(|error| unreachable!("{}", error));
self.running.set(index, true);
system_metadata.start.notify_additional_relaxed(1);
self.running.insert(index);
if !system_metadata.is_send {
self.non_send_running = true;
}