mirror of
https://github.com/bevyengine/bevy
synced 2024-11-22 12:43:34 +00:00
Make Task
s functional on WASM (#13889)
# Objective Right not bevy's task pool abstraction is kind of useless on wasm, since it returns a `FakeTask` which can't be interacted with. This is only good for fire-and-forget it tasks, and isn't even that useful since it's just a thin wrapper around `wasm-bindgen-futures::spawn_local` ## Solution Add a simple `Task<T>` handler type to wasm targets that allow waiting for a task's output or periodically checking for its completion. This PR aims to give the wasm version of these tasks feature parity with the native, multi-threaded version of the task ## Testing - Did you test these changes? *Not yet* --------- Co-authored-by: Periwink <charlesbour@gmail.com> Co-authored-by: Jan Hohenheim <jan@hohenheim.ch>
This commit is contained in:
parent
c3057d4353
commit
ee15be8549
4 changed files with 99 additions and 23 deletions
|
@ -20,6 +20,8 @@ concurrent-queue = { version = "2.0.0", optional = true }
|
||||||
|
|
||||||
[target.'cfg(target_arch = "wasm32")'.dependencies]
|
[target.'cfg(target_arch = "wasm32")'.dependencies]
|
||||||
wasm-bindgen-futures = "0.4"
|
wasm-bindgen-futures = "0.4"
|
||||||
|
pin-project = "1"
|
||||||
|
futures-channel = "0.3"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
web-time = { version = "1.1" }
|
web-time = { version = "1.1" }
|
||||||
|
|
|
@ -8,7 +8,9 @@
|
||||||
mod slice;
|
mod slice;
|
||||||
pub use slice::{ParallelSlice, ParallelSliceMut};
|
pub use slice::{ParallelSlice, ParallelSliceMut};
|
||||||
|
|
||||||
|
#[cfg_attr(target_arch = "wasm32", path = "wasm_task.rs")]
|
||||||
mod task;
|
mod task;
|
||||||
|
|
||||||
pub use task::Task;
|
pub use task::Task;
|
||||||
|
|
||||||
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
|
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
|
||||||
|
@ -19,7 +21,7 @@ pub use task_pool::{Scope, TaskPool, TaskPoolBuilder};
|
||||||
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
|
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
|
||||||
mod single_threaded_task_pool;
|
mod single_threaded_task_pool;
|
||||||
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
|
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
|
||||||
pub use single_threaded_task_pool::{FakeTask, Scope, TaskPool, TaskPoolBuilder, ThreadExecutor};
|
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor};
|
||||||
|
|
||||||
mod usages;
|
mod usages;
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{cell::RefCell, future::Future, marker::PhantomData, mem, rc::Rc};
|
use std::{cell::RefCell, future::Future, marker::PhantomData, mem, rc::Rc};
|
||||||
|
|
||||||
|
use crate::Task;
|
||||||
|
|
||||||
thread_local! {
|
thread_local! {
|
||||||
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() };
|
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() };
|
||||||
}
|
}
|
||||||
|
@ -145,34 +147,33 @@ impl TaskPool {
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
|
/// Spawns a static future onto the thread pool. The returned Task is a future, which can be polled
|
||||||
/// cancelled and "detached" allowing it to continue running without having to be polled by the
|
/// to retrieve the output of the original future. Dropping the task will attempt to cancel it.
|
||||||
|
/// It can also be "detached", allowing it to continue running without having to be polled by the
|
||||||
/// end-user.
|
/// end-user.
|
||||||
///
|
///
|
||||||
/// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
|
/// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
|
||||||
pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
|
pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
|
||||||
where
|
where
|
||||||
T: 'static,
|
T: 'static,
|
||||||
{
|
{
|
||||||
#[cfg(target_arch = "wasm32")]
|
#[cfg(target_arch = "wasm32")]
|
||||||
wasm_bindgen_futures::spawn_local(async move {
|
return Task::wrap_future(future);
|
||||||
future.await;
|
|
||||||
});
|
|
||||||
|
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
{
|
{
|
||||||
LOCAL_EXECUTOR.with(|executor| {
|
LOCAL_EXECUTOR.with(|executor| {
|
||||||
let _task = executor.spawn(future);
|
let task = executor.spawn(future);
|
||||||
// Loop until all tasks are done
|
// Loop until all tasks are done
|
||||||
while executor.try_tick() {}
|
while executor.try_tick() {}
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
FakeTask
|
Task::new(task)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
|
/// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
|
||||||
pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
|
pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
|
||||||
where
|
where
|
||||||
T: 'static,
|
T: 'static,
|
||||||
{
|
{
|
||||||
|
@ -198,17 +199,6 @@ impl TaskPool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An empty task used in single-threaded contexts.
|
|
||||||
///
|
|
||||||
/// This does nothing and is therefore safe, and recommended, to ignore.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct FakeTask;
|
|
||||||
|
|
||||||
impl FakeTask {
|
|
||||||
/// No op on the single threaded task pool
|
|
||||||
pub fn detach(self) {}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A `TaskPool` scope for running one or more non-`'static` futures.
|
/// A `TaskPool` scope for running one or more non-`'static` futures.
|
||||||
///
|
///
|
||||||
/// For more information, see [`TaskPool::scope`].
|
/// For more information, see [`TaskPool::scope`].
|
||||||
|
|
82
crates/bevy_tasks/src/wasm_task.rs
Normal file
82
crates/bevy_tasks/src/wasm_task.rs
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
use std::{
|
||||||
|
any::Any,
|
||||||
|
future::{Future, IntoFuture},
|
||||||
|
panic::{AssertUnwindSafe, UnwindSafe},
|
||||||
|
pin::Pin,
|
||||||
|
task::Poll,
|
||||||
|
};
|
||||||
|
|
||||||
|
use futures_channel::oneshot;
|
||||||
|
|
||||||
|
/// Wraps an asynchronous task, a spawned future.
|
||||||
|
///
|
||||||
|
/// Tasks are also futures themselves and yield the output of the spawned future.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Task<T>(oneshot::Receiver<Result<T, Panic>>);
|
||||||
|
|
||||||
|
impl<T: 'static> Task<T> {
|
||||||
|
pub(crate) fn wrap_future(future: impl Future<Output = T> + 'static) -> Self {
|
||||||
|
let (sender, receiver) = oneshot::channel();
|
||||||
|
wasm_bindgen_futures::spawn_local(async move {
|
||||||
|
// Catch any panics that occur when polling the future so they can
|
||||||
|
// be propagated back to the task handle.
|
||||||
|
let value = CatchUnwind(AssertUnwindSafe(future)).await;
|
||||||
|
let _ = sender.send(value);
|
||||||
|
});
|
||||||
|
Self(receiver.into_future())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// When building for Wasm, this method has no effect.
|
||||||
|
/// This is only included for feature parity with other platforms.
|
||||||
|
pub fn detach(self) {}
|
||||||
|
|
||||||
|
/// Requests a task to be cancelled and returns a future that suspends until it completes.
|
||||||
|
/// Returns the output of the future if it has already completed.
|
||||||
|
///
|
||||||
|
/// # Implementation
|
||||||
|
///
|
||||||
|
/// When building for Wasm, it is not possible to cancel tasks, which means this is the same
|
||||||
|
/// as just awaiting the task. This method is only included for feature parity with other platforms.
|
||||||
|
pub async fn cancel(self) -> Option<T> {
|
||||||
|
match self.0.await {
|
||||||
|
Ok(Ok(value)) => Some(value),
|
||||||
|
Err(_) => None,
|
||||||
|
Ok(Err(panic)) => {
|
||||||
|
// drop this to prevent the panic payload from resuming the panic on drop.
|
||||||
|
// this also leaks the box but I'm not sure how to avoid that
|
||||||
|
std::mem::forget(panic);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Future for Task<T> {
|
||||||
|
type Output = T;
|
||||||
|
fn poll(
|
||||||
|
mut self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<Self::Output> {
|
||||||
|
match Pin::new(&mut self.0).poll(cx) {
|
||||||
|
Poll::Ready(Ok(Ok(value))) => Poll::Ready(value),
|
||||||
|
// NOTE: Propagating the panic here sorta has parity with the async_executor behavior.
|
||||||
|
// For those tasks, polling them after a panic returns a `None` which gets `unwrap`ed, so
|
||||||
|
// using `resume_unwind` here is essentially keeping the same behavior while adding more information.
|
||||||
|
Poll::Ready(Ok(Err(panic))) => std::panic::resume_unwind(panic),
|
||||||
|
Poll::Ready(Err(_)) => panic!("Polled a task after it was cancelled"),
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Panic = Box<dyn Any + Send + 'static>;
|
||||||
|
|
||||||
|
#[pin_project::pin_project]
|
||||||
|
struct CatchUnwind<F: UnwindSafe>(#[pin] F);
|
||||||
|
|
||||||
|
impl<F: Future + UnwindSafe> Future for CatchUnwind<F> {
|
||||||
|
type Output = Result<F::Output, Panic>;
|
||||||
|
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
|
||||||
|
std::panic::catch_unwind(AssertUnwindSafe(|| self.project().0.poll(cx)))?.map(Ok)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue