add thread local resources (#671)

This commit is contained in:
Carter Anderson 2020-10-12 15:09:44 -07:00 committed by GitHub
parent 53d6d10506
commit 930eba4ccd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 287 additions and 40 deletions

View file

@ -227,6 +227,14 @@ impl AppBuilder {
self
}
pub fn add_thread_local_resource<T>(&mut self, resource: T) -> &mut Self
where
T: 'static,
{
self.app.resources.insert_thread_local(resource);
self
}
pub fn init_resource<R>(&mut self) -> &mut Self
where
R: FromResources + Send + Sync + 'static,
@ -237,6 +245,16 @@ impl AppBuilder {
self
}
pub fn init_thread_local_resource<R>(&mut self) -> &mut Self
where
R: FromResources + 'static,
{
let resource = R::from_resources(&self.app.resources);
self.app.resources.insert_thread_local(resource);
self
}
pub fn set_runner(&mut self, run_fn: impl Fn(App) + 'static) -> &mut Self {
self.app.runner = Box::new(run_fn);
self

View file

@ -22,14 +22,17 @@ use core::{
use crate::{archetype::Archetype, Component, MissingComponent};
/// Atomically enforces Rust-style borrow checking at runtime
#[derive(Debug)]
pub struct AtomicBorrow(AtomicUsize);
impl AtomicBorrow {
/// Creates a new AtomicBorrow
pub const fn new() -> Self {
Self(AtomicUsize::new(0))
}
/// Starts a new immutable borrow. This can be called any number of times
pub fn borrow(&self) -> bool {
let value = self.0.fetch_add(1, Ordering::Acquire).wrapping_add(1);
if value == 0 {
@ -44,18 +47,21 @@ impl AtomicBorrow {
}
}
/// Starts a new mutable borrow. This must be unique. It cannot be done in parallel with other borrows or borrow_muts
pub fn borrow_mut(&self) -> bool {
self.0
.compare_exchange(0, UNIQUE_BIT, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
}
/// Release an immutable borrow.
pub fn release(&self) {
let value = self.0.fetch_sub(1, Ordering::Release);
debug_assert!(value != 0, "unbalanced release");
debug_assert!(value & UNIQUE_BIT == 0, "shared release of unique borrow");
}
/// Release a mutable borrow.
pub fn release_mut(&self) {
let value = self.0.fetch_and(!UNIQUE_BIT, Ordering::Release);
debug_assert_ne!(value & UNIQUE_BIT, 0, "unique release of shared borrow");

View file

@ -76,7 +76,7 @@ mod serde;
mod world;
pub use archetype::{Archetype, TypeState};
pub use borrow::{Ref, RefMut};
pub use borrow::{AtomicBorrow, Ref, RefMut};
pub use bundle::{Bundle, DynamicBundle, MissingComponent};
pub use entities::{Entity, EntityReserver, Location, NoSuchEntity};
pub use entity_builder::{BuiltEntity, EntityBuilder};

View file

@ -1,9 +1,15 @@
use super::{FetchResource, ResourceQuery};
use crate::system::SystemId;
use bevy_hecs::{Archetype, Entity, Ref, RefMut, TypeInfo, TypeState};
use bevy_hecs::{Archetype, AtomicBorrow, Entity, Ref, RefMut, TypeInfo, TypeState};
use bevy_utils::HashMap;
use core::any::TypeId;
use std::ptr::NonNull;
use downcast_rs::{impl_downcast, Downcast};
use std::{
fmt::Debug,
ops::{Deref, DerefMut},
ptr::NonNull,
thread::ThreadId,
};
/// A Resource type
pub trait Resource: Send + Sync + 'static {}
@ -22,10 +28,76 @@ pub enum ResourceIndex {
System(SystemId),
}
// TODO: consider using this for normal resources (would require change tracking)
trait ResourceStorage: Downcast {}
impl_downcast!(ResourceStorage);
struct StoredResource<T: 'static> {
value: T,
atomic_borrow: AtomicBorrow,
}
pub struct VecResourceStorage<T: 'static> {
stored: Vec<StoredResource<T>>,
}
impl<T: 'static> VecResourceStorage<T> {
fn get(&self, index: usize) -> Option<ResourceRef<'_, T>> {
self.stored
.get(index)
.map(|stored| ResourceRef::new(&stored.value, &stored.atomic_borrow))
}
fn get_mut(&self, index: usize) -> Option<ResourceRefMut<'_, T>> {
self.stored.get(index).map(|stored|
// SAFE: ResourceRefMut ensures that this borrow is unique
unsafe {
let value = &stored.value as *const T as *mut T;
ResourceRefMut::new(&mut *value, &stored.atomic_borrow)
})
}
fn push(&mut self, resource: T) {
self.stored.push(StoredResource {
atomic_borrow: AtomicBorrow::new(),
value: resource,
})
}
fn set(&mut self, index: usize, resource: T) {
self.stored[index].value = resource;
}
fn is_empty(&self) -> bool {
self.stored.is_empty()
}
}
impl<T: 'static> Default for VecResourceStorage<T> {
fn default() -> Self {
Self {
stored: Default::default(),
}
}
}
impl<T: 'static> ResourceStorage for VecResourceStorage<T> {}
/// A collection of resource instances identified by their type.
#[derive(Debug, Default)]
pub struct Resources {
pub(crate) resource_data: HashMap<TypeId, ResourceData>,
thread_local_data: HashMap<TypeId, Box<dyn ResourceStorage>>,
main_thread_id: ThreadId,
}
impl Default for Resources {
fn default() -> Self {
Resources {
resource_data: Default::default(),
thread_local_data: Default::default(),
main_thread_id: std::thread::current().id(),
}
}
}
impl Resources {
@ -33,6 +105,26 @@ impl Resources {
self.insert_resource(resource, ResourceIndex::Global);
}
pub fn insert_thread_local<T: 'static>(&mut self, resource: T) {
self.check_thread_local();
let entry = self
.thread_local_data
.entry(TypeId::of::<T>())
.or_insert_with(|| Box::new(VecResourceStorage::<T>::default()));
let resources = entry.downcast_mut::<VecResourceStorage<T>>().unwrap();
if resources.is_empty() {
resources.push(resource);
} else {
resources.set(0, resource);
}
}
fn check_thread_local(&self) {
if std::thread::current().id() != self.main_thread_id {
panic!("Attempted to access a thread local resource off of the main thread.")
}
}
pub fn contains<T: Resource>(&self) -> bool {
self.get_resource::<T>(ResourceIndex::Global).is_some()
}
@ -45,6 +137,26 @@ impl Resources {
self.get_resource_mut(ResourceIndex::Global)
}
pub fn get_thread_local<T: 'static>(&self) -> Option<ResourceRef<'_, T>> {
self.check_thread_local();
self.thread_local_data
.get(&TypeId::of::<T>())
.and_then(|storage| {
let resources = storage.downcast_ref::<VecResourceStorage<T>>().unwrap();
resources.get(0)
})
}
pub fn get_thread_local_mut<T: 'static>(&self) -> Option<ResourceRefMut<'_, T>> {
self.check_thread_local();
self.thread_local_data
.get(&TypeId::of::<T>())
.and_then(|storage| {
let resources = storage.downcast_ref::<VecResourceStorage<T>>().unwrap();
resources.get_mut(0)
})
}
/// Returns a clone of the underlying resource, this is helpful when borrowing something
/// cloneable (like a task pool) without taking a borrow on the resource map
pub fn get_cloned<T: Resource + Clone>(&self) -> Option<T> {
@ -281,6 +393,93 @@ where
}
}
/// Shared borrow of an entity's component
#[derive(Clone)]
pub struct ResourceRef<'a, T: 'static> {
borrow: &'a AtomicBorrow,
resource: &'a T,
}
impl<'a, T: 'static> ResourceRef<'a, T> {
/// Creates a new resource borrow
pub fn new(resource: &'a T, borrow: &'a AtomicBorrow) -> Self {
borrow.borrow();
Self { resource, borrow }
}
}
unsafe impl<T: 'static> Send for ResourceRef<'_, T> {}
unsafe impl<T: 'static> Sync for ResourceRef<'_, T> {}
impl<'a, T: 'static> Drop for ResourceRef<'a, T> {
fn drop(&mut self) {
self.borrow.release()
}
}
impl<'a, T: 'static> Deref for ResourceRef<'a, T> {
type Target = T;
fn deref(&self) -> &T {
self.resource
}
}
impl<'a, T: 'static> Debug for ResourceRef<'a, T>
where
T: Debug,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
self.deref().fmt(f)
}
}
/// Unique borrow of a resource
pub struct ResourceRefMut<'a, T: 'static> {
borrow: &'a AtomicBorrow,
resource: &'a mut T,
}
impl<'a, T: 'static> ResourceRefMut<'a, T> {
/// Creates a new entity component mutable borrow
pub fn new(resource: &'a mut T, borrow: &'a AtomicBorrow) -> Self {
borrow.borrow_mut();
Self { resource, borrow }
}
}
unsafe impl<T: 'static> Send for ResourceRefMut<'_, T> {}
unsafe impl<T: 'static> Sync for ResourceRefMut<'_, T> {}
impl<'a, T: 'static> Drop for ResourceRefMut<'a, T> {
fn drop(&mut self) {
self.borrow.release_mut();
}
}
impl<'a, T: 'static> Deref for ResourceRefMut<'a, T> {
type Target = T;
fn deref(&self) -> &T {
self.resource
}
}
impl<'a, T: 'static> DerefMut for ResourceRefMut<'a, T> {
fn deref_mut(&mut self) -> &mut T {
self.resource
}
}
impl<'a, T: 'static> Debug for ResourceRefMut<'a, T>
where
T: Debug,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
self.deref().fmt(f)
}
}
#[cfg(test)]
mod tests {
use super::Resources;
@ -335,4 +534,44 @@ mod tests {
let _x = resources.get_mut::<i32>();
let _y = resources.get_mut::<i32>();
}
#[test]
fn thread_local_resource() {
let mut resources = Resources::default();
resources.insert_thread_local(123i32);
resources.insert_thread_local(456i64);
assert_eq!(*resources.get_thread_local::<i32>().unwrap(), 123);
assert_eq!(*resources.get_thread_local_mut::<i64>().unwrap(), 456);
}
#[test]
fn thread_local_resource_ref_aliasing() {
let mut resources = Resources::default();
resources.insert_thread_local(123i32);
let a = resources.get_thread_local::<i32>().unwrap();
let b = resources.get_thread_local::<i32>().unwrap();
assert_eq!(*a, 123);
assert_eq!(*b, 123);
}
#[test]
#[should_panic]
fn thread_local_resource_mut_ref_aliasing() {
let mut resources = Resources::default();
resources.insert_thread_local(123i32);
let _a = resources.get_thread_local::<i32>().unwrap();
let _b = resources.get_thread_local_mut::<i32>().unwrap();
}
#[test]
#[should_panic]
fn thread_local_resource_panic() {
let mut resources = Resources::default();
resources.insert_thread_local(0i32);
std::thread::spawn(move || {
let _ = resources.get_thread_local_mut::<i32>();
})
.join()
.unwrap();
}
}

View file

@ -1,34 +1,16 @@
use crate::converter::{convert_axis, convert_button, convert_gamepad_id};
use bevy_app::Events;
use bevy_ecs::{Res, ResMut};
use bevy_ecs::{Resources, World};
use bevy_input::prelude::*;
use gilrs::{Button, EventType, Gilrs};
use std::sync::{Arc, Mutex};
// TODO: remove this if/when bevy_ecs supports thread local resources
#[derive(Debug)]
struct GilrsSendWrapper(Gilrs);
unsafe impl Send for GilrsSendWrapper {}
#[derive(Debug)]
pub struct GilrsArcMutexWrapper(Arc<Mutex<GilrsSendWrapper>>);
impl GilrsArcMutexWrapper {
pub fn new(gilrs: Gilrs) -> GilrsArcMutexWrapper {
GilrsArcMutexWrapper(Arc::new(Mutex::new(GilrsSendWrapper(gilrs))))
}
}
pub fn gilrs_startup_system(
gilrs: Res<GilrsArcMutexWrapper>,
mut gamepad_event: ResMut<Events<GamepadEvent>>,
mut inputs: ResMut<Input<GamepadButton>>,
mut axes: ResMut<Axis<GamepadAxis>>,
) {
pub fn gilrs_startup_system(_world: &mut World, resources: &mut Resources) {
let gilrs = resources.get_thread_local::<Gilrs>().unwrap();
let mut gamepad_event = resources.get_mut::<Events<GamepadEvent>>().unwrap();
let mut inputs = resources.get_mut::<Input<GamepadButton>>().unwrap();
let mut axes = resources.get_mut::<Axis<GamepadAxis>>().unwrap();
gamepad_event.update();
inputs.update();
let gilrs = &gilrs.0.lock().unwrap().0;
for (gilrs_id, gilrs_gamepad) in gilrs.gamepads() {
connect_gamepad(
gilrs_gamepad,
@ -40,15 +22,14 @@ pub fn gilrs_startup_system(
}
}
pub fn gilrs_update_system(
gilrs: Res<GilrsArcMutexWrapper>,
mut gamepad_event: ResMut<Events<GamepadEvent>>,
mut inputs: ResMut<Input<GamepadButton>>,
mut axes: ResMut<Axis<GamepadAxis>>,
) {
pub fn gilrs_update_system(_world: &mut World, resources: &mut Resources) {
let mut gilrs = resources.get_thread_local_mut::<Gilrs>().unwrap();
let mut gamepad_event = resources.get_mut::<Events<GamepadEvent>>().unwrap();
let mut inputs = resources.get_mut::<Input<GamepadButton>>().unwrap();
let mut axes = resources.get_mut::<Axis<GamepadAxis>>().unwrap();
gamepad_event.update();
inputs.update();
let gilrs = &mut gilrs.0.lock().unwrap().0;
while let Some(gilrs_event) = gilrs.next_event() {
match gilrs_event.event {
EventType::Connected => {

View file

@ -2,8 +2,8 @@ mod converter;
mod gilrs_system;
use bevy_app::prelude::*;
use bevy_ecs::IntoQuerySystem;
use gilrs_system::{gilrs_startup_system, gilrs_update_system, GilrsArcMutexWrapper};
use bevy_ecs::prelude::*;
use gilrs_system::{gilrs_startup_system, gilrs_update_system};
#[derive(Default)]
pub struct GilrsPlugin;
@ -12,9 +12,12 @@ impl Plugin for GilrsPlugin {
fn build(&self, app: &mut AppBuilder) {
match gilrs::Gilrs::new() {
Ok(gilrs) => {
app.add_resource(GilrsArcMutexWrapper::new(gilrs))
.add_startup_system(gilrs_startup_system.system())
.add_system_to_stage(stage::EVENT_UPDATE, gilrs_update_system.system());
app.add_thread_local_resource(gilrs)
.add_startup_system(gilrs_startup_system.thread_local_system())
.add_system_to_stage(
stage::EVENT_UPDATE,
gilrs_update_system.thread_local_system(),
);
}
Err(err) => log::error!("Failed to start Gilrs. {}", err),
}