asset: use bevy_tasks in AssetServer (#550)

This commit is contained in:
Carter Anderson 2020-09-21 20:23:09 -07:00 committed by GitHub
parent dd6f0b5e04
commit 028a22b129
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 141 additions and 174 deletions

View file

@ -19,7 +19,6 @@ dynamic_plugins = ["libloading"]
# bevy
bevy_derive = { path = "../bevy_derive", version = "0.2.1" }
bevy_ecs = { path = "../bevy_ecs", version = "0.2.1" }
bevy_tasks = { path = "../bevy_tasks", version = "0.2.1" }
bevy_math = { path = "../bevy_math", version = "0.2.1" }
# other

View file

@ -1,4 +1,4 @@
use crate::{app_builder::AppBuilder, DefaultTaskPoolOptions};
use crate::app_builder::AppBuilder;
use bevy_ecs::{ParallelExecutor, Resources, Schedule, World};
#[allow(clippy::needless_doctest_main)]
@ -64,20 +64,16 @@ impl App {
}
pub fn run(mut self) {
// Setup the default bevy task pools
self.resources
.get_cloned::<DefaultTaskPoolOptions>()
.unwrap_or_else(DefaultTaskPoolOptions::default)
.create_default_pools(&mut self.resources);
self.startup_schedule
.initialize(&mut self.world, &mut self.resources);
self.startup_executor.initialize(&mut self.resources);
self.startup_executor.run(
&mut self.startup_schedule,
&mut self.world,
&mut self.resources,
);
self.executor.initialize(&mut self.resources);
let runner = std::mem::replace(&mut self.runner, Box::new(run_once));
(runner)(self);
}

View file

@ -8,7 +8,6 @@ mod app_builder;
mod event;
mod plugin;
mod schedule_runner;
mod task_pool_options;
pub use app::*;
pub use app_builder::*;
@ -16,7 +15,6 @@ pub use bevy_derive::DynamicPlugin;
pub use event::*;
pub use plugin::*;
pub use schedule_runner::*;
pub use task_pool_options::*;
pub mod prelude {
pub use crate::{

View file

@ -20,6 +20,7 @@ filesystem_watcher = ["notify"]
# bevy
bevy_app = { path = "../bevy_app", version = "0.2.1" }
bevy_ecs = { path = "../bevy_ecs", version = "0.2.1" }
bevy_tasks = { path = "../bevy_tasks", version = "0.2.1" }
bevy_type_registry = { path = "../bevy_type_registry", version = "0.2.1" }
bevy_property = { path = "../bevy_property", version = "0.2.1" }
bevy_utils = { path = "../bevy_utils", version = "0.2.1" }

View file

@ -4,6 +4,7 @@ use crate::{
};
use anyhow::Result;
use bevy_ecs::{Res, Resource, Resources};
use bevy_tasks::TaskPool;
use bevy_utils::{HashMap, HashSet};
use crossbeam_channel::TryRecvError;
use parking_lot::RwLock;
@ -11,7 +12,6 @@ use std::{
env, fs, io,
path::{Path, PathBuf},
sync::Arc,
thread,
};
use thiserror::Error;
@ -38,12 +38,6 @@ pub enum AssetServerError {
AssetWatchError { path: PathBuf },
}
struct LoaderThread {
// NOTE: these must remain private. the LoaderThread Arc counters are used to determine thread liveness
// if there is one reference, the loader thread is dead. if there are two references, the loader thread is active
requests: Arc<RwLock<Vec<LoadRequest>>>,
}
/// Info about a specific asset, such as its path and its current load state
#[derive(Clone, Debug)]
pub struct AssetInfo {
@ -73,11 +67,10 @@ impl LoadState {
/// Loads assets from the filesystem on background threads
pub struct AssetServer {
asset_folders: RwLock<Vec<PathBuf>>,
loader_threads: RwLock<Vec<LoaderThread>>,
max_loader_threads: usize,
asset_handlers: Arc<RwLock<Vec<Box<dyn AssetLoadRequestHandler>>>>,
// TODO: this is a hack to enable retrieving generic AssetLoader<T>s. there must be a better way!
loaders: Vec<Resources>,
task_pool: TaskPool,
extension_to_handler_index: HashMap<String, usize>,
extension_to_loader_index: HashMap<String, usize>,
asset_info: RwLock<HashMap<HandleId, AssetInfo>>,
@ -86,25 +79,22 @@ pub struct AssetServer {
filesystem_watcher: Arc<RwLock<Option<FilesystemWatcher>>>,
}
impl Default for AssetServer {
fn default() -> Self {
impl AssetServer {
pub fn new(task_pool: TaskPool) -> Self {
AssetServer {
#[cfg(feature = "filesystem_watcher")]
filesystem_watcher: Arc::new(RwLock::new(None)),
max_loader_threads: 4,
asset_folders: Default::default(),
loader_threads: Default::default(),
asset_handlers: Default::default(),
loaders: Default::default(),
extension_to_handler_index: Default::default(),
extension_to_loader_index: Default::default(),
asset_info_paths: Default::default(),
asset_info: Default::default(),
task_pool,
#[cfg(feature = "filesystem_watcher")]
filesystem_watcher: Arc::new(RwLock::new(None)),
}
}
}
impl AssetServer {
pub fn add_handler<T>(&mut self, asset_handler: T)
where
T: AssetLoadRequestHandler,
@ -183,46 +173,6 @@ impl AssetServer {
Ok(())
}
#[cfg(feature = "filesystem_watcher")]
pub fn filesystem_watcher_system(asset_server: Res<AssetServer>) {
let mut changed = HashSet::default();
loop {
let result = {
let rwlock_guard = asset_server.filesystem_watcher.read();
if let Some(filesystem_watcher) = rwlock_guard.as_ref() {
filesystem_watcher.receiver.try_recv()
} else {
break;
}
};
let event = match result {
Ok(result) => result.unwrap(),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => panic!("FilesystemWatcher disconnected"),
};
if let notify::event::Event {
kind: notify::event::EventKind::Modify(_),
paths,
..
} = event
{
for path in paths.iter() {
if !changed.contains(path) {
let root_path = asset_server.get_root_path().unwrap();
let relative_path = path.strip_prefix(root_path).unwrap();
match asset_server.load_untyped(relative_path) {
Ok(_) => {}
Err(AssetServerError::AssetLoadError(error)) => panic!("{:?}", error),
Err(_) => {}
}
}
}
changed.extend(paths);
}
}
}
fn get_root_path(&self) -> Result<PathBuf, AssetServerError> {
if let Ok(manifest_dir) = env::var("CARGO_MANIFEST_DIR") {
Ok(PathBuf::from(manifest_dir))
@ -315,12 +265,21 @@ impl AssetServer {
}
};
self.send_request_to_loader_thread(LoadRequest {
let load_request = LoadRequest {
handle_id,
path: path.to_owned(),
handler_index: *index,
version: new_version,
});
};
let asset_handlers = self.asset_handlers.clone();
self.task_pool
.spawn(async move {
let handlers = asset_handlers.read();
let request_handler = &handlers[load_request.handler_index];
request_handler.handle_request(&load_request);
})
.detach();
// TODO: watching each asset explicitly is a simpler implementation, it's possible it would be more efficient to watch
// folders instead (when possible)
@ -370,56 +329,6 @@ impl AssetServer {
Some(load_state)
}
fn send_request_to_loader_thread(&self, load_request: LoadRequest) {
// NOTE: This lock makes the call to Arc::strong_count safe. Removing (or reordering) it could result in undefined behavior
let mut loader_threads = self.loader_threads.write();
if loader_threads.len() < self.max_loader_threads {
let loader_thread = LoaderThread {
requests: Arc::new(RwLock::new(vec![load_request])),
};
let requests = loader_thread.requests.clone();
loader_threads.push(loader_thread);
Self::start_thread(self.asset_handlers.clone(), requests);
} else {
let most_free_thread = loader_threads
.iter()
.min_by_key(|l| l.requests.read().len())
.unwrap();
let mut requests = most_free_thread.requests.write();
requests.push(load_request);
// if the most free thread only has one reference, the thread has spun down. if so, we need to spin it back up!
if Arc::strong_count(&most_free_thread.requests) == 1 {
Self::start_thread(
self.asset_handlers.clone(),
most_free_thread.requests.clone(),
);
}
}
}
fn start_thread(
request_handlers: Arc<RwLock<Vec<Box<dyn AssetLoadRequestHandler>>>>,
requests: Arc<RwLock<Vec<LoadRequest>>>,
) {
thread::spawn(move || {
loop {
let request = {
let mut current_requests = requests.write();
if current_requests.len() == 0 {
// if there are no requests, spin down the thread
break;
}
current_requests.pop().unwrap()
};
let handlers = request_handlers.read();
let request_handler = &handlers[request.handler_index];
request_handler.handle_request(&request);
}
});
}
fn load_assets_in_folder_recursive(
&self,
path: &Path,
@ -456,3 +365,43 @@ impl AssetServer {
Ok(handle_ids)
}
}
#[cfg(feature = "filesystem_watcher")]
pub fn filesystem_watcher_system(asset_server: Res<AssetServer>) {
let mut changed = HashSet::default();
loop {
let result = {
let rwlock_guard = asset_server.filesystem_watcher.read();
if let Some(filesystem_watcher) = rwlock_guard.as_ref() {
filesystem_watcher.receiver.try_recv()
} else {
break;
}
};
let event = match result {
Ok(result) => result.unwrap(),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => panic!("FilesystemWatcher disconnected"),
};
if let notify::event::Event {
kind: notify::event::EventKind::Modify(_),
paths,
..
} = event
{
for path in paths.iter() {
if !changed.contains(path) {
let root_path = asset_server.get_root_path().unwrap();
let relative_path = path.strip_prefix(root_path).unwrap();
match asset_server.load_untyped(relative_path) {
Ok(_) => {}
Err(AssetServerError::AssetLoadError(error)) => panic!("{:?}", error),
Err(_) => {}
}
}
}
changed.extend(paths);
}
}
}

View file

@ -8,6 +8,7 @@ mod loader;
pub use asset_server::*;
pub use assets::*;
use bevy_tasks::IoTaskPool;
pub use handle::*;
pub use load_request::*;
pub use loader::*;
@ -33,15 +34,21 @@ pub struct AssetPlugin;
impl Plugin for AssetPlugin {
fn build(&self, app: &mut AppBuilder) {
let task_pool = app
.resources()
.get::<IoTaskPool>()
.expect("IoTaskPool resource not found")
.0
.clone();
app.add_stage_before(bevy_app::stage::PRE_UPDATE, stage::LOAD_ASSETS)
.add_stage_after(bevy_app::stage::POST_UPDATE, stage::ASSET_EVENTS)
.init_resource::<AssetServer>()
.add_resource(AssetServer::new(task_pool))
.register_property::<HandleId>();
#[cfg(feature = "filesystem_watcher")]
app.add_system_to_stage(
stage::LOAD_ASSETS,
AssetServer::filesystem_watcher_system.system(),
asset_server::filesystem_watcher_system.system(),
);
}
}

View file

@ -26,6 +26,9 @@ bevy_property = { path = "../bevy_property", version = "0.2.1" }
bevy_type_registry = { path = "../bevy_type_registry", version = "0.2.1" }
bevy_math = { path = "../bevy_math", version = "0.2.1" }
bevy_utils = { path = "../bevy_utils", version = "0.2.1" }
bevy_tasks = { path = "../bevy_tasks", version = "0.2.1" }
log = { version = "0.4", features = ["release_max_level_info"] }
[target.'cfg(target_arch = "wasm32")'.dependencies]
instant = "0.1.6"

View file

@ -1,15 +1,17 @@
mod bytes;
mod float_ord;
mod label;
mod task_pool_options;
mod time;
pub use bytes::*;
pub use float_ord::*;
pub use label::*;
pub use task_pool_options::DefaultTaskPoolOptions;
pub use time::*;
pub mod prelude {
pub use crate::{EntityLabels, Labels, Time, Timer};
pub use crate::{DefaultTaskPoolOptions, EntityLabels, Labels, Time, Timer};
}
use bevy_app::prelude::*;
@ -23,6 +25,12 @@ pub struct CorePlugin;
impl Plugin for CorePlugin {
fn build(&self, app: &mut AppBuilder) {
// Setup the default bevy task pools
app.resources_mut()
.get_cloned::<DefaultTaskPoolOptions>()
.unwrap_or_else(DefaultTaskPoolOptions::default)
.create_default_pools(app.resources_mut());
app.init_resource::<Time>()
.init_resource::<EntityLabels>()
.register_component::<Timer>()

View file

@ -1,5 +1,5 @@
use bevy_ecs::Resources;
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool, TaskPoolBuilder};
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder};
/// Defines a simple way to determine how many threads to use given the number of remaining cores
/// and number of total cores
@ -100,7 +100,7 @@ impl DefaultTaskPoolOptions {
let mut remaining_threads = total_threads;
if !resources.contains::<IOTaskPool>() {
if !resources.contains::<IoTaskPool>() {
// Determine the number of IO threads we will use
let io_threads = self
.io
@ -109,7 +109,7 @@ impl DefaultTaskPoolOptions {
log::trace!("IO Threads: {}", io_threads);
remaining_threads = remaining_threads.saturating_sub(io_threads);
resources.insert(IOTaskPool(
resources.insert(IoTaskPool(
TaskPoolBuilder::default()
.num_threads(io_threads)
.thread_name("IO Task Pool".to_string())

View file

@ -41,6 +41,12 @@ impl ParallelExecutor {
}
}
pub fn initialize(&mut self, resources: &mut Resources) {
if resources.get::<ComputeTaskPool>().is_none() {
resources.insert(ComputeTaskPool(TaskPool::default()));
}
}
pub fn run(&mut self, schedule: &mut Schedule, world: &mut World, resources: &mut Resources) {
let schedule_generation = schedule.generation();
let schedule_changed = schedule.generation() != self.last_schedule_generation;

View file

@ -15,7 +15,7 @@ mod single_threaded_task_pool;
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder};
mod usages;
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool};
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};
mod countdown_event;
pub use countdown_event::CountdownEvent;
@ -27,7 +27,7 @@ pub mod prelude {
pub use crate::{
iter::ParallelIterator,
slice::{ParallelSlice, ParallelSliceMut},
usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool},
usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool},
};
}

View file

@ -16,6 +16,11 @@ use std::{
pub struct Task<T>(async_executor::Task<T>);
impl<T> Task<T> {
/// Creates a new task from a given async_executor::Task
pub fn new(task: async_executor::Task<T>) -> Self {
Self(task)
}
/// Detaches the task to let it keep running in the background. See `async_executor::Task::detach`
pub fn detach(self) {
self.0.detach();

View file

@ -8,6 +8,8 @@ use std::{
use futures_lite::{future, pin};
use crate::Task;
/// Used to create a TaskPool
#[derive(Debug, Default, Clone)]
pub struct TaskPoolBuilder {
@ -197,14 +199,11 @@ impl TaskPool {
/// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
/// cancelled and "detached" allowing it to continue running without having to be polled by the
/// end-user.
pub fn spawn<T>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> impl Future<Output = T> + Send
pub fn spawn<T>(&self, future: impl Future<Output = T> + Send + 'static) -> Task<T>
where
T: Send + 'static,
{
self.executor.spawn(future)
Task::new(self.executor.spawn(future))
}
}

View file

@ -41,9 +41,9 @@ impl Deref for AsyncComputeTaskPool {
/// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a
/// "woken" state)
#[derive(Clone)]
pub struct IOTaskPool(pub TaskPool);
pub struct IoTaskPool(pub TaskPool);
impl Deref for IOTaskPool {
impl Deref for IoTaskPool {
type Target = TaskPool;
fn deref(&self) -> &Self::Target {

View file

@ -1,7 +1,6 @@
use crate::{CalculatedSize, Node};
use bevy_asset::{Assets, Handle};
use bevy_core::FloatOrd;
use bevy_ecs::{Changed, Local, Query, Res, ResMut};
use bevy_ecs::{Changed, Entity, Local, Query, Res, ResMut};
use bevy_math::Size;
use bevy_render::{
draw::{Draw, DrawContext, Drawable},
@ -12,11 +11,10 @@ use bevy_render::{
use bevy_sprite::TextureAtlas;
use bevy_text::{DrawableText, Font, FontAtlasSet, TextStyle};
use bevy_transform::prelude::GlobalTransform;
use bevy_utils::HashSet;
#[derive(Default)]
pub struct QueuedTextGlyphs {
glyphs: HashSet<(Handle<Font>, FloatOrd, char)>,
pub struct QueuedText {
entities: Vec<Entity>,
}
#[derive(Default, Clone)]
@ -27,43 +25,48 @@ pub struct Text {
}
pub fn text_system(
mut queued_text_glyphs: Local<QueuedTextGlyphs>,
mut queued_text: Local<QueuedText>,
mut textures: ResMut<Assets<Texture>>,
fonts: Res<Assets<Font>>,
mut font_atlas_sets: ResMut<Assets<FontAtlasSet>>,
mut texture_atlases: ResMut<Assets<TextureAtlas>>,
mut query: Query<(Changed<Text>, &mut CalculatedSize)>,
mut query: Query<(Entity, Changed<Text>, &mut CalculatedSize)>,
mut text_query: Query<(&Text, &mut CalculatedSize)>,
) {
// add queued glyphs to atlases
if !queued_text_glyphs.glyphs.is_empty() {
let mut glyphs_to_queue = Vec::new();
for (font_handle, FloatOrd(font_size), character) in queued_text_glyphs.glyphs.drain() {
let font_atlases = font_atlas_sets
.get_or_insert_with(Handle::from_id(font_handle.id), || {
FontAtlasSet::new(font_handle)
});
// try adding the glyph to an atlas. if it fails, re-queue
if let Ok(char_str) = std::str::from_utf8(&[character as u8]) {
if font_atlases
.add_glyphs_to_atlas(
&fonts,
&mut texture_atlases,
&mut textures,
font_size,
char_str,
)
.is_none()
{
glyphs_to_queue.push((font_handle, FloatOrd(font_size), character));
// add queued text to atlases
let mut new_queued_text = Vec::new();
for entity in queued_text.entities.drain(..) {
if let Ok(mut result) = text_query.entity(entity) {
if let Some((text, mut calculated_size)) = result.get() {
let font_atlases = font_atlas_sets
.get_or_insert_with(Handle::from_id(text.font.id), || {
FontAtlasSet::new(text.font)
});
// TODO: this call results in one or more TextureAtlases, whose render resources are created in the RENDER_GRAPH_SYSTEMS
// stage. That logic runs _before_ the DRAW stage, which means we can't call add_glyphs_to_atlas in the draw stage
// without our render resources being a frame behind. Therefore glyph atlasing either needs its own system or the TextureAtlas
// resource generation needs to happen AFTER the render graph systems. maybe draw systems should execute within the
// render graph so ordering like this can be taken into account? Maybe the RENDER_GRAPH_SYSTEMS stage should be removed entirely
// in favor of node.update()? Regardless, in the immediate short term the current approach is fine.
if let Some(width) = font_atlases.add_glyphs_to_atlas(
&fonts,
&mut texture_atlases,
&mut textures,
text.style.font_size,
&text.value,
) {
calculated_size.size = Size::new(width, text.style.font_size);
} else {
new_queued_text.push(entity);
}
}
}
queued_text_glyphs.glyphs.extend(glyphs_to_queue);
}
for (text, mut calculated_size) in &mut query.iter() {
queued_text.entities = new_queued_text;
// add changed text to atlases
for (entity, text, mut calculated_size) in &mut query.iter() {
let font_atlases = font_atlas_sets
.get_or_insert_with(Handle::from_id(text.font.id), || {
FontAtlasSet::new(text.font)
@ -83,13 +86,7 @@ pub fn text_system(
) {
calculated_size.size = Size::new(width, text.style.font_size);
} else {
for character in text.value.chars() {
queued_text_glyphs.glyphs.insert((
text.font,
FloatOrd(text.style.font_size),
character,
));
}
queued_text.entities.push(entity);
}
}
}

View file

@ -1,5 +1,4 @@
use bevy::prelude::*;
use bevy_app::DefaultTaskPoolOptions;
/// This example illustrates how to customize the thread pool used internally (e.g. to only use a
/// certain number of threads).