mirror of
https://github.com/bevyengine/bevy
synced 2024-11-22 12:43:34 +00:00
Mark events as read during EventReader::par_read
(#13836)
# Objective - Fix issue #13821 ## Solution - Rewrote the test to ensure that it actually tests the functionality correctly. Then made the par_read function correctly change the values of self.reader.last_event_count. ## Testing - Rewrote the test for par_read to run the system schedule twice, checking the output each time --------- Co-authored-by: Martín Maita <47983254+mnmaita@users.noreply.github.com>
This commit is contained in:
parent
f51a306b30
commit
4c3b4a445d
3 changed files with 44 additions and 27 deletions
|
@ -1,8 +1,7 @@
|
|||
use crate as bevy_ecs;
|
||||
use bevy_ecs::{
|
||||
batching::BatchingStrategy,
|
||||
event::{Event, EventId, EventInstance, Events, ManualEventReader},
|
||||
};
|
||||
#[cfg(feature = "multi_threaded")]
|
||||
use bevy_ecs::batching::BatchingStrategy;
|
||||
use bevy_ecs::event::{Event, EventId, EventInstance, Events, ManualEventReader};
|
||||
use bevy_utils::detailed_trace;
|
||||
use std::{iter::Chain, slice::Iter};
|
||||
|
||||
|
@ -141,12 +140,15 @@ impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> {
|
|||
|
||||
/// A parallel iterator over `Event`s.
|
||||
#[derive(Debug)]
|
||||
#[cfg(feature = "multi_threaded")]
|
||||
pub struct EventParIter<'a, E: Event> {
|
||||
reader: &'a mut ManualEventReader<E>,
|
||||
slices: [&'a [EventInstance<E>]; 2],
|
||||
batching_strategy: BatchingStrategy,
|
||||
unread: usize,
|
||||
}
|
||||
|
||||
#[cfg(feature = "multi_threaded")]
|
||||
impl<'a, E: Event> EventParIter<'a, E> {
|
||||
/// Creates a new parallel iterator over `events` that have not yet been seen by `reader`.
|
||||
pub fn new(reader: &'a mut ManualEventReader<E>, events: &'a Events<E>) -> Self {
|
||||
|
@ -168,6 +170,7 @@ impl<'a, E: Event> EventParIter<'a, E> {
|
|||
reader,
|
||||
slices: [a, b],
|
||||
batching_strategy: BatchingStrategy::default(),
|
||||
unread: unread_count,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -203,13 +206,13 @@ impl<'a, E: Event> EventParIter<'a, E> {
|
|||
/// initialized and run from the ECS scheduler, this should never panic.
|
||||
///
|
||||
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
|
||||
pub fn for_each_with_id<FN: Fn(&'a E, EventId<E>) + Send + Sync + Clone>(self, func: FN) {
|
||||
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
|
||||
pub fn for_each_with_id<FN: Fn(&'a E, EventId<E>) + Send + Sync + Clone>(mut self, func: FN) {
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
{
|
||||
self.into_iter().for_each(|(e, i)| func(e, i));
|
||||
}
|
||||
|
||||
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
let pool = bevy_tasks::ComputeTaskPool::get();
|
||||
let thread_count = pool.thread_num();
|
||||
|
@ -233,6 +236,10 @@ impl<'a, E: Event> EventParIter<'a, E> {
|
|||
});
|
||||
}
|
||||
});
|
||||
|
||||
// Events are guaranteed to be read at this point.
|
||||
self.reader.last_event_count += self.unread;
|
||||
self.unread = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -247,6 +254,7 @@ impl<'a, E: Event> EventParIter<'a, E> {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "multi_threaded")]
|
||||
impl<'a, E: Event> IntoIterator for EventParIter<'a, E> {
|
||||
type IntoIter = EventIteratorWithId<'a, E>;
|
||||
type Item = <Self::IntoIter as Iterator>::Item;
|
||||
|
|
|
@ -11,7 +11,9 @@ pub(crate) use base::EventInstance;
|
|||
pub use base::{Event, EventId};
|
||||
pub use bevy_ecs_macros::Event;
|
||||
pub use collections::{Events, SendBatchIds};
|
||||
pub use iterators::{EventIterator, EventIteratorWithId, EventParIter};
|
||||
#[cfg(feature = "multi_threaded")]
|
||||
pub use iterators::EventParIter;
|
||||
pub use iterators::{EventIterator, EventIteratorWithId};
|
||||
pub use reader::{EventReader, ManualEventReader};
|
||||
pub use registry::{EventRegistry, ShouldUpdateEvents};
|
||||
pub use update::{
|
||||
|
@ -421,30 +423,33 @@ mod tests {
|
|||
#[cfg(feature = "multi_threaded")]
|
||||
#[test]
|
||||
fn test_events_par_iter() {
|
||||
use std::{collections::HashSet, sync::mpsc};
|
||||
|
||||
use crate::prelude::*;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
#[derive(Resource)]
|
||||
struct Counter(AtomicUsize);
|
||||
|
||||
let mut world = World::new();
|
||||
world.init_resource::<Events<TestEvent>>();
|
||||
for i in 0..100 {
|
||||
world.send_event(TestEvent { i });
|
||||
for _ in 0..100 {
|
||||
world.send_event(TestEvent { i: 1 });
|
||||
}
|
||||
|
||||
let mut schedule = Schedule::default();
|
||||
|
||||
schedule.add_systems(|mut events: EventReader<TestEvent>| {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
events.par_read().for_each(|event| {
|
||||
tx.send(event.i).unwrap();
|
||||
});
|
||||
drop(tx);
|
||||
|
||||
let observed: HashSet<_> = rx.into_iter().collect();
|
||||
assert_eq!(observed, HashSet::from_iter(0..100));
|
||||
});
|
||||
schedule.add_systems(
|
||||
|mut events: EventReader<TestEvent>, counter: ResMut<Counter>| {
|
||||
events.par_read().for_each(|event| {
|
||||
counter.0.fetch_add(event.i, Ordering::Relaxed);
|
||||
});
|
||||
},
|
||||
);
|
||||
world.insert_resource(Counter(AtomicUsize::new(0)));
|
||||
schedule.run(&mut world);
|
||||
}
|
||||
let counter = world.remove_resource::<Counter>().unwrap();
|
||||
assert_eq!(counter.0.into_inner(), 100);
|
||||
|
||||
// Peak tests
|
||||
world.insert_resource(Counter(AtomicUsize::new(0)));
|
||||
schedule.run(&mut world);
|
||||
let counter = world.remove_resource::<Counter>().unwrap();
|
||||
assert_eq!(counter.0.into_inner(), 0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
use crate as bevy_ecs;
|
||||
#[cfg(feature = "multi_threaded")]
|
||||
use bevy_ecs::event::EventParIter;
|
||||
use bevy_ecs::{
|
||||
event::{Event, EventIterator, EventIteratorWithId, EventParIter, Events},
|
||||
event::{Event, EventIterator, EventIteratorWithId, Events},
|
||||
system::{Local, Res, SystemParam},
|
||||
};
|
||||
use std::marker::PhantomData;
|
||||
|
@ -66,6 +68,7 @@ impl<'w, 's, E: Event> EventReader<'w, 's, E> {
|
|||
/// assert_eq!(counter.into_inner(), 4950);
|
||||
/// ```
|
||||
///
|
||||
#[cfg(feature = "multi_threaded")]
|
||||
pub fn par_read(&mut self) -> EventParIter<'_, E> {
|
||||
self.reader.par_read(&self.events)
|
||||
}
|
||||
|
@ -189,6 +192,7 @@ impl<E: Event> ManualEventReader<E> {
|
|||
}
|
||||
|
||||
/// See [`EventReader::par_read`]
|
||||
#[cfg(feature = "multi_threaded")]
|
||||
pub fn par_read<'a>(&'a mut self, events: &'a Events<E>) -> EventParIter<'a, E> {
|
||||
EventParIter::new(self, events)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue