feat: Scoped Futures (#1761)

This commit is contained in:
jquesada2016 2023-09-28 14:20:18 -05:00 committed by GitHub
parent 181bcadbe2
commit 609afce544
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 175 additions and 15 deletions

View file

@ -42,6 +42,7 @@ web-sys = { version = "0.3", optional = true, features = [
cfg-if = "1"
indexmap = "2"
self_cell = "1.0.0"
pin-project = "1"
[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports"] }

View file

@ -114,8 +114,11 @@ pub use resource::*;
use runtime::*;
pub use runtime::{
as_child_of_current_owner, batch, create_runtime, current_runtime,
on_cleanup, run_as_child, set_current_runtime, untrack,
untrack_with_diagnostics, with_current_owner, with_owner, Owner, RuntimeId,
on_cleanup, run_as_child, set_current_runtime,
spawn_local_with_current_owner, spawn_local_with_owner,
try_spawn_local_with_current_owner, try_spawn_local_with_owner,
try_with_owner, untrack, untrack_with_diagnostics, with_current_owner,
with_owner, Owner, RuntimeId, ScopedFuture,
};
pub use selector::*;
pub use serialization::*;

View file

@ -13,6 +13,7 @@ use cfg_if::cfg_if;
use core::hash::BuildHasherDefault;
use futures::stream::FuturesUnordered;
use indexmap::IndexSet;
use pin_project::pin_project;
use rustc_hash::{FxHashMap, FxHasher};
use slotmap::{SecondaryMap, SlotMap, SparseSecondaryMap};
use std::{
@ -23,6 +24,7 @@ use std::{
marker::PhantomData,
pin::Pin,
rc::Rc,
task::Poll,
};
pub(crate) type PinnedFuture<T> = Pin<Box<dyn Future<Output = T>>>;
@ -817,25 +819,39 @@ where
///
/// ## Panics
/// Panics if there is no current reactive runtime.
pub fn with_owner<T>(owner: Owner, f: impl FnOnce() -> T + 'static) -> T
where
T: 'static,
{
pub fn with_owner<T>(owner: Owner, f: impl FnOnce() -> T) -> T {
try_with_owner(owner, f)
.expect("runtime/scope should be alive when with_owner runs")
}
/// Runs the given code with the given reactive owner.
pub fn try_with_owner<T>(owner: Owner, f: impl FnOnce() -> T) -> Option<T> {
with_runtime(|runtime| {
let prev_observer = runtime.observer.take();
let prev_owner = runtime.owner.take();
runtime
.nodes
.try_borrow()
.map(|nodes| nodes.contains_key(owner.0))
.map(|scope_exists| {
scope_exists.then(|| {
let prev_observer = runtime.observer.take();
let prev_owner = runtime.owner.take();
runtime.owner.set(Some(owner.0));
runtime.observer.set(Some(owner.0));
runtime.owner.set(Some(owner.0));
runtime.observer.set(Some(owner.0));
let v = f();
let v = f();
runtime.observer.set(prev_observer);
runtime.owner.set(prev_owner);
runtime.observer.set(prev_observer);
runtime.owner.set(prev_owner);
v
v
})
})
.ok()
.flatten()
})
.expect("runtime should be alive when with_owner runs")
.ok()
.flatten()
}
/// Runs the given function as a child of the current Owner, once.
@ -1469,3 +1485,143 @@ pub fn untrack<T>(f: impl FnOnce() -> T) -> T {
pub fn untrack_with_diagnostics<T>(f: impl FnOnce() -> T) -> T {
Runtime::current().untrack(f, true)
}
/// Allows running a future that has access to a given scope.
#[pin_project]
pub struct ScopedFuture<Fut: Future> {
owner: Owner,
#[pin]
future: Fut,
}
impl<Fut: Future + 'static> Future for ScopedFuture<Fut> {
type Output = Option<Fut::Output>;
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
// TODO: we need to think about how to make this
// not panic for scopes that have been cleaned up...
// or perhaps we can force the scope to not be cleaned
// up until all futures that have a handle to them are
// dropped...
let this = self.project();
if let Some(poll) = try_with_owner(*this.owner, || this.future.poll(cx))
{
match poll {
Poll::Ready(res) => Poll::Ready(Some(res)),
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(None)
}
}
}
impl<Fut: Future> ScopedFuture<Fut> {
/// Creates a new future that will have access to the `[Owner]`'s
/// scope context.
pub fn new(owner: Owner, fut: Fut) -> Self {
Self { owner, future: fut }
}
/// Runs the future in the current [`Owner`]'s scope context.
///
/// # Panics
/// Panics if there is no current [`Owner`] context available.
#[track_caller]
pub fn new_current(fut: Fut) -> Self {
Self {
owner: Owner::current().expect(
"`ScopedFuture::new_current()` to be called within an `Owner` \
context",
),
future: fut,
}
}
}
/// Runs a future that has access to the provided [`Owner`]'s
/// scope context.
pub fn spawn_local_with_owner(
owner: Owner,
fut: impl Future<Output = ()> + 'static,
) {
let scoped_future = ScopedFuture::new(owner, fut);
crate::spawn_local(async move {
if scoped_future.await.is_none() {
// TODO: should we warn here?
// /* warning message */
}
});
}
/// Runs a future that has access to the provided [`Owner`]'s
/// scope context.
///
/// # Panics
/// Panics if there is no [`Owner`] context available.
#[track_caller]
pub fn spawn_local_with_current_owner(fut: impl Future<Output = ()> + 'static) {
let scoped_future = ScopedFuture::new_current(fut);
crate::spawn_local(async move {
if scoped_future.await.is_none() {
// TODO: should we warn here?
// /* warning message */
}
});
}
/// Runs a future that has access to the provided [`Owner`]'s
/// scope context.
///
/// Since futures run in the background, it is possible that
/// the scope has been cleaned up since the future started running.
/// If this happens, the future will not be completed.
///
/// The `on_cancelled` callback can be used to notify you that the
/// future was cancelled.
pub fn try_spawn_local_with_owner(
owner: Owner,
fut: impl Future<Output = ()> + 'static,
on_cancelled: impl FnOnce() + 'static,
) {
let scoped_future = ScopedFuture::new(owner, fut);
crate::spawn_local(async move {
if scoped_future.await.is_none() {
on_cancelled();
}
});
}
/// Runs a future that has access to the provided [`Owner`]'s
/// scope context.
///
/// Since futures run in the background, it is possible that
/// the scope has been cleaned up since the future started running.
/// If this happens, the future will not be completed.
///
/// The `on_cancelled` callback can be used to notify you that the
/// future was cancelled.
///
/// # Panics
/// Panics if there is no [`Owner`] context available.
#[track_caller]
pub fn try_spawn_local_with_current_owner(
fut: impl Future<Output = ()> + 'static,
on_cancelled: impl FnOnce() + 'static,
) {
let scoped_future = ScopedFuture::new_current(fut);
crate::spawn_local(async move {
if scoped_future.await.is_none() {
on_cancelled();
}
});
}