Cluster small table/archetype into single Task in parallel iteration (#12846)

# Objective

- Fix #7303
- bevy would spawn a lot of tasks in parallel iteration when it matchs a
large storage and many small storage ,it significantly increase the
overhead of schedule.

## Solution

- collect small storage into one task
This commit is contained in:
re0312 2024-04-04 15:09:26 +08:00 committed by GitHub
parent 344e28d095
commit 4ca8cf5d66
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 170 additions and 41 deletions

View file

@ -18,6 +18,7 @@ mod iter_simple_sparse_set;
mod iter_simple_system;
mod iter_simple_wide;
mod iter_simple_wide_sparse_set;
mod par_iter_simple;
use heavy_compute::*;
@ -27,6 +28,7 @@ criterion_group!(
iter_frag_sparse,
iter_simple,
heavy_compute,
par_iter_simple,
);
fn iter_simple(c: &mut Criterion) {
@ -117,3 +119,15 @@ fn iter_frag_sparse(c: &mut Criterion) {
});
group.finish();
}
fn par_iter_simple(c: &mut Criterion) {
let mut group = c.benchmark_group("par_iter_simple");
group.warm_up_time(std::time::Duration::from_millis(500));
group.measurement_time(std::time::Duration::from_secs(4));
for f in [0, 10, 100, 1000] {
group.bench_function(format!("with_{}_fragment", f), |b| {
let mut bench = par_iter_simple::Benchmark::new(f);
b.iter(move || bench.run());
});
}
}

View file

@ -0,0 +1,73 @@
use bevy_ecs::prelude::*;
use bevy_tasks::{ComputeTaskPool, TaskPool};
use glam::*;
#[derive(Component, Copy, Clone)]
struct Transform(Mat4);
#[derive(Component, Copy, Clone)]
struct Position(Vec3);
#[derive(Component, Copy, Clone)]
struct Rotation(Vec3);
#[derive(Component, Copy, Clone)]
struct Velocity(Vec3);
#[derive(Component, Copy, Clone, Default)]
struct Data<const X: u16>(f32);
pub struct Benchmark<'w>(World, QueryState<(&'w Velocity, &'w mut Position)>);
fn insert_if_bit_enabled<const B: u16>(entity: &mut EntityWorldMut, i: u16) {
if i & 1 << B != 0 {
entity.insert(Data::<B>(1.0));
}
}
impl<'w> Benchmark<'w> {
pub fn new(fragment: u16) -> Self {
ComputeTaskPool::get_or_init(TaskPool::default);
let mut world = World::new();
let iter = world.spawn_batch(
std::iter::repeat((
Transform(Mat4::from_scale(Vec3::ONE)),
Position(Vec3::X),
Rotation(Vec3::X),
Velocity(Vec3::X),
))
.take(100_000),
);
let entities = iter.into_iter().collect::<Vec<Entity>>();
for i in 0..fragment {
let mut e = world.entity_mut(entities[i as usize]);
insert_if_bit_enabled::<0>(&mut e, i);
insert_if_bit_enabled::<1>(&mut e, i);
insert_if_bit_enabled::<2>(&mut e, i);
insert_if_bit_enabled::<3>(&mut e, i);
insert_if_bit_enabled::<4>(&mut e, i);
insert_if_bit_enabled::<5>(&mut e, i);
insert_if_bit_enabled::<6>(&mut e, i);
insert_if_bit_enabled::<7>(&mut e, i);
insert_if_bit_enabled::<8>(&mut e, i);
insert_if_bit_enabled::<9>(&mut e, i);
insert_if_bit_enabled::<10>(&mut e, i);
insert_if_bit_enabled::<11>(&mut e, i);
insert_if_bit_enabled::<12>(&mut e, i);
insert_if_bit_enabled::<13>(&mut e, i);
insert_if_bit_enabled::<14>(&mut e, i);
insert_if_bit_enabled::<15>(&mut e, i);
}
let query = world.query::<(&Velocity, &mut Position)>();
Self(world, query)
}
#[inline(never)]
pub fn run(&mut self) {
self.1
.par_iter_mut(&mut self.0)
.for_each(|(v, mut p)| p.0 += v.0);
}
}

View file

@ -11,7 +11,7 @@ categories = ["game-engines", "data-structures"]
[features]
trace = []
multi-threaded = ["bevy_tasks/multi-threaded"]
multi-threaded = ["bevy_tasks/multi-threaded", "arrayvec"]
bevy_debug_stepping = []
default = ["bevy_reflect"]
@ -30,6 +30,7 @@ rustc-hash = "1.1"
serde = "1"
thiserror = "1.0"
nonmax = "0.5"
arrayvec = { version = "0.7.4", optional = true }
[dev-dependencies]
rand = "0.8"

View file

@ -1387,58 +1387,99 @@ impl<D: QueryData, F: QueryFilter> QueryState<D, F> {
) {
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
// QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual
use arrayvec::ArrayVec;
bevy_tasks::ComputeTaskPool::get().scope(|scope| {
// SAFETY: We only access table data that has been registered in `self.archetype_component_access`.
let tables = unsafe { &world.storages().tables };
let archetypes = world.archetypes();
for storage_id in &self.matched_storage_ids {
if D::IS_DENSE && F::IS_DENSE {
let table_id = storage_id.table_id;
let table = &tables[table_id];
if table.is_empty() {
continue;
}
let mut batch_queue = ArrayVec::new();
let mut queue_entity_count = 0;
let mut offset = 0;
while offset < table.entity_count() {
let mut func = func.clone();
let len = batch_size.min(table.entity_count() - offset);
let batch = offset..offset + len;
scope.spawn(async move {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
let table =
&world.storages().tables.get(table_id).debug_checked_unwrap();
// submit a list of storages which smaller than batch_size as single task
let submit_batch_queue = |queue: &mut ArrayVec<StorageId, 128>| {
if queue.is_empty() {
return;
}
let queue = std::mem::take(queue);
let mut func = func.clone();
scope.spawn(async move {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
let mut iter = self.iter_unchecked_manual(world, last_run, this_run);
for storage_id in queue {
if D::IS_DENSE && F::IS_DENSE {
let id = storage_id.table_id;
let table = &world.storages().tables.get(id).debug_checked_unwrap();
iter.for_each_in_table_range(&mut func, table, 0..table.entity_count());
} else {
let id = storage_id.archetype_id;
let archetype = world.archetypes().get(id).debug_checked_unwrap();
iter.for_each_in_archetype_range(
&mut func,
archetype,
0..archetype.len(),
);
}
}
});
};
// submit single storage larger than batch_size
let submit_single = |count, storage_id: StorageId| {
for offset in (0..count).step_by(batch_size) {
let mut func = func.clone();
let len = batch_size.min(count - offset);
let batch = offset..offset + len;
scope.spawn(async move {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
if D::IS_DENSE && F::IS_DENSE {
let id = storage_id.table_id;
let table = world.storages().tables.get(id).debug_checked_unwrap();
self.iter_unchecked_manual(world, last_run, this_run)
.for_each_in_table_range(&mut func, table, batch);
});
offset += batch_size;
}
} else {
let archetype_id = storage_id.archetype_id;
let archetype = &archetypes[archetype_id];
if archetype.is_empty() {
continue;
}
let mut offset = 0;
while offset < archetype.len() {
let mut func = func.clone();
let len = batch_size.min(archetype.len() - offset);
let batch = offset..offset + len;
scope.spawn(async move {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
let archetype =
world.archetypes().get(archetype_id).debug_checked_unwrap();
} else {
let id = storage_id.archetype_id;
let archetype = world.archetypes().get(id).debug_checked_unwrap();
self.iter_unchecked_manual(world, last_run, this_run)
.for_each_in_archetype_range(&mut func, archetype, batch);
});
offset += batch_size;
}
}
});
}
};
let storage_entity_count = |storage_id: StorageId| -> usize {
if D::IS_DENSE && F::IS_DENSE {
tables[storage_id.table_id].entity_count()
} else {
archetypes[storage_id.archetype_id].len()
}
};
for storage_id in &self.matched_storage_ids {
let count = storage_entity_count(*storage_id);
// skip empty storage
if count == 0 {
continue;
}
// immediately submit large storage
if count >= batch_size {
submit_single(count, *storage_id);
continue;
}
// merge small storage
batch_queue.push(*storage_id);
queue_entity_count += count;
// submit batch_queue
if queue_entity_count >= batch_size || batch_queue.is_full() {
submit_batch_queue(&mut batch_queue);
queue_entity_count = 0;
}
}
submit_batch_queue(&mut batch_queue);
});
}