ecs: prepare system ranges based on stage and thread locals

This commit is contained in:
Carter Anderson 2020-07-30 13:19:55 -07:00
parent 44c08f90aa
commit f85ec04a48
2 changed files with 156 additions and 60 deletions

View file

@ -7,8 +7,12 @@ use crossbeam_channel::{Receiver, Sender};
use fixedbitset::FixedBitSet;
use hecs::{ArchetypesGeneration, World};
use rayon::ScopeFifo;
use std::sync::{Arc, Mutex};
use std::{
ops::Range,
sync::{Arc, Mutex},
};
#[derive(Debug)]
pub struct ParallelExecutor {
stages: Vec<ExecutorStage>,
last_schedule_generation: usize,
@ -33,47 +37,38 @@ impl ParallelExecutor {
}
}
pub fn prepare(&mut self, schedule: &mut Schedule, world: &World) {
pub fn run(&mut self, schedule: &mut Schedule, world: &mut World, resources: &mut Resources) {
let schedule_generation = schedule.generation();
let schedule_changed = schedule_generation != self.last_schedule_generation;
let schedule_changed = schedule.generation() != self.last_schedule_generation;
if schedule_changed {
self.stages.clear();
self.stages
.resize_with(schedule.stage_order.len(), || ExecutorStage::default());
}
for (stage_index, stage_name) in schedule.stage_order.iter().enumerate() {
let executor_stage = &mut self.stages[stage_index];
if let Some(systems) = schedule.stages.get(stage_name) {
executor_stage.prepare(world, systems, schedule_changed);
}
}
self.last_schedule_generation = schedule_generation;
}
pub fn run(&mut self, schedule: &mut Schedule, world: &mut World, resources: &mut Resources) {
self.prepare(schedule, world);
for (stage_name, executor_stage) in schedule.stage_order.iter().zip(self.stages.iter_mut())
{
if let Some(stage_systems) = schedule.stages.get_mut(stage_name) {
executor_stage.run(world, resources, stage_systems);
executor_stage.run(world, resources, stage_systems, schedule_changed);
}
}
if self.clear_trackers {
world.clear_trackers();
}
self.last_schedule_generation = schedule_generation;
}
}
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct ExecutorStage {
/// each system's set of dependencies
system_dependencies: Vec<FixedBitSet>,
/// each system's dependents (the systems that can't run until this system has run)
system_dependents: Vec<Vec<usize>>,
/// stores the indices of thread local systems in this stage, which are used during stage.prepare()
thread_local_system_indices: Vec<usize>,
next_thread_local_index: usize,
/// the currently finished systems
finished_systems: FixedBitSet,
running_systems: FixedBitSet,
@ -89,6 +84,8 @@ impl Default for ExecutorStage {
Self {
system_dependents: Default::default(),
system_dependencies: Default::default(),
thread_local_system_indices: Default::default(),
next_thread_local_index: 0,
finished_systems: Default::default(),
running_systems: Default::default(),
sender,
@ -104,48 +101,54 @@ enum RunReadyResult {
}
enum RunReadyType {
All,
Range(Range<usize>),
Dependents(usize),
}
impl ExecutorStage {
// TODO: add a start_index parameter here
pub fn prepare(
pub fn prepare_to_next_thread_local(
&mut self,
world: &World,
systems: &Vec<Arc<Mutex<Box<dyn System>>>>,
systems: &[Arc<Mutex<Box<dyn System>>>],
schedule_changed: bool,
) {
// if the schedule has changed, clear executor state / fill it with new defaults
if schedule_changed {
self.system_dependencies.clear();
self.system_dependencies
.resize_with(systems.len(), || FixedBitSet::with_capacity(systems.len()));
let (prepare_system_start_index, last_thread_local_index) =
if self.next_thread_local_index == 0 {
(0, None)
} else {
// start right after the last thread local system
(
self.thread_local_system_indices[self.next_thread_local_index - 1] + 1,
Some(self.thread_local_system_indices[self.next_thread_local_index - 1]),
)
};
self.system_dependents.clear();
self.system_dependents.resize(systems.len(), Vec::new());
let prepare_system_index_range = if let Some(index) = self
.thread_local_system_indices
.get(self.next_thread_local_index)
{
// if there is an upcoming thread local system, prepare up to (and including) it
prepare_system_start_index..(*index + 1)
} else {
// if there are no upcoming thread local systems, prepare everything right now
prepare_system_start_index..systems.len()
};
self.finished_systems.grow(systems.len());
self.running_systems.grow(systems.len());
}
let archetypes_generation = world.archetypes_generation();
let archetypes_generation_changed =
self.last_archetypes_generation != archetypes_generation;
self.last_archetypes_generation != world.archetypes_generation();
if schedule_changed || archetypes_generation_changed {
// update each system's archetype access to latest world archetypes
for system in systems.iter() {
let mut system = system.lock().unwrap();
for system_index in prepare_system_index_range.clone() {
let mut system = systems[system_index].lock().unwrap();
system.update_archetype_access(world);
}
// calculate dependencies between systems and build execution order
let mut current_archetype_access = ArchetypeAccess::default();
let mut current_resource_access = TypeAccess::default();
let mut last_thread_local_index: Option<usize> = None;
for (system_index, system) in systems.iter().enumerate() {
let system = system.lock().unwrap();
for system_index in prepare_system_index_range.clone() {
let system = systems[system_index].lock().unwrap();
let archetype_access = system.archetype_access();
match system.thread_local_execution() {
ThreadLocalExecution::NextFlush => {
@ -154,15 +157,16 @@ impl ExecutorStage {
if current_archetype_access.is_compatible(archetype_access) == false
|| current_resource_access.is_compatible(resource_access) == false
{
for earlier_system_index in 0..system_index {
for earlier_system_index in
prepare_system_index_range.start..system_index
{
let earlier_system = systems[earlier_system_index].lock().unwrap();
// ignore "immediate" thread local systems, we handle them separately
if let ThreadLocalExecution::Immediate =
earlier_system.thread_local_execution()
{
continue;
}
// due to how prepare ranges work, previous systems should all be "NextFlush"
debug_assert_eq!(
earlier_system.thread_local_execution(),
ThreadLocalExecution::NextFlush
);
// if earlier system is incompatible, make the current system dependent
if earlier_system
@ -190,8 +194,7 @@ impl ExecutorStage {
}
}
ThreadLocalExecution::Immediate => {
last_thread_local_index = Some(system_index);
for earlier_system_index in 0..system_index {
for earlier_system_index in prepare_system_index_range.start..system_index {
// treat all earlier systems as "incompatible" to ensure we run this thread local system exclusively
self.system_dependents[earlier_system_index].push(system_index);
self.system_dependencies[system_index].insert(earlier_system_index);
@ -201,7 +204,7 @@ impl ExecutorStage {
}
}
self.last_archetypes_generation = archetypes_generation;
self.next_thread_local_index += 1;
}
fn run_ready_systems<'run>(
@ -216,8 +219,8 @@ impl ExecutorStage {
let mut all;
let mut dependents;
let system_index_iter: &mut dyn Iterator<Item = usize> = match run_ready_type {
RunReadyType::All => {
all = 0..systems.len();
RunReadyType::Range(range) => {
all = range;
&mut all
}
RunReadyType::Dependents(system_index) => {
@ -272,14 +275,49 @@ impl ExecutorStage {
world: &mut World,
resources: &mut Resources,
systems: &[Arc<Mutex<Box<dyn System>>>],
schedule_changed: bool,
) {
// if the schedule has changed, clear executor state / fill it with new defaults
if schedule_changed {
self.system_dependencies.clear();
self.system_dependencies
.resize_with(systems.len(), || FixedBitSet::with_capacity(systems.len()));
self.thread_local_system_indices = Vec::new();
self.system_dependents.clear();
self.system_dependents.resize(systems.len(), Vec::new());
self.finished_systems.grow(systems.len());
self.running_systems.grow(systems.len());
for (system_index, system) in systems.iter().enumerate() {
let system = system.lock().unwrap();
if system.thread_local_execution() == ThreadLocalExecution::Immediate {
self.thread_local_system_indices.push(system_index);
}
}
}
self.next_thread_local_index = 0;
self.prepare_to_next_thread_local(world, systems, schedule_changed);
self.finished_systems.clear();
self.running_systems.clear();
let mut run_ready_result = RunReadyResult::Ok;
let run_ready_system_index_range = if let Some(index) = self
.thread_local_system_indices
.get(0)
{
// if there is an upcoming thread local system, run up to (and including) it
0..(*index + 1)
} else {
// if there are no upcoming thread local systems, run everything right now
0..systems.len()
};
rayon::scope_fifo(|scope| {
run_ready_result =
self.run_ready_systems(systems, RunReadyType::All, scope, world, resources);
self.run_ready_systems(systems, RunReadyType::Range(run_ready_system_index_range), scope, world, resources);
});
loop {
// if all systems in the stage are finished, break out of the loop
@ -296,7 +334,7 @@ impl ExecutorStage {
self.finished_systems.insert(thread_local_index);
self.sender.send(thread_local_index).unwrap();
// TODO: if archetype generation has changed, call "prepare" on all systems after this one
self.prepare_to_next_thread_local(world, systems, schedule_changed);
run_ready_result = RunReadyResult::Ok;
} else {
@ -335,6 +373,8 @@ impl ExecutorStage {
ThreadLocalExecution::Immediate => { /* already ran */ }
}
}
self.last_archetypes_generation = world.archetypes_generation();
}
}
@ -345,9 +385,10 @@ mod tests {
resource::{Res, ResMut, Resources},
schedule::Schedule,
system::{IntoQuerySystem, IntoThreadLocalSystem, Query},
Commands,
};
use fixedbitset::FixedBitSet;
use hecs::World;
use hecs::{Entity, World};
use std::sync::{Arc, Mutex};
#[derive(Default)]
@ -355,6 +396,63 @@ mod tests {
count: Arc<Mutex<usize>>,
}
#[test]
fn cross_stage_archetype_change_prepare() {
let mut world = World::new();
let mut resources = Resources::default();
let mut schedule = Schedule::default();
schedule.add_stage("PreArchetypeChange");
schedule.add_stage("PostArchetypeChange");
fn insert(mut commands: Commands) {
commands.spawn((1u32,));
}
fn read(query: Query<&u32>, mut entities: Query<Entity>) {
for entity in &mut entities.iter() {
// query.get() does a "system permission check" that will fail if the entity is from a
// new archetype which hasnt been "prepared yet"
query.get::<u32>(entity).unwrap();
}
assert_eq!(1, entities.iter().iter().count());
}
schedule.add_system_to_stage("PreArchetypeChange", insert.system());
schedule.add_system_to_stage("PostArchetypeChange", read.system());
let mut executor = ParallelExecutor::default();
executor.run(&mut schedule, &mut world, &mut resources);
}
#[test]
fn intra_stage_archetype_change_prepare() {
let mut world = World::new();
let mut resources = Resources::default();
let mut schedule = Schedule::default();
schedule.add_stage("update");
fn insert(world: &mut World, _resources: &mut Resources) {
world.spawn((1u32,));
}
fn read(query: Query<&u32>, mut entities: Query<Entity>) {
for entity in &mut entities.iter() {
// query.get() does a "system permission check" that will fail if the entity is from a
// new archetype which hasnt been "prepared yet"
query.get::<u32>(entity).unwrap();
}
assert_eq!(1, entities.iter().iter().count());
}
schedule.add_system_to_stage("update", insert.thread_local_system());
schedule.add_system_to_stage("update", read.system());
let mut executor = ParallelExecutor::default();
executor.run(&mut schedule, &mut world, &mut resources);
}
#[test]
fn schedule() {
let mut world = World::new();
@ -475,7 +573,7 @@ mod tests {
world: &mut World,
resources: &mut Resources,
) {
executor.prepare(schedule, world);
executor.run(schedule, world, resources);
assert_eq!(
executor.stages[0].system_dependents,
@ -536,8 +634,6 @@ mod tests {
]
);
executor.run(schedule, world, resources);
let counter = resources.get::<Counter>().unwrap();
assert_eq!(
*counter.count.lock().unwrap(),

View file

@ -3,7 +3,7 @@ use fixedbitset::FixedBitSet;
use hecs::{Access, Query, World};
use std::{any::TypeId, borrow::Cow, collections::HashSet};
#[derive(Copy, Clone)]
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum ThreadLocalExecution {
Immediate,
NextFlush,