Fix bug where events are not being dropped (#11528)

# Objective

Fix an issue where events are not being dropped after being read. I
believe #10077 introduced this issue. The code currently works as
follows:

1. `EventUpdateSignal` is **shared for all event types**
2. During the fixed update phase, `EventUpdateSignal` is set to true
3. `event_update_system`, **unique per event type**, runs to update
Events<T>
4. `event_update_system` reads value of `EventUpdateSignal` to check if
it should update, and then **resets** the value to false

If there are multiple event types, the first `event_update_system` run
will reset the shared `EventUpdateSignal` signal, preventing other
events from being cleared.

## Solution

I've updated the code to have separate signals per event type and added
a shared signal to notify all systems that the time plugin is installed.

## Changelog

- Fixed bug where events were not being dropped
This commit is contained in:
Doug Roeper 2024-02-02 16:14:54 -05:00 committed by GitHub
parent 041731b7e0
commit c859eacdc8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 89 additions and 7 deletions

View file

@ -213,6 +213,7 @@ pub enum PluginsState {
// Dummy plugin used to temporary hold the place in the plugin registry // Dummy plugin used to temporary hold the place in the plugin registry
struct PlaceholderPlugin; struct PlaceholderPlugin;
impl Plugin for PlaceholderPlugin { impl Plugin for PlaceholderPlugin {
fn build(&self, _app: &mut App) {} fn build(&self, _app: &mut App) {}
} }
@ -505,6 +506,7 @@ impl App {
self.init_resource::<Events<T>>().add_systems( self.init_resource::<Events<T>>().add_systems(
First, First,
bevy_ecs::event::event_update_system::<T> bevy_ecs::event::event_update_system::<T>
.in_set(bevy_ecs::event::EventUpdates)
.run_if(bevy_ecs::event::event_update_condition::<T>), .run_if(bevy_ecs::event::event_update_condition::<T>),
); );
} }

View file

@ -3,6 +3,7 @@
use crate as bevy_ecs; use crate as bevy_ecs;
use crate::system::{Local, Res, ResMut, Resource, SystemParam}; use crate::system::{Local, Res, ResMut, Resource, SystemParam};
pub use bevy_ecs_macros::Event; pub use bevy_ecs_macros::Event;
use bevy_ecs_macros::SystemSet;
use bevy_utils::detailed_trace; use bevy_utils::detailed_trace;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::{ use std::{
@ -13,6 +14,7 @@ use std::{
marker::PhantomData, marker::PhantomData,
slice::Iter, slice::Iter,
}; };
/// A type that can be stored in an [`Events<E>`] resource /// A type that can be stored in an [`Events<E>`] resource
/// You can conveniently access events using the [`EventReader`] and [`EventWriter`] system parameter. /// You can conveniently access events using the [`EventReader`] and [`EventWriter`] system parameter.
/// ///
@ -33,6 +35,7 @@ pub struct EventId<E: Event> {
} }
impl<E: Event> Copy for EventId<E> {} impl<E: Event> Copy for EventId<E> {}
impl<E: Event> Clone for EventId<E> { impl<E: Event> Clone for EventId<E> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
*self *self
@ -790,22 +793,33 @@ impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> {
#[derive(Resource, Default)] #[derive(Resource, Default)]
pub struct EventUpdateSignal(bool); pub struct EventUpdateSignal(bool);
/// A system that queues a call to [`Events::update`]. #[doc(hidden)]
pub fn event_queue_update_system(signal: Option<ResMut<EventUpdateSignal>>) { #[derive(SystemSet, Clone, Debug, PartialEq, Eq, Hash)]
pub struct EventUpdates;
/// Signals the [`event_update_system`] to run after `FixedUpdate` systems.
pub fn signal_event_update_system(signal: Option<ResMut<EventUpdateSignal>>) {
if let Some(mut s) = signal { if let Some(mut s) = signal {
s.0 = true; s.0 = true;
} }
} }
/// Resets the `EventUpdateSignal`
pub fn reset_event_update_signal_system(signal: Option<ResMut<EventUpdateSignal>>) {
if let Some(mut s) = signal {
s.0 = false;
}
}
/// A system that calls [`Events::update`]. /// A system that calls [`Events::update`].
pub fn event_update_system<T: Event>( pub fn event_update_system<T: Event>(
signal: Option<ResMut<EventUpdateSignal>>, update_signal: Option<Res<EventUpdateSignal>>,
mut events: ResMut<Events<T>>, mut events: ResMut<Events<T>>,
) { ) {
if let Some(mut s) = signal { if let Some(signal) = update_signal {
// If we haven't got a signal to update the events, but we *could* get such a signal // If we haven't got a signal to update the events, but we *could* get such a signal
// return early and update the events later. // return early and update the events later.
if !std::mem::replace(&mut s.0, false) { if !signal.0 {
return; return;
} }
} }

View file

@ -25,7 +25,7 @@ pub mod prelude {
} }
use bevy_app::{prelude::*, RunFixedMainLoop}; use bevy_app::{prelude::*, RunFixedMainLoop};
use bevy_ecs::event::{event_queue_update_system, EventUpdateSignal}; use bevy_ecs::event::{signal_event_update_system, EventUpdateSignal, EventUpdates};
use bevy_ecs::prelude::*; use bevy_ecs::prelude::*;
use bevy_utils::{tracing::warn, Duration, Instant}; use bevy_utils::{tracing::warn, Duration, Instant};
pub use crossbeam_channel::TrySendError; pub use crossbeam_channel::TrySendError;
@ -61,7 +61,11 @@ impl Plugin for TimePlugin {
// ensure the events are not dropped until `FixedMain` systems can observe them // ensure the events are not dropped until `FixedMain` systems can observe them
app.init_resource::<EventUpdateSignal>() app.init_resource::<EventUpdateSignal>()
.add_systems(FixedPostUpdate, event_queue_update_system); .add_systems(
First,
bevy_ecs::event::reset_event_update_signal_system.after(EventUpdates),
)
.add_systems(FixedPostUpdate, signal_event_update_system);
#[cfg(feature = "bevy_ci_testing")] #[cfg(feature = "bevy_ci_testing")]
if let Some(ci_testing_config) = app if let Some(ci_testing_config) = app
@ -142,3 +146,65 @@ fn time_system(
TimeUpdateStrategy::ManualDuration(duration) => time.update_with_duration(*duration), TimeUpdateStrategy::ManualDuration(duration) => time.update_with_duration(*duration),
} }
} }
#[cfg(test)]
mod tests {
use crate::{Fixed, Time, TimePlugin, TimeUpdateStrategy};
use bevy_app::{App, Startup, Update};
use bevy_ecs::event::{Event, EventReader, EventWriter};
use std::error::Error;
#[derive(Event)]
struct TestEvent<T: Default> {
sender: std::sync::mpsc::Sender<T>,
}
impl<T: Default> Drop for TestEvent<T> {
fn drop(&mut self) {
self.sender
.send(T::default())
.expect("Failed to send drop signal");
}
}
#[test]
fn events_get_dropped_regression_test_11528() -> Result<(), impl Error> {
let (tx1, rx1) = std::sync::mpsc::channel();
let (tx2, rx2) = std::sync::mpsc::channel();
let mut app = App::new();
app.add_plugins(TimePlugin)
.add_event::<TestEvent<i32>>()
.add_event::<TestEvent<()>>()
.add_systems(Startup, move |mut ev2: EventWriter<TestEvent<()>>| {
ev2.send(TestEvent {
sender: tx2.clone(),
});
})
.add_systems(Update, move |mut ev1: EventWriter<TestEvent<i32>>| {
// Keep adding events so this event type is processed every update
ev1.send(TestEvent {
sender: tx1.clone(),
});
})
.add_systems(
Update,
|mut ev1: EventReader<TestEvent<i32>>, mut ev2: EventReader<TestEvent<()>>| {
// Read events so they can be dropped
for _ in ev1.read() {}
for _ in ev2.read() {}
},
)
.insert_resource(TimeUpdateStrategy::ManualDuration(
Time::<Fixed>::default().timestep(),
));
for _ in 0..10 {
app.update();
}
// Check event type 1 as been dropped at least once
let _drop_signal = rx1.try_recv()?;
// Check event type 2 has been dropped
rx2.try_recv()
}
}