drop pending asset loads (#14808)

# Objective

when handles for loading assets are dropped, we currently wait until
load is completed before dropping the handle. drop asset-load tasks
immediately

## Solution

- track tasks for loading assets and drop them immediately when all
handles are dropped.
~~- use `join_all` in `gltf_loader.rs` to allow it to yield and be
dropped.~~

doesn't cover all the load apis - for those it doesn't cover the task
will still be detached and will still complete before the result is
discarded.

separated out from #13170
This commit is contained in:
robtfm 2024-08-27 01:16:44 +01:00 committed by GitHub
parent 6ddbf9771a
commit f06cd448db
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 103 additions and 73 deletions

View file

@ -5,6 +5,7 @@ use crate::{
UntypedAssetId, UntypedHandle, UntypedAssetId, UntypedHandle,
}; };
use bevy_ecs::world::World; use bevy_ecs::world::World;
use bevy_tasks::Task;
use bevy_utils::tracing::warn; use bevy_utils::tracing::warn;
use bevy_utils::{Entry, HashMap, HashSet, TypeIdMap}; use bevy_utils::{Entry, HashMap, HashSet, TypeIdMap};
use crossbeam_channel::Sender; use crossbeam_channel::Sender;
@ -76,6 +77,7 @@ pub(crate) struct AssetInfos {
pub(crate) dependency_loaded_event_sender: TypeIdMap<fn(&mut World, UntypedAssetId)>, pub(crate) dependency_loaded_event_sender: TypeIdMap<fn(&mut World, UntypedAssetId)>,
pub(crate) dependency_failed_event_sender: pub(crate) dependency_failed_event_sender:
TypeIdMap<fn(&mut World, UntypedAssetId, AssetPath<'static>, AssetLoadError)>, TypeIdMap<fn(&mut World, UntypedAssetId, AssetPath<'static>, AssetLoadError)>,
pub(crate) pending_tasks: HashMap<UntypedAssetId, Task<()>>,
} }
impl std::fmt::Debug for AssetInfos { impl std::fmt::Debug for AssetInfos {
@ -364,6 +366,7 @@ impl AssetInfos {
&mut self.path_to_id, &mut self.path_to_id,
&mut self.loader_dependants, &mut self.loader_dependants,
&mut self.living_labeled_assets, &mut self.living_labeled_assets,
&mut self.pending_tasks,
self.watching_for_changes, self.watching_for_changes,
id, id,
) )
@ -587,6 +590,11 @@ impl AssetInfos {
} }
pub(crate) fn process_asset_fail(&mut self, failed_id: UntypedAssetId, error: AssetLoadError) { pub(crate) fn process_asset_fail(&mut self, failed_id: UntypedAssetId, error: AssetLoadError) {
// Check whether the handle has been dropped since the asset was loaded.
if !self.infos.contains_key(&failed_id) {
return;
}
let (dependants_waiting_on_load, dependants_waiting_on_rec_load) = { let (dependants_waiting_on_load, dependants_waiting_on_rec_load) = {
let Some(info) = self.get_mut(failed_id) else { let Some(info) = self.get_mut(failed_id) else {
// The asset was already dropped. // The asset was already dropped.
@ -648,6 +656,7 @@ impl AssetInfos {
path_to_id: &mut HashMap<AssetPath<'static>, TypeIdMap<UntypedAssetId>>, path_to_id: &mut HashMap<AssetPath<'static>, TypeIdMap<UntypedAssetId>>,
loader_dependants: &mut HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>, loader_dependants: &mut HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
living_labeled_assets: &mut HashMap<AssetPath<'static>, HashSet<Box<str>>>, living_labeled_assets: &mut HashMap<AssetPath<'static>, HashSet<Box<str>>>,
pending_tasks: &mut HashMap<UntypedAssetId, Task<()>>,
watching_for_changes: bool, watching_for_changes: bool,
id: UntypedAssetId, id: UntypedAssetId,
) -> bool { ) -> bool {
@ -662,6 +671,8 @@ impl AssetInfos {
return false; return false;
} }
pending_tasks.remove(&id);
let type_id = entry.key().type_id(); let type_id = entry.key().type_id();
let info = entry.remove(); let info = entry.remove();
@ -704,6 +715,7 @@ impl AssetInfos {
&mut self.path_to_id, &mut self.path_to_id,
&mut self.loader_dependants, &mut self.loader_dependants,
&mut self.living_labeled_assets, &mut self.living_labeled_assets,
&mut self.pending_tasks,
self.watching_for_changes, self.watching_for_changes,
id.untyped(provider.type_id), id.untyped(provider.type_id),
); );

View file

@ -368,7 +368,8 @@ impl AssetServer {
guard: G, guard: G,
) -> Handle<A> { ) -> Handle<A> {
let path = path.into().into_owned(); let path = path.into().into_owned();
let (handle, should_load) = self.data.infos.write().get_or_create_path_handle::<A>( let mut infos = self.data.infos.write();
let (handle, should_load) = infos.get_or_create_path_handle::<A>(
path.clone(), path.clone(),
HandleLoadingMode::Request, HandleLoadingMode::Request,
meta_transform, meta_transform,
@ -377,14 +378,18 @@ impl AssetServer {
if should_load { if should_load {
let owned_handle = Some(handle.clone().untyped()); let owned_handle = Some(handle.clone().untyped());
let server = self.clone(); let server = self.clone();
IoTaskPool::get() let task = IoTaskPool::get().spawn(async move {
.spawn(async move {
if let Err(err) = server.load_internal(owned_handle, path, false, None).await { if let Err(err) = server.load_internal(owned_handle, path, false, None).await {
error!("{}", err); error!("{}", err);
} }
drop(guard); drop(guard);
}) });
.detach();
#[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))]
infos.pending_tasks.insert(handle.id().untyped(), task);
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
task.detach();
} }
handle handle
@ -414,11 +419,8 @@ impl AssetServer {
CowArc::Owned(format!("{source}--{UNTYPED_SOURCE_SUFFIX}").into()) CowArc::Owned(format!("{source}--{UNTYPED_SOURCE_SUFFIX}").into())
} }
}); });
let (handle, should_load) = self let mut infos = self.data.infos.write();
.data let (handle, should_load) = infos.get_or_create_path_handle::<LoadedUntypedAsset>(
.infos
.write()
.get_or_create_path_handle::<LoadedUntypedAsset>(
path.clone().with_source(untyped_source), path.clone().with_source(untyped_source),
HandleLoadingMode::Request, HandleLoadingMode::Request,
meta_transform, meta_transform,
@ -427,12 +429,12 @@ impl AssetServer {
return handle; return handle;
} }
let id = handle.id().untyped(); let id = handle.id().untyped();
let owned_handle = Some(handle.clone().untyped());
let server = self.clone(); let server = self.clone();
IoTaskPool::get() let task = IoTaskPool::get().spawn(async move {
.spawn(async move {
let path_clone = path.clone(); let path_clone = path.clone();
match server.load_untyped_async(path).await { match server.load_internal(owned_handle, path, false, None).await {
Ok(handle) => server.send_asset_event(InternalAssetEvent::Loaded { Ok(handle) => server.send_asset_event(InternalAssetEvent::Loaded {
id, id,
loaded_asset: LoadedAsset::new_with_dependencies( loaded_asset: LoadedAsset::new_with_dependencies(
@ -450,8 +452,14 @@ impl AssetServer {
}); });
} }
} }
}) });
.detach();
#[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))]
infos.pending_tasks.insert(handle.id().untyped(), task);
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
task.detach();
handle handle
} }
@ -488,7 +496,7 @@ impl AssetServer {
/// avoid looking up `should_load` twice, but it means you _must_ be sure a load is necessary when calling this function with [`Some`]. /// avoid looking up `should_load` twice, but it means you _must_ be sure a load is necessary when calling this function with [`Some`].
async fn load_internal<'a>( async fn load_internal<'a>(
&self, &self,
input_handle: Option<UntypedHandle>, mut input_handle: Option<UntypedHandle>,
path: AssetPath<'a>, path: AssetPath<'a>,
force: bool, force: bool,
meta_transform: Option<MetaTransform>, meta_transform: Option<MetaTransform>,
@ -512,6 +520,13 @@ impl AssetServer {
} }
})?; })?;
if let Some(meta_transform) = input_handle.as_ref().and_then(|h| h.meta_transform()) {
(*meta_transform)(&mut *meta);
}
// downgrade the input handle so we don't keep the asset alive just because we're loading it
// note we can't just pass a weak handle in, as only strong handles contain the asset meta transform
input_handle = input_handle.map(|h| h.clone_weak());
// This contains Some(UntypedHandle), if it was retrievable // This contains Some(UntypedHandle), if it was retrievable
// If it is None, that is because it was _not_ retrievable, due to // If it is None, that is because it was _not_ retrievable, due to
// 1. The handle was not already passed in for this path, meaning we can't just use that // 1. The handle was not already passed in for this path, meaning we can't just use that
@ -580,10 +595,6 @@ impl AssetServer {
(handle.clone().unwrap(), path.clone()) (handle.clone().unwrap(), path.clone())
}; };
if let Some(meta_transform) = base_handle.meta_transform() {
(*meta_transform)(&mut *meta);
}
match self match self
.load_with_meta_loader_and_reader(&base_path, meta, &*loader, &mut *reader, true, false) .load_with_meta_loader_and_reader(&base_path, meta, &*loader, &mut *reader, true, false)
.await .await
@ -721,17 +732,14 @@ impl AssetServer {
&self, &self,
future: impl Future<Output = Result<A, E>> + Send + 'static, future: impl Future<Output = Result<A, E>> + Send + 'static,
) -> Handle<A> { ) -> Handle<A> {
let handle = self let mut infos = self.data.infos.write();
.data let handle =
.infos infos.create_loading_handle_untyped(TypeId::of::<A>(), std::any::type_name::<A>());
.write()
.create_loading_handle_untyped(TypeId::of::<A>(), std::any::type_name::<A>());
let id = handle.id(); let id = handle.id();
let event_sender = self.data.asset_event_sender.clone(); let event_sender = self.data.asset_event_sender.clone();
IoTaskPool::get() let task = IoTaskPool::get().spawn(async move {
.spawn(async move {
match future.await { match future.await {
Ok(asset) => { Ok(asset) => {
let loaded_asset = LoadedAsset::new_with_dependencies(asset, None).into(); let loaded_asset = LoadedAsset::new_with_dependencies(asset, None).into();
@ -753,8 +761,13 @@ impl AssetServer {
.unwrap(); .unwrap();
} }
} }
}) });
.detach();
#[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))]
infos.pending_tasks.insert(id, task);
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
task.detach();
handle.typed_debug_checked() handle.typed_debug_checked()
} }
@ -1312,6 +1325,11 @@ pub fn handle_internal_asset_events(world: &mut World) {
info!("Reloading {path} because it has changed"); info!("Reloading {path} because it has changed");
server.reload(path); server.reload(path);
} }
#[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))]
infos
.pending_tasks
.retain(|_, load_task| !load_task.is_finished());
}); });
} }